關于建設網(wǎng)站的培訓知識長沙做優(yōu)化的公司
1 概述
Spark SQL通過DataFrame接口支持對多種數(shù)據(jù)源進行操作。
DataFrame可使用關系型變換進行操作,也可用于創(chuàng)建臨時視圖。將DataFrame注冊為臨時視圖可以讓你對其數(shù)據(jù)運行SQL查詢。
本節(jié)介紹使用Spark數(shù)據(jù)源加載和保存數(shù)據(jù)的一般方法,并進一步介紹可用于內(nèi)置數(shù)據(jù)源的特定選項。
數(shù)據(jù)源關鍵操作:
- load
- save
2 大數(shù)據(jù)作業(yè)基本流程
input 業(yè)務邏輯 output
不管是使用MR/Hive/Spark/Flink/Storm。
Spark能處理多種數(shù)據(jù)源的數(shù)據(jù),而且這些數(shù)據(jù)源可以是在不同地方:
- file/HDFS/S3/OSS/COS/RDBMS
- json/ORC/Parquet/JDBC
object DataSourceApp {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().master("local").getOrCreate()text(spark)// json(spark)// common(spark)// parquet(spark)// convert(spark)// jdbc(spark)jdbc2(spark)spark.stop()}
}
3 text數(shù)據(jù)源讀寫
讀取文本文件的 API,SparkSession.read.text()
參數(shù):
path
:讀取文本文件的路徑??梢允菃蝹€文件、文件夾或者包含通配符的文件路徑。wholetext
:如果為 True,則將整個文件讀取為一條記錄;否則將每行讀取為一條記錄。lineSep
:如果指定,則使用指定的字符串作為行分隔符。pathGlobFilter
:用于篩選文件的通配符模式。recursiveFileLookup
:是否遞歸查找子目錄中的文件。allowNonExistingFiles
:是否允許讀取不存在的文件。allowEmptyFiles
:是否允許讀取空文件。
返回一個 DataFrame 對象,其中每行是文本文件中的一條記錄。
def text(spark: SparkSession): Unit = {import spark.implicits._val textDF: DataFrame = spark.read.text("/Users/javaedge/Downloads/sparksql-train/data/people.txt")val result: Dataset[(String, String)] = textDF.map(x => {val splits: Array[String] = x.getString(0).split(",")(splits(0).trim, splits(1).trim)})
編譯無問題,運行時報錯:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Text data source supports only a single column, and you have 2 columns.;
思考下,如何使用text方式,輸出多列的值?
修正后
val result: Dataset[String] = textDF.map(x => {val splits: Array[String] = x.getString(0).split(",")splits(0).trim
})result.write.text("out")
繼續(xù)報錯:
Exception in thread "main" org.apache.spark.sql.AnalysisException: path file:/Users/javaedge/Downloads/sparksql-train/out already exists.;
回想Hadoop中MapReduce的輸出:
- 第一次0K
- 第二次也會報錯輸出目錄已存在
這關系到 Spark 中的 mode
SaveMode
Spark SQL中,使用DataFrame或Dataset的write方法將數(shù)據(jù)寫入外部存儲系統(tǒng)時,使用“SaveMode”參數(shù)指定如何處理已存在的數(shù)據(jù)。
SaveMode有四種取值:
- SaveMode.ErrorIfExists:如果目標路徑已經(jīng)存在,則會引發(fā)異常
- SaveMode.Append:將數(shù)據(jù)追加到現(xiàn)有數(shù)據(jù)
- SaveMode.Overwrite:覆蓋現(xiàn)有數(shù)據(jù)
- SaveMode.Ignore:若目標路徑已經(jīng)存在,則不執(zhí)行任何操作
所以,修正如下:
result.write.mode(SaveMode.overwrite).text("out")
4 JSON 數(shù)據(jù)源
// JSON
def json(spark: SparkSession): Unit = {import spark.implicits._val jsonDF: DataFrame = spark.read.json("/Users/javaedge/Downloads/sparksql-train/data/people.json")jsonDF.show()// 只要age>20的數(shù)據(jù)jsonDF.filter("age > 20").select("name").write.mode(SaveMode.Overwrite).json("out")output:
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
嵌套 JSON
// 嵌套 JSON
val jsonDF2: DataFrame = spark.read.json("/Users/javaedge/Downloads/sparksql-train/data/people2.json")
jsonDF2.show()jsonDF2.select($"name",$"age",$"info.work".as("work"),$"info.home".as("home")).write.mode("overwrite").json("out")output:
+---+-------------------+----+
|age| info|name|
+---+-------------------+----+
| 30|[shenzhen, beijing]| PK|
+---+-------------------+----+
5 標準寫法
// 標準API寫法
private def common(spark: SparkSession): Unit = {import spark.implicits._val textDF: DataFrame = spark.read.format("text").load("/Users/javaedge/Downloads/sparksql-train/data/people.txt")val jsonDF: DataFrame = spark.read.format("json").load("/Users/javaedge/Downloads/sparksql-train/data/people.json")textDF.show()println("~~~~~~~~")jsonDF.show()jsonDF.write.format("json").mode("overwrite").save("out")}output:
+-----------+
| value|
+-----------+
|Michael, 29|
| Andy, 30|
| Justin, 19|
+-----------+~~~~~~~~
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
6 Parquet數(shù)據(jù)源
6.1 簡介
一種列式存儲格式,在大數(shù)據(jù)環(huán)境中高效地存儲和處理數(shù)據(jù)。由Hadoop生態(tài)系統(tǒng)中的Apache Parquet項目開發(fā)的。
6.2 設計目標
支持高效的列式存儲和壓縮,并提供高性能的讀/寫能力,以便處理大規(guī)模結構化數(shù)據(jù)。
Parquet可以與許多不同的計算框架一起使用,如Apache Hadoop、Apache Spark、Apache Hive等,因此廣泛用于各種大數(shù)據(jù)應用程序中。
6.3 優(yōu)點
高性能、節(jié)省存儲空間、支持多種編程語言和數(shù)據(jù)類型、易于集成和擴展等。
private def parquet(spark: SparkSession): Unit = {import spark.implicits._val parquetDF: DataFrame = spark.read.parquet("/Users/javaedge/Downloads/sparksql-train/data/users.parquet")parquetDF.printSchema()parquetDF.show()parquetDF.select("name", "favorite_numbers").write.mode("overwrite").option("compression", "none").parquet("out")output:
root|-- name: string (nullable = true)|-- favorite_color: string (nullable = true)|-- favorite_numbers: array (nullable = true)| |-- element: integer (containsNull = true)+------+--------------+----------------+
| name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa| null| [3, 9, 15, 20]|
| Ben| red| []|
+------+--------------+----------------+
7convert
方便從一種數(shù)據(jù)源寫到另一種數(shù)據(jù)源。
存儲類型轉換:JSON==>Parquet
def convert(spark: SparkSession): Unit = {import spark.implicits._val jsonDF: DataFrame = spark.read.format("json").load("/Users/javaedge/Downloads/sparksql-train/data/people.json")jsonDF.show()jsonDF.filter("age>20").write.format("parquet").mode(SaveMode.Overwrite).save("out")
8 JDBC
有些數(shù)據(jù)是在MySQL,使用Spark處理,肯定要通過Spark讀出MySQL的數(shù)據(jù)。
數(shù)據(jù)源是text/json,通過Spark處理完后,要將統(tǒng)計結果寫入MySQL。
查 DB
寫法一
def jdbc(spark: SparkSession): Unit = {import spark.implicits._val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306").option("dbtable", "smartrm_monolith.order").option("user", "root").option("password", "root").load()jdbcDF.filter($"order_id" > 150).show(100)
}
寫法二
val connectionProperties = new Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "root")val jdbcDF2: DataFrame = spark.read.jdbc(url, srcTable, connectionProperties)jdbcDF2.filter($"order_id" > 100)
寫 DB
val connProps = new Properties()
connProps.put("user", "root")
connProps.put("password", "root")val jdbcDF: DataFrame = spark.read.jdbc(url, srcTable, connProps)jdbcDF.filter($"order_id" > 100).write.jdbc(url, "smartrm_monolith.order_bak", connProps)
若 目標表不存在,會自動幫你創(chuàng)建:
統(tǒng)一配置管理
如何將那么多數(shù)據(jù)源配置參數(shù)統(tǒng)一管理呢?
先引入依賴:
<dependency><groupId>com.typesafe</groupId><artifactId>config</artifactId><version>1.3.3</version>
</dependency>
配置文件:
讀配置的程序:
package com.javaedge.bigdata.chapter05import com.typesafe.config.{Config, ConfigFactory}object ParamsApp {def main(args: Array[String]): Unit = {val config: Config = ConfigFactory.load()val url: String = config.getString("db.default.url")println(url)}}
private def jdbcConfig(spark: SparkSession): Unit = {import spark.implicits._val config = ConfigFactory.load()val url = config.getString("db.default.url")val user = config.getString("db.default.user")val password = config.getString("db.default.password")val driver = config.getString("db.default.driver")val database = config.getString("db.default.database")val table = config.getString("db.default.table")val sinkTable = config.getString("db.default.sink.table")val connectionProperties = new Properties()connectionProperties.put("user", user)connectionProperties.put("password", password)val jdbcDF: DataFrame = spark.read.jdbc(url, s"$database.$table", connectionProperties)jdbcDF.filter($"order_id" > 100).show()
寫到新表:
jdbcDF.filter($"order_id" > 158)
.write.jdbc(url, s"$database.$sinkTable", connectionProperties)