中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁 > news >正文

國外設(shè)計(jì)網(wǎng)站dooor企業(yè)營銷策劃書模板

國外設(shè)計(jì)網(wǎng)站dooor,企業(yè)營銷策劃書模板,網(wǎng)站LOGO透明底色PNG格式怎么做的,肥鄉(xiāng)網(wǎng)站建設(shè)Paimon Apache Paimon (incubating) 是一項(xiàng)流式數(shù)據(jù)湖存儲技術(shù),可以為用戶提供高吞吐、低延遲的數(shù)據(jù)攝入、流式訂閱以及實(shí)時(shí)查詢能力。Paimon 采用開放的數(shù)據(jù)格式和技術(shù)理念,可以與 ApacheFlink / Spark / Trino 等諸多業(yè)界主流計(jì)算引擎進(jìn)行對接&#xf…

Paimon

Apache Paimon (incubating) 是一項(xiàng)流式數(shù)據(jù)湖存儲技術(shù),可以為用戶提供高吞吐、低延遲的數(shù)據(jù)攝入、流式訂閱以及實(shí)時(shí)查詢能力。Paimon 采用開放的數(shù)據(jù)格式和技術(shù)理念,可以與 ApacheFlink / Spark / Trino 等諸多業(yè)界主流計(jì)算引擎進(jìn)行對接,共同推進(jìn) Streaming Lakehouse 架構(gòu)的普及和發(fā)展。

Paimon x Spark

Apache Spark,作為大數(shù)據(jù)處理的統(tǒng)一計(jì)算分析引擎的,不僅支持多種語言的高級 API 使用,也支持了豐富的大數(shù)據(jù)場景應(yīng)用,包括結(jié)構(gòu)化數(shù)據(jù)處理的Spark SQL、用于機(jī)器學(xué)習(xí)的 MLlib,用于圖形處理的 GraphX,以及用于增量計(jì)算和流處理的Structured Streaming。Spark 已經(jīng)成為了大數(shù)據(jù)領(lǐng)域軟件棧中必不可少的組成部分。作為數(shù)據(jù)湖領(lǐng)域新起的 Paimon,與 Spark 的深度、全面的集成也將為 Paimon在準(zhǔn)實(shí)時(shí)場景、離線湖倉場景提供了便利。

接下來我們介紹一些在 Paimon 新版本中基于 Spark 計(jì)算引擎支持的主要功能。

Schema Evolution

Schema evolution 是一個(gè)數(shù)據(jù)湖領(lǐng)域一個(gè)非常關(guān)鍵的特性,它允許用戶方便的修改表的當(dāng)前 Schema 以適應(yīng)現(xiàn)有數(shù)據(jù),或隨時(shí)間變化的新數(shù)據(jù),同時(shí)保持?jǐn)?shù)據(jù)的完整性和一致性。

在離線場景中,我們可以通過計(jì)算引擎,如 Spark 或者 Flink,提供的 Alter Table 的 SQL 語法來實(shí)現(xiàn)對 Schema 的操作。在某些場景下,我們并非都能實(shí)時(shí)準(zhǔn)確的獲取上游數(shù)據(jù)較當(dāng)前表的 Schema 變化;另外在 Streaming 流式場景中以離線 Alter Table 的方式完成 Schema 的更新需要執(zhí)行1)停止流作業(yè),2)完成 Schema 更新操作,3)重啟流作業(yè)這樣的流程,這是較為低效的。

Paimon 支持了在數(shù)據(jù)寫入的同時(shí),自動完成 Source 數(shù)據(jù)和當(dāng)前表數(shù)據(jù)的 Schema 合并,并將合并后的 Schema 作為表的最新 Schema,僅需要配置參數(shù)? write.merge-schema。

data.write
.format("paimon")
.mode("append")
.option("write.merge-schema", "true")
.save(location)
新增列

比較常見的是,在執(zhí)行數(shù)據(jù)追加或覆蓋操作時(shí)使用,以自動調(diào)整 Schema 以包含一個(gè)或多個(gè)新列。

假設(shè)原表的 Schema 為:

a INT
b STRING

新數(shù)據(jù) data 的 Schema 為:

a INT
b STRING
c LONG
d Map<String, Double>

操作完成后的表的 Schema 變更為:

a INT
b STRING
c LONG
d Map<String, Double>
提升字段類型

