中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁 > news >正文

dw企業(yè)網(wǎng)站設(shè)計(jì)品牌營銷包括哪些內(nèi)容

dw企業(yè)網(wǎng)站設(shè)計(jì),品牌營銷包括哪些內(nèi)容,wordpress 添加備案,硅云wordpress多站點(diǎn)目錄 0. 相關(guān)文章鏈接 1. 基本操作 1.1. 弱類型 api 1.2. 強(qiáng)類型 1.3. 直接執(zhí)行 sql 2. 基于 event-time 的窗口操作 2.1. event-time 窗口理解 2.2. event-time 窗口生成規(guī)則 3. 基于 Watermark 處理延遲數(shù)據(jù) 3.1. 什么是 Watermark 機(jī)制 3.2. update 模式下使用 w…

目錄

0. 相關(guān)文章鏈接

1. 基本操作

1.1.?弱類型 api

1.2. 強(qiáng)類型

1.3.?直接執(zhí)行 sql

2.?基于 event-time 的窗口操作

2.1.?event-time 窗口理解

2.2.?event-time 窗口生成規(guī)則

3.?基于 Watermark 處理延遲數(shù)據(jù)

3.1. 什么是 Watermark 機(jī)制

3.2.?update 模式下使用 watermark

3.3.?append 模式下使用 wartermark

3.4. watermark 機(jī)制總結(jié)

4.?流數(shù)據(jù)去重

5. join操作

5.1.?Stream-static Joins

5.1.1.?內(nèi)連接

5.1.2.?外連接

5.2.?Stream-stream Joins

5.2.1.?inner join

4.2.2.?outer join

6.?Streaming DF/DS 不支持的操作


0. 相關(guān)文章鏈接

?Spark文章匯總?

1. 基本操作

在 DF/DS 上大多數(shù)通用操作都支持作用在 Streaming DataFrame/Streaming DataSet 上。

準(zhǔn)備處理數(shù)據(jù):?people.json

{"name": "Michael","age": 29,"sex": "female"}
{"name": "Andy","age": 30,"sex": "male"}
{"name": "Justin","age": 19,"sex": "male"}
{"name": "Lisi","age": 18,"sex": "male"}
{"name": "zs","age": 10,"sex": "female"}
{"name": "zhiling","age": 40,"sex": "female"}

1.1.?弱類型 api

代碼示例:

