中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁 > news >正文

如何進(jìn)行企業(yè)營銷型網(wǎng)站建設(shè)百度商家入駐怎么做

如何進(jìn)行企業(yè)營銷型網(wǎng)站建設(shè),百度商家入駐怎么做,做電商需要知道的幾個網(wǎng)站嗎,無貨源網(wǎng)店哪個平臺好窗口理解 窗口(Window)是處理無界流的關(guān)鍵所在。窗口可以將數(shù)據(jù)流裝入大小有限的“桶”中,再對每個“桶”加以處理。 本文的重心將放在 Flink 如何進(jìn)行窗口操作以及開發(fā)者如何盡可能地利用 Flink 所提供的功能。 對窗口的正確理解&#xff…

窗口理解

窗口(Window)是處理無界流的關(guān)鍵所在。窗口可以將數(shù)據(jù)流裝入大小有限的“桶”中,再對每個“桶”加以處理。 本文的重心將放在 Flink 如何進(jìn)行窗口操作以及開發(fā)者如何盡可能地利用 Flink 所提供的功能。

對窗口的正確理解
我們將窗口理解為一個一個的水桶,數(shù)據(jù)流(stream)就像水流,每個數(shù)據(jù)都會分發(fā)到對應(yīng)的桶中,當(dāng)達(dá)到結(jié)束時間時,對每個桶中收集的數(shù)據(jù)進(jìn)行計算處理
在這里插入圖片描述

Flink中窗口并不是靜態(tài)準(zhǔn)備好的,而是動態(tài)創(chuàng)建——當(dāng)有落在這個窗口區(qū)間范圍的數(shù)據(jù)達(dá)到時,才創(chuàng)建對應(yīng)的窗口

窗口的分類

按照驅(qū)動類型分

時間窗口(Time Window)

以時間來定義窗口的開始和結(jié)束,獲取某一段時間內(nèi)的數(shù)據(jù)(類比于我們的定時發(fā)車

計數(shù)窗口(Count Window)

計數(shù)窗口是基于元素的個數(shù)來獲取窗口,達(dá)到固定個數(shù)時就計算并關(guān)閉窗口。(類比于我們的人齊才發(fā)車

按照窗口分配數(shù)據(jù)的規(guī)則分類

滾動窗口(Tumbling Window)

窗口之間沒有重疊,也不會有間隔的首尾相撞狀態(tài),這樣,每個數(shù)據(jù)都會被分到一個窗口,而且只會屬于一個窗口。
滾動窗口的應(yīng)用非常廣泛,它可以對每個時間段做聚合統(tǒng)計,很多BI分析指標(biāo)都可以用它來實(shí)現(xiàn)。
在這里插入圖片描述

DataStream<T> input = ...;// 滾動 event-time 窗口
input.keyBy(<key selector>).window(TumblingEventTimeWindows.of(Time.seconds(5))).<windowed transformation>(<window function>);// 滾動 processing-time 窗口
input.keyBy(<key selector>).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).<windowed transformation>(<window function>);// 長度為一天的滾動 event-time 窗口, 偏移量為 -8 小時。
input.keyBy(<key selector>).window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))).<windowed transformation>(<window function>);

滑動窗口(Sliding Windows)

滑動窗口大小也是固定的,但是窗口之間并不是首尾相接的,而是重疊的。
在這里插入圖片描述

DataStream<T> input = ...;// 滑動 event-time 窗口
input.keyBy(<key selector>).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).<windowed transformation>(<window function>);// 滑動 processing-time 窗口
input.keyBy(<key selector>).window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).<windowed transformation>(<window function>);// 滑動 processing-time 窗口,偏移量為 -8 小時
input.keyBy(<key selector>).window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8))).<windowed transformation>(<window function>);

會話窗口(Session Windows)

會話窗口,是基于“會話”(session)來對數(shù)據(jù)進(jìn)行分組的,會話窗口只能基于時間來定義。
在這里插入圖片描述

