中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁(yè) > news >正文

政府網(wǎng)站建設(shè)十強(qiáng)百度seo公司興田德潤(rùn)

政府網(wǎng)站建設(shè)十強(qiáng),百度seo公司興田德潤(rùn),微信網(wǎng)站如何制作軟件,做年報(bào)的網(wǎng)站怎么登不上去了RDD相關(guān)知識(shí) RDD介紹 RDD 是Spark的核心抽象,即 彈性分布式數(shù)據(jù)集(residenta distributed dataset)。代表一個(gè)不可變,可分區(qū),里面元素可并行計(jì)算的集合。其具有數(shù)據(jù)流模型的特點(diǎn):自動(dòng)容錯(cuò),位置…

RDD相關(guān)知識(shí)

RDD介紹

RDDSpark核心抽象,即 彈性分布式數(shù)據(jù)集residenta distributed dataset)。代表一個(gè)不可變,可分區(qū),里面元素可并行計(jì)算的集合。其具有數(shù)據(jù)流模型的特點(diǎn):自動(dòng)容錯(cuò),位置感知性調(diào)度和可伸縮性。Spark中,對(duì)數(shù)據(jù)的所有操作不外乎創(chuàng)建RDD轉(zhuǎn)化已有RDD以及調(diào)用 RDD操作進(jìn)行求值。
?

RDD結(jié)構(gòu)圖

RDD具有五大特性
  1. 一組分片Partition),即數(shù)據(jù)集的基本組成單位(RDD是由一系列的partition組成的)。將數(shù)據(jù)加載為RDD時(shí),一般會(huì)遵循數(shù)據(jù)的本地性(一般一個(gè)HDFS里的block會(huì)加載為一個(gè)partition)。

  2. RDD之間的依賴關(guān)系。依賴還具體分為寬依賴和窄依賴,但并不是所有的RDD都有依賴。為了容錯(cuò)(重算,cachecheckpoint),也就是說(shuō)在內(nèi)存中的RDD操作時(shí)出錯(cuò)或丟失會(huì)進(jìn)行重算。

  3. 由一個(gè)函數(shù)計(jì)算每一個(gè)分片Spark中的RDD的計(jì)算是以分片為單位的,每個(gè)RDD都會(huì)實(shí)現(xiàn)compute函數(shù)以達(dá)到這個(gè)目的。compute函數(shù)會(huì)對(duì)迭代器進(jìn)行復(fù)合,不需要保存每次計(jì)算的結(jié)果。

  4. (可選)如果RDD里面存的數(shù)據(jù)是key-value形式,則可以傳遞一個(gè)自定義的Partitioner進(jìn)行重新分區(qū)。

  5. (可選)RDD提供一系列最佳的計(jì)算位置,即數(shù)據(jù)的本地性

RDD之間的依賴關(guān)系

RDD之間有一系列的依賴關(guān)系,依賴關(guān)系又分為窄依賴和寬依賴。

窄依賴父RDD子RDD partition之間的關(guān)系是一對(duì)一的。或者父RDD一個(gè)partition只對(duì)應(yīng)一個(gè)子RDDpartition情況下的父RDD子RDD partition關(guān)系是多對(duì)一的,也可以理解為沒(méi)有觸發(fā)shuffle。

寬依賴父RDD子RDD partition之間的關(guān)系是一對(duì)多父RDD的一個(gè)分區(qū)的數(shù)據(jù)去到子RDD的不同分區(qū)里面。也可以理解為觸發(fā)了shuffle。

特別說(shuō)明:對(duì)于join操作有兩種情況,如果join操作的使用每個(gè)partition僅僅和已知的Partition進(jìn)行join,此時(shí)的join操作就是窄依賴;其他情況的join操作就是寬依賴。

