中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當前位置: 首頁 > news >正文

關于建設網(wǎng)站的培訓知識長沙做優(yōu)化的公司

關于建設網(wǎng)站的培訓知識,長沙做優(yōu)化的公司,網(wǎng)站建設行業(yè)地位,網(wǎng)站策劃編輯招聘1 概述 Spark SQL通過DataFrame接口支持對多種數(shù)據(jù)源進行操作。 DataFrame可使用關系型變換進行操作,也可用于創(chuàng)建臨時視圖。將DataFrame注冊為臨時視圖可以讓你對其數(shù)據(jù)運行SQL查詢。 本節(jié)介紹使用Spark數(shù)據(jù)源加載和保存數(shù)據(jù)的一般方法,并進一步介紹…

1 概述

Spark SQL通過DataFrame接口支持對多種數(shù)據(jù)源進行操作。

DataFrame可使用關系型變換進行操作,也可用于創(chuàng)建臨時視圖。將DataFrame注冊為臨時視圖可以讓你對其數(shù)據(jù)運行SQL查詢。

本節(jié)介紹使用Spark數(shù)據(jù)源加載和保存數(shù)據(jù)的一般方法,并進一步介紹可用于內(nèi)置數(shù)據(jù)源的特定選項。

數(shù)據(jù)源關鍵操作:

  • load
  • save

2 大數(shù)據(jù)作業(yè)基本流程

input 業(yè)務邏輯 output
不管是使用MR/Hive/Spark/Flink/Storm。

Spark能處理多種數(shù)據(jù)源的數(shù)據(jù),而且這些數(shù)據(jù)源可以是在不同地方:

  • file/HDFS/S3/OSS/COS/RDBMS
  • json/ORC/Parquet/JDBC
object DataSourceApp {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().master("local").getOrCreate()text(spark)// json(spark)// common(spark)// parquet(spark)// convert(spark)// jdbc(spark)jdbc2(spark)spark.stop()}
}

3 text數(shù)據(jù)源讀寫

讀取文本文件的 API,SparkSession.read.text()

參數(shù):

  • path:讀取文本文件的路徑??梢允菃蝹€文件、文件夾或者包含通配符的文件路徑。
  • wholetext:如果為 True,則將整個文件讀取為一條記錄;否則將每行讀取為一條記錄。
  • lineSep:如果指定,則使用指定的字符串作為行分隔符。
  • pathGlobFilter:用于篩選文件的通配符模式。
  • recursiveFileLookup:是否遞歸查找子目錄中的文件。
  • allowNonExistingFiles:是否允許讀取不存在的文件。
  • allowEmptyFiles:是否允許讀取空文件。

返回一個 DataFrame 對象,其中每行是文本文件中的一條記錄。

