中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁(yè) > news >正文

注冊(cè)網(wǎng)站應(yīng)注意事項(xiàng)b站24小時(shí)自助下單平臺(tái)網(wǎng)站

注冊(cè)網(wǎng)站應(yīng)注意事項(xiàng),b站24小時(shí)自助下單平臺(tái)網(wǎng)站,電子商務(wù)網(wǎng)站建設(shè)交印花稅嗎,店面設(shè)計(jì)費(fèi)一般多少錢(qián)一平窗口Window機(jī)制 窗口概述窗口的分類(lèi)是否按鍵分區(qū)按鍵分區(qū)窗口非按鍵分區(qū) 按照驅(qū)動(dòng)類(lèi)型按具體分配規(guī)則滾動(dòng)窗口Tumbling Windows滑動(dòng)窗口 Sliding Windows會(huì)話(huà)窗口 Session Windows全局窗口 Global Windows 時(shí)間語(yǔ)義窗口分配器 Window Assigners時(shí)間窗口計(jì)數(shù)窗口例子 窗口函數(shù) W…

窗口Window機(jī)制

  • 窗口概述
  • 窗口的分類(lèi)
    • 是否按鍵分區(qū)
      • 按鍵分區(qū)窗口
      • 非按鍵分區(qū)
    • 按照驅(qū)動(dòng)類(lèi)型
    • 按具體分配規(guī)則
      • 滾動(dòng)窗口Tumbling Windows
      • 滑動(dòng)窗口 Sliding Windows
      • 會(huì)話(huà)窗口 Session Windows
      • 全局窗口 Global Windows
  • 時(shí)間語(yǔ)義
  • 窗口分配器 Window Assigners
    • 時(shí)間窗口
    • 計(jì)數(shù)窗口
    • 例子
  • 窗口函數(shù) Window Functions
    • 增量聚合函數(shù)
      • ReduceFunction
      • AggregateFunction
    • 全窗/全量口函數(shù)
      • WindowFunction
      • ProcessWindowFunction
      • 增量聚合和全窗口函數(shù)的結(jié)合
  • 其他
    • 觸發(fā)器 Trigger
    • 移除器 Evictor

窗口概述

在大多數(shù)場(chǎng)景下,需要統(tǒng)計(jì)的數(shù)據(jù)流都是無(wú)界的,因此無(wú)法等待整個(gè)數(shù)據(jù)流終止后才進(jìn)行統(tǒng)計(jì)。通常情況下,只需要對(duì)某個(gè)時(shí)間范圍或者數(shù)量范圍內(nèi)的數(shù)據(jù)進(jìn)行統(tǒng)計(jì)分析

例如:

每隔10分鐘統(tǒng)計(jì)一次過(guò)去30分鐘內(nèi)某個(gè)對(duì)象的點(diǎn)擊量每發(fā)生100次點(diǎn)擊后,就去統(tǒng)計(jì)一下每個(gè)對(duì)象點(diǎn)擊率的占比

因此,在Apache Flink中,窗口是對(duì)無(wú)界數(shù)據(jù)流進(jìn)行有界處理的機(jī)制。窗口可以將無(wú)限的數(shù)據(jù)流劃分為有限的、可處理的塊,使得可以基于這些有限的數(shù)據(jù)塊執(zhí)行聚合、計(jì)算和分析操作。

窗口的分類(lèi)

是否按鍵分區(qū)

在定義窗口操作之前,首先需要確定,到底是基于按鍵分區(qū)的數(shù)據(jù)流KeyedStream來(lái)開(kāi)窗,還是直接在沒(méi)有按鍵分區(qū)的DataStream上開(kāi)窗。

兩者區(qū)別:

1.keyed streams要調(diào)用keyBy(...)后再調(diào)用window(...) , 而non-keyed streams只用直接調(diào)用windowAll(...)2.對(duì)于keyed stream,其中數(shù)據(jù)的任何屬性都可以作為key。 允許窗口計(jì)算由多個(gè)task并行,因?yàn)槊總€(gè)邏輯上的 keyed stream都可以被單獨(dú)處理。 屬于同一個(gè)key的元素會(huì)被發(fā)送到同一個(gè) task。3.對(duì)于non-keyed stream,原始的stream不會(huì)被分割為多個(gè)邏輯上的stream, 所有的窗口計(jì)算會(huì)被同一個(gè) task完成,也就是parallelism為1

按鍵分區(qū)窗口

經(jīng)過(guò)按鍵分區(qū)keyBy操作后,數(shù)據(jù)流會(huì)按照key被分為多條邏輯流,這就是KeyedStream?;贙eyedStream進(jìn)行窗口操作時(shí),窗口計(jì)算會(huì)在多個(gè)并行子任務(wù)上同時(shí)執(zhí)行。相同key的數(shù)據(jù)會(huì)被發(fā)送到同一個(gè)并行子任務(wù),而窗口操作會(huì)基于每個(gè)key進(jìn)行單獨(dú)的處理。所以可以認(rèn)為,每個(gè)key上都定義了一組窗口,各自獨(dú)立地進(jìn)行統(tǒng)計(jì)計(jì)算。

按鍵分區(qū)窗口寫(xiě)法:

stream.keyBy(...)               <-  僅 keyed 窗口需要.window(...)              <-  必填項(xiàng):"assigner"[.trigger(...)]            <-  可選項(xiàng):"trigger" (省略則使用默認(rèn) trigger)[.evictor(...)]            <-  可選項(xiàng):"evictor" (省略則不使用 evictor)[.allowedLateness(...)]    <-  可選項(xiàng):"lateness" (省略則為 0)[.sideOutputLateData(...)] <-  可選項(xiàng):"output tag" (省略則不對(duì)遲到數(shù)據(jù)使用 side output).reduce/aggregate/apply()      <-  必填項(xiàng):"function"[.getSideOutput(...)]      <-  可選項(xiàng):"output tag"

