This article demonstrates how to use R with Hadoop MapReduce to analyze the Stack Exchange data dumps.

Stack Exchange is a well-known Q&A network in the programming world, hosting a huge number of valuable questions and answers. Stack Exchange also dumps the entire contents of its sites to XML files and releases them under a Creative Commons license on archive.org, free for anyone to download and use.


In what follows, we will use R and Hadoop MapReduce to process and analyze one of these data dumps.

Preparing the Data

Stack Exchange provides several kinds of data; here we will analyze only the main post data. According to readme.txt, posts are stored in posts.xml, whose format is as follows:

- Id
- PostTypeId
  - 1: Question
  - 2: Answer
- ParentId (only present if PostTypeId is 2)
- AcceptedAnswerId (only present if PostTypeId is 1)
- CreationDate
- Score
- ViewCount
- Body
- OwnerUserId
- LastEditorUserId
- LastEditorDisplayName="Jeff Atwood"
- LastEditDate="2009-03-05T22:28:34.823"
- LastActivityDate="2009-03-11T12:51:01.480"
- CommunityOwnedDate="2009-03-11T12:51:01.480"
- ClosedDate="2009-03-11T12:51:01.480"
- Title
- Tags
- AnswerCount
- CommentCount
- FavoriteCount

Since Stack Exchange runs a large number of sites, we pick raspberrypi.stackexchange.com, the site dedicated to the Raspberry Pi, for this demonstration.

Download the raspberrypi.stackexchange.com dump file:

wget https://archive.org/download/stackexchange/raspberrypi.stackexchange.com.7z

Extract it with 7-Zip:

7za x raspberrypi.stackexchange.com.7z

Check the format of Posts.xml:

head Posts.xml
<?xml version="1.0" encoding="utf-8"?>
<posts>
  <row Id="1" PostTypeId="1" AcceptedAnswerId="225" CreationDate="2012-06-12T19:45:29.157" Score="71" ViewCount="46943" Body="&lt;p&gt;I already asked this &lt;a href=&quot;http://stackoverflow.com/questions/10973020/cross-compilation-for-raspberry-pi-in-gcc-where-to-start&quot;&gt;question&lt;/a&gt; on Stack Overflow, but  I would like to know if anyone managed to build a GCC 4.7 toolchain for ARM cross-compilation (for a x86/x86-64 Linux host). There are many instructins for building GCC from source and many available cross-compilers for pre-4.7 GCC versions, just not the latest one.&lt;/p&gt;&#xA;&#xA;&lt;p&gt;Compiling on Rasp Pi itself works fine but is just a bit too slow for practical purposes.&lt;/p&gt;&#xA;&#xA;&lt;p&gt;I am eager to get compiling and I would like to use the latest and the best tools.&lt;/p&gt;&#xA;" OwnerUserId="13" LastEditorUserId="86" LastEditDate="2012-07-18T21:17:22.913" LastActivityDate="2016-02-20T19:57:27.583" Title="How do I build a GCC 4.7 toolchain for cross-compiling?" Tags="&lt;software-development&gt;&lt;cross-compilation&gt;&lt;gcc&gt;" AnswerCount="8" CommentCount="3" FavoriteCount="35" />
  <row Id="2" PostTypeId="1" CreationDate="2012-06-12T19:52:39.397" Score="14" ViewCount="935" Body="&lt;p&gt;I have come across the &lt;a href=&quot;http://elinux.org/RPi_Advanced_Setup&quot; rel=&quot;nofollow noreferrer&quot;&gt;RPi Advanced Setup&lt;/a&gt; but there are incomplete steps. For example, where can I find &lt;code&gt;mkcard.txt&lt;/code&gt; from the &lt;strong&gt;Advanced SD card setup&lt;/strong&gt; section? Do I just reuse the one from the &lt;a href=&quot;http://downloads.angstrom-distribution.org/demo/beaglebone/mkcard.txt&quot; rel=&quot;nofollow noreferrer&quot;&gt;BeagleBone demo site&lt;/a&gt;? Then there is an important steps section under &lt;strong&gt;Finally booting GNU/Linux&lt;/strong&gt; that only contains the text &quot;to be completed&quot;. &lt;/p&gt;&#xA;&#xA;&lt;p&gt;Does anybody know what additional things need to be done to boot into a Debian release for example?&lt;/p&gt;&#xA;" OwnerUserId="22" LastEditorUserId="46969" LastEditDate="2017-02-17T16:52:52.020" LastActivityDate="2017-02-17T16:52:52.020" Title="On the RPi Advanced setup page, what does mkcard.txt do?" Tags="&lt;setup&gt;" AnswerCount="2" CommentCount="0" />
  <row Id="3" PostTypeId="1" AcceptedAnswerId="4" CreationDate="2012-06-12T19:53:29.027" Score="6" ViewCount="1261" Body="&lt;p&gt;Which distributors and partners are officially authorized to sell Raspberry Pi's?&lt;/p&gt;&#xA;" OwnerUserId="8" LastEditorUserId="8" LastEditDate="2012-06-14T17:35:31.210" LastActivityDate="2012-06-14T17:35:31.210" Title="Which distributors are authorized to sell device units?" Tags="&lt;purchasing&gt;" AnswerCount="6" CommentCount="0" ClosedDate="2012-06-14T23:19:43.213" />
  <row Id="4" PostTypeId="2" ParentId="3" CreationDate="2012-06-12T19:55:24.513" Score="16" Body="&lt;p&gt;&lt;a href=&quot;http://www.farnell.com/&quot; rel=&quot;nofollow&quot;&gt;Farnell&lt;/a&gt; and &lt;a href=&quot;http://uk.rs-online.com/web/&quot; rel=&quot;nofollow&quot;&gt;RS Components&lt;/a&gt; are the only companies listed on &lt;a href=&quot;http://www.raspberrypi.org/faqs&quot; rel=&quot;nofollow&quot;&gt;RaspberryPi.org&lt;/a&gt; authorized to sell the official devices so far.&lt;/p&gt;&#xA;&#xA;&lt;p&gt;Additionally, their is an official &lt;a href=&quot;http://elinux.org/RPi_Buying_Links_By_Country&quot; rel=&quot;nofollow&quot;&gt;Wiki page&lt;/a&gt; of additional sources that the RPi can be purchased from, outside the UK. (Thanks &lt;a href=&quot;http://raspberrypi.stackexchange.com/a/9/8&quot;&gt;@Shane Hudson&lt;/a&gt; for this information!)&lt;/p&gt;&#xA;" OwnerUserId="8" LastEditorUserId="8" LastEditDate="2012-06-13T13:38:42.810" LastActivityDate="2012-06-13T13:38:42.810" CommentCount="1" />
  <row Id="5" PostTypeId="1" AcceptedAnswerId="11" CreationDate="2012-06-12T19:55:35.857" Score="58" ViewCount="110720" Body="&lt;p&gt;The site &lt;a href=&quot;http://www.raspberrypi.org/quick-start-guide&quot;&gt;http://www.raspberrypi.org/quick-start-guide&lt;/a&gt; says:&lt;/p&gt;&#xA;&#xA;&lt;blockquote&gt;&#xA;  &lt;p&gt;You will need an SD card with an operating system preloaded before you&#xA;  can boot the Raspberry Pi. A brand-name (not generic) Class 4 card of&#xA;  4GB or more is recommended. To obtain an SD card image, and for&#xA;  instructions on how to flash an SD card from a Linux or Windows PC,&#xA;  please refer to &lt;a href=&quot;http://www.raspberrypi.org/downloads&quot;&gt;http://www.raspberrypi.org/downloads&lt;/a&gt;.&lt;/p&gt;&#xA;&lt;/blockquote&gt;&#xA;&#xA;&lt;p&gt;Which SD cards are compatible with the Raspberry Pi?&lt;/p&gt;&#xA;" OwnerUserId="37" LastEditorUserId="90" LastEditDate="2012-06-13T14:53:21.153" LastActivityDate="2015-10-01T08:26:45.943" Title="Which SD cards are compatible?" Tags="&lt;sd-card&gt;&lt;compatibility&gt;" AnswerCount="1" CommentCount="3" FavoriteCount="11" />
  <row Id="6" PostTypeId="2" ParentId="3" CreationDate="2012-06-12T19:56:55.850" Score="5" Body="&lt;p&gt;You need to register your interest before you can purchase:&lt;/p&gt;&#xA;&#xA;&lt;p&gt;&lt;a href=&quot;http://uk.rs-online.com/web/generalDisplay.html?id=raspberrypi&quot;&gt;http://uk.rs-online.com/web/generalDisplay.html?id=raspberrypi&lt;/a&gt;&lt;/p&gt;&#xA;&#xA;&lt;p&gt;&lt;a href=&quot;http://uk.farnell.com/jsp/bespoke/bespoke7.jsp?ICID=I-RASP-HPBLOF-0015&amp;amp;bespokepage=farnell/en_UK/promotions/raspberryPi.jsp&quot;&gt;http://uk.farnell.com/jsp/bespoke/bespoke7.jsp?ICID=I-RASP-HPBLOF-0015&amp;amp;bespokepage=farnell/en_UK/promotions/raspberryPi.jsp&lt;/a&gt;&lt;/p&gt;&#xA;" OwnerUserId="30" LastActivityDate="2012-06-12T19:56:55.850" CommentCount="0" />
[remaining rows omitted]

The structure of this XML file is very simple: each record is a single row element on its own line, so we can parse it line by line directly in MapReduce without any special preprocessing.
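To make this concrete, here is a minimal standalone R sketch, using a shortened, hypothetical row in the style of the excerpt above, showing how a Score/OwnerUserId pair can be pulled out of one line with Perl-style lookbehind/lookahead patterns:

```r
# A shortened sample row in the style of Posts.xml (real rows carry more attributes).
line <- '<row Id="4" PostTypeId="2" ParentId="3" Score="16" OwnerUserId="8" />'

# Lookbehind/lookahead patterns capture just the attribute values.
m.s <- regexpr('(?<=Score=")[[:digit:]]+(?=")', line, perl = TRUE)
m.o <- regexpr('(?<=OwnerUserId=")[[:digit:]]+(?=")', line, perl = TRUE)

score     <- as.numeric(regmatches(line, m.s))
owner.uid <- as.numeric(regmatches(line, m.o))

cat(owner.uid, score, "\n")  # 8 16
```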

Compress the data with bzip2 to save disk space (bzip2 is also a splittable format, so Hadoop can still divide the file among multiple map tasks):

bzip2 -z Posts.xml

Put the compressed file into HDFS:

hadoop fs -mkdir raspberrypi
hadoop fs -put Posts.xml.bz2 raspberrypi
hadoop fs -ls raspberrypi
Found 1 items
-rw-r--r--   3 s00ser01 s00ser00   11444598 2017-05-04 11:00 raspberrypi/Posts.xml.bz2

Writing the MapReduce Programs

There are two main ways to combine R with Hadoop: one is the rmr2 package from RHadoop, which lets you run MapReduce jobs entirely from R; the other is Hadoop Streaming, which pipes the data through external R scripts.

The RHadoop Package (rmr2)

Here we will analyze every post and compute the total Score earned by each post author (OwnerUserId).

Here is the R implementation of this MapReduce job using rmr2:

library(rmr2)

# data source on HDFS
my.input <- "raspberrypi/Posts.xml.bz2"

# define the map function
se.map <- function(., lines) {
  score <- c()
  owner.uid <- c()
  for (text in lines) {
    # extract the Score value (this pattern only matches non-negative scores)
    pattern.s <- '(?<=Score=")[[:digit:]]+(?=")'
    m.s <- regexpr(pattern.s, text, perl = TRUE)
    if ( m.s == -1 ) next

    # extract the OwnerUserId value
    pattern.o <- '(?<=OwnerUserId=")[[:digit:]]+(?=")'
    m.o <- regexpr(pattern.o, text, perl = TRUE)
    if ( m.o == -1 ) next

    score <- c(score, regmatches(text, m.s))
    owner.uid <- c(owner.uid, regmatches(text, m.o))
  }
  keyval(as.numeric(owner.uid), as.numeric(score))
}

# define the reduce function
se.reduce <- function(owner.uid, score) {
  keyval(owner.uid, sum(score))
}

# compute each user's total Score with MapReduce
output <- mapreduce(
  input = my.input,
  input.format = "text",
  map = se.map,
  reduce = se.reduce,
  combine = TRUE
)

# fetch the result from HDFS
result.list <- from.dfs(output)
result.df <- data.frame(owner.uid = result.list$key, score = result.list$val)
result.df <- result.df[order(result.df$score, decreasing = TRUE),]

# take the top 20 users by total Score
result.df.top20 <- result.df[1:20,]
result.df.top20
      owner.uid score
74         5538  2741
13393     13650  2410
8439         56  2164
947          40  1916
1070       8697  1307
1858         86  1157
7498        894  1157
14154       590   938
3705         93   918
6530         13   752
4673       1070   729
14253      7274   726
4775       8496   719
4932      19949   678
13156        35   639
14569     32756   601
1986       8631   533
7479         54   495
2796        181   458
13157        68   440

R with Hadoop Streaming

The version implemented with R and the Hadoop Streaming API follows the same idea. This is mapper.R:

#! /usr/bin/env Rscript
con <- file("stdin", open = "r")
while (length(text <- readLines(con, n = 1, warn = FALSE)) > 0) {
  # extract the Score value (this pattern only matches non-negative scores)
  pattern.s <- '(?<=Score=")[[:digit:]]+(?=")'
  m.s <- regexpr(pattern.s, text, perl = TRUE)
  if ( m.s == -1 ) next
  # extract the OwnerUserId value
  pattern.o <- '(?<=OwnerUserId=")[[:digit:]]+(?=")'
  m.o <- regexpr(pattern.o, text, perl = TRUE)
  if ( m.o == -1 ) next

  score <- regmatches(text, m.s)
  owner.uid <- regmatches(text, m.o)

  cat(owner.uid, "\t", score, "\n", sep="")
}
close(con)

This is reducer.R:

#! /usr/bin/env Rscript

# trim leading and trailing whitespace
trimWhiteSpace <- function(line) gsub("(^ +)|( +$)", "", line)

# split a tab-delimited line into fields
splitLine <- function(line) {
  val <- unlist(strsplit(line, "\t"))
  list(owner.uid = as.character(val[1]), score = as.integer(val[2]))
}

# use an R environment as a hash table
env <- new.env(hash = TRUE)

con <- file("stdin", open = "r")
while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
  line <- trimWhiteSpace(line)
  split <- splitLine(line)
  owner.uid <- split$owner.uid
  score <- split$score
  # check whether this owner.uid has been seen before
  if (exists(owner.uid, envir = env, inherits = FALSE)) {
    # update the existing total
    old.score <- get(owner.uid, envir = env)
    assign(owner.uid, old.score + score, envir = env)
  } else {
    # create a new entry for this owner.uid
    assign(owner.uid, score, envir = env)
  }
}
close(con)

# emit the aggregated results
for (o in ls(env, all = TRUE))
    cat(o, "\t", get(o, envir = env), "\n", sep = "")
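Before submitting to the cluster, it helps to sanity-check the streaming contract locally. The sketch below emulates Hadoop's map → shuffle → reduce flow in the shell, with a few hypothetical owner.uid/score pairs standing in for mapper.R's output, and an awk one-liner summing per key the way reducer.R does:

```shell
# Hypothetical mapper output: one "owner.uid<TAB>score" pair per line.
# sort -k1,1 plays the role of the shuffle (grouping equal keys together);
# awk sums the scores per key, mirroring reducer.R.
printf '8\t16\n13\t71\n8\t5\n' |
  sort -k1,1 |
  awk -F'\t' '{ sum[$1] += $2 } END { for (k in sum) print k "\t" sum[k] }' |
  sort -rn -k2
# prints:
# 13	71
# 8	21
```

With the real scripts, the same local check is: head -n 1000 Posts.xml | ./mapper.R | sort -k1,1 | ./reducer.R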

Make both scripts executable (e.g. chmod +x mapper.R reducer.R), then submit the streaming job:

hadoop jar hadoop-streaming-2.6.0-cdh5.5.1.jar \
  -file ./mapper.R \
  -file ./reducer.R \
  -mapper ./mapper.R \
  -reducer ./reducer.R \
  -numReduceTasks 1 \
  -input raspberrypi/Posts.xml.bz2 \
  -output raspberrypi_output

Fetch the results from HDFS and extract the top 20 users:

hadoop fs -cat raspberrypi_output/* | sort -r -n -k 2 | head -n 20

The output is:

5538	2741
13650	2410
56	2164
40	1916
8697	1307
894	1157
86	1157
590	938
93	918
13	752
1070	729
7274	726
8496	719
19949	678
35	639
32756	601
8631	533
54	495
181	458
68	440