관리 메뉴

moozi

RHadoop예제 본문

TIS_2018/응용sw2018_1기

RHadoop 예제

moozi 2018. 7. 20. 12:02

======== 예제 1: 1~10 제곱 구하기 (map 전용 작업) ========


R

# ---- Example 1: square the integers 1..10 with a map-only job ----

# Point rhdfs/rmr2 at the Hadoop install, its CLI, and the streaming jar.
Sys.setenv(
  HADOOP_HOME = "/home/a2d/hadoop/hadoop-2.8.0",
  HADOOP_CMD = "/home/a2d/hadoop/hadoop-2.8.0/bin/hadoop",
  HADOOP_STREAMING = "/home/a2d/hadoop/hadoop-2.8.0/share/hadoop/tools/lib/hadoop-streaming-2.8.0.jar"
)

library(rhdfs)
hdfs.init()
library(rmr2)

# Push 1..10 to DFS (no explicit output path) and read it straight back
# to check the round trip.
small.ints <- to.dfs(1:10)
from.dfs(small.ints)

# Map-only job (no reducer): for each chunk of values v, emit a two-column
# matrix (v, v^2).
result <- mapreduce(
  input = small.ints,
  map = function(k, v) {
    cbind(v, v^2)
  }
)

out <- from.dfs(result)
out


====== 예제 2: 표준정규분포 표본 100개의 최댓값 분포 ======

# ---- Example 2: distribution of the maximum of 100 standard normals ----

# Fixed DFS path for the input; remove any copy left over from a previous run.
infile <- "/tmp/ex2"
if (dfs.exists(infile)) dfs.rmr(infile)

# 1000 input records; only their count matters (each record triggers one
# simulation).
inputValue <- to.dfs(1:1000, output = infile)

# Map: for every value in the chunk, draw 100 N(0, 1) samples and emit their
# maximum. A single vectorised keyval (keys = positions in the chunk,
# values = maxima) is returned instead of a list of per-record keyval objects,
# so the result no longer relies on rmr2 implicitly wrapping a plain list.
mr_func <- mapreduce(
  input = inputValue,
  map = function(k, v) {
    idx <- seq_along(v)
    keyval(idx, vapply(idx, function(r) max(rnorm(100)), numeric(1)))
  }
)

output <- from.dfs(mr_func)
maxs <- as.numeric(values(output))

hist(maxs, breaks = 30)

# Fixed breaks of 1..5 make hist() stop with "some 'x' not counted" whenever a
# simulated maximum falls outside that range. Keep 1..5 as the default span
# but widen it (on the same 0.1 grid) when the data require it.
brk_lo <- min(1, floor(min(maxs) * 10) / 10)
brk_hi <- max(5, ceiling(max(maxs) * 10) / 10)
hist(maxs, col = rainbow(7), breaks = seq(brk_lo, brk_hi, by = 0.1))

 

 

 

 


============== 예제 3: 2008년 항공편 지연 데이터 (날짜 × 항공사) 히트맵 ================

# Upload the 2008.csv flight data used by Example 3 into HDFS.
# -mkdir -p and -rm -f make the steps safe to re-run: the very first run no
# longer fails because /input/airline/2008.csv does not exist yet.
bin/hdfs dfs -mkdir -p /input/airline
bin/hdfs dfs -rm -f /input/airline/2008.csv
bin/hdfs dfs -put /home/a2d/hadoop/hadoop-2.8.0/warehouse/2008.csv /input/airline

R

# ---- Example 3 setup: environment, packages, and paths ----

Sys.setenv(HADOOP_CMD = "/home/a2d/hadoop/hadoop-2.8.0/bin/hadoop")
Sys.setenv(HADOOP_STREAMING = "/home/a2d/hadoop/hadoop-2.8.0/share/hadoop/tools/lib/hadoop-streaming-2.8.0.jar")

library(rJava)
library(rhdfs)
hdfs.init()
library(rmr2)
library(dplyr)
library(reshape2)

# Install gplots only on the first run (when it is missing) instead of
# reinstalling it every time the script runs.
if (!requireNamespace("gplots", quietly = TRUE)) {
  install.packages("gplots")
}
library(gplots)

# The mapreduce() job below writes to /tmp/ex1 and a Hadoop job fails if its
# output path already exists, so clear it first - but only when present.
if (dfs.exists("/tmp/ex1")) dfs.rmr("/tmp/ex1")

inputfile <- "/input/airline/2008.csv"
 