import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}object StreamTest {def main(args: Array[String]): Unit = {// 創(chuàng)建 SparkSession. 因?yàn)?ss 是基于 spark sql 引擎, 所以需要先創(chuàng)建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 創(chuàng)建格式,并讀取數(shù)據(jù)val peopleSchema: StructType = new StructType().add("name", StringType).add("age", LongType).add("sex", StringType)val peopleDF: DataFrame = spark.readStream.schema(peopleSchema).json("/Project/Data/json")// 弱類型 apival df: DataFrame = peopleDF.select("name", "age", "sex").where("age > 20")df.writeStream.outputMode("append").format("console").start.awaitTermination()// 關(guān)閉執(zhí)行環(huán)境spark.stop()}
}

結(jié)果輸出:

-------------------------------------------
Batch: 0
-------------------------------------------
+-------+---+------+
|   name|age|   sex|
+-------+---+------+
|Michael| 29|female|
|   Andy| 30|  male|
|zhiling| 40|female|
+-------+---+------+

1.2. 強(qiáng)類型

代碼示例:

import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}object StreamTest {def main(args: Array[String]): Unit = {// 創(chuàng)建 SparkSession. 因?yàn)?ss 是基于 spark sql 引擎, 所以需要先創(chuàng)建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 創(chuàng)建格式,并讀取數(shù)據(jù)val peopleSchema: StructType = new StructType().add("name", StringType).add("age", LongType).add("sex", StringType)val peopleDF: DataFrame = spark.readStream.schema(peopleSchema).json("/Project/Data/json")// 強(qiáng)類型,轉(zhuǎn)成 dsval peopleDS: Dataset[People] = peopleDF.as[People]val df: Dataset[String] = peopleDS.filter((_: People).age > 20).map((_: People).name)df.writeStream.outputMode("append").format("console").start.awaitTermination()// 關(guān)閉執(zhí)行環(huán)境spark.stop()}
}case class People(name: String, age: Long, sex: String)

結(jié)果輸出:

-------------------------------------------
Batch: 0
-------------------------------------------
+-------+
|  value|
+-------+
|Michael|
|   Andy|
|zhiling|
+-------+

1.3.?直接執(zhí)行 sql

代碼示例:

import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}object StreamTest {def main(args: Array[String]): Unit = {// 創(chuàng)建 SparkSession. 因?yàn)?ss 是基于 spark sql 引擎, 所以需要先創(chuàng)建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 創(chuàng)建格式,并讀取數(shù)據(jù)val peopleSchema: StructType = new StructType().add("name", StringType).add("age", LongType).add("sex", StringType)val peopleDF: DataFrame = spark.readStream.schema(peopleSchema).json("/Project/Data/json")// 直接執(zhí)行SQL,創(chuàng)建臨時(shí)表peopleDF.createOrReplaceTempView("people")val df: DataFrame = spark.sql("select * from people where age > 20")df.writeStream.outputMode("append").format("console").start.awaitTermination()// 關(guān)閉執(zhí)行環(huán)境spark.stop()}
}

結(jié)果輸出:

-------------------------------------------
Batch: 0
-------------------------------------------
+-------+---+------+
|   name|age|   sex|
+-------+---+------+
|Michael| 29|female|
|   Andy| 30|  male|
|zhiling| 40|female|
+-------+---+------+

2.?基于 event-time 的窗口操作

2.1.?event-time 窗口理解

????????在 Structured Streaming 中, 可以按照事件發(fā)生時(shí)的時(shí)間對(duì)數(shù)據(jù)進(jìn)行聚合操作, 即基于 event-time 進(jìn)行操作。在這種機(jī)制下, 即不必考慮 Spark 陸續(xù)接收事件的順序是否與事件發(fā)生的順序一致, 也不必考慮事件到達(dá) Spark 的時(shí)間與事件發(fā)生時(shí)間的關(guān)系。因此, 它在提高數(shù)據(jù)處理精度的同時(shí), 大大減少了開發(fā)者的工作量。我們現(xiàn)在想計(jì)算 10 分鐘內(nèi)的單詞, 每 5 分鐘更新一次, 也就是說在 10 分鐘窗口 12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20等之間收到的單詞量。 注意, 12:00 - 12:10 表示數(shù)據(jù)在 12:00 之后 12:10 之前到達(dá)。現(xiàn)在,考慮一下在 12:07 收到的單詞。單詞應(yīng)該增加對(duì)應(yīng)于兩個(gè)窗口12:00 - 12:10和12:05 - 12:15的計(jì)數(shù)。因此,計(jì)數(shù)將由分組鍵(即單詞)和窗口(可以從事件時(shí)間計(jì)算)索引。

統(tǒng)計(jì)后的結(jié)果應(yīng)該是這樣的:

代碼示例:

import org.apache.spark.sql.functions.window
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import java.sql.Timestampobject StreamTest {def main(args: Array[String]): Unit = {// 創(chuàng)建 SparkSession. 因?yàn)?ss 是基于 spark sql 引擎, 所以需要先創(chuàng)建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 使用socket數(shù)據(jù)源val lines: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).option("includeTimestamp", value = true) // 給產(chǎn)生的數(shù)據(jù)自動(dòng)添加時(shí)間戳.load// 把行切割成單詞, 保留時(shí)間戳val words: DataFrame = lines.as[(String, Timestamp)].flatMap((line: (String, Timestamp)) => {line._1.split(" ").map(((_: String), line._2))}).toDF("word", "timestamp")// 按照窗口和單詞分組, 并且計(jì)算每組的單詞的個(gè)數(shù),最后按照窗口排序val wordCounts: Dataset[Row] = words.groupBy(// 調(diào)用 window 函數(shù), 返回的是一個(gè) Column 類型// 參數(shù) 1: df 中表示時(shí)間戳的列// 參數(shù) 2: 窗口長度// 參數(shù) 3: 滑動(dòng)步長window($"timestamp", "60 seconds", "10 seconds"),$"word").count().orderBy($"window")wordCounts.writeStream.outputMode("complete").format("console").option("truncate", "false") // 不截?cái)?為了在控制臺(tái)能看到完整信息, 最好設(shè)置為 false.start.awaitTermination()// 關(guān)閉執(zhí)行環(huán)境spark.stop()}
}

結(jié)果輸出:

-------------------------------------------
Batch: 0
-------------------------------------------
+------+----+-----+
|window|word|count|
+------+----+-----+
+------+----+-----+-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+----+-----+
|window                                    |word|count|
+------------------------------------------+----+-----+
|[2023-08-02 21:18:00, 2023-08-02 21:19:00]|abc |3    |
|[2023-08-02 21:18:10, 2023-08-02 21:19:10]|a   |3    |
|[2023-08-02 21:18:10, 2023-08-02 21:19:10]|abc |5    |
|[2023-08-02 21:18:20, 2023-08-02 21:19:20]|abc |5    |
|[2023-08-02 21:18:20, 2023-08-02 21:19:20]|a   |3    |
|[2023-08-02 21:18:30, 2023-08-02 21:19:30]|a   |3    |
|[2023-08-02 21:18:30, 2023-08-02 21:19:30]|abc |5    |
|[2023-08-02 21:18:40, 2023-08-02 21:19:40]|abc |5    |
|[2023-08-02 21:18:40, 2023-08-02 21:19:40]|a   |3    |
|[2023-08-02 21:18:50, 2023-08-02 21:19:50]|a   |3    |
|[2023-08-02 21:18:50, 2023-08-02 21:19:50]|abc |5    |
|[2023-08-02 21:19:00, 2023-08-02 21:20:00]|a   |3    |
|[2023-08-02 21:19:00, 2023-08-02 21:20:00]|abc |2    |
+------------------------------------------+----+-----+-------------------------------------------
Batch: 2
-------------------------------------------
+------------------------------------------+----+-----+
|window                                    |word|count|
+------------------------------------------+----+-----+
|[2023-08-02 21:18:00, 2023-08-02 21:19:00]|abc |3    |
|[2023-08-02 21:18:10, 2023-08-02 21:19:10]|a   |3    |
|[2023-08-02 21:18:10, 2023-08-02 21:19:10]|abc |5    |
|[2023-08-02 21:18:20, 2023-08-02 21:19:20]|abc |5    |
|[2023-08-02 21:18:20, 2023-08-02 21:19:20]|a   |3    |
|[2023-08-02 21:18:30, 2023-08-02 21:19:30]|a   |3    |
|[2023-08-02 21:18:30, 2023-08-02 21:19:30]|abc |5    |
|[2023-08-02 21:18:40, 2023-08-02 21:19:40]|abc |5    |
|[2023-08-02 21:18:40, 2023-08-02 21:19:40]|a   |3    |
|[2023-08-02 21:18:50, 2023-08-02 21:19:50]|a   |3    |
|[2023-08-02 21:18:50, 2023-08-02 21:19:50]|abc |5    |
|[2023-08-02 21:19:00, 2023-08-02 21:20:00]|a   |3    |
|[2023-08-02 21:19:00, 2023-08-02 21:20:00]|abc |2    |
|[2023-08-02 21:19:10, 2023-08-02 21:20:10]|a   |2    |
|[2023-08-02 21:19:10, 2023-08-02 21:20:10]|abc |1    |
|[2023-08-02 21:19:20, 2023-08-02 21:20:20]|a   |2    |
|[2023-08-02 21:19:20, 2023-08-02 21:20:20]|abc |1    |
|[2023-08-02 21:19:30, 2023-08-02 21:20:30]|a   |2    |
|[2023-08-02 21:19:30, 2023-08-02 21:20:30]|abc |1    |
|[2023-08-02 21:19:40, 2023-08-02 21:20:40]|a   |2    |
+------------------------------------------+----+-----+
only showing top 20 rows

由此可以看出, 在這種窗口機(jī)制下, 無論事件何時(shí)到達(dá), 以怎樣的順序到達(dá), Structured Streaming 總會(huì)根據(jù)事件時(shí)間生成對(duì)應(yīng)的若干個(gè)時(shí)間窗口, 然后按照指定的規(guī)則聚合。

2.2.?event-time 窗口生成規(guī)則

可以查看?org.apache.spark.sql.catalyst.analysis.TimeWindowing 類下的如下代碼:

The windows are calculated as below:
maxNumOverlapping <- ceil(windowDuration / slideDuration)
for (i <- 0 until maxNumOverlapping)windowId <- ceil((timestamp - startTime) / slideDuration)windowStart <- windowId * slideDuration + (i - maxNumOverlapping) * slideDuration + startTimewindowEnd <- windowStart + windowDurationreturn windowStart, windowEnd

????????將event-time 作為“初始窗口”的結(jié)束時(shí)間, 然后按照窗口滑動(dòng)寬度逐漸向時(shí)間軸前方推進(jìn), 直到某個(gè)窗口不再包含該 event-time 為止。 最終以“初始窗口”與“結(jié)束窗口”之間的若干個(gè)窗口作為最終生成的 event-time 的時(shí)間窗口。

每個(gè)窗口的起始時(shí)間與結(jié)束時(shí)間都是前必后開的區(qū)間, 因此初始窗口和結(jié)束窗口都不會(huì)包含 event-time, 最終不會(huì)被使用。

得到窗口如下:

3.?基于 Watermark 處理延遲數(shù)據(jù)

3.1. 什么是 Watermark 機(jī)制

????????在數(shù)據(jù)分析系統(tǒng)中, Structured Streaming 可以持續(xù)的按照 event-time 聚合數(shù)據(jù), 然而在此過程中并不能保證數(shù)據(jù)按照時(shí)間的先后依次到達(dá)。 例如: 當(dāng)前接收的某一條數(shù)據(jù)的 event-time 可能遠(yuǎn)遠(yuǎn)早于之前已經(jīng)處理過的 event-time。 在發(fā)生這種情況時(shí), 往往需要結(jié)合業(yè)務(wù)需求對(duì)延遲數(shù)據(jù)進(jìn)行過濾?,F(xiàn)在考慮如果事件延遲到達(dá)會(huì)有哪些影響。 假如, 一個(gè)單詞在 12:04(event-time) 產(chǎn)生, 在 12:11 到達(dá)應(yīng)用。 應(yīng)用應(yīng)該使用 12:04 來在窗口(12:00 - 12:10)中更新計(jì)數(shù), 而不是使用 12:11。 這些情況在我們基于窗口的聚合中是自然發(fā)生的, 因?yàn)榻Y(jié)構(gòu)化流可以長時(shí)間維持部分聚合的中間狀態(tài)。

????????但是, 如果這個(gè)查詢運(yùn)行數(shù)天, 系統(tǒng)很有必要限制內(nèi)存中累積的中間狀態(tài)的數(shù)量。 這意味著系統(tǒng)需要知道何時(shí)從內(nèi)存狀態(tài)中刪除舊聚合, 因?yàn)閼?yīng)用不再接受該聚合的后期數(shù)據(jù)。為了實(shí)現(xiàn)這個(gè)需求, 從 spark2.1, 引入了 watermark(水印), 使用引擎可以自動(dòng)的跟蹤當(dāng)前的事件時(shí)間, 并據(jù)此嘗試刪除舊狀態(tài)。通過指定 event-time 列和預(yù)估事件的延遲時(shí)間上限來定義一個(gè)查詢的 watermark。 針對(duì)一個(gè)以時(shí)間 T 結(jié)束的窗口, 引擎會(huì)保留狀態(tài)和允許延遲時(shí)間直到(max event time seen by the engine - late threshold > T)。 換句話說, 延遲時(shí)間在上限內(nèi)的被聚合, 延遲時(shí)間超出上限的開始被丟棄。