代碼示例:

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 從socket接收數(shù)據(jù)流SingleOutputStreamOperator<String> source = env.socketTextStream("node01", 8086);// 將輸入數(shù)據(jù)轉(zhuǎn)換為(key, value)元組DataStream<Tuple2<String, Integer>> dataStream = source.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2 map(String s) throws Exception {int number = Integer.parseInt(s);String key = number % 2 == 0 ? "key1" : "key2";Tuple2 tuple2 = new Tuple2(key, number);return tuple2;}}).returns(Types.TUPLE(Types.STRING, Types.INT));// keyBy操作KeyedStream<Tuple2<String, Integer>, String> keyBy = dataStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> tuple2) throws Exception {return tuple2.f0;}});// 每10秒統(tǒng)計(jì)一次數(shù)量和SingleOutputStreamOperator<Tuple2<String, Integer>> streamOperator = keyBy.window(TumblingProcessingTimeWindows.of(Time.seconds(10))).sum(1);streamOperator.print();env.execute();}

發(fā)送測(cè)試數(shù)據(jù)

[root@administrator ~]# nc -lk 8086
1
2
3
4

等待10秒后,控制臺(tái)打印如下

(key2,4)
(key1,6)

非按鍵分區(qū)

如果沒(méi)有進(jìn)行keyBy,那么原始的DataStream就不會(huì)分成多條邏輯流。這時(shí)窗口邏輯只能在一個(gè)任務(wù)task上執(zhí)行,就相當(dāng)于并行度變成了1。

非按鍵分區(qū)窗口寫(xiě)法:

stream.windowAll(...)           <-  必填項(xiàng):"assigner"[.trigger(...)]            <-  可選項(xiàng):"trigger" (else default trigger)[.evictor(...)]            <-  可選項(xiàng):"evictor" (else no evictor)[.allowedLateness(...)]    <-  可選項(xiàng):"lateness" (else zero)[.sideOutputLateData(...)] <-  可選項(xiàng):"output tag" (else no side output for late data).reduce/aggregate/apply()      <-  必填項(xiàng):"function"[.getSideOutput(...)]      <-  可選項(xiàng):"output tag"

代碼示例:

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 從socket接收數(shù)據(jù)流SingleOutputStreamOperator<String> source = env.socketTextStream("node01", 8086);// 將輸入數(shù)據(jù)轉(zhuǎn)換為IntegerDataStream<Integer> dataStream = source.map(str -> Integer.parseInt(str));// 每10秒統(tǒng)計(jì)一次數(shù)量和SingleOutputStreamOperator<Integer> streamOperator = dataStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))).sum(0);streamOperator.print();env.execute();}

按照驅(qū)動(dòng)類(lèi)型

窗口按照驅(qū)動(dòng)類(lèi)型可以分成時(shí)間窗口和計(jì)數(shù)窗口,這兩種窗口類(lèi)型根據(jù)其觸發(fā)機(jī)制和邊界規(guī)則的不同,適用于不同的應(yīng)用場(chǎng)景。

時(shí)間窗口 Time Windows:

時(shí)間窗口根據(jù)事件時(shí)間Event Time或處理時(shí)間Processing Time來(lái)劃分時(shí)間窗口根據(jù)時(shí)間的進(jìn)展劃分?jǐn)?shù)據(jù)流,當(dāng)一個(gè)窗口的時(shí)間到達(dá)或窗口中的元素?cái)?shù)量達(dá)到閾值時(shí),觸發(fā)窗口計(jì)算

計(jì)數(shù)窗口 Count Windows:

計(jì)數(shù)窗口根據(jù)元素的數(shù)量或元素的增量來(lái)劃分計(jì)數(shù)窗口在數(shù)據(jù)流中累積固定數(shù)量的元素后,觸發(fā)窗口計(jì)算窗口的大小可以是固定的,也可以是動(dòng)態(tài)變化的,取決于所設(shè)置的閾值和策略

按具體分配規(guī)則

窗口按照具體的分配規(guī)則,又有滾動(dòng)窗口(Tumbling Window)、滑動(dòng)窗口(Sliding Window)、會(huì)話(huà)窗口(Session Window),以及全局窗口(Global Window)。

滾動(dòng)窗口Tumbling Windows

滾動(dòng)窗口將數(shù)據(jù)流劃分為固定大小的、不重疊的窗口。

例如:將數(shù)據(jù)流按照5秒的滾動(dòng)窗口大小進(jìn)行劃分,每個(gè)窗口包含5秒的數(shù)據(jù)。那么每5秒就會(huì)有一個(gè)窗口被計(jì)算,且一個(gè)新的窗口被創(chuàng)建

在這里插入圖片描述
代碼示例:

DataStream<T> input = ...;// 滾動(dòng) event-time 窗口
input.keyBy(<key selector>)// 間間隔可以用 Time.milliseconds(x)、Time.seconds(x)、Time.minutes(x) 等來(lái)指定.window(TumblingEventTimeWindows.of(Time.seconds(5))).<windowed transformation>(<window function>);// 滾動(dòng) processing-time 窗口
input.keyBy(<key selector>).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).<windowed transformation>(<window function>);// 長(zhǎng)度為一天的滾動(dòng) event-time 窗口, 偏移量為 -8 小時(shí)。
input.keyBy(<key selector>).window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))).<windowed transformation>(<window function>);

滑動(dòng)窗口 Sliding Windows

滑動(dòng)窗口將數(shù)據(jù)流劃分為固定大小的窗口,窗口大小通過(guò)window size參數(shù)設(shè)置,需要一個(gè)額外的滑動(dòng)距離window slide參數(shù)來(lái)控制生成新窗口的頻率。

如果slide小于窗口大小,滑動(dòng)窗口可以允許窗口重疊。這種情況下,一個(gè)元素可能會(huì)被分發(fā)到多個(gè)窗口。

例如:將數(shù)據(jù)流按照5秒的滑動(dòng)窗口大小和3秒的滑動(dòng)步長(zhǎng)進(jìn)行劃分,窗口之間有2秒的重疊。

在這里插入圖片描述

