若要使用 R 與 Hadoop Streaming API 分析資料,要先準備好標準的 Hadoop 環境,在測試時可以自己安裝單節點的 Hadoop 環境,或是使用 Hortonworks 或 Cloudera 這類整合好的虛擬機器也可以,然後在每一個 Hadoop 節點上安裝一般的 R 執行環境。
安裝好之後,確認每一個 Hadoop 都可以正常執行 Rscript
:
Rscript --version
R scripting front-end version 3.3.3 (2017-03-06)
在整合 R 與 Hadoop Streaming API 時,我們需要用 R 寫出 mapper.R
與 reducer.R
兩個指令稿,分別處理 Map 與 Reduce 的工作。
這是 mapper.R
指令稿,它會從標準輸入(stdin)讀取資料,將資料轉換成以 tab 分隔的鍵值對應,然後輸出至標準輸出(stdout):
#! /usr/bin/env Rscript # mapper.R # 移除空白函數 trimWhiteSpace <- function(line) gsub("(^ +)|( +$)", "", line) # 分割英文單字函數 splitIntoWords <- function(line) unlist(strsplit(line, "[[:space:]]+")) con <- file("stdin", open = "r") while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) { line <- trimWhiteSpace(line) words <- splitIntoWords(line) ## 亦可使用 cat(paste(words, "\t1\n", sep=""), sep="") for (w in words) cat(w, "\t1\n", sep="") } close(con)
這是 reducer.R
指令稿,這個指令稿也是從標準輸入讀取資料,然後將處理好的資料輸出至標準輸出:
#! /usr/bin/env Rscript # reducer.R # 移除空白函數 trimWhiteSpace <- function(line) gsub("(^ +)|( +$)", "", line) # 分割鍵與值 splitLine <- function(line) { val <- unlist(strsplit(line, "\t")) list(word = val[1], count = as.integer(val[2])) } # 使用環境空間作為雜湊(hash) env <- new.env(hash = TRUE) con <- file("stdin", open = "r") while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) { line <- trimWhiteSpace(line) split <- splitLine(line) word <- split$word count <- split$count # 檢查 word 變數是否已經存在 if (exists(word, envir = env, inherits = FALSE)) { # 更新既有的 word 變數 oldcount <- get(word, envir = env) assign(word, oldcount + count, envir = env) } else { # 建立名稱為 word 的變數 assign(word, count, envir = env) } } close(con) # 輸出結果 for (w in ls(env, all = TRUE)) cat(w, "\t", get(w, envir = env), "\n", sep = "")
在開發階段,我們可以拿比較少量的資料,以一般的 Linux 指令來測試指令稿是否可以正常處理資料:
# 測試 mapper.R echo "this is a test that is a test" | Rscript mapper.R # 以檔案測試 mapper.R cat input.txt | Rscript mapper.R # 測試 reducer.R echo "this is a test that is a test" | Rscript mapper.R \ | sort -k1,1 | Rscript reducer.R # 以檔案測試 reducer.R cat input.txt | Rscript mapper.R \ | sort -k1,1 | Rscript reducer.R
確認 mapper.R
與 reducer.R
兩個指令稿都沒有問題之後,即可放進 Hadoop 中用 Streaming API 執行:
hadoop jar hadoop-streaming-2.6.0-cdh5.5.1.jar \ -file ./mapper.R \ -file ./reducer.R \ -mapper ./mapper.R \ -reducer ./reducer.R \ -numReduceTasks 1 \ -input /path/to/input \ -output /path/to/output
查看結果:
hadoop fs -cat /path/to/output/*
取回結果:
hadoop fs -get /path/to/output/*
參考資料:Coderwall、medium、datascience+、DZone、Software & Service、Hortonworks、Data Scientist in Training、thecloudavenue