準備執行環境

若要使用 R 與 Hadoop Streaming API 分析資料,要先準備好標準的 Hadoop 環境,在測試時可以自己安裝單節點的 Hadoop 環境,或是使用 Hortonworks 或 Cloudera 這類整合好的虛擬機器也可以,然後在每一個 Hadoop 節點上安裝一般的 R 執行環境。

安裝好之後,確認每一個 Hadoop 都可以正常執行 Rscript

Rscript --version
R scripting front-end version 3.3.3 (2017-03-06)

使用 MapReduce 計算字數

在整合 R 與 Hadoop Streaming API 時,我們需要用 R 寫出 mapper.Rreducer.R 兩個指令稿,分別處理 Map 與 Reduce 的工作。

這是 mapper.R 指令稿,它會從標準輸入(stdin)讀取資料,將資料轉換成以 tab 分隔的鍵值對應,然後輸出至標準輸出(stdout):

#! /usr/bin/env Rscript
# mapper.R

# 移除空白函數
trimWhiteSpace <- function(line) gsub("(^ +)|( +$)", "", line)

# 分割英文單字函數
splitIntoWords <- function(line) unlist(strsplit(line, "[[:space:]]+"))

con <- file("stdin", open = "r")
while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
  line <- trimWhiteSpace(line)
  words <- splitIntoWords(line)

  ## 亦可使用 cat(paste(words, "\t1\n", sep=""), sep="")
  for (w in words) cat(w, "\t1\n", sep="")
}
close(con)

這是 reducer.R 指令稿,這個指令稿也是從標準輸入讀取資料,然後將處理好的資料輸出至標準輸出:

#! /usr/bin/env Rscript
# reducer.R

# 移除空白函數
trimWhiteSpace <- function(line) gsub("(^ +)|( +$)", "", line)

# 分割鍵與值
splitLine <- function(line) {
  val <- unlist(strsplit(line, "\t"))
  list(word = val[1], count = as.integer(val[2]))
}

# 使用環境空間作為雜湊(hash)
env <- new.env(hash = TRUE)

con <- file("stdin", open = "r")
while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
  line <- trimWhiteSpace(line)
  split <- splitLine(line)
  word <- split$word
  count <- split$count
  # 檢查 word 變數是否已經存在
  if (exists(word, envir = env, inherits = FALSE)) {
    # 更新既有的 word 變數
    oldcount <- get(word, envir = env)
    assign(word, oldcount + count, envir = env)
  } else {
    # 建立名稱為 word 的變數
    assign(word, count, envir = env)
  }
}
close(con)

# 輸出結果
for (w in ls(env, all = TRUE))
  cat(w, "\t", get(w, envir = env), "\n", sep = "")

在開發階段,我們可以拿比較少量的資料,以一般的 Linux 指令來測試指令稿是否可以正常處理資料:

# 測試 mapper.R
echo "this is a test that is a test" | Rscript mapper.R

# 以檔案測試 mapper.R
cat input.txt | Rscript mapper.R

# 測試 reducer.R
echo "this is a test that is a test" | Rscript mapper.R \
  | sort -k1,1 | Rscript reducer.R

# 以檔案測試 reducer.R
cat input.txt | Rscript mapper.R \
  | sort -k1,1 | Rscript reducer.R

確認 mapper.Rreducer.R 兩個指令稿都沒有問題之後,即可放進 Hadoop 中用 Streaming API 執行:

hadoop jar hadoop-streaming-2.6.0-cdh5.5.1.jar \
  -file ./mapper.R \
  -file ./reducer.R \
  -mapper ./mapper.R \
  -reducer ./reducer.R \
  -numReduceTasks 1 \
  -input /path/to/input \
  -output /path/to/output

查看結果:

hadoop fs -cat /path/to/output/*

取回結果:

hadoop fs -get /path/to/output/*

參考資料:Coderwallmediumdatascience+DZoneSoftware & ServiceHortonworksData Scientist in Trainingthecloudavenue