DataStream<T> input = ...;// 設(shè)置了固定間隔的 event-time 會話窗口
input.keyBy(<key selector>).window(EventTimeSessionWindows.withGap(Time.minutes(10))).<windowed transformation>(<window function>);// 設(shè)置了動態(tài)間隔的 event-time 會話窗口
input.keyBy(<key selector>).window(EventTimeSessionWindows.withDynamicGap((element) -> {// 決定并返回會話間隔})).<windowed transformation>(<window function>);// 設(shè)置了固定間隔的 processing-time session 窗口
input.keyBy(<key selector>).window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))).<windowed transformation>(<window function>);// 設(shè)置了動態(tài)間隔的 processing-time 會話窗口
input.keyBy(<key selector>).window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {// 決定并返回會話間隔})).<windowed transformation>(<window function>);

全局窗口

這種窗口對全局有效,會把相同的key的所有數(shù)據(jù)分配到同一個窗口中,這種窗口沒有結(jié)束時間,默認(rèn)不會觸發(fā)計算,如果希望對數(shù)據(jù)進(jìn)行處理,需要自定義“觸發(fā)器”。
在這里插入圖片描述

DataStream<T> input = ...;input.keyBy(<key selector>).window(GlobalWindows.create()).<windowed transformation>(<window function>);

計數(shù)窗口

計數(shù)窗口概念非常簡單,本身底層是基于全局窗口(Global Window)實(shí)現(xiàn)的。Flink為我們提供了非常方便的接口:直接調(diào)用.countWindow()方法

滾動計數(shù)窗口

滾動計數(shù)窗口只需要傳入一個長整型的參數(shù)size,表示窗口的大小。

stream.keyBy(...).countWindow(10)
滑動計數(shù)窗口

與滾動計數(shù)窗口類似,不過需要在.countWindow()調(diào)用時傳入兩個參數(shù):size和slide,前者表示窗口大小,后者表示滑動步長。

stream.keyBy(...).countWindow(103)

窗口函數(shù)(Window Functions)

定義了 window assigner 之后,我們需要指定當(dāng)窗口觸發(fā)之后,我們?nèi)绾斡嬎忝總€窗口中的數(shù)據(jù), 這就是 window function 的職責(zé)了
窗口函數(shù)有三種:ReduceFunction、AggregateFunction 或 ProcessWindowFunction。

ReduceFunction

ReduceFunction 指定兩條輸入數(shù)據(jù)如何合并起來產(chǎn)生一條輸出數(shù)據(jù),輸入和輸出數(shù)據(jù)的類型必須相同。 Flink 使用 ReduceFunction 對窗口中的數(shù)據(jù)進(jìn)行增量聚合。

DataStream<Tuple2<String, Long>> input = ...;input.keyBy(<key selector>).window(<window assigner>).reduce(new ReduceFunction<Tuple2<String, Long>>() {//v1 和v2是 2個相同類型的輸入?yún)?shù)public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {return new Tuple2<>(v1.f0, v1.f1 + v2.f1);}});

AggregateFunction

ReduceFunction 是 AggregateFunction 的特殊情況。 AggregateFunction 接收三個類型:輸入數(shù)據(jù)的類型(IN)、累加器的類型(ACC)和輸出數(shù)據(jù)的類型(OUT)。

/*** The accumulator is used to keep a running sum and a count. The {@code getResult} method* computes the average.*/
private static class AverageAggregateimplements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {@Overridepublic Tuple2<Long, Long> createAccumulator() {return new Tuple2<>(0L, 0L);}@Overridepublic Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);}@Overridepublic Double getResult(Tuple2<Long, Long> accumulator) {return ((double) accumulator.f0) / accumulator.f1;}@Overridepublic Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);}
}DataStream<Tuple2<String, Long>> input = ...;input.keyBy(<key selector>).window(<window assigner>).aggregate(new AverageAggregate());

接口中有四個方法:

  • createAccumulator():創(chuàng)建一個累加器,這就是為聚合創(chuàng)建了一個初始狀態(tài),每個聚合任務(wù)只會調(diào)用一次。
  • add():將輸入的元素添加到累加器中。
  • getResult():從累加器中提取聚合的輸出結(jié)果。
  • merge():合并兩個累加器,并將合并后的狀態(tài)作為一個累加器返回。

可以看到,AggregateFunction的工作原理是:首先調(diào)用createAccumulator()為任務(wù)初始化一個狀態(tài)(累加器);而后每來一個數(shù)據(jù)就調(diào)用一次add()方法,對數(shù)據(jù)進(jìn)行聚合,得到的結(jié)果保存在狀態(tài)中;等到了窗口需要輸出時,再調(diào)用getResult()方法得到計算結(jié)果。很明顯,與ReduceFunction相同,AggregateFunction也是增量式的聚合;而由于輸入、中間狀態(tài)、輸出的類型可以不同,使得應(yīng)用更加靈活方便。

ProcessWindowFunction

ProcessWindowFunction 有能獲取包含窗口內(nèi)所有元素的 Iterable, 以及用來獲取時間和狀態(tài)信息的 Context 對象,比其他窗口函數(shù)更加靈活。 ProcessWindowFunction 的靈活性是以性能和資源消耗為代價的, 因?yàn)榇翱谥械臄?shù)據(jù)無法被增量聚合,而需要在窗口觸發(fā)前緩存所有數(shù)據(jù)。

