寶安高端網(wǎng)站設(shè)計(jì)怎么樣百度收錄情況查詢
在實(shí)時(shí)開發(fā)中,雙流join獲取目標(biāo)對(duì)應(yīng)時(shí)刻的屬性時(shí),經(jīng)常使用temporary join。筆者在流量升級(jí)的實(shí)時(shí)迭代中,需要讓流量日志精準(zhǔn)的匹配上瀏覽時(shí)間里對(duì)應(yīng)的商品屬性,使用temporary join開發(fā)過程中踩坑不少,將一些經(jīng)驗(yàn)沉淀在此文中,供各位同學(xué)參考與交流。
背景介紹
關(guān)于實(shí)時(shí)flinkSQL的雙流join的背景知識(shí)可以先閱讀以下文章:
https://www.51cto.com/article/713922.html
目前我們有一條流量日志明細(xì)的TT流A,以及一條商品標(biāo)簽的TT流B,在flink中對(duì)A流和B流進(jìn)行雙流join類似于將A流關(guān)聯(lián)一個(gè)hbase維表。temporary join有以下特點(diǎn):
1. 單流驅(qū)動(dòng):雖然是雙流join,但數(shù)據(jù)下發(fā)只由一條流驅(qū)動(dòng)。
2. 需要定義versioned table,versioned table記錄了每個(gè)時(shí)刻的屬性信息,雙流join時(shí)被動(dòng)查詢。類似于銀行匯率表,在貨幣兌換的時(shí)候需要參考兌換時(shí)刻的匯率。
3. 查詢攜帶時(shí)間版本信息:temporary join攜帶由兩條流的watermark觸發(fā),因此查詢到的屬性是對(duì)應(yīng)時(shí)間內(nèi)的屬性。
圖片來源:孫金城, 《Blink 漫談系列 - Temporal Table JOIN》
應(yīng)用場(chǎng)景&實(shí)例分享
當(dāng)需要根據(jù)實(shí)時(shí)匯率*貨幣金額計(jì)算總金額,實(shí)時(shí)商品價(jià)格*成交件數(shù)計(jì)算總成交金額時(shí),經(jīng)常會(huì)使用temporary join獲取實(shí)時(shí)的匯率和價(jià)格信息。在筆者的流量升級(jí)業(yè)務(wù)迭代中,我們需要獲取實(shí)時(shí)的商品標(biāo)簽,因此需要定義商品標(biāo)簽的versioned table,寫法如下:
CREATE TEMPORARY TABLE `tag_ri` (`id` VARCHAR,`tag` VARCHAR,`time` VARCHAR,`ts` AS `TO_TIMESTAMP`(`time`, 'yyyy-MM-dd HH:mm:ss'),WATERMARK FOR `ts` AS `withOffset`(`ts`, 0) --定義watermark
) WITH ('connector' = 'tt','router' = '******','topic' = 'tag_ri','lineDelimiter' = '\n','fieldDelimiter' = '\u0001','encoding' = 'utf-8'
);--定義version table
CREATE TEMPORARY VIEW `tag`
AS
SELECT `id`, `tag`, `time`, `ts`
FROM ( SELECT `id`, `tag`, `time`, `ts`, ROW_NUMBER() OVER (PARTITION BY `id` --關(guān)聯(lián)主鍵ORDER BY `time` DESC) AS `rownum`FROM `tag_ri`)
WHERE `rownum` = 1;
同上我們也需要定義流量日志明細(xì)流的watermark,并進(jìn)行雙流join
CREATE TEMPORARY TABLE `log_ri` (`id` VARCHAR,`time` VARCHAR,......`ts` AS `TO_TIMESTAMP`(`time`, 'yyyy-MM-dd HH:mm:ss'),WATERMARK FOR `ts` AS `withOffset`(`ts`, 0)
) WITH ('connector' = 'tt','router' = '******','topic' = 'log_ri','lineDelimiter' = '\n','fieldDelimiter' = '\u0001','encoding' = 'utf-8',
);select `a`.`id`,......,`b`.`tag`
from (SELECT *FROM `log_ri`) AS `a`
LEFT JOIN `tag` FOR SYSTEM_TIME AS OF `a`.`ts` AS `b` ON `a`.`id` = `b`.`id`
結(jié)果如下:
--商品標(biāo)簽信息
12:00> SELECT * FROM tag_ri;id tag(商品標(biāo)簽)
======= ======================= t1 A12:30> SELECT * FROM tag_ri;id tag(商品標(biāo)簽)
======= =======================t1 B--流量明細(xì)日志查詢 t1商品共三條明細(xì)
SELECT * FROM log_ri;id time
======= ========t1 12:00 t1 12:15 t1 12:30 --執(zhí)行temporary join
select `a`.`id`,`a`.`time`,`b`.`tag`
from (SELECT *FROM `log_ri`) AS `a`
LEFT JOIN `tag` FOR SYSTEM_TIME AS OF `a`.`ts` AS `b` ON `a`.`id` = `b`.`id`id time tag(商品標(biāo)簽)
======= ======== =======================t1 12:00 At1 12:15 At1 12:30 B
開發(fā)經(jīng)驗(yàn)
???稀疏數(shù)據(jù)處理
由于temporary join是由兩條流的watermark觸發(fā),如果versioned table是一條稀疏的流(在一段時(shí)間內(nèi)無數(shù)據(jù)流入),那么join可能存在等待不下發(fā)數(shù)據(jù)的現(xiàn)象,可以通過設(shè)置參數(shù)?set?table.exec.source.idle-timeout = 10s ,可以讓A流數(shù)據(jù)不進(jìn)行等待,具體參數(shù)介紹可以參考:
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/config/#table-exec-source-idle-timeout
???數(shù)據(jù)延遲下發(fā)
問題
在實(shí)際開發(fā)中,我們發(fā)現(xiàn)temporay join后數(shù)據(jù)一直等待不下發(fā),整點(diǎn)才會(huì)進(jìn)行下發(fā)的現(xiàn)象。
原因分析
我們結(jié)合SQL語法,對(duì)TT日志進(jìn)行回流分析:代碼邏輯是四路source union后, join 定義的versioned table
select a.*,b.tag
from
(
select * from source_1
union all
select * from source_2
union all
select * from source_3
union all
select * from source_4
) a
temporay join
b流
source_4會(huì)在整點(diǎn)流入少部分當(dāng)前小時(shí)59分鐘的數(shù)據(jù),而temporay join 是由兩邊的watermark所觸發(fā),所以會(huì)有a流等待b流的時(shí)間到達(dá)當(dāng)前小時(shí)59分鐘后再觸發(fā)的現(xiàn)象。
解法
對(duì)source_4中l(wèi)og_time>當(dāng)前時(shí)間的部分,做temporary join時(shí)將log_time置為當(dāng)前時(shí)間,該問題就解決了。
總結(jié)
1. 在單流驅(qū)動(dòng)的雙流join場(chǎng)景中,temporary join是一種常見的處理方式。
2. temporary join由兩條流的watermark觸發(fā),需要對(duì)兩條流的watermark進(jìn)行預(yù)處理,防止數(shù)據(jù)稀疏和數(shù)據(jù)搶跑等現(xiàn)象影響數(shù)據(jù)下發(fā)。
參考資料
https://www.51cto.com/article/713922.html
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/config/#table-exec-source-idle-timeout
團(tuán)隊(duì)介紹
我們是淘天集團(tuán)-業(yè)務(wù)技術(shù)-商家數(shù)據(jù)團(tuán)隊(duì),專注于開發(fā)和維護(hù)生意參謀這一全渠道、全鏈路、一站式的數(shù)據(jù)平臺(tái),同時(shí)也負(fù)責(zé)品牌數(shù)據(jù)銀行和策略中心兩大產(chǎn)品。旨在為商家提供全面的數(shù)據(jù)服務(wù),包括但不限于經(jīng)營(yíng)分析、市場(chǎng)洞察、客群洞察等,以幫助商家提高商業(yè)決策效率。
¤?拓展閱讀?¤
3DXR技術(shù)?|?終端技術(shù)?|?音視頻技術(shù)
服務(wù)端技術(shù)?|?技術(shù)質(zhì)量?|?數(shù)據(jù)算法