RDD創(chuàng)建
  1. Hadoop文件系統(tǒng)(或與Hadoop兼容的其他持久化存儲(chǔ)系統(tǒng),如Hive、CassandraHBase)輸入(例如HDFS創(chuàng)建。

  2. 通過(guò)集合進(jìn)行創(chuàng)建。

算子

算子可以分為Transformation 轉(zhuǎn)換算子和Action 行動(dòng)算子。 RDD懶執(zhí)行的,如果沒(méi)有行動(dòng)操作出現(xiàn),所有的轉(zhuǎn)換操作都不會(huì)執(zhí)行。

RDD直觀圖,如下:

RDD 的 五大特性
  • 一組分片(Partition),即數(shù)據(jù)集的基本組成單位。對(duì)于RDD來(lái)說(shuō),每個(gè)分片都會(huì)被一個(gè)計(jì)算任務(wù)處理,并決定并行計(jì)算的粒度。用戶可以在創(chuàng)建RDD時(shí)指定RDD的分片個(gè)數(shù),如果沒(méi)有指定,那么就會(huì)采用默認(rèn)值。默認(rèn)值就是程序所分配到的CPU Core的數(shù)目。

  • 一個(gè)計(jì)算每個(gè)分區(qū)的函數(shù)。SparkRDD的計(jì)算是以分片為單位的,每個(gè) RDD都會(huì)實(shí)現(xiàn) compute 函數(shù)以達(dá)到這個(gè)目的。compute函數(shù)會(huì)對(duì)迭代器進(jìn)行復(fù)合,不需要保存每次計(jì)算的結(jié)果。

  • RDD之間的依賴關(guān)系。RDD的每次轉(zhuǎn)換都會(huì)生成一個(gè)新的RDD,所以RDD之間就會(huì)形成類似于流水線一樣的前后依賴關(guān)系。在部分分區(qū)數(shù)據(jù)丟失時(shí),Spark 可以通過(guò)這個(gè)依賴關(guān)系重新計(jì)算丟失的分區(qū)數(shù)據(jù),而不是對(duì)RDD的所有分區(qū)進(jìn)行重新計(jì)算。

  • 一個(gè)Partitioner,即RDD的分片函數(shù)。當(dāng)前Spark中實(shí)現(xiàn)了兩種類型的分片函數(shù),一個(gè)是基于哈希的HashPartitioner,另外一個(gè)是基于范圍的 RangePartitioner。只有對(duì)于于key-valueRDD,才會(huì)有Partitioner,非key-valueRDDParititioner的值是NonePartitioner函數(shù)不但決定了RDD本身的分片數(shù)量,也決定了parent RDD Shuffle輸出時(shí)的分片數(shù)量。

  • 一個(gè)列表,存儲(chǔ)存取每個(gè)Partition的優(yōu)先位置(preferred location)。對(duì)于一個(gè)HDFS文件來(lái)說(shuō),這個(gè)列表保存的就是每個(gè) Partition 所在的塊的位置。按照“移動(dòng)數(shù)據(jù)不如移動(dòng)計(jì)算”的理念,Spark 在進(jìn)行任務(wù)調(diào)度的時(shí)候,會(huì)盡可能地將計(jì)算任務(wù)分配到其所要處理數(shù)據(jù)塊的存儲(chǔ)位置。

相關(guān)API介紹
  • SparkContext創(chuàng)建;
  1. sc = SparkContext("local", "Simple App")

說(shuō)明:"local" 是指讓Spark程序本地運(yùn)行,"Simple App" 是指Spark程序的名稱,這個(gè)名稱可以任意(為了直觀明了的查看,最好設(shè)置有意義的名稱)。

  • 集合并行化創(chuàng)建RDD
  1. data = [1,2,3,4]
  2. rdd = sc.parallelize(data)
  • collect算子:在驅(qū)動(dòng)程序中將數(shù)據(jù)集的所有元素作為數(shù)組返回(注意數(shù)據(jù)集不能過(guò)大);
  1. rdd.collect()
  • 停止SparkContext。
  1. sc.stop()
# -*- coding: UTF-8 -*-
from pyspark import SparkContextif __name__ == "__main__":#********** Begin **********## 1.初始化 SparkContext,該對(duì)象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 2.創(chuàng)建一個(gè)1到8的列表Listdata = [1, 2, 3, 4, 5, 6, 7, 8]# 3.通過(guò) SparkContext 并行化創(chuàng)建 rddrdd = sc.parallelize(data)# 4.使用 rdd.collect() 收集 rdd 的內(nèi)容。 rdd.collect() 是 Spark Action 算子,在后續(xù)內(nèi)容中將會(huì)詳細(xì)說(shuō)明,主要作用是:收集 rdd 的數(shù)據(jù)內(nèi)容result = rdd.collect()# 5.打印 rdd 的內(nèi)容print(result)# 6.停止 SparkContextsc.stop()#********** End **********#

讀取外部數(shù)據(jù)集創(chuàng)建RDD?

編寫讀取本地文件創(chuàng)建Spark RDD的程序。

相關(guān)知識(shí)

為了完成本關(guān)任務(wù),你需要掌握:1.如何讀取本地文件系統(tǒng)中的文件來(lái)創(chuàng)建Spark RDD。

textFile 介紹

PySpark可以從Hadoop支持的任何存儲(chǔ)源創(chuàng)建分布式數(shù)據(jù)集,包括本地文件系統(tǒng),HDFSCassandraHBaseAmazon S3 等。Spark支持文本文件,SequenceFiles和任何其他Hadoop InputFormat。

文本文件RDD可以使用創(chuàng)建SparkContextextFile方法。此方法需要一個(gè) URI的文件(本地路徑的機(jī)器上,或一個(gè)hdfs://,s3a:// 等 URI),并讀取其作為行的集合。這是一個(gè)示例調(diào)用:

  1. distFile = sc.textFile("data.txt")
    # -*- coding: UTF-8 -*-
    from pyspark import SparkContextif __name__ == '__main__':#********** Begin **********## 1.初始化 SparkContext,該對(duì)象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 文本文件 RDD 可以使用創(chuàng)建 SparkContext 的t extFile 方法。
    #此方法需要一個(gè) URI的 文件(本地路徑的機(jī)器上,或一個(gè)hdfs://,s3a://等URI),
    #并讀取其作為行的集合# 2.讀取本地文件,URI為:/root/wordcount.txtrdd = sc.textFile("/root/wordcount.txt")# 3.使用 rdd.collect() 收集 rdd 的內(nèi)容。 
    #rdd.collect() 是 Spark Action 算子,在后續(xù)內(nèi)容中將會(huì)詳細(xì)說(shuō)明,主要作用是:收集 rdd 的數(shù)據(jù)內(nèi)容result = rdd.collect()# 4.打印 rdd 的內(nèi)容print(result)# 5.停止 SparkContextsc.stop()#********** End **********#

map 算子

本關(guān)任務(wù):使用Sparkmap 算子按照相關(guān)需求完成轉(zhuǎn)換操作。

相關(guān)知識(shí)

為了完成本關(guān)任務(wù),你需要掌握:如何使用map算子。

map

將原來(lái)RDD的每個(gè)數(shù)據(jù)項(xiàng)通過(guò)map中的用戶自定義函數(shù) f 映射轉(zhuǎn)變?yōu)橐粋€(gè)新的元素。

圖中每個(gè)方框表示一個(gè)RDD 分區(qū),左側(cè)的分區(qū)經(jīng)過(guò)自定義函數(shù) f:T->U 映射為右側(cè)的新 RDD 分區(qū)。但是,實(shí)際只有等到 Action 算子觸發(fā)后,這個(gè) f 函數(shù)才會(huì)和其他函數(shù)在一個(gè) Stage 中對(duì)數(shù)據(jù)進(jìn)行運(yùn)算。

map 案例
  1. sc = SparkContext("local", "Simple App")
    data = [1,2,3,4,5,6]
    rdd = sc.parallelize(data)
    print(rdd.collect())
    rdd_map = rdd.map(lambda x: x * 2)
    print(rdd_map.collect())

輸出:

[1, 2, 3, 4, 5, 6] [2, 4, 6, 8, 10, 12]

說(shuō)明:rdd1 的元素( 1 , 2 , 3 , 4 , 5 , 6 )經(jīng)過(guò) map 算子( x -> x*2 )轉(zhuǎn)換成了 rdd2 ( 2 , 4 , 6 , 8 , 10 )。

編程要求

請(qǐng)仔細(xì)閱讀右側(cè)代碼,根據(jù)方法內(nèi)的提示,在Begin - End區(qū)域內(nèi)進(jìn)行代碼補(bǔ)充,具體任務(wù)如下:

需求:使用 map 算子,將rdd的數(shù)據(jù) (1, 2, 3, 4, 5) 按照下面的規(guī)則進(jìn)行轉(zhuǎn)換操作,規(guī)則如下:

  • 偶數(shù)轉(zhuǎn)換成該數(shù)的平方;
  • 奇數(shù)轉(zhuǎn)換成該數(shù)的立方。
# -*- coding: UTF-8 -*-
from pyspark import SparkContextif __name__ == "__main__":#********** Begin **********## 1.初始化 SparkContext,該對(duì)象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 2.創(chuàng)建一個(gè)1到5的列表Listdata = [1, 2, 3, 4, 5]# 3.通過(guò) SparkContext 并行化創(chuàng)建 rddrdd = sc.parallelize(data)# 4.使用rdd.collect() 收集 rdd 的元素。print(rdd.collect())"""使用 map 算子,將 rdd 的數(shù)據(jù) (1, 2, 3, 4, 5) 按照下面的規(guī)則進(jìn)行轉(zhuǎn)換操作,規(guī)則如下:需求:偶數(shù)轉(zhuǎn)換成該數(shù)的平方奇數(shù)轉(zhuǎn)換成該數(shù)的立方"""# 5.使用 map 算子完成以上需求rdd_map = rdd.map(lambda x: x * x if x % 2 == 0 else x * x * x)# 6.使用rdd.collect() 收集完成 map 轉(zhuǎn)換的元素print(rdd_map.collect())# 7.停止 SparkContextsc.stop()#********** End **********#

mapPartitions算子

mapPartitions

mapPartitions函數(shù)獲取到每個(gè)分區(qū)的迭代器,在函數(shù)中通過(guò)這個(gè)分區(qū)整體的迭 代器對(duì)整個(gè)分區(qū)的元素進(jìn)行操作。

圖中每個(gè)方框表示一個(gè)RDD分區(qū),左側(cè)的分區(qū)經(jīng)過(guò)自定義函數(shù) f:T->U 映射為右側(cè)的新RDD分區(qū)。

mapPartitions 與 map

map:遍歷算子,可以遍歷RDD中每一個(gè)元素,遍歷的單位是每條記錄。

mapPartitions:遍歷算子,可以改變RDD格式,會(huì)提高RDD并行度,遍歷單位是Partition,也就是在遍歷之前它會(huì)將一個(gè)Partition的數(shù)據(jù)加載到內(nèi)存中。

那么問(wèn)題來(lái)了,用上面的兩個(gè)算子遍歷一個(gè)RDD誰(shuí)的效率高? mapPartitions算子效率高。

mapPartitions 案例
  1. def f(iterator):
    list = []
    for x in iterator:
    list.append(x*2)
    return listif __name__ == "__main__":
    sc = SparkContext("local", "Simple App")
    data = [1,2,3,4,5,6]
    rdd = sc.parallelize(data)
    print(rdd.collect())
    partitions = rdd.mapPartitions(f)
    print(partitions.collect())

輸出:


[1, 2, 3, 4, 5, 6]
[2, 4, 6, 8, 10, 12]

mapPartitions():傳入的參數(shù)是rdditerator(元素迭代器),返回也是一個(gè)iterator(迭代器)。

# -*- coding: UTF-8 -*-
from pyspark import SparkContext#********** Begin **********#
def f(iterator):list = []for x in iterator:list.append((x, len(x)))return list#********** End **********#
if __name__ == "__main__":#********** Begin **********## 1.初始化 SparkContext,該對(duì)象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 2. 一個(gè)內(nèi)容為("dog", "salmon", "salmon", "rat", "elephant")的列表Listdata = ["dog", "salmon", "salmon", "rat", "elephant"]# 3.通過(guò) SparkContext 并行化創(chuàng)建 rddrdd = sc.parallelize(data)# 4.使用rdd.collect() 收集 rdd 的元素。print(rdd.collect())"""使用 mapPartitions 算子,將 rdd 的數(shù)據(jù) ("dog", "salmon", "salmon", "rat", "elephant") 按照下面的規(guī)則進(jìn)行轉(zhuǎn)換操作,規(guī)則如下:需求:將字符串與該字符串的長(zhǎng)度組合成一個(gè)元組,例如:dog  -->  (dog,3)salmon   -->  (salmon,6)"""# 5.使用 mapPartitions 算子完成以上需求partitions = rdd.mapPartitions(f)# 6.使用rdd.collect() 收集完成 mapPartitions 轉(zhuǎn)換的元素print(partitions.collect())# 7.停止 SparkContextsc.stop()#********** End **********#

filter算子。

filter

filter 函數(shù)功能是對(duì)元素進(jìn)行過(guò)濾,對(duì)每個(gè)元素應(yīng)用f函數(shù),返 回值為 true的元素在RDD中保留,返回值為false的元素將被過(guò)濾掉。內(nèi)部實(shí)現(xiàn)相當(dāng)于生成。

  1. FilteredRDD(this,sc.clean(f))

下面代碼為函數(shù)的本質(zhì)實(shí)現(xiàn):

  1. def filter(self, f):
    """
    Return a new RDD containing only the elements that satisfy a predicate.>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
    >>> rdd.filter(lambda x: x % 2 == 0).collect()
    [2, 4]
    """
    def func(iterator):
    return filter(fail_on_stopiteration(f), iterator)
    return self.mapPartitions(func, True)

上圖中每個(gè)方框代表一個(gè) RDD 分區(qū), T 可以是任意的類型。通過(guò)用戶自定義的過(guò)濾函數(shù) f,對(duì)每個(gè)數(shù)據(jù)項(xiàng)操作,將滿足條件、返回結(jié)果為 true 的數(shù)據(jù)項(xiàng)保留。例如,過(guò)濾掉 V2V3 保留了 V1,為區(qū)分命名為 V’1。

filter 案例
  1. sc = SparkContext("local", "Simple App")
    data = [1,2,3,4,5,6]
    rdd = sc.parallelize(data)
    print(rdd.collect())
    rdd_filter = rdd.filter(lambda x: x>2)
    print(rdd_filter.collect())

輸出:

  1. [1, 2, 3, 4, 5, 6]
  2. [3, 4, 5, 6]

說(shuō)明:rdd1( [ 1 , 2 , 3 , 4 , 5 , 6 ] ) 經(jīng)過(guò) filter 算子轉(zhuǎn)換成 rdd2( [ 3 ,4 , 5 , 6 ] )。

使用 filter 算子,將 rdd 中的數(shù)據(jù) (1, 2, 3, 4, 5, 6, 7, 8) 按照以下規(guī)則進(jìn)行過(guò)濾,規(guī)則如下:

  • 過(guò)濾掉rdd中的所有奇數(shù)。
# -*- coding: UTF-8 -*-
from pyspark import SparkContextif __name__ == "__main__":#********** Begin **********## 1.初始化 SparkContext,該對(duì)象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 2.創(chuàng)建一個(gè)1到8的列表Listdata = [1, 2, 3, 4, 5, 6, 7, 8]# 3.通過(guò) SparkContext 并行化創(chuàng)建 rddrdd = sc.parallelize(data)# 4.使用rdd.collect() 收集 rdd 的元素。print(rdd.collect())"""使用 filter 算子,將 rdd 的數(shù)據(jù) (1, 2, 3, 4, 5, 6, 7, 8) 按照下面的規(guī)則進(jìn)行轉(zhuǎn)換操作,規(guī)則如下:需求:過(guò)濾掉rdd中的奇數(shù)"""# 5.使用 filter 算子完成以上需求rdd_filter = rdd.filter(lambda x: x % 2 == 0)# 6.使用rdd.collect() 收集完成 filter 轉(zhuǎn)換的元素print(rdd_filter.collect())# 7.停止 SparkContextsc.stop()#********** End **********#

flatMap算子

flatMap

將原來(lái)RDD中的每個(gè)元素通過(guò)函數(shù)f轉(zhuǎn)換為新的元素,并將生成的RDD中每個(gè)集合的元素合并為一個(gè)集合,內(nèi)部創(chuàng)建:

  1. FlatMappedRDD(this,sc.clean(f))

上圖表示RDD的一個(gè)分區(qū),進(jìn)行flatMap函數(shù)操作,flatMap中傳入的函數(shù)為f:T->UTU可以是任意的數(shù)據(jù)類型。將分區(qū)中的數(shù)據(jù)通過(guò)用戶自定義函數(shù)f轉(zhuǎn)換為新的數(shù)據(jù)。外部大方框可以認(rèn)為是一個(gè)RDD分區(qū),小方框代表一個(gè)集合。V1、V2、V3在一個(gè)集合作為RDD的一個(gè)數(shù)據(jù)項(xiàng),可能存儲(chǔ)為數(shù)組或其他容器,轉(zhuǎn)換為V’1、V’2、V’3后,將原來(lái)的數(shù)組或容器結(jié)合拆散,拆散的數(shù)據(jù)形成RDD中的數(shù)據(jù)項(xiàng)。

flatMap 案例
sc = SparkContext("local", "Simple App")
data = [["m"], ["a", "n"]]
rdd = sc.parallelize(data)
print(rdd.collect())
flat_map = rdd.flatMap(lambda x: x)
print(flat_map.collect())

輸出:

  1. [['m'], ['a', 'n']]
  2. ['m', 'a', 'n']

flatMap:將兩個(gè)集合轉(zhuǎn)換成一個(gè)集合
?

需求:使用 flatMap 算子,將rdd的數(shù)據(jù) ([1, 2, 3], [4, 5, 6], [7, 8, 9]) 按照下面的規(guī)則進(jìn)行轉(zhuǎn)換操作,規(guī)則如下:

  • 合并RDD的元素,例如:
    1. ([1,2,3],[4,5,6]) --> (1,2,3,4,5,6)
    2. ([2,3],[4,5],[6]) --> (1,2,3,4,5,6)
      from pyspark import SparkContextif __name__ == "__main__":#********** Begin **********## 1.初始化 SparkContext,該對(duì)象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 2.創(chuàng)建一個(gè)[[1, 2, 3], [4, 5, 6], [7, 8, 9]] 的列表Listlist = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]# 3.通過(guò) SparkContext 并行化創(chuàng)建 rddrdd = sc.parallelize(list)# 4.使用rdd.collect() 收集 rdd 的元素。print(rdd.collect())        """使用 flatMap 算子,將 rdd 的數(shù)據(jù) ([1, 2, 3], [4, 5, 6], [7, 8, 9]) 按照下面的規(guī)則進(jìn)行轉(zhuǎn)換操作,規(guī)則如下:需求:合并RDD的元素,例如:([1,2,3],[4,5,6])  -->  (1,2,3,4,5,6)([2,3],[4,5],[6])  -->  (1,2,3,4,5,6)"""# 5.使用 filter 算子完成以上需求flat_map = rdd.flatMap(lambda x: x)# 6.使用rdd.collect() 收集完成 filter 轉(zhuǎn)換的元素print(flat_map.collect())# 7.停止 SparkContextsc.stop()#********** End **********#
      

      distinct 算子

      distinct

      distinctRDD 中的元素進(jìn)行去重操作。

      上圖中的每個(gè)方框代表一個(gè) RDD 分區(qū),通過(guò) distinct 函數(shù),將數(shù)據(jù)去重。 例如,重復(fù)數(shù)據(jù) V1、 V1 去重后只保留一份 V1 。

      distinct 案例
      sc = SparkContext("local", "Simple App")
      data = ["python", "python", "python", "java", "java"]
      rdd = sc.parallelize(data)
      print(rdd.collect())
      distinct = rdd.distinct()
    3. 輸出

      ['python', 'python', 'python', 'java', 'java']
      ['python', 'java']
    4. print(distinct.collect())

      sortByKey 算子

      sortByKey
    5. def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x):
      if numPartitions is None:
      numPartitions = self._defaultReducePartitions()memory = self._memory_limit()
      serializer = self._jrdd_deserializerdef sortPartition(iterator):
      sort = ExternalSorter(memory * 0.9, serializer).sorted
      return iter(sort(iterator, key=lambda kv: keyfunc(kv[0]), reverse=(not ascending)))if numPartitions == 1:
      if self.getNumPartitions() > 1:
      self = self.coalesce(1)
      return self.mapPartitions(sortPartition, True)# first compute the boundary of each part via sampling: we want to partition
      # the key-space into bins such that the bins have roughly the same
      # number of (key, value) pairs falling into them
      rddSize = self.count()
      if not rddSize:
      return self # empty RDD
      maxSampleSize = numPartitions * 20.0 # constant from Spark's RangePartitioner
      f\fraction = min(maxSampleSize / max(rddSize, 1), 1.0)
      samples = self.sample(False, f\fraction, 1).map(lambda kv: kv[0]).collect()
      samples = sorted(samples, key=keyfunc)# we have numPartitions many parts but one of the them has
      # an implicit boundary
      bounds = [samples[int(len(samples) * (i + 1) / numPartitions)]
      for i in range(0, numPartitions - 1)]def rangePartitioner(k):
      p = bisect.bisect_left(bounds, keyfunc(k))
      if ascending:
      return p
      else:
      return numPartitions - 1 - preturn self.partitionBy(numPartitions, rangePartitioner).mapPartitions(sortPartition, True)
    6. 說(shuō)明:ascending參數(shù)是指排序(升序還是降序),默認(rèn)是升序。numPartitions參數(shù)是重新分區(qū),默認(rèn)與上一個(gè)RDD保持一致。keyfunc參數(shù)是排序規(guī)則。

      sortByKey 案例
    7. sc = SparkContext("local", "Simple App")
    8. data = [("a",1),("a",2),("c",1),("b",1)]
    9. rdd = sc.parallelize(data)
    10. key = rdd.sortByKey()
    11. print(key.collect())
    12. 輸出:

    13. [('a', 1), ('a', 2), ('b', 1), ('c', 1)]

