做企業(yè)云網(wǎng)站的企業(yè)郵箱一個新品牌怎樣營銷推廣
文章目錄
??????SQL水印操作(Watermark)
一、為什么要有WaterMark
二、???????Watermark解決的問題
三、??????????????代碼演示
??????SQL水印操作(Watermark)
一、???????為什么要有WaterMark
當 flink 以 EventTime 模式處理流數(shù)據(jù)時,它會根據(jù)數(shù)據(jù)里的時間戳來處理基于時間的算子。但是由于網(wǎng)絡、分布式等原因,會導致數(shù)據(jù)亂序的情況。如下圖所示:
假設在一個5秒的Tumble窗口,有一個EventTime是 11秒的數(shù)據(jù),在第16秒時候到來了。圖示第11秒的數(shù)據(jù),在16秒到來了,如下圖:該如何處理遲到數(shù)據(jù)
二、??????????????Watermark解決的問題
上面的問題在于如何將遲來的EventTime 為11的元素正確處理?
當Watermark的時間戳等于Event中攜帶的EventTime時候,上面場景(Watermark=EventTime)的計算結(jié)果如下:
如果想正確處理遲來的數(shù)據(jù)可以定義Watermark生成策略為 Watermark = EventTime -5s, 如下:?
通過watermark來解決,簡單來說就是延遲窗口關閉的時間,等一會遲到的數(shù)據(jù),窗口關閉不在依據(jù)數(shù)據(jù)的時間,而是到達的watermark的時間。
watermark可以理解為一個特殊的數(shù)據(jù),這個數(shù)據(jù)不參與計算,僅僅是對窗口的觸發(fā)關閉起作用。
三、??????????????代碼演示
- 使用Socket模擬接收數(shù)據(jù)
- 設置WaterMark
- 設置的邏輯:在第一條數(shù)據(jù)進來時,設置WaterMark為0,指定第一條數(shù)據(jù)的時間戳后,獲取該時間戳與當前 WaterMark的最大值,并將最大值設置為下一條數(shù)據(jù)的WaterMark,以此類推
- 使用滾動Event Time窗口,將5秒內(nèi)的同組數(shù)據(jù),進行聚合輸出
CREATE TABLE watermark_zero (
item STRING,
ts TIMESTAMP(3), -- TIMESTAMP 類型的時間戳
WATERMARK FOR ts AS ts - INTERVAL '0' SECOND
) WITH (
'connector' = 'socket',
'hostname' = '178.23.142.233',
'port' = '9999',
'format' = 'csv'
);SELECT
date_format(TUMBLE_START(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') AS window_start,
date_format(TUMBLE_END(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') AS window_end,
date_format(TUMBLE_ROWTIME(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') as window_rowtime,
item,count(item) as total_item
FROM watermark_zero
GROUP BY TUMBLE(ts, INTERVAL '5' SECOND), item;
若輸入第一條數(shù)據(jù):hello,2022-03-25 16:39:45
那么,我先假設后續(xù)的數(shù)據(jù)Event Time間隔為1秒,推斷一下WaterMark的設定,如下圖所示
1.第一條數(shù)據(jù)的Event Time為1648197585000,那么當前窗口時間為:1648197585000-> 1648197589000,即下圖中紅色框線
2.第一條數(shù)據(jù)進來時,這條數(shù)據(jù)之前的WaterMark為0,當?shù)谝粭l數(shù)據(jù)已經(jīng)進入后,指定Event Time位置,并與現(xiàn)在的WaterMark比較,將兩者中大的那個值設置為新的WaterMark,那么當前數(shù)據(jù)的WaterMark為1648197585000
3.第二條數(shù)據(jù)進來時,前一條數(shù)據(jù)的WaterMark為1648197585000,第二條數(shù)據(jù)的Event Time比之前的WaterMark大,于是更新WaterMark,將當前的WaterMark更新為1648197586000,但還沒到窗口觸發(fā)時間,不進行計算
4.后面幾個以此類推,直到Event Time為:1648197590000的數(shù)據(jù)進來的時候,前一條數(shù)據(jù)的WaterMark為1648197589000,于是更新當前的WaterMark為1648197590000,Flink認為1648197590000之前的數(shù)據(jù)都已經(jīng)到達,且達到了窗口的觸發(fā)條件,開始進行計算
根據(jù)上面的推斷,啟動程序驗證一下,向9999端口監(jiān)聽終端輸入以下內(nèi)容:
hello,2022-03-25 16:39:45
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:48
hello,2022-03-25 16:39:49
hello,2022-03-25 16:39:50
?Flink輸出結(jié)果:
Rowtime列在經(jīng)過窗口操作后,其Event Time屬性將丟失。可以使用輔助函數(shù)TUMBLE_ROWTIME、HOP_ROWTIME或SESSION_ROWTIME,獲取窗口中的Rowtime列的最大值max(rowtime)作為時間窗口的Rowtime,其類型是具有Rowtime屬性的TIMESTAMP,取值為?window_end - 1?。 例如[00:00, 00:15)?的窗口,返回值為00:14:59.999?。
數(shù)據(jù)亂序的場景
上面的實例,Event Time是有序,現(xiàn)在來做一下數(shù)據(jù)亂序的場景模擬啟動程序(注意要關閉之前的查詢,重新運行查詢語句),在監(jiān)聽終端中輸入如下數(shù)據(jù):
其中,在觸發(fā)了了第一個窗口計算后,又來了兩條遲到數(shù)據(jù)hello,2022-03-25 16:39:47,hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:45
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:48
hello,2022-03-25 16:39:49
hello,2022-03-25 16:39:50
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:51
hello,2022-03-25 16:39:52
hello,2022-03-25 16:39:53
hello,2022-03-25 16:39:54
hello,2022-03-25 16:39:55
Flink結(jié)果:
從結(jié)果中可以看到,在第二個窗口中,那兩條遲到數(shù)據(jù)并沒有進行處理,這個就是遲到丟棄。
亂序時間的設置:
為了解決上面的問題,我們允許Flink處理延遲在5秒內(nèi)的遲到數(shù)據(jù)
修改最大亂序時間(新建的表僅水印與之前不同)
CREATE TABLE watermark_five (
item STRING,
ts TIMESTAMP(3), -- TIMESTAMP 類型的時間戳
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'socket',
'hostname' = '178.23.142.233',
'port' = '9999',
'format' = 'csv'
);SELECT
date_format(TUMBLE_START(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') AS window_start,
date_format(TUMBLE_END(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') AS window_end,
date_format(TUMBLE_ROWTIME(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') as window_rowtime,
item,count(item) as total_item
FROM watermark_five
GROUP BY TUMBLE(ts, INTERVAL '5' SECOND), item;
在監(jiān)聽終端中,輸入數(shù)據(jù)
hello,2022-03-25 16:39:45
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:48
hello,2022-03-25 16:39:49
hello,2022-03-25 16:39:50
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:51
hello,2022-03-25 16:39:52
hello,2022-03-25 16:39:53
hello,2022-03-25 16:39:54
hello,2022-03-25 16:39:55
Flink輸出結(jié)果: ?
可以看到,之前遲到的兩條數(shù)據(jù)在第一個窗口中進行了處理。因為設置了最大允許亂序時間后,WaterMark要比原來低5秒,可以對延遲5秒內(nèi)的數(shù)據(jù)進行處理,窗口的觸發(fā)條件也同樣會往后延遲關于延遲時間,請結(jié)合業(yè)務場景進行設置。
- 📢博客主頁:https://lansonli.blog.csdn.net
- 📢歡迎點贊 👍 收藏 ?留言 📝 如有錯誤敬請指正!
- 📢本文由 Lansonli 原創(chuàng),首發(fā)于 CSDN博客🙉
- 📢停下休息的時候不要忘了別人還在奔跑,希望大家抓緊時間學習,全力奔赴更美好的生活?