長(zhǎng)沙百度網(wǎng)站制作哪里有網(wǎng)頁(yè)設(shè)計(jì)公司
【Flink-CDC】Flink CDC 介紹和原理概述
- 1)基于查詢的 CDC 和基于日志的 CDC
- 2)Flink CDC
- 3)Flink CDC原理簡(jiǎn)述
- 4)基于 Flink SQL CDC 的數(shù)據(jù)同步方案實(shí)踐
- 4.1.案例 1 : Flink SQL CDC + JDBC Connector
- 4.2.案例 2 : CDC Streaming ETL
- 4.3.案例 3 : Streaming Changes to Kafka
CDC是(Change Data Capture 變更數(shù)據(jù)獲取
)的簡(jiǎn)稱。
核心思想是,監(jiān)測(cè)并捕獲數(shù)據(jù)庫(kù)的變動(dòng)(包括數(shù)據(jù) 或 數(shù)據(jù)表的插入INSERT、更新UPDATE、刪除DELETE等),將這些變更按發(fā)生的順序完整記錄下來(lái),寫入到消息中間件中以供其他服務(wù)進(jìn)行訂閱及消費(fèi)。
1)基于查詢的 CDC 和基于日志的 CDC
CDC 主要分為基于查詢和基于 Binlog 兩種方式
經(jīng)過(guò)以上對(duì)比,我們可以發(fā)現(xiàn)基于日志 CDC
有以下這幾種優(yōu)勢(shì):
-
能夠捕獲所有數(shù)據(jù)的變化,捕獲完整的變更記錄。在異地容災(zāi),數(shù)據(jù)備份等場(chǎng)景中得到廣泛應(yīng)用,如果是基于查詢的 CDC 有可能導(dǎo)致兩次查詢的中間一部分?jǐn)?shù)據(jù)丟失
-
每次 DML 操作均有記錄無(wú)需像查詢 CDC 這樣發(fā)起全表掃描進(jìn)行過(guò)濾,擁有更高的效率和性能,具有低延遲,不增加數(shù)據(jù)庫(kù)負(fù)載的優(yōu)勢(shì)
-
無(wú)需入侵業(yè)務(wù),業(yè)務(wù)解耦,無(wú)需更改業(yè)務(wù)模型
-
捕獲刪除事件和捕獲舊記錄的狀態(tài),在查詢 CDC 中,周期的查詢無(wú)法感知中間數(shù)據(jù)是否刪除
在實(shí)時(shí)性、吞吐量方面占優(yōu),如果數(shù)據(jù)源是 MySQL、PostgreSQL、MongoDB 等常見的數(shù)據(jù)庫(kù)實(shí)現(xiàn),建議使用 Debezium 來(lái)實(shí)現(xiàn)變更數(shù)據(jù)的捕獲(下圖來(lái)自 Debezium 官方文檔)。如果使用的只有 MySQL,則可以用 Canal。
2)Flink CDC
Flink 社區(qū)開發(fā)了 flink-cdc-connectors
組件,這是一個(gè)可以直接從 MySQL
、PostgreSQL
等數(shù)據(jù)庫(kù)直接讀取全量數(shù)據(jù)和增量變更數(shù)據(jù)的 source 組件。目前也已開源,開源地址:https://github.com/ververica/flink-cdc-connectors
我們先從之前的數(shù)據(jù)架構(gòu)來(lái)看CDC的內(nèi)容
以上是之前的 mysql binlog
日志處理流程,例如 canal
監(jiān)聽 binlog
把日志寫入到 kafka 中。而 Flink 實(shí)時(shí)消費(fèi) Kafka 的數(shù)據(jù)實(shí)現(xiàn) mysql 數(shù)據(jù)的同步或其他內(nèi)容等。
拆分來(lái)說(shuō)整體上可以分為以下幾個(gè)階段。
1、mysql 開啟 binlog
2、canal 同步 binlog 數(shù)據(jù)寫入到 kafka
3、flink 讀取 kakfa 中的 binlog 數(shù)據(jù)進(jìn)行相關(guān)的業(yè)務(wù)處理。
整體的處理鏈路較長(zhǎng),需要用到的組件也比較多。Flink CDC可以直接從數(shù)據(jù)庫(kù)獲取到binlog供下游進(jìn)行業(yè)務(wù)計(jì)算分析,從內(nèi)部實(shí)現(xiàn)上講,Flink CDC Connectors
內(nèi)置了一套 Debezium 和 Kafka 組件,但這個(gè)細(xì)節(jié)對(duì)用戶屏蔽,簡(jiǎn)單來(lái)說(shuō)鏈路會(huì)變成這樣。
也就是說(shuō)數(shù)據(jù)不再通過(guò) canal 與 kafka 進(jìn)行同步,而 flink 直接進(jìn)行處理 mysql 的數(shù)據(jù)。節(jié)省了 canal 與 kafka 的過(guò)程。
3)Flink CDC原理簡(jiǎn)述
在最新 CDC 調(diào)研報(bào)告中,Debezium
和 Canal
是目前最流行使用的 CDC 工具,這些 CDC 工具的核心原理是抽取數(shù)據(jù)庫(kù)日志獲取變更。
在經(jīng)過(guò)一系列調(diào)研后,目前 Debezium (支持全量、增量同步,同時(shí)支持 MySQL、PostgreSQL、Oracle 等數(shù)據(jù)庫(kù)),使用較為廣泛。
Flink SQL CDC 內(nèi)置了 Debezium 引擎
,利用其抽取日志獲取變更的能力,將 changelog 轉(zhuǎn)換為 Flink SQL 認(rèn)識(shí)的 RowData 數(shù)據(jù)。(以下右側(cè)是 Debezium 的數(shù)據(jù)格式,左側(cè)是 Flink 的 RowData 數(shù)據(jù)格式)。
RowData 代表了一行的數(shù)據(jù),在 RowData 上面會(huì)有一個(gè)元數(shù)據(jù)的信息 RowKind
,RowKind 里面包括了插入(+I)、更新前(-U)、更新后(+U)、刪除(-D),這樣和數(shù)據(jù)庫(kù)里面的 binlog 概念十分類似。
通過(guò) Debezium 采集的數(shù)據(jù),包含了舊數(shù)據(jù)(before)和新數(shù)據(jù)行(after)以及原數(shù)據(jù)信息(source),op 的 u 表示是update 更新操作標(biāo)識(shí)符(op 字段的值 c,u,d,r 分別對(duì)應(yīng) create,update,delete,reade),ts_ms 表示同步的時(shí)間戳。
4)基于 Flink SQL CDC 的數(shù)據(jù)同步方案實(shí)踐
4.1.案例 1 : Flink SQL CDC + JDBC Connector
這個(gè)案例通過(guò)訂閱我們訂單表(事實(shí)表)數(shù)據(jù),通過(guò) Debezium 將 MySQL Binlog 發(fā)送至 Kafka,通過(guò)維表 Join 和 ETL 操作把結(jié)果輸出至下游的 PG 數(shù)據(jù)庫(kù)。
4.2.案例 2 : CDC Streaming ETL
電商公司的訂單表和物流表,需要對(duì)訂單數(shù)據(jù)進(jìn)行統(tǒng)計(jì)分析,對(duì)于不同的信息需要進(jìn)行關(guān)聯(lián)后續(xù)形成訂單的大寬表后,交給下游的業(yè)務(wù)方使用 ES 做數(shù)據(jù)分析,這個(gè)案例演示了如何只依賴 Flink 不依賴其他組件,借助 Flink 強(qiáng)大的計(jì)算能力實(shí)時(shí)把 Binlog 的數(shù)據(jù)流關(guān)聯(lián)一次并同步至 ES。
例如如下的這段 Flink SQL 代碼就能完成實(shí)時(shí)同步 MySQL 中 orders 表的全量+增量數(shù)據(jù)的目的。
CREATE TABLE orders (order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10, 5),product_id INT,order_status BOOLEAN
) WITH ('connector' = 'mysql-cdc','hostname' = 'localhost','port' = '3306','username' = 'root','password' = '123456','database-name' = 'mydb','table-name' = 'orders'
);SELECT * FROM orders
4.3.案例 3 : Streaming Changes to Kafka
參考阿里云:https://developer.aliyun.com/article/777502?utm_content=g_1000202135