?需求:使用 sortBy 算子,將 rdd 中的數(shù)據(jù)進(jìn)行排序(升序)。

from pyspark import SparkContextif __name__ == "__main__":# ********** Begin **********## 1.初始化 SparkContext,該對(duì)象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 2.創(chuàng)建一個(gè)內(nèi)容為[('B',1),('A',2),('C',3)]的列表ListList = [('B',1),('A',2),('C',3)]# 3.通過(guò) SparkContext 并行化創(chuàng)建 rddrdd = sc.parallelize(List)# 4.使用rdd.collect() 收集 rdd 的元素print(rdd.collect())"""使用 sortByKey 算子,將 rdd 的數(shù)據(jù) ('B', 1), ('A', 2), ('C', 3) 按照下面的規(guī)則進(jìn)行轉(zhuǎn)換操作,規(guī)則如下:需求:元素排序,例如:[(3,3),(2,2),(1,1)]  -->  [(1,1),(2,2),(3,3)]"""# 5.使用 sortByKey 算子完成以上需求key = rdd.sortByKey()# 6.使用rdd.collect() 收集完成 sortByKey 轉(zhuǎn)換的元素print(key.collect())# 7.停止 SparkContextsc.stop()# ********** End **********#

mapValues 算子

mapValues

mapValues :針對(duì)(Key, Value)型數(shù)據(jù)中的 Value 進(jìn)行 Map 操作,而不對(duì) Key 進(jìn)行處理。