DataStream<T> input = ...;// 滑動(dòng) event-time 窗口
input.keyBy(<key selector>).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).<windowed transformation>(<window function>);// 滑動(dòng) processing-time 窗口
input.keyBy(<key selector>).window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).<windowed transformation>(<window function>);// 滑動(dòng) processing-time 窗口,偏移量為 -8 小時(shí)
input.keyBy(<key selector>).window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8))).<windowed transformation>(<window function>);

會(huì)話(huà)窗口 Session Windows

與滾動(dòng)窗口和滑動(dòng)窗口不同,會(huì)話(huà)窗口不會(huì)相互重疊,且沒(méi)有固定的開(kāi)始或結(jié)束時(shí)間。 會(huì)話(huà)窗口在一段時(shí)間沒(méi)有收到數(shù)據(jù)之后會(huì)關(guān)閉,即在一段不活躍的間隔之后。

如果會(huì)話(huà)窗口有一段時(shí)間沒(méi)有收到數(shù)據(jù), 會(huì)話(huà)窗口會(huì)自動(dòng)關(guān)閉, 這段沒(méi)有收到數(shù)據(jù)的時(shí)間就是會(huì)話(huà)窗口的gap(間隔)

可以配置靜態(tài)的gap, 也可以通過(guò)一個(gè)gap extractor函數(shù)來(lái)定義gap的長(zhǎng)度

當(dāng)時(shí)間超過(guò)了這個(gè)gap, 當(dāng)前的會(huì)話(huà)窗口就會(huì)關(guān)閉, 后序的元素會(huì)被分配到一個(gè)新的會(huì)話(huà)窗口

在這里插入圖片描述

DataStream<T> input = ...;// 設(shè)置了固定間隔的 event-time 會(huì)話(huà)窗口
input.keyBy(<key selector>).window(EventTimeSessionWindows.withGap(Time.minutes(10))).<windowed transformation>(<window function>);// 設(shè)置了動(dòng)態(tài)間隔的 event-time 會(huì)話(huà)窗口
input.keyBy(<key selector>).window(EventTimeSessionWindows.withDynamicGap((element) -> {// 決定并返回會(huì)話(huà)間隔})).<windowed transformation>(<window function>);// 設(shè)置了固定間隔的 processing-time session 窗口
input.keyBy(<key selector>).window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))).<windowed transformation>(<window function>);// 設(shè)置了動(dòng)態(tài)間隔的 processing-time 會(huì)話(huà)窗口
input.keyBy(<key selector>).window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {// 決定并返回會(huì)話(huà)間隔})).<windowed transformation>(<window function>);

全局窗口 Global Windows

全局窗口將整個(gè)數(shù)據(jù)流作為一個(gè)窗口進(jìn)行處理,不進(jìn)行分割。全局窗口適用于需要在整個(gè)數(shù)據(jù)流上執(zhí)行聚合操作的場(chǎng)景。

在這里插入圖片描述

DataStream<T> input = ...;input.keyBy(<key selector>).window(GlobalWindows.create()).<windowed transformation>(<window function>);

時(shí)間語(yǔ)義

在Flink的流式操作中, 會(huì)涉及不同的時(shí)間概念,即時(shí)間語(yǔ)義,它是指在數(shù)據(jù)處理中確定事件的時(shí)間基準(zhǔn)的機(jī)制。

在實(shí)時(shí)數(shù)據(jù)流處理中,常見(jiàn)的時(shí)間語(yǔ)義有以下三種:

1.處理時(shí)間(Processing Time):

處理時(shí)間是指數(shù)據(jù)處理引擎的本地時(shí)鐘時(shí)間,也稱(chēng)為機(jī)器時(shí)間或系統(tǒng)時(shí)間使用處理時(shí)間時(shí),事件的時(shí)間順序是根據(jù)數(shù)據(jù)到達(dá)處理引擎的順序來(lái)確定的處理時(shí)間是一種簡(jiǎn)單和實(shí)時(shí)性較高的時(shí)間語(yǔ)義,但不考慮數(shù)據(jù)可能存在的延遲或亂序

2.事件時(shí)間(Event Time):

事件時(shí)間是數(shù)據(jù)流中記錄的實(shí)際時(shí)間,通常是數(shù)據(jù)本身攜帶的時(shí)間戳使用事件時(shí)間時(shí),數(shù)據(jù)記錄的時(shí)間戳決定事件在時(shí)間軸上的順序,而不受數(shù)據(jù)到達(dá)引擎的順序影響事件時(shí)間是一種準(zhǔn)確和可重現(xiàn)的時(shí)間語(yǔ)義,能夠處理延遲和亂序數(shù)據(jù),但可能需要關(guān)注水印的處理

3.攝取時(shí)間(Ingestion Time):

注意:較新版本的Flink已經(jīng)棄用,推薦使用事件時(shí)間

攝取時(shí)間是數(shù)據(jù)進(jìn)入數(shù)據(jù)處理引擎的時(shí)間使用攝取時(shí)間時(shí),數(shù)據(jù)到達(dá)引擎的順序決定事件的時(shí)間順序攝取時(shí)間是介于處理時(shí)間和事件時(shí)間之間的折中方案。它可以處理一定程度的延遲和亂序數(shù)據(jù),但不會(huì)像事件時(shí)間那樣需要處理水印。

區(qū)別:

處理時(shí)間適用于實(shí)時(shí)性要求較高、不關(guān)心事件的順序和時(shí)間戳的場(chǎng)景事件時(shí)間適用于需要準(zhǔn)確處理事件順序和考慮延遲、亂序數(shù)據(jù)的場(chǎng)景攝取時(shí)間提供了某種程度上的準(zhǔn)確性和實(shí)時(shí)性折中

窗口分配器 Window Assigners

在Apache Flink中,窗口分配器(Window Assigner)用于定義如何將數(shù)據(jù)流中的元素分配到窗口。窗口分配器確定了窗口的邊界以及如何對(duì)元素進(jìn)行分組和分配

窗口分配器最通用的定義方式:

如果是按鍵分區(qū)窗口, 直接調(diào)用.keyBy().window()方法,傳入一個(gè)WindowAssigner作為參數(shù),返回WindowedStream。如果是非按鍵分區(qū)窗口,直接調(diào)用.windowAll()方法,傳入一個(gè)WindowAssigner,返回的是AllWindowedStream。

時(shí)間窗口

時(shí)間窗口是最常用的窗口類(lèi)型,可以大致細(xì)分為滾動(dòng)、滑動(dòng)和會(huì)話(huà)三種。

1.滾動(dòng)處理時(shí)間窗口

窗口分配器由類(lèi)TumblingProcessingTimeWindows提供,需要調(diào)用它的靜態(tài)方法.of(),需要傳入一個(gè)Time類(lèi)型的參數(shù)size,表示滾動(dòng)窗口的大小

// 非按鍵分區(qū) 滾動(dòng)事件時(shí)間窗口,窗口長(zhǎng)度10s。每10秒操作一次
dataStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)));// 按鍵分區(qū)
dataStream.keyBy().window(TumblingProcessingTimeWindows.of(Time.seconds(10)));

