The difference between wide and narrow dependencies
- Narrow dependency: partitions correspond one-to-one between RDDs; each partition of the parent RDD is used by at most one partition of the child RDD (verifiable with the check shown below).
- Wide dependency: a shuffle occurs; the partition relationship is many-to-many.
- In a wide dependency, a single partition of the child RDD depends on multiple partitions of the parent RDD,
- and the data of a single parent partition flows into several different partitions of the child RDD.
- Special case: the CartesianRDD behind the cartesian operator implements the Cartesian product by creating two NarrowDependency instances, so it is a narrow dependency (see the sketch at the end of this section).
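Which kind of dependency a transformation produces can be checked directly in spark-shell by inspecting the public RDD.dependencies field. A minimal sketch (the printed class names assume a recent Spark version):
val nums = sc.parallelize(1 to 10, 2)
// map keeps a one-to-one partition mapping, so the dependency is narrow
nums.map(_ * 2).dependencies
//List(org.apache.spark.OneToOneDependency@...)
// groupByKey repartitions data by key, so the dependency is wide
nums.map(x => (x % 2, x)).groupByKey().dependencies
//List(org.apache.spark.ShuffleDependency@...)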
Narrow dependency
Searching the Spark source, RangeDependency is used only by UnionRDD.
val rdd1 = sc.parallelize(List(("a",1),("b",2)))
rdd1.partitions.size
//val res4: Int = 2
val rdd2 = sc.parallelize(List(("c",3),("d",4),("a",1)))
rdd2.partitions.size
//val res5: Int = 2
val rdd3 = rdd1.union(rdd2)
//val rdd3: org.apache.spark.rdd.RDD[(String, Int)] = UnionRDD[3] at union at <console>:1
rdd3.partitions.size
//val res7: Int = 4
rdd3.foreach(print)
//output: (a,1)(b,2)(c,3)(d,4)(a,1) (element order may vary, since foreach runs per partition on the executors)
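Under the hood, UnionRDD builds one RangeDependency per parent: each parent's partitions are mapped, unchanged, onto a contiguous range of the child's partitions. A simplified sketch of that logic, adapted from UnionRDD.getDependencies in the Spark source (the helper name unionDependencies is made up for illustration; the exact source differs across versions):
import org.apache.spark.{Dependency, RangeDependency}
import org.apache.spark.rdd.RDD
import scala.collection.mutable.ArrayBuffer

def unionDependencies[T](rdds: Seq[RDD[T]]): Seq[Dependency[_]] = {
  val deps = new ArrayBuffer[Dependency[_]]
  var pos = 0
  for (rdd <- rdds) {
    // partitions [0, length) of this parent map onto child partitions [pos, pos + length)
    deps += new RangeDependency(rdd, 0, pos, rdd.partitions.length)
    pos += rdd.partitions.length
  }
  deps.toSeq
}
In the example above, rdd1's two partitions become rdd3's partitions 0-1 and rdd2's become partitions 2-3, which is why rdd3.partitions.size is 4 and no shuffle happens.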
Wide dependency
Case 1
Examples: the cogroup and join operators
Function: connects the key-value elements of two RDDs by their common keys. Given two RDDs of types (K,V) and (K,W), cogroup returns an RDD of type (K,(Iterable<V>,Iterable<W>)).
//cogroup
val rdd1 = sc.parallelize(List(("a",1),("b",2)))
rdd1.partitions.size
//val res4: Int = 2
val rdd2 = sc.parallelize(List(("c",3),("d",4),("a",1)))
rdd2.partitions.size
//val res5: Int = 2
val newRDD = rdd1.cogroup(rdd2)
//val newRDD: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[8] at cogroup at <console>:1
newRDD.foreach(println)
//(a,(Seq(1),Seq(1)))
//(c,(Seq(),Seq(3)))
//(d,(Seq(),Seq(4)))
//(b,(Seq(2),Seq()))

//join
val join = rdd1.join(rdd2)
//val join: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[11] at join at <console>:1
join.foreach(println)
//(a,(1,1))
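join itself is implemented on top of cogroup in the Spark source: it cogroups the two RDDs and then flattens the grouped values into pairs. A minimal sketch of that equivalence, reusing rdd1 and rdd2 from above (joinLike is a made-up name for illustration):
//join expressed as cogroup + flatMapValues: for each key, emit the
//cross product of the two value iterables (an empty side drops the key)
val joinLike = rdd1.cogroup(rdd2).flatMapValues { case (vs, ws) =>
  for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
}
joinLike.foreach(println)
//(a,(1,1))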
Case 2
Examples: the groupByKey and reduceByKey operators
//groupByKey
val rdd = sc.parallelize(List(("a",1),("b",2),("a",1),("b",2)))
val groupRdd = rdd.groupByKey()
//val groupRdd: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[16] at groupByKey at <console>:1
groupRdd.foreach(println)
//(b,Seq(2, 2))
//(a,Seq(1, 1))

//reduceByKey
val reduceRdd = rdd.reduceByKey(_+_)
//val reduceRdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[18] at reduceByKey at <console>:1
reduceRdd.foreach(println)
//(a,2)
//(b,4)
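Both operators shuffle, but reduceByKey also combines values on the map side before the shuffle, so far less data crosses the network than with groupByKey. Roughly the same computation can be written explicitly with combineByKey, which reduceByKey builds on internally; a minimal sketch reusing rdd from above:
val combineRdd = rdd.combineByKey(
  (v: Int) => v,                  // createCombiner: first value for a key within a partition
  (acc: Int, v: Int) => acc + v,  // mergeValue: fold later values within the partition (map side)
  (a: Int, b: Int) => a + b       // mergeCombiners: merge partial sums across partitions (after the shuffle)
)
combineRdd.foreach(println)
//(a,2)
//(b,4)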
Special case: the cartesian operator
val rdd1 = sc.parallelize(List(("a",1),("b",2)))
rdd1.partitions.size
//val res4: Int = 2
val rdd2 = sc.parallelize(List(("c",3),("d",4),("a",1)))
rdd2.partitions.size
//val res5: Int = 2
val cartesianRdd = rdd1.cartesian(rdd2)
//val cartesianRdd: org.apache.spark.rdd.RDD[((String, Int), (String, Int))] = CartesianRDD[20] at cartesian at <console>:1
cartesianRdd.partitions.size
//val res24: Int = 4
cartesianRdd.foreach(println)
//((a,1),(c,3))
//((b,2),(c,3))
//((a,1),(d,4))
//((a,1),(a,1))
//((b,2),(d,4))
//((b,2),(a,1))
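The reason cartesian stays narrow is that each child partition's parents are statically known: child partition id reads partition id / numPartitionsInRdd2 of rdd1 and partition id % numPartitionsInRdd2 of rdd2, so no shuffle is required. A simplified sketch of the two NarrowDependency instances, adapted from CartesianRDD.getDependencies in the Spark source (cartesianDependencies is a made-up helper name; the exact source differs across versions):
import org.apache.spark.{Dependency, NarrowDependency}
import org.apache.spark.rdd.RDD

def cartesianDependencies[A, B](rdd1: RDD[A], rdd2: RDD[B]): Seq[Dependency[_]] = {
  val numPartitionsInRdd2 = rdd2.partitions.length
  List(
    new NarrowDependency(rdd1) {
      // each child partition depends on exactly one rdd1 partition
      def getParents(id: Int): Seq[Int] = List(id / numPartitionsInRdd2)
    },
    new NarrowDependency(rdd2) {
      // and on exactly one rdd2 partition
      def getParents(id: Int): Seq[Int] = List(id % numPartitionsInRdd2)
    }
  )
}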