????????可以通過withWatermark() 來定義watermark,watermark 計(jì)算方式:watermark = MaxEventTime - Threshhod;而且, watermark只能逐漸增加, 不能減少。

Structured Streaming 引入 Watermark 機(jī)制, 主要是為了解決以下兩個(gè)問題:

  • 處理聚合中的延遲數(shù)據(jù)
  • 減少內(nèi)存中維護(hù)的聚合狀態(tài).

注意:在不同輸出模式(complete, append, update)中, Watermark 會(huì)產(chǎn)生不同的影響。

3.2.?update 模式下使用 watermark

在 update 模式下, 僅輸出與之前批次的結(jié)果相比, 涉及更新或新增的數(shù)據(jù)。

代碼示例如下:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}import java.sql.Timestampobject StreamTest {def main(args: Array[String]): Unit = {// 創(chuàng)建 SparkSession. 因?yàn)?ss 是基于 spark sql 引擎, 所以需要先創(chuàng)建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 使用socket數(shù)據(jù)源val lines: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load// 輸入的數(shù)據(jù)中包含時(shí)間戳, 而不是自動(dòng)添加的時(shí)間戳val words: DataFrame = lines.as[String].flatMap((line: String) => {val split: Array[String] = line.split(",")split(1).split(" ").map(((_: String), Timestamp.valueOf(split(0))))}).toDF("word", "timestamp")// 使用 withWatermark 方法,添加watermark, 參數(shù) 1: event-time 所在列的列名 參數(shù) 2: 延遲時(shí)間的上限.val wordCounts: Dataset[Row] = words.withWatermark("timestamp", "2 minutes").groupBy(window($"timestamp", "10 minutes", "2 minutes"), $"word").count()// 數(shù)據(jù)輸出val query: StreamingQuery = wordCounts.writeStream.outputMode("update").trigger(Trigger.ProcessingTime(1000)).format("console").option("truncate", "false").startquery.awaitTermination()// 關(guān)閉執(zhí)行環(huán)境spark.stop()}
}

初始化的wartmark是 0,通過如下輸入的幾條數(shù)據(jù),可以看到水位線的變化。

第一次輸入數(shù)據(jù):??2023-08-07 10:55:00,dog 。這個(gè)條數(shù)據(jù)作為第一批數(shù)據(jù)。 按照window($"timestamp", "10 minutes", "2 minutes")得到 5 個(gè)窗口。 由于是第一批, 所有的窗口的結(jié)束時(shí)間都大于 wartermark(0), 所以 5 個(gè)窗口都顯示,如下所示:

+------------------------------------------+----+-----+
|window                                    |word|count|
+------------------------------------------+----+-----+
|[2023-08-07 10:50:00, 2023-08-07 11:00:00]|dog |1    |
|[2023-08-07 10:54:00, 2023-08-07 11:04:00]|dog |1    |
|[2023-08-07 10:48:00, 2023-08-07 10:58:00]|dog |1    |
|[2023-08-07 10:52:00, 2023-08-07 11:02:00]|dog |1    |
|[2023-08-07 10:46:00, 2023-08-07 10:56:00]|dog |1    |
+------------------------------------------+----+-----+

然后根據(jù)當(dāng)前批次中最大的 event-time, 計(jì)算出來下次使用的 watermark. 本批次只有一個(gè)數(shù)據(jù)(10:55), 所有: watermark = 10:55 - 2min = 10:53 。

第二次輸入數(shù)據(jù):??2023-08-07 11:00:00,dog?。 這條數(shù)據(jù)作為第二批數(shù)據(jù), 計(jì)算得到 5 個(gè)窗口。 此時(shí)的watermark=10:53, 所有的窗口的結(jié)束時(shí)間均大于 watermark。 在 update 模式下, 只輸出結(jié)果表中涉及更新或新增的數(shù)據(jù)。

+------------------------------------------+----+-----+
|window                                    |word|count|
+------------------------------------------+----+-----+
|[2023-08-07 10:58:00, 2023-08-07 11:08:00]|dog |1    |
|[2023-08-07 10:54:00, 2023-08-07 11:04:00]|dog |2    |
|[2023-08-07 10:56:00, 2023-08-07 11:06:00]|dog |1    |
|[2023-08-07 10:52:00, 2023-08-07 11:02:00]|dog |2    |
|[2023-08-07 11:00:00, 2023-08-07 11:10:00]|dog |1    |
+------------------------------------------+----+-----+

其中: count 是 2 的表示更新, count 是 1 的表示新增。 沒有變化的就沒有顯示(但是內(nèi)存中仍然保存著)。此時(shí)的的 watermark = 11:00 - 2min = 10:58 。如下數(shù)據(jù)為在內(nèi)存中保存著,但是沒有打印出來的數(shù)據(jù):

|[2023-08-07 10:46:00, 2023-08-07 10:56:00]|dog |1    |
|[2023-08-07 10:48:00, 2023-08-07 10:58:00]|dog |1    |
|[2023-08-07 10:50:00, 2023-08-07 11:00:00]|dog |1    |

第三次輸入數(shù)據(jù):? ?2023-08-07 10:55:00,dog? 。?這條數(shù)據(jù)作為第 3 批次,相當(dāng)于一條延遲數(shù)據(jù),計(jì)算得到 5 個(gè)窗口。此時(shí)的 watermark = 10:58 當(dāng)前內(nèi)存中有兩個(gè)窗口的結(jié)束時(shí)間已經(jīng)低于 10: 58。

|[2023-08-07 10:46:00, 2023-08-07 10:56:00]|dog |1    |
|[2023-08-07 10:48:00, 2023-08-07 10:58:00]|dog |1    |

則立即刪除這兩個(gè)窗口在內(nèi)存中的維護(hù)狀態(tài)。 同時(shí), 當(dāng)前批次中新加入的數(shù)據(jù)所劃分出來的窗口, 如果窗口結(jié)束時(shí)間低于 11:58, 則窗口會(huì)被過濾掉。

所以這次輸出結(jié)果:

+------------------------------------------+----+-----+
|window                                    |word|count|
+------------------------------------------+----+-----+
|[2023-08-07 10:50:00, 2023-08-07 11:00:00]|dog |2    |
|[2023-08-07 10:54:00, 2023-08-07 11:04:00]|dog |3    |
|[2023-08-07 10:52:00, 2023-08-07 11:02:00]|dog |3    |
+------------------------------------------+----+-----+

第三個(gè)批次的數(shù)據(jù)處理完成后, 立即計(jì)算: watermark= 10:55 - 2min = 10:53, 這個(gè)值小于當(dāng)前的 watermask(10:58), 所以保持不變(因?yàn)?watermask 只能增加不能減少)。

