Table of Contents
- 1. Introduction
- 2. Hands-on
- 2.1 Preparing the Data Source
- 2.2 Writing the Code
- 2.3 Data Format
1. Introduction
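Flink CDC is a powerful set of connectors for Apache Flink that captures and processes data changes in databases in real time. It can capture changes from a variety of databases (such as MySQL, PostgreSQL, Oracle, and MongoDB) and turn them into streaming data. Flink CDC can synchronize data in two ways:
- Flink SQL
- Flink DataStream and Table API (this article uses the DataStream API)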
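Compared with other open-source CDC solutions, Flink CDC is, in my view, the best choice for the vast majority of scenarios, so don't limit yourself to Canal. The figure below shows the comparison: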
2. Hands-on
2.1 Preparing the Data Source
I'm using MySQL 8.0 here, with a database named quick_chat already created. The demo table is defined as follows:
CREATE TABLE `quick_chat_msg` (
  `id` bigint NOT NULL COMMENT 'primary key id',
  `from_id` varchar(20) CHARACTER SET utf8 COLLATE utf8_unicode_ci DEFAULT NULL COMMENT 'account id (sender)',
  `to_id` varchar(20) CHARACTER SET utf8 COLLATE utf8_unicode_ci DEFAULT NULL COMMENT 'account id (recipient)',
  `relation_id` varchar(50) CHARACTER SET utf8 COLLATE utf8_unicode_ci DEFAULT NULL COMMENT 'sender/recipient relation',
  `content` varchar(500) DEFAULT NULL COMMENT 'message content',
  `msg_type` tinyint(1) DEFAULT NULL COMMENT 'message type (1: text, 2: voice, 3: sticker, 4: file, 5: voice call, 6: video call)',
  `extra_info` varchar(500) DEFAULT NULL COMMENT 'extra information',
  `create_time` datetime DEFAULT NULL COMMENT 'creation time',
  `deleted` tinyint(1) DEFAULT NULL COMMENT 'deletion flag',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
MySQL's binlog format must be ROW. MySQL 8.0 already uses ROW by default, and SHOW VARIABLES LIKE 'binlog_format'; should return ROW:
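Besides binlog_format, a binlog-based CDC source needs the binary log to be enabled. Debezium's MySQL connector documentation also asks for binlog_row_image = FULL. The MySQL 8.0 defaults are log_bin ON, binlog_format ROW and binlog_row_image FULL.

The helper below is a minimal sketch, not part of Flink CDC. It assumes you have already read the result of SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format', 'binlog_row_image') into a map, and it reports which settings need attention. The class name BinlogSettingsCheck is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical helper: checks the rows returned by
 *   SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format', 'binlog_row_image');
 * The map is Variable_name -> Value.
 */
final class BinlogSettingsCheck {

    /** Returns a list of problems; an empty list means the settings look usable for binlog CDC. */
    static List<String> problems(Map<String, String> vars) {
        List<String> out = new ArrayList<>();
        // The binary log must be enabled at all (MySQL 8.0 enables it by default).
        if (!"ON".equalsIgnoreCase(vars.get("log_bin"))) {
            out.add("log_bin is " + vars.get("log_bin") + ", expected ON");
        }
        // Row-based logging: binlog events carry the changed row values, not SQL statements.
        if (!"ROW".equalsIgnoreCase(vars.get("binlog_format"))) {
            out.add("binlog_format is " + vars.get("binlog_format") + ", expected ROW");
        }
        // Full row images, so that "before" and "after" contain every column.
        if (!"FULL".equalsIgnoreCase(vars.get("binlog_row_image"))) {
            out.add("binlog_row_image is " + vars.get("binlog_row_image") + ", expected FULL");
        }
        return out;
    }

    public static void main(String[] args) {
        // The values MySQL 8.0 reports out of the box.
        Map<String, String> defaults = Map.of(
                "log_bin", "ON", "binlog_format", "ROW", "binlog_row_image", "FULL");
        System.out.println(problems(defaults)); // prints []

        Map<String, String> statementBased = Map.of(
                "log_bin", "ON", "binlog_format", "STATEMENT", "binlog_row_image", "FULL");
        System.out.println(problems(statementBased));
    }
}
```

In a real project the map would be filled over JDBC. Keeping the check as a pure function makes it easy to run at startup and log a clear message before the CDC source fails with a less obvious error.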
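Finally, configure the database time zone. Otherwise you may run into problems, such as a mismatch between the time zone the CDC source assumes for the server and the one MySQL actually uses. The commands are: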
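-- Set the global time zone and persist it across server restarts (MySQL 8.0)
SET persist time_zone = '+8:00';
-- SET PERSIST does not change the current session, so set it here too
SET time_zone = '+8:00';
-- Verify the result
SHOW VARIABLES LIKE '%time_zone%';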
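The time zone matters because MySQL's time zone should agree with the zone the CDC job assumes for the server. MySqlSource's builder has a serverTimeZone(...) option for this. When the two disagree, the connector may refuse to start or time-related fields may come out shifted.

The sketch below compares a MySQL offset string such as '+8:00' with a Java ZoneId at a given instant. It is a hypothetical helper (TimeZoneCheck), not part of Flink CDC, and it handles only numeric offsets, not SYSTEM or named zones.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;

/** Hypothetical helper: does a MySQL time_zone offset match a Java ZoneId at a given instant? */
final class TimeZoneCheck {

    /**
     * Parses MySQL's "[+|-]H:MM" offset syntax. ZoneOffset.of expects a two-digit hour
     * such as "+08:00", so the one-digit form "+8:00" is parsed by hand here.
     */
    static ZoneOffset parseMysqlOffset(String value) {
        String v = value.trim();
        if (v.isEmpty() || (v.charAt(0) != '+' && v.charAt(0) != '-')) {
            throw new IllegalArgumentException("not a numeric offset (e.g. SYSTEM): " + value);
        }
        int sign = v.charAt(0) == '-' ? -1 : 1;
        String[] parts = v.substring(1).split(":");
        int hours = Integer.parseInt(parts[0]);
        int minutes = parts.length > 1 ? Integer.parseInt(parts[1]) : 0;
        // ofHoursMinutes requires both parts to carry the same sign.
        return ZoneOffset.ofHoursMinutes(sign * hours, sign * minutes);
    }

    /** True if MySQL's offset equals the Java zone's offset at the given instant (DST-aware). */
    static boolean offsetsMatch(String mysqlTimeZone, ZoneId javaZone, Instant at) {
        return parseMysqlOffset(mysqlTimeZone).equals(javaZone.getRules().getOffset(at));
    }

    public static void main(String[] args) {
        Instant now = Instant.now();
        System.out.println(offsetsMatch("+8:00", ZoneId.of("Asia/Shanghai"), now));  // prints true
        System.out.println(offsetsMatch("+00:00", ZoneId.of("Asia/Shanghai"), now)); // prints false
    }
}
```

The check compares offsets at a specific instant rather than zone names. A named zone with daylight saving time can match a fixed MySQL offset for part of the year only.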
2.2 Writing the Code
First, add the Flink CDC dependencies:
<dependencies>
    <!-- Flink connector base package -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>1.14.0</version>
    </dependency>
    <!-- Flink CDC MySQL source (shaded SQL connector jar) -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-sql-connector-mysql-cdc</artifactId>
        <version>2.3.0</version>
    </dependency>
    <!-- Flink CDC MySQL connector for the DataStream API; keep its version aligned with the SQL connector above -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.3.0</version>
        <scope>provided</scope>
    </dependency>
    <!-- Flink client -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
    <!-- Flink Web UI on port 8081 (not enabled by default) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
    <!-- Flink Table API & SQL: lets programs connect to external systems to read and write batch and streaming tables -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
</dependencies>
Second, write a sink class that receives the MySQL change events:
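import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class MySinkHandler extends RichSinkFunction<String> {

    // Called once per change event; value is the JSON string produced by JsonDebeziumDeserializationSchema
    @Override
    public void invoke(String value, Context context) throws Exception {
        System.out.println(value);
    }

    // Called once per parallel sink instance before any events arrive; open connections here
    @Override
    public void open(Configuration parameters) throws Exception {
    }

    // Called when the sink shuts down; release resources here
    @Override
    public void close() throws Exception {
    }
}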
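The sink above only prints each event, but a real sink usually needs to know what kind of change it received. Every Debezium change event carries a top-level "op" field with one of these codes:
- c: insert
- u: update
- d: delete
- r: row read during the initial snapshot

The helper below is a hypothetical, dependency-free sketch (ChangeEventKind) that extracts that code with a regular expression. It takes the last "op" key, assuming the envelope order shown in the sample output in 2.3 (before, after, source, op, ...). In production code a JSON library such as Jackson is the more robust choice.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: classifies a Debezium JSON change event by its top-level "op" code. */
final class ChangeEventKind {

    /** Debezium operation codes. */
    enum Op {
        CREATE, UPDATE, DELETE, READ;

        static Op fromCode(String code) {
            switch (code) {
                case "c": return CREATE;
                case "u": return UPDATE;
                case "d": return DELETE;
                case "r": return READ; // emitted for existing rows during the snapshot phase
                default: throw new IllegalArgumentException("unknown op code: " + code);
            }
        }
    }

    private static final Pattern OP = Pattern.compile("\"op\"\\s*:\\s*\"([a-z])\"");

    /**
     * Uses the last "op" key found, which is the envelope's own field when it follows
     * before/after/source (so a column that happens to be named op does not win).
     */
    static Op classify(String json) {
        Matcher m = OP.matcher(json);
        String code = null;
        while (m.find()) {
            code = m.group(1);
        }
        if (code == null) {
            // Schema-change events (includeSchemaChanges(true)) have a different shape and may carry no op.
            throw new IllegalArgumentException("no op field: " + json);
        }
        return Op.fromCode(code);
    }
}
```

Inside MySinkHandler.invoke, a switch on ChangeEventKind.classify(value) could then route inserts, updates and deletes to different handling. Any such routing is left to the reader.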
Finally, configure the Flink CDC listener so that it starts together with the application:
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
public class MySqlSourceExample {

    @PostConstruct
    public void init() throws Exception {
        // Configure the data source to watch
        MySqlSource<String> source = MySqlSource.<String>builder()
                .hostname("8.141.28.132")
                .port(3306)
                // Databases to capture; several can be listed
                .databaseList("quick_chat")
                // Tables to capture, as database.table; several can be listed
                .tableList("quick_chat.quick_chat_msg")
                .username("root")
                .password("root")
                // Emit each change as a Debezium-style JSON string
                .deserializer(new JsonDebeziumDeserializationSchema())
                // Also emit schema change (DDL) events
                .includeSchemaChanges(true)
                .build();

        // Expose the Flink Web UI on port 8081
        Configuration configuration = new Configuration();
        configuration.setInteger(RestOptions.PORT, 8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);

        // Checkpoint interval in milliseconds.
        // Checkpoints are about fault tolerance: if the job fails unexpectedly and restarts, it can resume
        // directly from an earlier checkpoint without affecting the correctness of the job's logic.
        env.enableCheckpointing(5000);

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .addSink(new MySinkHandler());

        // execute() blocks for as long as this unbounded streaming job runs, which would stall the
        // @PostConstruct callback and with it Spring startup; executeAsync() submits the job and returns.
        env.executeAsync("MySQL CDC Job");
    }
}
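The checkpoint comment above is about fault tolerance. For a binlog source, the core of what must survive a restart is a read position in the binlog. That is the same file and pos pair that each change event carries in its source block (see 2.3).

The toy model below is a hypothetical illustration, not Flink CDC's actual state classes. Real offsets carry more, such as rows to skip within an event and GTID sets. The model shows the idea: a checkpoint remembers a position, and after a restart everything past that position is read again.

```java
/**
 * Toy model (hypothetical): a binlog read position, i.e. the file name plus the offset within it,
 * as seen in each event's source.file / source.pos fields.
 */
final class BinlogPosition implements Comparable<BinlogPosition> {
    final String file;
    final long pos;

    BinlogPosition(String file, long pos) {
        this.file = file;
        this.pos = pos;
    }

    /** Binlog files share a base name and end in an increasing numeric suffix, e.g. binlog.000002. */
    private long fileIndex() {
        return Long.parseLong(file.substring(file.lastIndexOf('.') + 1));
    }

    @Override
    public int compareTo(BinlogPosition other) {
        int byFile = Long.compare(fileIndex(), other.fileIndex());
        return byFile != 0 ? byFile : Long.compare(pos, other.pos);
    }

    @Override
    public String toString() {
        return file + ":" + pos;
    }
}

/** Toy model (hypothetical) of checkpoint-and-resume for a binlog reader. */
final class CheckpointModel {
    private BinlogPosition lastProcessed;
    private BinlogPosition lastCheckpoint;

    /** The reader has emitted the event at position p. */
    void onEvent(BinlogPosition p) {
        lastProcessed = p;
    }

    /** A checkpoint snapshots the current read position together with the rest of the job state. */
    void onCheckpoint() {
        lastCheckpoint = lastProcessed;
    }

    /**
     * After a failure, reading resumes from the checkpointed position, so every event after it
     * is emitted again. A non-transactional sink such as MySinkHandler may therefore print it twice.
     */
    boolean isReadAgainAfterRestart(BinlogPosition p) {
        return lastCheckpoint == null || p.compareTo(lastCheckpoint) > 0;
    }
}
```

This is why checkpointing gives consistent Flink state but not automatically exactly-once side effects. The sink has to be idempotent or transactional for that.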
Once the application has started, the Flink Web UI is available on port 8081:
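The Web UI is backed by Flink's REST API on the same port, so the job can also be checked programmatically. The minimal sketch below uses the JDK's HttpClient (Java 11+) to call GET /jobs/overview, which lists jobs with their name and state. It assumes the local job from the previous step is listening on http://localhost:8081. The class name FlinkRestProbe is hypothetical.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

/** Hypothetical helper: queries the REST API behind the Flink Web UI. */
final class FlinkRestProbe {

    /** Builds GET {baseUrl}/jobs/overview; the response lists each job's id, name and state. */
    static HttpRequest jobsOverview(String baseUrl) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/jobs/overview"))
                .timeout(Duration.ofSeconds(5))
                .GET()
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        // Assumes the local mini cluster started by MySqlSourceExample is up on port 8081.
        HttpResponse<String> resp = client.send(
                jobsOverview("http://localhost:8081"), HttpResponse.BodyHandlers.ofString());
        System.out.println(resp.statusCode() + " " + resp.body());
    }
}
```

A RUNNING entry for the CDC job in that JSON is a quick health check, for example from a monitoring script.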
2.3 Data Format
With everything in place, I inserted, updated, and deleted a row in the table. The console then shows the change events captured from MySQL:
# Insert
{
  "before": null,
  "after": {"id": 3, "from_id": "dog", "to_id": "cat", "relation_id": "dog:cat", "content": "你好啊", "msg_type": 1, "extra_info": null, "create_time": 1729164075000, "deleted": 0},
  "source": {"version": "1.6.4.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1729135279000, "snapshot": "false", "db": "quick_chat", "sequence": null, "table": "quick_chat_msg", "server_id": 1, "gtid": null, "file": "binlog.000002", "pos": 2452, "row": 0, "thread": null, "query": null},
  "op": "c",
  "ts_ms": 1729135278633,
  "transaction": null
}
# Update
{
  "before": {"id": 3, "from_id": "dog", "to_id": "cat", "relation_id": "dog:cat", "content": "你好啊", "msg_type": 1, "extra_info": null, "create_time": 1729164075000, "deleted": 0},
  "after": {"id": 3, "from_id": "dog", "to_id": "cat", "relation_id": "dog:cat", "content": "你好啊,小貓咪", "msg_type": 1, "extra_info": null, "create_time": 1729164075000, "deleted": 0},
  "source": {"version": "1.6.4.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1729135289000, "snapshot": "false", "db": "quick_chat", "sequence": null, "table": "quick_chat_msg", "server_id": 1, "gtid": null, "file": "binlog.000002", "pos": 2825, "row": 0, "thread": null, "query": null},
  "op": "u",
  "ts_ms": 1729135288473,
  "transaction": null
}
# Delete
{
  "before": {"id": 3, "from_id": "dog", "to_id": "cat", "relation_id": "dog:cat", "content": "你好啊,小貓咪", "msg_type": 1, "extra_info": null, "create_time": 1729164075000, "deleted": 0},
  "after": null,
  "source": {"version": "1.6.4.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1729135301000, "snapshot": "false", "db": "quick_chat", "sequence": null, "table": "quick_chat_msg", "server_id": 1, "gtid": null, "file": "binlog.000002", "pos": 3247, "row": 0, "thread": null, "query": null},
  "op": "d",
  "ts_ms": 1729135300692,
  "transaction": null
}
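Note that create_time is 1729164075000 while the insert event's source.ts_ms is 1729135279000, almost exactly 8 hours apart even though the row was written moments earlier. This is consistent with how Debezium encodes a MySQL DATETIME column. DATETIME has no time zone, so its wall-clock value is written as epoch milliseconds as if it were UTC. Fields like source.ts_ms, by contrast, are real instants.

The hypothetical helper below (DebeziumTimes) decodes both kinds of value. It assumes the +8:00 zone configured in 2.1 for displaying instants.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

/** Hypothetical helper: turns epoch-millisecond fields from the change events back into readable times. */
final class DebeziumTimes {

    /** DATETIME columns such as create_time: the wall-clock value encoded as if it were UTC. */
    static LocalDateTime datetimeColumn(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC).toLocalDateTime();
    }

    /** Real instants such as source.ts_ms, rendered in a chosen zone. */
    static LocalDateTime eventTime(long epochMillis, ZoneId zone) {
        return Instant.ofEpochMilli(epochMillis).atZone(zone).toLocalDateTime();
    }

    public static void main(String[] args) {
        // create_time from the insert event above
        System.out.println(datetimeColumn(1729164075000L)); // prints 2024-10-17T11:21:15
        // source.ts_ms from the same event, shown at +8:00
        System.out.println(eventTime(1729135279000L, ZoneOffset.ofHours(8))); // prints 2024-10-17T11:21:19
    }
}
```

Decoded this way, the row was created at 11:21:15 local time and the binlog event followed at 11:21:19, which lines up. Decoding create_time with the JVM's default zone instead would shift it by the zone offset.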