如何進(jìn)行企業(yè)營銷型網(wǎng)站建設(shè)百度商家入駐怎么做
窗口理解
窗口(Window)是處理無界流的關(guān)鍵所在。窗口可以將數(shù)據(jù)流裝入大小有限的“桶”中,再對每個“桶”加以處理。 本文的重心將放在 Flink 如何進(jìn)行窗口操作以及開發(fā)者如何盡可能地利用 Flink 所提供的功能。
對窗口的正確理解:
我們將窗口理解為一個一個的水桶,數(shù)據(jù)流(stream)就像水流,每個數(shù)據(jù)都會分發(fā)到對應(yīng)的桶中,當(dāng)達(dá)到結(jié)束時間時,對每個桶中收集的數(shù)據(jù)進(jìn)行計算處理
注:
Flink中窗口并不是靜態(tài)準(zhǔn)備好的,而是動態(tài)創(chuàng)建——當(dāng)有落在這個窗口區(qū)間范圍的數(shù)據(jù)達(dá)到時,才創(chuàng)建對應(yīng)的窗口
窗口的分類
按照驅(qū)動類型分
時間窗口(Time Window)
以時間來定義窗口的開始和結(jié)束,獲取某一段時間內(nèi)的數(shù)據(jù)(類比于我們的定時發(fā)車)
計數(shù)窗口(Count Window)
計數(shù)窗口是基于元素的個數(shù)來獲取窗口,達(dá)到固定個數(shù)時就計算并關(guān)閉窗口。(類比于我們的人齊才發(fā)車)
按照窗口分配數(shù)據(jù)的規(guī)則分類
滾動窗口(Tumbling Window)
窗口之間沒有重疊,也不會有間隔的首尾相撞狀態(tài),這樣,每個數(shù)據(jù)都會被分到一個窗口,而且只會屬于一個窗口。
滾動窗口的應(yīng)用非常廣泛,它可以對每個時間段做聚合統(tǒng)計,很多BI分析指標(biāo)都可以用它來實(shí)現(xiàn)。
DataStream<T> input = ...;// 滾動 event-time 窗口
input.keyBy(<key selector>).window(TumblingEventTimeWindows.of(Time.seconds(5))).<windowed transformation>(<window function>);// 滾動 processing-time 窗口
input.keyBy(<key selector>).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).<windowed transformation>(<window function>);// 長度為一天的滾動 event-time 窗口, 偏移量為 -8 小時。
input.keyBy(<key selector>).window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))).<windowed transformation>(<window function>);
滑動窗口(Sliding Windows)
滑動窗口大小也是固定的,但是窗口之間并不是首尾相接的,而是重疊的。
DataStream<T> input = ...;// 滑動 event-time 窗口
input.keyBy(<key selector>).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).<windowed transformation>(<window function>);// 滑動 processing-time 窗口
input.keyBy(<key selector>).window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).<windowed transformation>(<window function>);// 滑動 processing-time 窗口,偏移量為 -8 小時
input.keyBy(<key selector>).window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8))).<windowed transformation>(<window function>);
會話窗口(Session Windows)
會話窗口,是基于“會話”(session)來對數(shù)據(jù)進(jìn)行分組的,會話窗口只能基于時間來定義。
DataStream<T> input = ...;// 設(shè)置了固定間隔的 event-time 會話窗口
input.keyBy(<key selector>).window(EventTimeSessionWindows.withGap(Time.minutes(10))).<windowed transformation>(<window function>);// 設(shè)置了動態(tài)間隔的 event-time 會話窗口
input.keyBy(<key selector>).window(EventTimeSessionWindows.withDynamicGap((element) -> {// 決定并返回會話間隔})).<windowed transformation>(<window function>);// 設(shè)置了固定間隔的 processing-time session 窗口
input.keyBy(<key selector>).window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))).<windowed transformation>(<window function>);// 設(shè)置了動態(tài)間隔的 processing-time 會話窗口
input.keyBy(<key selector>).window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {// 決定并返回會話間隔})).<windowed transformation>(<window function>);
全局窗口
這種窗口對全局有效,會把相同的key的所有數(shù)據(jù)分配到同一個窗口中,這種窗口沒有結(jié)束時間,默認(rèn)不會觸發(fā)計算,如果希望對數(shù)據(jù)進(jìn)行處理,需要自定義“觸發(fā)器”。
DataStream<T> input = ...;input.keyBy(<key selector>).window(GlobalWindows.create()).<windowed transformation>(<window function>);
計數(shù)窗口
計數(shù)窗口概念非常簡單,本身底層是基于全局窗口(Global Window)實(shí)現(xiàn)的。Flink為我們提供了非常方便的接口:直接調(diào)用.countWindow()方法
滾動計數(shù)窗口
滾動計數(shù)窗口只需要傳入一個長整型的參數(shù)size,表示窗口的大小。
stream.keyBy(...).countWindow(10)
滑動計數(shù)窗口
與滾動計數(shù)窗口類似,不過需要在.countWindow()調(diào)用時傳入兩個參數(shù):size和slide,前者表示窗口大小,后者表示滑動步長。
stream.keyBy(...).countWindow(10,3)
窗口函數(shù)(Window Functions)
定義了 window assigner 之后,我們需要指定當(dāng)窗口觸發(fā)之后,我們?nèi)绾斡嬎忝總€窗口中的數(shù)據(jù), 這就是 window function 的職責(zé)了
窗口函數(shù)有三種:ReduceFunction、AggregateFunction 或 ProcessWindowFunction。
ReduceFunction
ReduceFunction 指定兩條輸入數(shù)據(jù)如何合并起來產(chǎn)生一條輸出數(shù)據(jù),輸入和輸出數(shù)據(jù)的類型必須相同。 Flink 使用 ReduceFunction 對窗口中的數(shù)據(jù)進(jìn)行增量聚合。
DataStream<Tuple2<String, Long>> input = ...;input.keyBy(<key selector>).window(<window assigner>).reduce(new ReduceFunction<Tuple2<String, Long>>() {//v1 和v2是 2個相同類型的輸入?yún)?shù)public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {return new Tuple2<>(v1.f0, v1.f1 + v2.f1);}});
AggregateFunction
ReduceFunction 是 AggregateFunction 的特殊情況。 AggregateFunction 接收三個類型:輸入數(shù)據(jù)的類型(IN)、累加器的類型(ACC)和輸出數(shù)據(jù)的類型(OUT)。
/*** The accumulator is used to keep a running sum and a count. The {@code getResult} method* computes the average.*/
private static class AverageAggregateimplements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {@Overridepublic Tuple2<Long, Long> createAccumulator() {return new Tuple2<>(0L, 0L);}@Overridepublic Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);}@Overridepublic Double getResult(Tuple2<Long, Long> accumulator) {return ((double) accumulator.f0) / accumulator.f1;}@Overridepublic Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);}
}DataStream<Tuple2<String, Long>> input = ...;input.keyBy(<key selector>).window(<window assigner>).aggregate(new AverageAggregate());
接口中有四個方法:
- createAccumulator():創(chuàng)建一個累加器,這就是為聚合創(chuàng)建了一個初始狀態(tài),每個聚合任務(wù)只會調(diào)用一次。
- add():將輸入的元素添加到累加器中。
- getResult():從累加器中提取聚合的輸出結(jié)果。
- merge():合并兩個累加器,并將合并后的狀態(tài)作為一個累加器返回。
可以看到,AggregateFunction的工作原理是:首先調(diào)用createAccumulator()為任務(wù)初始化一個狀態(tài)(累加器);而后每來一個數(shù)據(jù)就調(diào)用一次add()方法,對數(shù)據(jù)進(jìn)行聚合,得到的結(jié)果保存在狀態(tài)中;等到了窗口需要輸出時,再調(diào)用getResult()方法得到計算結(jié)果。很明顯,與ReduceFunction相同,AggregateFunction也是增量式的聚合;而由于輸入、中間狀態(tài)、輸出的類型可以不同,使得應(yīng)用更加靈活方便。
ProcessWindowFunction
ProcessWindowFunction 有能獲取包含窗口內(nèi)所有元素的 Iterable, 以及用來獲取時間和狀態(tài)信息的 Context 對象,比其他窗口函數(shù)更加靈活。 ProcessWindowFunction 的靈活性是以性能和資源消耗為代價的, 因?yàn)榇翱谥械臄?shù)據(jù)無法被增量聚合,而需要在窗口觸發(fā)前緩存所有數(shù)據(jù)。
public class WindowProcessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("127.0.0.1", 7777).map(new WaterSensorMapFunction());KeyedStream<WaterSensor, String> keyedStream = sensorDS.keyBy(WaterSensor::getId);WindowedStream<WaterSensor, String, TimeWindow> sensorWS = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));sensorWS.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) {// 上下文可以拿到window對象,還有其他東西:側(cè)輸出流 等等long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + key + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "條數(shù)據(jù)===>" + elements);}}).print();env.execute();}
}
增量聚合和全窗口函數(shù)的結(jié)合使用
在實(shí)際應(yīng)用中,我們往往希望兼具這兩者的優(yōu)點(diǎn),把它們結(jié)合在一起使用。
我們之前在調(diào)用WindowedStream的.reduce()和.aggregate()方法時,只是簡單地直接傳入了一個ReduceFunction或AggregateFunction進(jìn)行增量聚合。除此之外,其實(shí)還可以傳入第二個參數(shù):一個全窗口函數(shù),可以是WindowFunction或者ProcessWindowFunction。
// ReduceFunction與WindowFunction結(jié)合
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction,WindowFunction<T,R,K,W> function) // ReduceFunction與ProcessWindowFunction結(jié)合
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction,ProcessWindowFunction<T,R,K,W> function)// AggregateFunction與WindowFunction結(jié)合
public <ACC,V,R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T,ACC,V> aggFunction,WindowFunction<V,R,K,W> windowFunction)// AggregateFunction與ProcessWindowFunction結(jié)合
public <ACC,V,R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T,ACC,V> aggFunction,ProcessWindowFunction<V,R,K,W> windowFunction)