中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁 > news >正文

地推拉新接單網(wǎng)seo網(wǎng)站建站

地推拉新接單網(wǎng),seo網(wǎng)站建站,洛陽免費網(wǎng)站建設(shè),做網(wǎng)站入門Spark計算框架 一、Spark概述二、Spark的安裝部署(安裝部署Spark的Cluster Manager-資源調(diào)度管理器的)1、Spark的安裝模式1.1、Spark(單節(jié)點)本地安裝1.2 Spark的Standalone部署模式的偽分布式安裝1.3Spark的YARN部署模式1.4Spark…

Spark計算框架

  • 一、Spark概述
  • 二、Spark的安裝部署(安裝部署Spark的Cluster Manager-資源調(diào)度管理器的)
    • 1、Spark的安裝模式
      • 1.1、Spark(單節(jié)點)本地安裝
      • 1.2 Spark的Standalone部署模式的偽分布式安裝
      • 1.3Spark的YARN部署模式
      • 1.4Spark的Standalone部署模式的完全分布式安裝
      • 1.5Spark的HA配置
    • 2、Spark運行程序的歷史日志服務(wù)器
    • 3、Spark安裝部署涉及到的端口
  • 三、Spark運行中集群角色
  • 四、Spark程序的部署運行的方式 —— Spark執(zhí)行jar包
  • 五、Spark的編程方式
  • 六、Spark的核心基礎(chǔ)Spark Core
    • 1、Spark Core中最核心的有兩個概念
    • 2、RDD的屬性(RDD具備的一些特征)
    • 3、RDD的彈性的體現(xiàn)
    • 4、RDD的特點
    • 5、RDD的分類
    • 6、RDD的編程
      • 6.1、RDD的創(chuàng)建操作
      • 6.2、RDD的轉(zhuǎn)換操作(轉(zhuǎn)換算子)
      • 6.3、RDD的行動算子
      • 6.4、RDD的一些比較特殊的行動算子(只針對整數(shù)類型的RDD有效)
    • 7、RDD的持久化(緩存)
    • 8、RDD的檢查點機制
    • 9、RDD算子的依賴關(guān)系
    • 10、RDD的兩個特殊的使用
    • 11、RDD的分區(qū)機制
  • 七、【補充】Scala的比較器問題

一、Spark概述

Spark的誕生背景

  • Spark 2009年誕生的一個技術(shù),誕生的主要原因是因為Hadoop大數(shù)據(jù)解決方案存在一些弊端
    • MR程序是基于磁盤進(jìn)行運算,因此導(dǎo)致MR程序計算效率底下。
    • MR程序無法計算復(fù)雜的任務(wù),如果想要實現(xiàn)復(fù)雜的計算邏輯,可能編寫多個MR Job,其中后續(xù)的Job依賴于前一個Job的輸出,但是多個Job無法知道前一個job,需要通過任務(wù)調(diào)度框架自己指定多job的依賴關(guān)系。

Spark概念

  • Spark是一個計算框架,內(nèi)部包含了很多的子組件,子組件解決了各種各樣的大數(shù)據(jù)計算問題,子組件都是計算框架。Spark本身也是一個分布式計算程序,代碼的運行也得需要分布式資源調(diào)度。

  • Spark主要解決了Hadoop的MR存在的問題,Spark是基于內(nèi)存運算的一種迭代式計算框架。

  • Spark相當(dāng)于是Hadoop的升級版本的解決方案,基于內(nèi)存進(jìn)行運算,并且Spark內(nèi)部實現(xiàn)迭代式計算思想,可以在一個應(yīng)用程序編寫復(fù)雜的計算邏輯。

  • Spark之所以可以實現(xiàn)基于內(nèi)存的迭代式計算,主要也是因為Spark Core中的一個核心數(shù)據(jù)抽象RDD。

  • Spark有一個思想(one stack to rule them all) - 一棧式解決方案,一個技術(shù)實現(xiàn)大數(shù)據(jù)中各種計算場景的應(yīng)用問題。Spark中包含很多的計算子組件。

    • Spark Core:Spark的核心基礎(chǔ),Spark的任務(wù)調(diào)度規(guī)則,Spark的基礎(chǔ)語法,數(shù)據(jù)抽象RDD。
    • Spark SQL(結(jié)構(gòu)化數(shù)據(jù)查詢):借助SQL或者Hive版本的HQL進(jìn)行結(jié)構(gòu)化數(shù)據(jù)的處理。
    • Spark Streaming(準(zhǔn)實時計算):內(nèi)部采用了微批次處理思想,實現(xiàn)數(shù)據(jù)的實時計算。

    數(shù)據(jù)處理和開發(fā)


    • Spark MLlib(算法)

    • Spark GraphX(圖計算)

    • Spark R

    數(shù)據(jù)科學(xué)或者算法計算


  • 官網(wǎng)地址

image-20230918152114750

Spark的特點

  • 計算快速:Spark相當(dāng)于Hadoop的升級版的大數(shù)據(jù)計算解決方案
  • 易用性:Spark提供了多種語法的編程風(fēng)格
  • 兼容性:Spark計算框架和大數(shù)據(jù)中很多技術(shù)無縫銜接,比如Spark支持直接從HDFS、Kafka、HBase、Hive、MySQL…等等地方直接讀取數(shù)據(jù)處理
  • 通用性:Spark一個技術(shù)??梢越鉀Q大數(shù)據(jù)中遇到的大部分計算場景問題,而且Spark各個子組件都是基于Spark Core的,因此Spark的各個子組件可以無縫的銜接轉(zhuǎn)換

二、Spark的安裝部署(安裝部署Spark的Cluster Manager-資源調(diào)度管理器的)

【注意】Spark的安裝部署,Spark本身就是一個分布式計算框架,如果使用Spark,我們需要使用對應(yīng)的編程語言編寫Spark代碼,編寫Spark程序不需要部署Spark程序,因此Spark的安裝部署主要指的是編寫好的Spark程序在什么環(huán)境下運行(編寫好的Spark程序使用哪種資源調(diào)度器進(jìn)行資源的申請和調(diào)度)。
Spark的安裝部署就是安裝部署Spark運行的資源調(diào)度器的。

  • Spark的資源調(diào)度器常用的有三個:Spark自帶的standalone獨立調(diào)度器、Hadoop的YARN、Apache的Mesos。

1、Spark的安裝模式

Spark的安裝部署就是安裝Spark的不同的資源調(diào)度器。

前提:服務(wù)器上先安裝部署JDK

**本地安裝模式(不使用任何的資源調(diào)度器,只在本地運行Spark程序):**解壓配置環(huán)境,Spark程序的運行只能由本地的CPU進(jìn)行資源調(diào)度,這種部署模式只能做測試學(xué)習(xí)使用。

**Standalone獨立調(diào)度器部署模式:**使用Spark自帶的獨立資源調(diào)度器進(jìn)行資源調(diào)度。部署Master和Worker節(jié)點(主從架構(gòu)):這種部署模式既可以測試學(xué)習(xí)、也可以做項目開發(fā)部署。

  • 偽分布式:將Standalone的master、worker安裝到一臺節(jié)點上,同時worker只有一個
  • 完全分布式
  • HA高可用模式
四個核心配置文件:
spark-env.sh
spark-default.conf
workers
sbin/spark-config.sh兩個腳本文件的名字

**Hadoop的YARN部署模式:**使用YARN當(dāng)作Spark程序的資源調(diào)度器,部署Spark程序在YARN上運行,這種模式一般項目生產(chǎn)環(huán)境用的比較多。

**Apache的Mesos部署模式Mesos部署模式:**使用Mesos當(dāng)作Spark程序的資源調(diào)度器,部署Spark程序在Mesos上運行,這種模式一般項目生產(chǎn)環(huán)境用的比較多。

K8S部署模式

1.1、Spark(單節(jié)點)本地安裝

Step1:從官網(wǎng)下載壓縮包,Download --> 往下滑找到Archived releases中的release archives --> 我下載的是3.1.1版本的。

image-20230918164911181

Step2:下載好安裝包后,將其上傳到虛擬機上面,并進(jìn)行解壓。

image-20230918165351721

Step3:軟件重命名

image-20230918165534152

Step4:重命名啟動腳本,避免與Hadoop啟動腳本起沖突。

image-20230918170015530

Step5:配置環(huán)境變量

image-20230918170443776

image-20230918170255529

Step6:測試本地模式是否可以使用,顯示如圖即為安裝成功。

spark-submit --class org.apache.spark.examples.SparkPi --master local /opt/app/spark-3.1.1/examples/jars/spark-examples_2.12-3.1.1.jar 100

image-20230918191434988

1.2 Spark的Standalone部署模式的偽分布式安裝

Step1:重命名配置文件

image-20230918191925500

Step2:配置workers

image-20230918192001873

Step3:修改spark-env.sh文件添加如下配置

SPARK_MASTER_HOST=single
SPARK_MASTER_PORT=7077
SPARK_MASTER_WEBUI_PORT=8080

image-20230918193504822

Step4:啟動Spark程序start-spark-all.sh

image-20230918193752310

【注意】如果遇到 “JAVA_HOME not set” 異常,可以在sbin目錄下的spark-config.sh 文件中加入如下配置:

export JAVA_HOME=XXXX

image-20230918194534630

再次啟動Spark,即可啟動成功!

image-20230918194601538

image-20230918194701520

執(zhí)行spark-submit --class org.apache.spark.examples.SparkPi --master spark://single:7077 /opt/app/spark-3.1.1/examples/jars/spark-examples_2.12-3.1.1.jar 100

image-20230918194855926

image-20230918194943376

image-20230918194958860

1.3Spark的YARN部署模式

Step1:修改Hadoop配置下的yarn-site.xml

<property><name>yarn.nodemanager.pmem-check-enabled</name><value>false</value>
</property>
<property><name>yarn.nodemanager.vmem-check-enabled</name><value>false</value>
</property>