2.滾動(dòng)事件時(shí)間窗口

窗口分配器由類(lèi)TumblingEventTimeWindows提供,用法與滾動(dòng)處理事件窗口完全一致。

dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)));

3.滑動(dòng)處理時(shí)間窗口

窗口分配器由類(lèi)SlidingProcessingTimeWindows提供,同樣需要調(diào)用它的靜態(tài)方法.of(),需要傳入兩個(gè)Time類(lèi)型的參數(shù):size和slide,前者表示滑動(dòng)窗口的大小,后者表示滑動(dòng)窗口的滑動(dòng)步長(zhǎng)

//  窗口長(zhǎng)度10s,滑動(dòng)步長(zhǎng)2s。 每2秒滑動(dòng)一次,窗口大小為10秒的滑動(dòng)時(shí)間窗口,并對(duì)窗口中的元素進(jìn)行操作。
dataStream.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2)));

4.滑動(dòng)事件時(shí)間窗口

窗口分配器由類(lèi)SlidingEventTimeWindows提供,用法與滑動(dòng)處理事件窗口完全一致

dataStream.windowAll(SlidingEventTimeWindows.of(Time.seconds(10)Time.seconds(5)));

5.處理時(shí)間會(huì)話(huà)窗口

窗口分配器由類(lèi)ProcessingTimeSessionWindows提供,需要調(diào)用它的靜態(tài)方法withGap()或者withDynamicGap()。需要傳入一個(gè)Time類(lèi)型的參數(shù)size,表示會(huì)話(huà)的超時(shí)時(shí)間

// 會(huì)話(huà)窗口,超時(shí)間隔5s
dataStream.windowAll(ProcessingTimeSessionWindows.withGap(Time.seconds(5)));

6.事件時(shí)間會(huì)話(huà)窗口

窗口分配器由類(lèi)EventTimeSessionWindows提供,用法與處理事件會(huì)話(huà)窗口完全一致。

dataStream.windowAll(EventTimeSessionWindows.withGap(Time.seconds(10)));

計(jì)數(shù)窗口

1.滾動(dòng)計(jì)數(shù)窗口

滾動(dòng)計(jì)數(shù)窗口只需要傳入一個(gè)長(zhǎng)整型的參數(shù)size,表示窗口的大小。

當(dāng)窗口中元素?cái)?shù)量達(dá)到size時(shí),就會(huì)觸發(fā)計(jì)算執(zhí)行并關(guān)閉窗口。

// 滾動(dòng)窗口,窗口長(zhǎng)度2個(gè)元素
dataStream.countWindowAll(2);

2.滑動(dòng)計(jì)數(shù)窗口

在countWindow()調(diào)用時(shí)傳入兩個(gè)參數(shù):size和slide,前者表示窗口大小,后者表示滑動(dòng)步長(zhǎng)。

每個(gè)窗口統(tǒng)計(jì)size個(gè)數(shù)據(jù),每隔slide個(gè)數(shù)據(jù)就統(tǒng)計(jì)輸出一次結(jié)果。

 // 滑動(dòng)窗口,窗口長(zhǎng)度2個(gè)元素,滑動(dòng)步長(zhǎng)2個(gè)元素
dataStream.countWindowAll(5,2);

3.全局窗口

全局窗口是計(jì)數(shù)窗口的底層實(shí)現(xiàn),一般在需要自定義窗口時(shí)使用。它的定義同樣是直接調(diào)用.window(),分配器由GlobalWindows類(lèi)提供。

// 全局窗口,需要自定義的時(shí)候才會(huì)用
dataStream.windowAll(GlobalWindows.create());dataStream.keyBy().window(GlobalWindows.create());

注意:使用全局窗口必須自行定義觸發(fā)器才能實(shí)現(xiàn)窗口計(jì)算,否則不起作用。

例子

 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 從socket接收數(shù)據(jù)流SingleOutputStreamOperator<String> source = env.socketTextStream("node01", 8086);// 將輸入數(shù)據(jù)轉(zhuǎn)換為IntegerDataStream<Integer> dataStream = source.map(str -> Integer.parseInt(str));// 時(shí)間窗口示例:滾動(dòng)處理時(shí)間窗口,窗口長(zhǎng)度10s。 每10秒統(tǒng)計(jì)一次數(shù)量和SingleOutputStreamOperator<Integer> streamOperator = dataStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))).sum(0);streamOperator.print();env.execute();}

發(fā)送測(cè)試數(shù)據(jù)

[root@administrator ~]# nc -lk 8086
1
2
3
4

等待10秒后,控制臺(tái)打印如下

10

窗口函數(shù) Window Functions

定義了window assigner之后,需要指定當(dāng)窗口觸發(fā)之后,如何計(jì)算每個(gè)窗口中的數(shù)據(jù), 這就是window function的職責(zé)