3.3.?append 模式下使用 wartermark

代碼示例如下:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}import java.sql.Timestampobject StreamTest {def main(args: Array[String]): Unit = {// 創(chuàng)建 SparkSession. 因?yàn)?ss 是基于 spark sql 引擎, 所以需要先創(chuàng)建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 使用socket數(shù)據(jù)源val lines: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load// 輸入的數(shù)據(jù)中包含時(shí)間戳, 而不是自動(dòng)添加的時(shí)間戳val words: DataFrame = lines.as[String].flatMap((line: String) => {val split: Array[String] = line.split(",")split(1).split(" ").map(((_: String), Timestamp.valueOf(split(0))))}).toDF("word", "timestamp")// 使用 withWatermark 方法,添加watermark, 參數(shù) 1: event-time 所在列的列名 參數(shù) 2: 延遲時(shí)間的上限.val wordCounts: Dataset[Row] = words.withWatermark("timestamp", "2 minutes").groupBy(window($"timestamp", "10 minutes", "2 minutes"), $"word").count()// 數(shù)據(jù)輸出val query: StreamingQuery = wordCounts.writeStream.outputMode("append").trigger(Trigger.ProcessingTime(0)).format("console").option("truncate", "false").startquery.awaitTermination()// 關(guān)閉執(zhí)行環(huán)境spark.stop()}
}

在 append 模式中, 僅輸出新增的數(shù)據(jù), 且輸出后的數(shù)據(jù)無法變更。

第一次輸入數(shù)據(jù):?2023-08-07 10:55:00,dog? 。?這個(gè)條數(shù)據(jù)作為第一批數(shù)據(jù)。 按照window($"timestamp", "10 minutes", "2 minutes")得到 5 個(gè)窗口。 由于此時(shí)初始 watermask=0, 當(dāng)前批次中所有窗口的結(jié)束時(shí)間均大于 watermask。但是 Structured Streaming 無法確定后續(xù)批次的數(shù)據(jù)中是否會(huì)更新當(dāng)前批次的內(nèi)容。 因此, 基于 Append 模式的特點(diǎn), 這時(shí)并不會(huì)輸出任何數(shù)據(jù)(因?yàn)檩敵龊髷?shù)據(jù)就無法更改了), 直到某個(gè)窗口的結(jié)束時(shí)間小于 watermask, 即可以確定后續(xù)數(shù)據(jù)不會(huì)再變更該窗口的聚合結(jié)果時(shí)才會(huì)將其輸出, 并移除內(nèi)存中對(duì)應(yīng)窗口的聚合狀態(tài)。