image-20230918203823577

Step2:修改spark-env.sh添加

HADOOP_CONF_DIR=/opt/app/hadoop-3.1.4/etc/hadoop
YARN_CONF_DIR=/opt/app/hadoop-3.1.4/etc/hadoop

image-20230918204113462

Step3:啟動yarn

image-20230918204251852

Step4:運行Spark程序spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode client /opt/app/spark-3.1.1/examples/jars/spark-examples_2.12-3.1.1.jar 100

image-20230918204535489

image-20230918204549353

image-20230918204605262

1.4Spark的Standalone部署模式的完全分布式安裝

Step1:上傳解壓修改配置文件名字(/conf路徑)和之前步驟一樣

Step2:vim workers

image-20230918213437481

Step3:vim spark-env.sh

image-20230918224936346

Step4:vim spark-defaults.conf

image-20230918214038042

Step5:vim spark-config.sh (/sbin目錄)

image-20230918214236016

Step6:分發(fā)文件

scp -r /opt/app/spark-3.1.1/ root@node2:/opt/app/

scp -r /opt/app/spark-3.1.1/ root@node3:/opt/app/

Step7:統(tǒng)一配置環(huán)境變量

image-20230918214656541

Step8:創(chuàng)建日志文件夾hdfs dfs -mkdir /spark-job-history于node1節(jié)點上

Step9:啟動歷史日志服務(wù)器start-history-server.sh

image-20230918225012392

Step10:修改腳本文件的名字

image-20230918225326473

Step11:啟動Sparkstart-spark-all.sh

image-20230918225701610

Step12:將spark-env.sh的端口號進(jìn)行修改(偽分布式環(huán)境下不需要修改,完全分布式環(huán)境下需要修改)

image-20230918230332249

Step13:image-20230918230445121

Step14:執(zhí)行測試Spark運行程序spark-submit --class org.apache.spark.examples.SparkPi --master spark://node1:7077 /opt/app/spark-3.1.1/examples/jars/spark-examples_2.12-3.1.1.jar 100

image-20230918230930045

image-20230918230947658

image-20230918231009862

1.5Spark的HA配置

Step1:修改修改配置文件spark-env.shexport SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=node1:2181,node2:2181,node3:2181 -Dspark.deploy.zookeeper.dir=/spark"

image-20230918231542340

Step2:發(fā)送文件給另外兩臺節(jié)點

image-20230918231700268

Step3:啟動Sparkstart-spark-all.sh,然后在第二臺節(jié)點上單獨啟動master

image-20230918232010775

image-20230918232120638

image-20230918232133108

Step4:stop-master.sh(node1)

image-20230918232337911

image-20230918232350656

即配置成功!

2、Spark運行程序的歷史日志服務(wù)器

【注意】Spark的歷史日志服務(wù)器:匯聚Spark的應(yīng)用程序的計算日志,借助于HDFS完成操作

spark-default.confspark.eventLog.enabled   true
# HDFS上的路徑必須提前存在
spark.eventLog.dir       hdfs://single:9000/spark-job-history
spark.eventLog.compress  true
--------------------------------------------
spark-env.shexport SPARK_HISTORY_OPTS="-Dspark.history.ui.port=4000 -Dspark.history.retainedApplications=3 -Dspark.history.fs.logDirectory=hdfs://single:9000/spark-job-history"

image-20230918200551904

image-20230918202643816

先開啟hdfs,并在hdfs上創(chuàng)建spark-job-history的目錄

image-20230918200944740

啟動歷史服務(wù)器start-history-server.sh

image-20230918202719386

image-20230918202747798

然后運行Spark程序,就可以在歷史日志服務(wù)器看到運行歷史

image-20230918203244019

Spark的歷史服務(wù)器可以匯總不同運行模式下的spark程序,不僅僅只是在standalone模式下的spark程序。

3、Spark安裝部署涉及到的端口

  • 7077 spark的standalone模式下的master節(jié)點的通信端口
  • 4000 Spark的歷史日志服務(wù)器的默認(rèn)端口
  • 8080/自定義端口 Spark的standalone模式下Master節(jié)點的webui端口
  • 8088:YARN的web訪問端口

三、Spark運行中集群角色

  • Driver驅(qū)動程序:就是我們自己編寫的代碼程序,代碼程序包含著程序運行的DAG有向無環(huán)圖。驅(qū)動程序啟動之后,會給我們提供一個web界面,用來展示當(dāng)前Spark程序的運行日志,web界面當(dāng)Driver運行完成,自動銷毀。

  • Cluster Manager:資源管理器

  • Executor:執(zhí)行器

  • 從物理部署層面上來看,Spark主要分為兩種類型的節(jié)點,Master節(jié)點和Worker節(jié)點,Master節(jié)點主要運行集群管理器的中心化部分,所承載的作用是分配Application到Worker節(jié)點,維護(hù)Worker節(jié)點,Driver,Application的狀態(tài)。Worker節(jié)點負(fù)責(zé)具體的業(yè)務(wù)運行。

    從Spark程序運行的層面來看,Spark主要分為驅(qū)動器節(jié)點和執(zhí)行器節(jié)點。

    Spark集群角色

    • Driver驅(qū)動程序

      • Driver是一個JVM Process 進(jìn)程,編寫的Spark應(yīng)用程序就運行在Driver上,由Driver進(jìn)程執(zhí)行。

      • Driver首先會向集群管理者(standalone、yarn,mesos)申請spark應(yīng)用所需的資源,也就是executor,然后集群管理者會根據(jù)spark應(yīng)用所設(shè)置的參數(shù)在各個worker上分配一定數(shù)量的executor,每個executor都占用一定數(shù)量的cpu和memory。在申請到應(yīng)用所需的資源以后,driver就開始調(diào)度和執(zhí)行我們編寫的應(yīng)用代碼了。

      • Driver進(jìn)程會將我們編寫的spark應(yīng)用代碼拆分成多個stage,每個stage執(zhí)行一部分代碼片段,并為每個stage創(chuàng)建一批tasks,然后將這些tasks分配到各個executor中執(zhí)行。

    • 集群管理器Cluster Manager

      • Spark的集群管理器主要包括Spark Standalone、Yarn、Mesos。

      • Master(ResourceManager):是一個JVM Process 進(jìn)程,主要負(fù)責(zé)資源的調(diào)度和分配,并進(jìn)行集群的監(jiān)控等職責(zé)。

      • Worker(NodeManager):是一個JVM Process 進(jìn)程,一個Worker運行在集群中的一臺服務(wù)器上,主要負(fù)責(zé)兩個職責(zé),一個是用自己的內(nèi)存存儲RDD的某個或某些partition;另一個是啟動其他進(jìn)程和線程(Executor),對RDD上的partition進(jìn)行并行的處理和計算。

    • 執(zhí)行器Executor

      • 是一個JVM Process 進(jìn)程,一個Worker(NodeManager)上可以運行多個Executor,Executor通過啟動多個線程(task)來執(zhí)行對RDD的partition進(jìn)行并行計算,也就是執(zhí)行我們對RDD定義的例如map、flatMap、reduce等算子操作。

      • executor進(jìn)程宿主在worker節(jié)點上,一個worker可以有多個executor。每個executor持有一個線程池,每個線程可以執(zhí)行一個task,executor執(zhí)行完task以后將結(jié)果返回給driver,

      • 每個executor執(zhí)行的task都屬于同一個應(yīng)用。此外executor還有一個功能就是為應(yīng)用程序中要求緩存的RDD提供內(nèi)存式存儲,RDD是直接緩存在executor進(jìn)程內(nèi)

      --num-executors   配置Executor的數(shù)量 
      --driver-memory   配置Driver內(nèi)存(影響不大)
      --executor-memory 配置每個Executor的內(nèi)存大小
      --executor-cores  配置每個Executor的CPU core數(shù)量
      

    Spark中的其他核心概念

    • Application:指的是用戶編寫的Spark應(yīng)用程序,包含了含有一個Driver功能的代碼和分布在集群中多個節(jié)點上運行的Executor代碼。一個Spark程序可以包含多個job。
    • Driver:運行Application的main函數(shù)并創(chuàng)建SparkContext,SparkContext的目的是為了準(zhǔn)備Spark應(yīng)用程序的運行環(huán)境。SparkContext負(fù)責(zé)資源的申請、任務(wù)分配和監(jiān)控等。當(dāng)Executor運行結(jié)束后,Driver負(fù)責(zé)關(guān)閉SparkContext。
    • Job:一個Application可以產(chǎn)生多個Job,其中Job由Spark Action觸發(fā)產(chǎn)生。每個Job包含多個Task組成的并行計算。
    • Stage:每個Job會拆分為多個Task,作為一個TaskSet,稱為Stage;Stage的劃分和調(diào)度是由DAGScheduler負(fù)責(zé)的。Stage分為Result Stage和Shuffle Map Stage。核心就是用來劃分shuffle階段的,一個stage階段可能包含多個RDD的計算的,因此一個stage中包含多個Task的。
    • Task:Application的運行基本單位,Executor上的工作單元。其調(diào)度和管理由TaskScheduler負(fù)責(zé)。每一個executor內(nèi)部可以同時啟動多個任務(wù),Task就是Spark程序運行的最小單位,一個executor可以運行多少個task取決于cpu core。假如Spark程序總共有100個任務(wù),一般分配30個左右task。
    • RDD:Spark基本計算單元,是Spark最核心的東西。表示已被分區(qū)、被序列化、不可變的、有容錯機制的、能被并行操作的數(shù)據(jù)集合。在Spark程序中,無外乎就三種操作:創(chuàng)建RDD、轉(zhuǎn)化RDD、從RDD中獲取結(jié)果/將結(jié)果輸出保存。
    • DAGScheduler:記錄RDD之間的依賴關(guān)系的,也是用來劃分stage階段的。
    • TaskScheduler:任務(wù)調(diào)度器,Driver驅(qū)動程序分配任務(wù)給task運行的。將TaskSet提交給Worker運行,每個Worker運行了什么Task于此處分配。同時還負(fù)責(zé)監(jiān)控、匯報任務(wù)運行情況等。

