網站優(yōu)化工作怎么樣seo和點擊付費的區(qū)別
目錄
Map算子使用
FlatMap算子使用
Filter算子使用-數(shù)據過濾
Distinct算子使用-數(shù)據去重
groupBy算子使用-數(shù)據分組
sortBy算子使用-數(shù)據排序
Map算子使用
# map算子主要使用長場景,一個轉化rdd中每個元素的數(shù)據類型,拼接rdd中的元素數(shù)據,對rdd中的元素進行需求處理
# 需求,處理hdfs中的學生數(shù)據,單獨獲取每個學生的信息
from pyspark import SparkContextsc = SparkContext()# 1-讀取數(shù)據
rdd = sc.textFile("hdfs://node1:8020/data/student.txt")
# 2- 使用轉化算子進行數(shù)據處理
# map中的lambda表達式,必須定義一個參數(shù),用來接收rdd中的元素數(shù)據, 注意:x參數(shù)如何處理,要看x接收的數(shù)據類型
rdd2 = rdd.map(lambda x : x.split(','))
# 3-從rdd2中獲取姓名數(shù)據
rdd3 =rdd2.map(lambda x : x[1])# lambda 函數(shù)能進行簡單的數(shù)據計算,如果遇到復雜數(shù)據計算時,就需要使用自定義函數(shù)
# 獲取年齡數(shù)據,并且轉化年齡數(shù)據為int類型,將年齡和性別合并一起保存成元組
## 獲取年齡
def func(x):# 1-切割數(shù)據data_split = x.split(',')# 2-轉換數(shù)據類型age = int(data_split[3])# 3-拼接性別與年齡data_tuple = (data_split[2],age)return data_tuple# 將函數(shù)的名字傳遞到map中,不要加括號
rdd4 = rdd.map(func)# 觸發(fā)執(zhí)行算子,查看讀取的數(shù)據
res = rdd.collect()
print(res)res2 = rdd2.collect()
print(res2)res3 = rdd3.collect()
print(res3)res4 = rdd4.collect()
print(res4)
FlatMap算子使用
# FlatMap算子使用
# 主要場景是對二維嵌套的數(shù)據降維操作 [[1,張三],[2,李四],[3,王五]] --->> [1,張三,2,李四,3,王五]
from pyspark import SparkContextsc = SparkContext()# 生成的rdd
rdd = sc.parallelize([['1', 'alice', 'F', '32'], ['2', 'Tom', 'M', '22'], ['3', 'lili', 'F', '18'], ['4', 'jerry', 'M', '24']])# 使用flatmap
rdd1 = rdd.flatMap(lambda x: x) # 直接返回x,會自動將x中的元素數(shù)據取出,放入新的rdd中# 查看數(shù)據
res = rdd1.collect()
print(res)
Filter算子使用-數(shù)據過濾
# RDD數(shù)據過濾
# 需求:過濾年齡大于20歲的信息
from pyspark import SparkContext
sc = SparkContext()# 1- 讀取hdfs中的學生數(shù)據
rdd = sc.textFile('hdfs://node1:8020/data/student.txt')# 2- 使用轉化算子進行數(shù)據處理
# map中的lambda表達式,必須定義一個參數(shù),用來接收rdd中的元素數(shù)據, 注意:x參數(shù)如何處理,要看x接收的數(shù)據類型
rdd2 = rdd.map(lambda x:x.split(','))
# 使用fliter方法進行數(shù)據過濾
# lambda x:過濾條件 可以當成 if 操作 if 條件
# 符合條件的數(shù)據會返回保存在新的rdd中
rdd3 = rdd2.filter(lambda x :int(x[3]) > 20)# 查看數(shù)據
res = rdd2.collect()
print(res)res3 = rdd3.collect()
print(res3)
Distinct算子使用-數(shù)據去重
# distinct 去重算子
# rdd中有重復數(shù)據時,可以進行去重
from pyspark import SparkContext
sc = SparkContext()# 1- 讀取hdfs中的學生數(shù)據
rdd = sc.textFile('hdfs://node1:8020/data/student.txt')# 2- 使用轉化算子進行數(shù)據處理
# map中的lambda表達式,必須定義一個參數(shù),用來接收rdd中的元素數(shù)據, 注意:x參數(shù)如何處理,要看x接收的數(shù)據類型
rdd2 = rdd.map(lambda x:x.split(','))# 3-從rdd2中獲取性別數(shù)據
rdd3 = rdd2.map(lambda x : x[2])# 對rdd3中重復數(shù)據去重
rdd4 = rdd3.distinct()# 查看數(shù)據
res = rdd3.collect()
print(res)res1 = rdd4.collect()
print(res1)
groupBy算子使用-數(shù)據分組
from pyspark import SparkContext
sc = SparkContext()# 1- 讀取hdfs中的學生數(shù)據
rdd = sc.textFile('hdfs://node1:8020/data/student.txt')# 2- 使用轉化算子進行數(shù)據處理
# map中的lambda表達式,必須定義一個參數(shù),用來接收rdd中的元素數(shù)據, 注意:x參數(shù)如何處理,要看x接收的數(shù)據類型
rdd2 = rdd.map(lambda x:x.split(','))# 3-對性別進行分組
# lambda x: hash取余的計算 hash(數(shù)據)%分組數(shù) 余數(shù)相同的數(shù)據會放在一起
rdd3 = rdd.groupBy(lambda x:hash(x[2]) % 2)
# 查看分組的數(shù)據內容 mapValues 取出分組后的數(shù)據值,對數(shù)據值轉為列表即可
rdd4 = rdd3.mapValues(lambda x:list(x))# 查看數(shù)據
res2 = rdd2.collect()
print(res2)res3 = rdd3.collect()
print(res3)res4 = rdd4.collect()
print(res4)
分組算子用到了哈希算法,lambda x: hash取余的計算 ?hash(數(shù)據)%分組數(shù) ? ? ?余數(shù)相同的數(shù)據會放在一起
rdd3 = rdd.groupBy(lambda x:hash(x[2]) % 2)
sortBy算子使用-數(shù)據排序
# RDD的數(shù)據排序
from pyspark import SparkContextsc = SparkContext()# 創(chuàng)建數(shù)據
# 非kv數(shù)據
rdd = sc.parallelize([10,45,27,18,5,29])# 在spark中可以使用元組表示kv數(shù)據(k,v)
rdd2 = sc.parallelize([('張三',27),('李四',18),('王五',31),('趙六',21)])rdd1 = sc.parallelize([(666,'火眼金睛'),(2000,'筋斗云'),(888,'順風耳'),(1314,'降龍十八掌')])# 數(shù)據排序
# 非kv數(shù)據
rdd3 = rdd.sortBy(lambda x: x) # 默認升序,從小到大排
rdd4 = rdd.sortBy(lambda x: x,ascending=False) # 降序# kv數(shù)據排序 x接收(k,v)數(shù)據 需要指定采用哪個值進行排序
# 根據v值進行排序
rdd5 = rdd2.sortBy(lambda x: x[1])
rdd6 = rdd2.sortBy(lambda x: x[1],ascending=False)# 根據k值進行排序
rdd7 = rdd1.sortBy(lambda x: x[0])
rdd8 = rdd1.sortBy(lambda x: x[0],ascending=False)# 查看結果
# 非kv數(shù)據
res1 = rdd3.collect()
res2 = rdd4.collect()
print(res1)
print(res2)# kv數(shù)據排序
res5 = rdd5.collect()
res6 = rdd6.collect()
print(res5)
print(res6)res7 = rdd7.collect()
res8 = rdd8.collect()
print(res7)
print(res8)
join算子使用-數(shù)據關聯(lián)
準備數(shù)據,模擬表關聯(lián)
students.txt
students2.txt
from pyspark import SparkContext
# rdd也是使用join算子進行kv數(shù)據關聯(lián) ,如果需要將多個rdd數(shù)據關聯(lián)在一起
# 需要現(xiàn)將rdd的數(shù)據轉為kv結構,關聯(lián)的字段數(shù)據作為key
sc = SparkContext()
# 分別讀取兩個文件數(shù)據
rdd1 = sc.textFile('hdfs://node1:8020/data/students.txt')
rdd2 = sc.textFile('hdfs://node1:8020/data/students2.txt')# 切割行數(shù)
rdd_line1 = rdd1.map(lambda x:x.split(','))
rdd_line2 = rdd2.map(lambda x:x.split(','))# 將rdd數(shù)據進行關聯(lián)
# 將關聯(lián)的數(shù)據轉為kv結構
rdd_kv1 = rdd_line1.map(lambda x:(x[0],x))
rdd_kv2 = rdd_line2.map(lambda x:(x[0],x))# 使用join關聯(lián)
rdd_join = rdd_kv1.join(rdd_kv2) # 內關聯(lián)
rdd_leftjoin = rdd_kv1.leftOuterJoin(rdd_kv2) # 左關聯(lián)
rdd_rightjoin = rdd_kv1.rightOuterJoin(rdd_kv2) # 右關聯(lián)# 查看數(shù)據res3 = rdd_join.sortBy(lambda x:x[0]).collect() # 找相同數(shù)據
print(res3)res4 = rdd_leftjoin.collect() # 左表數(shù)據全部展示,右邊右相同數(shù)據展示,沒有相同數(shù)據為空None
print(res4)res5 = rdd_rightjoin.collect() # 右表數(shù)據全部展示,左邊右相同數(shù)據展示,沒有相同數(shù)據為空None
print(res5)
?join內關聯(lián):只有共同的才展示
leftOuterJoin左關聯(lián):左表數(shù)據全部展示,右邊右相同數(shù)據展示,沒有相同數(shù)據為空None
rightOuterJoin右關聯(lián):右表數(shù)據全部展示,左邊右相同數(shù)據展示,沒有相同數(shù)據為空None