上圖中的方框代表 RDD 分區(qū)。 a=>a+2 代表對(duì) (V1,1) 這樣的 Key Value 數(shù)據(jù)對(duì),數(shù)據(jù)只對(duì) Value 中的 1 進(jìn)行加 2 操作,返回結(jié)果為 3。

mapValues 案例
  1. sc = SparkContext("local", "Simple App")
    data = [("a",1),("a",2),("b",1)]
    rdd = sc.parallelize(data)
    values = rdd.mapValues(lambda x: x + 2)
    print(values.collect())

輸出:

  1. [('a', 3), ('a', 4), ('b', 3)]

需求:使用mapValues算子,將rdd的數(shù)據(jù) ("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5) 按照下面的規(guī)則進(jìn)行轉(zhuǎn)換操作,規(guī)則如下:

  • 偶數(shù)轉(zhuǎn)換成該數(shù)的平方
  • 奇數(shù)轉(zhuǎn)換成該數(shù)的立方
    from pyspark import SparkContextif __name__ == "__main__":# ********** Begin **********## 1.初始化 SparkContext,該對(duì)象是 Spark 程序的入口sc = SparkContext("local", "Simple App")    # 2.創(chuàng)建一個(gè)內(nèi)容為[("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)]的列表ListList = [("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)]# 3.通過(guò) SparkContext 并行化創(chuàng)建 rddrdd = sc.parallelize(List)# 4.使用rdd.collect() 收集 rdd 的元素print(rdd.collect())"""使用 mapValues 算子,將 rdd 的數(shù)據(jù) ("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5) 按照下面的規(guī)則進(jìn)行轉(zhuǎn)換操作,規(guī)則如下:需求:元素(key,value)的value進(jìn)行以下操作:偶數(shù)轉(zhuǎn)換成該數(shù)的平方奇數(shù)轉(zhuǎn)換成該數(shù)的立方"""# 5.使用 mapValues 算子完成以上需求values = rdd.mapValues(lambda x: x + 2)# 6.使用rdd.collect() 收集完成 mapValues 轉(zhuǎn)換的元素print(values.collect())# 7.停止 SparkContextsc.stop()# ********** End **********#
    

    reduceByKey 算子

    reduceByKey

    reduceByKey 算子,只是兩個(gè)值合并成一個(gè)值,比如疊加。

    函數(shù)實(shí)現(xiàn)

    def reduceByKey(self, func, numPartitions=None, partitionFunc=portable_hash):
    return self.combineByKey(lambda x: x, func, func, numPartitions, partitionFunc)

上圖中的方框代表 RDD 分區(qū)。通過(guò)自定義函數(shù) (A,B) => (A + B) ,將相同 key 的數(shù)據(jù) (V1,2)(V1,1)value 做加法運(yùn)算,結(jié)果為( V1,3)

reduceByKey 案例
sc = SparkContext("local", "Simple App")
data = [("a",1),("a",2),("b",1)]
rdd = sc.parallelize(data)
print(rdd.reduceByKey(lambda x,y:x+y).collect())

輸出:

[('a', 3), ('b', 1)]

需求:使用 reduceByKey 算子,將 rdd(key-value類型) 中的數(shù)據(jù)進(jìn)行值累加。

例如:

  1. ("soma",4), ("soma",1), ("soma",2) -> ("soma",7)
from pyspark import SparkContextif __name__ == "__main__":# ********** Begin **********## 1.初始化 SparkContext,該對(duì)象是 Spark 程序的入口sc = SparkContext("local", "Simple App")    # 2.創(chuàng)建一個(gè)內(nèi)容為[("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)]的列表ListList = [("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)]# 3.通過(guò) SparkContext 并行化創(chuàng)建 rddrdd = sc.parallelize(List)  # 4.使用rdd.collect() 收集 rdd 的元素print(rdd.collect())"""使用 reduceByKey 算子,將 rdd 的數(shù)據(jù)[("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)] 按照下面的規(guī)則進(jìn)行轉(zhuǎn)換操作,規(guī)則如下:需求:元素(key-value)的value累加操作,例如:(1,1),(1,1),(1,2)  --> (1,4)(1,1),(1,1),(2,2),(2,2)  --> (1,2),(2,4)"""# 5.使用 reduceByKey 算子完成以上需求reduce = rdd.reduceByKey(lambda x,y:x+y)# 6.使用rdd.collect() 收集完成 reduceByKey 轉(zhuǎn)換的元素print(reduce.collect())# 7.停止 SparkContextsc.stop()# ********** End **********#

Action 的常用算子

count

count():返回 RDD 的元素個(gè)數(shù)。

示例:

  1. sc = SparkContext("local", "Simple App")
    data = ["python", "python", "python", "java", "java"]
    rdd = sc.parallelize(data)
    print(rdd.count())

輸出:

  1. 5
first

first():返回 RDD 的第一個(gè)元素(類似于take(1))。

示例:

  1. sc = SparkContext("local", "Simple App")
    data = ["python", "python", "python", "java", "java"]
    rdd = sc.parallelize(data)
    print(rdd.first())

輸出:

  1. python
take

take(n):返回一個(gè)由數(shù)據(jù)集的前 n 個(gè)元素組成的數(shù)組。

示例:

  1. sc = SparkContext("local", "Simple App")
    data = ["python", "python", "python", "java", "java"]
    rdd = sc.parallelize(data)
    print(rdd.take(2))

輸出:

  1. ['python', 'python']
reduce

reduce():通過(guò)func函數(shù)聚集 RDD 中的所有元素,該函數(shù)應(yīng)該是可交換的和關(guān)聯(lián)的,以便可以并行正確計(jì)算。

示例:

  1. sc = SparkContext("local", "Simple App")
    data = [1,1,1,1]
    rdd = sc.parallelize(data)
    print(rdd.reduce(lambda x,y:x+y))

輸出:

  1. 4
collect

collect():在驅(qū)動(dòng)程序中,以數(shù)組的形式返回?cái)?shù)據(jù)集的所有元素。