Paimon 的 Schema Evolution 也同時(shí)支持?jǐn)?shù)據(jù)類型的提升,如 Int 提升為 Long,Long提升為 Decimal 等;以上述表繼續(xù)寫入數(shù)據(jù),假設(shè)新數(shù)據(jù)的 Schema 為:

a Long
b STRING
c Decimal
d Map<String, Double>

操作完成后的表的 Schema 變更為:

a Long
b STRING
c Decimal
d Map<String, Double>
強(qiáng)制類型轉(zhuǎn)換

如以上示例所示,Paimon 支持?jǐn)?shù)據(jù)字段類型的提升,如數(shù)值型向更高的精度提升(由 Int 提升至 Long,由 Long 提升至 Decimal),同時(shí) Paimon 也支持一些類型之間的強(qiáng)制轉(zhuǎn)換,如 String 強(qiáng)轉(zhuǎn)成 Date 類型或者 Long 轉(zhuǎn)換成 Int,但需要顯式的配置參數(shù) write.merge-schema.explicit-cast。

data.write
.format("paimon")
.mode("append")
.option("write.merge-schema", "true")
.option("write.merge-schema.explicit-cast", "true")
.save(location)

假設(shè)原表的 Schema為:

a LONG
b STRING //內(nèi)容為2023-08-01的格式

新數(shù)據(jù) data 的 Schema 為:

a INT
b DATE

操作完成后的表的 Schema 變更為:

a INT
b DATE

需要注意的是:

數(shù)據(jù)寫入(追加或覆蓋寫)時(shí)的 Schema Evolution 不支持刪除列和重命名列操作的,也不支持不在隱式/顯式轉(zhuǎn)換范圍內(nèi)的數(shù)據(jù)類型提升。當(dāng)具體數(shù)值不能轉(zhuǎn)換成目標(biāo)類型時(shí),為了避免將表數(shù)據(jù)破環(huán),當(dāng)前會報(bào)錯,終止該操作。

Spark Structured Streaming

Spark Structured Streaming 是一個(gè)基于 Spark SQL 引擎構(gòu)建的可擴(kuò)展且容錯的流處理引擎,可以像表達(dá)靜態(tài)數(shù)據(jù)的批量計(jì)算一樣的表達(dá)流計(jì)算。Spark SQL 引擎將負(fù)責(zé)增量且持續(xù)地運(yùn)行它,并隨著流數(shù)據(jù)不斷到達(dá)而更新最終結(jié)果。Structured Streaming 支持流之間的聚合、事件時(shí)間窗口、流批之間 Join 等。Spark 通過 checkpointing 和 write-ahead logs 實(shí)現(xiàn)了端到端的 exactly-once。簡而言之,Structured Streaming 提供快速、可擴(kuò)展、容錯、端到端的一次性流處理,而用戶無需考慮流處理。

Paimon 在 0.5 和 0.6 兩個(gè)版本逐步完善了 Spark Structured Streaming 的讀寫支持,提供了基于 Spark 引擎的流式讀寫能力。

■?Streaming Sink