+------+----+-----+
|window|word|count|
+------+----+-----+
+------+----+-----+

然后根據(jù)當(dāng)前批次中最大的 event-time, 計(jì)算出來下次使用的 watermark。 本批次只有一個(gè)數(shù)據(jù)(10:55), 所有: watermark = 10:55 - 2min = 10:53

第二次輸入數(shù)據(jù):?2023-08-07 11:00:00,dog? 。這條數(shù)據(jù)作為第二批數(shù)據(jù), 計(jì)算得到 5 個(gè)窗口。 此時(shí)的watermark=10:53, 所有的窗口的結(jié)束時(shí)間均大于 watermark, 仍然不會(huì)輸出。

+------+----+-----+
|window|word|count|
+------+----+-----+
+------+----+-----+

然后計(jì)算 watermark = 11:00 - 2min = 10:58

第三次輸入數(shù)據(jù):?2023-08-07 10:55:00,dog 。相當(dāng)于一條延遲數(shù)據(jù),這條數(shù)據(jù)作為第 3 批次, 計(jì)算得到 5 個(gè)窗口。 此時(shí)的 watermark = 10:58 當(dāng)前內(nèi)存中有兩個(gè)窗口的結(jié)束時(shí)間已經(jīng)低于 10: 58。

|[2023-08-07 10:48:00, 2023-08-07 10:58:00]|dog |1    |
|[2023-08-07 10:46:00, 2023-08-07 10:56:00]|dog |1    |

則意味著這兩個(gè)窗口的數(shù)據(jù)不會(huì)再發(fā)生變化, 此時(shí)輸出這個(gè)兩個(gè)窗口的聚合結(jié)果, 并在內(nèi)存中清除這兩個(gè)窗口的狀態(tài)。所以這次輸出結(jié)果:

+------------------------------------------+----+-----+
|window                                    |word|count|
+------------------------------------------+----+-----+
|[2023-08-07 10:48:00, 2023-08-07 10:58:00]|dog |1    |
|[2023-08-07 10:46:00, 2023-08-07 10:56:00]|dog |1    |
+------------------------------------------+----+-----+

第三個(gè)批次的數(shù)據(jù)處理完成后, 立即計(jì)算: watermark= 10:55 - 2min = 10:53, 這個(gè)值小于當(dāng)前的 watermask(10:58), 所以保持不變。(因?yàn)?watermask 只能增加不能減少)

3.4. watermark 機(jī)制總結(jié)

  • watermark 在用于基于時(shí)間的狀態(tài)聚合操作時(shí), 該時(shí)間可以基于窗口, 也可以基于 event-timeb本身。
  • 輸出模式必須是append或update。 在輸出模式是complete的時(shí)候(必須有聚合), 要求每次輸出所有的聚合結(jié)果。 我們使用 watermark 的目的是丟棄一些過時(shí)聚合數(shù)據(jù), 所以complete模式使用wartermark無效也無意義。
  • 在輸出模式是append時(shí), 必須設(shè)置 watermask 才能使用聚合操作。 其實(shí), watermask 定義了 append 模式中何時(shí)輸出聚合聚合結(jié)果(狀態(tài)), 并清理過期狀態(tài)。
  • 在輸出模式是update時(shí), watermask 主要用于過濾過期數(shù)據(jù)并及時(shí)清理過期狀態(tài)。
  • watermask 會(huì)在處理當(dāng)前批次數(shù)據(jù)時(shí)更新, 并且會(huì)在處理下一個(gè)批次數(shù)據(jù)時(shí)生效使用。 但如果節(jié)點(diǎn)發(fā)送故障, 則可能延遲若干批次生效。
  • withWatermark 必須使用與聚合操作中的時(shí)間戳列是同一列。df.withWatermark("time", "1 min").groupBy("time2").count() 無效。
  • withWatermark 必須在聚合之前調(diào)用。 f.groupBy("time").count().withWatermark("time", "1 min") 無效。

4.?流數(shù)據(jù)去重

需求內(nèi)容:根據(jù)唯一的 id 實(shí)現(xiàn)數(shù)據(jù)去重

