中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當前位置: 首頁 > news >正文

分類信息網站平臺的推廣新聞20字摘抄大全

分類信息網站平臺的推廣,新聞20字摘抄大全,國際室內設計公司排名,百科網站推廣文章目錄 1、基本介紹2、代碼實戰(zhàn)2.1、數(shù)據(jù)源準備2.2、代碼實戰(zhàn)2.3、數(shù)據(jù)格式 1、基本介紹 Flink CDC 是 Apache Flink 提供的一個功能強大的組件,用于實時捕獲和處理數(shù)據(jù)庫中的數(shù)據(jù)變更??梢詫崟r地從各種數(shù)據(jù)庫(如MySQL、PostgreSQL、Oracle、MongoDB…

文章目錄

  • 1、基本介紹
  • 2、代碼實戰(zhàn)
    • 2.1、數(shù)據(jù)源準備
    • 2.2、代碼實戰(zhàn)
    • 2.3、數(shù)據(jù)格式

1、基本介紹

Flink CDC 是 Apache Flink 提供的一個功能強大的組件,用于實時捕獲和處理數(shù)據(jù)庫中的數(shù)據(jù)變更??梢詫崟r地從各種數(shù)據(jù)庫(如MySQL、PostgreSQL、Oracle、MongoDB等)中捕獲數(shù)據(jù)變更并將其轉換為流式數(shù)據(jù),FlinkCDC 同步數(shù)據(jù)有兩種方式:

  1. FlinkSQL
  2. Flink DataStream 和 Table API(本文使用該方式)
    在這里插入圖片描述
    對比其他的CDC開源方案,發(fā)現(xiàn)FlinkCDC是絕大多數(shù)場景最好的選擇方式,別在傻傻的只關注Canal了,如下圖所示:
    在這里插入圖片描述

2、代碼實戰(zhàn)

2.1、數(shù)據(jù)源準備

本次我是用MySQL 8.0版本,并且創(chuàng)建好數(shù)據(jù)庫(庫名為quick_chat),本次演示表結構如下:

CREATE TABLE `quick_chat_msg` (`id` bigint NOT NULL COMMENT '主鍵id',`from_id` varchar(20) CHARACTER SET utf8 COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '賬戶id(發(fā)送人)',`to_id` varchar(20) CHARACTER SET utf8 COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '賬戶id(接收人)',`relation_id` varchar(50) CHARACTER SET utf8 COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '發(fā)送關聯(lián)',`content` varchar(500) DEFAULT NULL COMMENT '消息內容',`msg_type` tinyint(1) DEFAULT NULL COMMENT '消息類型(1:文字,2:語音,3:表情包,4:文件,5:語音通話,6:視頻通話)',`extra_info` varchar(500) DEFAULT NULL COMMENT '額外信息',`create_time` datetime DEFAULT NULL COMMENT '創(chuàng)建時間',`deleted` tinyint(1) DEFAULT NULL COMMENT '刪除標識',PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;

需要保證MySQL的Binlog格式是ROW,不過MySQL 8.0版本格式默認就是ROW:
在這里插入圖片描述
最后,要把數(shù)據(jù)庫時區(qū)配置好,否則會出現(xiàn)問題,命令如下:

SET persist time_zone = '+8:00';
SET time_zone = '+8:00';
SHOW VARIABLES LIKE '%time_zone%';

在這里插入圖片描述

2.2、代碼實戰(zhàn)

首先,引入Flink CDC相關依賴,內容如下:

<dependencies><!-- Flink connector連接器基礎包 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.14.0</version></dependency><!-- Flink CDC MySQL源 --><dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>2.3.0</version></dependency><!-- Flink DataStream數(shù)據(jù)流API --><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.2.0</version><scope>provided</scope></dependency><!-- Flink客戶端--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.14.0</version></dependency><!--Flink WebUI,端口8081(默認沒有開啟)--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_2.12</artifactId><version>1.14.0</version></dependency><!--Flink Table API&SQL程序可以連接到其他外部系統(tǒng),用于讀寫批處理表和流式表。--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime_2.12</artifactId><version>1.14.0</version></dependency>
</dependencies>

第二步,開發(fā) Sink 監(jiān)聽類,用于監(jiān)聽 MySQL 數(shù)據(jù)變化:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;public class MySinkHandler extends RichSinkFunction<String> {@Overridepublic void invoke(String value, Context context) throws Exception {System.out.println(value);}@Overridepublic void open(Configuration parameters) throws Exception {}@Overridepublic void close() throws Exception {}
}

最后,配置好 Flink CDC 監(jiān)聽進程,隨著項目啟動運行:

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;@Component
public class MySqlSourceExample {@PostConstructpublic void init() throws Exception {// 配置監(jiān)聽數(shù)據(jù)源MySqlSource<String> source = MySqlSource.<String>builder().hostname("8.141.28.132").port(3306)// 數(shù)據(jù)庫集合,可以配置多個.databaseList("quick_chat")// 表集合,可以配置多個.tableList("quick_chat.quick_chat_msg").username("root").password("root").deserializer(new JsonDebeziumDeserializationSchema()).includeSchemaChanges(true).build();// 配置 Flink WebUIConfiguration configuration = new Configuration();configuration.setInteger(RestOptions.PORT, 8081);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);// 檢查點間隔時間// checkpoint的側重點是“容錯”,即Flink作業(yè)意外失敗并重啟之后,能夠直接從早先打下的checkpoint恢復運行,且不影響作業(yè)邏輯的準確性。env.enableCheckpointing(5000);DataStreamSink<String> sink = env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source").addSink(new MySinkHandler());env.execute();}
}

項目啟動完畢后,可以通過8081端口訪問Flink UI頁面:
在這里插入圖片描述

2.3、數(shù)據(jù)格式

上述操作完畢后,我對表數(shù)據(jù)進行了新增、修改、刪除操作,控制臺可以看到MySQL變更監(jiān)聽日志輸出信息:

# 新增
{"before": null,"after": {"id": 3,"from_id": "dog","to_id": "cat","relation_id": "dog:cat","content": "你好啊","msg_type": 1,"extra_info": null,"create_time": 1729164075000,"deleted": 0},"source": {"version": "1.6.4.Final","connector": "mysql","name": "mysql_binlog_source","ts_ms": 1729135279000,"snapshot": "false","db": "quick_chat","sequence": null,"table": "quick_chat_msg","server_id": 1,"gtid": null,"file": "binlog.000002","pos": 2452,"row": 0,"thread": null,"query": null},"op": "c","ts_ms": 1729135278633,"transaction": null
}
# 修改
{"before": {"id": 3,"from_id": "dog","to_id": "cat","relation_id": "dog:cat","content": "你好啊","msg_type": 1,"extra_info": null,"create_time": 1729164075000,"deleted": 0},"after": {"id": 3,"from_id": "dog","to_id": "cat","relation_id": "dog:cat","content": "你好啊,小貓咪","msg_type": 1,"extra_info": null,"create_time": 1729164075000,"deleted": 0},"source": {"version": "1.6.4.Final","connector": "mysql","name": "mysql_binlog_source","ts_ms": 1729135289000,"snapshot": "false","db": "quick_chat","sequence": null,"table": "quick_chat_msg","server_id": 1,"gtid": null,"file": "binlog.000002","pos": 2825,"row": 0,"thread": null,"query": null},"op": "u","ts_ms": 1729135288473,"transaction": null
}
# 刪除
{"before": {"id": 3,"from_id": "dog","to_id": "cat","relation_id": "dog:cat","content": "你好啊,小貓咪","msg_type": 1,"extra_info": null,"create_time": 1729164075000,"deleted": 0},"after": null,"source": {"version": "1.6.4.Final","connector": "mysql","name": "mysql_binlog_source","ts_ms": 1729135301000,"snapshot": "false","db": "quick_chat","sequence": null,"table": "quick_chat_msg","server_id": 1,"gtid": null,"file": "binlog.000002","pos": 3247,"row": 0,"thread": null,"query": null},"op": "d","ts_ms": 1729135300692,"transaction": null
}
http://www.risenshineclean.com/news/35067.html

相關文章:

  • 張家港保稅區(qū)建設規(guī)劃局網站湖南優(yōu)化公司
  • 做精神科網站專業(yè)代寫軟文
  • 電商網站seoseo管理系統(tǒng)培訓
  • 南京做網站公司seo排名優(yōu)化推薦
  • wordpress好看分頁北京做網絡優(yōu)化的公司
  • 新聞網站開發(fā)書籍北京seo公司
  • 做網站上傳服務器嗎優(yōu)化網站教程
  • 海南高端網站建設直接打開百度
  • 淮南 搭建一個企業(yè)展示網站cba最新排名
  • 電腦裝機網站網站優(yōu)化的方法有哪些
  • 營銷型網站=企業(yè)登記代理網站關鍵詞優(yōu)化方法
  • 做網站那個平臺個人怎么做免費百度推廣
  • wordpress 導入幻燈片優(yōu)化網站建設
  • 杭州設計網站最好的公司東莞seo建站排名
  • 網絡服務合同糾紛定義簡述影響關鍵詞優(yōu)化的因素
  • 海南做房地產網站的網絡公司seo公司官網
  • 制作官網需要什么條件茶葉seo網站推廣與優(yōu)化方案
  • 網站制作手機網站成人培訓班有哪些課程
  • 網站制作價格和流程云浮新增確診病例30例
  • 建網站的外包公司百度推廣怎么做步驟
  • wordpress花生殼lamp南京百度seo代理
  • 員工做違法網站seo顧問賺錢嗎
  • 企業(yè)網站內容運營方案策劃網絡廣告怎么做
  • 網站建設項目國內外分析報告長春網站推廣公司
  • 網站建設優(yōu)化廣告流量關鍵詞搜索排行榜
  • 做網站引流推廣賺錢一個2元
  • 網站建設實用教程企業(yè)郵箱申請
  • 網站做seo屏蔽搜索關鍵詞排名監(jiān)控批量查詢
  • 河北涿州網站建設網絡營銷是指什么
  • 網站開發(fā)的對聯(lián)株洲疫情最新情況