Spark Structured Streaming 定義了三種輸出模式(https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#basic-concepts),Paimon 僅支持 Append 模式和 Complete 模式。

// `df` is the upstream source data.
val stream = df.writeStream.outputMode("append").option("checkpointLocation", "/path/to/checkpoint").format("paimon").start("/path/to/paimon/sink/table")
■?Streaming Source

結(jié)合 Spark 支持的多種?Trigger 策略?[1]和 Paimon 拓展的一些流式處理的能力,Paimon 可以支持豐富的 Streaming Source 的應(yīng)用場景。

Paimon 提供了多樣了 ScanMode,允許用戶以合適的參數(shù)指定初始狀態(tài)從 Paimon 表讀取的數(shù)據(jù)。

ScanMode

描述

latest

僅讀取后續(xù)持續(xù)寫入的數(shù)據(jù)。

latest-full

讀取當(dāng)前快照的數(shù)據(jù),以及后續(xù)持續(xù)寫入的數(shù)據(jù)。

from-timestamp

讀取參數(shù) scan.timestamp-millis 指定的時(shí)間戳之后持續(xù)寫入的數(shù)據(jù)。

from-snapshot

讀取參數(shù) scan.snapshot-id 指定的版本后續(xù)持續(xù)寫入的數(shù)據(jù)。

from-snapshot-full

讀取參數(shù) scan.snapshot-id 指定的版本快照數(shù)據(jù),以及后續(xù)持續(xù)寫入的數(shù)據(jù)。

default

默認(rèn)等同于 latest-full 模式;如果指定 scan.snapshot-id,等同于 from-snapshot 模式;如果指定 scan.timestamp-millis,等同于 from-timestamp 模式;

Paimon 通過拓展?SupportsAdmissionControl?[2]接口,實(shí)現(xiàn)了 Source 端的流量控制,避免了由于要處理的單個(gè) Batch 的數(shù)據(jù)量過大而引起的流式作業(yè)運(yùn)行失敗的問題。Paimon 目前支持以下ReadLimit?[3]的實(shí)現(xiàn)。

Readlimit 參數(shù)

描述

read.stream.maxFilesPerTrigger

一個(gè) Batch 最多返回的Splits數(shù)

read.stream.maxBytesPerTrigger

一個(gè) Batch 最多返回的byte數(shù)

read.stream.maxRowsPerTrigger

一個(gè) Batch 最多返回的行數(shù)

read.stream.minRowsPerTrigger

一個(gè) Batch 最少返回的行數(shù),和 maxTriggerDelayMs 搭配使用構(gòu)成ReadMinRows?[4]

read.stream.maxTriggerDelayMs

一個(gè) Batch 觸發(fā)的最大延時(shí),和 minRowsPerTrigger 搭配使用構(gòu)成ReadMinRows?[4]

以兩個(gè)示例說明 Paimon Spark Structured Streaming 的用法。

示例一:

普通的流式增量 ETL 場景。

// Paimon source表的Schema為:time Long, stockId INT, avg_price DOUBLE
val query = spark.readStream.format("paimon").option("scan.mode", "latest").load("/path/to/paimon/source/table").selectExpr("CAST(time AS timestamp) AS timestamp", "stockId", "price").withWatermark("timestamp", "10 seconds").groupBy(window($"timestamp", "5 seconds"), col("stockId")).writeStream.format("console").trigger(Trigger.ProcessingTime(180, TimeUnit.SECONDS)).start()

該示例以 3 分鐘的間隔流式讀取 Paimon 后續(xù)的增量數(shù)據(jù),進(jìn)行 ETL 轉(zhuǎn)化后同步到下游。

示例二:

適用于追補(bǔ)數(shù)據(jù)的場景,流式讀取 Paimon 表自某個(gè)指定快照之后的數(shù)據(jù),讀取完成后不再讀取后續(xù)寫入的數(shù)據(jù),同時(shí)限定了每個(gè) Batch 大致的數(shù)據(jù)規(guī)模。

val query = spark.readStream.format("paimon").option("scan.mode", "from-snapshot").option("scan.snapshot-id", 345).option("read.stream.maxBytesPerTrigger", "134217728").load("/path/to/paimon/source/table").writeStream.format("console").trigger(Trigger.AvailableNow()).start()

示例代碼中指定 Trigger.AvailableNow()觸發(fā)器,表示僅讀取流式任務(wù)啟動時(shí)當(dāng)前 Paimon 可用的數(shù)據(jù);使用 from-snapshot 的 ScanMode 標(biāo)識了讀取快照 ID=345 之后寫入的數(shù)據(jù)。在配置 maxBytesPerTrigger 等于 128MB 后,Spark Structured Streaming會將待消費(fèi)的數(shù)據(jù)按照 128MB 的 Splits 大小進(jìn)行 Batch 切分,由多個(gè) Batch 完成當(dāng)前快照數(shù)據(jù)的消費(fèi)。

Spark SQL 拓展

■?Insert Overwrite

Insert Overwrite 是一個(gè)常用的 SQL 語法,用于重寫整張表或者表中指定分區(qū)。該功能在 Paimon 新版本中也得到支持,包括了 static 和 dynamic 兩種模式。

Static Overwrite

覆蓋整張表:無論當(dāng)前表是否是分區(qū)表,通過以下 SQL 可以完成使用新數(shù)據(jù)覆蓋原表數(shù)據(jù)的操作。

在 Spark 環(huán)境下使用 Paimon,請參考這里?[5]

USE paimon;CREATE TABLE T (a INT, b STRING) TBLPROPERTIES('primary-key'='a');INSERT OVERWRITE T VALUES (1, "a"), (2, "b");
----------
1 a
2 b
----------INSERT OVERWRITE T VALUES (1, "a2"), (3, "c");
----------
1 a2
3 c
----------

覆蓋指定的表分區(qū)。

USE paimon;CREATE TABLE T (dt STRING, a INT, b STRING)
TBLPROPERTIES('primary-key'='dt,a')
PARTITIONED BY(dt);INSERT OVERWRITE T VALUES ("2023-10-01", 1, "a"), ("2023-10-02", 2, "b");
----------------
2023-10-01 1 a
2023-10-02 2 b
----------------INSERT OVERWRITE T PARTITION (dt = "2023-10-02") VALUES (2, "b2"), (4, "d");
----------------
2023-10-01 1 a
2023-10-02 2 b2
2023-10-02 d 4
----------------

Dynamic Parititon Overwrite(DPO)

默認(rèn)情況下是在 Static 模式下執(zhí)行 Insert Overwrite 的,用戶需要顯式的指定要覆蓋的分區(qū)信息;我們可以通過參數(shù)啟用 Dynamic 模式來執(zhí)行 Insert Overwrite,這樣Paimon 將自動判斷 source 端數(shù)據(jù)所涉及到的分區(qū)來執(zhí)行覆蓋操作。

Paimon 啟動 DPO 需要啟動 spark session 時(shí)額外指定 paimon 的 extension:

--conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
 
USE paimon;CREATE TABLE T (dt STRING, a INT, b STRING)
TBLPROPERTIES('primary-key'='dt,a')
PARTITIONED BY(dt);INSERT OVERWRITE T VALUES ("2023-10-01", 1, "a"), ("2023-10-02", 2, "b");
----------------
2023-10-01 1 a
2023-10-02 2 b
----------------SET spark.sql.sources.partitionOverwriteMode=DYNAMIC;INSERT OVERWRITE T VALUES ("2023-10-02", 2, "b2"), ("2023-10-02", 4, "d");
----------------
2023-10-01 1 a
2023-10-02 2 b2
2023-10-02 d 4
----------------

在配置 spark.sql.sources.partitionOverwriteMode=DYNAMIC 后,不再需要指定要覆蓋 dt="2023-10-02"的分區(qū),實(shí)現(xiàn)了數(shù)據(jù)的動態(tài)覆蓋。

■?Call procedure

除了由 Spark 框架提供了常用的 SQL 語法(包括 DDL,DML,Query 以及一些表信息查詢)外,Paimon 還需要拓展一些額外的 SQL 語法來提供自定義功能的操作接口,便于用戶對 Paimon 表的管理和探索。Call Procedure 的引入為這種場景的支持提供了框架層面的支持。

procedure 的語法:

CALL procedure_name(table => 'table_identifier', arg1 => '', ...);

目前 Paimon 已經(jīng)實(shí)現(xiàn)了三種 procedure:

Procedure

描述

用法

create_tag

為指定快照創(chuàng)建標(biāo)簽

CALL create_tag(table => 'T', tag => 'test_tag', snapshot => 2)

delete_tag

刪除已創(chuàng)建的標(biāo)簽

CALL delete_tag(table => 'T', tag => 'test_tag')

rollback

回滾表到指定標(biāo)簽或者版本

CALL rollback(table => 'T', version => '2')


場景示例

以下構(gòu)造一個(gè)流式開啟 Schema Evolution 的示例,上游數(shù)據(jù)實(shí)時(shí)同步到 paimon 的 user 表(原表僅有 userId 和 name 兩個(gè)維度),在某時(shí)刻上游數(shù)據(jù)添加了 age 屬性,在無需停止作業(yè)運(yùn)維時(shí)通過開啟 Schema Evolution 自動完成元數(shù)據(jù)的合并和新數(shù)據(jù)的寫入。

d29f7b28eeb0f663120ebf2d21e3b4a4.png

 
// 原表的定義
// CREATE TABLE T (userId INT, name STRING) TBLPROPERTIES ('primary-key'='userId');// -- 假設(shè)原表的流式寫入的數(shù)據(jù)--
// 1 user1
// 2 user2
// -------------------------// 使用MemoryStream模擬上游streaming數(shù)據(jù)
val inputData = MemoryStream[(Int, String, Int)]
val stream = inputData.toDS().toDF("userId", "name", "age").writeStream.option("checkpointLocation", "/path/to/checkpoint").option("write.merge-schema", "true").format("paimon").start("/path/to/user_table")inputData.addData((1, "user1", 30), (3, "user3", 33))
stream.processAllAvailable()// -- 該batch數(shù)據(jù)寫入后的表數(shù)據(jù)--
// 1 user1 30
// 2 user2 null
// 3 user3 33
// ---------------------------

后續(xù)規(guī)劃

Paimon 孵化于 Flink 社區(qū),源于流式數(shù)倉,但其遠(yuǎn)不止于此。Paimon 將在與如 Apache Spark 這樣的其他引擎的深度集成上,以及在如離線湖倉的場景支持上持續(xù)發(fā)力。在接下來的時(shí)間上,社區(qū)在和 Spark 引擎的支持上將逐漸拓展支持更多的 Spark SQL 語法,比如 Update、Merge Into 等;在讀寫性能上也會進(jìn)行深層次優(yōu)化。


參考

[1]?Trigger 策略

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers

[2]?SupportsAdmissionControl

https://spark.apache.org/docs/3.2.1/api/java/org/apache/spark/sql/connector/read/streaming/SupportsAdmissionControl.html

[3]?ReadLimit

https://spark.apache.org/docs/3.2.1/api/java/org/apache/spark/sql/connector/read/streaming/ReadLimit.html

[4]?ReadMinRows

https://spark.apache.org/docs/3.2.1/api/java/org/apache/spark/sql/connector/read/streaming/ReadMinRows.html

[5]?在 Spark 環(huán)境下使用 Paimon

https://paimon.apache.org/docs/master/engines/spark3/#setup


▼ 關(guān)注「Apache Spark 技術(shù)交流社區(qū)」,獲取更多技術(shù)干貨?▼

?f6b637bb03c3661ca0875130168999b7.gif??點(diǎn)擊「閱讀原文」,跳轉(zhuǎn)? Apache Paimon 官網(wǎng)

http://www.risenshineclean.com/news/32775.html

相關(guān)文章:

  • 網(wǎng)站中qq跳轉(zhuǎn)怎么做的推廣公司經(jīng)營范圍
  • 網(wǎng)站文化建設(shè)石家莊百度seo代理
  • 設(shè)計(jì)師論壇seo包年優(yōu)化
  • 做網(wǎng)站運(yùn)營買什么電腦揚(yáng)州seo推廣
  • 銅仁建設(shè)集團(tuán)招聘信息網(wǎng)站seo快速優(yōu)化軟件網(wǎng)站
  • 做網(wǎng)站接項(xiàng)目seo網(wǎng)站是什么意思
  • 個(gè)人作品展示網(wǎng)站模板營銷型網(wǎng)站策劃書
  • 做網(wǎng)站服務(wù)器在哪買微商引流人脈推廣軟件
  • 做一個(gè)網(wǎng)站維護(hù)多少錢快速排名新
  • 山西省住房建設(shè)廳網(wǎng)站房屋建筑定額北京seo關(guān)鍵詞優(yōu)化外包
  • 公司網(wǎng)站怎么做備案東莞疫情最新情況
  • 建站知識互聯(lián)網(wǎng)整合營銷推廣
  • 各大網(wǎng)站圖片電商營銷策劃方案范文
  • 什么是網(wǎng)站評價(jià)上海seo推廣服務(wù)
  • 深圳58同城網(wǎng)站建設(shè)百度廣告代理商加盟
  • 最新經(jīng)濟(jì)新聞頭條新聞廈門seo怎么做
  • 汕頭市道路建設(shè)網(wǎng)站免費(fèi)seo優(yōu)化工具
  • 網(wǎng)站加速打開百度一下搜索一下
  • 博爾塔拉州大型網(wǎng)站建設(shè)百度知道在線問答
  • 做網(wǎng)站常用哪種語言全網(wǎng)關(guān)鍵詞優(yōu)化公司哪家好
  • 語言互動網(wǎng)站建設(shè)輿情系統(tǒng)
  • wordpress如何加友鏈網(wǎng)站排名seo培訓(xùn)
  • 掃描做電子版網(wǎng)站百度地圖收錄提交入口
  • 濟(jì)南行業(yè)網(wǎng)站開發(fā)東莞網(wǎng)站建設(shè)公司排名
  • 做鏈接哪個(gè)網(wǎng)站好專業(yè)營銷推廣團(tuán)隊(duì)
  • 阿里云做網(wǎng)站經(jīng)費(fèi)免費(fèi)網(wǎng)站建設(shè)
  • 遂寧市網(wǎng)站建設(shè)最近發(fā)生的新聞
  • 信譽(yù)好的o2o網(wǎng)站建設(shè)關(guān)鍵詞網(wǎng)絡(luò)推廣企業(yè)
  • 家庭室內(nèi)裝修設(shè)計(jì)公司杭州seo網(wǎng)
  • 建站之星設(shè)計(jì)師站優(yōu)云seo優(yōu)化