窗口函數(shù)是在窗口操作中應(yīng)用于窗口中元素的函數(shù)。Flink提供了豐富的窗口函數(shù),用于對(duì)窗口中的元素進(jìn)行各種操作和計(jì)算。

根據(jù)處理的方式可以分為兩類(lèi):增量聚合函數(shù)和全窗/全量口函數(shù),它們是Flink中用于窗口計(jì)算的兩種不同的函數(shù)。

增量聚合函數(shù)

增量聚合函數(shù)是指對(duì)窗口中的數(shù)據(jù)進(jìn)行累積計(jì)算的函數(shù)。它會(huì)在每個(gè)元素到達(dá)窗口時(shí)進(jìn)行計(jì)算,并且僅保留窗口計(jì)算所需的中間狀態(tài)。這種方式可以顯著提高計(jì)算性能,尤其適用于大規(guī)模數(shù)據(jù)和長(zhǎng)窗口的情況。

對(duì)于增量聚合函數(shù),Flink 提供了一系列內(nèi)置的聚合函數(shù),例如 sum、min、max、avg等,它們的底層,其實(shí)都是通過(guò)AggregateFunction來(lái)實(shí)現(xiàn)的。還可以通過(guò)實(shí)現(xiàn) AggregateFunction接口來(lái)定義自定義的增量聚合函數(shù)。

典型的增量聚合函數(shù)有兩個(gè):ReduceFunction和AggregateFunction。

ReduceFunction

ReduceFunction指定兩條輸入數(shù)據(jù)如何合并起來(lái)產(chǎn)生一條輸出數(shù)據(jù),輸入和輸出數(shù)據(jù)的類(lèi)型必須相同。

  public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 從socket接收數(shù)據(jù)流SingleOutputStreamOperator<String> source = env.socketTextStream("node01", 8086);// 將輸入數(shù)據(jù)轉(zhuǎn)換為IntegerDataStream<Integer> dataStream = source.map(str -> Integer.parseInt(str));// 指定窗口分配器AllWindowedStream<Integer, TimeWindow> allWindowedStream = dataStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)));// 指定窗口函數(shù),使用 增量聚合ReduceSingleOutputStreamOperator<Integer> reduce = allWindowedStream.reduce(new ReduceFunction<Integer>() {@Overridepublic Integer reduce(Integer value1, Integer value2) throws Exception {System.out.println("前一個(gè)值: " + value1 + " ,后一個(gè)值:" + value2);return value1 + value2;}});// 在窗口觸發(fā)的時(shí)候,才會(huì)輸出窗口的最終計(jì)算結(jié)果reduce.print();env.execute();}

發(fā)送測(cè)試數(shù)據(jù):

[root@administrator ~]#  nc -lk 8086
1
2
3
4
5

控制臺(tái)輸出:

前一個(gè)值: 1 ,后一個(gè)值:2
前一個(gè)值: 3 ,后一個(gè)值:3
前一個(gè)值: 6 ,后一個(gè)值:4
前一個(gè)值: 10 ,后一個(gè)值:5
15

AggregateFunction

ReduceFunction接口存在一個(gè)限制:聚合狀態(tài)的類(lèi)型、輸出結(jié)果的類(lèi)型都必須和輸入數(shù)據(jù)類(lèi)型一樣。聚合函數(shù)則突破了這個(gè)限制,可以定義更加靈活的窗口聚合操作。

AggregateFunction函數(shù)接口方法參數(shù)有三種類(lèi)型:輸入類(lèi)型(IN)、累加器類(lèi)型(ACC)和輸出類(lèi)型(OUT)。

輸入類(lèi)型IN就是輸入流中元素的數(shù)據(jù)類(lèi)型累加器類(lèi)型ACC則是我們進(jìn)行聚合的中間狀態(tài)類(lèi)型而輸出類(lèi)型當(dāng)然就是最終計(jì)算結(jié)果的類(lèi)型

接口中有四個(gè)方法:

createAccumulator():創(chuàng)建一個(gè)累加器,這就是為聚合創(chuàng)建了一個(gè)初始狀態(tài),每個(gè)聚合任務(wù)只會(huì)調(diào)用一次add():將輸入的元素添加到累加器中g(shù)etResult():從累加器中提取聚合的輸出結(jié)果merge():合并兩個(gè)累加器,并將合并后的狀態(tài)作為一個(gè)累加器返回

與ReduceFunction相同,AggregateFunction也是增量式的聚合,而由于輸入、中間狀態(tài)、輸出的類(lèi)型可以不同,使得應(yīng)用更加靈活方便。

   public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 從socket接收數(shù)據(jù)流SingleOutputStreamOperator<String> source = env.socketTextStream("node01", 8086);// 將輸入數(shù)據(jù)轉(zhuǎn)換為IntegerDataStream<Integer> dataStream = source.map(str -> Integer.parseInt(str));// 指定窗口分配器AllWindowedStream<Integer, TimeWindow> allWindowedStream = dataStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)));// 窗口函數(shù) 增量聚合 AggregateSingleOutputStreamOperator<String> aggregate = allWindowedStream.aggregate(new MyAggregateFunction());aggregate.print();env.execute();}/*** 第一個(gè)類(lèi)型: 輸入數(shù)據(jù)的類(lèi)型* 第二個(gè)類(lèi)型: 累加器的類(lèi)型,存儲(chǔ)的中間計(jì)算結(jié)果的類(lèi)型* 第三個(gè)類(lèi)型: 輸出的類(lèi)型*/public static class MyAggregateFunction implements AggregateFunction<Integer, Integer, String> {/*** 創(chuàng)建累加器,初始化累加器** @return*/@Overridepublic Integer createAccumulator() {System.out.println("createAccumulator方法執(zhí)行");return 0;}/*** 聚合邏輯* 來(lái)一條計(jì)算一條,調(diào)用一次add方法** @param value       當(dāng)前值* @param accumulator 累加器的值* @return*/@Overridepublic Integer add(Integer value, Integer accumulator) {System.out.println("add方法執(zhí)行,當(dāng)前值 :" + value + "累加器值 :" + accumulator);return value + accumulator;}/*** 獲取最終結(jié)果,窗口觸發(fā)時(shí)輸出** @param accumulator* @return*/@Overridepublic String getResult(Integer accumulator) {System.out.println("getResult方法執(zhí)行");return "最終計(jì)算值:" + accumulator;}@Overridepublic Integer merge(Integer a, Integer b) {// 只有會(huì)話(huà)窗口才會(huì)用到System.out.println("merge方法執(zhí)行");return null;}}