代碼示例:

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}import java.sql.Timestampobject StreamTest {def main(args: Array[String]): Unit = {// 創(chuàng)建 SparkSession. 因?yàn)?ss 是基于 spark sql 引擎, 所以需要先創(chuàng)建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 使用socket數(shù)據(jù)源val lines: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load// 數(shù)據(jù)預(yù)處理val words: DataFrame = lines.as[String].map((line: String) => {val arr: Array[String] = line.split(",")(arr(0), Timestamp.valueOf(arr(1)), arr(2))}).toDF("uid", "ts", "word")// 去重重復(fù)數(shù)據(jù) uid 相同就是重復(fù).  可以傳遞多個(gè)列val wordCounts: Dataset[Row] = words.withWatermark("ts", "2 minutes").dropDuplicates("uid")// 輸出數(shù)據(jù)wordCounts.writeStream.outputMode("append").format("console").start.awaitTermination()// 關(guān)閉執(zhí)行環(huán)境spark.stop()}
}

數(shù)據(jù)輸入(按順序從上到下):

1,2023-08-09 11:50:00,dog
2,2023-08-09 11:51:00,dog
1,2023-08-09 11:50:00,dog
3,2023-08-09 11:53:00,dog
1,2023-08-09 11:50:00,dog
4,2023-08-09 11:45:00,dog

注意點(diǎn):

  • dropDuplicates 不可用在聚合之后, 即通過聚合得到的 df/ds 不能調(diào)用dropDuplicates?
  • 使用watermask - 如果重復(fù)記錄的到達(dá)時(shí)間有上限,則可以在事件時(shí)間列上定義水印,并使用guid和事件時(shí)間列進(jìn)行重復(fù)數(shù)據(jù)刪除。該查詢將使用水印從過去的記錄中刪除舊的狀態(tài)數(shù)據(jù),這些記錄不會(huì)再被重復(fù)。這限制了查詢必須維護(hù)的狀態(tài)量。?
  • 沒有watermask - 由于重復(fù)記錄可能到達(dá)時(shí)沒有界限,查詢將來自所有過去記錄的數(shù)據(jù)存儲(chǔ)為狀態(tài)。

測(cè)試:

  • 第一次輸入數(shù)據(jù):1,2023-08-09 11:50:00,dog
+---+-------------------+----+
|uid|                 ts|word|
+---+-------------------+----+
|  1|2023-08-09 11:50:00| dog|
+---+-------------------+----+
  • 第二次輸入數(shù)據(jù):2,2023-08-09 11:51:00,dog
+---+-------------------+----+
|uid|                 ts|word|
+---+-------------------+----+
|  2|2023-08-09 11:51:00| dog|
+---+-------------------+----+
  • 第三次輸入數(shù)據(jù):1,2023-08-09 11:50:00,dog (id 重復(fù)無輸出)
  • 第四次輸入數(shù)據(jù):3,2023-08-09 11:53:00,dog (此時(shí) watermask=11:51)
+---+-------------------+----+
|uid|                 ts|word|
+---+-------------------+----+
|  3|2023-08-09 11:53:00| dog|
+---+-------------------+----+
  • 第五次輸入數(shù)據(jù):1,2023-08-09 11:50:00,dog (數(shù)據(jù)重復(fù), 并且數(shù)據(jù)過期, 所以無輸出)
  • 第六次輸入數(shù)據(jù):4,2023-08-09 11:45:00,dog (數(shù)據(jù)過時(shí), 所以無輸出)

5. join操作

????????Structured Streaming 支持 streaming DataSet/DataFrame 與靜態(tài)的DataSet/DataFrame 進(jìn)行 join, 也支持 streaming DataSet/DataFrame與另外一個(gè)streaming DataSet/DataFrame 進(jìn)行 join。join 的結(jié)果也是持續(xù)不斷的生成, 類似于前面的 streaming 的聚合結(jié)果。

5.1.?Stream-static Joins

靜態(tài)數(shù)據(jù):

lisi,male
zhiling,female
zs,male

流式數(shù)據(jù):

lisi,20
zhiling,40
ww,30

5.1.1.?內(nèi)連接

代碼示例:

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}object StreamTest {def main(args: Array[String]): Unit = {// 創(chuàng)建 SparkSession. 因?yàn)?ss 是基于 spark sql 引擎, 所以需要先創(chuàng)建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 1. 靜態(tài) dfval arr: Array[(String, String)] = Array(("lisi", "male"), ("zhiling", "female"), ("zs", "male"));val staticDF: DataFrame = spark.sparkContext.parallelize(arr).toDF("name", "sex")// 2. 流式 dfval lines: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()val streamDF: DataFrame = lines.as[String].map((line: String) => {val arr: Array[String] = line.split(",")(arr(0), arr(1).toInt)}).toDF("name", "age")// 3. join   等值內(nèi)連接  a.name=b.nameval joinResult: DataFrame = streamDF.join(staticDF, "name")// 4. 輸出joinResult.writeStream.outputMode("append").format("console").start.awaitTermination()// 關(guān)閉執(zhí)行環(huán)境spark.stop()}
}

數(shù)據(jù)輸出:

+-------+---+------+
|   name|age|   sex|
+-------+---+------+
|zhiling| 40|female|
|   lisi| 20|  male|
+-------+---+------+

5.1.2.?外連接

代碼示例:

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}object StreamTest {def main(args: Array[String]): Unit = {// 創(chuàng)建 SparkSession. 因?yàn)?ss 是基于 spark sql 引擎, 所以需要先創(chuàng)建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 1. 靜態(tài) dfval arr: Array[(String, String)] = Array(("lisi", "male"), ("zhiling", "female"), ("zs", "male"));val staticDF: DataFrame = spark.sparkContext.parallelize(arr).toDF("name", "sex")// 2. 流式 dfval lines: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()val streamDF: DataFrame = lines.as[String].map((line: String) => {val arr: Array[String] = line.split(",")(arr(0), arr(1).toInt)}).toDF("name", "age")// 3. joinval joinResult: DataFrame = streamDF.join(staticDF, Seq("name"), "left")// 4. 輸出joinResult.writeStream.outputMode("append").format("console").start.awaitTermination()// 關(guān)閉執(zhí)行環(huán)境spark.stop()}
}

數(shù)據(jù)輸出:

+-------+---+------+
|   name|age|   sex|
+-------+---+------+
|zhiling| 40|female|
|     ww| 30|  null|
|   lisi| 20|  male|
+-------+---+------+

5.2.?Stream-stream Joins

????????在 Spark2.3, 開始支持 stream-stream join。Spark 會(huì)自動(dòng)維護(hù)兩個(gè)流的狀態(tài), 以保障后續(xù)流入的數(shù)據(jù)能夠和之前流入的數(shù)據(jù)發(fā)生 join 操作, 但這會(huì)導(dǎo)致狀態(tài)無限增長。 因此, 在對(duì)兩個(gè)流進(jìn)行 join 操作時(shí), 依然可以用 watermark 機(jī)制來消除過期的狀態(tài), 避免狀態(tài)無限增長。

第 1 個(gè)數(shù)據(jù)格式:姓名,年齡,事件時(shí)間

lisi,female,2023-08-09 11:50:00
zs,male,2023-08-09 11:51:00
ww,female,2023-08-09 11:52:00
zhiling,female,2023-08-09 11:53:00
fengjie,female,2023-08-09 11:54:00
yifei,female,2023-08-09 11:55:00

第 2?個(gè)數(shù)據(jù)格式:姓名,年齡,事件時(shí)間

lisi,18,2023-08-09 11:50:00
zs,19,2023-08-09 11:51:00
ww,20,2023-08-09 11:52:00
zhiling,22,2023-08-09 11:53:00
yifei,30,2023-08-09 11:54:00
fengjie,98,2023-08-09 11:55:00

5.2.1.?inner join

對(duì) 2 個(gè)流式數(shù)據(jù)進(jìn)行 join 操作,輸出模式僅支持append模式。

不帶 watermast 的 inner join(join 的速度很慢):

import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}import java.sql.Timestampobject StreamTest {def main(args: Array[String]): Unit = {// 創(chuàng)建 SparkSession. 因?yàn)?ss 是基于 spark sql 引擎, 所以需要先創(chuàng)建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 第 1 個(gè) streamval nameSexStream: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load.as[String].map((line: String) => {val arr: Array[String] = line.split(",")(arr(0), arr(1), Timestamp.valueOf(arr(2)))}).toDF("name", "sex", "ts1")// 第 2 個(gè) streamval nameAgeStream: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 8888).load.as[String].map((line: String) => {val arr: Array[String] = line.split(",")(arr(0), arr(1).toInt, Timestamp.valueOf(arr(2)))}).toDF("name", "age", "ts2")// join 操作val joinResult: DataFrame = nameSexStream.join(nameAgeStream, "name")// 數(shù)據(jù)輸出joinResult.writeStream.outputMode("append").format("console").trigger(Trigger.ProcessingTime(0)).start().awaitTermination()// 關(guān)閉執(zhí)行環(huán)境spark.stop()}
}//      數(shù)據(jù)輸出:
//      +-------+------+-------------------+---+-------------------+
//      |   name|   sex|                ts1|age|                ts2|
//      +-------+------+-------------------+---+-------------------+
//      |zhiling|female|2023-08-09 11:53:00| 22|2023-08-09 11:53:00|
//      |     ww|female|2023-08-09 11:52:00| 20|2023-08-09 11:52:00|
//      |  yifei|female|2023-08-09 11:55:00| 30|2023-08-09 11:54:00|
//      |     zs|  male|2023-08-09 11:51:00| 19|2023-08-09 11:51:00|
//      |fengjie|female|2023-08-09 11:54:00| 98|2023-08-09 11:55:00|
//      |   lisi|female|2023-08-09 11:50:00| 18|2023-08-09 11:50:00|
//      +-------+------+-------------------+---+-------------------+

帶 watermast 的 inner join(join 的速度很慢):

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}import java.sql.Timestampobject StreamTest {def main(args: Array[String]): Unit = {// 創(chuàng)建 SparkSession. 因?yàn)?ss 是基于 spark sql 引擎, 所以需要先創(chuàng)建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 第 1 個(gè) streamval nameSexStream: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load.as[String].map((line: String) => {val arr: Array[String] = line.split(",")(arr(0), arr(1), Timestamp.valueOf(arr(2)))}).toDF("name1", "sex", "ts1")// 第 2 個(gè) streamval nameAgeStream: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 8888).load.as[String].map((line: String) => {val arr: Array[String] = line.split(",")(arr(0), arr(1).toInt, Timestamp.valueOf(arr(2)))}).toDF("name2", "age", "ts2").withWatermark("ts2", "1 minutes")// join 操作val joinResult: DataFrame = nameSexStream.join(nameAgeStream,expr("""|name1=name2 and|ts2 >= ts1 and|ts2 <= ts1 + interval 1 minutes""".stripMargin))// 數(shù)據(jù)輸出joinResult.writeStream.outputMode("append").format("console").trigger(Trigger.ProcessingTime(0)).start().awaitTermination()// 關(guān)閉執(zhí)行環(huán)境spark.stop()}
}//      數(shù)據(jù)輸出:
//      +-------+------+-------------------+-------+---+-------------------+
//      |  name1|   sex|                ts1|  name2|age|                ts2|
//      +-------+------+-------------------+-------+---+-------------------+
//      |zhiling|female|2023-08-09 11:53:00|zhiling| 22|2023-08-09 11:53:00|
//      |     ww|female|2023-08-09 11:52:00|     ww| 20|2023-08-09 11:52:00|
//      |     zs|  male|2023-08-09 11:51:00|     zs| 19|2023-08-09 11:51:00|
//      |fengjie|female|2023-08-09 11:54:00|fengjie| 98|2023-08-09 11:55:00|
//      |   lisi|female|2023-08-09 11:50:00|   lisi| 18|2023-08-09 11:50:00|
//      +-------+------+-------------------+-------+---+-------------------+

4.2.2.?outer join

外連接必須使用 watermast,和內(nèi)連接相比, 代碼幾乎一致, 只需要在連接的時(shí)候指定下連接類型即可:joinType = "left"。

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}import java.sql.Timestampobject StreamTest {def main(args: Array[String]): Unit = {// 創(chuàng)建 SparkSession. 因?yàn)?ss 是基于 spark sql 引擎, 所以需要先創(chuàng)建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 第 1 個(gè) streamval nameSexStream: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load.as[String].map((line: String) => {val arr: Array[String] = line.split(",")(arr(0), arr(1), Timestamp.valueOf(arr(2)))}).toDF("name1", "sex", "ts1")// 第 2 個(gè) streamval nameAgeStream: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 8888).load.as[String].map((line: String) => {val arr: Array[String] = line.split(",")(arr(0), arr(1).toInt, Timestamp.valueOf(arr(2)))}).toDF("name2", "age", "ts2").withWatermark("ts2", "1 minutes")// join 操作val joinResult: DataFrame = nameSexStream.join(nameAgeStream,expr("""|name1=name2 and|ts2 >= ts1 and|ts2 <= ts1 + interval 1 minutes""".stripMargin),joinType = "left")// 數(shù)據(jù)輸出joinResult.writeStream.outputMode("append").format("console").trigger(Trigger.ProcessingTime(0)).start().awaitTermination()// 關(guān)閉執(zhí)行環(huán)境spark.stop()}
}//      數(shù)據(jù)輸出:
//        +-------+------+-------------------+-------+---+-------------------+
//        |  name1|   sex|                ts1|  name2|age|                ts2|
//        +-------+------+-------------------+-------+---+-------------------+
//        |zhiling|female|2023-08-09 11:53:00|zhiling| 22|2023-08-09 11:53:00|
//        |     ww|female|2023-08-09 11:52:00|     ww| 20|2023-08-09 11:52:00|
//        |     zs|  male|2023-08-09 11:51:00|     zs| 19|2023-08-09 11:51:00|
//        |fengjie|female|2023-08-09 11:54:00|fengjie| 98|2023-08-09 11:55:00|
//        |   lisi|female|2023-08-09 11:50:00|   lisi| 18|2023-08-09 11:50:00|
//        +-------+------+-------------------+-------+---+-------------------+

6.?Streaming DF/DS 不支持的操作

到目前, DF/DS 的有些操作 Streaming DF/DS 還不支持:

  • 多個(gè)Streaming 聚合(例如在 DF 上的聚合鏈)目前還不支持
  • limit 和取前 N 行還不支持
  • distinct 也不支持
  • 僅僅支持對(duì) complete 模式下的聚合操作進(jìn)行排序操作
  • 僅支持有限的外連接
  • 有些方法不能直接用于查詢和返回結(jié)果, 因?yàn)樗麄冇迷诹魇綌?shù)據(jù)上沒有意義
    • count() 不能返回單行數(shù)據(jù), 必須是s.groupBy().count()
    • foreach() 不能直接使用, 而是使用: ds.writeStream.foreach(...)
    • show() 不能直接使用, 而是使用 console sink