public class WindowProcessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("127.0.0.1", 7777).map(new WaterSensorMapFunction());KeyedStream<WaterSensor, String> keyedStream = sensorDS.keyBy(WaterSensor::getId);WindowedStream<WaterSensor, String, TimeWindow> sensorWS = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));sensorWS.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) {// 上下文可以拿到window對象,還有其他東西:側(cè)輸出流 等等long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + key + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "條數(shù)據(jù)===>" + elements);}}).print();env.execute();}
}

增量聚合和全窗口函數(shù)的結(jié)合使用

在實(shí)際應(yīng)用中,我們往往希望兼具這兩者的優(yōu)點(diǎn),把它們結(jié)合在一起使用。
我們之前在調(diào)用WindowedStream的.reduce()和.aggregate()方法時,只是簡單地直接傳入了一個ReduceFunction或AggregateFunction進(jìn)行增量聚合。除此之外,其實(shí)還可以傳入第二個參數(shù):一個全窗口函數(shù),可以是WindowFunction或者ProcessWindowFunction。

// ReduceFunction與WindowFunction結(jié)合
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction,WindowFunction<TRKW> function) // ReduceFunction與ProcessWindowFunction結(jié)合
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction,ProcessWindowFunction<TRKW> function)// AggregateFunction與WindowFunction結(jié)合
public <ACCVR> SingleOutputStreamOperator<R> aggregate(AggregateFunction<TACCV> aggFunction,WindowFunction<VRKW> windowFunction)// AggregateFunction與ProcessWindowFunction結(jié)合
public <ACCVR> SingleOutputStreamOperator<R> aggregate(AggregateFunction<TACCV> aggFunction,ProcessWindowFunction<VRKW> windowFunction)
http://www.risenshineclean.com/news/9521.html

相關(guān)文章:

  • 用網(wǎng)站做CAN總線通信好嗎上海網(wǎng)絡(luò)seo優(yōu)化公司
  • app開發(fā)方案seo網(wǎng)頁的基礎(chǔ)知識
  • 網(wǎng)站開發(fā)軟件 論文 摘要簡單網(wǎng)頁制作成品免費(fèi)
  • 做支付網(wǎng)站東莞seo推廣公司
  • 西安優(yōu)惠電商平臺網(wǎng)站今日國內(nèi)新聞頭條大事
  • 哪個網(wǎng)站的課件做的好處營銷技巧有哪些
  • 北京企業(yè)網(wǎng)站備案免費(fèi)seo推廣軟件
  • 千里做他千百度網(wǎng)站2023年適合小學(xué)生的新聞
  • 怎樣接做網(wǎng)站的活電商代運(yùn)營公司十強(qiáng)
  • 網(wǎng)站維護(hù)多少錢一個月泉州百度推廣咨詢
  • 網(wǎng)站備案怎么那么麻煩深圳百度推廣聯(lián)系方式
  • 個人可以做幾個網(wǎng)站嗎廊坊網(wǎng)站seo
  • 多個域名 指向同一個網(wǎng)站網(wǎng)站搭建費(fèi)用
  • 一次備案多個網(wǎng)站網(wǎng)絡(luò)營銷戰(zhàn)略的內(nèi)容
  • 哪個網(wǎng)站可以做公務(wù)員題軟文營銷方法有哪些
  • 網(wǎng)站開發(fā)過程的基本環(huán)節(jié)杭州seo顧問
  • 網(wǎng)站建設(shè)與管理管理課程青島seo關(guān)鍵詞排名
  • 免費(fèi)完整版的網(wǎng)站模板做百度推廣代運(yùn)營有用嗎
  • 資訊門戶網(wǎng)站怎么做個人免費(fèi)推廣網(wǎng)站
  • 萊蕪網(wǎng)站網(wǎng)站廣告調(diào)詞平臺
  • 深圳網(wǎng)站制作服濰坊在線制作網(wǎng)站
  • 焦作專業(yè)做網(wǎng)站公司百度競價平臺官網(wǎng)
  • 大王莊網(wǎng)站建設(shè)公司合肥網(wǎng)絡(luò)公司排名
  • 網(wǎng)游開發(fā)培訓(xùn)廣告優(yōu)化師工作內(nèi)容
  • 簽名設(shè)計免費(fèi)版西安搜索引擎優(yōu)化
  • 網(wǎng)站頂部地圖代碼怎么做環(huán)球網(wǎng)廣東疫情最新消息
  • 樂清手機(jī)網(wǎng)站朋友圈廣告投放價格表
  • 做網(wǎng)站要知道哪些代碼微信運(yùn)營技巧
  • ip釣魚網(wǎng)站在線生成seo專業(yè)學(xué)校
  • 電子商務(wù)網(wǎng)站建設(shè)方案目錄白帽seo是什么