發(fā)送測(cè)試數(shù)據(jù):

[root@administrator ~]#  nc -lk 8086
1
2
3
4
5

控制臺(tái)輸出:

createAccumulator方法執(zhí)行
add方法執(zhí)行,當(dāng)前值 :1累加器值 :0
add方法執(zhí)行,當(dāng)前值 :2累加器值 :1
add方法執(zhí)行,當(dāng)前值 :3累加器值 :3
add方法執(zhí)行,當(dāng)前值 :4累加器值 :6
add方法執(zhí)行,當(dāng)前值 :5累加器值 :10
getResult方法執(zhí)行
最終計(jì)算值:15

全窗/全量口函數(shù)

全窗口函數(shù)是對(duì)窗口中的所有元素進(jìn)行計(jì)算的函數(shù)。它會(huì)在窗口觸發(fā)時(shí)對(duì)窗口中的所有元素進(jìn)行處理,并輸出一個(gè)或多個(gè)結(jié)果。全窗口函數(shù)可以訪(fǎng)問(wèn)窗口的所有元素,并且可以使用窗口中的狀態(tài)信息。

對(duì)于全窗口函數(shù),Flink提供了 ProcessWindowFunction 和 WindowFunction 兩個(gè)接口供用戶(hù)使用。

ProcessWindowFunction: 可以處理每個(gè)元素,并輸出零個(gè)、一個(gè)或多個(gè)結(jié)果WindowFunction: 是一個(gè)轉(zhuǎn)換函數(shù),對(duì)窗口的所有元素進(jìn)行轉(zhuǎn)換,并輸出一個(gè)或多個(gè)結(jié)果。

與增量聚合函數(shù)不同,全窗口函數(shù)需要先收集窗口中的數(shù)據(jù),并在內(nèi)部緩存起來(lái),等到窗口要輸出結(jié)果的時(shí)候再取出數(shù)據(jù)進(jìn)行計(jì)算。

WindowFunction

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 從socket接收數(shù)據(jù)流SingleOutputStreamOperator<String> source = env.socketTextStream("node01", 8086);// 將輸入數(shù)據(jù)轉(zhuǎn)換為(key, value)元組DataStream<Tuple2<String, Integer>> dataStream = source.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2 map(String s) throws Exception {int number = Integer.parseInt(s);String key = number % 2 == 0 ? "key1" : "key2";Tuple2 tuple2 = new Tuple2(key, number);return tuple2;}}).returns(Types.TUPLE(Types.STRING, Types.INT));// keyBy操作KeyedStream<Tuple2<String, Integer>, String> keyedStream = dataStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> tuple2) throws Exception {return tuple2.f0;}});// 指定窗口分配器 非鍵分區(qū)窗口
//        AllWindowedStream<Integer, TimeWindow> allWindowedStream = dataStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)));// 鍵分區(qū)窗口WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowedStream = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));// 使用WindowFunction窗口函數(shù)SingleOutputStreamOperator<String> apply = windowedStream.apply(new MyWindowFunction());apply.print();env.execute();}/*** 窗口函數(shù)* <p>* 窗口觸發(fā)時(shí)才會(huì)調(diào)用一次,統(tǒng)一計(jì)算窗口的所有數(shù)據(jù)*/public static class MyWindowFunction implements WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow> {/*** @param s      分組的key,非鍵分區(qū)窗口則無(wú)該參數(shù)* @param window 窗口對(duì)象* @param input  存的數(shù)據(jù)* @param out    采集器*/@Overridepublic void apply(String s, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {// 上下文拿到window對(duì)象,獲取相關(guān)信息long start = window.getStart();long end = window.getEnd();String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss");String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss");long count = input.spliterator().estimateSize();out.collect("分組 " + s + " 的窗口,在時(shí)間區(qū)間: " + windowStart + "-" + windowEnd + " 產(chǎn)生" + count + "條數(shù)據(jù),具體數(shù)據(jù):" + input.toString());}}
]# nc -lk 8086
1
2
3
4
5
分組 key2 的窗口,在時(shí)間區(qū)間: 2023-06-27 16:50:10-2023-06-27 16:50:20 產(chǎn)生3條數(shù)據(jù),具體數(shù)據(jù):[(key2,1), (key2,3), (key2,5)]
分組 key1 的窗口,在時(shí)間區(qū)間: 2023-06-27 16:50:10-2023-06-27 16:50:20 產(chǎn)生2條數(shù)據(jù),具體數(shù)據(jù):[(key1,2), (key1,4)]

