本篇敘述如何使用 RHadoop 的 MapReduce 實作 k-means 分群演算法。

在架設好 RHadoop 計算環境之後,接著就可以使用 MapReduce 撰寫各種分析程式,以下是用 MapReduce 實作 k-means 的 R 程式碼。


這個 k-means 範例只是用來示範 MapReduce 的實作方法,程式碼比較簡單,不適合在實際的應用上使用。

# 載入 rmr2 套件
library(rmr2)

# 使用 MapReduce 實作 k-means 分群演算法
kmeans.mr <- function(
    P, # 點座標矩陣,每一列代表一個點
    num.clusters, # 群集的數量
    num.iter, # 迭代次數
    combine,
    in.memory.combine) {

  # 計算每個中心點座標 C 與每一點 P 的距離
  dist.fun <- function(C, P) {
    apply(
      C,
      1,
      function(x) colSums((t(P) - x)^2))
  }

  # Map 函數,分散計算各點與所有中心點的距離
  kmeans.map <- function(., P) {
    nearest = {
      if(is.null(C)) {
        # 若沒有初始化中心點,則隨機產生
        sample(
          1:num.clusters,
          nrow(P),
          replace = TRUE)
      } else {
        # 計算部分點與所有中心點的距離
        D = dist.fun(C, P)
        # 挑出最近的中心點編號
        nearest = max.col(-D)
      }
    }
    if(!(combine || in.memory.combine))
      keyval(nearest, P)
    else
      # 加入一個都是常數 1 的一行,記錄資料筆數,方便之後計算平均
      keyval(nearest, cbind(1, P))
  }

  # Reduce 函數,分散計算各點與所有中心點的距離
  kmeans.reduce = {
    if (!(combine || in.memory.combine) ) {
      # 計算每一行的平均,得到新的中心點位置
      function(., P) t(as.matrix(apply(P, 2, mean)))
    } else {
      # 計算每一行的總和,用於 reduce 與 combine
      function(k, P)
        keyval(
          k,
          t(as.matrix(apply(P, 2, sum))))
    }
  }

  # 使用 MapReduce 迭代計算中心點 C
  C <- NULL
  for (i in 1:num.iter ) {
    C <- values(from.dfs(mapreduce(
      P,
      map = kmeans.map,
      reduce = kmeans.reduce)))
    # 若有使用 combine,則在此用總和計算平均
    if(combine || in.memory.combine)
      C = C[, -1]/C[, 1]

    # 處理中心點消失問題
    if(nrow(C) < num.clusters) {
      C <- rbind(
        C,
        matrix(
          rnorm(
            (num.clusters -
               nrow(C)) * nrow(C)),
          ncol = nrow(C)) %*% C)
    }
  }
  # 傳回結果
  C
}

產生測試用的資料:

# 設定亂數種子
set.seed(0)

# 產生測試座標點資料
P <- do.call(rbind, rep(
  list(matrix(rnorm(10, sd = 10), ncol = 2)), 20)) +
  matrix(rnorm(200), ncol = 2)

若要查看資料,可以用 ggplot2 將資料點畫出來看:

# 查看測試資料
library(ggplot2)
qplot(V1, V2, data = as.data.frame(P))

K-Means 測試資料

執行 MapReduce 版本的 k-means:

kmeans.mr(
  to.dfs(P),
  num.clusters = 4,
  num.iter = 10,
  combine = FALSE,
  in.memory.combine = FALSE)
          [,1]       [,2]
[1,] 12.931487 -14.925114
[2,] -3.331972  -9.848412
[3,] 12.946355  -1.409949
[4,]  3.997781  23.809110

若需要測試程式的正確性,可以使用單機版的 MapReduce 來測試:

rmr.options(backend = "local")
kmeans.mr(
  to.dfs(P),
  num.clusters = 4,
  num.iter = 10,
  combine = FALSE,
  in.memory.combine = FALSE)

參考資料:GitHubedurekaQuora