網(wǎng)站建設(shè)售后培訓(xùn)東莞疫情最新消息今天新增
前言
自定義flink sink,批模式下,有insert overwrite 需求或需要啟動(dòng)任務(wù)或任務(wù)完成后時(shí),只執(zhí)行一次某些操作時(shí),則可參考此文章
組件:
flink: 1.15
參考文檔:https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sourcessinks/
分析
insert overwrite 即在批模式下 先清除表的數(shù)據(jù) 再插入
在大數(shù)據(jù)多并發(fā)模式下 只能執(zhí)行一次清空數(shù)據(jù) 且需要在執(zhí)行插入數(shù)據(jù)之前先清理
在flink 支持的連接器中 只有 hive和文件系統(tǒng)此2中連接器支持insert overwrite功能,可參考他們的實(shí)現(xiàn)方式
此處演示例子
source: flink jdbc連接器
sink : 自定義flink jdbc連接器
加工: insert overwrite osd_user_test select * from ods_user
關(guān)鍵點(diǎn)是在 執(zhí)行 overwrite 語(yǔ)句時(shí) 在任務(wù)開(kāi)始前或結(jié)束時(shí) 清空原表的數(shù)據(jù),且此操作只執(zhí)行一次,不然會(huì)有其他并發(fā)執(zhí)行寫(xiě)入數(shù)據(jù)了 在執(zhí)行清數(shù)據(jù)情況造成漏數(shù)
查看FileSystem sink的源代碼