東莞網(wǎng)站建設(shè)招聘內(nèi)蒙古最新消息
目錄
一、Filter方法
功能
語法
代碼
總結(jié)
filter算子
二、distinct方法
功能
語法
代碼
總結(jié)
distinct算子
三、SortBy方法
功能
語法
代碼?
總結(jié)
sortBy算子
四、數(shù)據(jù)計(jì)算練習(xí)
需求:
解答
總結(jié)
去重函數(shù):
過濾函數(shù):
轉(zhuǎn)換函數(shù):
排序函數(shù):
于是我駐足,享受無法復(fù)刻的一些瞬間
????????????????????????????????????????????????????????—— 24.11.9
一、Filter方法
功能
過濾想要的數(shù)據(jù)進(jìn)行保留
語法
基于filter中我們傳入的函數(shù),決定rdd對象中哪個(gè)保留哪個(gè)丟棄
代碼
from pyspark import SparkConf,SparkContext# 設(shè)置spark中的python解釋器對象
import os
os.environ['PYSPARK_PYTHON'] = "E:/python.learning/pyt/scripts/python.exe"conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)# 準(zhǔn)備一個(gè)RDD
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
# 對RDD的數(shù)據(jù)進(jìn)行過濾,保留奇數(shù),去除偶數(shù)# 方法1:
def Retain(data):if data % 2 == 1:return Trueelse:return False# 對RDD數(shù)據(jù)進(jìn)行過濾,留下奇數(shù)
rdd1 = rdd.filter(Retain)
print(rdd1.collect())# 方法2:
rdd2 = rdd.filter(lambda num:num % 2 == 1)
print(rdd2.collect())
總結(jié)
filter算子
接受一個(gè)處理函數(shù),可用lambda匿名函數(shù)快速編寫
函數(shù)對RDD數(shù)據(jù)逐個(gè)處理,得到True的保留到返回值的RDD中
二、distinct方法
功能
對RDD數(shù)據(jù)進(jìn)行去重,返回新RDD
語法
rdd.distinct() # 無需傳參
代碼
from pyspark import SparkConf,SparkContext# 設(shè)置spark中的python解釋器對象
import os
os.environ['PYSPARK_PYTHON'] = "E:/python.learning/pyt/scripts/python.exe"conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)# 準(zhǔn)備一個(gè)RDD
rdd = sc.parallelize([1,3,3,4,4,4,7,8,9,9])
rdd = rdd.distinct()
print(rdd.collect())
總結(jié)
distinct算子
完成對Rdd內(nèi)數(shù)據(jù)的去重操作
三、SortBy方法
功能
對RDD數(shù)據(jù)進(jìn)行排序,基于指定的排序依據(jù)
語法
rdd.sortBy()
rdd.sortBy(func, ascending = False, numPartitions = 1)
# func:(T) - > U: 告知按照rdd中的哪個(gè)數(shù)據(jù)進(jìn)行排序,比如 lambda x:x[1] 表示按照rdd中的第二列元素進(jìn)行排序
# ascending: True升序 False 降序
# numPartitions: 用多少分區(qū)排序
代碼?
from pyspark import SparkConf,SparkContext# 設(shè)置spark中的python解釋器對象
import os
os.environ['PYSPARK_PYTHON'] = "E:/python.learning/pyt/scripts/python.exe"conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)# 讀取數(shù)據(jù)文件
rdd = sc.textFile("D:/2LFE\Desktop\WordCount.txt")
# 取出全部單詞
word_rdd = rdd.flatMap(lambda x:x.split(" "))
print(word_rdd.collect())# 將所有單詞都轉(zhuǎn)換成二元元組,單詞為key,value設(shè)置為1
word_with_one_rdd = word_rdd.map(lambda word:(word,1))
# 分組并求和
result_rdd = word_with_one_rdd.reduceByKey(lambda a,b:a+b)
# 對結(jié)果進(jìn)行排序
result_rdd = result_rdd.sortBy(lambda x:x[1],ascending = False,numPartitions = 1)
# 打印并輸出結(jié)果
print(result_rdd.collect())
總結(jié)
sortBy算子
接收一個(gè)處理函數(shù),可用lambda快速編寫
函數(shù)表示用來決定排序的依據(jù)
可以控制升序或降序
全局排序需要設(shè)置分區(qū)數(shù)為1
四、數(shù)據(jù)計(jì)算練習(xí)
需求:
復(fù)制以上內(nèi)容到文件中,使用Spark讀取文件進(jìn)行計(jì)算:
① 各個(gè)城市銷售額排名,從大到小
② 全部城市,有哪些商品類別在售賣
③ 北京市有哪些商品類別在售賣
解答
from pyspark import SparkConf,SparkContext
import json# 設(shè)置spark中的python解釋器對象
import os
os.environ['PYSPARK_PYTHON'] = "E:/python.learning/pyt/scripts/python.exe"conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)# 讀取文件得到RDD
file_rdd = sc.textFile("E:\python.learning\pyspark\sortBy.txt")# 取出一個(gè)個(gè)JSON字符串
json_str_rdd = file_rdd.flatMap(lambda x:x.split("|"))# 將一個(gè)JSON字符串轉(zhuǎn)換為字典 json模塊
dict_rdd = json_str_rdd.map(lambda x:json.loads(x))# 取出城市和銷售額數(shù)據(jù):(城市,銷售額)
city_with_money_rdd = dict_rdd.map(lambda x:(x['areaName'],int(x['money'])))# 按銷售額對結(jié)果進(jìn)行聚合然后根據(jù)銷售額降序排序
city_result_rdd = city_with_money_rdd.reduceByKey(lambda x,y:x+y)
res1 = city_result_rdd.sortBy(lambda x:x[1],ascending = False,numPartitions = 1)
print("需求1結(jié)果:" , res1.collect())# 需求2 對全部商品進(jìn)行去重
category_rdd = dict_rdd.map(lambda x: x['category']).distinct()
print("需求2結(jié)果:",category_rdd.collect())# 需求3 過濾北京市的數(shù)據(jù)
BJ_data_rdd = dict_rdd.filter(lambda x:x['areaName'] == '北京')
print("需求3結(jié)果:",BJ_data_rdd.collect())# 需求4 對北京市的商品類別進(jìn)行商品類別去重
res2 = BJ_data_rdd.map(lambda x:x['category']).distinct()
print("需求4結(jié)果:",res2.collect())
總結(jié)
去重函數(shù):
在 PySpark 框架下,distinct
函數(shù)用于返回一個(gè)新的 RDD,其中包含原始 RDD 中的不同元素。
過濾函數(shù):
filter
函數(shù)用于從彈性分布式數(shù)據(jù)集(RDD)中篩選出滿足特定條件的元素,返回一個(gè)新的 RDD 只包含滿足條件的元素。
轉(zhuǎn)換函數(shù):
在 PySpark 中,map
函數(shù)是對彈性分布式數(shù)據(jù)集(RDD)進(jìn)行轉(zhuǎn)換操作的一種重要方法。map
函數(shù)對 RDD 中的每個(gè)元素應(yīng)用一個(gè)函數(shù),返回一個(gè)新的 RDD,其中包含應(yīng)用函數(shù)后的結(jié)果。
排序函數(shù):
sortBy
?函數(shù)用于對RDD 中的元素進(jìn)行排序,它接受一個(gè)函數(shù)或者一個(gè)字段名作為參數(shù),根據(jù)這個(gè)參數(shù)來確定排序的依據(jù)。