本篇敘述如何使用 RHadoop 的 MapReduce 實作 k-means 分群演算法。
在架設好 RHadoop 計算環境之後,接著就可以使用 MapReduce 撰寫各種分析程式,以下是用 MapReduce 實作 k-means 的 R 程式碼。
# 載入 rmr2 套件 library(rmr2) # 使用 MapReduce 實作 k-means 分群演算法 kmeans.mr <- function( P, # 點座標矩陣,每一列代表一個點 num.clusters, # 群集的數量 num.iter, # 迭代次數 combine, in.memory.combine) { # 計算每個中心點座標 C 與每一點 P 的距離 dist.fun <- function(C, P) { apply( C, 1, function(x) colSums((t(P) - x)^2)) } # Map 函數,分散計算各點與所有中心點的距離 kmeans.map <- function(., P) { nearest = { if(is.null(C)) { # 若沒有初始化中心點,則隨機產生 sample( 1:num.clusters, nrow(P), replace = TRUE) } else { # 計算部分點與所有中心點的距離 D = dist.fun(C, P) # 挑出最近的中心點編號 nearest = max.col(-D) } } if(!(combine || in.memory.combine)) keyval(nearest, P) else # 加入一個都是常數 1 的一行,記錄資料筆數,方便之後計算平均 keyval(nearest, cbind(1, P)) } # Reduce 函數,分散計算各點與所有中心點的距離 kmeans.reduce = { if (!(combine || in.memory.combine) ) { # 計算每一行的平均,得到新的中心點位置 function(., P) t(as.matrix(apply(P, 2, mean))) } else { # 計算每一行的總和,用於 reduce 與 combine function(k, P) keyval( k, t(as.matrix(apply(P, 2, sum)))) } } # 使用 MapReduce 迭代計算中心點 C C <- NULL for (i in 1:num.iter ) { C <- values(from.dfs(mapreduce( P, map = kmeans.map, reduce = kmeans.reduce))) # 若有使用 combine,則在此用總和計算平均 if(combine || in.memory.combine) C = C[, -1]/C[, 1] # 處理中心點消失問題 if(nrow(C) < num.clusters) { C <- rbind( C, matrix( rnorm( (num.clusters - nrow(C)) * nrow(C)), ncol = nrow(C)) %*% C) } } # 傳回結果 C }
產生測試用的資料:
# 設定亂數種子 set.seed(0) # 產生測試座標點資料 P <- do.call(rbind, rep( list(matrix(rnorm(10, sd = 10), ncol = 2)), 20)) + matrix(rnorm(200), ncol = 2)
若要查看資料,可以用 ggplot2
將資料點畫出來看:
# 查看測試資料 library(ggplot2) qplot(V1, V2, data = as.data.frame(P))
執行 MapReduce 版本的 k-means:
kmeans.mr( to.dfs(P), num.clusters = 4, num.iter = 10, combine = FALSE, in.memory.combine = FALSE)
[,1] [,2] [1,] 12.931487 -14.925114 [2,] -3.331972 -9.848412 [3,] 12.946355 -1.409949 [4,] 3.997781 23.809110
若需要測試程式的正確性,可以使用單機版的 MapReduce 來測試:
rmr.options(backend = "local") kmeans.mr( to.dfs(P), num.clusters = 4, num.iter = 10, combine = FALSE, in.memory.combine = FALSE)