本篇介紹如何手動架設 RHadoop 計算伺服器,並使用 RHadoop 相關套件進行巨量資料分析。
RHadoop 是由 Revolution Analytics 所發展的 R 套件集,可讓 R 使用者更方便的使用 Hadoop 分析巨量資料,適用於 Cloudera、Hortonworks 等 Hadoop 發行版,以下是基本的 RHadoop 計算環境架設流程、MapReduce 用法與簡單的範例程式碼。
基本 Hadoop 環境架設
請參考 Ubuntu Linux 架設 Hadoop 單節點測試主機教學,將基本的 Hadoop 計算環境架設好。
使用前先測試一下 Hadoop 環境是否正常,先啟動 NameNode 與 DataNode daemon:
start-dfs.sh
檢查 daemon 是否有正常啟動:
jps
3370 Jps 3068 DataNode 3261 SecondaryNameNode
測試執行範例程式:
hadoop jar \
$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar \
pi 10 100
[略] Job Finished in 5.478 seconds Estimated value of Pi is 3.14800000000000000000
測試完 Hadoop 環境之後,停止 NameNode 與 DataNode daemon:
stop-dfs.sh
安裝 R
在 Ubuntu Linux 中使用 apt 安裝 R 套件:
sudo apt install r-base-core
安裝 RHadoop
RHadoop 是一套 R 語言的工具組,可讓使用者結合 R 與 Hadoop 進行巨量資料的分析,其包含五個 R 套件。
| 套件 | 說明 |
|---|---|
rhdfs | 提供 HDFS 檔案系統的存取功能,R 程式設計者可以透過此套件瀏覽、讀取、寫入或修改 HDFS 中的檔案。 |
rhbase | 提供 HBASE 資料庫的存取功能,R 程式設計者可以透過此套件瀏覽、讀取、寫入或修改 HBASE 中的資料表。 |
plyrmr | 提供類似 plyr 與 reshape2 的資料處理功能,使用 Hadoop 的 MapReduce 架構整理資料,但是在使用上比較類似 plyr,沒有太多複雜的 MapReduce 細節。 |
rmr2 | 在 Hadoop 架構下,以 MapReduce 對巨量資料進行各種統計分析。 |
ravro | 提供本機與 HDFS 上 avro 檔案格式的存取功能。 |
安裝 rmr2
Hadoop 的每一個節點都要安裝 rmr2,首先安裝必要的 R 套件:
install.packages(c("Rcpp", "RJSONIO", "digest", "functional", "reshape2", "stringr", "plyr", "caTools"))
從 GitHub 上下載 rmr2 套件檔,直接安裝:
install.packages("rmr2_3.3.1.tar.gz")
設定 rmr2 所需的 Linux shell 環境變數:
export HADOOP_CMD=$HADOOP_HOME/bin/hadoop
export HADOOP_STREAMING=$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar
安裝 plyrmr
Hadoop 的每一個節點都要安裝 plyrmr,先裝一些必要的套件:
install.packages(c("dplyr", "reshape2", "plyr", "R.methodsS3", "Hmisc", "functional", "digest", "memoise", "lazyeval", "rjson"))
從 GitHub 上下載 plyrmr 套件檔,直接安裝:
install.packages("plyrmr_0.6.0.tar.gz")
設定 plyrmr 所需的 Linux shell 環境變數:
export HADOOP_CMD=$HADOOP_HOME/bin/hadoop
export HADOOP_STREAMING=$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar
安裝 rhdfs
安裝 rJava:
install.packages("rJava")
從 GitHub 上下載 plyrmr 套件檔,直接安裝:
install.packages("rhdfs_1.0.8.tar.gz")
設定 rhdfs 所需的 Linux shell 環境變數:
export HADOOP_CMD=$HADOOP_HOME/bin/hadoop
安裝 rhbase
從 Apache Thrift 官方網站上下載原始碼,自行編譯安裝 Thrift。
從 GitHub 上下載 rhbase 套件檔,直接安裝:
install.packages("rhbase_1.2.1.tar.gz")
安裝 ravro
安裝必要的 R 套件:
install.packages(c("bit64", "rjson", "Rcpp"))
從 GitHub 上下載 ravro 套件檔,直接安裝:
install.packages("ravro_1.0.4.tar.gz")
測試 RHadoop
首先啟動 NameNode 與 DataNode daemon:
start-dfs.sh
進入 R 環境:
R
載入並測試 rmr2 套件:
library(rmr2)
from.dfs(to.dfs(1:100))
from.dfs(mapreduce(to.dfs(1:100)))
載入並測試 rhdfs 套件:
library(rhdfs)
hdfs.init()
hdfs.ls("/")
載入並測試 rhbase 套件:
library(rhbase)
hb.init()
hb.list.tables()
問題與解決方法
我在測試 rhdfs 時,會出現這樣的問題:
17/04/24 14:37:44 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
StackOverflow 上有一些解決方法,我是在 .bashrc 中加入一行設定來解決:
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$HADOOP_HOME/lib/native
rmr2 的 MapReduce 範例
以下介紹如何使用 R 的 rmr2 套件,撰寫一些簡單的 MapReduce 範例。
Map 函數用法
MapReduce 的 Map 結構與 R 的 lapply 函數相當類似,首先我們來看一個最簡單的 lapply 範例:
small.ints <- 1:1000
sapply(small.ints, function(x) x^2)
上面這個範例會計算 small.ints 中每個數值的平方,我們可以使用 MapReduce 改寫這個範例:
library(rmr2)
# 將資料儲存至 HDFS 檔案系統
small.ints <- to.dfs(1:1000)
# 以 MapReduce 計算每個數的平方
output <- mapreduce(
input = small.ints,
map = function(k, v) cbind(v, v^2))
# 從 HDFS 檔案系統取回計算結果
from.dfs(output)
to.dfs 函數會將資料存放進 HDFS 檔案系統中,若使用者未指定在 HDFS 中的存放位置,則 to.dfs 會自動將資料儲存在一個暫存的 HDFS 檔案中,使用完畢之後就自動刪除。to.dfs 的傳回值是一個特殊的 big data 物件,其中包含了資料儲存位置的資訊,我們可以將其儲存於 R 的變數當中,並且傳給其他 rmr2 套件的函數使用。
第二行的 mapreduce 就是執行 MapReduce 的函數,其 input 參數是指定資料的輸入來源,可以使用 to.dfs 的輸出、檔案路徑、或是兩者混合的列表(list)。
mapreduce 的 map 參數就是指定 map 動作的 R 函數,其必須符合兩個條件:
- 此函數包含兩個輸入參數,分別為 key 與 value。
- 此函數的傳回值必須是由
keyval函數所產生的鍵值對應,或是NULL。
若在不需要 shuffle 的情況(沒有 reduce),map 函數可以直接傳回普通的 R 變數 x,這時傳回值會被自動轉為 keyval(NULL, x)。
這個範例中並沒有使用到 Reduce,所以 Map 所產生的輸出就會作為整個 MapReduce 的結果。
mapreduce 函數預設也是會傳回一個 big data 物件,我們可以使用 from.dfs 將資料從 HDFS 中取回,而在取回資料時也要自己注意資料的大小,若資料過大、超過記憶體的大小,取回時就會發生問題。
to.dfs是將記憶體中的資料寫入 HDFS 檔案系統,並不是一個適合處理巨量資料的函數,在實務上若要將巨量資料放進 HDFS,可以考慮 Flume 或 Sqoop 這類的工具。
Map 與 Reduce 函數用法
上面的範例中我們只有使用到 Map,接下來要再加入一個 Reduce,而加入 Reduce 的情況就跟 R 的 tapply 比較類似,以下是一個簡單的 tapply 範例:
groups <- rbinom(32, n = 50, prob = 0.4)
tapply(groups, groups, length)
6 7 9 10 11 12 13 14 15 16 18 20 22 1 1 2 3 8 5 8 10 4 4 2 1 1
這了例子是產生 32 個二項分配的隨機亂數,並計算每個值的出現次數。接著我們使用 MapReduce 改寫這段程式碼:
# 將資料儲存至 HDFS 檔案系統
groups.dfs <- to.dfs(groups)
# 以 MapReduce 計算每個亂數的出現次數
output <- mapreduce(
input = groups.dfs,
map = function(., v) keyval(v, 1),
reduce = function(k, vv) keyval(k, length(vv)))
# 從 HDFS 檔案系統取回計算結果
from.dfs(output)
$key [1] 18 20 6 22 7 9 10 11 12 13 14 15 16 $val [1] 2 1 1 1 1 2 3 8 5 8 10 4 4
這裡的 mapreduce 函數中多了一個 reduce 參數,此參數所指定的函數跟 map 差不多,也是一個由 keyval 函數所產生鍵值對應,或是 NULL,若傳回 R 的變數 x,則會自動轉為 keyval(NULL, x)。
這裡我們使用 Map 將每一個隨機亂數都對應到 1,然後在 Reduce 中計算每一個隨機亂數出現的次數,最後得到一張跟 tapply 範例一樣的結果。
計算字數
計算字數(word count)的範例在 MapReduce 中算是一個最簡單的 hello world 程式,我們以這一小段文字作為測試用的輸入檔案。
This is a book That is a desk I have a book I have a desk
以下是用 R 的 mapreduce 來實做的 word count 程式:
# 資料來源
my.input <- "/path/to/input"
# 定義 Map 函數
wc.map <- function(., lines) {
k <- unlist(strsplit(lines, " "))
keyval(k, 1)
}
# 定義 Reduce 函數
wc.reduce <- function(word, counts) {
keyval(word, sum(counts))
}
# 以 MapReduce 計算字數
output <- mapreduce(
input = my.input,
input.format = "text",
map = wc.map,
reduce = wc.reduce,
combine = TRUE)
# 從 HDFS 檔案系統取回計算結果
from.dfs(output)
$key [1] "That" "have" "a" "book" "desk" "is" "This" "I" $val [1] 1 2 4 2 2 2 1 2
若要將結果直接寫入 HDFS 中,讓其他的程式讀取,可以這樣寫:
# 資料輸入與輸出路徑
my.input <- "/path/to/input"
my.output <- "/path/to/output"
# 以 MapReduce 計算字數
mapreduce(
input = my.input,
input.format = "text",
output = my.output,
output.format = "text",
map = wc.map,
reduce = wc.reduce
)
使用非 native 的輸出格式時,因為 bugs 的因素,要將 combine 拿掉才能正常執行,否則可能會出現類似這樣的錯誤:
Error: java.lang.RuntimeException: java.io.IOException: wrong key class: class org.apache.hadoop.io.Text is not class org.apache.hadoop.typedbytes.TypedBytesWritable at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:339) at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538) at org.apache.hadoop.streaming.PipeReducer.close(PipeReducer.java:134) at org.apache.hadoop.mapred.Task$OldCombinerRunner.combine(Task.java:1585) at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1634) at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1486) at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:460) at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343) at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671) at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158) Caused by: java.io.IOException: wrong key class: class org.apache.hadoop.io.Text is not class org.apache.hadoop.typedbytes.TypedBytesWritable at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:191) at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:1315) at org.apache.hadoop.streaming.PipeMapRed$MROutputThread.run(PipeMapRed.java:384)
在 Linux shell 下查看 HDFS 中所儲存的結果:
hadoop fs -cat /path/to/output/*
That 1 have 2 a 4 book 2 desk 2 is 2 This 1 I 2
