化妝品產(chǎn)品的自建網(wǎng)站喲哪些申請自己的網(wǎng)站
文章目錄
- 1. 引言
- 2. 基本概念
- 2.1 定義
- 2.2 使用場景
- 3. 流式處理
- 3.1 自動小文件合并
- 3.2 流式查詢
- 4. 數(shù)據(jù)更新
- 4.1 查詢
- 4.2 更新
- 4.3 分桶附加表
- 5 總結(jié)
1. 引言
通過本文,上篇我們了解了Apache Paimon
主鍵表,本期我們將繼續(xù)學(xué)習(xí)附加表(Append Only Table
) 我們將帶領(lǐng)讀者《 《Apache Paimon Docs - Table w/o PK》》 繼續(xù)剖析 Paimon 的僅追加表相關(guān)知識。
通過本文你將了解到:
- Paimon 附加表相關(guān)的基本概念,了解什么是附加表,它在Paimon中扮演什么角色,以及它如何與主鍵表區(qū)分開來。
- 及其適用場景,探索附加表在實際應(yīng)用中的多樣化場景。
- 數(shù)據(jù)查詢更新方式,從高效的數(shù)據(jù)合并策略到靈活的流式查詢配置,以及如何通過索引和文件索引優(yōu)化查詢性能。
2. 基本概念
2.1 定義
如果一個表沒有定義主鍵,那它就是一個附加表(Append Table
)。與主鍵表相比,附加表無法直接接收變更日志,也不能直接通過 upsert 更新數(shù)據(jù),只能接收附加數(shù)據(jù)。
CREATE TABLE my_table (product_id BIGINT,price DOUBLE,sales BIGINT
) WITH (-- 'target-file-size' = '256 MB',-- 'file.format' = 'parquet',-- 'file.compression' = 'zstd',-- 'file.compression.zstd-level' = '3'
);
2.2 使用場景
使用場景或優(yōu)勢 | 說明 |
---|---|
批量寫入和批量讀取 | 類似于常規(guī)的 Hive 分區(qū)表,適用于大規(guī)模數(shù)據(jù)的批量處理。 |
友好的對象存儲 | 良好的兼容性和適應(yīng)性,支持 S3、OSS 等對象存儲。 |
時間穿越和回滾 | 支持?jǐn)?shù)據(jù)的時間旅行和回滾功能,方便數(shù)據(jù)的歷史查詢和恢復(fù)。 |
低成本的刪除和更新 | 在批量數(shù)據(jù)操作中,能夠以較低的計算和資源成本進(jìn)行刪除和更新操作。 |
流式接收中的小文件自動合并 | 在流式寫入過程中,自動處理小文件合并,減少存儲碎片。 |
隊列形式的流式讀寫 | 支持如隊列般的流式讀寫操作,可以像消息隊列一樣處理數(shù)據(jù)。 |
高性能查詢 | 通過順序和索引實現(xiàn)的高效查詢性能。 |
批量寫入和讀取
CREATE TABLE my_table (product_id BIGINT,price DOUBLE,sales BIGINT
) WITH ('target-file-size' = '256 MB', -- 設(shè)置目標(biāo)文件大小'file.format' = 'parquet', -- 文件格式為 Parquet'file.compression' = 'zstd', -- 使用 ZSTD 壓縮'file.compression.zstd-level' = '3' -- 設(shè)置 ZSTD 壓縮級別為 3
);
流式接收和小文件合并
CREATE TABLE my_stream_table (event_id BIGINT,event_time TIMESTAMP,event_data STRING
) WITH ('target-file-size' = '128 MB', -- 設(shè)置目標(biāo)文件大小'file.format' = 'avro', -- 文件格式為 Avro'file.compression' = 'snappy', -- 使用 Snappy 壓縮'streaming.min-batch-interval' = '5 min' -- 設(shè)置流處理最小批處理時間間隔為 5 分鐘
);
具有以下的優(yōu)點:
功能特性 | 技術(shù)優(yōu)勢 | 實現(xiàn) |
---|---|---|
對象存儲友好 | 良好的兼容性和適應(yīng)性,支持 S3、OSS 等對象存儲。 | 通過接入主流對象存儲服務(wù),優(yōu)化讀寫性能和兼容性,特別是大規(guī)模數(shù)據(jù)存儲和處理場景下。 |
時間穿越和回滾 | 支持?jǐn)?shù)據(jù)的時間旅行和回滾功能,方便數(shù)據(jù)的歷史查詢和恢復(fù)。 | 利用快照和元數(shù)據(jù)管理,實現(xiàn)任意時間點的數(shù)據(jù)查詢和回滾能力。 |
低成本的刪除和更新 | 在批量數(shù)據(jù)操作中,能夠以較低的計算和資源成本進(jìn)行刪除和更新操作。 | 通過高效的數(shù)據(jù)合并和變更處理機(jī)制,優(yōu)化批量操作中的資源消耗。 |
小文件合并 | 在流式寫入過程中,自動處理小文件合并,減少存儲碎片。 | 在流式寫入過程中,使用異步任務(wù)定期合并小文件,確保合理的文件大小和存儲效率。 |
高性能查詢 | 通過順序和索引實現(xiàn)的高效查詢性能。 | 通過索引構(gòu)建和數(shù)據(jù)排序,提升查詢的響應(yīng)速度和資源利用效率。 |
3. 流式處理
附加表(Append Table)可以通過 Flink 進(jìn)行非常靈活的流式寫入,并可以像隊列一樣通過 Flink 進(jìn)行讀取。唯一的區(qū)別是其延遲為分鐘級別,但其優(yōu)勢在于非常低的成本以及能夠進(jìn)行過濾和投影下推。
3.1 自動小文件合并
在流式寫入作業(yè)中,如果沒有定義分桶(bucket
),寫入器不會進(jìn)行壓縮;相反,將使用壓縮協(xié)調(diào)器(Compact Coordinator
)掃描小文件并將壓縮任務(wù)傳遞給壓縮工作者(Compact Worker
)。流式模式下,如果在 Flink 中運(yùn)行插入 SQL,拓?fù)浣Y(jié)構(gòu)將如下所示:
Source -> Transformations -> Sink-> Compact Coordinator -> Compact Worker
- 無反壓:壓縮任務(wù)不會引起反壓。
- 寫入模式:如果設(shè)置
write-only
為true
,壓縮協(xié)調(diào)器和壓縮工作者將在拓?fù)渲斜灰瞥?/li> - Flink 流模式:自動壓縮僅在 Flink 引擎的流模式下被支持??梢酝ㄟ^ Paimon 在 Flink 中啟動壓縮作業(yè),并通過設(shè)置
write-only
禁用所有其他壓縮。
3.2 流式查詢
附加表可以像消息隊列一樣使用,進(jìn)行流式查詢,與主鍵表類似,有兩個選項可以進(jìn)行流式讀取:
- 默認(rèn)模式:流式讀取在首次啟動時生成表的最新快照,并繼續(xù)讀取最新的增量記錄。
- 增量模式:可以指定
scan.mode
或scan.snapshot-id
或scan.timestamp-millis
或scan.file-creation-time-millis
進(jìn)行增量讀取。
類似 Flink-Kafka,默認(rèn)情況下不保證順序。如果數(shù)據(jù)需要某種順序,也需要考慮定義桶鍵(bucket-key),請參考分桶附加(Bucketed Append)部分。
流式寫入和自動小文件合并
CREATE TABLE my_stream_table (event_id BIGINT,event_time TIMESTAMP,event_data STRING
) WITH ('target-file-size' = '128 MB', -- 設(shè)置目標(biāo)文件大小'file.format' = 'avro', -- 文件格式為 Avro'file.compression' = 'snappy', -- 使用 Snappy 壓縮'streaming.min-batch-interval' = '5 min' -- 設(shè)置流處理最小批處理時間間隔為 5 分鐘
);
在流式寫入過程中,配置 Compact Coordinator
和 Compact Worker
以確保小文件自動合并。
流式查詢配置(默認(rèn)模式)
SET 'scan.startup.mode' = 'latest-offset'; -- 設(shè)置流式讀取從最新的快照開始
流式查詢配置(增量模式)
SET 'scan.mode' = 'incremental'; -- 設(shè)置流式讀取為增量模式
SET 'scan.snapshot-id' = '1234567890'; -- 可選:指定從特定快照 ID 開始
SET 'scan.timestamp-millis' = '1627849923000'; -- 可選:指定從特定時間戳(毫秒)開始
流式查詢配置(帶順序要求)
CREATE TABLE ordered_stream_table (event_id BIGINT,event_time TIMESTAMP,event_data STRING
) WITH ('target-file-size' = '128 MB','file.format' = 'parquet','file.compression' = 'zstd','streaming.min-batch-interval' = '5 min','bucket-key' = 'event_time' -- 設(shè)置桶鍵(bucket-key)以確保數(shù)據(jù)按照時間順序
);
技術(shù)優(yōu)勢及其實現(xiàn):
功能特性 | 技術(shù)優(yōu)勢 | 實現(xiàn) |
---|---|---|
流式寫入 | 通過靈活的配置選項,實現(xiàn)分鐘級別低延遲的流式寫入,并支持過濾和投影下推,提升查詢效率。 | 通過靈活的配置選項,優(yōu)化數(shù)據(jù)流的寫入路徑,減少延遲,并通過下推操作提升查詢效率。 |
自動小文件合并 | 在流式處理過程中,動態(tài)管理文件大小,減少存儲碎片,提高存儲效率。 | 使用動態(tài)文件管理策略,自動合并小文件,以優(yōu)化存儲空間和提高I/O效率。 |
流式讀取 | 支持從最新快照讀取或增量讀取,類似消息隊列的使用,方便實時數(shù)據(jù)處理和分析。 | 提供快照和增量讀取功能,使得流式讀取更加靈活,適用于實時數(shù)據(jù)處理場景。 |
順序保證 | 通過配置桶鍵,可以確保數(shù)據(jù)在需要順序的情境下有序讀取和寫入,滿足業(yè)務(wù)需求。 | 通過桶鍵配置,實現(xiàn)數(shù)據(jù)的有序存儲和檢索,保證業(yè)務(wù)邏輯的順序性。 |
4. 數(shù)據(jù)更新
4.1 查詢
按順序跳過數(shù)據(jù)
Paimon 默認(rèn)在清單文件中記錄每個字段的最大值和最小值。在查詢時,根據(jù)查詢的 WHERE 條件,通過清單中的統(tǒng)計信息進(jìn)行文件過濾。如果過濾效果良好,查詢時間可以從分鐘級別加速到毫秒級別。
然而,數(shù)據(jù)分布并不總是能有效過濾,因此如果可以根據(jù) WHERE 條件中的字段對數(shù)據(jù)進(jìn)行排序,將會更高效??梢詤⒖?Flink 的 COMPACT Action 或 COMPACT Procedure,以及 Spark 的 COMPACT Procedure。
-- 對數(shù)據(jù)進(jìn)行排序以優(yōu)化按順序跳過數(shù)據(jù)的查詢性能
ALTER TABLE my_table COMPACT BY (field_name);
按文件索引跳過數(shù)據(jù)
還可以使用文件索引,它將在讀取端通過索引過濾文件。
CREATE TABLE my_table (product_id BIGINT,price DOUBLE,sales BIGINT
) WITH ('file-index.bloom-filter.columns' = 'product_id','file-index.bloom-filter.product_id.items' = '200'
);
定義 file-index.bloom-filter.columns
后,Paimon 將為每個文件創(chuàng)建相應(yīng)的索引文件。如果索引文件太小,它將直接存儲在清單中,否則將存儲在數(shù)據(jù)文件的目錄中。每個數(shù)據(jù)文件對應(yīng)一個索引文件,該文件有獨(dú)立的定義,可以包含不同類型的多列索引。
文件索引的應(yīng)用場景
不同文件索引在不同場景下效率不同。例如:
- 布隆過濾器(Bloom Filter):在點查找場景中可能加速查詢。
- 位圖(Bitmap):可能消耗更多空間,但精度更高。
目前,文件索引僅支持附加表(Append-Only Table)。
布隆過濾器的配置:
file-index.bloom-filter.columns
:指定需要布隆過濾器索引的列。file-index.bloom-filter.<column_name>.fpp
:配置錯誤正率(False Positive Probability)。file-index.bloom-filter.<column_name>.items
:配置一個數(shù)據(jù)文件中預(yù)期的不同項目數(shù)量。
位圖的配置:
file-index.bitmap.columns
:指定需要位圖索引的列。
添加文件索引到現(xiàn)有表
如果你想在不重寫的情況下添加文件索引,可以使用 rewrite_file_index
過程。在使用該過程之前,你應(yīng)該在目標(biāo)表中配置適當(dāng)?shù)呐渲?。可以使?ALTER
子句來配置 file-index.<filter-type>.columns
。
使用示例:添加文件索引到現(xiàn)有表
ALTER TABLE my_table
SET ('file-index.bloom-filter.columns' = 'product_id');CALL rewrite_file_index('my_table');
4.2 更新
目前,僅 Spark SQL 支持 DELETE 和 UPDATE 操作,可以參考 Spark Write 的相關(guān)文檔。
DELETE FROM my_table
WHERE currency = 'UNKNOWN';
更新模式
附加表(Append Table)有兩種更新模式:
-
COW(Copy on Write):
- 機(jī)制:搜索命中的文件,然后重新寫入每個文件以移除需要刪除的數(shù)據(jù)。
- 成本:這種操作成本高,因為每次刪除或更新都需要重新寫入整個文件。
-
MOW(Merge on Write):
- 機(jī)制:通過指定
'deletion-vectors.enabled' = 'true'
,啟用刪除向量模式(Deletion Vectors)。只標(biāo)記對應(yīng)文件的某些記錄為刪除,并寫入刪除文件,而不需要重新寫入整個文件。 - 優(yōu)勢:相比 COW 模式,MOW 模式的刪除和更新成本更低,因為只需寫入小的刪除文件,而不需要重寫全部數(shù)據(jù)文件。
- 機(jī)制:通過指定
在創(chuàng)建或更新表時,可以啟用刪除向量模式:
CREATE TABLE my_table (product_id BIGINT,price DOUBLE,sales BIGINT
) WITH ('deletion-vectors.enabled' = 'true'
);
或在現(xiàn)有表上啟用刪除向量:
ALTER TABLE my_table
SET ('deletion-vectors.enabled' = 'true');
MOW 模式下的 DELETE 操作
DELETE FROM my_table
WHERE currency = 'UNKNOWN';
此操作將標(biāo)記 currency 為 ‘UNKNOWN’ 的記錄為刪除,而不重寫整個文件。
通過 Spark SQL 進(jìn)行更新操作
val spark = SparkSession.builder().appName("UpdateExample").getOrCreate()// 啟用刪除向量
spark.sql("ALTER TABLE my_table SET ('deletion-vectors.enabled' = 'true')")// 執(zhí)行 DELETE 操作
spark.sql("DELETE FROM my_table WHERE currency = 'UNKNOWN'")// 執(zhí)行 UPDATE 操作
spark.sql("UPDATE my_table SET price = price * 1.1 WHERE product_id = 1001")
4.3 分桶附加表
您可以定義 bucket
和 bucket-key
以創(chuàng)建一個分桶附加表。在這種表中,不同桶內(nèi)的數(shù)據(jù)是嚴(yán)格有序的,流式讀取將按寫入順序準(zhǔn)確地傳輸記錄。這樣可以優(yōu)化數(shù)據(jù)處理和查詢性能。
--創(chuàng)建分桶附加表
CREATE TABLE my_table (product_id BIGINT,price DOUBLE,sales BIGINT
) WITH ('bucket' = '8','bucket-key' = 'product_id'
);
一個普通的附加表的流式寫讀取沒有嚴(yán)格的順序保證,但是有些情況下需要定義一個類似于 Kafka 的鍵。
- 每個分桶中的記錄都是嚴(yán)格有序的:流式讀取將按寫入順序準(zhǔn)確地傳輸記錄。無需配置特殊的設(shè)置,所有數(shù)據(jù)將按隊列形式進(jìn)入一個桶內(nèi)。
分桶中的壓縮(Compaction in Bucket)
默認(rèn)情況下,Sink 節(jié)點會自動執(zhí)行壓縮以控制文件數(shù)量。以下選項控制壓縮策略:
Key | Default | Type | Description |
---|---|---|---|
write-only | false | Boolean | 如果設(shè)置為 true,將跳過壓縮和快照過期操作。此選項與專用壓縮作業(yè)一起使用。 |
compaction.min.file-num | 5 | Integer | 對文件集 [f_0,…,f_N],滿足 sum(size(f_i)) >= targetFileSize 的最小文件數(shù)量以觸發(fā)附加表的壓縮。避免壓縮幾乎滿的文件,因為這不合算。 |
compaction.max.file-num | 5 | Integer | 對文件集 [f_0,…,f_N],即使 sum(size(f_i)) < targetFileSize,也觸發(fā)壓縮的最大文件數(shù)量。此值避免過多小文件積壓,減慢性能。 |
full-compaction.delta-commits | (none) | Integer | 在 delta 提交后會不斷觸發(fā)全量壓縮。 |
流式讀取順序(Streaming Read Order)
對于流式讀取,記錄按以下順序生產(chǎn):
- 跨分區(qū)記錄:如果
scan.plan-sort-partition
設(shè)置為 true,則首先生產(chǎn)分區(qū)值較小的記錄。否則,先生產(chǎn)創(chuàng)建時間較早的分區(qū)的記錄。 - 同分區(qū)同桶記錄:首先生產(chǎn)先寫入的記錄。
- 同分區(qū)不同桶記錄:不同桶由不同任務(wù)處理,不保證順序。
水印定義(Watermark Definition)
CREATE TABLE t (`user` BIGINT,product STRING,order_time TIMESTAMP(3),WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (...);
啟動有界流任務(wù)讀取 Paimon 表
SELECT window_start, window_end, COUNT(`user`)
FROM TABLE(TUMBLE(TABLE t, DESCRIPTOR(order_time), INTERVAL '10' MINUTES)
)
GROUP BY window_start, window_end;
還可以啟用 Flink 水印對齊,確保沒有來源/分片/分區(qū)的水印前進(jìn)太快:
Key | Default | Type | Description |
---|---|---|---|
scan.watermark.alignment.group | (none) | String | 要對齊水印的一組源。 |
scan.watermark.alignment.max-drift | (none) | Duration | 對齊水印的最大漂移,在此漂移前暫停從源/任務(wù)/分區(qū)消費(fèi)。 |
有界流(Bounded Stream)
流式來源(Streaming Source)也可以是有界的,可以通過指定 scan.bounded.watermark
來定義有界流模式的結(jié)束條件。
--創(chuàng)建 Kafka 表和啟動流式插入及讀取作業(yè)
CREATE TABLE kafka_table (`user` BIGINT,product STRING,order_time TIMESTAMP(3),WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka'...);-- 啟動流式插入作業(yè)
INSERT INTO paimon_table
SELECT * FROM kafka_table;-- 啟動有界流任務(wù)讀取 Paimon 表
SELECT * FROM paimon_table /*+ OPTIONS('scan.bounded.watermark'='...') */;
批處理(Batch)
分桶表可以在批處理查詢中避免 shuffle,例如可以用以下 Spark SQL 讀取 Paimon 表:
SET spark.sql.sources.v2.bucketing.enabled = true;CREATE TABLE FACT_TABLE (order_id INT, f1 STRING
) TBLPROPERTIES ('bucket'='10', 'bucket-key' = 'order_id');CREATE TABLE DIM_TABLE (order_id INT, f2 STRING
) TBLPROPERTIES ('bucket'='10', 'primary-key' = 'order_id');SELECT *
FROM FACT_TABLE
JOIN DIM_TABLE
ON FACT_TABLE.order_id = DIM_TABLE.order_id;
通過設(shè)置 spark.sql.sources.v2.bucketing.enabled
為 true,Spark 將識別 V2 數(shù)據(jù)源報告的特定分布,并在必要時嘗試避免 shuffle。如果兩個表具有相同的分桶策略和相同數(shù)量的桶,昂貴的 join shuffle 操作將被避免。
5 總結(jié)
本文詳細(xì)介紹了Apache Paimon中附加表的概念和應(yīng)用。我們首先定義了什么是附加表,并比較了它與主鍵表的區(qū)別。接著,我們探討了附加表在不同場景下的使用,包括批量寫入和讀取、對象存儲的友好性、時間穿越和回滾功能、低成本的刪除和更新操作、流式接收中小文件的自動合并、隊列形式的流式讀寫以及高性能查詢。此外,我們還詳細(xì)介紹了流式處理的相關(guān)技術(shù),包括自動小文件合并、流式查詢的不同模式、順序保證的重要性以及分桶附加表的優(yōu)勢。最后,我們討論了數(shù)據(jù)更新策略,包括DELETE和UPDATE操作,以及如何通過配置優(yōu)化查詢性能。
如果你想?yún)⑴c討論,請 點擊這里👉https://github.com/hiszm/BigDataWeekly,每周都有新的主題,周末或周一發(fā)布。
大數(shù)據(jù)精讀,探索知識的深度。
關(guān)注 大數(shù)據(jù)精讀周刊
版權(quán)聲明:自由轉(zhuǎn)載-非商用-非衍生-保持署名([創(chuàng)意共享 3.0 許可證](https://creativecommons.org/licenses/by-nc-nd/3.0/deed.e