rmr2
的 MapReduce 範例
以下介紹如何使用 R 的 rmr2
套件,撰寫一些簡單的 MapReduce 範例。
Map 函數用法
MapReduce 的 Map 結構與 R 的 lapply
函數相當類似,首先我們來看一個最簡單的 lapply
範例:
small.ints <- 1:1000 sapply(small.ints, function(x) x^2)
上面這個範例會計算 small.ints
中每個數值的平方,我們可以使用 MapReduce 改寫這個範例:
library(rmr2) # 將資料儲存至 HDFS 檔案系統 small.ints <- to.dfs(1:1000) # 以 MapReduce 計算每個數的平方 output <- mapreduce( input = small.ints, map = function(k, v) cbind(v, v^2)) # 從 HDFS 檔案系統取回計算結果 from.dfs(output)
to.dfs
函數會將資料存放進 HDFS 檔案系統中,若使用者未指定在 HDFS 中的存放位置,則 to.dfs
會自動將資料儲存在一個暫存的 HDFS 檔案中,使用完畢之後就自動刪除。to.dfs
的傳回值是一個特殊的 big data 物件,其中包含了資料儲存位置的資訊,我們可以將其儲存於 R 的變數當中,並且傳給其他 rmr2
套件的函數使用。
第二行的 mapreduce
就是執行 MapReduce 的函數,其 input
參數是指定資料的輸入來源,可以使用 to.dfs
的輸出、檔案路徑、或是兩者混合的列表(list)。
mapreduce
的 map
參數就是指定 map 動作的 R 函數,其必須符合兩個條件:
- 此函數包含兩個輸入參數,分別為 key 與 value。
- 此函數的傳回值必須是由
keyval
函數所產生的鍵值對應,或是NULL
。
若在不需要 shuffle 的情況(沒有 reduce),map
函數可以直接傳回普通的 R 變數 x
,這時傳回值會被自動轉為 keyval(NULL, x)
。
這個範例中並沒有使用到 Reduce,所以 Map 所產生的輸出就會作為整個 MapReduce 的結果。
mapreduce
函數預設也是會傳回一個 big data 物件,我們可以使用 from.dfs
將資料從 HDFS 中取回,而在取回資料時也要自己注意資料的大小,若資料過大、超過記憶體的大小,取回時就會發生問題。
to.dfs
是將記憶體中的資料寫入 HDFS 檔案系統,並不是一個適合處理巨量資料的函數,在實務上若要將巨量資料放進 HDFS,可以考慮 Flume 或 Sqoop 這類的工具。Map 與 Reduce 函數用法
上面的範例中我們只有使用到 Map,接下來要再加入一個 Reduce,而加入 Reduce 的情況就跟 R 的 tapply
比較類似,以下是一個簡單的 tapply
範例:
groups <- rbinom(32, n = 50, prob = 0.4) tapply(groups, groups, length)
6 7 9 10 11 12 13 14 15 16 18 20 22 1 1 2 3 8 5 8 10 4 4 2 1 1
這了例子是產生 32
個二項分配的隨機亂數,並計算每個值的出現次數。接著我們使用 MapReduce 改寫這段程式碼:
# 將資料儲存至 HDFS 檔案系統 groups.dfs <- to.dfs(groups) # 以 MapReduce 計算每個亂數的出現次數 output <- mapreduce( input = groups.dfs, map = function(., v) keyval(v, 1), reduce = function(k, vv) keyval(k, length(vv))) # 從 HDFS 檔案系統取回計算結果 from.dfs(output)
$key [1] 18 20 6 22 7 9 10 11 12 13 14 15 16 $val [1] 2 1 1 1 1 2 3 8 5 8 10 4 4
這裡的 mapreduce
函數中多了一個 reduce
參數,此參數所指定的函數跟 map
差不多,也是一個由 keyval
函數所產生鍵值對應,或是 NULL
,若傳回 R 的變數 x
,則會自動轉為 keyval(NULL, x)
。
這裡我們使用 Map 將每一個隨機亂數都對應到 1
,然後在 Reduce 中計算每一個隨機亂數出現的次數,最後得到一張跟 tapply
範例一樣的結果。
計算字數
計算字數(word count)的範例在 MapReduce 中算是一個最簡單的 hello world 程式,我們以這一小段文字作為測試用的輸入檔案。
This is a book That is a desk I have a book I have a desk
以下是用 R 的 mapreduce
來實做的 word count 程式:
# 資料來源 my.input <- "/path/to/input" # 定義 Map 函數 wc.map <- function(., lines) { k <- unlist(strsplit(lines, " ")) keyval(k, 1) } # 定義 Reduce 函數 wc.reduce <- function(word, counts) { keyval(word, sum(counts)) } # 以 MapReduce 計算字數 output <- mapreduce( input = my.input, input.format = "text", map = wc.map, reduce = wc.reduce, combine = TRUE) # 從 HDFS 檔案系統取回計算結果 from.dfs(output)
$key [1] "That" "have" "a" "book" "desk" "is" "This" "I" $val [1] 1 2 4 2 2 2 1 2
若要將結果直接寫入 HDFS 中,讓其他的程式讀取,可以這樣寫:
# 資料輸入與輸出路徑 my.input <- "/path/to/input" my.output <- "/path/to/output" # 以 MapReduce 計算字數 mapreduce( input = my.input, input.format = "text", output = my.output, output.format = "text", map = wc.map, reduce = wc.reduce )
使用非 native
的輸出格式時,因為 bugs 的因素,要將 combine
拿掉才能正常執行,否則可能會出現類似這樣的錯誤:
Error: java.lang.RuntimeException: java.io.IOException: wrong key class: class org.apache.hadoop.io.Text is not class org.apache.hadoop.typedbytes.TypedBytesWritable at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:339) at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538) at org.apache.hadoop.streaming.PipeReducer.close(PipeReducer.java:134) at org.apache.hadoop.mapred.Task$OldCombinerRunner.combine(Task.java:1585) at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1634) at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1486) at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:460) at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343) at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671) at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158) Caused by: java.io.IOException: wrong key class: class org.apache.hadoop.io.Text is not class org.apache.hadoop.typedbytes.TypedBytesWritable at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:191) at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:1315) at org.apache.hadoop.streaming.PipeMapRed$MROutputThread.run(PipeMapRed.java:384)
在 Linux shell 下查看 HDFS 中所儲存的結果:
hadoop fs -cat /path/to/output/*
That 1 have 2 a 4 book 2 desk 2 is 2 This 1 I 2
參考資料:RDataMining、GitHub、Hadoop.R、粉丝日志
繼續閱讀: 12