# Map function for the flight job.
# Receives one chunk of CSV rows as a list/data frame of columns `fields`
# (the key argument `.` is ignored). Rows whose column 16 does not parse as a
# number (e.g. "NA" or the header row) are dropped; for the rest it emits
#   key   = data frame(date = "Y-M-D" built from columns 1-3, carrier = column 9)
#   value = data frame(delay = numeric column 16)
# NOTE(review): columns 1/2/3/9/16 are presumably Year, Month, DayofMonth,
# UniqueCarrier and DepDelay of the 2008 airline CSV - confirm against the file.
mapper_flight <- function(., fields) {
  delay <- as.numeric(as.character(fields[[16]]))
  keep <- !is.na(delay)

  # Column i as character, restricted to the rows that have a usable delay.
  col_chr <- function(i) as.character(fields[[i]])[keep]

  key.df <- data.frame(
    date = paste(col_chr(1), col_chr(2), col_chr(3), sep = "-"),
    carrier = col_chr(9)
  )
  output.val <- data.frame(delay = delay[keep])

  keyval(key.df, output.val)
}

# Reduce function: for one key, emit the mean of the mapped `delay` values
# (NA values ignored) as a one-column data frame named `avg`.
reducer <- function(k, v) {
  avg_delay <- mean(v$delay, na.rm = TRUE)
  keyval(k, data.frame(avg = avg_delay))
}

# Run the job: read the CSV at `inputfile`, map each row to
# ((date, carrier), delay), reduce each key to its mean delay, and write the
# result to /tmp/ex1.
result <- mapreduce(
  input        = inputfile,
  output       = "/tmp/ex1",
  input.format = make.input.format("csv", sep = ","),
  map          = mapper_flight,
  reduce       = reducer
)

# ---- Preprocessing: reshape the job output into a carrier x date matrix ----
data <- from.dfs(result)
data <- as.data.frame(data, stringsAsFactors = FALSE)
colnames(data) <- c("date", "carrier", "avg")
data <- transform(
  data,
  date = as.Date(date),
  carrier = as.character(carrier),
  # z-score of the per-key average delay; as.numeric() drops the matrix
  # shape and "scaled:*" attributes that scale() returns
  avg = as.numeric(scale(avg))
)

# One row per carrier, one column per date. value.var is passed explicitly so
# dcast() does not have to guess it (and print a message about the guess).
mat_carrierByDate <- dcast(data, carrier ~ date, value.var = "avg")
row.names(mat_carrierByDate) <- mat_carrierByDate$carrier
# drop = FALSE keeps a data frame even when only one date column remains;
# without it the result collapses to a bare vector and the row names are lost.
mat_carrierByDate <- mat_carrierByDate[, -1, drop = FALSE]
mat_carrierByDate <- data.matrix(mat_carrierByDate)

# Column labels for the heatmap: keep every 30th date (1, 31, 61, ...) and
# blank the rest. length.out avoids seq()'s "wrong sign in 'by'" error when
# there are no date columns at all.
date <- colnames(mat_carrierByDate)
month <- seq(from = 1, by = 30, length.out = ceiling(length(date) / 30))
date[-month] <- ""

# ---- Visualization: carrier x date heatmap of the standardized average delay ----
# Plot the reshaped carrier x date matrix. The long-format `data` frame
# (columns date/carrier/avg) does not have one column per date, so it would not
# line up with the per-date labels in `date`.
heatmap.2(
  data.matrix(mat_carrierByDate),
  dendrogram = "none", trace = "none", scale = "column",
  Rowv = FALSE, Colv = FALSE,
  main = paste0("year:", 2008),
  key = TRUE, density.info = "none", symkey = FALSE,
  cexRow = 1, cexCol = 1, margins = c(5, 5),
  labCol = date, srtCol = 45
)

 


# Sets the Hadoop streaming jar path that rmr2 uses to run jobs.
# NOTE(review): this repeats the value already set in Examples 1 and 3 and looks
# like a leftover; it is only needed if R was restarted without those lines.
Sys.setenv(HADOOP_STREAMING="/home/a2d/hadoop/hadoop-2.8.0/share/hadoop/tools/lib/hadoop-streaming-2.8.0.jar")

 

# Start the Hadoop daemons (HDFS first, then YARN); run this from the Hadoop
# home directory before any of the examples above.
# start-all.sh is deprecated in Hadoop 2.x in favour of these two scripts.
sbin/start-dfs.sh
sbin/start-yarn.sh

'TIS_2018 > 응용sw2018_1기' 카테고리의 다른 글

하둡시작이 잘 안될때  (0) 2018.07.19
하둡연습문제01  (1) 2018.07.18
pandas excel barchart  (0) 2018.07.18
성적엑셀파일  (0) 2018.07.18
python pandas  (0) 2018.07.17
Comments