政府網(wǎng)站建設(shè)十強(qiáng)百度seo公司興田德潤(rùn)
RDD相關(guān)知識(shí)
RDD介紹
RDD
是Spark
的核心抽象,即 彈性分布式數(shù)據(jù)集(residenta distributed dataset
)。代表一個(gè)不可變,可分區(qū),里面元素可并行計(jì)算的集合。其具有數(shù)據(jù)流模型的特點(diǎn):自動(dòng)容錯(cuò),位置感知性調(diào)度和可伸縮性。 在Spark
中,對(duì)數(shù)據(jù)的所有操作不外乎創(chuàng)建RDD
、轉(zhuǎn)化已有RDD
以及調(diào)用 RDD
操作進(jìn)行求值。
?
RDD結(jié)構(gòu)圖
RDD具有五大特性
-
一組分片(
Partition
),即數(shù)據(jù)集的基本組成單位(RDD
是由一系列的partition
組成的)。將數(shù)據(jù)加載為RDD
時(shí),一般會(huì)遵循數(shù)據(jù)的本地性(一般一個(gè)HDFS
里的block
會(huì)加載為一個(gè)partition
)。 -
RDD
之間的依賴關(guān)系。依賴還具體分為寬依賴和窄依賴,但并不是所有的RDD
都有依賴。為了容錯(cuò)(重算,cache
,checkpoint
),也就是說(shuō)在內(nèi)存中的RDD
操作時(shí)出錯(cuò)或丟失會(huì)進(jìn)行重算。 -
由一個(gè)函數(shù)計(jì)算每一個(gè)分片。
Spark
中的RDD
的計(jì)算是以分片為單位的,每個(gè)RDD
都會(huì)實(shí)現(xiàn)compute
函數(shù)以達(dá)到這個(gè)目的。compute
函數(shù)會(huì)對(duì)迭代器進(jìn)行復(fù)合,不需要保存每次計(jì)算的結(jié)果。 -
(可選)如果
RDD
里面存的數(shù)據(jù)是key-value
形式,則可以傳遞一個(gè)自定義的Partitioner
進(jìn)行重新分區(qū)。 -
(可選)
RDD
提供一系列最佳的計(jì)算位置,即數(shù)據(jù)的本地性。
RDD之間的依賴關(guān)系
RDD
之間有一系列的依賴關(guān)系,依賴關(guān)系又分為窄依賴和寬依賴。
窄依賴:父RDD
和子RDD
partition
之間的關(guān)系是一對(duì)一的。或者父RDD
一個(gè)partition
只對(duì)應(yīng)一個(gè)子RDD
的partition
情況下的父RDD
和子RDD
partition
關(guān)系是多對(duì)一的,也可以理解為沒(méi)有觸發(fā)shuffle
。
寬依賴:父RDD
與子RDD
partition
之間的關(guān)系是一對(duì)多。 父RDD
的一個(gè)分區(qū)的數(shù)據(jù)去到子RDD
的不同分區(qū)里面。也可以理解為觸發(fā)了shuffle
。
特別說(shuō)明:對(duì)于join
操作有兩種情況,如果join
操作的使用每個(gè)partition
僅僅和已知的Partition
進(jìn)行join
,此時(shí)的join
操作就是窄依賴;其他情況的join
操作就是寬依賴。
RDD創(chuàng)建
-
從
Hadoop
文件系統(tǒng)(或與Hadoop
兼容的其他持久化存儲(chǔ)系統(tǒng),如Hive
、Cassandra
、HBase
)輸入(例如HDFS
)創(chuàng)建。 -
通過(guò)集合進(jìn)行創(chuàng)建。
算子
算子可以分為Transformation
轉(zhuǎn)換算子和Action
行動(dòng)算子。 RDD
是懶執(zhí)行的,如果沒(méi)有行動(dòng)操作出現(xiàn),所有的轉(zhuǎn)換操作都不會(huì)執(zhí)行。
RDD
直觀圖,如下:
RDD 的 五大特性
-
一組分片(
Partition
),即數(shù)據(jù)集的基本組成單位。對(duì)于RDD
來(lái)說(shuō),每個(gè)分片都會(huì)被一個(gè)計(jì)算任務(wù)處理,并決定并行計(jì)算的粒度。用戶可以在創(chuàng)建RDD
時(shí)指定RDD
的分片個(gè)數(shù),如果沒(méi)有指定,那么就會(huì)采用默認(rèn)值。默認(rèn)值就是程序所分配到的CPU Core
的數(shù)目。 -
一個(gè)計(jì)算每個(gè)分區(qū)的函數(shù)。
Spark
中RDD
的計(jì)算是以分片為單位的,每個(gè)RDD
都會(huì)實(shí)現(xiàn)compute
函數(shù)以達(dá)到這個(gè)目的。compute
函數(shù)會(huì)對(duì)迭代器進(jìn)行復(fù)合,不需要保存每次計(jì)算的結(jié)果。 -
RDD
之間的依賴關(guān)系。RDD
的每次轉(zhuǎn)換都會(huì)生成一個(gè)新的RDD
,所以RDD
之間就會(huì)形成類似于流水線一樣的前后依賴關(guān)系。在部分分區(qū)數(shù)據(jù)丟失時(shí),Spark
可以通過(guò)這個(gè)依賴關(guān)系重新計(jì)算丟失的分區(qū)數(shù)據(jù),而不是對(duì)RDD
的所有分區(qū)進(jìn)行重新計(jì)算。 -
一個(gè)
Partitioner
,即RDD
的分片函數(shù)。當(dāng)前Spark
中實(shí)現(xiàn)了兩種類型的分片函數(shù),一個(gè)是基于哈希的HashPartitioner
,另外一個(gè)是基于范圍的RangePartitioner
。只有對(duì)于于key-value
的RDD
,才會(huì)有Partitioner
,非key-value
的RDD
的Parititioner
的值是None
。Partitioner
函數(shù)不但決定了RDD
本身的分片數(shù)量,也決定了parent RDD Shuffle
輸出時(shí)的分片數(shù)量。 -
一個(gè)列表,存儲(chǔ)存取每個(gè)
Partition
的優(yōu)先位置(preferred location
)。對(duì)于一個(gè)HDFS
文件來(lái)說(shuō),這個(gè)列表保存的就是每個(gè) Partition 所在的塊的位置。按照“移動(dòng)數(shù)據(jù)不如移動(dòng)計(jì)算”的理念,Spark 在進(jìn)行任務(wù)調(diào)度的時(shí)候,會(huì)盡可能地將計(jì)算任務(wù)分配到其所要處理數(shù)據(jù)塊的存儲(chǔ)位置。
相關(guān)API介紹
SparkContext
創(chuàng)建;
sc = SparkContext("local", "Simple App")
說(shuō)明:"local"
是指讓Spark
程序本地運(yùn)行,"Simple App"
是指Spark
程序的名稱,這個(gè)名稱可以任意(為了直觀明了的查看,最好設(shè)置有意義的名稱)。
- 集合并行化創(chuàng)建
RDD
;
data = [1,2,3,4]
rdd = sc.parallelize(data)
collect
算子:在驅(qū)動(dòng)程序中將數(shù)據(jù)集的所有元素作為數(shù)組返回(注意數(shù)據(jù)集不能過(guò)大);
rdd.collect()
- 停止
SparkContext
。
sc.stop()
# -*- coding: UTF-8 -*-
from pyspark import SparkContextif __name__ == "__main__":#********** Begin **********## 1.初始化 SparkContext,該對(duì)象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 2.創(chuàng)建一個(gè)1到8的列表Listdata = [1, 2, 3, 4, 5, 6, 7, 8]# 3.通過(guò) SparkContext 并行化創(chuàng)建 rddrdd = sc.parallelize(data)# 4.使用 rdd.collect() 收集 rdd 的內(nèi)容。 rdd.collect() 是 Spark Action 算子,在后續(xù)內(nèi)容中將會(huì)詳細(xì)說(shuō)明,主要作用是:收集 rdd 的數(shù)據(jù)內(nèi)容result = rdd.collect()# 5.打印 rdd 的內(nèi)容print(result)# 6.停止 SparkContextsc.stop()#********** End **********#
讀取外部數(shù)據(jù)集創(chuàng)建RDD?
編寫讀取本地文件創(chuàng)建Spark RDD
的程序。
相關(guān)知識(shí)
為了完成本關(guān)任務(wù),你需要掌握:1.如何讀取本地文件系統(tǒng)中的文件來(lái)創(chuàng)建Spark RDD
。
textFile 介紹
PySpark
可以從Hadoop
支持的任何存儲(chǔ)源創(chuàng)建分布式數(shù)據(jù)集,包括本地文件系統(tǒng),HDFS
,Cassandra
,HBase
,Amazon S3
等。Spark
支持文本文件,SequenceFiles
和任何其他Hadoop InputFormat
。
文本文件RDD
可以使用創(chuàng)建SparkContex
的textFile
方法。此方法需要一個(gè) URI
的文件(本地路徑的機(jī)器上,或一個(gè)hdfs://,s3a://
等 URI),并讀取其作為行的集合。這是一個(gè)示例調(diào)用:
distFile = sc.textFile("data.txt")
# -*- coding: UTF-8 -*- from pyspark import SparkContextif __name__ == '__main__':#********** Begin **********## 1.初始化 SparkContext,該對(duì)象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 文本文件 RDD 可以使用創(chuàng)建 SparkContext 的t extFile 方法。 #此方法需要一個(gè) URI的 文件(本地路徑的機(jī)器上,或一個(gè)hdfs://,s3a://等URI), #并讀取其作為行的集合# 2.讀取本地文件,URI為:/root/wordcount.txtrdd = sc.textFile("/root/wordcount.txt")# 3.使用 rdd.collect() 收集 rdd 的內(nèi)容。 #rdd.collect() 是 Spark Action 算子,在后續(xù)內(nèi)容中將會(huì)詳細(xì)說(shuō)明,主要作用是:收集 rdd 的數(shù)據(jù)內(nèi)容result = rdd.collect()# 4.打印 rdd 的內(nèi)容print(result)# 5.停止 SparkContextsc.stop()#********** End **********#
map
算子
本關(guān)任務(wù):使用Spark
的 map
算子按照相關(guān)需求完成轉(zhuǎn)換操作。
相關(guān)知識(shí)
為了完成本關(guān)任務(wù),你需要掌握:如何使用map
算子。
map
將原來(lái)RDD
的每個(gè)數(shù)據(jù)項(xiàng)通過(guò)map
中的用戶自定義函數(shù) f
映射轉(zhuǎn)變?yōu)橐粋€(gè)新的元素。
圖中每個(gè)方框表示一個(gè)RDD
分區(qū),左側(cè)的分區(qū)經(jīng)過(guò)自定義函數(shù) f:T->U
映射為右側(cè)的新 RDD
分區(qū)。但是,實(shí)際只有等到 Action
算子觸發(fā)后,這個(gè) f
函數(shù)才會(huì)和其他函數(shù)在一個(gè) Stage
中對(duì)數(shù)據(jù)進(jìn)行運(yùn)算。
map 案例
-
sc = SparkContext("local", "Simple App") data = [1,2,3,4,5,6] rdd = sc.parallelize(data) print(rdd.collect()) rdd_map = rdd.map(lambda x: x * 2) print(rdd_map.collect())
輸出:
[1, 2, 3, 4, 5, 6]
[2, 4, 6, 8, 10, 12]
說(shuō)明:rdd1
的元素( 1 , 2 , 3 , 4 , 5 , 6
)經(jīng)過(guò) map
算子( x -> x*2
)轉(zhuǎn)換成了 rdd2
( 2 , 4 , 6 , 8 , 10
)。
編程要求
請(qǐng)仔細(xì)閱讀右側(cè)代碼,根據(jù)方法內(nèi)的提示,在Begin - End
區(qū)域內(nèi)進(jìn)行代碼補(bǔ)充,具體任務(wù)如下:
需求:使用 map
算子,將rdd
的數(shù)據(jù) (1, 2, 3, 4, 5)
按照下面的規(guī)則進(jìn)行轉(zhuǎn)換操作,規(guī)則如下:
- 偶數(shù)轉(zhuǎn)換成該數(shù)的平方;
- 奇數(shù)轉(zhuǎn)換成該數(shù)的立方。
# -*- coding: UTF-8 -*-
from pyspark import SparkContextif __name__ == "__main__":#********** Begin **********## 1.初始化 SparkContext,該對(duì)象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 2.創(chuàng)建一個(gè)1到5的列表Listdata = [1, 2, 3, 4, 5]# 3.通過(guò) SparkContext 并行化創(chuàng)建 rddrdd = sc.parallelize(data)# 4.使用rdd.collect() 收集 rdd 的元素。print(rdd.collect())"""使用 map 算子,將 rdd 的數(shù)據(jù) (1, 2, 3, 4, 5) 按照下面的規(guī)則進(jìn)行轉(zhuǎn)換操作,規(guī)則如下:需求:偶數(shù)轉(zhuǎn)換成該數(shù)的平方奇數(shù)轉(zhuǎn)換成該數(shù)的立方"""# 5.使用 map 算子完成以上需求rdd_map = rdd.map(lambda x: x * x if x % 2 == 0 else x * x * x)# 6.使用rdd.collect() 收集完成 map 轉(zhuǎn)換的元素print(rdd_map.collect())# 7.停止 SparkContextsc.stop()#********** End **********#
mapPartitions
算子
mapPartitions
mapPartitions
函數(shù)獲取到每個(gè)分區(qū)的迭代器,在函數(shù)中通過(guò)這個(gè)分區(qū)整體的迭 代器對(duì)整個(gè)分區(qū)的元素進(jìn)行操作。
圖中每個(gè)方框表示一個(gè)RDD
分區(qū),左側(cè)的分區(qū)經(jīng)過(guò)自定義函數(shù) f:T->U
映射為右側(cè)的新RDD
分區(qū)。
mapPartitions 與 map
map
:遍歷算子,可以遍歷RDD
中每一個(gè)元素,遍歷的單位是每條記錄。
mapPartitions
:遍歷算子,可以改變RDD
格式,會(huì)提高RDD
并行度,遍歷單位是Partition
,也就是在遍歷之前它會(huì)將一個(gè)Partition
的數(shù)據(jù)加載到內(nèi)存中。
那么問(wèn)題來(lái)了,用上面的兩個(gè)算子遍歷一個(gè)RDD
誰(shuí)的效率高? mapPartitions
算子效率高。
mapPartitions 案例
-
def f(iterator): list = [] for x in iterator: list.append(x*2) return listif __name__ == "__main__": sc = SparkContext("local", "Simple App") data = [1,2,3,4,5,6] rdd = sc.parallelize(data) print(rdd.collect()) partitions = rdd.mapPartitions(f) print(partitions.collect())
輸出:
[1, 2, 3, 4, 5, 6]
[2, 4, 6, 8, 10, 12]
mapPartitions()
:傳入的參數(shù)是rdd
的 iterator
(元素迭代器),返回也是一個(gè)iterator
(迭代器)。
# -*- coding: UTF-8 -*-
from pyspark import SparkContext#********** Begin **********#
def f(iterator):list = []for x in iterator:list.append((x, len(x)))return list#********** End **********#
if __name__ == "__main__":#********** Begin **********## 1.初始化 SparkContext,該對(duì)象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 2. 一個(gè)內(nèi)容為("dog", "salmon", "salmon", "rat", "elephant")的列表Listdata = ["dog", "salmon", "salmon", "rat", "elephant"]# 3.通過(guò) SparkContext 并行化創(chuàng)建 rddrdd = sc.parallelize(data)# 4.使用rdd.collect() 收集 rdd 的元素。print(rdd.collect())"""使用 mapPartitions 算子,將 rdd 的數(shù)據(jù) ("dog", "salmon", "salmon", "rat", "elephant") 按照下面的規(guī)則進(jìn)行轉(zhuǎn)換操作,規(guī)則如下:需求:將字符串與該字符串的長(zhǎng)度組合成一個(gè)元組,例如:dog --> (dog,3)salmon --> (salmon,6)"""# 5.使用 mapPartitions 算子完成以上需求partitions = rdd.mapPartitions(f)# 6.使用rdd.collect() 收集完成 mapPartitions 轉(zhuǎn)換的元素print(partitions.collect())# 7.停止 SparkContextsc.stop()#********** End **********#
filter
算子。
filter
filter
函數(shù)功能是對(duì)元素進(jìn)行過(guò)濾,對(duì)每個(gè)元素應(yīng)用f
函數(shù),返 回值為 true
的元素在RDD
中保留,返回值為false
的元素將被過(guò)濾掉。內(nèi)部實(shí)現(xiàn)相當(dāng)于生成。
FilteredRDD(this,sc.clean(f))
下面代碼為函數(shù)的本質(zhì)實(shí)現(xiàn):
-
def filter(self, f): """ Return a new RDD containing only the elements that satisfy a predicate.>>> rdd = sc.parallelize([1, 2, 3, 4, 5]) >>> rdd.filter(lambda x: x % 2 == 0).collect() [2, 4] """ def func(iterator): return filter(fail_on_stopiteration(f), iterator) return self.mapPartitions(func, True)
上圖中每個(gè)方框代表一個(gè) RDD
分區(qū), T
可以是任意的類型。通過(guò)用戶自定義的過(guò)濾函數(shù) f
,對(duì)每個(gè)數(shù)據(jù)項(xiàng)操作,將滿足條件、返回結(jié)果為 true
的數(shù)據(jù)項(xiàng)保留。例如,過(guò)濾掉 V2
和 V3
保留了 V1
,為區(qū)分命名為 V’1
。
filter 案例
-
sc = SparkContext("local", "Simple App") data = [1,2,3,4,5,6] rdd = sc.parallelize(data) print(rdd.collect()) rdd_filter = rdd.filter(lambda x: x>2) print(rdd_filter.collect())
輸出:
[1, 2, 3, 4, 5, 6]
[3, 4, 5, 6]
說(shuō)明:rdd1( [ 1 , 2 , 3 , 4 , 5 , 6 ] )
經(jīng)過(guò) filter
算子轉(zhuǎn)換成 rdd2( [ 3 ,4 , 5 , 6 ] )
。
使用 filter
算子,將 rdd
中的數(shù)據(jù) (1, 2, 3, 4, 5, 6, 7, 8)
按照以下規(guī)則進(jìn)行過(guò)濾,規(guī)則如下:
- 過(guò)濾掉
rdd
中的所有奇數(shù)。
# -*- coding: UTF-8 -*-
from pyspark import SparkContextif __name__ == "__main__":#********** Begin **********## 1.初始化 SparkContext,該對(duì)象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 2.創(chuàng)建一個(gè)1到8的列表Listdata = [1, 2, 3, 4, 5, 6, 7, 8]# 3.通過(guò) SparkContext 并行化創(chuàng)建 rddrdd = sc.parallelize(data)# 4.使用rdd.collect() 收集 rdd 的元素。print(rdd.collect())"""使用 filter 算子,將 rdd 的數(shù)據(jù) (1, 2, 3, 4, 5, 6, 7, 8) 按照下面的規(guī)則進(jìn)行轉(zhuǎn)換操作,規(guī)則如下:需求:過(guò)濾掉rdd中的奇數(shù)"""# 5.使用 filter 算子完成以上需求rdd_filter = rdd.filter(lambda x: x % 2 == 0)# 6.使用rdd.collect() 收集完成 filter 轉(zhuǎn)換的元素print(rdd_filter.collect())# 7.停止 SparkContextsc.stop()#********** End **********#
flatMap
算子
flatMap
將原來(lái)RDD
中的每個(gè)元素通過(guò)函數(shù)f
轉(zhuǎn)換為新的元素,并將生成的RDD
中每個(gè)集合的元素合并為一個(gè)集合,內(nèi)部創(chuàng)建:
FlatMappedRDD(this,sc.clean(f))
上圖表示RDD
的一個(gè)分區(qū),進(jìn)行flatMap
函數(shù)操作,flatMap
中傳入的函數(shù)為f:T->U
,T
和U
可以是任意的數(shù)據(jù)類型。將分區(qū)中的數(shù)據(jù)通過(guò)用戶自定義函數(shù)f
轉(zhuǎn)換為新的數(shù)據(jù)。外部大方框可以認(rèn)為是一個(gè)RDD
分區(qū),小方框代表一個(gè)集合。V1
、V2
、V3
在一個(gè)集合作為RDD
的一個(gè)數(shù)據(jù)項(xiàng),可能存儲(chǔ)為數(shù)組或其他容器,轉(zhuǎn)換為V’1
、V’2
、V’3
后,將原來(lái)的數(shù)組或容器結(jié)合拆散,拆散的數(shù)據(jù)形成RDD
中的數(shù)據(jù)項(xiàng)。
flatMap 案例
sc = SparkContext("local", "Simple App")
data = [["m"], ["a", "n"]]
rdd = sc.parallelize(data)
print(rdd.collect())
flat_map = rdd.flatMap(lambda x: x)
print(flat_map.collect())
輸出:
[['m'], ['a', 'n']]
['m', 'a', 'n']
flatMap
:將兩個(gè)集合轉(zhuǎn)換成一個(gè)集合
?
需求:使用 flatMap
算子,將rdd
的數(shù)據(jù) ([1, 2, 3], [4, 5, 6], [7, 8, 9])
按照下面的規(guī)則進(jìn)行轉(zhuǎn)換操作,規(guī)則如下:
- 合并
RDD
的元素,例如:([1,2,3],[4,5,6]) --> (1,2,3,4,5,6)
([2,3],[4,5],[6]) --> (1,2,3,4,5,6)
from pyspark import SparkContextif __name__ == "__main__":#********** Begin **********## 1.初始化 SparkContext,該對(duì)象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 2.創(chuàng)建一個(gè)[[1, 2, 3], [4, 5, 6], [7, 8, 9]] 的列表Listlist = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]# 3.通過(guò) SparkContext 并行化創(chuàng)建 rddrdd = sc.parallelize(list)# 4.使用rdd.collect() 收集 rdd 的元素。print(rdd.collect()) """使用 flatMap 算子,將 rdd 的數(shù)據(jù) ([1, 2, 3], [4, 5, 6], [7, 8, 9]) 按照下面的規(guī)則進(jìn)行轉(zhuǎn)換操作,規(guī)則如下:需求:合并RDD的元素,例如:([1,2,3],[4,5,6]) --> (1,2,3,4,5,6)([2,3],[4,5],[6]) --> (1,2,3,4,5,6)"""# 5.使用 filter 算子完成以上需求flat_map = rdd.flatMap(lambda x: x)# 6.使用rdd.collect() 收集完成 filter 轉(zhuǎn)換的元素print(flat_map.collect())# 7.停止 SparkContextsc.stop()#********** End **********#
distinct
算子distinct
distinct
將RDD
中的元素進(jìn)行去重操作。上圖中的每個(gè)方框代表一個(gè)
RDD
分區(qū),通過(guò)distinct
函數(shù),將數(shù)據(jù)去重。 例如,重復(fù)數(shù)據(jù)V1
、V1
去重后只保留一份V1
。distinct 案例
sc = SparkContext("local", "Simple App") data = ["python", "python", "python", "java", "java"] rdd = sc.parallelize(data) print(rdd.collect()) distinct = rdd.distinct()
-
輸出
['python', 'python', 'python', 'java', 'java'] ['python', 'java']
-
print(distinct.collect())
sortByKey
算子sortByKey
-
def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x): if numPartitions is None: numPartitions = self._defaultReducePartitions()memory = self._memory_limit() serializer = self._jrdd_deserializerdef sortPartition(iterator): sort = ExternalSorter(memory * 0.9, serializer).sorted return iter(sort(iterator, key=lambda kv: keyfunc(kv[0]), reverse=(not ascending)))if numPartitions == 1: if self.getNumPartitions() > 1: self = self.coalesce(1) return self.mapPartitions(sortPartition, True)# first compute the boundary of each part via sampling: we want to partition # the key-space into bins such that the bins have roughly the same # number of (key, value) pairs falling into them rddSize = self.count() if not rddSize: return self # empty RDD maxSampleSize = numPartitions * 20.0 # constant from Spark's RangePartitioner f\fraction = min(maxSampleSize / max(rddSize, 1), 1.0) samples = self.sample(False, f\fraction, 1).map(lambda kv: kv[0]).collect() samples = sorted(samples, key=keyfunc)# we have numPartitions many parts but one of the them has # an implicit boundary bounds = [samples[int(len(samples) * (i + 1) / numPartitions)] for i in range(0, numPartitions - 1)]def rangePartitioner(k): p = bisect.bisect_left(bounds, keyfunc(k)) if ascending: return p else: return numPartitions - 1 - preturn self.partitionBy(numPartitions, rangePartitioner).mapPartitions(sortPartition, True)
-
說(shuō)明:
ascending
參數(shù)是指排序(升序還是降序),默認(rèn)是升序。numPartitions
參數(shù)是重新分區(qū),默認(rèn)與上一個(gè)RDD
保持一致。keyfunc
參數(shù)是排序規(guī)則。sortByKey 案例
sc = SparkContext("local", "Simple App")
data = [("a",1),("a",2),("c",1),("b",1)]
rdd = sc.parallelize(data)
key = rdd.sortByKey()
print(key.collect())
-
輸出:
-
[('a', 1), ('a', 2), ('b', 1), ('c', 1)]
-
?需求:使用 sortBy
算子,將 rdd
中的數(shù)據(jù)進(jìn)行排序(升序)。
from pyspark import SparkContextif __name__ == "__main__":# ********** Begin **********## 1.初始化 SparkContext,該對(duì)象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 2.創(chuàng)建一個(gè)內(nèi)容為[('B',1),('A',2),('C',3)]的列表ListList = [('B',1),('A',2),('C',3)]# 3.通過(guò) SparkContext 并行化創(chuàng)建 rddrdd = sc.parallelize(List)# 4.使用rdd.collect() 收集 rdd 的元素print(rdd.collect())"""使用 sortByKey 算子,將 rdd 的數(shù)據(jù) ('B', 1), ('A', 2), ('C', 3) 按照下面的規(guī)則進(jìn)行轉(zhuǎn)換操作,規(guī)則如下:需求:元素排序,例如:[(3,3),(2,2),(1,1)] --> [(1,1),(2,2),(3,3)]"""# 5.使用 sortByKey 算子完成以上需求key = rdd.sortByKey()# 6.使用rdd.collect() 收集完成 sortByKey 轉(zhuǎn)換的元素print(key.collect())# 7.停止 SparkContextsc.stop()# ********** End **********#
mapValues
算子
mapValues
mapValues
:針對(duì)(Key, Value)
型數(shù)據(jù)中的 Value
進(jìn)行 Map
操作,而不對(duì) Key
進(jìn)行處理。
上圖中的方框代表 RDD
分區(qū)。 a=>a+2
代表對(duì) (V1,1)
這樣的 Key Value
數(shù)據(jù)對(duì),數(shù)據(jù)只對(duì) Value
中的 1
進(jìn)行加 2
操作,返回結(jié)果為 3
。
mapValues 案例
-
sc = SparkContext("local", "Simple App") data = [("a",1),("a",2),("b",1)] rdd = sc.parallelize(data) values = rdd.mapValues(lambda x: x + 2) print(values.collect())
輸出:
-
[('a', 3), ('a', 4), ('b', 3)]
需求:使用mapValues
算子,將rdd
的數(shù)據(jù) ("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)
按照下面的規(guī)則進(jìn)行轉(zhuǎn)換操作,規(guī)則如下:
- 偶數(shù)轉(zhuǎn)換成該數(shù)的平方
- 奇數(shù)轉(zhuǎn)換成該數(shù)的立方
from pyspark import SparkContextif __name__ == "__main__":# ********** Begin **********## 1.初始化 SparkContext,該對(duì)象是 Spark 程序的入口sc = SparkContext("local", "Simple App") # 2.創(chuàng)建一個(gè)內(nèi)容為[("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)]的列表ListList = [("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)]# 3.通過(guò) SparkContext 并行化創(chuàng)建 rddrdd = sc.parallelize(List)# 4.使用rdd.collect() 收集 rdd 的元素print(rdd.collect())"""使用 mapValues 算子,將 rdd 的數(shù)據(jù) ("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5) 按照下面的規(guī)則進(jìn)行轉(zhuǎn)換操作,規(guī)則如下:需求:元素(key,value)的value進(jìn)行以下操作:偶數(shù)轉(zhuǎn)換成該數(shù)的平方奇數(shù)轉(zhuǎn)換成該數(shù)的立方"""# 5.使用 mapValues 算子完成以上需求values = rdd.mapValues(lambda x: x + 2)# 6.使用rdd.collect() 收集完成 mapValues 轉(zhuǎn)換的元素print(values.collect())# 7.停止 SparkContextsc.stop()# ********** End **********#
reduceByKey
算子reduceByKey
reduceByKey
算子,只是兩個(gè)值合并成一個(gè)值,比如疊加。函數(shù)實(shí)現(xiàn)
def reduceByKey(self, func, numPartitions=None, partitionFunc=portable_hash): return self.combineByKey(lambda x: x, func, func, numPartitions, partitionFunc)
上圖中的方框代表 RDD
分區(qū)。通過(guò)自定義函數(shù) (A,B) => (A + B)
,將相同 key
的數(shù)據(jù) (V1,2)
和 (V1,1)
的 value
做加法運(yùn)算,結(jié)果為( V1,3)
。
reduceByKey 案例
sc = SparkContext("local", "Simple App")
data = [("a",1),("a",2),("b",1)]
rdd = sc.parallelize(data)
print(rdd.reduceByKey(lambda x,y:x+y).collect())
輸出:
[('a', 3), ('b', 1)]
需求:使用 reduceByKey
算子,將 rdd(key-value類型)
中的數(shù)據(jù)進(jìn)行值累加。
例如:
("soma",4), ("soma",1), ("soma",2) -> ("soma",7)
from pyspark import SparkContextif __name__ == "__main__":# ********** Begin **********## 1.初始化 SparkContext,該對(duì)象是 Spark 程序的入口sc = SparkContext("local", "Simple App") # 2.創(chuàng)建一個(gè)內(nèi)容為[("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)]的列表ListList = [("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)]# 3.通過(guò) SparkContext 并行化創(chuàng)建 rddrdd = sc.parallelize(List) # 4.使用rdd.collect() 收集 rdd 的元素print(rdd.collect())"""使用 reduceByKey 算子,將 rdd 的數(shù)據(jù)[("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)] 按照下面的規(guī)則進(jìn)行轉(zhuǎn)換操作,規(guī)則如下:需求:元素(key-value)的value累加操作,例如:(1,1),(1,1),(1,2) --> (1,4)(1,1),(1,1),(2,2),(2,2) --> (1,2),(2,4)"""# 5.使用 reduceByKey 算子完成以上需求reduce = rdd.reduceByKey(lambda x,y:x+y)# 6.使用rdd.collect() 收集完成 reduceByKey 轉(zhuǎn)換的元素print(reduce.collect())# 7.停止 SparkContextsc.stop()# ********** End **********#
Action
的常用算子
count
count()
:返回 RDD
的元素個(gè)數(shù)。
示例:
-
sc = SparkContext("local", "Simple App") data = ["python", "python", "python", "java", "java"] rdd = sc.parallelize(data) print(rdd.count())
輸出:
5
first
first()
:返回 RDD
的第一個(gè)元素(類似于take(1)
)。
示例:
-
sc = SparkContext("local", "Simple App") data = ["python", "python", "python", "java", "java"] rdd = sc.parallelize(data) print(rdd.first())
輸出:
python
take
take(n)
:返回一個(gè)由數(shù)據(jù)集的前 n
個(gè)元素組成的數(shù)組。
示例:
-
sc = SparkContext("local", "Simple App") data = ["python", "python", "python", "java", "java"] rdd = sc.parallelize(data) print(rdd.take(2))
輸出:
['python', 'python']
reduce
reduce()
:通過(guò)func
函數(shù)聚集 RDD
中的所有元素,該函數(shù)應(yīng)該是可交換的和關(guān)聯(lián)的,以便可以并行正確計(jì)算。
示例:
-
sc = SparkContext("local", "Simple App") data = [1,1,1,1] rdd = sc.parallelize(data) print(rdd.reduce(lambda x,y:x+y))
輸出:
4
collect
collect()
:在驅(qū)動(dòng)程序中,以數(shù)組的形式返回?cái)?shù)據(jù)集的所有元素。
示例:
-
sc = SparkContext("local", "Simple App") data = [1,1,1,1] rdd = sc.parallelize(data) print(rdd.collect())
輸出:
[1,1,1,1]
具體任務(wù)如下:
需求1:使用 count
算子,統(tǒng)計(jì)下 rdd
中元素的個(gè)數(shù);
需求2:使用 first
算子,獲取 rdd
首個(gè)元素;
需求3:使用 take
算子,獲取 rdd
前三個(gè)元素;
需求4:使用 reduce
算子,進(jìn)行累加操作;
需求5:使用 collect
算子,收集所有元素。
?
from pyspark import SparkContext
if __name__ == "__main__":# ********** Begin **********## 1.初始化 SparkContext,該對(duì)象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 2.創(chuàng)建一個(gè)內(nèi)容為[1, 3, 5, 7, 9, 8, 6, 4, 2]的列表ListList = [1, 3, 5, 7, 9, 8, 6, 4, 2] # 3.通過(guò) SparkContext 并行化創(chuàng)建 rddrdd = sc.parallelize(List)# 4.收集rdd的所有元素并print輸出print(rdd.collect())# 5.統(tǒng)計(jì)rdd的元素個(gè)數(shù)并print輸出print(rdd.count())# 6.獲取rdd的第一個(gè)元素并print輸出print(rdd.first())# 7.獲取rdd的前3個(gè)元素并print輸出print(rdd.take(3))# 8.聚合rdd的所有元素并print輸出print(rdd.reduce(lambda x,y:x+y))# 9.停止 SparkContextsc.stop()# ********** End **********#