rmr2 的 MapReduce 範例

以下介紹如何使用 R 的 rmr2 套件,撰寫一些簡單的 MapReduce 範例。

Map 函數用法

MapReduce 的 Map 結構與 R 的 lapply 函數相當類似,首先我們來看一個最簡單的 lapply 範例:

small.ints <- 1:1000
sapply(small.ints, function(x) x^2)

上面這個範例會計算 small.ints 中每個數值的平方,我們可以使用 MapReduce 改寫這個範例:

library(rmr2)
# 將資料儲存至 HDFS 檔案系統
small.ints <- to.dfs(1:1000)

# 以 MapReduce 計算每個數的平方
output <- mapreduce(
  input = small.ints,
  map = function(k, v) cbind(v, v^2))

# 從 HDFS 檔案系統取回計算結果
from.dfs(output)

to.dfs 函數會將資料存放進 HDFS 檔案系統中,若使用者未指定在 HDFS 中的存放位置,則 to.dfs 會自動將資料儲存在一個暫存的 HDFS 檔案中,使用完畢之後就自動刪除。to.dfs 的傳回值是一個特殊的 big data 物件,其中包含了資料儲存位置的資訊,我們可以將其儲存於 R 的變數當中,並且傳給其他 rmr2 套件的函數使用。

第二行的 mapreduce 就是執行 MapReduce 的函數,其 input 參數是指定資料的輸入來源,可以使用 to.dfs 的輸出、檔案路徑、或是兩者混合的列表(list)。

mapreducemap 參數就是指定 map 動作的 R 函數,其必須符合兩個條件:

  1. 此函數包含兩個輸入參數,分別為 key 與 value。
  2. 此函數的傳回值必須是由 keyval 函數所產生的鍵值對應,或是 NULL

若在不需要 shuffle 的情況(沒有 reduce),map 函數可以直接傳回普通的 R 變數 x,這時傳回值會被自動轉為 keyval(NULL, x)

這個範例中並沒有使用到 Reduce,所以 Map 所產生的輸出就會作為整個 MapReduce 的結果。

mapreduce 函數預設也是會傳回一個 big data 物件,我們可以使用 from.dfs 將資料從 HDFS 中取回,而在取回資料時也要自己注意資料的大小,若資料過大、超過記憶體的大小,取回時就會發生問題。

to.dfs 是將記憶體中的資料寫入 HDFS 檔案系統,並不是一個適合處理巨量資料的函數,在實務上若要將巨量資料放進 HDFS,可以考慮 Flume 或 Sqoop 這類的工具。

Map 與 Reduce 函數用法

上面的範例中我們只有使用到 Map,接下來要再加入一個 Reduce,而加入 Reduce 的情況就跟 R 的 tapply 比較類似,以下是一個簡單的 tapply 範例:

groups <- rbinom(32, n = 50, prob = 0.4)
tapply(groups, groups, length)
 6  7  9 10 11 12 13 14 15 16 18 20 22 
 1  1  2  3  8  5  8 10  4  4  2  1  1

這了例子是產生 32 個二項分配的隨機亂數,並計算每個值的出現次數。接著我們使用 MapReduce 改寫這段程式碼:

# 將資料儲存至 HDFS 檔案系統
groups.dfs <- to.dfs(groups)

# 以 MapReduce 計算每個亂數的出現次數
output <- mapreduce(
  input = groups.dfs,
  map = function(., v) keyval(v, 1),
  reduce = function(k, vv) keyval(k, length(vv)))

# 從 HDFS 檔案系統取回計算結果
from.dfs(output)
$key
 [1] 18 20  6 22  7  9 10 11 12 13 14 15 16

$val
 [1]  2  1  1  1  1  2  3  8  5  8 10  4  4

這裡的 mapreduce 函數中多了一個 reduce 參數,此參數所指定的函數跟 map 差不多,也是一個由 keyval 函數所產生鍵值對應,或是 NULL,若傳回 R 的變數 x,則會自動轉為 keyval(NULL, x)

這裡我們使用 Map 將每一個隨機亂數都對應到 1,然後在 Reduce 中計算每一個隨機亂數出現的次數,最後得到一張跟 tapply 範例一樣的結果。

計算字數

計算字數(word count)的範例在 MapReduce 中算是一個最簡單的 hello world 程式,我們以這一小段文字作為測試用的輸入檔案。

This is a book
That is a desk
I have a book
I have a desk

以下是用 R 的 mapreduce 來實做的 word count 程式:

# 資料來源
my.input <- "/path/to/input"

# 定義 Map 函數
wc.map <- function(., lines) {
  k <- unlist(strsplit(lines, " "))
  keyval(k, 1)
}

# 定義 Reduce 函數
wc.reduce <- function(word, counts) {
  keyval(word, sum(counts))
}

# 以 MapReduce 計算字數
output <- mapreduce(
  input = my.input,
  input.format = "text",
  map = wc.map,
  reduce = wc.reduce,
  combine = TRUE)

# 從 HDFS 檔案系統取回計算結果
from.dfs(output)
$key
[1] "That" "have" "a"    "book" "desk" "is"   "This" "I"   

$val
[1] 1 2 4 2 2 2 1 2

若要將結果直接寫入 HDFS 中,讓其他的程式讀取,可以這樣寫:

# 資料輸入與輸出路徑
my.input <- "/path/to/input"
my.output <- "/path/to/output"

# 以 MapReduce 計算字數
mapreduce(
  input = my.input,
  input.format = "text",
  output = my.output,
  output.format = "text",
  map = wc.map,
  reduce = wc.reduce
)

使用非 native 的輸出格式時,因為 bugs 的因素,要將 combine 拿掉才能正常執行,否則可能會出現類似這樣的錯誤:

Error: java.lang.RuntimeException: java.io.IOException: wrong key class: class org.apache.hadoop.io.Text is not class org.apache.hadoop.typedbytes.TypedBytesWritable
	at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:339)
	at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
	at org.apache.hadoop.streaming.PipeReducer.close(PipeReducer.java:134)
	at org.apache.hadoop.mapred.Task$OldCombinerRunner.combine(Task.java:1585)
	at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1634)
	at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1486)
	at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:460)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: java.io.IOException: wrong key class: class org.apache.hadoop.io.Text is not class org.apache.hadoop.typedbytes.TypedBytesWritable
	at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:191)
	at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:1315)
	at org.apache.hadoop.streaming.PipeMapRed$MROutputThread.run(PipeMapRed.java:384)

在 Linux shell 下查看 HDFS 中所儲存的結果:

hadoop fs -cat /path/to/output/*
That	1
have	2
a	4
book	2
desk	2
is	2
This	1
I	2

參考資料:RDataMiningGitHubHadoop.R粉丝日志