準備執行環境
若要使用 R 與 Hadoop Streaming API 分析資料,要先準備好標準的 Hadoop 環境,在測試時可以自己安裝單節點的 Hadoop 環境,或是使用 Hortonworks 或 Cloudera 這類整合好的虛擬機器也可以,然後在每一個 Hadoop 節點上安裝一般的 R 執行環境。
安裝好之後,確認每一個 Hadoop 都可以正常執行 Rscript:
Rscript --version
R scripting front-end version 3.3.3 (2017-03-06)
使用 MapReduce 計算字數
在整合 R 與 Hadoop Streaming API 時,我們需要用 R 寫出 mapper.R 與 reducer.R 兩個指令稿,分別處理 Map 與 Reduce 的工作。
這是 mapper.R 指令稿,它會從標準輸入(stdin)讀取資料,將資料轉換成以 tab 分隔的鍵值對應,然後輸出至標準輸出(stdout):
#! /usr/bin/env Rscript
# mapper.R
# 移除空白函數
trimWhiteSpace <- function(line) gsub("(^ +)|( +$)", "", line)
# 分割英文單字函數
splitIntoWords <- function(line) unlist(strsplit(line, "[[:space:]]+"))
con <- file("stdin", open = "r")
while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
line <- trimWhiteSpace(line)
words <- splitIntoWords(line)
## 亦可使用 cat(paste(words, "\t1\n", sep=""), sep="")
for (w in words) cat(w, "\t1\n", sep="")
}
close(con)
這是 reducer.R 指令稿,這個指令稿也是從標準輸入讀取資料,然後將處理好的資料輸出至標準輸出:
#! /usr/bin/env Rscript
# reducer.R
# 移除空白函數
trimWhiteSpace <- function(line) gsub("(^ +)|( +$)", "", line)
# 分割鍵與值
splitLine <- function(line) {
val <- unlist(strsplit(line, "\t"))
list(word = val[1], count = as.integer(val[2]))
}
# 使用環境空間作為雜湊(hash)
env <- new.env(hash = TRUE)
con <- file("stdin", open = "r")
while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
line <- trimWhiteSpace(line)
split <- splitLine(line)
word <- split$word
count <- split$count
# 檢查 word 變數是否已經存在
if (exists(word, envir = env, inherits = FALSE)) {
# 更新既有的 word 變數
oldcount <- get(word, envir = env)
assign(word, oldcount + count, envir = env)
} else {
# 建立名稱為 word 的變數
assign(word, count, envir = env)
}
}
close(con)
# 輸出結果
for (w in ls(env, all = TRUE))
cat(w, "\t", get(w, envir = env), "\n", sep = "")
在開發階段,我們可以拿比較少量的資料,以一般的 Linux 指令來測試指令稿是否可以正常處理資料:
# 測試 mapper.R
echo "this is a test that is a test" | Rscript mapper.R
# 以檔案測試 mapper.R
cat input.txt | Rscript mapper.R
# 測試 reducer.R
echo "this is a test that is a test" | Rscript mapper.R \
| sort -k1,1 | Rscript reducer.R
# 以檔案測試 reducer.R
cat input.txt | Rscript mapper.R \
| sort -k1,1 | Rscript reducer.R
確認 mapper.R 與 reducer.R 兩個指令稿都沒有問題之後,即可放進 Hadoop 中用 Streaming API 執行:
hadoop jar hadoop-streaming-2.6.0-cdh5.5.1.jar \
-file ./mapper.R \
-file ./reducer.R \
-mapper ./mapper.R \
-reducer ./reducer.R \
-numReduceTasks 1 \
-input /path/to/input \
-output /path/to/output
查看結果:
hadoop fs -cat /path/to/output/*
取回結果:
hadoop fs -get /path/to/output/*
