網(wǎng)站建設設電工培訓課程
文章目錄
- SparkSQL 核心編程
- 1、新的起點
- 2、SQL 語法
- 1) 讀取 json 文件創(chuàng)建 DataFrame
- 2) 對 DataFrame 創(chuàng)建一個臨時表
- 3) 通過SQL語句實現(xiàn)查詢?nèi)?/li>
- 3、DSL 語法
- 1) 創(chuàng)建一個DataFrame
- 2) 查看DataFrame的Schema信息
- 3) 只查看"username"列數(shù)據(jù)
- 4) 查看"username"列以及"age"+1數(shù)據(jù)
- 5) 查看"age"大于"20"的數(shù)據(jù)
- 6) 按照"age"分組,查看數(shù)據(jù)條數(shù)
- 4、RDD 轉換為 DataFrame
- 5、DataSet
- 1) 創(chuàng)建 DataSet
- 2) DataFrame 轉換為 DataSet
- 3)RDD 直接轉換為 DataSet
SparkSQL 核心編程
學習如何使用 Spark SQL 提供的 DataFrame 和 DataSet 模型進行編程,以及了解他們之間的關系和轉換,關于具體的SQL
書寫不是我們的重點。
1、新的起點
Spark Core 中,如果想要執(zhí)行應用程序,需要首先構建上下文環(huán)境對象,SparkContext
,Spark SQL 其實可以理解為對 Spark Core的一種封裝,不僅僅在模型上進行了封裝,上下文環(huán)境對象也進行了封裝。
在老的版本中,SparkSQL提供兩種 SQL 查詢起始點,一個叫 SQLContext,用于 Spark 自己提供的 SQL 查詢,一個叫 HiveContext,用于連接 Hive 查詢。
SparkSession
是 Spark 最新的 SQL 查詢起點,實質(zhì)是上 SQLContext 和 HiveContext 的組合,所以在 SQLContext 和 HiveContext 上可用的API在 SparkSession 上同樣是可以使用的。SparkSession 內(nèi)部封裝了 SparkContext,所以實際上是由sparkContext 完成的。當我們使用 spark-shell 的時候,spark 框架會自動創(chuàng)建一個名稱叫做spark的SparkSession對象,就像我們以前可以自動獲取到一個sc
來表示SparkContext
對象一樣。
這下面是在終端命令行簡單的演示,是怎么用spark 執(zhí)行sql語句執(zhí)行的。
讀取json文件創(chuàng)建DataFrame:
val df = spark.read.json("input/user.json")
注意:
從內(nèi)存中獲取數(shù)據(jù),spark 可以知道數(shù)據(jù)具體是什么。如果是數(shù)字,默認作為 Int 處理,但是從文件中讀取的數(shù)字,不能確定是什么類型,所以用 bigint(大整形) 接收,可以和 Long 類型轉換,但是和 Int 不能進行轉換。
2、SQL 語法
SQL 語法風格是指我們查詢數(shù)據(jù)的時候使用 SQL 語句來查詢,這種風格的查詢必須要有臨時視圖或者全局視圖來輔助。
1) 讀取 json 文件創(chuàng)建 DataFrame
2) 對 DataFrame 創(chuàng)建一個臨時表
要想用sql語句,那肯定首先就要有個表,所以將DataFrame轉換為一個臨時表,就可以用sql語句了。創(chuàng)建臨時表使用 createReplaceTempView("pepole")
,創(chuàng)建臨時視圖使用 createTempView("pepole")
注意:普通臨時表是Session范圍內(nèi)的,如果想應用范圍內(nèi)有效,可以使用全局臨時表。使用全局臨時表時需要全路徑訪問。
比如下面這里就是newSession 開啟了一個新的會話,之前那個臨時表就用不了了,找不到。
3) 通過SQL語句實現(xiàn)查詢?nèi)?/h3>
spark 查詢語句:spark.sql("select * from user")
這個user就是上面創(chuàng)建的臨時視圖,必須要創(chuàng)建個這樣的對象,才能進行sql 語句查詢。
這個就是查詢的結果
3、DSL 語法
DataFrame 提供一個特定領域語言(domain-specific language,DSL)去管理結構化數(shù)據(jù)。可以在 Scala,Java,Python,和 R 中使用 DSL,使用 DSL 語法風格不必去創(chuàng)建臨時視圖了。
1) 創(chuàng)建一個DataFrame
val df = spark.read.json("input/user.json")
2) 查看DataFrame的Schema信息
df.printSchema
用這個看到看信息,說明spark的那些方法都是可以用的。
這里可以看到,這種DSL 不需要創(chuàng)建什么表,這個是可以直接用 DataFrame對象直接進行select的查詢
3) 只查看"username"列數(shù)據(jù)
df.select("username").show
4) 查看"username"列以及"age"+1數(shù)據(jù)
df.select($"username",$"age" + 1)
df.select('username,'age + 1)
注意:涉及到運算的時候,每列都必須使用$,或者采用引號表達式:單引號+字段名
或者不要雙引號,在每個字段的前面加上一個單引號也是可以的。
5) 查看"age"大于"20"的數(shù)據(jù)
就不是select了,使用filter進行篩選過濾。
df.filter($"age">20).show
注意:這里這個大于20,上面那個20+1那個是不算的。
6) 按照"age"分組,查看數(shù)據(jù)條數(shù)
使用groupBy,分組完還必要要用count統(tǒng)計
df.groupBy("age").count.show
4、RDD 轉換為 DataFrame
在 IDEA 開發(fā)程序時,如果需要 RDD 與 DF 或者 DS 之間互相操作,那么需要引入import spark.implicits._
這里的 spark 不是 Scala 中的包名,而是創(chuàng)建的sparkSession 對象的變量名稱,所以必須先創(chuàng)建 SparkSession 對象再導入。這里的 spark 對象那個不能那個使用 var 聲明,因為Scala 只支持 val 修飾的對象的引入。
spark-shell 中無需導入,自動完成此操作。
首先創(chuàng)建一個rdd
val rdd = sc.makeRDD(List(1,2,3,4))
然后可以看到下面有很多的方法,其中有一個toDF
方法,就是 RDD 轉換為 DataFrame的。
val df = rdd.toDF("id")
我們將數(shù)據(jù)轉換為DataFrame 那我們得讓他知道我們的數(shù)據(jù)是什么意思,所以給他一個列字段名,“id”。
要是想從DataFrame轉換回RDD的話,那么直接 df.rdd
就轉換回去了。
5、DataSet
DataSet 是具有強類型
的數(shù)據(jù)集合,需要提供對應的類型信息。
1) 創(chuàng)建 DataSet
使用樣例類序列創(chuàng)建DataSet
上面創(chuàng)建了一個樣例類的列表的數(shù)據(jù) ,然后直接使用toDS
方法之間轉換為DataSet
轉換好之后,數(shù)據(jù)就可以直接看了。
2) DataFrame 轉換為 DataSet
首先從RDD轉換為DataFrame使用rdd.toDF
,然后我們要創(chuàng)建一個樣例類,注意樣例類里面這個列字段名要和那個DataFrame里面的那個字段名是一樣的,比如這里這個是value,然后用df.as[fengz]
有了類型他就變成DataSet了。
3)RDD 直接轉換為 DataSet
直接先創(chuàng)建一個樣例類,把他的類型先確定好,然后創(chuàng)建一個RDD,RDD里面的數(shù)據(jù)直接使用這個樣例類創(chuàng)建,然后直接使用rdd.toDS
直接就從RDD轉換為DataSet了。