ProcessWindowFunction

  // 使用ProcessWindowFunction處理窗口函數(shù)SingleOutputStreamOperator<String> process = windowedStream.process(new MyProcessWindowFunction());
    /*** 處理窗口函數(shù)* <p>* 窗口觸發(fā)時(shí)才會(huì)調(diào)用一次,統(tǒng)一計(jì)算窗口的所有數(shù)據(jù)*/public static class MyProcessWindowFunction extends ProcessWindowFunction<Tuple2<String, Integer>, String, String, TimeWindow> {/*** @param s       分組的key,非鍵分區(qū)窗口則無(wú)該參數(shù)* @param context 上下文* @param input   存的數(shù)據(jù)* @param out     采集器* @throws Exception*/@Overridepublic void process(String s, Context context, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {// 上下文拿到window對(duì)象,獲取相關(guān)信息long start = context.window().getStart();long end = context.window().getEnd();String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss");String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss");long count = input.spliterator().estimateSize();out.collect("分組 " + s + " 的窗口,在時(shí)間區(qū)間: " + windowStart + "-" + windowEnd + " 產(chǎn)生" + count + "條數(shù)據(jù),具體數(shù)據(jù):" + input.toString());}}

增量聚合和全窗口函數(shù)的結(jié)合

在調(diào)用窗口的增量聚合函數(shù)方法時(shí),第一個(gè)參數(shù)直接傳入一個(gè)ReduceFunction或AggregateFunction進(jìn)行增量聚合,第二個(gè)參數(shù)傳入一個(gè)全窗口函數(shù)WindowFunction或者ProcessWindowFunction。

基于第一個(gè)參數(shù)(增量聚合函數(shù))來(lái)處理窗口數(shù)據(jù),每來(lái)一個(gè)數(shù)據(jù)就做一次聚合等到窗口需要觸發(fā)計(jì)算時(shí),則調(diào)用第二個(gè)參數(shù)(全窗口函數(shù))的處理邏輯輸出結(jié)果注意這里的全窗口函數(shù)就不再緩存所有數(shù)據(jù)了,而是直接將增量聚合函數(shù)的結(jié)果拿來(lái)當(dāng)作了Iterable類(lèi)型的輸入
    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 從socket接收數(shù)據(jù)流SingleOutputStreamOperator<String> source = env.socketTextStream("node01", 8086);// 將輸入數(shù)據(jù)轉(zhuǎn)換為IntegerDataStream<Integer> dataStream = source.map(str -> Integer.parseInt(str));// 指定窗口分配器 非鍵分區(qū)窗口AllWindowedStream<Integer, TimeWindow> allWindowedStream = dataStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)));// 使用ProcessWindowFunction處理窗口函數(shù)SingleOutputStreamOperator<String> process = allWindowedStream.aggregate(new MyAggregateFunction(), new MyProcessWindowFunction());process.print();env.execute();}/*** 第一個(gè)類(lèi)型: 輸入數(shù)據(jù)的類(lèi)型* 第二個(gè)類(lèi)型: 累加器的類(lèi)型,存儲(chǔ)的中間計(jì)算結(jié)果的類(lèi)型* 第三個(gè)類(lèi)型: 輸出的類(lèi)型*/public static class MyAggregateFunction implements AggregateFunction<Integer, Integer, String> {/*** 創(chuàng)建累加器,初始化累加器** @return*/@Overridepublic Integer createAccumulator() {System.out.println("createAccumulator方法執(zhí)行");return 0;}/*** 聚合邏輯* 來(lái)一條計(jì)算一條,調(diào)用一次add方法** @param value       當(dāng)前值* @param accumulator 累加器的值* @return*/@Overridepublic Integer add(Integer value, Integer accumulator) {System.out.println("add方法執(zhí)行,當(dāng)前值 :" + value + " 累加器值 :" + accumulator);return value + accumulator;}/*** 獲取最終結(jié)果,窗口觸發(fā)時(shí)輸出** @param accumulator* @return*/@Overridepublic String getResult(Integer accumulator) {System.out.println("getResult方法執(zhí)行");return "最終計(jì)算值:" + accumulator;}@Overridepublic Integer merge(Integer a, Integer b) {// 只有會(huì)話(huà)窗口才會(huì)用到System.out.println("merge方法執(zhí)行");return null;}}/*** 處理窗口函數(shù)* <p>* 窗口觸發(fā)時(shí)才會(huì)調(diào)用一次,統(tǒng)一計(jì)算窗口的所有數(shù)據(jù)* <p>* 注意:增量聚合函數(shù)的輸出類(lèi)型 是 全窗口函數(shù)的輸入類(lèi)型*/public static class MyProcessWindowFunction extends ProcessAllWindowFunction<String, String, TimeWindow> {/*** @param context 上下文* @param input   存的數(shù)據(jù)* @param out     采集器* @throws Exception*/@Overridepublic void process(Context context, Iterable<String> input, Collector<String> out) throws Exception {// 上下文拿到window對(duì)象,獲取相關(guān)信息long start = context.window().getStart();long end = context.window().getEnd();String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss");String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss");long count = input.spliterator().estimateSize();out.collect("窗口在時(shí)間區(qū)間: " + windowStart + "-" + windowEnd + " 產(chǎn)生" + count + "條數(shù)據(jù),具體數(shù)據(jù):" + input.toString());}}
createAccumulator方法執(zhí)行
add方法執(zhí)行,當(dāng)前值 :1 累加器值 :0
add方法執(zhí)行,當(dāng)前值 :2 累加器值 :1
add方法執(zhí)行,當(dāng)前值 :3 累加器值 :3
add方法執(zhí)行,當(dāng)前值 :4 累加器值 :6
add方法執(zhí)行,當(dāng)前值 :5 累加器值 :10
getResult方法執(zhí)行
窗口在時(shí)間區(qū)間: 2023-06-27 17:07:50-2023-06-27 17:08:00 產(chǎn)生1條數(shù)據(jù),具體數(shù)據(jù):[最終計(jì)算值:15]

其他

觸發(fā)器 Trigger

Trigger決定了一個(gè)窗口(由windowassigner定義)何時(shí)可以被windowfunction處理。每個(gè)WindowAssigner都有一個(gè)默認(rèn)的Trigger。如果默認(rèn)trigger無(wú)法滿(mǎn)足需要,可以在trigger(…)調(diào)用中指定自定義的trigger。

Trigger接口提供了五個(gè)方法來(lái)響應(yīng)不同的事件:

onElement()方法在每個(gè)元素被加入窗口時(shí)調(diào)用onEventTime()方法在注冊(cè)的event-timetimer觸發(fā)時(shí)調(diào)用onProcessingTime()方法在注冊(cè)的processing-timetimer觸發(fā)時(shí)調(diào)用onMerge()方法與有狀態(tài)的trigger相關(guān)。該方法會(huì)在兩個(gè)窗口合并時(shí),將窗口對(duì)應(yīng)trigger的狀態(tài)進(jìn)行合并,比如使用會(huì)話(huà)窗口時(shí)clear()方法處理在對(duì)應(yīng)窗口被移除時(shí)所需的邏輯

注意:

前三個(gè)方法通過(guò)返回TriggerResult來(lái)決定trigger如何應(yīng)對(duì)到達(dá)窗口的事件。

應(yīng)對(duì)方案:

CONTINUE: 什么也不做FIRE: 觸發(fā)計(jì)算PURGE: 清空窗口內(nèi)的元素FIRE_AND_PURGE: 觸發(fā)計(jì)算,計(jì)算結(jié)束后清空窗口內(nèi)的元素

內(nèi)置觸發(fā)器

EventTimeTrigger:基于事件時(shí)間和watermark機(jī)制來(lái)對(duì)窗口進(jìn)行觸發(fā)計(jì)算ProcessingTimeTrigger: 基于處理時(shí)間觸發(fā)CountTrigger:窗口元素?cái)?shù)超過(guò)預(yù)先給定的限制值的話(huà)會(huì)觸發(fā)計(jì)算PurgingTrigger:作為其它trigger的參數(shù),將其轉(zhuǎn)化為一個(gè)purging觸發(fā)器

基于WindowedStream調(diào)用.trigger()方法,就可以傳入一個(gè)自定義的窗口觸發(fā)器

stream.keyBy(...).window(...).trigger(new MyTrigger())

移除器 Evictor

Evictor可以在 trigger 觸發(fā)后、調(diào)用窗口函數(shù)之前或之后從窗口中刪除元素。Evictor是一個(gè)接口,不同的窗口類(lèi)型都有各自預(yù)實(shí)現(xiàn)的移除器。

內(nèi)置evictor:

默認(rèn)情況下,所有內(nèi)置的 evictor 邏輯都在調(diào)用窗口函數(shù)前執(zhí)行。

CountEvictor: 僅記錄用戶(hù)指定數(shù)量的元素,一旦窗口中的元素超過(guò)這個(gè)數(shù)量,多余的元素會(huì)從窗口緩存的開(kāi)頭移除DeltaEvictor: 接收 DeltaFunction 和 threshold 參數(shù),計(jì)算最后一個(gè)元素與窗口緩存中所有元素的差值, 并移除差值大于或等于 threshold 的元素。TimeEvictor: 接收 interval 參數(shù),以毫秒表示。 它會(huì)找到窗口中元素的最大 timestamp max_ts 并移除比 max_ts - interval 小的所有元素

基于WindowedStream調(diào)用.evictor()方法,就可以傳入一個(gè)自定義的移除器

stream.keyBy(...).window(...).evictor(new MyEvictor())
http://www.risenshineclean.com/news/6987.html

相關(guān)文章:

  • 百度錄入網(wǎng)站怎樣建立網(wǎng)站平臺(tái)
  • 企業(yè)名稱(chēng)登記管理規(guī)定長(zhǎng)沙弧度seo
  • 十堰網(wǎng)站建設(shè)哪家好推廣營(yíng)銷(xiāo)方案
  • 做網(wǎng)站要做哪些最新?tīng)I(yíng)銷(xiāo)模式
  • 長(zhǎng)治做網(wǎng)站公司運(yùn)營(yíng)推廣seo招聘
  • 做網(wǎng)站和網(wǎng)頁(yè)的目的和作用是什么深圳優(yōu)化公司哪家好
  • 兼積做調(diào)查掙錢(qián)網(wǎng)站免費(fèi)優(yōu)化網(wǎng)站排名
  • 武漢網(wǎng)站建設(shè)十強(qiáng)企業(yè)地推掃碼平臺(tái)
  • 公司建站花費(fèi)香港seo公司
  • 營(yíng)銷(xiāo)型網(wǎng)站建設(shè)測(cè)驗(yàn)題百度平臺(tái)客服電話(huà)
  • 企業(yè)網(wǎng)站策劃運(yùn)營(yíng)推廣
  • 北京網(wǎng)絡(luò)營(yíng)銷(xiāo)推廣培訓(xùn)哪家好太原seo自媒體
  • 湖北做網(wǎng)站的公司總裁班課程培訓(xùn)
  • 學(xué)習(xí)aspmvc網(wǎng)站開(kāi)發(fā) 書(shū)推廣普通話(huà)手抄報(bào)內(nèi)容
  • 建個(gè)個(gè)人網(wǎng)站一年多少錢(qián)網(wǎng)站排名優(yōu)化外包
  • ar做網(wǎng)站百度云網(wǎng)盤(pán)登錄入口
  • 長(zhǎng)沙網(wǎng)站模板建設(shè)成都網(wǎng)站seo外包
  • 網(wǎng)站開(kāi)發(fā)設(shè)計(jì)公推廣普通話(huà)手抄報(bào)簡(jiǎn)單又好看
  • thinkphp手機(jī)網(wǎng)站模板百度貼吧官網(wǎng)
  • 做商城外貿(mào)網(wǎng)站杭州優(yōu)化外包
  • 團(tuán)購(gòu)網(wǎng)站前景seo刷排名公司
  • 九江網(wǎng)站建設(shè)寧波seo教程行業(yè)推廣
  • 政府網(wǎng)站建設(shè)發(fā)展前景口碑營(yíng)銷(xiāo)5t
  • 上海網(wǎng)站域名備案處百度貼吧官網(wǎng)入口
  • 凡客網(wǎng)站的域名怎么做外鏈網(wǎng)址
  • wordpress 分享后可見(jiàn)自己怎么優(yōu)化我網(wǎng)站關(guān)鍵詞
  • 做網(wǎng)站建設(shè)最好學(xué)什么手機(jī)優(yōu)化什么意思
  • 做招聘網(wǎng)站都需要什么手續(xù)易推廣
  • 開(kāi)源網(wǎng)站建設(shè)輿情系統(tǒng)
  • 在線(xiàn)做圖的網(wǎng)站長(zhǎng)沙網(wǎng)站推廣 下拉通推廣