四、Spark程序的部署運行的方式 —— Spark執(zhí)行jar包

Spark部署運行和MR程序的部署運行方式一致的,需要將我們編寫的Spark程序打包成為一個jar包,放到我們的Spark集群中,然后通過Spark相關(guān)命令啟動運行Spark程序即可

spark-submit [options] <app jar | python file | R file> [app arguments]spark-submit --class 全限定類名 --master local|local[*]|local[n]|mesos|yarn|spark://ip:port --deploy-mode client|cluster jar包的路徑 參數(shù)spark-submit   --class   全限定類名   --master  運行的資源管理器 --deploy-mode  部署運行的模式    --num-executors   只在yarn模式下使用  指定executor的數(shù)量--executor-cores   指定每一個executor具備多少個CPU內(nèi)核,一個內(nèi)核可以運行一個TASK--executor-memory   每一個executor占用的內(nèi)存jar包路徑main函數(shù)的args參數(shù)列表

options的常用選型以及含義:

  • –master masterurl 將Spark程序部署到哪個資源管理器運行
    spark://host:port, mesos://host:port, yarn,k8s://https://host:port, or local (Default: local[*]).
  • –deploy-mode mode Spark應(yīng)用程序的部署模式(YARN場景下)
    取值 client cluster
  • –class class_name jar包中Driver驅(qū)動程序的全限定類名
  • –name name spark應(yīng)用程序的別名
  • –driver-memory 1024M driver驅(qū)動程序
  • –executor-memory 1G 等同于YARN中容器,一個容器有多少內(nèi)存
  • –executor-cores num 每一個executor中有多少個內(nèi)核

五、Spark的編程方式

1、REPL交互式命令行窗口代碼編程:Spark提供了一個REPL工具:spark-shell
spark-shell --master local[*]

image-20230919193941570

image-20230919194430605

2、Java/Scala/Python等等代碼進(jìn)行編程

本地運行spark編寫的單詞計數(shù)程序

  • 引入依賴
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.kang</groupId><artifactId>spark-study</artifactId><version>1.0</version><packaging>jar</packaging><name>spark-study</name><url>http://maven.apache.org</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><!--1、引入Spark Core的編程依賴  --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.1.1</version></dependency><!--2、引入log4j的依賴--><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.6.1</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>3.1.4</version></dependency></dependencies>
<!--  現(xiàn)在Spark編程需要用到scala語言,但是idea默認(rèn)不支持把Scala語言的代碼打成jar包,如果想把Scala語言的代碼打成jar包需要引入maven的編譯插件  scala-maven-plugin--><build><finalName>spark-study</finalName></build>
</project>
  • 編寫scala代碼
package com.kangimport org.apache.spark.{SparkConf, SparkContext}/*** Spark的單詞計數(shù)案例的實現(xiàn)*/
object WordCount {def main(args: Array[String]): Unit = {//1、創(chuàng)建一個Spark程序執(zhí)行入口 SparkContext(Scala中)   JavaSparkContext(Java中)  首先需要一個Spark的配置文件對象SparkConfval sparkConf:SparkConf = new SparkConf().setMaster("local[*]").setAppName("wordcount")val sc:SparkContext = new SparkContext(sparkConf)//2、編寫spark的運行代碼  調(diào)用spark的算子完成計算邏輯sc.textFile("hdfs://single:9000/wc.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey((_+_)).collect().foreach(println)//3、關(guān)閉sparkContext對象sc.stop()}
}

image-20230919204606413

image-20230919204615174

部署到服務(wù)器上運行

  • 刪除scala代碼中的master

image-20230919204728160

  • 用maven打jar包 Lifecycle --> package,然后上傳到服務(wù)器上
  • 運行jar包,spark-submit --class com.kang.WordCount --master yarn --deploy-mode client spark-study.jar

image-20230919205213153

image-20230919205235785

在yarn的web端界面上也有顯示,如果沒有顯示出來,請檢查yarn的相關(guān)配置是否成功或者檢查打包后jar包中的代碼是否是修改之后的代碼,jar包中的class文件使用jd-gui來查看。

六、Spark的核心基礎(chǔ)Spark Core

  • Spark Core是Spark計算框架的核心基礎(chǔ),Spark中子組件都是基于Spark Core封裝而來的。

  • Spark Core中包含了Spark的運行調(diào)度機制、Spark的迭代式計算、基于內(nèi)存的運算機制。

1、Spark Core中最核心的有兩個概念

  • SparkContext:Spark的上下文對象,Spark程序的提交運行,任務(wù)分配等等都是由SparkContext來完成的。
  • RDD(Resilient Distributed Dataset):叫做彈性分布式數(shù)據(jù)集,是Spark中最基本的數(shù)據(jù)抽象,它代表一個不可變、可分區(qū)、里面的元素可并行計算的集合。也是Spark最核心最重要的概念,也是Spark中最基礎(chǔ)的數(shù)據(jù)抽象(spark處理的所有數(shù)據(jù)都會封裝成為RDD然后進(jìn)行處理)。

2、RDD的屬性(RDD具備的一些特征)

  • 一組分區(qū)(一組切片):RDD可分區(qū)的數(shù)據(jù)集,RDD內(nèi)部的數(shù)據(jù)是以分區(qū)的形式存在,每一個分區(qū)的數(shù)據(jù)可以存儲在不同的節(jié)點上。
  • 一個計算每一個分區(qū)(切片)數(shù)據(jù)的compute函數(shù):RDD計算的時候每一個分區(qū)的數(shù)據(jù)是并行計算的,通過一個函數(shù)將計算邏輯封裝在分區(qū)數(shù)據(jù)上運行計算。
  • 一個用來記錄RDD依賴關(guān)系的列表:記錄RDD的依賴關(guān)系,容錯機制。
  • 一個分區(qū)機制(RDD必須得是鍵值對類型的RDD):分區(qū)器只對鍵值對類型的RDD生效。
  • 一個用來記錄分區(qū)位置的列表:如果計算程序和數(shù)據(jù)不在同一個節(jié)點上,會把數(shù)據(jù)移動到計算節(jié)點。

3、RDD的彈性的體現(xiàn)

  • 存儲的彈性:rdd數(shù)據(jù)可以在內(nèi)存和磁盤之間自由切換。
  • 計算的彈性:rdd在計算的時候,stage、task都有可能計算失敗,如果失敗了stage和task都會進(jìn)行特定次數(shù)的重試,默認(rèn)重試4次。
  • 容錯的彈性:rdd計算中如果數(shù)據(jù)丟失,可以根據(jù)依賴鏈重新計算。
  • 分片的彈性:rdd計算中,我們可以根據(jù)實際情況,在代碼中動態(tài)的調(diào)整分片。

4、RDD的特點

  • 1、可分區(qū)
  • 2、只讀:RDD是只讀的,不可變的,RDD一旦創(chuàng)建,內(nèi)部不能改變了,只能根據(jù)RDD計算返回一個新的RDD,而原有的RDD不受任務(wù)的干擾。
  • 3、依賴
    • 寬依賴:父RDD的一個分區(qū)數(shù)據(jù)被子RDD的多個分區(qū)同時使用,一般在shuffle算子中才會出現(xiàn)。
    • 窄依賴:父RDD的分區(qū)數(shù)據(jù)只能給子RDD的一個分區(qū)。
    • 依賴是Spark程序劃分stage的核心依據(jù),stage劃分規(guī)則是從上一個寬依賴算子到下一個寬依賴算子之前的操作都屬于同一個stage。
  • 4、可緩存
  • 5、可設(shè)置檢查點

5、RDD的分類

  • RDD數(shù)據(jù)集,內(nèi)部可以存放各種各樣的數(shù)據(jù)類型,根據(jù)存儲的數(shù)據(jù)類型不同,將RDD分為兩類:數(shù)值類型的RDD(RDD)、鍵值對類型的RDD(PairRDD)。
  • 數(shù)值類型的RDD存放的數(shù)據(jù)類型可以是任何類型,包括鍵值對類型
    RDD[String]、RDD[People]
  • 鍵值對類型的RDD指的是數(shù)據(jù)集中存放的數(shù)據(jù)類型是一個二元組,是一種比較特殊的數(shù)值類型的RDD
    RDD[(String,Int)]、RDD[(Int,(String,Int))]
  • 鍵值對類型的RDD有它自己獨特的一些算子操作,同時鍵值對類型的RDD可以使用數(shù)值類型RDD的所有操作。

6、RDD的編程

  • 在Spark中,對數(shù)據(jù)操作其實就是對RDD的操作,對RDD的操作無外乎三種:

    • 1、創(chuàng)建RDD
    • 2、轉(zhuǎn)換操作(Transformation):從一個RDD中得到另外一個RDD的算子。
    • 3、行動操作(Action):從RDD得到一個Scala集合、Scala標(biāo)量、將RDD數(shù)據(jù)保存到外部存儲中。

    RDD計算操作是惰性計算的,遇到轉(zhuǎn)換算子不會計算,只會先記錄RDD的依賴關(guān)系,只有當(dāng)遇到行動算子,才會根據(jù)記錄的依賴鏈依次計算。

  • RDD的編程方式主要分為兩種:命令行編程方式(spark-shell – 數(shù)據(jù)科學(xué)、算法研究)、API編程方法(數(shù)據(jù)處理 java scala python R)

6.1、RDD的創(chuàng)建操作

