怎樣才能做好網(wǎng)絡(luò)推廣優(yōu)化大師怎么下載
目錄
創(chuàng)建 DataFrames
生成我們自己的 JSON 數(shù)據(jù)
創(chuàng)建 DataFrame
創(chuàng)建臨時(shí)表
簡(jiǎn)單的 DataFrame 查詢
DataFrame API 查詢
SQL 查詢
創(chuàng)建 DataFrames
通常,您會(huì)通過使用 SparkSession(或在 PySpark shell 中調(diào)用 spark)導(dǎo)入數(shù)據(jù)來創(chuàng)建 DataFrame。
我們將討論如何將數(shù)據(jù)導(dǎo)入到本地文件系統(tǒng)、Hadoop 分布式文件系統(tǒng)(HDFS)或其他云存儲(chǔ)系統(tǒng)(例如,S3 或 WASB)。在本文中,我們將專注于在 Spark 內(nèi)直接生成您自己的 DataFrame 數(shù)據(jù)或利用 Databricks 社區(qū)版中已經(jīng)可用的數(shù)據(jù)源。
首先,我們將不訪問文件系統(tǒng),而是通過生成數(shù)據(jù)來創(chuàng)建 DataFrame。在這種情況下,我們將首先創(chuàng)建 stringJSONRDD RDD,然后將其轉(zhuǎn)換為 DataFrame。這段代碼片段創(chuàng)建了一個(gè)包含游泳者(他們的 ID、姓名、年齡和眼睛顏色)的 JSON 格式的 RDD。
生成我們自己的 JSON 數(shù)據(jù)
下面,我們將最初生成 stringJSONRDD RDD:
stringJSONRDD = sc.parallelize(("""{ "id": "123",
"name": "Katie",
"age": 19,
"eyeColor": "brown"}""",
"""{
"id": "234",
"name": "Michael",
"age": 22,
"eyeColor": "green"}""",
"""{
"id": "345",
"name": "Simone",
"age": 23,
"eyeColor": "blue"}""")
)
現(xiàn)在我們已經(jīng)創(chuàng)建了 RDD,我們將使用 SparkSession 的 read.json 方法(即 spark.read.json(...))將其轉(zhuǎn)換為 DataFrame。我們還將使用 .createOrReplaceTempView 方法創(chuàng)建一個(gè)臨時(shí)表。
創(chuàng)建 DataFrame
以下是創(chuàng)建 DataFrame 的代碼:
swimmersJSON = spark.read.json(stringJSONRDD)
創(chuàng)建臨時(shí)表
以下是創(chuàng)建臨時(shí)表的代碼:
swimmersJSON.createOrReplaceTempView("swimmersJSON")
如前文所述,許多 RDD 操作是轉(zhuǎn)換,這些轉(zhuǎn)換直到執(zhí)行動(dòng)作操作時(shí)才執(zhí)行。例如,在前面的代碼片段中,sc.parallelize 是一個(gè)轉(zhuǎn)換,當(dāng)使用 spark.read.json 從 RDD 轉(zhuǎn)換為 DataFrame 時(shí)執(zhí)行。注意,在這段代碼的筆記本截圖中(左下角附近),直到包含 spark.read.json 操作的第二個(gè)單元格,Spark 作業(yè)才執(zhí)行。
為了進(jìn)一步強(qiáng)調(diào)這一點(diǎn),在下圖的右側(cè)窗格中,我們展示了執(zhí)行的 DAG 圖。
在下面的截圖中,您可以看到 Spark 作業(yè)的 parallelize 操作來自生成 RDD stringJSONRDD 的第一個(gè)單元格,而 map 和 mapPartitions 操作是創(chuàng)建 DataFrame 所需的操作:
需要注意的是,parallelize、map 和 mapPartitions 都是 RDD 轉(zhuǎn)換。在 DataFrame 操作 spark.read.json(在本例中)中,不僅有 RDD 轉(zhuǎn)換,還有將 RDD 轉(zhuǎn)換為 DataFrame 的動(dòng)作。這是一個(gè)重要的說明,因?yàn)榧词鼓趫?zhí)行 DataFrame 操作,要調(diào)試您的操作,您需要記住您將在 Spark UI 中理解 RDD 操作。
請(qǐng)注意,創(chuàng)建臨時(shí)表是一個(gè) DataFrame 轉(zhuǎn)換,并且在執(zhí)行 DataFrame 動(dòng)作之前不會(huì)執(zhí)行(例如,要執(zhí)行的 SQL 查詢)。
簡(jiǎn)單的 DataFrame 查詢
現(xiàn)在您已經(jīng)創(chuàng)建了 swimmersJSON DataFrame,我們將能夠在其上運(yùn)行 DataFrame API 以及 SQL 查詢。讓我們從一個(gè)簡(jiǎn)單的查詢開始,顯示 DataFrame 中的所有行。
DataFrame API 查詢
要使用 DataFrame API 執(zhí)行此操作,您可以使用 show(<n>) 方法,該方法將前 n 行打印到控制臺(tái):
# DataFrame API
swimmersJSON.show()
這將給出以下輸出:
SQL 查詢
如果您更傾向于編寫 SQL 語句,您可以編寫以下查詢:
spark.sql("select * from swimmersJSON").collect()
這將給出以下輸出:
我們使用了 .collect() 方法,它返回所有記錄作為一個(gè)行對(duì)象(Row objects)的列表。請(qǐng)注意,您可以對(duì) DataFrames 和 SQL 查詢使用 collect() 或 show() 方法。只要確保,如果您使用 .collect(),這是針對(duì)小 DataFrame 的,因?yàn)樗鼘⒎祷?DataFrame 中的所有行,并將它們從執(zhí)行器移回驅(qū)動(dòng)程序。您可以改用 take(<n>) 或 show(<n>),這允許您通過指定 <n> 來限制返回的行數(shù):