國外設(shè)計(jì)網(wǎng)站dooor企業(yè)營銷策劃書模板
Paimon
Apache Paimon (incubating) 是一項(xiàng)流式數(shù)據(jù)湖存儲技術(shù),可以為用戶提供高吞吐、低延遲的數(shù)據(jù)攝入、流式訂閱以及實(shí)時(shí)查詢能力。Paimon 采用開放的數(shù)據(jù)格式和技術(shù)理念,可以與 ApacheFlink / Spark / Trino 等諸多業(yè)界主流計(jì)算引擎進(jìn)行對接,共同推進(jìn) Streaming Lakehouse 架構(gòu)的普及和發(fā)展。
Paimon x Spark
Apache Spark,作為大數(shù)據(jù)處理的統(tǒng)一計(jì)算分析引擎的,不僅支持多種語言的高級 API 使用,也支持了豐富的大數(shù)據(jù)場景應(yīng)用,包括結(jié)構(gòu)化數(shù)據(jù)處理的Spark SQL、用于機(jī)器學(xué)習(xí)的 MLlib,用于圖形處理的 GraphX,以及用于增量計(jì)算和流處理的Structured Streaming。Spark 已經(jīng)成為了大數(shù)據(jù)領(lǐng)域軟件棧中必不可少的組成部分。作為數(shù)據(jù)湖領(lǐng)域新起的 Paimon,與 Spark 的深度、全面的集成也將為 Paimon在準(zhǔn)實(shí)時(shí)場景、離線湖倉場景提供了便利。
接下來我們介紹一些在 Paimon 新版本中基于 Spark 計(jì)算引擎支持的主要功能。
Schema Evolution
Schema evolution 是一個(gè)數(shù)據(jù)湖領(lǐng)域一個(gè)非常關(guān)鍵的特性,它允許用戶方便的修改表的當(dāng)前 Schema 以適應(yīng)現(xiàn)有數(shù)據(jù),或隨時(shí)間變化的新數(shù)據(jù),同時(shí)保持?jǐn)?shù)據(jù)的完整性和一致性。
在離線場景中,我們可以通過計(jì)算引擎,如 Spark 或者 Flink,提供的 Alter Table 的 SQL 語法來實(shí)現(xiàn)對 Schema 的操作。在某些場景下,我們并非都能實(shí)時(shí)準(zhǔn)確的獲取上游數(shù)據(jù)較當(dāng)前表的 Schema 變化;另外在 Streaming 流式場景中以離線 Alter Table 的方式完成 Schema 的更新需要執(zhí)行1)停止流作業(yè),2)完成 Schema 更新操作,3)重啟流作業(yè)這樣的流程,這是較為低效的。
Paimon 支持了在數(shù)據(jù)寫入的同時(shí),自動完成 Source 數(shù)據(jù)和當(dāng)前表數(shù)據(jù)的 Schema 合并,并將合并后的 Schema 作為表的最新 Schema,僅需要配置參數(shù)? write.merge-schema。
data.write
.format("paimon")
.mode("append")
.option("write.merge-schema", "true")
.save(location)
新增列
比較常見的是,在執(zhí)行數(shù)據(jù)追加或覆蓋操作時(shí)使用,以自動調(diào)整 Schema 以包含一個(gè)或多個(gè)新列。
假設(shè)原表的 Schema 為:
a INT
b STRING
新數(shù)據(jù) data 的 Schema 為:
a INT
b STRING
c LONG
d Map<String, Double>
操作完成后的表的 Schema 變更為:
a INT
b STRING
c LONG
d Map<String, Double>
提升字段類型
Paimon 的 Schema Evolution 也同時(shí)支持?jǐn)?shù)據(jù)類型的提升,如 Int 提升為 Long,Long提升為 Decimal 等;以上述表繼續(xù)寫入數(shù)據(jù),假設(shè)新數(shù)據(jù)的 Schema 為:
a Long
b STRING
c Decimal
d Map<String, Double>
操作完成后的表的 Schema 變更為:
a Long
b STRING
c Decimal
d Map<String, Double>
強(qiáng)制類型轉(zhuǎn)換
如以上示例所示,Paimon 支持?jǐn)?shù)據(jù)字段類型的提升,如數(shù)值型向更高的精度提升(由 Int 提升至 Long,由 Long 提升至 Decimal),同時(shí) Paimon 也支持一些類型之間的強(qiáng)制轉(zhuǎn)換,如 String 強(qiáng)轉(zhuǎn)成 Date 類型或者 Long 轉(zhuǎn)換成 Int,但需要顯式的配置參數(shù) write.merge-schema.explicit-cast。
data.write
.format("paimon")
.mode("append")
.option("write.merge-schema", "true")
.option("write.merge-schema.explicit-cast", "true")
.save(location)
假設(shè)原表的 Schema為:
a LONG
b STRING //內(nèi)容為2023-08-01的格式
新數(shù)據(jù) data 的 Schema 為:
a INT
b DATE
操作完成后的表的 Schema 變更為:
a INT
b DATE
需要注意的是:
數(shù)據(jù)寫入(追加或覆蓋寫)時(shí)的 Schema Evolution 不支持刪除列和重命名列操作的,也不支持不在隱式/顯式轉(zhuǎn)換范圍內(nèi)的數(shù)據(jù)類型提升。當(dāng)具體數(shù)值不能轉(zhuǎn)換成目標(biāo)類型時(shí),為了避免將表數(shù)據(jù)破環(huán),當(dāng)前會報(bào)錯,終止該操作。
Spark Structured Streaming
Spark Structured Streaming 是一個(gè)基于 Spark SQL 引擎構(gòu)建的可擴(kuò)展且容錯的流處理引擎,可以像表達(dá)靜態(tài)數(shù)據(jù)的批量計(jì)算一樣的表達(dá)流計(jì)算。Spark SQL 引擎將負(fù)責(zé)增量且持續(xù)地運(yùn)行它,并隨著流數(shù)據(jù)不斷到達(dá)而更新最終結(jié)果。Structured Streaming 支持流之間的聚合、事件時(shí)間窗口、流批之間 Join 等。Spark 通過 checkpointing 和 write-ahead logs 實(shí)現(xiàn)了端到端的 exactly-once。簡而言之,Structured Streaming 提供快速、可擴(kuò)展、容錯、端到端的一次性流處理,而用戶無需考慮流處理。
Paimon 在 0.5 和 0.6 兩個(gè)版本逐步完善了 Spark Structured Streaming 的讀寫支持,提供了基于 Spark 引擎的流式讀寫能力。
■?Streaming Sink
Spark Structured Streaming 定義了三種輸出模式(https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#basic-concepts),Paimon 僅支持 Append 模式和 Complete 模式。
// `df` is the upstream source data.
val stream = df.writeStream.outputMode("append").option("checkpointLocation", "/path/to/checkpoint").format("paimon").start("/path/to/paimon/sink/table")
■?Streaming Source
結(jié)合 Spark 支持的多種?Trigger 策略?[1]和 Paimon 拓展的一些流式處理的能力,Paimon 可以支持豐富的 Streaming Source 的應(yīng)用場景。
Paimon 提供了多樣了 ScanMode,允許用戶以合適的參數(shù)指定初始狀態(tài)從 Paimon 表讀取的數(shù)據(jù)。
ScanMode | 描述 |
latest | 僅讀取后續(xù)持續(xù)寫入的數(shù)據(jù)。 |
latest-full | 讀取當(dāng)前快照的數(shù)據(jù),以及后續(xù)持續(xù)寫入的數(shù)據(jù)。 |
from-timestamp | 讀取參數(shù) scan.timestamp-millis 指定的時(shí)間戳之后持續(xù)寫入的數(shù)據(jù)。 |
from-snapshot | 讀取參數(shù) scan.snapshot-id 指定的版本后續(xù)持續(xù)寫入的數(shù)據(jù)。 |
from-snapshot-full | 讀取參數(shù) scan.snapshot-id 指定的版本快照數(shù)據(jù),以及后續(xù)持續(xù)寫入的數(shù)據(jù)。 |
default | 默認(rèn)等同于 latest-full 模式;如果指定 scan.snapshot-id,等同于 from-snapshot 模式;如果指定 scan.timestamp-millis,等同于 from-timestamp 模式; |
Paimon 通過拓展?SupportsAdmissionControl?[2]接口,實(shí)現(xiàn)了 Source 端的流量控制,避免了由于要處理的單個(gè) Batch 的數(shù)據(jù)量過大而引起的流式作業(yè)運(yùn)行失敗的問題。Paimon 目前支持以下ReadLimit?[3]的實(shí)現(xiàn)。
Readlimit 參數(shù) | 描述 |
read.stream.maxFilesPerTrigger | 一個(gè) Batch 最多返回的Splits數(shù) |
read.stream.maxBytesPerTrigger | 一個(gè) Batch 最多返回的byte數(shù) |
read.stream.maxRowsPerTrigger | 一個(gè) Batch 最多返回的行數(shù) |
read.stream.minRowsPerTrigger | 一個(gè) Batch 最少返回的行數(shù),和 maxTriggerDelayMs 搭配使用構(gòu)成ReadMinRows?[4] |
read.stream.maxTriggerDelayMs | 一個(gè) Batch 觸發(fā)的最大延時(shí),和 minRowsPerTrigger 搭配使用構(gòu)成ReadMinRows?[4] |
以兩個(gè)示例說明 Paimon Spark Structured Streaming 的用法。
示例一:
普通的流式增量 ETL 場景。
// Paimon source表的Schema為:time Long, stockId INT, avg_price DOUBLE
val query = spark.readStream.format("paimon").option("scan.mode", "latest").load("/path/to/paimon/source/table").selectExpr("CAST(time AS timestamp) AS timestamp", "stockId", "price").withWatermark("timestamp", "10 seconds").groupBy(window($"timestamp", "5 seconds"), col("stockId")).writeStream.format("console").trigger(Trigger.ProcessingTime(180, TimeUnit.SECONDS)).start()
該示例以 3 分鐘的間隔流式讀取 Paimon 后續(xù)的增量數(shù)據(jù),進(jìn)行 ETL 轉(zhuǎn)化后同步到下游。
示例二:
適用于追補(bǔ)數(shù)據(jù)的場景,流式讀取 Paimon 表自某個(gè)指定快照之后的數(shù)據(jù),讀取完成后不再讀取后續(xù)寫入的數(shù)據(jù),同時(shí)限定了每個(gè) Batch 大致的數(shù)據(jù)規(guī)模。
val query = spark.readStream.format("paimon").option("scan.mode", "from-snapshot").option("scan.snapshot-id", 345).option("read.stream.maxBytesPerTrigger", "134217728").load("/path/to/paimon/source/table").writeStream.format("console").trigger(Trigger.AvailableNow()).start()
示例代碼中指定 Trigger.AvailableNow()觸發(fā)器,表示僅讀取流式任務(wù)啟動時(shí)當(dāng)前 Paimon 可用的數(shù)據(jù);使用 from-snapshot 的 ScanMode 標(biāo)識了讀取快照 ID=345 之后寫入的數(shù)據(jù)。在配置 maxBytesPerTrigger 等于 128MB 后,Spark Structured Streaming會將待消費(fèi)的數(shù)據(jù)按照 128MB 的 Splits 大小進(jìn)行 Batch 切分,由多個(gè) Batch 完成當(dāng)前快照數(shù)據(jù)的消費(fèi)。
Spark SQL 拓展
■?Insert Overwrite
Insert Overwrite 是一個(gè)常用的 SQL 語法,用于重寫整張表或者表中指定分區(qū)。該功能在 Paimon 新版本中也得到支持,包括了 static 和 dynamic 兩種模式。
Static Overwrite
覆蓋整張表:無論當(dāng)前表是否是分區(qū)表,通過以下 SQL 可以完成使用新數(shù)據(jù)覆蓋原表數(shù)據(jù)的操作。
在 Spark 環(huán)境下使用 Paimon,請參考這里?[5]。
USE paimon;CREATE TABLE T (a INT, b STRING) TBLPROPERTIES('primary-key'='a');INSERT OVERWRITE T VALUES (1, "a"), (2, "b");
----------
1 a
2 b
----------INSERT OVERWRITE T VALUES (1, "a2"), (3, "c");
----------
1 a2
3 c
----------
覆蓋指定的表分區(qū)。
USE paimon;CREATE TABLE T (dt STRING, a INT, b STRING)
TBLPROPERTIES('primary-key'='dt,a')
PARTITIONED BY(dt);INSERT OVERWRITE T VALUES ("2023-10-01", 1, "a"), ("2023-10-02", 2, "b");
----------------
2023-10-01 1 a
2023-10-02 2 b
----------------INSERT OVERWRITE T PARTITION (dt = "2023-10-02") VALUES (2, "b2"), (4, "d");
----------------
2023-10-01 1 a
2023-10-02 2 b2
2023-10-02 d 4
----------------
Dynamic Parititon Overwrite(DPO)
默認(rèn)情況下是在 Static 模式下執(zhí)行 Insert Overwrite 的,用戶需要顯式的指定要覆蓋的分區(qū)信息;我們可以通過參數(shù)啟用 Dynamic 模式來執(zhí)行 Insert Overwrite,這樣Paimon 將自動判斷 source 端數(shù)據(jù)所涉及到的分區(qū)來執(zhí)行覆蓋操作。
Paimon 啟動 DPO 需要啟動 spark session 時(shí)額外指定 paimon 的 extension:
--conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
USE paimon;CREATE TABLE T (dt STRING, a INT, b STRING)
TBLPROPERTIES('primary-key'='dt,a')
PARTITIONED BY(dt);INSERT OVERWRITE T VALUES ("2023-10-01", 1, "a"), ("2023-10-02", 2, "b");
----------------
2023-10-01 1 a
2023-10-02 2 b
----------------SET spark.sql.sources.partitionOverwriteMode=DYNAMIC;INSERT OVERWRITE T VALUES ("2023-10-02", 2, "b2"), ("2023-10-02", 4, "d");
----------------
2023-10-01 1 a
2023-10-02 2 b2
2023-10-02 d 4
----------------
在配置 spark.sql.sources.partitionOverwriteMode=DYNAMIC 后,不再需要指定要覆蓋 dt="2023-10-02"的分區(qū),實(shí)現(xiàn)了數(shù)據(jù)的動態(tài)覆蓋。
■?Call procedure
除了由 Spark 框架提供了常用的 SQL 語法(包括 DDL,DML,Query 以及一些表信息查詢)外,Paimon 還需要拓展一些額外的 SQL 語法來提供自定義功能的操作接口,便于用戶對 Paimon 表的管理和探索。Call Procedure 的引入為這種場景的支持提供了框架層面的支持。
procedure 的語法:
CALL procedure_name(table => 'table_identifier', arg1 => '', ...);
目前 Paimon 已經(jīng)實(shí)現(xiàn)了三種 procedure:
Procedure | 描述 | 用法 |
create_tag | 為指定快照創(chuàng)建標(biāo)簽 | CALL create_tag(table => 'T', tag => 'test_tag', snapshot => 2) |
delete_tag | 刪除已創(chuàng)建的標(biāo)簽 | CALL delete_tag(table => 'T', tag => 'test_tag') |
rollback | 回滾表到指定標(biāo)簽或者版本 | CALL rollback(table => 'T', version => '2') |
場景示例
以下構(gòu)造一個(gè)流式開啟 Schema Evolution 的示例,上游數(shù)據(jù)實(shí)時(shí)同步到 paimon 的 user 表(原表僅有 userId 和 name 兩個(gè)維度),在某時(shí)刻上游數(shù)據(jù)添加了 age 屬性,在無需停止作業(yè)運(yùn)維時(shí)通過開啟 Schema Evolution 自動完成元數(shù)據(jù)的合并和新數(shù)據(jù)的寫入。
// 原表的定義
// CREATE TABLE T (userId INT, name STRING) TBLPROPERTIES ('primary-key'='userId');// -- 假設(shè)原表的流式寫入的數(shù)據(jù)--
// 1 user1
// 2 user2
// -------------------------// 使用MemoryStream模擬上游streaming數(shù)據(jù)
val inputData = MemoryStream[(Int, String, Int)]
val stream = inputData.toDS().toDF("userId", "name", "age").writeStream.option("checkpointLocation", "/path/to/checkpoint").option("write.merge-schema", "true").format("paimon").start("/path/to/user_table")inputData.addData((1, "user1", 30), (3, "user3", 33))
stream.processAllAvailable()// -- 該batch數(shù)據(jù)寫入后的表數(shù)據(jù)--
// 1 user1 30
// 2 user2 null
// 3 user3 33
// ---------------------------
后續(xù)規(guī)劃
Paimon 孵化于 Flink 社區(qū),源于流式數(shù)倉,但其遠(yuǎn)不止于此。Paimon 將在與如 Apache Spark 這樣的其他引擎的深度集成上,以及在如離線湖倉的場景支持上持續(xù)發(fā)力。在接下來的時(shí)間上,社區(qū)在和 Spark 引擎的支持上將逐漸拓展支持更多的 Spark SQL 語法,比如 Update、Merge Into 等;在讀寫性能上也會進(jìn)行深層次優(yōu)化。
參考
[1]?Trigger 策略:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers
[2]?SupportsAdmissionControl:
https://spark.apache.org/docs/3.2.1/api/java/org/apache/spark/sql/connector/read/streaming/SupportsAdmissionControl.html
[3]?ReadLimit:
https://spark.apache.org/docs/3.2.1/api/java/org/apache/spark/sql/connector/read/streaming/ReadLimit.html
[4]?ReadMinRows:
https://spark.apache.org/docs/3.2.1/api/java/org/apache/spark/sql/connector/read/streaming/ReadMinRows.html
[5]?在 Spark 環(huán)境下使用 Paimon:
https://paimon.apache.org/docs/master/engines/spark3/#setup
▼ 關(guān)注「Apache Spark 技術(shù)交流社區(qū)」,獲取更多技術(shù)干貨?▼
???點(diǎn)擊「閱讀原文」,跳轉(zhuǎn)? Apache Paimon 官網(wǎng)