  • 將數(shù)據(jù)源的數(shù)據(jù)轉(zhuǎn)換成為Spark中的RDD,RDD的創(chuàng)建主要分為三種:1、從外部存儲設(shè)備創(chuàng)建RDD(HDFS、Hive、HBase、Kafka、本地文件系統(tǒng)…)2、Scala|Java集合中創(chuàng)建RDD 3、從已有的RDD轉(zhuǎn)換成為一個新的RDD(RDD的轉(zhuǎn)換算子)

  • 1、從集合中創(chuàng)建RDD

    • parallelize(Seq[T],num)
    • makeRDD(Seq[T],num) 底層就是parallelize函數(shù)的實現(xiàn)了

    都可以傳遞一個第二個參數(shù),第二個參數(shù)代表的是RDD的并行度(RDD的分區(qū)數(shù)),默認(rèn)分區(qū)數(shù)就是master中設(shè)置的cpu核數(shù)。


    • makeRDD(Seq[(T, Seq[String])]) 這種方式創(chuàng)建的RDD是帶有分區(qū)編號的 ,集合創(chuàng)建的RDD的分區(qū)數(shù)就是指定的分區(qū)數(shù)。

    • 代碼示例

    package com.kang.createimport org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}/*** 1、創(chuàng)建RDD* 【注意】如果我們想要編寫Spark程序,我們必須先創(chuàng)建一個SparkContext,因為Spark程序的提交運行、RDD的創(chuàng)建操作都是由SparkContext完成的。*/
    object ScalaDemo01 {def main(args: Array[String]): Unit = {//1、創(chuàng)建SparkContextval sparkConf:SparkConf = new SparkConf().setAppName("create-rdd").setMaster("local[*]")val sc:SparkContext = new SparkContext(sparkConf)//2、集合中創(chuàng)建RDD
    //    val rdd:RDD[Int] = sc.parallelize(1 until 100)
    //    val rdd[Int] = sc.makeRDD(1 to 100)val rdd:RDD[List[Int]] = sc.makeRDD(Array((List(1,2,3),List("node1","node2")),(List(4,5,6),List("node2","node3"))))rdd.collect().foreach(println)sc.stop()}
    }
    
  • 2、從外部存儲創(chuàng)建RDD

    • textFile()

    • wholeTextFile()

    • sc.sequenceFiile(path,classof[Key],classof[V]):RDD[(Key,V)](sequenceFile文件夾的目錄) 讀取sequenceFile文件成為鍵值對類型的RDD
      【注意】需要傳入key和value的Class類型,是hadoop序列化之前的類型

    • objectFile(path)讀取ObjectFile文件成為RDD,RDD的類型取決于寫出的Object文件的類型

      • package com.kang.createimport org.apache.spark.rdd.RDD
        import org.apache.spark.{SparkConf, SparkContext}object Demo02 {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setAppName("create-rdd").setMaster("local[5]")val sc: SparkContext = new SparkContext(sparkConf)val rdd = sc.sequenceFile("file:///D://Desktop/a.sequence",classOf[String],classOf[Int])rdd.foreach(println)val rdd1:RDD[Int] = sc.objectFile("file:///D://Desktop/a.obj")rdd1.foreach(println)rdd.cache()rdd.persist()sc.stop()}
        }
        
      • image-20230924152506408

      • image-20230924152515327

    • 根據(jù)JDBC創(chuàng)建RDD

      • package com.kang.createimport org.apache.spark.rdd.{JdbcRDD, RDD}
        import org.apache.spark.{SparkConf, SparkContext}import java.sql.{DriverManager, ResultSet}
        case class Student(id:Int,name:String,age:Int,sex:String)
        object Demo03 {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setAppName("create-rdd-jdbc").setMaster("local[5]")val sc: SparkContext = new SparkContext(sparkConf)val rdd:RDD[Student] = new JdbcRDD[Student](sc, () => {DriverManager.getConnection("jdbc:mysql://localhost:3306/spark?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8","root","root")}, "select * from student where id>=? and id <=?", 1, 3, 2, (rs: ResultSet) => {val id = rs.getInt("id")val name = rs.getString("name")val age = rs.getInt("age")val sex = rs.getString("sex")Student(id, name, age, sex)})rdd.foreach(println)sc.stop()}
        }
        
      • image-20230925202310743

    • 代碼示例

    package com.kang.createimport org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}/*** 1、創(chuàng)建RDD* 【注意】如果我們想要編寫Spark程序,我們必須先創(chuàng)建一個SparkContext,因為Spark程序的提交運行、RDD的創(chuàng)建操作都是由SparkContext完成的。*/
    object ScalaDemo01 {def main(args: Array[String]): Unit = {//1、創(chuàng)建SparkContextval sparkConf:SparkConf = new SparkConf().setAppName("create-rdd").setMaster("local[5]")val sc:SparkContext = new SparkContext(sparkConf)//3、從外部存儲創(chuàng)建RDD  外部文件必須得是text file  只能讀取一個文件
    //    val rdd1:RDD[String] = sc.textFile("hdfs://single:9000/wc.txt")
    //    println(rdd1.getNumPartitions)
    //    rdd1.collect().foreach(println)val rdd2:RDD[(String,String)] = sc.wholeTextFiles("hdfs://single:9000/dataCollect/2023-07-15")rdd2.collect().foreach(println)//4、關(guān)閉SparkContextsc.stop()}
    }
    
    package com.kang.create;import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;import java.util.Arrays;
    import java.util.List;public class JavaDemo01 {public static void main(String[] args) {SparkConf sparkConf = new SparkConf().setAppName("java-rdd").setMaster("local[*]");JavaSparkContext jsc = new JavaSparkContext(sparkConf);JavaRDD<Integer> javaRDD = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6));List<Integer> collect = javaRDD.collect();for (Integer integer : collect) {System.out.println(integer);}jsc.stop();}
    }
    