示例:

  1. sc = SparkContext("local", "Simple App")
    data = [1,1,1,1]
    rdd = sc.parallelize(data)
    print(rdd.collect())

輸出:

  1. [1,1,1,1]
具體任務(wù)如下:

需求1:使用 count 算子,統(tǒng)計(jì)下 rdd 中元素的個(gè)數(shù);

需求2:使用 first 算子,獲取 rdd 首個(gè)元素;

需求3:使用 take 算子,獲取 rdd 前三個(gè)元素;

需求4:使用 reduce 算子,進(jìn)行累加操作;

需求5:使用 collect 算子,收集所有元素。
?

from pyspark import SparkContext
if __name__ == "__main__":# ********** Begin **********## 1.初始化 SparkContext,該對(duì)象是 Spark 程序的入口sc = SparkContext("local", "Simple App")# 2.創(chuàng)建一個(gè)內(nèi)容為[1, 3, 5, 7, 9, 8, 6, 4, 2]的列表ListList = [1, 3, 5, 7, 9, 8, 6, 4, 2]  # 3.通過(guò) SparkContext 并行化創(chuàng)建 rddrdd = sc.parallelize(List)# 4.收集rdd的所有元素并print輸出print(rdd.collect())# 5.統(tǒng)計(jì)rdd的元素個(gè)數(shù)并print輸出print(rdd.count())# 6.獲取rdd的第一個(gè)元素并print輸出print(rdd.first())# 7.獲取rdd的前3個(gè)元素并print輸出print(rdd.take(3))# 8.聚合rdd的所有元素并print輸出print(rdd.reduce(lambda x,y:x+y))# 9.停止 SparkContextsc.stop()# ********** End **********#