如果執(zhí)行上面操作會(huì)看到這樣的異常: operation XYZ is not supported with streaming DataFrames/Datasets。


注:其他Spark相關(guān)系列文章鏈接由此進(jìn) ->??Spark文章匯總?


http://www.risenshineclean.com/news/46351.html

相關(guān)文章:

  • 網(wǎng)站加載效果怎么做的百度推廣代運(yùn)營
  • wordpress mysql重啟資源網(wǎng)站優(yōu)化排名軟件公司
  • 成華網(wǎng)站制作為什么中國禁止谷歌瀏覽器
  • 標(biāo)題優(yōu)化方法郴州seo快速排名
  • 今日濮陽重大新聞seo優(yōu)化服務(wù)是什么意思
  • asp.net做的網(wǎng)站要放到網(wǎng)上空間去_要放哪些文件上去網(wǎng)站建網(wǎng)站建設(shè)網(wǎng)站
  • 房山新農(nóng)村建設(shè)網(wǎng)站深圳百度seo公司
  • 免費(fèi)代理ip的網(wǎng)站百度搜索推廣操作簡要流程
  • 如何進(jìn)行網(wǎng)站運(yùn)營與規(guī)劃打開百度網(wǎng)頁
  • 國外做批發(fā)配件的 在哪個(gè)網(wǎng)站百度葷seo公司
  • 傳奇網(wǎng)站怎么制作教程查關(guān)鍵詞
  • windows做網(wǎng)站服務(wù)器杭州推廣公司
  • 什么是網(wǎng)站功能需求推推蛙品牌策劃
  • 有服務(wù)器可以做網(wǎng)站嗎站長工具是什么意思
  • 商業(yè)網(wǎng)站建立搜索引擎優(yōu)化seo應(yīng)用
  • 中國三農(nóng)建設(shè)工作委員會(huì)官方網(wǎng)站深圳網(wǎng)絡(luò)推廣最新招聘
  • 云服務(wù)器可以做網(wǎng)站嗎網(wǎng)絡(luò)營銷的一般流程
  • 知名企業(yè)網(wǎng)站建設(shè)哈爾濱網(wǎng)站制作軟件
  • 做鋼材的網(wǎng)站有哪些網(wǎng)站的網(wǎng)站建設(shè)
  • 個(gè)人網(wǎng)站畢業(yè)設(shè)計(jì)搜索關(guān)鍵詞然后排名怎樣提升
  • 國外兒童社區(qū)網(wǎng)站模板外鏈信息
  • 做微網(wǎng)站迅宇科技網(wǎng)店推廣是什么
  • 做網(wǎng)站的不給做robots文件百度推廣登錄后臺(tái)
  • 在百度上做網(wǎng)站多少錢百度收錄提交
  • 杭州旅游 網(wǎng)站建設(shè)必應(yīng)搜索引擎地址
  • 一個(gè)網(wǎng)站可以做多少個(gè)小程序營銷推廣方案
  • 做301跳轉(zhuǎn)會(huì)影響之前網(wǎng)站排名嗎上海谷歌推廣
  • 在國外網(wǎng)站做中國旅游推廣百度關(guān)鍵詞熱度排名
  • 什么專業(yè)可以做網(wǎng)站百度店鋪免費(fèi)入駐
  • 學(xué)校定制網(wǎng)站建設(shè)公司深圳優(yōu)化公司高粱seo較