6.2、RDD的轉(zhuǎn)換操作(轉(zhuǎn)換算子)

  • RDD之所以可以實現(xiàn)迭代式操作,就是因為RDD中提供了很多算子,算子之間進(jìn)行操作時,會記錄算子之間的依賴關(guān)系。

  • RDD中具備一個轉(zhuǎn)換操作的算子,轉(zhuǎn)換算子是用來從一個已有的RDD經(jīng)過某種操作得到一個新的RDD的,轉(zhuǎn)換算子是惰性計算規(guī)則,只有當(dāng)RDD遇到行動算子,轉(zhuǎn)換算子才會去執(zhí)行。

  • 算子:就是Spark已經(jīng)給我們封裝好的一些計算規(guī)則,只不過這些計算規(guī)則內(nèi)部還需要傳入計算邏輯,代碼層面上,算子就是需要傳入函數(shù)的函數(shù)。Spark提供了80+個算子。

  • 數(shù)值型RDD的轉(zhuǎn)換算子(通用算子)

    • map(f:T=>U)算子–一對一算子
    • mapPartitions(f:Iterator[T]=>Iterator[U])算子—一對一算子,一個分區(qū)的數(shù)據(jù)統(tǒng)一執(zhí)行一次map操作
    • mapPartitionsWithIndex(f:(Index,Iterator[T])=>Iterator[U])----一對一算子,和mapPartitions算子的邏輯一模一樣的,只不過就是多了一個分區(qū)編號。
    • 以上三種適用場景:將RDD的數(shù)據(jù)類型轉(zhuǎn)換為另外一種數(shù)據(jù)類型。
    • filter(f:T=>Boolean) 算子—過濾算子,對原有RDD的每一個算子應(yīng)用一個f函數(shù),如果函數(shù)返回true,那么數(shù)據(jù)保留,如果返回false,那么數(shù)據(jù)舍棄。適用場景:清洗數(shù)據(jù),RDD的數(shù)據(jù)類型不會發(fā)生任何的更改。
    • 代碼示例:
    package com.kang.transformationimport org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}/*** 數(shù)值型的RDD轉(zhuǎn)換算子**/
    object ScalaDemo02 {def main(args: Array[String]): Unit = {val sparkConf:SparkConf = new SparkConf().setAppName("transformation").setMaster("local[*]")val sc:SparkContext = new SparkContext(sparkConf)//創(chuàng)建RDD
    //    val rdd:RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8))val rdd:RDD[String] = sc.makeRDD(List("spark","scala","spark","hadoop"))/*** 1、map算子,一對一算子* 對原有RDD的每一個數(shù)據(jù)應(yīng)用一個函數(shù),經(jīng)過函數(shù)計算得到一個新的返回值,新的返回值組成一個新的RDD* 原有的RDD一個數(shù)據(jù)通過這個算子返回一個數(shù)據(jù)*/
    //    val rdd1:RDD[Int] = rdd.map((a: Int) => {a * 3})
    //    val rdd1:RDD[Int] = rdd.map((_*3))//簡化版本
    //    val rdd1:RDD[(String,Int)] = rdd.map((a:String)=>{(a,1)})
    //    val rdd1:RDD[(String,Int)] = rdd.map((_,1))//簡化版本
    //    val rdd1:RDD[(String,Int)] = rdd.mapPartitions((list:Iterator[String])=>{
    //      import scala.collection.mutable.ArrayBuffer
    //      var ab:ArrayBuffer[(String,Int)] = ArrayBuffer()
    //      for (elem <- list) {
    //        ab.+=((elem,1))
    //      }
    //      ab.iterator
    //    })
    //    val rdd1:RDD[(String,Int)] = rdd.mapPartitionsWithIndex((index:Int,list:Iterator[String])=>{
    //      println(s"現(xiàn)在是第$index 分區(qū)的數(shù)據(jù),分區(qū)數(shù)據(jù)為${list.mkString(",")}")
    //      import scala.collection.mutable.ArrayBuffer
    //            var ab:ArrayBuffer[(String,Int)] = ArrayBuffer()
    //            for (elem <- list) {
    //              ab.+=((elem,1))
    //            }
    //            ab.iterator
    //    })/*** 2、filter過濾算子*/val rdd1:RDD[String] = rdd.filter((word:String)=>{word.startsWith("h")})rdd1.collect().foreach(println)sc.stop()}
    }
    
    • flatmap(f: T => TraversableOnce[U]):RDD[U]:壓扁算子,一對多的算子,一條輸入數(shù)據(jù)可以被映射成為0個或多個數(shù)據(jù),最后函數(shù)的返回值必須是一個集合類型,最后得到的RDD的類型就是集合元素的類型。
    /**
    * flatmap算子
    */
    val rdd1:RDD[String] = rdd.flatMap((line:String)=>{val array:Array[String] = line.split(" ")array
    })
    //val rdd1: RDD[String] = rdd.flatMap( _.split(" "))簡化版本
    
    • sample(boolean是否為有放回的抽樣,抽取比例,種子 - 底層抽樣算法使用默認(rèn)值):抽樣算子,數(shù)據(jù)量越大,抽取的數(shù)據(jù)越精準(zhǔn),數(shù)據(jù)量越小,抽取的數(shù)據(jù)偏差越大。適用場景:隨機抽取原始RDD的部分?jǐn)?shù)據(jù),RDD的數(shù)據(jù)類型不會發(fā)生任何的更改,一般使用在源RDD的數(shù)據(jù)量過多。
    /*** 從flatmap算子計算結(jié)果中,抽取50%的數(shù)據(jù)*/
    val rdd2:RDD[String] = rdd1.sample(false,0.5)
    
    • union(RDD[T]):RDD[T] 并集算子,將兩個RDD中所有數(shù)據(jù)組合成為一個新的RDD然后返回。
    /*** union算子*/    
    val rdd3:RDD[String] = sc.makeRDD(Array("hadoop","spark","storm"))
    val rdd4:RDD[String] = rdd1.union(rdd3)
    rdd4.collect().foreach(println)
    
    • intersection(RDD[T]):RDD[T]:交集算子,將兩個RDD取交集返回?!?通過equals方法判斷重復(fù)
    /*** intersection*/
    val rdd5:RDD[String] = rdd1.intersection(rdd3)
    
    • subtract差集算子 —— 通過equals方法判斷重復(fù)
    /*** 創(chuàng)建兩個RDD subtract差集算子*/
    val rdd1:RDD[Int] = sc.parallelize(Array(1,2,3,4))
    val rdd2:RDD[Int] = sc.makeRDD(Array(3,4,5,6))
    val rdd3:RDD[Int] = rdd1.subtract(rdd2)
    ----------------------------------------------------------------------------------------------
    case class Animal(name: String, age: Int)
    val rdd: RDD[Animal] = sc.parallelize(Array(Animal("zs", 30), Animal("ls", 20)))
    val rdd1: RDD[Animal] = sc.makeRDD(Array(Animal("zs", 30), Animal("ww", 20)))
    val rdd2: RDD[Animal] = rdd.subtract(rdd1)
    
    • distinct([numPartitions]))(implicat ordering = null):去重算子,對RDD元素去重,借助元素的equals方法去重的,第二個隱式參數(shù)的目的是為了去重之后對數(shù)據(jù)分區(qū)進(jìn)行排序,如果沒有排序規(guī)則,不排序了。 —— 通過equals方法判斷重復(fù)
    /*** distinct([numPartitions]))*/
    val rdd6:RDD[String] = rdd3.distinct()
    //scalaBean的去重
    case class People(name:String,age:Int)
    val rdd7:RDD[People] = sc.makeRDD(Array(People("zs",20),People("ls",30),People("zs",20)))
    val rdd8:RDD[People] = rdd7.distinct()
    
    • cartesian(RDD[U]):生成笛卡爾乘積,在T和U類型的RDD上,列出T和U的所有組合情況,返回一個新的RDD[(T,U)]。
    /*** 笛卡爾乘積*/
    val rdd11:RDD[((String,Int),(String,Int))] = rdd5.cartesian(rdd6)
    rdd11.collect().foreach(println)
    sc.stop()
    
    • sortBy(T=>U,asc:Boolean=true)(implicit ordering[U]):排序算子, 將RDD中T類型轉(zhuǎn)換成為U類型然后對RDD進(jìn)行排序,返回的還是RDD[T]
      【注意】U必須能排序的,兩種方式:實現(xiàn)Ordered接口,定義一個隱式類是Ordering[U]的子類
      當(dāng)然我們也可以手動在sortBy函數(shù)的第二個括號中傳遞一個Ordering的匿名內(nèi)部類
    /*** sortBy*/
    case class Teacher(name:String,age:Int)
    val rddC:RDD[(Int,Teacher)] = sc.makeRDD(Array((1,Teacher("zs", 20)), (2,Teacher("ls", 25))))
    implicit val teacherOrdering: Ordering[Teacher] = new Ordering[Teacher] {override def compare(x: Teacher, y: Teacher): Int = {if (x.age > y.age) {1} else {-1}}
    }
    val rdd10: RDD[(Int, Teacher)] = rddC.sortBy(data => data._2)
    
    • zip拉鏈算子 兩個RDD的元素個數(shù)必須相同
    /*** zip拉鏈算子*/
    val rdd3:RDD[(Animal,Animal)] = rdd.zip(rdd1)
    
    • repartition(num) 分區(qū)算子,將RDD數(shù)據(jù)重新分區(qū)之后得到一個新的RDD
    val rdd:RDD[Int] = sc.makeRDD(Array(1,2,3,4,5,6,7,8,9,10,11,12))
    val rdd1: RDD[Int] = rdd.repartition(3)
    val rdd2: RDD[Int] = rdd1.mapPartitionsWithIndex((a: Int, b: Iterator[Int]) => {println(s"第${a}個分區(qū),數(shù)據(jù)為${b.mkString(",")}")b
    })
    println(rdd2.getNumPartitions)
    println(rdd2.partitioner)
    rdd2.collect()sc.stop()
    

    image-20230924103057979

  • 鍵值對類型RDD的轉(zhuǎn)換算子

    • groupByKey([numPartitions]):分組算子,根據(jù)RDD的鍵值對數(shù)據(jù)的key值把Value數(shù)據(jù)聚合到一起,然后返回一個新的RDD,新的RDD也是kv類型,v變成集合類型。
    //創(chuàng)建一個kv鍵值對rdd
    val rdd:RDD[(String,Int)] = sc.makeRDD((Array(("spark",1),("flink",1),("spark",1))))
    val rdd1:RDD[(String,Iterable[Int])] = rdd.groupByKey()
    
    • join(RDD[(K,W)]):內(nèi)連接算子,和另外一個鍵值對RDD做inner join操作,返回RDD[(K,(V,W))]

      /*** join操作 等同于SQL中join操作 內(nèi)連接*/
      //學(xué)生的數(shù)學(xué)成績
      val rdd5:RDD[(String,Int)] = sc.makeRDD(Array(("zs",80),("ls",72),("ww",92),("ww",85)))
      //學(xué)生的語文成績
      val rdd6:RDD[(String,Int)] = sc.makeRDD(Array(("zs",90),("ls",82),("ww",70),("zsf",75)))
      val rdd7:RDD[(String,(Int,Int))] = rdd5.join(rdd6)
      rdd7.collect().foreach((data:(String,(Int,Int)))=>{println(s"學(xué)生姓名為${data._1},數(shù)學(xué)成績?yōu)?/span>${data._2._1},語文成績?yōu)?/span>${data._2._2}")
      })
      
    • leftOutJoin|rightOutJoin|fullOutJoin(RDD(K,W))和另外一個RDD做外連接操作,外連接算子
      左連接:返回RDD[(K,(V,Option[W]))] —— 保留調(diào)用者信息
      右連接:返回RDD[(K,(Option[V],W))] —— 保留參數(shù)信息
      全外連接:返回RDD[(K,(Option[V],Option[W]))] —— 保留全部信息
      Option是為了防止空指針異常的,Option的取值有兩種:None、Some,如果Option包含的數(shù)據(jù)不為Null,那么使用Some將數(shù)據(jù)封裝,然后我們可以使用get方法獲取里面的值,如果數(shù)據(jù)為Null,那么使用None將數(shù)據(jù)封裝,不能使用get獲取數(shù)據(jù)。

      /*** 左外連接*/
      val rdd8: RDD[(String, (Int, Option[Int]))] = rdd5.leftOuterJoin(rdd6)
      rdd8.collect().foreach((data: (String, (Int, Option[Int]))) => {println(s"學(xué)生姓名為${data._1},數(shù)學(xué)成績?yōu)?/span>${data._2._1},語文成績?yōu)?/span>${data._2._2}")
      })
      /*** 右外連接*/
      val rdd9: RDD[(String,(Option[Int],Int))] = rdd5.rightOuterJoin(rdd6)
      rdd9.collect().foreach((data: (String, (Option[Int],Int))) => {println(s"學(xué)生姓名為${data._1},數(shù)學(xué)成績?yōu)?/span>${data._2._1},語文成績?yōu)?/span>${data._2._2}")
      })
      /*** 全外連接*/
      val rdd10: RDD[(String, (Option[Int], Option[Int]))] = rdd5.fullOuterJoin(rdd6)
      rdd10.collect().foreach((data: (String, (Option[Int], Option[Int]))) => {println(s"學(xué)生姓名為${data._1},數(shù)學(xué)成績?yōu)?/span>${data._2._1},語文成績?yōu)?/span>${data._2._2}")
      })
      
    • cogroup(RDD[(K,W)]):連接算子plus版本,返回一個 RDD[(K, (Iterable, Iterable)) ] 將兩個RDD中所有key值相同的數(shù)據(jù)全部聚合到一塊,RDD1中相同的Value組成Iterable[V] RDD2中相同的value組成Iterable[W]

      /*** cogroup算子*/
      //學(xué)生的數(shù)學(xué)成績
      val rdd12: RDD[(String, Int)] = sc.makeRDD(Array(("zs", 80), ("ls", 72), ("ww", 92), ("zs", 90)))
      //學(xué)生的語文成績
      val rdd13: RDD[(String, Int)] = sc.makeRDD(Array(("zs", 90), ("ls", 82), ("ww", 70), ("zs", 80)))
      val rdd14:RDD[(String,(Iterable[Int],Iterable[Int]))] = rdd12.cogroup(rdd13)
      rdd14.collect().foreach(date => {println(s"學(xué)生為${date._1},數(shù)學(xué)成績?yōu)?/span>${date._2._1.mkString(" ")},語文成績?yōu)?/span>${date._2._2.mkString(" ")}")
      })
      
    • mapValues(f: V => U) :RDD[(K,U)] 操作鍵值對的value數(shù)據(jù)算子,一對一,針對KV類型的RDD只對v操作返回一個新的類型,由新的類型和原有的key組成一個新的RDD

      /*** mapValues* 可以使用map算子替代,但是map算子替代比較復(fù)雜*/
      val rdd15:RDD[(String,Int)] = rdd12.mapValues((value:Int) => {value+5})rdd15.collect().foreach(println)
      
    • flatMapValues 操作鍵值對的value數(shù)據(jù)算子 一對多

      /*** flatMapValues算子*/
      val rdd4: RDD[(String, String)] = sc.makeRDD(Array(("zs", "80-90-85"), ("ls", "70-75-90")))
      val rdd5: RDD[(String, String)] = rdd4.flatMapValues((value: String) => {value.split("-")
      })
      

    image-20230924101719619

    • reduceByKey(func: (V, V) => V) :分組聚合算子,reduceByKey=groupBykey+reduce操作,函數(shù)輸入數(shù)據(jù)有兩個,輸出有一個,輸出類型和輸出類型是同一個類型。根據(jù)key值,把value聚合到一起,并且對value求出一個聚合結(jié)果,RDD的類型不會發(fā)生變化。
      輸入的兩個v:第一個v是上一次聚合的結(jié)果 第二v是本次要聚合的value
      輸出的v就是本次聚合的結(jié)果

      /*** reduceBykey*/
      val rdd2:RDD[(String,Int)] = rdd.reduceByKey((a:Int,b:Int)=>{a+b})
      val rdd3:RDD[(String,Int)] = sc.makeRDD(Array(("zs",80),("zs",90),("ls",70),("ls",85)))
      val rdd4:RDD[(String,Int)] = rdd3.reduceByKey((a:Int,b:Int) =>{if(a>b) {a}else{b}
      })
      
    • combineByKey( createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C) 分組聚合Plus算子,combiner也是根據(jù)key值聚合value,只不過value如何聚合,是什么樣的聚合邏輯,我們要通過三個函數(shù)說明(比reduceByKey的功能要強大):

      • createCombiner:V=>C 將key值對應(yīng)得value數(shù)據(jù)先進(jìn)行初始化操作,返回一個新的類型。
      • mergeValue:(C,V)=>C 每一個分區(qū)都會單獨執(zhí)行一個mergeValue函數(shù),通過mergeValue函數(shù)將當(dāng)前分區(qū)的key的value值和剛剛創(chuàng)建的初始值做計算 得到當(dāng)前分區(qū)下的唯一的計算結(jié)果,結(jié)算結(jié)果的類型必須和初始化之后的類型保持一致。
      • mergeCombiners:(C,C)=>C 將所有分區(qū)當(dāng)前key值計算出來的結(jié)果C 再進(jìn)行一次全局的聚合,得到唯一的結(jié)果,結(jié)果就是我們這個combineByKey的計算結(jié)果。
      • 返回RDD[(K,C)]
      /*** combineByKey實現(xiàn)類似于ReduceByKey的效果*/
      val rdd1:RDD[(String,Int)] = rdd.combineByKey((a:Int)=>a,(a:Int,b:Int)=>{if (a>b) {a}else{b}
      },(a:Int,b:Int)=>{if(a>b){a}else{b}
      })/*** combineByKey計算科目總成績以及科目的數(shù)量*/
      val rdd2:RDD[(String,(Int,Int))] = rdd.combineByKey((a:Int)=>(a:Int,1),(a:(Int,Int),b:Int)=>{((a._1+b),(a._2+1))
      },((a:(Int,Int),b:(Int,Int))=>{((a._1+b._1),(a._2+b._2))
      }))
      val rdd3:RDD[(String,Double)] = rdd2.mapValues((a:(Int,Int))=>{a._1.toDouble /a._2.toDouble})	
      
    • aggregateByKey(zeroValue:U)(mergerValue:(U,V)=>U,mergerCombiner:(U,U)=>U)分組聚合plus算子
      aggreGateByKey算子和CombineByKey算子實現(xiàn)的效果是一樣的,區(qū)別在于初始值不一樣的,combineBykey的初始值是根據(jù)函數(shù)計算來的,是根據(jù)每一個分區(qū)的一個真實的value數(shù)據(jù)計算得來的,而aggregateByKey的初始值是我們隨意給的。

      /*** aggregateByKey算子計算科目總成績以及科目的數(shù)量*/
      val rdd4:RDD[(String,(Int,Int))] = rdd.aggregateByKey((0,0))((a:(Int,Int),b:Int)=>{(a._1+b,a._2+1)},(a:(Int,Int),b:(Int,Int))=>{((a._1+b._1),(a._2+b._2))})
      val rdd5:RDD[(String,Double)] = rdd4.mapValues((a:(Int,Int))=>{a._1.toDouble/a._2.toDouble})
      val rdd6:RDD[(String,Int)] = rdd.aggregateByKey(Int.MaxValue)((a:Int,b:Int)=>{a.min(b)},((a:Int,b:Int)=>{a.min(b)}))
      
    • foldByKey(zeroValue:V)(f:(V,V)=>V)):aggregateByKey算子的簡化版,相當(dāng)于是aggregateByKey的簡化版,當(dāng)aggregateByKey的mergeValue和mergeCombiner函數(shù)的計算邏輯一致,并且zerovalue初始化類型的值和原先RDD的value的類型一致的時候,就可以使用foldByKey簡化。

      /*** foldByKey*/
      val rdd7:RDD[(String,Int)] = rdd.foldByKey(Int.MaxValue)((a:Int,b:Int)=>{a.min(b)})
      val rdd7:RDD[(String,Int)] = rdd.foldByKey(Int.MaxValue)(_ min _)//簡化版本
      
    • sortByKey(asc:Boolean=true):根據(jù)鍵值對kv的key進(jìn)行排序,默認(rèn)升序排序,RDD的類型不會改變。
      【注意】key值必須實現(xiàn)了Ordered比較器接口,如果想讓排序規(guī)則準(zhǔn)確,那么你的Ordered接口中排序邏輯必須得是升序(前者大于后者,返回正數(shù))邏輯。

      /*** sortByKey*/
      val rddA:RDD[(String,Int)] = sc.makeRDD(Array(("zs",80),("ls",80),("ww",92),("ml",90),("qsf",75),("bwj",72)),2)
      val rdd8:RDD[(String,Int)] = rddA.sortByKey(false)
      case class Student(name:String,age:Int) extends Ordered[Student] {override def compare(that: Student): Int = {if(age>that.age){1}else{-1}}
      }
      val rddB:RDD[(Student,Int)] = sc.makeRDD(Array((Student("zs", 20), 1), (Student("ls", 25), 2)))
      val rdd9:RDD[(Student,Int)] = rddB.sortByKey(false)
      
    • partitionBy(分區(qū)器):只有再涉及到shuffle算子的時候才會出現(xiàn)分區(qū)器的概念
      HashPartitioner 默認(rèn)的分區(qū)器,可能會出現(xiàn)數(shù)據(jù)傾斜問題
      RangePartitioner 范圍分區(qū)器–盡可能保證每個分區(qū)的數(shù)據(jù)一致,抽樣算法
      自定義分區(qū)器

      package com.kang.transformationimport org.apache.spark.rdd.RDD
      import org.apache.spark.{Partitioner, SparkConf, SparkContext}object ScalDemo07 {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("kv-transformation").setMaster("local[2]")val sc: SparkContext = new SparkContext(conf)val rdd: RDD[(String, Int)] = sc.makeRDD(Array(("zs", 20), ("zs", 30), ("zs", 50), ("zs", 40), ("ls", 30), ("ww", 20), ("ml", 20), ("zsf", 30)));rdd.mapPartitionsWithIndex((a, b) => {println(s"第${a}個分區(qū),數(shù)據(jù)為${b.mkString(",")}")b}).collect()
      //    val rdd1:RDD[(String,Int)] = rdd.partitionBy(new HashPartitioner(2))
      //    val rdd1:RDD[(String,Int)] = rdd.partitionBy(new RangePartitioner(2,rdd))val rdd1:RDD[(String,Int)] = rdd.partitionBy(new MyPartitioner())rdd1.mapPartitionsWithIndex((a, b) => {println(s"第${a}個分區(qū),數(shù)據(jù)為${b.mkString(",")}")b}).collect()println(rdd1.partitioner)println(rdd1.getNumPartitions)sc.stop()}
      }
      class MyPartitioner extends Partitioner{override def numPartitions: Int = 2override def getPartition(key: Any): Int = {val str: String = key.toStringif(str.startsWith("z")){0}else{1}}
      }
      
  • 查看一個RDD的分區(qū)數(shù)和分區(qū)器:rdd中存在兩個內(nèi)容:partitioner 屬性、getNumPartitions 函數(shù)。