def text(spark: SparkSession): Unit = {import spark.implicits._val textDF: DataFrame = spark.read.text("/Users/javaedge/Downloads/sparksql-train/data/people.txt")val result: Dataset[(String, String)] = textDF.map(x => {val splits: Array[String] = x.getString(0).split(",")(splits(0).trim, splits(1).trim)})

編譯無問題,運行時報錯:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Text data source supports only a single column, and you have 2 columns.;

思考下,如何使用text方式,輸出多列的值?

修正后

val result: Dataset[String] = textDF.map(x => {val splits: Array[String] = x.getString(0).split(",")splits(0).trim
})result.write.text("out")

繼續(xù)報錯:

Exception in thread "main" org.apache.spark.sql.AnalysisException: path file:/Users/javaedge/Downloads/sparksql-train/out already exists.;

回想Hadoop中MapReduce的輸出:

  • 第一次0K
  • 第二次也會報錯輸出目錄已存在

這關系到 Spark 中的 mode

SaveMode

Spark SQL中,使用DataFrame或Dataset的write方法將數(shù)據(jù)寫入外部存儲系統(tǒng)時,使用“SaveMode”參數(shù)指定如何處理已存在的數(shù)據(jù)。

SaveMode有四種取值:

  1. SaveMode.ErrorIfExists:如果目標路徑已經(jīng)存在,則會引發(fā)異常
  2. SaveMode.Append:將數(shù)據(jù)追加到現(xiàn)有數(shù)據(jù)
  3. SaveMode.Overwrite:覆蓋現(xiàn)有數(shù)據(jù)
  4. SaveMode.Ignore:若目標路徑已經(jīng)存在,則不執(zhí)行任何操作

所以,修正如下:

result.write.mode(SaveMode.overwrite).text("out")

4 JSON 數(shù)據(jù)源

// JSON
def json(spark: SparkSession): Unit = {import spark.implicits._val jsonDF: DataFrame = spark.read.json("/Users/javaedge/Downloads/sparksql-train/data/people.json")jsonDF.show()// 只要age>20的數(shù)據(jù)jsonDF.filter("age > 20").select("name").write.mode(SaveMode.Overwrite).json("out")output:
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

嵌套 JSON

// 嵌套 JSON
val jsonDF2: DataFrame = spark.read.json("/Users/javaedge/Downloads/sparksql-train/data/people2.json")
jsonDF2.show()jsonDF2.select($"name",$"age",$"info.work".as("work"),$"info.home".as("home")).write.mode("overwrite").json("out")output:
+---+-------------------+----+
|age|               info|name|
+---+-------------------+----+
| 30|[shenzhen, beijing]|  PK|
+---+-------------------+----+

5 標準寫法

// 標準API寫法
private def common(spark: SparkSession): Unit = {import spark.implicits._val textDF: DataFrame = spark.read.format("text").load("/Users/javaedge/Downloads/sparksql-train/data/people.txt")val jsonDF: DataFrame = spark.read.format("json").load("/Users/javaedge/Downloads/sparksql-train/data/people.json")textDF.show()println("~~~~~~~~")jsonDF.show()jsonDF.write.format("json").mode("overwrite").save("out")}output:
+-----------+
|      value|
+-----------+
|Michael, 29|
|   Andy, 30|
| Justin, 19|
+-----------+~~~~~~~~
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

6 Parquet數(shù)據(jù)源

6.1 簡介

一種列式存儲格式,在大數(shù)據(jù)環(huán)境中高效地存儲和處理數(shù)據(jù)。由Hadoop生態(tài)系統(tǒng)中的Apache Parquet項目開發(fā)的。

6.2 設計目標

支持高效的列式存儲和壓縮,并提供高性能的讀/寫能力,以便處理大規(guī)模結構化數(shù)據(jù)。

Parquet可以與許多不同的計算框架一起使用,如Apache Hadoop、Apache Spark、Apache Hive等,因此廣泛用于各種大數(shù)據(jù)應用程序中。

6.3 優(yōu)點

高性能、節(jié)省存儲空間、支持多種編程語言和數(shù)據(jù)類型、易于集成和擴展等。

private def parquet(spark: SparkSession): Unit = {import spark.implicits._val parquetDF: DataFrame = spark.read.parquet("/Users/javaedge/Downloads/sparksql-train/data/users.parquet")parquetDF.printSchema()parquetDF.show()parquetDF.select("name", "favorite_numbers").write.mode("overwrite").option("compression", "none").parquet("out")output:
root|-- name: string (nullable = true)|-- favorite_color: string (nullable = true)|-- favorite_numbers: array (nullable = true)|    |-- element: integer (containsNull = true)+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+

7convert

方便從一種數(shù)據(jù)源寫到另一種數(shù)據(jù)源。

存儲類型轉換:JSON==>Parquet

def convert(spark: SparkSession): Unit = {import spark.implicits._val jsonDF: DataFrame = spark.read.format("json").load("/Users/javaedge/Downloads/sparksql-train/data/people.json")jsonDF.show()jsonDF.filter("age>20").write.format("parquet").mode(SaveMode.Overwrite).save("out")

8 JDBC

有些數(shù)據(jù)是在MySQL,使用Spark處理,肯定要通過Spark讀出MySQL的數(shù)據(jù)。
數(shù)據(jù)源是text/json,通過Spark處理完后,要將統(tǒng)計結果寫入MySQL。

查 DB

寫法一

def jdbc(spark: SparkSession): Unit = {import spark.implicits._val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306").option("dbtable", "smartrm_monolith.order").option("user", "root").option("password", "root").load()jdbcDF.filter($"order_id" > 150).show(100)
}

寫法二

val connectionProperties = new Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "root")val jdbcDF2: DataFrame = spark.read.jdbc(url, srcTable, connectionProperties)jdbcDF2.filter($"order_id" > 100)

寫 DB

val connProps = new Properties()
connProps.put("user", "root")
connProps.put("password", "root")val jdbcDF: DataFrame = spark.read.jdbc(url, srcTable, connProps)jdbcDF.filter($"order_id" > 100).write.jdbc(url, "smartrm_monolith.order_bak", connProps)

若 目標表不存在,會自動幫你創(chuàng)建:

統(tǒng)一配置管理

如何將那么多數(shù)據(jù)源配置參數(shù)統(tǒng)一管理呢?

先引入依賴:

<dependency><groupId>com.typesafe</groupId><artifactId>config</artifactId><version>1.3.3</version>
</dependency>

配置文件:

讀配置的程序:

package com.javaedge.bigdata.chapter05import com.typesafe.config.{Config, ConfigFactory}object ParamsApp {def main(args: Array[String]): Unit = {val config: Config = ConfigFactory.load()val url: String = config.getString("db.default.url")println(url)}}
private def jdbcConfig(spark: SparkSession): Unit = {import spark.implicits._val config = ConfigFactory.load()val url = config.getString("db.default.url")val user = config.getString("db.default.user")val password = config.getString("db.default.password")val driver = config.getString("db.default.driver")val database = config.getString("db.default.database")val table = config.getString("db.default.table")val sinkTable = config.getString("db.default.sink.table")val connectionProperties = new Properties()connectionProperties.put("user", user)connectionProperties.put("password", password)val jdbcDF: DataFrame = spark.read.jdbc(url, s"$database.$table", connectionProperties)jdbcDF.filter($"order_id" > 100).show()

寫到新表:

jdbcDF.filter($"order_id" > 158)
.write.jdbc(url, s"$database.$sinkTable", connectionProperties)

http://www.risenshineclean.com/news/53598.html

相關文章:

  • 西安做網(wǎng)站建設哪家好搜索引擎優(yōu)化排名seo
  • 自適應導航網(wǎng)站模板線下推廣活動策劃方案
  • 做網(wǎng)站生意旁軟文廣告經(jīng)典案例600
  • 視頻拍攝腳本模板廣州seo優(yōu)化費用
  • 百度統(tǒng)計網(wǎng)站速度診斷快速優(yōu)化官網(wǎng)
  • 發(fā)放淘寶優(yōu)惠券的網(wǎng)站怎么做免費seo推廣計劃
  • 山東電力建設第一工程公司網(wǎng)站怎么讓客戶主動找你
  • 網(wǎng)站推廣商品怎么做效果最好奇葩網(wǎng)站100個
  • 經(jīng)典網(wǎng)站案例國際新聞今天
  • seo網(wǎng)站結構優(yōu)化seo排名查詢軟件
  • 公司網(wǎng)站設計案例其他搜索引擎
  • 建設銀行的網(wǎng)站是多少網(wǎng)絡營銷專業(yè)是學什么的
  • 網(wǎng)頁定做寧波 seo排名公司
  • 馀姚網(wǎng)站建設公司網(wǎng)站模版
  • 鄭州企業(yè)網(wǎng)站開發(fā)營銷軟文怎么寫
  • 企業(yè)網(wǎng)站建設公司鄭州什么樣的人適合做策劃
  • html5寫的網(wǎng)站有什么好處手機百度關鍵詞優(yōu)化
  • 用wordpress做音樂網(wǎng)站seo產(chǎn)品是什么意思
  • 廣州黃埔網(wǎng)站建設公司哪家好網(wǎng)頁制作教程書籍
  • 學做網(wǎng)站應該看那些書廈門關鍵詞優(yōu)化seo
  • 深圳網(wǎng)站建設企濟南優(yōu)化seo公司
  • 做網(wǎng)站得多少錢幫平臺做推廣怎么賺錢
  • 獨立建設網(wǎng)站制作重慶鎮(zhèn)海seo整站優(yōu)化價格
  • php做網(wǎng)站用html做嗎百度網(wǎng)站名稱
  • 注冊網(wǎng)站亂填郵箱廣州搜索seo網(wǎng)站優(yōu)化
  • 企業(yè)網(wǎng)站 帶后臺聊城seo整站優(yōu)化報價
  • 威客做的好的網(wǎng)站蘇州網(wǎng)站建設公司排名
  • 外包建站的公司怎么做seo網(wǎng)站做優(yōu)化
  • 東莞制作公司網(wǎng)站的公司如何提升網(wǎng)站seo排名
  • 福建seo關鍵詞優(yōu)化外包新站seo優(yōu)化快速上排名