注冊(cè)網(wǎng)站應(yīng)注意事項(xiàng)b站24小時(shí)自助下單平臺(tái)網(wǎng)站
窗口Window機(jī)制
- 窗口概述
- 窗口的分類(lèi)
- 是否按鍵分區(qū)
- 按鍵分區(qū)窗口
- 非按鍵分區(qū)
- 按照驅(qū)動(dòng)類(lèi)型
- 按具體分配規(guī)則
- 滾動(dòng)窗口Tumbling Windows
- 滑動(dòng)窗口 Sliding Windows
- 會(huì)話(huà)窗口 Session Windows
- 全局窗口 Global Windows
- 時(shí)間語(yǔ)義
- 窗口分配器 Window Assigners
- 時(shí)間窗口
- 計(jì)數(shù)窗口
- 例子
- 窗口函數(shù) Window Functions
- 增量聚合函數(shù)
- ReduceFunction
- AggregateFunction
- 全窗/全量口函數(shù)
- WindowFunction
- ProcessWindowFunction
- 增量聚合和全窗口函數(shù)的結(jié)合
- 其他
- 觸發(fā)器 Trigger
- 移除器 Evictor
窗口概述
在大多數(shù)場(chǎng)景下,需要統(tǒng)計(jì)的數(shù)據(jù)流都是無(wú)界的,因此無(wú)法等待整個(gè)數(shù)據(jù)流終止后才進(jìn)行統(tǒng)計(jì)。通常情況下,只需要對(duì)某個(gè)時(shí)間范圍或者數(shù)量范圍內(nèi)的數(shù)據(jù)進(jìn)行統(tǒng)計(jì)分析
例如:
每隔10分鐘統(tǒng)計(jì)一次過(guò)去30分鐘內(nèi)某個(gè)對(duì)象的點(diǎn)擊量每發(fā)生100次點(diǎn)擊后,就去統(tǒng)計(jì)一下每個(gè)對(duì)象點(diǎn)擊率的占比
因此,在Apache Flink中,窗口是對(duì)無(wú)界數(shù)據(jù)流進(jìn)行有界處理的機(jī)制。窗口可以將無(wú)限的數(shù)據(jù)流劃分為有限的、可處理的塊,使得可以基于這些有限的數(shù)據(jù)塊執(zhí)行聚合、計(jì)算和分析操作。
窗口的分類(lèi)
是否按鍵分區(qū)
在定義窗口操作之前,首先需要確定,到底是基于按鍵分區(qū)的數(shù)據(jù)流KeyedStream來(lái)開(kāi)窗,還是直接在沒(méi)有按鍵分區(qū)的DataStream上開(kāi)窗。
兩者區(qū)別:
1.keyed streams要調(diào)用keyBy(...)后再調(diào)用window(...) , 而non-keyed streams只用直接調(diào)用windowAll(...)2.對(duì)于keyed stream,其中數(shù)據(jù)的任何屬性都可以作為key。 允許窗口計(jì)算由多個(gè)task并行,因?yàn)槊總€(gè)邏輯上的 keyed stream都可以被單獨(dú)處理。 屬于同一個(gè)key的元素會(huì)被發(fā)送到同一個(gè) task。3.對(duì)于non-keyed stream,原始的stream不會(huì)被分割為多個(gè)邏輯上的stream, 所有的窗口計(jì)算會(huì)被同一個(gè) task完成,也就是parallelism為1
按鍵分區(qū)窗口
經(jīng)過(guò)按鍵分區(qū)keyBy操作后,數(shù)據(jù)流會(huì)按照key被分為多條邏輯流,這就是KeyedStream?;贙eyedStream進(jìn)行窗口操作時(shí),窗口計(jì)算會(huì)在多個(gè)并行子任務(wù)上同時(shí)執(zhí)行。相同key的數(shù)據(jù)會(huì)被發(fā)送到同一個(gè)并行子任務(wù),而窗口操作會(huì)基于每個(gè)key進(jìn)行單獨(dú)的處理。所以可以認(rèn)為,每個(gè)key上都定義了一組窗口,各自獨(dú)立地進(jìn)行統(tǒng)計(jì)計(jì)算。
按鍵分區(qū)窗口寫(xiě)法:
stream.keyBy(...) <- 僅 keyed 窗口需要.window(...) <- 必填項(xiàng):"assigner"[.trigger(...)] <- 可選項(xiàng):"trigger" (省略則使用默認(rèn) trigger)[.evictor(...)] <- 可選項(xiàng):"evictor" (省略則不使用 evictor)[.allowedLateness(...)] <- 可選項(xiàng):"lateness" (省略則為 0)[.sideOutputLateData(...)] <- 可選項(xiàng):"output tag" (省略則不對(duì)遲到數(shù)據(jù)使用 side output).reduce/aggregate/apply() <- 必填項(xiàng):"function"[.getSideOutput(...)] <- 可選項(xiàng):"output tag"
代碼示例:
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 從socket接收數(shù)據(jù)流SingleOutputStreamOperator<String> source = env.socketTextStream("node01", 8086);// 將輸入數(shù)據(jù)轉(zhuǎn)換為(key, value)元組DataStream<Tuple2<String, Integer>> dataStream = source.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2 map(String s) throws Exception {int number = Integer.parseInt(s);String key = number % 2 == 0 ? "key1" : "key2";Tuple2 tuple2 = new Tuple2(key, number);return tuple2;}}).returns(Types.TUPLE(Types.STRING, Types.INT));// keyBy操作KeyedStream<Tuple2<String, Integer>, String> keyBy = dataStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> tuple2) throws Exception {return tuple2.f0;}});// 每10秒統(tǒng)計(jì)一次數(shù)量和SingleOutputStreamOperator<Tuple2<String, Integer>> streamOperator = keyBy.window(TumblingProcessingTimeWindows.of(Time.seconds(10))).sum(1);streamOperator.print();env.execute();}
發(fā)送測(cè)試數(shù)據(jù)
[root@administrator ~]# nc -lk 8086
1
2
3
4
等待10秒后,控制臺(tái)打印如下
(key2,4)
(key1,6)
非按鍵分區(qū)
如果沒(méi)有進(jìn)行keyBy,那么原始的DataStream就不會(huì)分成多條邏輯流。這時(shí)窗口邏輯只能在一個(gè)任務(wù)task上執(zhí)行,就相當(dāng)于并行度變成了1。
非按鍵分區(qū)窗口寫(xiě)法:
stream.windowAll(...) <- 必填項(xiàng):"assigner"[.trigger(...)] <- 可選項(xiàng):"trigger" (else default trigger)[.evictor(...)] <- 可選項(xiàng):"evictor" (else no evictor)[.allowedLateness(...)] <- 可選項(xiàng):"lateness" (else zero)[.sideOutputLateData(...)] <- 可選項(xiàng):"output tag" (else no side output for late data).reduce/aggregate/apply() <- 必填項(xiàng):"function"[.getSideOutput(...)] <- 可選項(xiàng):"output tag"
代碼示例:
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 從socket接收數(shù)據(jù)流SingleOutputStreamOperator<String> source = env.socketTextStream("node01", 8086);// 將輸入數(shù)據(jù)轉(zhuǎn)換為IntegerDataStream<Integer> dataStream = source.map(str -> Integer.parseInt(str));// 每10秒統(tǒng)計(jì)一次數(shù)量和SingleOutputStreamOperator<Integer> streamOperator = dataStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))).sum(0);streamOperator.print();env.execute();}
按照驅(qū)動(dòng)類(lèi)型
窗口按照驅(qū)動(dòng)類(lèi)型可以分成時(shí)間窗口和計(jì)數(shù)窗口,這兩種窗口類(lèi)型根據(jù)其觸發(fā)機(jī)制和邊界規(guī)則的不同,適用于不同的應(yīng)用場(chǎng)景。
時(shí)間窗口 Time Windows:
時(shí)間窗口根據(jù)事件時(shí)間Event Time或處理時(shí)間Processing Time來(lái)劃分時(shí)間窗口根據(jù)時(shí)間的進(jìn)展劃分?jǐn)?shù)據(jù)流,當(dāng)一個(gè)窗口的時(shí)間到達(dá)或窗口中的元素?cái)?shù)量達(dá)到閾值時(shí),觸發(fā)窗口計(jì)算
計(jì)數(shù)窗口 Count Windows:
計(jì)數(shù)窗口根據(jù)元素的數(shù)量或元素的增量來(lái)劃分計(jì)數(shù)窗口在數(shù)據(jù)流中累積固定數(shù)量的元素后,觸發(fā)窗口計(jì)算窗口的大小可以是固定的,也可以是動(dòng)態(tài)變化的,取決于所設(shè)置的閾值和策略
按具體分配規(guī)則
窗口按照具體的分配規(guī)則,又有滾動(dòng)窗口(Tumbling Window)、滑動(dòng)窗口(Sliding Window)、會(huì)話(huà)窗口(Session Window),以及全局窗口(Global Window)。
滾動(dòng)窗口Tumbling Windows
滾動(dòng)窗口將數(shù)據(jù)流劃分為固定大小的、不重疊的窗口。
例如:將數(shù)據(jù)流按照5秒的滾動(dòng)窗口大小進(jìn)行劃分,每個(gè)窗口包含5秒的數(shù)據(jù)。那么每5秒就會(huì)有一個(gè)窗口被計(jì)算,且一個(gè)新的窗口被創(chuàng)建
代碼示例:
DataStream<T> input = ...;// 滾動(dòng) event-time 窗口
input.keyBy(<key selector>)// 間間隔可以用 Time.milliseconds(x)、Time.seconds(x)、Time.minutes(x) 等來(lái)指定.window(TumblingEventTimeWindows.of(Time.seconds(5))).<windowed transformation>(<window function>);// 滾動(dòng) processing-time 窗口
input.keyBy(<key selector>).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).<windowed transformation>(<window function>);// 長(zhǎng)度為一天的滾動(dòng) event-time 窗口, 偏移量為 -8 小時(shí)。
input.keyBy(<key selector>).window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))).<windowed transformation>(<window function>);
滑動(dòng)窗口 Sliding Windows
滑動(dòng)窗口將數(shù)據(jù)流劃分為固定大小的窗口,窗口大小通過(guò)window size參數(shù)設(shè)置,需要一個(gè)額外的滑動(dòng)距離window slide參數(shù)來(lái)控制生成新窗口的頻率。
如果slide小于窗口大小,滑動(dòng)窗口可以允許窗口重疊。這種情況下,一個(gè)元素可能會(huì)被分發(fā)到多個(gè)窗口。
例如:將數(shù)據(jù)流按照5秒的滑動(dòng)窗口大小和3秒的滑動(dòng)步長(zhǎng)進(jìn)行劃分,窗口之間有2秒的重疊。
DataStream<T> input = ...;// 滑動(dòng) event-time 窗口
input.keyBy(<key selector>).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).<windowed transformation>(<window function>);// 滑動(dòng) processing-time 窗口
input.keyBy(<key selector>).window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).<windowed transformation>(<window function>);// 滑動(dòng) processing-time 窗口,偏移量為 -8 小時(shí)
input.keyBy(<key selector>).window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8))).<windowed transformation>(<window function>);
會(huì)話(huà)窗口 Session Windows
與滾動(dòng)窗口和滑動(dòng)窗口不同,會(huì)話(huà)窗口不會(huì)相互重疊,且沒(méi)有固定的開(kāi)始或結(jié)束時(shí)間。 會(huì)話(huà)窗口在一段時(shí)間沒(méi)有收到數(shù)據(jù)之后會(huì)關(guān)閉,即在一段不活躍的間隔之后。
如果會(huì)話(huà)窗口有一段時(shí)間沒(méi)有收到數(shù)據(jù), 會(huì)話(huà)窗口會(huì)自動(dòng)關(guān)閉, 這段沒(méi)有收到數(shù)據(jù)的時(shí)間就是會(huì)話(huà)窗口的gap(間隔)
可以配置靜態(tài)的gap, 也可以通過(guò)一個(gè)gap extractor函數(shù)來(lái)定義gap的長(zhǎng)度
當(dāng)時(shí)間超過(guò)了這個(gè)gap, 當(dāng)前的會(huì)話(huà)窗口就會(huì)關(guān)閉, 后序的元素會(huì)被分配到一個(gè)新的會(huì)話(huà)窗口
DataStream<T> input = ...;// 設(shè)置了固定間隔的 event-time 會(huì)話(huà)窗口
input.keyBy(<key selector>).window(EventTimeSessionWindows.withGap(Time.minutes(10))).<windowed transformation>(<window function>);// 設(shè)置了動(dòng)態(tài)間隔的 event-time 會(huì)話(huà)窗口
input.keyBy(<key selector>).window(EventTimeSessionWindows.withDynamicGap((element) -> {// 決定并返回會(huì)話(huà)間隔})).<windowed transformation>(<window function>);// 設(shè)置了固定間隔的 processing-time session 窗口
input.keyBy(<key selector>).window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))).<windowed transformation>(<window function>);// 設(shè)置了動(dòng)態(tài)間隔的 processing-time 會(huì)話(huà)窗口
input.keyBy(<key selector>).window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {// 決定并返回會(huì)話(huà)間隔})).<windowed transformation>(<window function>);
全局窗口 Global Windows
全局窗口將整個(gè)數(shù)據(jù)流作為一個(gè)窗口進(jìn)行處理,不進(jìn)行分割。全局窗口適用于需要在整個(gè)數(shù)據(jù)流上執(zhí)行聚合操作的場(chǎng)景。
DataStream<T> input = ...;input.keyBy(<key selector>).window(GlobalWindows.create()).<windowed transformation>(<window function>);
時(shí)間語(yǔ)義
在Flink的流式操作中, 會(huì)涉及不同的時(shí)間概念,即時(shí)間語(yǔ)義,它是指在數(shù)據(jù)處理中確定事件的時(shí)間基準(zhǔn)的機(jī)制。
在實(shí)時(shí)數(shù)據(jù)流處理中,常見(jiàn)的時(shí)間語(yǔ)義有以下三種:
1.處理時(shí)間(Processing Time):
處理時(shí)間是指數(shù)據(jù)處理引擎的本地時(shí)鐘時(shí)間,也稱(chēng)為機(jī)器時(shí)間或系統(tǒng)時(shí)間使用處理時(shí)間時(shí),事件的時(shí)間順序是根據(jù)數(shù)據(jù)到達(dá)處理引擎的順序來(lái)確定的處理時(shí)間是一種簡(jiǎn)單和實(shí)時(shí)性較高的時(shí)間語(yǔ)義,但不考慮數(shù)據(jù)可能存在的延遲或亂序
2.事件時(shí)間(Event Time):
事件時(shí)間是數(shù)據(jù)流中記錄的實(shí)際時(shí)間,通常是數(shù)據(jù)本身攜帶的時(shí)間戳使用事件時(shí)間時(shí),數(shù)據(jù)記錄的時(shí)間戳決定事件在時(shí)間軸上的順序,而不受數(shù)據(jù)到達(dá)引擎的順序影響事件時(shí)間是一種準(zhǔn)確和可重現(xiàn)的時(shí)間語(yǔ)義,能夠處理延遲和亂序數(shù)據(jù),但可能需要關(guān)注水印的處理
3.攝取時(shí)間(Ingestion Time):
注意:較新版本的Flink已經(jīng)棄用,推薦使用事件時(shí)間
攝取時(shí)間是數(shù)據(jù)進(jìn)入數(shù)據(jù)處理引擎的時(shí)間使用攝取時(shí)間時(shí),數(shù)據(jù)到達(dá)引擎的順序決定事件的時(shí)間順序攝取時(shí)間是介于處理時(shí)間和事件時(shí)間之間的折中方案。它可以處理一定程度的延遲和亂序數(shù)據(jù),但不會(huì)像事件時(shí)間那樣需要處理水印。
區(qū)別:
處理時(shí)間適用于實(shí)時(shí)性要求較高、不關(guān)心事件的順序和時(shí)間戳的場(chǎng)景事件時(shí)間適用于需要準(zhǔn)確處理事件順序和考慮延遲、亂序數(shù)據(jù)的場(chǎng)景攝取時(shí)間提供了某種程度上的準(zhǔn)確性和實(shí)時(shí)性折中
窗口分配器 Window Assigners
在Apache Flink中,窗口分配器(Window Assigner)用于定義如何將數(shù)據(jù)流中的元素分配到窗口。窗口分配器確定了窗口的邊界以及如何對(duì)元素進(jìn)行分組和分配
窗口分配器最通用的定義方式:
如果是按鍵分區(qū)窗口, 直接調(diào)用.keyBy().window()方法,傳入一個(gè)WindowAssigner作為參數(shù),返回WindowedStream。如果是非按鍵分區(qū)窗口,直接調(diào)用.windowAll()方法,傳入一個(gè)WindowAssigner,返回的是AllWindowedStream。
時(shí)間窗口
時(shí)間窗口是最常用的窗口類(lèi)型,可以大致細(xì)分為滾動(dòng)、滑動(dòng)和會(huì)話(huà)三種。
1.滾動(dòng)處理時(shí)間窗口
窗口分配器由類(lèi)TumblingProcessingTimeWindows提供,需要調(diào)用它的靜態(tài)方法.of(),需要傳入一個(gè)Time類(lèi)型的參數(shù)size,表示滾動(dòng)窗口的大小
// 非按鍵分區(qū) 滾動(dòng)事件時(shí)間窗口,窗口長(zhǎng)度10s。每10秒操作一次
dataStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)));// 按鍵分區(qū)
dataStream.keyBy().window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
2.滾動(dòng)事件時(shí)間窗口
窗口分配器由類(lèi)TumblingEventTimeWindows提供,用法與滾動(dòng)處理事件窗口完全一致。
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)));
3.滑動(dòng)處理時(shí)間窗口
窗口分配器由類(lèi)SlidingProcessingTimeWindows提供,同樣需要調(diào)用它的靜態(tài)方法.of(),需要傳入兩個(gè)Time類(lèi)型的參數(shù):size和slide,前者表示滑動(dòng)窗口的大小,后者表示滑動(dòng)窗口的滑動(dòng)步長(zhǎng)
// 窗口長(zhǎng)度10s,滑動(dòng)步長(zhǎng)2s。 每2秒滑動(dòng)一次,窗口大小為10秒的滑動(dòng)時(shí)間窗口,并對(duì)窗口中的元素進(jìn)行操作。
dataStream.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2)));
4.滑動(dòng)事件時(shí)間窗口
窗口分配器由類(lèi)SlidingEventTimeWindows提供,用法與滑動(dòng)處理事件窗口完全一致
dataStream.windowAll(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5)));
5.處理時(shí)間會(huì)話(huà)窗口
窗口分配器由類(lèi)ProcessingTimeSessionWindows提供,需要調(diào)用它的靜態(tài)方法withGap()或者withDynamicGap()。需要傳入一個(gè)Time類(lèi)型的參數(shù)size,表示會(huì)話(huà)的超時(shí)時(shí)間
// 會(huì)話(huà)窗口,超時(shí)間隔5s
dataStream.windowAll(ProcessingTimeSessionWindows.withGap(Time.seconds(5)));
6.事件時(shí)間會(huì)話(huà)窗口
窗口分配器由類(lèi)EventTimeSessionWindows提供,用法與處理事件會(huì)話(huà)窗口完全一致。
dataStream.windowAll(EventTimeSessionWindows.withGap(Time.seconds(10)));
計(jì)數(shù)窗口
1.滾動(dòng)計(jì)數(shù)窗口
滾動(dòng)計(jì)數(shù)窗口只需要傳入一個(gè)長(zhǎng)整型的參數(shù)size,表示窗口的大小。
當(dāng)窗口中元素?cái)?shù)量達(dá)到size
時(shí),就會(huì)觸發(fā)計(jì)算執(zhí)行并關(guān)閉窗口。
// 滾動(dòng)窗口,窗口長(zhǎng)度2個(gè)元素
dataStream.countWindowAll(2);
2.滑動(dòng)計(jì)數(shù)窗口
在countWindow()調(diào)用時(shí)傳入兩個(gè)參數(shù):size和slide,前者表示窗口大小,后者表示滑動(dòng)步長(zhǎng)。
每個(gè)窗口統(tǒng)計(jì)size
個(gè)數(shù)據(jù),每隔slide
個(gè)數(shù)據(jù)就統(tǒng)計(jì)輸出一次結(jié)果。
// 滑動(dòng)窗口,窗口長(zhǎng)度2個(gè)元素,滑動(dòng)步長(zhǎng)2個(gè)元素
dataStream.countWindowAll(5,2);
3.全局窗口
全局窗口是計(jì)數(shù)窗口的底層實(shí)現(xiàn),一般在需要自定義窗口時(shí)使用。它的定義同樣是直接調(diào)用.window(),分配器由GlobalWindows類(lèi)提供。
// 全局窗口,需要自定義的時(shí)候才會(huì)用
dataStream.windowAll(GlobalWindows.create());dataStream.keyBy().window(GlobalWindows.create());
注意:使用全局窗口必須自行定義觸發(fā)器才能實(shí)現(xiàn)窗口計(jì)算,否則不起作用。
例子
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 從socket接收數(shù)據(jù)流SingleOutputStreamOperator<String> source = env.socketTextStream("node01", 8086);// 將輸入數(shù)據(jù)轉(zhuǎn)換為IntegerDataStream<Integer> dataStream = source.map(str -> Integer.parseInt(str));// 時(shí)間窗口示例:滾動(dòng)處理時(shí)間窗口,窗口長(zhǎng)度10s。 每10秒統(tǒng)計(jì)一次數(shù)量和SingleOutputStreamOperator<Integer> streamOperator = dataStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))).sum(0);streamOperator.print();env.execute();}
發(fā)送測(cè)試數(shù)據(jù)
[root@administrator ~]# nc -lk 8086
1
2
3
4
等待10秒后,控制臺(tái)打印如下
10
窗口函數(shù) Window Functions
定義了window assigner之后,需要指定當(dāng)窗口觸發(fā)之后,如何計(jì)算每個(gè)窗口中的數(shù)據(jù), 這就是window function的職責(zé)
窗口函數(shù)是在窗口操作中應(yīng)用于窗口中元素的函數(shù)。Flink提供了豐富的窗口函數(shù),用于對(duì)窗口中的元素進(jìn)行各種操作和計(jì)算。
根據(jù)處理的方式可以分為兩類(lèi):增量聚合函數(shù)和全窗/全量口函數(shù),它們是Flink中用于窗口計(jì)算的兩種不同的函數(shù)。
增量聚合函數(shù)
增量聚合函數(shù)是指對(duì)窗口中的數(shù)據(jù)進(jìn)行累積計(jì)算的函數(shù)。它會(huì)在每個(gè)元素到達(dá)窗口時(shí)進(jìn)行計(jì)算,并且僅保留窗口計(jì)算所需的中間狀態(tài)。這種方式可以顯著提高計(jì)算性能,尤其適用于大規(guī)模數(shù)據(jù)和長(zhǎng)窗口的情況。
對(duì)于增量聚合函數(shù),Flink 提供了一系列內(nèi)置的聚合函數(shù),例如 sum、min、max、avg等,它們的底層,其實(shí)都是通過(guò)AggregateFunction來(lái)實(shí)現(xiàn)的。還可以通過(guò)實(shí)現(xiàn) AggregateFunction接口來(lái)定義自定義的增量聚合函數(shù)。
典型的增量聚合函數(shù)有兩個(gè):ReduceFunction和AggregateFunction。
ReduceFunction
ReduceFunction指定兩條輸入數(shù)據(jù)如何合并起來(lái)產(chǎn)生一條輸出數(shù)據(jù),輸入和輸出數(shù)據(jù)的類(lèi)型必須相同。
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 從socket接收數(shù)據(jù)流SingleOutputStreamOperator<String> source = env.socketTextStream("node01", 8086);// 將輸入數(shù)據(jù)轉(zhuǎn)換為IntegerDataStream<Integer> dataStream = source.map(str -> Integer.parseInt(str));// 指定窗口分配器AllWindowedStream<Integer, TimeWindow> allWindowedStream = dataStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)));// 指定窗口函數(shù),使用 增量聚合ReduceSingleOutputStreamOperator<Integer> reduce = allWindowedStream.reduce(new ReduceFunction<Integer>() {@Overridepublic Integer reduce(Integer value1, Integer value2) throws Exception {System.out.println("前一個(gè)值: " + value1 + " ,后一個(gè)值:" + value2);return value1 + value2;}});// 在窗口觸發(fā)的時(shí)候,才會(huì)輸出窗口的最終計(jì)算結(jié)果reduce.print();env.execute();}
發(fā)送測(cè)試數(shù)據(jù):
[root@administrator ~]# nc -lk 8086
1
2
3
4
5
控制臺(tái)輸出:
前一個(gè)值: 1 ,后一個(gè)值:2
前一個(gè)值: 3 ,后一個(gè)值:3
前一個(gè)值: 6 ,后一個(gè)值:4
前一個(gè)值: 10 ,后一個(gè)值:5
15
AggregateFunction
ReduceFunction接口存在一個(gè)限制:聚合狀態(tài)的類(lèi)型、輸出結(jié)果的類(lèi)型都必須和輸入數(shù)據(jù)類(lèi)型一樣。聚合函數(shù)則突破了這個(gè)限制,可以定義更加靈活的窗口聚合操作。
AggregateFunction函數(shù)接口方法參數(shù)有三種類(lèi)型:輸入類(lèi)型(IN)、累加器類(lèi)型(ACC)和輸出類(lèi)型(OUT)。
輸入類(lèi)型IN就是輸入流中元素的數(shù)據(jù)類(lèi)型累加器類(lèi)型ACC則是我們進(jìn)行聚合的中間狀態(tài)類(lèi)型而輸出類(lèi)型當(dāng)然就是最終計(jì)算結(jié)果的類(lèi)型
接口中有四個(gè)方法:
createAccumulator():創(chuàng)建一個(gè)累加器,這就是為聚合創(chuàng)建了一個(gè)初始狀態(tài),每個(gè)聚合任務(wù)只會(huì)調(diào)用一次add():將輸入的元素添加到累加器中g(shù)etResult():從累加器中提取聚合的輸出結(jié)果merge():合并兩個(gè)累加器,并將合并后的狀態(tài)作為一個(gè)累加器返回
與ReduceFunction相同,AggregateFunction也是增量式的聚合,而由于輸入、中間狀態(tài)、輸出的類(lèi)型可以不同,使得應(yīng)用更加靈活方便。
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 從socket接收數(shù)據(jù)流SingleOutputStreamOperator<String> source = env.socketTextStream("node01", 8086);// 將輸入數(shù)據(jù)轉(zhuǎn)換為IntegerDataStream<Integer> dataStream = source.map(str -> Integer.parseInt(str));// 指定窗口分配器AllWindowedStream<Integer, TimeWindow> allWindowedStream = dataStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)));// 窗口函數(shù) 增量聚合 AggregateSingleOutputStreamOperator<String> aggregate = allWindowedStream.aggregate(new MyAggregateFunction());aggregate.print();env.execute();}/*** 第一個(gè)類(lèi)型: 輸入數(shù)據(jù)的類(lèi)型* 第二個(gè)類(lèi)型: 累加器的類(lèi)型,存儲(chǔ)的中間計(jì)算結(jié)果的類(lèi)型* 第三個(gè)類(lèi)型: 輸出的類(lèi)型*/public static class MyAggregateFunction implements AggregateFunction<Integer, Integer, String> {/*** 創(chuàng)建累加器,初始化累加器** @return*/@Overridepublic Integer createAccumulator() {System.out.println("createAccumulator方法執(zhí)行");return 0;}/*** 聚合邏輯* 來(lái)一條計(jì)算一條,調(diào)用一次add方法** @param value 當(dāng)前值* @param accumulator 累加器的值* @return*/@Overridepublic Integer add(Integer value, Integer accumulator) {System.out.println("add方法執(zhí)行,當(dāng)前值 :" + value + "累加器值 :" + accumulator);return value + accumulator;}/*** 獲取最終結(jié)果,窗口觸發(fā)時(shí)輸出** @param accumulator* @return*/@Overridepublic String getResult(Integer accumulator) {System.out.println("getResult方法執(zhí)行");return "最終計(jì)算值:" + accumulator;}@Overridepublic Integer merge(Integer a, Integer b) {// 只有會(huì)話(huà)窗口才會(huì)用到System.out.println("merge方法執(zhí)行");return null;}}
發(fā)送測(cè)試數(shù)據(jù):
[root@administrator ~]# nc -lk 8086
1
2
3
4
5
控制臺(tái)輸出:
createAccumulator方法執(zhí)行
add方法執(zhí)行,當(dāng)前值 :1累加器值 :0
add方法執(zhí)行,當(dāng)前值 :2累加器值 :1
add方法執(zhí)行,當(dāng)前值 :3累加器值 :3
add方法執(zhí)行,當(dāng)前值 :4累加器值 :6
add方法執(zhí)行,當(dāng)前值 :5累加器值 :10
getResult方法執(zhí)行
最終計(jì)算值:15
全窗/全量口函數(shù)
全窗口函數(shù)是對(duì)窗口中的所有元素進(jìn)行計(jì)算的函數(shù)。它會(huì)在窗口觸發(fā)時(shí)對(duì)窗口中的所有元素進(jìn)行處理,并輸出一個(gè)或多個(gè)結(jié)果。全窗口函數(shù)可以訪(fǎng)問(wèn)窗口的所有元素,并且可以使用窗口中的狀態(tài)信息。
對(duì)于全窗口函數(shù),Flink提供了 ProcessWindowFunction 和 WindowFunction 兩個(gè)接口供用戶(hù)使用。
ProcessWindowFunction: 可以處理每個(gè)元素,并輸出零個(gè)、一個(gè)或多個(gè)結(jié)果WindowFunction: 是一個(gè)轉(zhuǎn)換函數(shù),對(duì)窗口的所有元素進(jìn)行轉(zhuǎn)換,并輸出一個(gè)或多個(gè)結(jié)果。
與增量聚合函數(shù)不同,全窗口函數(shù)需要先收集窗口中的數(shù)據(jù),并在內(nèi)部緩存起來(lái),等到窗口要輸出結(jié)果的時(shí)候再取出數(shù)據(jù)進(jìn)行計(jì)算。
WindowFunction
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 從socket接收數(shù)據(jù)流SingleOutputStreamOperator<String> source = env.socketTextStream("node01", 8086);// 將輸入數(shù)據(jù)轉(zhuǎn)換為(key, value)元組DataStream<Tuple2<String, Integer>> dataStream = source.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2 map(String s) throws Exception {int number = Integer.parseInt(s);String key = number % 2 == 0 ? "key1" : "key2";Tuple2 tuple2 = new Tuple2(key, number);return tuple2;}}).returns(Types.TUPLE(Types.STRING, Types.INT));// keyBy操作KeyedStream<Tuple2<String, Integer>, String> keyedStream = dataStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> tuple2) throws Exception {return tuple2.f0;}});// 指定窗口分配器 非鍵分區(qū)窗口
// AllWindowedStream<Integer, TimeWindow> allWindowedStream = dataStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)));// 鍵分區(qū)窗口WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowedStream = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));// 使用WindowFunction窗口函數(shù)SingleOutputStreamOperator<String> apply = windowedStream.apply(new MyWindowFunction());apply.print();env.execute();}/*** 窗口函數(shù)* <p>* 窗口觸發(fā)時(shí)才會(huì)調(diào)用一次,統(tǒng)一計(jì)算窗口的所有數(shù)據(jù)*/public static class MyWindowFunction implements WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow> {/*** @param s 分組的key,非鍵分區(qū)窗口則無(wú)該參數(shù)* @param window 窗口對(duì)象* @param input 存的數(shù)據(jù)* @param out 采集器*/@Overridepublic void apply(String s, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {// 上下文拿到window對(duì)象,獲取相關(guān)信息long start = window.getStart();long end = window.getEnd();String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss");String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss");long count = input.spliterator().estimateSize();out.collect("分組 " + s + " 的窗口,在時(shí)間區(qū)間: " + windowStart + "-" + windowEnd + " 產(chǎn)生" + count + "條數(shù)據(jù),具體數(shù)據(jù):" + input.toString());}}
]# nc -lk 8086
1
2
3
4
5
分組 key2 的窗口,在時(shí)間區(qū)間: 2023-06-27 16:50:10-2023-06-27 16:50:20 產(chǎn)生3條數(shù)據(jù),具體數(shù)據(jù):[(key2,1), (key2,3), (key2,5)]
分組 key1 的窗口,在時(shí)間區(qū)間: 2023-06-27 16:50:10-2023-06-27 16:50:20 產(chǎn)生2條數(shù)據(jù),具體數(shù)據(jù):[(key1,2), (key1,4)]
ProcessWindowFunction
// 使用ProcessWindowFunction處理窗口函數(shù)SingleOutputStreamOperator<String> process = windowedStream.process(new MyProcessWindowFunction());
/*** 處理窗口函數(shù)* <p>* 窗口觸發(fā)時(shí)才會(huì)調(diào)用一次,統(tǒng)一計(jì)算窗口的所有數(shù)據(jù)*/public static class MyProcessWindowFunction extends ProcessWindowFunction<Tuple2<String, Integer>, String, String, TimeWindow> {/*** @param s 分組的key,非鍵分區(qū)窗口則無(wú)該參數(shù)* @param context 上下文* @param input 存的數(shù)據(jù)* @param out 采集器* @throws Exception*/@Overridepublic void process(String s, Context context, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {// 上下文拿到window對(duì)象,獲取相關(guān)信息long start = context.window().getStart();long end = context.window().getEnd();String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss");String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss");long count = input.spliterator().estimateSize();out.collect("分組 " + s + " 的窗口,在時(shí)間區(qū)間: " + windowStart + "-" + windowEnd + " 產(chǎn)生" + count + "條數(shù)據(jù),具體數(shù)據(jù):" + input.toString());}}
增量聚合和全窗口函數(shù)的結(jié)合
在調(diào)用窗口的增量聚合函數(shù)方法時(shí),第一個(gè)參數(shù)直接傳入一個(gè)ReduceFunction或AggregateFunction進(jìn)行增量聚合,第二個(gè)參數(shù)傳入一個(gè)全窗口函數(shù)WindowFunction或者ProcessWindowFunction。
基于第一個(gè)參數(shù)(增量聚合函數(shù))來(lái)處理窗口數(shù)據(jù),每來(lái)一個(gè)數(shù)據(jù)就做一次聚合等到窗口需要觸發(fā)計(jì)算時(shí),則調(diào)用第二個(gè)參數(shù)(全窗口函數(shù))的處理邏輯輸出結(jié)果注意這里的全窗口函數(shù)就不再緩存所有數(shù)據(jù)了,而是直接將增量聚合函數(shù)的結(jié)果拿來(lái)當(dāng)作了Iterable類(lèi)型的輸入
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 從socket接收數(shù)據(jù)流SingleOutputStreamOperator<String> source = env.socketTextStream("node01", 8086);// 將輸入數(shù)據(jù)轉(zhuǎn)換為IntegerDataStream<Integer> dataStream = source.map(str -> Integer.parseInt(str));// 指定窗口分配器 非鍵分區(qū)窗口AllWindowedStream<Integer, TimeWindow> allWindowedStream = dataStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)));// 使用ProcessWindowFunction處理窗口函數(shù)SingleOutputStreamOperator<String> process = allWindowedStream.aggregate(new MyAggregateFunction(), new MyProcessWindowFunction());process.print();env.execute();}/*** 第一個(gè)類(lèi)型: 輸入數(shù)據(jù)的類(lèi)型* 第二個(gè)類(lèi)型: 累加器的類(lèi)型,存儲(chǔ)的中間計(jì)算結(jié)果的類(lèi)型* 第三個(gè)類(lèi)型: 輸出的類(lèi)型*/public static class MyAggregateFunction implements AggregateFunction<Integer, Integer, String> {/*** 創(chuàng)建累加器,初始化累加器** @return*/@Overridepublic Integer createAccumulator() {System.out.println("createAccumulator方法執(zhí)行");return 0;}/*** 聚合邏輯* 來(lái)一條計(jì)算一條,調(diào)用一次add方法** @param value 當(dāng)前值* @param accumulator 累加器的值* @return*/@Overridepublic Integer add(Integer value, Integer accumulator) {System.out.println("add方法執(zhí)行,當(dāng)前值 :" + value + " 累加器值 :" + accumulator);return value + accumulator;}/*** 獲取最終結(jié)果,窗口觸發(fā)時(shí)輸出** @param accumulator* @return*/@Overridepublic String getResult(Integer accumulator) {System.out.println("getResult方法執(zhí)行");return "最終計(jì)算值:" + accumulator;}@Overridepublic Integer merge(Integer a, Integer b) {// 只有會(huì)話(huà)窗口才會(huì)用到System.out.println("merge方法執(zhí)行");return null;}}/*** 處理窗口函數(shù)* <p>* 窗口觸發(fā)時(shí)才會(huì)調(diào)用一次,統(tǒng)一計(jì)算窗口的所有數(shù)據(jù)* <p>* 注意:增量聚合函數(shù)的輸出類(lèi)型 是 全窗口函數(shù)的輸入類(lèi)型*/public static class MyProcessWindowFunction extends ProcessAllWindowFunction<String, String, TimeWindow> {/*** @param context 上下文* @param input 存的數(shù)據(jù)* @param out 采集器* @throws Exception*/@Overridepublic void process(Context context, Iterable<String> input, Collector<String> out) throws Exception {// 上下文拿到window對(duì)象,獲取相關(guān)信息long start = context.window().getStart();long end = context.window().getEnd();String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss");String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss");long count = input.spliterator().estimateSize();out.collect("窗口在時(shí)間區(qū)間: " + windowStart + "-" + windowEnd + " 產(chǎn)生" + count + "條數(shù)據(jù),具體數(shù)據(jù):" + input.toString());}}
createAccumulator方法執(zhí)行
add方法執(zhí)行,當(dāng)前值 :1 累加器值 :0
add方法執(zhí)行,當(dāng)前值 :2 累加器值 :1
add方法執(zhí)行,當(dāng)前值 :3 累加器值 :3
add方法執(zhí)行,當(dāng)前值 :4 累加器值 :6
add方法執(zhí)行,當(dāng)前值 :5 累加器值 :10
getResult方法執(zhí)行
窗口在時(shí)間區(qū)間: 2023-06-27 17:07:50-2023-06-27 17:08:00 產(chǎn)生1條數(shù)據(jù),具體數(shù)據(jù):[最終計(jì)算值:15]
其他
觸發(fā)器 Trigger
Trigger決定了一個(gè)窗口(由windowassigner定義)何時(shí)可以被windowfunction處理。每個(gè)WindowAssigner都有一個(gè)默認(rèn)的Trigger。如果默認(rèn)trigger無(wú)法滿(mǎn)足需要,可以在trigger(…)調(diào)用中指定自定義的trigger。
Trigger接口提供了五個(gè)方法來(lái)響應(yīng)不同的事件:
onElement()方法在每個(gè)元素被加入窗口時(shí)調(diào)用onEventTime()方法在注冊(cè)的event-timetimer觸發(fā)時(shí)調(diào)用onProcessingTime()方法在注冊(cè)的processing-timetimer觸發(fā)時(shí)調(diào)用onMerge()方法與有狀態(tài)的trigger相關(guān)。該方法會(huì)在兩個(gè)窗口合并時(shí),將窗口對(duì)應(yīng)trigger的狀態(tài)進(jìn)行合并,比如使用會(huì)話(huà)窗口時(shí)clear()方法處理在對(duì)應(yīng)窗口被移除時(shí)所需的邏輯
注意:
前三個(gè)方法通過(guò)返回TriggerResult來(lái)決定trigger如何應(yīng)對(duì)到達(dá)窗口的事件。
應(yīng)對(duì)方案:
CONTINUE: 什么也不做FIRE: 觸發(fā)計(jì)算PURGE: 清空窗口內(nèi)的元素FIRE_AND_PURGE: 觸發(fā)計(jì)算,計(jì)算結(jié)束后清空窗口內(nèi)的元素
內(nèi)置觸發(fā)器
EventTimeTrigger:基于事件時(shí)間和watermark機(jī)制來(lái)對(duì)窗口進(jìn)行觸發(fā)計(jì)算ProcessingTimeTrigger: 基于處理時(shí)間觸發(fā)CountTrigger:窗口元素?cái)?shù)超過(guò)預(yù)先給定的限制值的話(huà)會(huì)觸發(fā)計(jì)算PurgingTrigger:作為其它trigger的參數(shù),將其轉(zhuǎn)化為一個(gè)purging觸發(fā)器
基于WindowedStream調(diào)用.trigger()方法,就可以傳入一個(gè)自定義的窗口觸發(fā)器
stream.keyBy(...).window(...).trigger(new MyTrigger())
移除器 Evictor
Evictor可以在 trigger 觸發(fā)后、調(diào)用窗口函數(shù)之前或之后從窗口中刪除元素。Evictor是一個(gè)接口,不同的窗口類(lèi)型都有各自預(yù)實(shí)現(xiàn)的移除器。
內(nèi)置evictor:
默認(rèn)情況下,所有內(nèi)置的 evictor 邏輯都在調(diào)用窗口函數(shù)前執(zhí)行。
CountEvictor: 僅記錄用戶(hù)指定數(shù)量的元素,一旦窗口中的元素超過(guò)這個(gè)數(shù)量,多余的元素會(huì)從窗口緩存的開(kāi)頭移除DeltaEvictor: 接收 DeltaFunction 和 threshold 參數(shù),計(jì)算最后一個(gè)元素與窗口緩存中所有元素的差值, 并移除差值大于或等于 threshold 的元素。TimeEvictor: 接收 interval 參數(shù),以毫秒表示。 它會(huì)找到窗口中元素的最大 timestamp max_ts 并移除比 max_ts - interval 小的所有元素
基于WindowedStream調(diào)用.evictor()方法,就可以傳入一個(gè)自定義的移除器
stream.keyBy(...).window(...).evictor(new MyEvictor())