6.3、RDD的行動算子

  • 行動算子是用來觸發(fā)依賴鏈的執(zhí)行的,在Spark程序中,一個行動算子觸發(fā)的一個依賴鏈會單獨成為Spark中job運行。

  • 數(shù)值型RDD的行動算子(通用算子)

    • reduce((T,T)=>T):T 聚合算子,從RDD中把所有的數(shù)據(jù)聚合得到一個結(jié)果,結(jié)果的類型必須和RDD中數(shù)據(jù)類型保持一致。

    image-20230924111108535

    • aggregate(zerovalue:U)(mergeValue,combineValue) :U:聚合算子的plus版本。

    image-20230924111121207

    package com.kang.actionimport org.apache.spark.{SparkConf, SparkContext}object Demo01 {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("transformation").setMaster("local[*]")val sc: SparkContext = new SparkContext(conf)val rdd = sc.makeRDD(Array(1,2,3,4,5,6,7,8,9))/*** reduce行動算子*/val max:Int = rdd.reduce((a,b)=>{if(a>b) a else b})println(max)/*** aggregate算子*/val result:(Int,Int) = rdd.aggregate((0,0))((a:(Int,Int),b:Int)=>{(a._1+b,a._2+1)},(a:(Int,Int),b:(Int,Int))=>{(a._1+b._1,a._2+b._2)})println(result._1/result._2)sc.stop()}
    }
    
    • fold(zerovalue:T)(f:(T,T)=>T):T aggregate的簡化版本。
    • collect() :Array[T] 算子慎用,很可能造成OOM異常,將RDD所有分區(qū)的數(shù)據(jù)拉取到Driver驅(qū)動程序端以數(shù)組的形式在內(nèi)存中保存RDD中的所有數(shù)據(jù)。
    • foreach(T=>Unit)|foreachPartition(Iterator[T] => Unit)對RDD中的數(shù)據(jù)進(jìn)行一個函數(shù)操作,函數(shù)無返回值,這個函數(shù)中我們既可以輸出數(shù)據(jù)(不用擔(dān)心OOM問題),同時也可以在函數(shù)內(nèi)部編寫保存數(shù)據(jù)代碼,保存到外部存儲中。
      使用foreach替換collect去檢查數(shù)據(jù)。
    package com.kang.actionimport com.mysql.cj.jdbc.Driver
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}import java.sql.DriverManagerobject Demo02 {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("transformation").setMaster("local[1]")val sc: SparkContext = new SparkContext(conf)case class Student(id:Int,name:String,age:Int,sex:String)val rdd:RDD[Student] = sc.parallelize(Array(Student(1,"zs",20,"man"),Student(2,"ls",30,"woman")))rdd.foreach((a:Student)=>{var conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8","root","root")var sql = "insert into student values(?,?,?,?)"val statement = conn.prepareStatement(sql)statement.setInt(1,a.id)statement.setString(2,a.name)statement.setInt(3,a.age)statement.setString(4,a.sex)val i = statement.executeUpdate()statement.close()conn.close()})sc.stop()}
    }
    

    image-20230924114317430

    • count():Long 返回RDD中數(shù)據(jù)量

    • 獲取RDD中的部分?jǐn)?shù)據(jù)的算子

      • first():T 獲取RDD中第一個元素,底層實現(xiàn)就是take(1)
      • take(n): Array[T] 獲取RDD中的前n個元素
      • takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] 獲取RDD排好序之后的前N個元素,
        【注意】RDD中的T類型必須可以比較大小,Scala中所有數(shù)值型的數(shù)據(jù)類型都不需要傳遞
      • takeSample(withReplacement, num, [seed]):Array[T] 隨機抽取RDD中的num條數(shù)據(jù) 返回一個array數(shù)組。
      package com.kang.actionimport org.apache.spark.rdd.RDD
      import org.apache.spark.{SparkConf, SparkContext}object Demo03 {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("transformation").setMaster("local[2]")val sc: SparkContext = new SparkContext(conf)val rdd:RDD[Int] = sc.makeRDD(Array(50,34,56,78,23,15,19))val first:Int = rdd.first()val array:Array[Int] = rdd.take(3)val array1:Array[Int] = rdd.takeOrdered(3)val array2:Array[Int] = rdd.takeSample(true, 5)println(s"rdd的第一條數(shù)據(jù)為$first")println(s"rdd的未排序之前的前三條數(shù)據(jù)為${array.mkString(",")}")println(s"rdd的排序之后的前三條數(shù)據(jù)為${array1.mkString(",")}")println(s"rdd的隨機抽取五條為${array2.mkString(",")}")Thread.sleep(1000000000)sc.stop()}
      }
      

      image-20230924150548158

    • 用來保存數(shù)據(jù)到文件中算子

      • saveAsTextFile(path)
      • saveAsObjectFile(path)
      package com.kang.actionimport org.apache.hadoop.io.compress.GzipCodec
      import org.apache.spark.io.CompressionCodec
      import org.apache.spark.rdd.RDD
      import org.apache.spark.{SparkConf, SparkContext}object Demo04 {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("transformation").setMaster("local[2]")val sc: SparkContext = new SparkContext(conf)val rdd:RDD[Int] = sc.makeRDD(Array(50,34,56,78,23,15,19))rdd.saveAsTextFile("file:///D://Desktop/a.txt")rdd.saveAsObjectFile("file:///D://Desktop/a.obj")val rdd1:RDD[(String,Int)] = sc.makeRDD(Array(("zs",20),("ls",20)))rdd1.saveAsSequenceFile("file:///D://Desktop/a.sequence",Option(classOf[GzipCodec]))Thread.sleep(1000000000)sc.stop()}
      }
      
  • 鍵值對類型RDD的行動算子

    • saveAsSequenceFile(path)
    • countByKey(): Map[K,Long] 將鍵值對RDD中key值出現(xiàn)的次數(shù)以map集合的形式給我們返回。
    package com.kang.actionimport org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}object Demo05 {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("action").setMaster("local[2]")val sc: SparkContext = new SparkContext(conf)val rdd:RDD[String] = sc.makeRDD(Array("hadoop", "spark", "flink", "hadoop"))val rdd1:RDD[(String,Int)] = rdd.map((_,1))val map:scala.collection.Map[String,Long] = rdd1.countByKey()map.foreach(println)sc.stop()}
    }
    

    image-20230924153152453

