專業(yè)公司網(wǎng)站設(shè)計(jì)企業(yè)seo網(wǎng)絡(luò)推廣知識
1、背景介紹? ? ? ??
????????Louvain是大規(guī)模圖譜的譜聚類算法,引入模塊度的概念分二階段進(jìn)行聚類,直到收斂為止。分布式的代碼可以在如下網(wǎng)址進(jìn)行下載。
GitHub - Sotera/spark-distributed-louvain-modularity: Spark / graphX implementation of the distributed louvain modularity algorithm
? 該代碼依賴的spark-core和spark-graphx、scala-lang是2.10版本,采用的gradle的進(jìn)行打包,也可以采用maven進(jìn)行打包,解決相關(guān)的依賴問題之后,本地模式可以很快跑通。但是轉(zhuǎn)向集群的時候,發(fā)現(xiàn)集群的spark的scala版本是2.12,我采用的是maven的scala編譯的版本是2.10, 編譯用到的scala和運(yùn)行環(huán)境的scala版本不一致,結(jié)果無法進(jìn)行spark集群模式的運(yùn)行。
2、LouvainMethod的升級之路
????????首先更改環(huán)境,即把louvain的代碼依賴保持2.10,把spark的scala版本改成2.10,但是這樣會影響其他任務(wù)的執(zhí)行,其他任務(wù)可能依賴2.12的版本。因此,踏上了LouvainMethod的升級之路,即由2.10升級到2.12。
? ? ? ? 將項(xiàng)目依賴的版本和編譯的scala版本改為2.12之后,發(fā)現(xiàn)在新的高版本的spark-graphx_2.12版本里Graph對象沒有了mapReduceTriplet方法,通過查找發(fā)現(xiàn)該方法在2.12版本的GraphXUtils類里,以一個私有方法存在,只能在包graphx下被訪問,對外部不可見,因此首先想到的是通過反射機(jī)制對該私有方法進(jìn)行訪問,參照了如下的方法:
? ? ? ? ?在任意scala對象中調(diào)用私有方法 - 問答 - 騰訊云開發(fā)者社區(qū)-騰訊云
代碼調(diào)試后,私有方法帶有泛類型參數(shù)和普通參數(shù),可以正常被反射出來,然而在調(diào)用的時候,始終報 wrong-number-of-arguments的問題。原因還沒有查到。繼而通過高版本的api是實(shí)現(xiàn)低版本的mapReduceTriplets方法。? ?參照該文檔??GraphX - Spark 3.4.1 Documentation? ?的api接口含義,注意到新版2.12的Graph里aggregateMessage方法和低版本的mapReduceTriplets返回值一致,參數(shù)類型有diff,高版本的參數(shù)是EdgeContext,低版本的是EdgeTriplet,高版本通過sendToDst和sendToSrc對低版本進(jìn)行了簡化,使用功能更強(qiáng)大,因此嘗試用aggregateMessage實(shí)現(xiàn)mapReduceTriplets。
val nodeWeightMapFunc = (e:EdgeTriplet[VD,Long]) => Iterator((e.srcId,e.attr), (e.dstId,e.attr))
val nodeWeightReduceFunc = (e1:Long,e2:Long) => e1+e2
轉(zhuǎn)化為:
def nodeWeightMapFunc(e:EdgeContext[VD, Long, Long]) {
e.sendToDst(e.attr)
e.sendToSrc(e.attr)
}
Msg與reduceFunc的返回值保持一致。
通過如下方式進(jìn)行調(diào)用:val nodeWeights = graph.aggregateMessages[Long](nodeWeightMapFunc,nodeWeightReduceFunc)
sendMsg的低版本如下:
private def sendMsg(et:EdgeTriplet[VertexState,Long]) = {
? ? val m1 = (et.dstId,Map((et.srcAttr.community,et.srcAttr.communitySigmaTot)->et.attr))
? ?val m2 = (et.srcId,Map((et.dstAttr.community,et.dstAttr.communitySigmaTot)->et.attr))
? ?Iterator(m1, m2)
}
升級為:
private def sendMsg(et: EdgeContext[VertexState, Long, Map[(Long,Long),Long]]) = {
et.sendToSrc(Map((et.dstAttr.community, et.dstAttr.communitySigmaTot) -> et.attr))
et.sendToDst(Map((et.srcAttr.community, et.srcAttr.communitySigmaTot) -> et.attr))
}
4、在集群運(yùn)行相關(guān)jar的及運(yùn)行腳本
?