制作一個(gè)網(wǎng)站需要多少錢(qián)百度托管公司
增量聚合的 ProcessWindowFunction?#
ProcessWindowFunction
?可以與?ReduceFunction
?或?AggregateFunction
?搭配使用, 使其能夠在數(shù)據(jù)到達(dá)窗口的時(shí)候進(jìn)行增量聚合。當(dāng)窗口關(guān)閉時(shí),ProcessWindowFunction
?將會(huì)得到聚合的結(jié)果。 這樣它就可以增量聚合窗口的元素并且從 ProcessWindowFunction` 中獲得窗口的元數(shù)據(jù)。
你也可以對(duì)過(guò)時(shí)的?WindowFunction
?使用增量聚合。
使用 ReduceFunction 增量聚合?#
- 下例展示了如何將?
ReduceFunction
?與?ProcessWindowFunction
?組合,返回窗口中的最小元素和窗口的開(kāi)始時(shí)間。
?
DataStream<SensorReading> input = ...;input.keyBy(<key selector>).window(<window assigner>).reduce(new MyReduceFunction(), new MyProcessWindowFunction());// Function definitionsprivate static class MyReduceFunction implements ReduceFunction<SensorReading> {public SensorReading reduce(SensorReading r1, SensorReading r2) {return r1.value() > r2.value() ? r2 : r1;}
}private static class MyProcessWindowFunctionextends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {public void process(String key,Context context,Iterable<SensorReading> minReadings,Collector<Tuple2<Long, SensorReading>> out) {SensorReading min = minReadings.iterator().next();out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));}
}
通俗解釋:窗口開(kāi)始時(shí)間的作用
我們可以用一個(gè)更貼近生活的例子來(lái)理解?窗口開(kāi)始時(shí)間?的意義。
場(chǎng)景比喻:每天上午的「溫度統(tǒng)計(jì)報(bào)告
假設(shè)你有一個(gè)氣象站,每5分鐘記錄一次戶(hù)外溫度?,F(xiàn)在需要?每小時(shí)(例如8:00-9:00)統(tǒng)計(jì)一次該時(shí)段內(nèi)的最低溫度,并在報(bào)告中標(biāo)注這個(gè)小時(shí)段的起始時(shí)間(如“8:00-9:00的最低溫度是15°C”)。
關(guān)鍵點(diǎn)
-
窗口開(kāi)始時(shí)間:就是時(shí)間段的起點(diǎn)(如8:00)。
-
窗口結(jié)束時(shí)間:就是時(shí)間段的終點(diǎn)(如9:00)。
-
為什么要記錄開(kāi)始時(shí)間?
方便人類(lèi)理解數(shù)據(jù)屬于哪個(gè)時(shí)段(比如“8點(diǎn)檔”的數(shù)據(jù))。
代碼示例解析
1. 窗口如何劃分?
假設(shè)使用?滾動(dòng)窗口(Tumbling Window),每1小時(shí)劃分一次:
Copy
8:00-9:00 → 窗口1 9:00-10:00 → 窗口2
所有時(shí)間戳在8:00≤t<9:00的數(shù)據(jù)會(huì)被分配到窗口1。
2. 窗口觸發(fā)計(jì)算的時(shí)機(jī)
當(dāng)系統(tǒng)時(shí)間(或事件時(shí)間)到達(dá)9:00時(shí),窗口1關(guān)閉,觸發(fā)計(jì)算:
-
調(diào)用?
MyReduceFunction
?找出該窗口內(nèi)的最低溫度。 -
調(diào)用?
MyProcessWindowFunction
?將結(jié)果與窗口開(kāi)始時(shí)間(8:00)綁定。
3. 為什么輸出的是開(kāi)始時(shí)間(8:00)而不是結(jié)束時(shí)間(9:00)?
-
業(yè)務(wù)需求:通常更關(guān)心數(shù)據(jù)所屬時(shí)段的起點(diǎn)(例如“8點(diǎn)檔的數(shù)據(jù)”)。
-
避免歧義:如果輸出9:00,可能被誤解為“9點(diǎn)檔的數(shù)據(jù)”(實(shí)際是8:00-9:00的數(shù)據(jù))。
代碼中具體如何獲取開(kāi)始時(shí)間?
在?MyProcessWindowFunction
?中:
context.window().getStart(); // 返回窗口的起始時(shí)間戳(如8:00對(duì)應(yīng)的毫秒值)
-
context
?對(duì)象:包含窗口的元信息(起止時(shí)間、觸發(fā)時(shí)間等)。 -
實(shí)際輸出時(shí):將時(shí)間戳轉(zhuǎn)換為人類(lèi)可讀格式(如?
8:00
)。
常見(jiàn)疑問(wèn)解答
Q1:如果數(shù)據(jù)延遲到達(dá)(比如8:59的數(shù)據(jù)在9:05才到),會(huì)進(jìn)入哪個(gè)窗口?
-
取決于時(shí)間語(yǔ)義:
-
若使用?事件時(shí)間(Event Time):按數(shù)據(jù)自帶的時(shí)間戳分配到8:00-9:00窗口。
-
若使用?處理時(shí)間(Processing?Time):按到達(dá)系統(tǒng)的時(shí)間分配到9:00-10:00窗口。
-
(示例代碼未顯式設(shè)置時(shí)間語(yǔ)義,默認(rèn)可能是處理時(shí)間)
-
Q2:窗口開(kāi)始時(shí)間是如何計(jì)算的?
-
由窗口分配器(Window Assigner)決定:
-
滾動(dòng)窗口按固定間隔對(duì)齊(如整點(diǎn))。
-
滑動(dòng)窗口按步長(zhǎng)對(duì)齊(如每30分鐘滑動(dòng)一次的1小時(shí)窗口)。
-
會(huì)話(huà)窗口根據(jù)數(shù)據(jù)活躍度動(dòng)態(tài)劃分。
-
Q3:可以同時(shí)輸出開(kāi)始時(shí)間和結(jié)束時(shí)間嗎?
可以!修改?ProcessWindowFunction
:
out.collect(new Tuple3<>(context.window().getStart(), context.window().getEnd(), min));
總結(jié)
-
窗口開(kāi)始時(shí)間?標(biāo)記了數(shù)據(jù)所屬時(shí)間段的起點(diǎn)(如“8:00檔”)。
-
在 Flink 中,通過(guò)?
ProcessWindowFunction
?的?context
?可以輕松獲取這一信息。 -
這種設(shè)計(jì)讓數(shù)據(jù)處理結(jié)果更易理解(如統(tǒng)計(jì)報(bào)告、監(jiān)控儀表盤(pán))。