6.4、RDD的一些比較特殊的行動算子(只針對整數(shù)類型的RDD有效)

val rdd2:RDD[Int] = sc.makeRDD(Array(1,2,3,4,5,6,7))
val sum:Double = rdd2.sum()
println(sum)
val d = rdd2.mean()
println(d)

image-20230924153231426

image-20230924153241509

image-20230924110402767

【補充】在Scala中,每一個RDD都是RDD類型的,可調(diào)用的方法按道理來說只能是RDD內(nèi)部定義的方法,但是有些特殊的RDD(鍵值對RDD、整數(shù)類型的RDD)可以調(diào)用非RDD內(nèi)部聲明的函數(shù),底層采用了Scala的隱式轉(zhuǎn)換機制擴(kuò)充了特殊RDD類型的功能。

package com.kang.RDD的底層隱式轉(zhuǎn)換的模擬class RDDModel[T] {def test():Unit={}
}
class DoubleRDDModel[T]{def test01(): Unit = {}
}object Demo01{def main(args: Array[String]): Unit = {implicit def a(r:RDDModel[Double]):DoubleRDDModel[Double]={new DoubleRDDModel[Double]();}implicit def b(r:RDDModel[Int]):DoubleRDDModel[Double]={new DoubleRDDModel[Double]();}var rdd:RDDModel[Int] = new RDDModel[Int]()rdd.test01()}
}

7、RDD的持久化(緩存)

  • 在一個Spark的Application中,可能一個RDD被多個Job,或者被同一Job多次使用,但是RDD每次計算完成之后,下次如果還需要使用,需要根據(jù)依賴鏈從頭開始計算RDD,這樣的話,效率太低,根據(jù)依賴鏈計算確實挺安全,但是也特別浪費時間。如果我們想讓計算快速完成,Spark提供了一種機制,緩存機制,可以實現(xiàn)將重復(fù)性使用的RDD緩存起來(內(nèi)存、磁盤、內(nèi)存+磁盤),RDD緩存只有當(dāng)觸發(fā)了第一個行動算子之后才會進(jìn)行緩存操作。這樣的話第二個job和后續(xù)的job再使用RDD直接從緩存獲取,就不需要重新計算了。而且如果緩存的數(shù)據(jù)丟失,可以根據(jù)依賴鏈重新計算。

  • 緩存涉及到兩個算子:cache() persist() persist(StorageLevel)。

    • cache底層實現(xiàn)是persist(),persist底層實現(xiàn)persist(StorageLevel.MEMORY_ONLY)
    • StorageLevel有很多種緩存級別
    package com.kang.cacheimport org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}object Demo01 {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setAppName("create-rdd").setMaster("local[5]")val sc: SparkContext = new SparkContext(sparkConf)val rdd:RDD[Int] = sc.makeRDD(1 to 10)val rdd1:RDD[Int] = rdd.map(a=>{print("map算子執(zhí)行了")a})rdd1.cache()rdd1.collect().foreach(print)rdd1.foreach(print)rdd1.take(5).foreach(print)sc.stop()}
    }
    

8、RDD的檢查點機制

  • 檢查點也是一種另類的RDD緩存方式,只不過和RDD持久化的區(qū)別在于,檢查點會把依賴鏈斷掉,同時檢查點的數(shù)據(jù)保存到HDFS分布式文件系統(tǒng)中,這樣依靠HDFS的副本機制保證緩存的高可靠性。RDD檢查點一旦設(shè)置成功,依賴鏈斷了,下一次如果我們再重新運行Spark程序,會從檢查點獲取數(shù)據(jù)往后運行,RDD之前的依賴計算全部不用執(zhí)行了。
  • 如果設(shè)置緩存點,那么設(shè)置之前,必須先使用SparkContext設(shè)置檢查點目錄,sc.setCheckPointDir(hdfspath),然后需要進(jìn)行設(shè)置檢查點的RDD,使用rdd.checkpoint()。
  • 檢查點也是第一次觸發(fā)行動算子之后才會進(jìn)行操作的。
