駐馬店高端網(wǎng)站建設(shè)免費(fèi)大數(shù)據(jù)查詢平臺(tái)
摘要
很多時(shí)候flink消費(fèi)上游kafka的數(shù)據(jù)是有重復(fù)的,因此有時(shí)候我們想數(shù)據(jù)在落盤之前進(jìn)行去重,這在實(shí)際開發(fā)中具有廣泛的應(yīng)用場景,此處不說詳細(xì)代碼,只粘貼相應(yīng)的flinksql
代碼
--********************************************************************--
-- Create the Kafka source table (a TEMPORARY table only exists in the current session) -- DDL
-- NOTE(review): the original paste had lost its line breaks, so the inline comments
-- commented out the `proctime` column and the `scan.startup.timestamp-millis` option;
-- the statement is restored here with one column / one option per line.
CREATE TEMPORARY TABLE UserAttrSource (
    `data` STRING,
    `kafkaMetaTimestamp` TIMESTAMP(3) METADATA FROM 'timestamp', -- event timestamp carried by the Kafka record; see the Flink Kafka connector docs
    proctime AS PROCTIME() -- processing time, provided by Flink's built-in PROCTIME() function
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_attri_ad_dirty_data',
    'properties.bootstrap.servers' = 'kafka地址',
    'scan.startup.mode' = 'timestamp', -- startup mode for scanning Kafka; see the Flink Kafka connector docs
    'scan.startup.timestamp-millis' = '1687305600000', -- 2023-06-21 08:00:00
    'format' = 'raw' -- deserialize the Kafka payload as a raw string
);
-- Create the SINK table
-- Sink table in AnalyticDB (ADB 3.0); the PRIMARY KEY on data_hash is declarative
-- (NOT ENFORCED) and lets the sink upsert on the hash of the record payload.
CREATE TEMPORARY TABLE ADB (
    log_date DATE,
    `errorType` INT,
    appId STRING,
    `errorCode` INT,
    `errorReason` STRING,
    `deserialization` STRING,
    `originalData` STRING,
    kafkaMetaTimestamp TIMESTAMP,
    data_hash STRING,
    PRIMARY KEY (`data_hash`) NOT ENFORCED
)
WITH (
    'connector' = 'adb3.0',
    'url' = 'jdbc:mysql://xxxx:3306/flink_data?rewriteBatchedStatements=true',
    'tableName' = 'usr_attr_dirty',
    'userName' = 'username',
    'password' = 'password'
);
-- Deduplication view -- this is the key step (JSON_VALUE is a Flink built-in function;
-- data_hash is the record's own primary key embedded in the JSON payload).
-- Meaning: partition rows by data_hash, order by processing time descending, and keep
-- only the newest row per key; all other duplicates are discarded.
-- NOTE(review): the original paste had fused tokens (`ASSELECT`, `row_numFROM`,
-- `)WHERE`) from lost line breaks; the statement is restored here.
CREATE TEMPORARY VIEW quchong AS
SELECT data, kafkaMetaTimestamp
FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY json_value(data, '$.data_hash')
            ORDER BY proctime DESC
        ) AS row_num
    FROM UserAttrSource
)
WHERE row_num = 1;
-- Insert into the target table
-- Project the JSON payload of each deduplicated record into typed sink columns.
INSERT INTO ADB
SELECT
    TO_DATE(DATE_FORMAT(kafkaMetaTimestamp, 'yyyy-MM-dd')) AS log_date,
    json_value(data, '$.errorType' RETURNING INT)           AS errorType,
    json_value(data, '$.appId' NULL ON EMPTY)               AS appId,
    json_value(data, '$.errorCode' RETURNING INT)           AS errorCode,
    json_value(data, '$.errorReason' NULL ON EMPTY)         AS errorReason,
    json_value(data, '$.deserialization' NULL ON EMPTY)     AS deserialization,
    json_value(data, '$.originalData')                      AS originalData,
    kafkaMetaTimestamp,
    json_value(data, '$.data_hash')                         AS data_hash
FROM quchong;