http://www.risenshineclean.com/news/32122.html

相關(guān)文章:

  • 做網(wǎng)站如何將一張圖片直接變體馮耀宗seo視頻教程
  • 邢臺(tái)網(wǎng)站建設(shè)的公司湖南網(wǎng)絡(luò)推廣排名
  • apt-get install wordpress深圳外包seo
  • 吉安網(wǎng)站建設(shè)0796abc百度小說(shuō)搜索風(fēng)云榜總榜
  • 室內(nèi)設(shè)計(jì)網(wǎng)站知乎南京響應(yīng)式網(wǎng)站建設(shè)
  • 網(wǎng)站專業(yè)術(shù)語(yǔ)中seo意思是網(wǎng)站制作基本流程
  • 做mv主題網(wǎng)站媒體發(fā)稿費(fèi)用
  • 嘉興做網(wǎng)站的銷售培訓(xùn)課程一般有哪些
  • 網(wǎng)站流量的重要性seo推廣軟件代理
  • web網(wǎng)站開(kāi)發(fā)報(bào)告深圳seo優(yōu)化
  • 豐臺(tái)網(wǎng)站建設(shè)小程序開(kāi)發(fā)哪家更靠譜
  • 天津網(wǎng)站制作計(jì)劃電商項(xiàng)目策劃書
  • 湖南省人民政府駐深圳辦事處江門網(wǎng)站優(yōu)化公司
  • 寶安做棋牌網(wǎng)站建設(shè)找哪家公司好上海網(wǎng)站優(yōu)化
  • 電商網(wǎng)站開(kāi)發(fā)價(jià)格優(yōu)化網(wǎng)站seo策略
  • 在哪個(gè)網(wǎng)站上做外貿(mào)好深圳網(wǎng)站優(yōu)化網(wǎng)站
  • 哪些網(wǎng)站是php企業(yè)qq和個(gè)人qq有什么區(qū)別
  • 做視頻網(wǎng)站收費(fèi)侵權(quán)嗎全媒體廣告代理加盟
  • 農(nóng)村網(wǎng)站做移動(dòng)濟(jì)南做網(wǎng)站比較好的公司
  • 網(wǎng)站備案信息怎么做百度熱搜高考大數(shù)據(jù)
  • pc網(wǎng)站怎么適配移動(dòng)端網(wǎng)頁(yè)設(shè)計(jì)效果圖及代碼
  • 保險(xiǎn)做的好的網(wǎng)站第三方推廣平臺(tái)
  • 鞋圖相冊(cè)網(wǎng)站怎么做app拉新平臺(tái)哪個(gè)好傭金高
  • 專業(yè)做網(wǎng)站建設(shè)建站公司網(wǎng)站怎么做
  • 建設(shè)項(xiàng)目立項(xiàng)網(wǎng)站搜索引擎優(yōu)化網(wǎng)站
  • 如何做二維碼跳轉(zhuǎn)到網(wǎng)站軟件開(kāi)發(fā)
  • 杭州余杭做網(wǎng)站公司免費(fèi)推廣網(wǎng)站地址大全
  • 傳統(tǒng)網(wǎng)站有沒(méi)有建設(shè)必要建網(wǎng)站賺錢
  • 承德網(wǎng)站建設(shè)方案在線排名優(yōu)化工具
  • 個(gè)人網(wǎng)站 數(shù)據(jù)庫(kù)如何上傳到空間視頻號(hào)推廣