package com.kang.checkpointimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object Demo01 {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setAppName("create-rdd").setMaster("local[5]")val sc: SparkContext = new SparkContext(sparkConf)sc.setCheckpointDir("hdfs://single:9000/checkpoint")val rdd:RDD[Int] = sc.makeRDD(1 to 10)val rdd1:RDD[Int] = rdd.map(a=>{print("map算子執(zhí)行了")a})rdd1.checkpoint()rdd1.collect().foreach(print)Thread.sleep(120000)rdd1.foreach(print)rdd1.take(5).foreach(print)sc.stop()}
}

9、RDD算子的依賴關(guān)系

  • RDD算子的依賴分為兩種
    • 寬依賴:shuffle類型的算子,父RDD的一個分區(qū)的數(shù)據(jù)被子RDD的多個分區(qū)使用,同時子RDD的一個分區(qū)數(shù)據(jù)也可能來自于多個父RDD的分區(qū)。
    • 窄依賴:父RDD的一個分區(qū)數(shù)據(jù)只能被子RDD的一個分區(qū)使用,但是子RDD的分區(qū)可以來自多個父RDD。
  • 如何查看一個算子的前一個依賴是寬依賴還是窄依賴:rdd.dependencies 函數(shù)。
  • stage劃分依據(jù) —— 一個stage指的是從一個shuffle算子開始到另一個shuffle算子之前的操作都?xì)w屬于同一個stage。
  • DAG生成 —— 基于依賴鏈和stage生成的。
package com.kang.dependimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object WC {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[5]")val sc: SparkContext = new SparkContext(sparkConf)val rdd:RDD[String] = sc.textFile("hdfs://single:9000/wc.txt")println(s"rdd的依賴為${rdd.dependencies}")val rdd1:RDD[String] = rdd.flatMap((line:String)=>{line.split(" ")})println(s"rdd1的依賴為${rdd1.dependencies}")val rdd2:RDD[(String,Long)] = rdd1.map((word:String)=>{(word,1L)})println(s"rdd2的依賴為${rdd2.dependencies}")val rdd3:RDD[(String,Long)] =rdd2.reduceByKey((a:Long,b:Long)=>{a+b})println(s"rdd3的依賴為${rdd3.dependencies}")val rdd4:RDD[(String,Long)] = rdd3.mapValues((v:Long)=>{v+5})println(s"rdd4的依賴為${rdd4.dependencies}")rdd4.foreach(println)Thread.sleep(10000000)sc.stop()}
}

  • 依賴關(guān)系是我們劃分stage階段的關(guān)鍵,stage劃分的依據(jù)就是根據(jù)寬依賴劃分。

10、RDD的兩個特殊的使用

  • RDD的累加器

    • 累加器就是在程序運行中獲取一些感興趣的數(shù)據(jù)的量,Spark中累加器功能比較強大的,除了獲取感興趣的數(shù)據(jù)量,還可以自定義累加器的類型,獲取一些其他的數(shù)據(jù)。

    • 累加器的使用有一個注意點:累加器一般是在Driver端定義,然后在RDD分區(qū)中修改累加器的數(shù)值,然后在Driver端獲取累加器的結(jié)果。

    • 用法

      • 1、需要在Driver中創(chuàng)建一個累加器 — Spark自帶的,累加整數(shù)類型的值
        val accu = sc.xxxxaccumulator(累加器的名字)
      • 2、在RDD的算子計算中對累加器進(jìn)行賦值操作
        accu.add(1)
      • 3、在Driver端獲取累加器的結(jié)果
        accu.value
      package com.kang.accuimport org.apache.spark.rdd.RDD
      import org.apache.spark.util.LongAccumulator
      import org.apache.spark.{SparkConf, SparkContext}object Demo01 {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[5]")val sc: SparkContext = new SparkContext(sparkConf)val accumulator:LongAccumulator = sc.longAccumulator("wordcount")val rdd: RDD[String] = sc.makeRDD(Array("spark flink sqoop","hive hadoop spark","hadoop spark"))val rdd1: RDD[String] = rdd.flatMap((line: String) => {line.split(" ")})val rdd2: RDD[(String, Long)] = rdd1.map((word: String) => {accumulator.add(1L)(word, 1L)})val rdd3: RDD[(String, Long)] = rdd2.reduceByKey((a: Long, b: Long) => {a + b})val rdd4: RDD[(String, Long)] = rdd3.mapValues((v: Long) => {v + 5})rdd4.foreach(println)println(s"總共有${accumulator.value}個單詞")}
      }
      

      image-20230925203728163

  • RDD的廣播變量

    • 廣播變量和累加器還挺像的,廣播變量是只能讓RDD的分區(qū)獲取值,而不能修改值,廣播變量是只讀的。

    • 在Driver端聲明一個廣播變量以后,這樣的話可以在任何一個RDD的任何一個分區(qū)中獲取廣播變量的值計算。而且廣播變量的數(shù)據(jù)類型可以自定義

    • 用法

      • 1、Driver端設(shè)置廣播變量
        val factorBC:Broadcast[T] = sc.broadcast(變量名)
      • 2、RDD分區(qū)中使用廣播變量
        factorBC.value
      package com.kang.broadimport org.apache.spark.broadcast.Broadcast
      import org.apache.spark.rdd.RDD
      import org.apache.spark.{SparkConf, SparkContext}object Demo01 {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("transformation").setMaster("local[*]")val sc: SparkContext = new SparkContext(conf)val rdd:RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 2)var num = 5val broad:Broadcast[Int] = sc.broadcast(num)rdd.map(a=>{a*broad.value}).foreach(println)sc.stop()}
      }
      

      image-20230925204600106

11、RDD的分區(qū)機制

  • 只有鍵值對類型的RDD才有分區(qū)器,分區(qū)器在執(zhí)行shuffle算子的時候才會生效。
  • HashPartitioner(默認(rèn))、RangePartitioner、自定義分區(qū)器

七、【補充】Scala的比較器問題

在編程語言中,數(shù)據(jù)類型基本上都是比較大小的,數(shù)值類型的數(shù)據(jù)類型可以使用大于小于比較運算符直接比較大小,面向?qū)ο笾幸脭?shù)據(jù)類型也是一種數(shù)據(jù)類型(自定義類型),因此我們就得需要通過一個比較器來告訴編譯器我們自定義的類型如何去比較大小。

Java中存在兩個比較器用于比較Java類的大小關(guān)系,Java的比較器有兩個ComparableComparator,區(qū)別在于Comparable是讓Java類必須實現(xiàn)的,Comparator是在使用比較器的時候使用匿名內(nèi)部類的形式傳遞比較規(guī)則的。
Scala也是面向?qū)ο蟮?#xff0c;Scala中也存在類的概念,類在有些情況下也是必須能比較大小的。Scala也給我們提供了兩個比較器,兩個比較器是Java兩個比較器的子接口。
Ordered 是Comparable的子接口
Ordering 是Comparator的子接口
http://www.risenshineclean.com/news/53883.html

相關(guān)文章:

  • wordpress表單編輯插件下載湖南靠譜的關(guān)鍵詞優(yōu)化
  • 網(wǎng)站底部懸浮廣告投放數(shù)據(jù)分析
  • 網(wǎng)站搭建中企動力第一推薦幾個靠譜的網(wǎng)站
  • bootstrap 做企業(yè)網(wǎng)站百度關(guān)鍵詞優(yōu)化策略
  • 衡水網(wǎng)站網(wǎng)站建設(shè)成都最好的網(wǎng)站推廣優(yōu)化公司
  • 仿做靜態(tài)網(wǎng)站多少錢seo網(wǎng)址超級外鏈工具
  • 游戲網(wǎng)站如何做濰坊seo計費
  • 怎樣優(yōu)化手機網(wǎng)站愛采購seo
  • 吳江城鄉(xiāng)建設(shè)局網(wǎng)站搜索引擎優(yōu)化特點
  • 新鄉(xiāng)網(wǎng)站建設(shè)那家好友情鏈接是什么
  • 營銷型網(wǎng)站建設(shè)申請域名時公司類型的域名后綴一般是廣州今日頭條新聞
  • 凡科做網(wǎng)站有什么用軟文代寫費用
  • wordpress qq聯(lián)系代碼app優(yōu)化排名
  • 網(wǎng)站后臺統(tǒng)計代碼福州網(wǎng)站seo公司
  • 專做藥材的網(wǎng)站有哪些東莞優(yōu)化網(wǎng)站關(guān)鍵詞優(yōu)化
  • 網(wǎng)站開發(fā)算前端嗎合肥網(wǎng)站建設(shè)
  • 網(wǎng)站建設(shè)項目經(jīng)理考題怎么seo網(wǎng)站關(guān)鍵詞優(yōu)化
  • 網(wǎng)站排名易下拉教程app推廣在哪里可以接單
  • 東莞美食網(wǎng)站建設(shè)報價承德網(wǎng)絡(luò)推廣
  • 網(wǎng)站建設(shè)截圖中國女排聯(lián)賽排名
  • 商贏網(wǎng)站建設(shè)百度注冊
  • 免費自制主題app滕州seo
  • 用護(hù)衛(wèi)神做共享網(wǎng)站優(yōu)化課程設(shè)置
  • 建設(shè)網(wǎng)站個人簡介范文短視頻入口seo
  • 網(wǎng)站目錄管理系統(tǒng)模板aso搜索排名優(yōu)化
  • 電子商務(wù)網(wǎng)站的建設(shè)收益seo做的比較牛的公司
  • 網(wǎng)站建設(shè)尺寸金華網(wǎng)站推廣
  • 如何讓網(wǎng)站被百度收入如何優(yōu)化網(wǎng)站快速排名
  • 學(xué)院網(wǎng)站建設(shè) 需求分析百度自己的宣傳廣告
  • 高端網(wǎng)站優(yōu)化公司專業(yè)制作網(wǎng)站的公司哪家好