中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁(yè) > news >正文

阿里巴巴網(wǎng)站圖片怎么做國(guó)際時(shí)事新聞2022最新

阿里巴巴網(wǎng)站圖片怎么做,國(guó)際時(shí)事新聞2022最新,建e全景,武漢網(wǎng)絡(luò)推廣自然排名Flink ?持?常多的數(shù)據(jù) Join ?式,主要包括以下三種: 動(dòng)態(tài)表(流)與動(dòng)態(tài)表(流)的 Join動(dòng)態(tài)表(流)與外部維表(?如 Redis)的 Join動(dòng)態(tài)表字段的列轉(zhuǎn)?&#xf…

Flink ?持?常多的數(shù)據(jù) Join ?式,主要包括以下三種:

  • 動(dòng)態(tài)表(流)與動(dòng)態(tài)表(流)的 Join
  • 動(dòng)態(tài)表(流)與外部維表(?如 Redis)的 Join
  • 動(dòng)態(tài)表字段的列轉(zhuǎn)?(?種特殊的 Join)

細(xì)分 Flink SQL ?持的 Join:

Regular Join:流與流的 Join,包括 Inner Equal Join、Outer Equal Join

Interval Join:流與流的 Join,兩條流?段時(shí)間區(qū)間內(nèi)的 Join

Temporal Join:流與流的 Join,包括事件時(shí)間,處理時(shí)間的 Temporal Join,類似于離線中的快照 Join

Lookup Join:流與外部維表的 Join

Array Expansion:表字段的列轉(zhuǎn)?,類似于 Hive 的 explode 數(shù)據(jù)炸開(kāi)的列轉(zhuǎn)?

Table Function:?定義函數(shù)的表字段的列轉(zhuǎn)?,?持 Inner Join 和 Left Outer Join

1.Regular Join

**Regular Join 定義(?持 Batch\Streaming):**Regular Join 和離線 Hive SQL ?樣的 Regular Join,通過(guò)條件關(guān)聯(lián)兩條流數(shù)據(jù)輸出。

**應(yīng)?場(chǎng)景:**?如?志關(guān)聯(lián)擴(kuò)充維度數(shù)據(jù),構(gòu)建寬表;?志通過(guò) ID 關(guān)聯(lián)計(jì)算 CTR。

Regular Join 包含以下?種(以 L 作為左流中的數(shù)據(jù)標(biāo)識(shí), R 作為右流中的數(shù)據(jù)標(biāo)識(shí)):

  • Inner Join(Inner Equal Join):流任務(wù)中,只有兩條流 Join 到才輸出,輸出 +[L, R]
  • Left Join(Outer Equal Join):流任務(wù)中,左流數(shù)據(jù)到達(dá)之后,?論有沒(méi)有 Join 到右流的數(shù)據(jù),都會(huì)輸出(Join 到輸出 +[L, R] ,沒(méi) Join 到輸出 +[L, null] ),如果右流數(shù)據(jù)到達(dá)之后,發(fā)現(xiàn)左流之前輸出過(guò)沒(méi)有 Join 到的數(shù)據(jù),則會(huì)發(fā)起回撤流,先輸出 -[L, null] ,然后輸出 +[L, R]
  • Right Join(Outer Equal Join):有 Left Join ?樣,左表和右表的執(zhí)?邏輯完全相反
  • Full Join(Outer Equal Join):流任務(wù)中,左流或者右流的數(shù)據(jù)到達(dá)之后,?論有沒(méi)有 Join 到另外?條流的數(shù)據(jù),都會(huì)輸出(對(duì)右流來(lái)說(shuō):Join 到輸出 +[L, R] ,沒(méi) Join 到輸出 +[null, R] ;對(duì)左流來(lái)說(shuō):Join 到輸出 +[L, R] ,沒(méi) Join 到輸出 +[L, null] )。如果?條流的數(shù)據(jù)到達(dá)之后,發(fā)現(xiàn)另?條流之前輸出過(guò)沒(méi)有 Join 到的數(shù)據(jù),則會(huì)發(fā)起回撤流(左流數(shù)據(jù)到達(dá)為例:回撤 -[null, R] ,輸出+[L, R] ,右流數(shù)據(jù)到達(dá)為例:回撤 -[L, null] ,輸出 +[L, R] )

**實(shí)際案例:**案例為曝光?志關(guān)聯(lián)點(diǎn)擊?志,篩選既有曝光?有點(diǎn)擊的數(shù)據(jù),并且補(bǔ)充點(diǎn)擊的擴(kuò)展參數(shù)

a)Inner Join 案例 :
-- 曝光?志數(shù)據(jù)
CREATE TABLE show_log_table (log_id BIGINT,show_params STRING
) WITH ('connector' = 'datagen','rows-per-second' = '2','fields.show_params.length' = '1','fields.log_id.min' = '1','fields.log_id.max' = '100'
);-- 點(diǎn)擊?志數(shù)據(jù)
CREATE TABLE click_log_table (log_id BIGINT,click_params STRING
)
WITH ('connector' = 'datagen','rows-per-second' = '2','fields.click_params.length' = '1','fields.log_id.min' = '1','fields.log_id.max' = '10'
);CREATE TABLE sink_table (s_id BIGINT,s_params STRING,c_id BIGINT,c_params STRING
) WITH ('connector' = 'print'
);-- 流的 INNER JOIN,條件為 log_id
INSERT INTO sink_table
SELECTshow_log_table.log_id as s_id,show_log_table.show_params as s_params,click_log_table.log_id as c_id,click_log_table.click_params as c_params
FROM show_log_table
INNER JOIN click_log_table 
ON show_log_table.log_id = click_log_table.log_id;

輸出結(jié)果如下:

+I[5, d, 5, f]
+I[5, d, 5, 8]
+I[5, d, 5, 2]
+I[3, 4, 3, 0]
+I[3, 4, 3, 3]
b)Left Join 案例:
CREATE TABLE show_log_table (log_id BIGINT,show_params STRING
) WITH ('connector' = 'datagen','rows-per-second' = '1','fields.show_params.length' = '3','fields.log_id.min' = '1','fields.log_id.max' = '10'
);CREATE TABLE click_log_table (log_id BIGINT,click_params STRING
)
WITH ('connector' = 'datagen','rows-per-second' = '1','fields.click_params.length' = '3','fields.log_id.min' = '1','fields.log_id.max' = '10'
);CREATE TABLE sink_table (s_id BIGINT,s_params STRING,c_id BIGINT,c_params STRING
) WITH ('connector' = 'print'
);set sql-client.execution.result-mode=changelog;INSERT INTO sink_table
SELECTshow_log_table.log_id as s_id,show_log_table.show_params as s_params,click_log_table.log_id as c_id,click_log_table.click_params as c_params
FROM show_log_table
LEFT JOIN click_log_table 
ON show_log_table.log_id = click_log_table.log_id;

輸出結(jié)果如下:

+I[5, f3c, 5, c05]
+I[5, 6e2, 5, 1f6]
+I[5, 86b, 5, 1f6]
+I[5, f3c, 5, 1f6]
-D[3, 4ab, null, null]
-D[3, 6f2, null, null]
+I[3, 4ab, 3, 765]
+I[3, 6f2, 3, 765]
+I[2, 3c4, null, null]
+I[3, 4ab, 3, a8b]
+I[3, 6f2, 3, a8b]
+I[2, c03, null, null]
...
c)Full Join 案例:
CREATE TABLE show_log_table (log_id BIGINT,show_params STRING
) WITH ('connector' = 'datagen','rows-per-second' = '2','fields.show_params.length' = '1','fields.log_id.min' = '1','fields.log_id.max' = '10'
);CREATE TABLE click_log_table (log_id BIGINT,click_params STRING
)WITH ('connector' = 'datagen','rows-per-second' = '2','fields.click_params.length' = '1','fields.log_id.min' = '1','fields.log_id.max' = '10'
);CREATE TABLE sink_table (s_id BIGINT,s_params STRING,c_id BIGINT,c_params STRING
) WITH ('connector' = 'print'
);INSERT INTO sink_table
SELECTshow_log_table.log_id as s_id,show_log_table.show_params as s_params,click_log_table.log_id as c_id,click_log_table.click_params as c_params
FROM show_log_table
FULL JOIN click_log_table 
ON show_log_table.log_id = click_log_table.log_id;

輸出結(jié)果如下:

+I[null, null, 7, 6]
+I[6, 5, null, null]
-D[1, c, null, null]
+I[1, c, 1, 2]
+I[3, 1, null, null]
+I[null, null, 7, d]
+I[10, 0, null, null]
+I[null, null, 2, 6]
-D[null, null, 7, 6]
-D[null, null, 7, d]
...

關(guān)于 Regular Join 的注意事項(xiàng):

  • 實(shí)時(shí) Regular Join 可以不是 等值 join,等值 join 和 ?等值 join 區(qū)別在于,等值 join 數(shù)據(jù) shuffle 策略是 Hash,會(huì)按照 Join on 中的等值條件作為 id 發(fā)往對(duì)應(yīng)的下游; ?等值 join 數(shù)據(jù) shuffle 策略是 Global,所有數(shù)據(jù)發(fā)往?個(gè)并發(fā),按照?等值條件進(jìn)?關(guān)聯(lián)。

    等值 Join:

在這里插入圖片描述

非等值 Join:

在這里插入圖片描述

  • Join 的流程是左流新來(lái)?條數(shù)據(jù)之后,會(huì)和右流中符合條件的所有數(shù)據(jù)做 Join,然后輸出。

  • 流的上游是?限的數(shù)據(jù),要做到關(guān)聯(lián)的話,Flink 會(huì)將兩條流的所有數(shù)據(jù)都存儲(chǔ)在 State 中,所以 Flink 任務(wù)的 State 會(huì)?限增?,需要為 State 配置合適的 TTL,以防? State 過(guò)?。

2.Interval Join(時(shí)間區(qū)間 Join)

**Interval Join 定義(?持 Batch\Streaming):**Interval Join 可以讓?條流去 Join 另?條流中前后?段時(shí)間內(nèi)的數(shù)據(jù)。

**應(yīng)?場(chǎng)景:**Regular Join 會(huì)產(chǎn)?回撤流,在實(shí)時(shí)數(shù)倉(cāng)中?般寫?的 sink 是類似于 Kafka 的消息隊(duì)列,然后接 clickhouse 等引擎,這些引擎不具備處理回撤流的能?,Interval Join ?于消滅回撤流的。

Interval Join 包含以下?種(以 L 作為左流中的數(shù)據(jù)標(biāo)識(shí), R 作為右流中的數(shù)據(jù)標(biāo)識(shí)):

  • Inner Interval Join:流任務(wù)中,只有兩條流 Join 到(滿? Join on 中的條件:兩條流的數(shù)據(jù)在時(shí)間區(qū)間 + 滿?其他等值條件)才輸出,輸出 +[L, R]
  • Left Interval Join:流任務(wù)中,左流數(shù)據(jù)到達(dá)之后,如果沒(méi)有 Join 到右流的數(shù)據(jù),就會(huì)等待(放在 State 中等),如果右流之后數(shù)據(jù)到達(dá),發(fā)現(xiàn)能和剛剛那條左流數(shù)據(jù) Join 到,則會(huì)輸出 +[L,R] 。事件時(shí)間中隨著 Watermark 的推進(jìn)(也?持處理時(shí)間)。如果發(fā)現(xiàn)發(fā)現(xiàn)左流 State 中的數(shù)據(jù)過(guò)期了,就把左流中過(guò)期的數(shù)據(jù)從 State 中刪除,然后輸出 +[L, null] ,如果右流 State 中的數(shù)據(jù)過(guò)期了,就直接從 State 中刪除。
  • Right Interval Join:和 Left Interval Join 執(zhí)?邏輯?樣,只不過(guò)左表和右表的執(zhí)?邏輯完全相反。
  • Full Interval Join:流任務(wù)中,左流或者右流的數(shù)據(jù)到達(dá)之后,如果沒(méi)有 Join 到另外?條流的數(shù)據(jù),就會(huì)等待(左流放在左流對(duì)應(yīng)的 State 中等,右流放在右流對(duì)應(yīng)的 State 中等),如果之后另?條流數(shù)據(jù)到達(dá)之后,發(fā)現(xiàn)能和剛剛那條數(shù)據(jù) Join 到,則會(huì)輸出 +[L, R] 。事件時(shí)間中隨著 Watermark 的推進(jìn)(也?持處理時(shí)間),發(fā)現(xiàn) State 中的數(shù)據(jù)過(guò)期了,就將這些數(shù)據(jù)從 State 中刪除并且輸出(左流過(guò)期輸出+[L, null] ,右流過(guò)期輸出 -[null, R] )

**Inner Interval Join 和 Outer Interval Join 的區(qū)別在于:**Outer 在隨著時(shí)間推移的過(guò)程中,如果有數(shù)據(jù)過(guò)期了之后,會(huì)根據(jù)是否是 Outer 將沒(méi)有 Join 到的數(shù)據(jù)也給輸出。

**實(shí)際案例:**曝光?志關(guān)聯(lián)點(diǎn)擊?志,篩選既有曝光?有點(diǎn)擊的數(shù)據(jù),條件是曝光發(fā)?之后,4 ?時(shí)之內(nèi)的點(diǎn)擊,并且補(bǔ)充點(diǎn)擊的擴(kuò)展參數(shù)

a)Inner Interval Join
CREATE TABLE show_log_table (log_id BIGINT,show_params STRING,row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR row_time AS row_time
) WITH ('connector' = 'datagen','rows-per-second' = '1','fields.show_params.length' = '1','fields.log_id.min' = '1','fields.log_id.max' = '10'
);CREATE TABLE click_log_table (log_id BIGINT,click_params STRING,row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR row_time AS row_time
)
WITH ('connector' = 'datagen','rows-per-second' = '1','fields.click_params.length' = '1','fields.log_id.min' = '1','fields.log_id.max' = '10'
);CREATE TABLE sink_table (s_id BIGINT,s_params STRING,c_id BIGINT,c_params STRING
) WITH ('connector' = 'print'
);INSERT INTO sink_table
SELECTshow_log_table.log_id as s_id,show_log_table.show_params as s_params,click_log_table.log_id as c_id,click_log_table.click_params as c_params
FROM show_log_table 
INNER JOIN click_log_table 
ON show_log_table.log_id = click_log_table.log_id
AND show_log_table.row_time BETWEEN click_log_table.row_time - INTERVAL '4' SECOND AND click_log_table.row_time

輸出結(jié)果如下:

6> +I[2, a, 2, 6]
6> +I[2, 6, 2, 6]
2> +I[4, 1, 4, 5]
2> +I[10, 8, 10, d]
2> +I[10, 7, 10, d]
2> +I[10, d, 10, d]
2> +I[5, b, 5, d]
6> +I[1, a, 1, 7]
b)Left Interval Join
CREATE TABLE show_log (log_id BIGINT,show_params STRING,row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR row_time AS row_time
) WITH ('connector' = 'datagen','rows-per-second' = '1','fields.show_params.length' = '1','fields.log_id.min' = '1','fields.log_id.max' = '10'
);CREATE TABLE click_log (log_id BIGINT,click_params STRING,row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR row_time AS row_time
)
WITH ('connector' = 'datagen','rows-per-second' = '1','fields.click_params.length' = '1','fields.log_id.min' = '1','fields.log_id.max' = '10'
);CREATE TABLE sink_table (s_id BIGINT,s_params STRING,c_id BIGINT,c_params STRING
) WITH ('connector' = 'print'
);INSERT INTO sink_table
SELECTshow_log.log_id as s_id,show_log.show_params as s_params,click_log.log_id as c_id,click_log.click_params as c_params
FROM show_log LEFT JOIN click_log ON show_log.log_id = click_log.log_id
AND show_log.row_time BETWEEN click_log.row_time - INTERVAL '5' SECOND AND click_log.row_time

輸出結(jié)果如下:

+I[6, e, 6, 7]
+I[11, d, null, null]
+I[7, b, null, null]
+I[8, 0, 8, 3]
+I[13, 6, null, null]
c)Full Interval Join
CREATE TABLE show_log (log_id BIGINT,show_params STRING,row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR row_time AS row_time
) WITH ('connector' = 'datagen','rows-per-second' = '1','fields.show_params.length' = '1','fields.log_id.min' = '5','fields.log_id.max' = '15'
);CREATE TABLE click_log (log_id BIGINT,click_params STRING,row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR row_time AS row_time
)
WITH ('connector' = 'datagen','rows-per-second' = '1','fields.click_params.length' = '1','fields.log_id.min' = '1','fields.log_id.max' = '10'
);CREATE TABLE sink_table (s_id BIGINT,s_params STRING,c_id BIGINT,c_params STRING
) WITH ('connector' = 'print'
);INSERT INTO sink_table
SELECTshow_log.log_id as s_id,show_log.show_params as s_params,click_log.log_id as c_id,click_log.click_params as c_params
FROM show_log FULL JOIN click_log ON show_log.log_id = click_log.log_id
AND show_log.row_time BETWEEN click_log.row_time - INTERVAL '5' SECOND AND click_log.row_time

輸出結(jié)果如下:

+I[6, 1, null, null]
+I[7, 3, 7, 8]
+I[null, null, 6, 6]
+I[null, null, 4, d]
+I[8, d, null, null]
+I[null, null, 3, b]

關(guān)于 Interval Join 的注意事項(xiàng):

實(shí)時(shí) Interval Join 可以不是 等值 join ,等值 join 和 ?等值 join 區(qū)別在于, 等值 join 數(shù)據(jù) shuffle 策略是 Hash,會(huì)按照 Join on 中的等值條件作為 id 發(fā)往對(duì)應(yīng)的下游; ?等值 join 數(shù)據(jù) shuffle 策略是 Global,所有數(shù)據(jù)發(fā)往?個(gè)并發(fā),將滿?條件的數(shù)據(jù)進(jìn)?關(guān)聯(lián)輸出。

3.Temporal Join(快照 Join)

**Temporal Join 定義(?持 Batch\Streaming):**同離線中的 拉鏈快照表 ,Flink SQL 中對(duì)應(yīng)的表叫做 Versioned Table ,使??個(gè)明細(xì)表去 join 這個(gè) Versioned Table 的 join 操作就叫做 Temporal Join。

Temporal Join 中,Versioned Table 是對(duì)同?條 key(在 DDL 中以 primary key 標(biāo)記同?個(gè) key)的歷史版本(根據(jù)時(shí)間劃分版本)做維護(hù),當(dāng)有明細(xì)表 Join 這個(gè)表時(shí),可以根據(jù)明細(xì)表中的時(shí)間版本選擇 Versioned Table 對(duì)應(yīng)時(shí)間區(qū)間內(nèi)的快照數(shù)據(jù)進(jìn)? join。

**應(yīng)?場(chǎng)景:**?如匯率數(shù)據(jù)(實(shí)時(shí)的根據(jù)匯率計(jì)算總?額),在 12:00 之前(事件時(shí)間),??幣和美元匯率是 7:1,在 12:00 之后變?yōu)?6:1,那么在 12:00 之前數(shù)據(jù)就要按照 7:1 進(jìn)?計(jì)算,12:00 之后就要按照 6:1 計(jì)算。

**Verisoned Table:**Verisoned Table 中存儲(chǔ)的數(shù)據(jù)通常來(lái)源于 CDC 或者會(huì)發(fā)?更新的數(shù)據(jù)。Flink SQL 會(huì)為 Versioned Table 維護(hù) Primary Key 下的所有歷史時(shí)間版本的數(shù)據(jù)。

**示例:**匯率計(jì)算中定義 Versioned Table 的兩種?式。

-- 定義?個(gè)匯率 versioned 表
CREATE TABLE currency_rates (currency STRING,conversion_rate DECIMAL(32, 2),update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,WATERMARK FOR update_time AS update_time,-- PRIMARY KEY 定義?式PRIMARY KEY(currency) NOT ENFORCED
) WITH ('connector' = 'kafka','value.format' = 'debezium-json',/* ... */
);-- 將數(shù)據(jù)源表按照 Deduplicate ?式定義為 Versioned Table
CREATE VIEW versioned_rates AS
SELECT currency, conversion_rate, update_time -- 1. 定義 `update_time` 為時(shí)間字段FROM (SELECT *,ROW_NUMBER() OVER (PARTITION BY currency -- 2. 定義 `currency` 為主鍵ORDER BY update_time DESC -- 3. ORDER BY 中必須是時(shí)間戳列) AS rownum FROM currency_rates)
WHERE rownum = 1;

**Temporal Join ?持的時(shí)間語(yǔ)義:**事件時(shí)間、處理時(shí)間

**實(shí)際案例:**匯率計(jì)算以 事件時(shí)間 任務(wù)舉例

-- 1. 定義?個(gè)輸?訂單表
CREATE TABLE orders (order_id BIGINT,price BIGINT,currency STRING,order_time TIMESTAMP(3),WATERMARK FOR order_time AS order_time
) WITH ('connector' = 'filesystem', 'path' = 'file:///Users/hhx/Desktop/orders.csv','format' = 'csv'
);1,100,a,2023-11-01 10:10:10.100
2,200,a,2023-11-02 10:10:10.100
3,300,a,2023-11-03 10:10:10.100
4,300,a,2023-11-04 10:10:10.100
5,300,a,2023-11-05 10:10:10.100
6,300,a,2023-11-06 10:10:10.100-- 2. 定義?個(gè)匯率 versioned 表,其中 versioned 表的概念下?會(huì)介紹到
CREATE TABLE currency_rates (currency STRING,conversion_rate BIGINT,update_time TIMESTAMP(3),WATERMARK FOR update_time AS update_time,PRIMARY KEY(currency) NOT ENFORCED
) WITH ('connector' = 'filesystem', 'path' = 'file:///Users/hhx/Desktop/currency_rates.csv','format' = 'csv'
);a,10,2023-11-01 09:10:10.100
a,11,2023-11-01 10:00:10.100
a,12,2023-11-01 10:10:10.100
a,13,2023-11-01 10:20:10.100
a,14,2023-11-02 10:20:10.100
a,15,2023-11-03 10:20:10.100
a,16,2023-11-04 10:20:10.100
a,17,2023-11-05 10:20:10.100
a,18,2023-11-06 10:00:10.100
a,19,2023-11-06 10:11:10.100SELECTorder_id,price,orders.currency,conversion_rate,order_time,update_time
FROM orders
-- 3. Temporal Join 邏輯
-- SQL 語(yǔ)法為:FOR SYSTEM_TIME AS OF
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
ON orders.currency = currency_rates.currency;

可以看到相同的貨幣匯率會(huì)根據(jù)具體數(shù)據(jù)的事件時(shí)間不同, Join 到對(duì)應(yīng)時(shí)間的匯率【Join 到最近可用的匯率】:

在這里插入圖片描述

注意:

事件時(shí)間的 Temporal Join ?定要給左右兩張表都設(shè)置 Watermark。

事件時(shí)間的 Temporal Join ?定要把 Versioned Table 的主鍵包含在 Join on 的條件中。

**實(shí)際案例:**匯率計(jì)算以 處理時(shí)間 任務(wù)舉例

10:15> SELECT * FROM LatestRates;currency rate
======== ======
US Dollar 102
Euro 114
Yen 110:30> SELECT * FROM LatestRates;currency rate
======== ======
US Dollar 102
Euro 114
Yen 1-- 10:42 時(shí),Euro 的匯率從 114 變?yōu)?116
10:52> SELECT * FROM LatestRates;currency rate
======== ======
US Dollar 102
Euro 116 
Yen 1-- 從 Orders 表查詢數(shù)據(jù)
SELECT * FROM Orders;amount currency
====== =========2 Euro <== 在處理時(shí)間 10:15 到達(dá)的?條數(shù)據(jù)1 US Dollar <== 在處理時(shí)間 10:30 到達(dá)的?條數(shù)據(jù)2 Euro <== 在處理時(shí)間 10:52 到達(dá)的?條數(shù)據(jù)-- 執(zhí)?關(guān)聯(lián)查詢
SELECTo.amount,o.currency,r.rate, o.amount * r.rate
FROMOrders AS oJOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS rON r.currency = o.currency-- 結(jié)果如下:
amount currency rate amount*rate
====== ========= ======= ============2 Euro 114 228 <== 在處理時(shí)間 10:15 到達(dá)的?條數(shù)據(jù)1 US Dollar 102 102 <== 在處理時(shí)間 10:30 到達(dá)的?條數(shù)據(jù)2 Euro 116 232 <== 在處理時(shí)間 10:52 到達(dá)的?條數(shù)據(jù)

處理時(shí)間語(yǔ)義中是根據(jù)左流數(shù)據(jù)到達(dá)的時(shí)間決定拿到的匯率值,Flink 就只為 LatestRates 維護(hù)了最新的狀態(tài)數(shù)據(jù),不需要關(guān)?歷史版本的數(shù)據(jù)。

注意:

Processing-time temporal join is not supported yet.
4.Lookup Join(維表 Join)

**Lookup Join 定義(?持 Batch\Streaming):**Lookup Join 是維表 Join,實(shí)時(shí)數(shù)倉(cāng)場(chǎng)景中,實(shí)時(shí)獲取外部緩存。

**應(yīng)?場(chǎng)景:**Regular Join,Interval Join 等上?說(shuō)的 Join 都是流與流之間的 Join,? Lookup Join 是流與 Redis,Mysql,HBase 這種存儲(chǔ)介質(zhì)的 Join,Lookup 的意思是實(shí)時(shí)查找。

**實(shí)際案例:**使?曝光?戶?志流(show_log)關(guān)聯(lián)?戶畫像維表(user_profile)關(guān)聯(lián)到?戶的維度之后,提供給下游,計(jì)算分性別,年齡段的曝光?戶數(shù)使?。

輸?數(shù)據(jù): 曝光?戶?志流(show_log)數(shù)據(jù)(數(shù)據(jù)存儲(chǔ)在 kafka 中):

log_id timestamp user_id
1 2021-11-01 00:01:03 a
2 2021-11-01 00:03:00 b
3 2021-11-01 00:05:00 c
4 2021-11-01 00:06:00 b
5 2021-11-01 00:07:00 c

?戶畫像維表(user_profile)數(shù)據(jù)(數(shù)據(jù)存儲(chǔ)在 redis 中)

user_id(主鍵) age sex
a 12-18 男
b 18-24 ?
c 18-24 男

**注意:**redis 中的數(shù)據(jù)結(jié)構(gòu)是按照 key,value 存儲(chǔ)的,其中 key 為 user_id,value 為 age,sex 的 json。

CREATE TABLE show_log (log_id BIGINT,`timestamp` TIMESTAMP(3),user_id STRING,proctime AS PROCTIME()
) WITH ('connector' = 'filesystem', 'path' = 'file:///Users/hhx/Desktop/show_log.csv','format' = 'csv'
);1 2021-11-01 00:01:03 a
2 2021-11-01 00:03:00 b
3 2021-11-01 00:05:00 c
4 2021-11-01 00:06:00 b
5 2021-11-01 00:07:00 cCREATE TABLE user_profile (user_id STRING,age STRING,sex STRING,proctime AS PROCTIME(),PRIMARY KEY(user_id) NOT ENFORCED
) WITH ('connector' = 'filesystem', 'path' = 'file:///Users/hhx/Desktop/currency_rates.csv','format' = 'csv'
);a 12-18 男
b 18-24 ?
c 18-24 男CREATE TABLE sink_table (log_id BIGINT,`timestamp` TIMESTAMP(3),user_id STRING,proctime TIMESTAMP(3),age STRING,sex STRING
) WITH ('connector' = 'print'
);-- Processing-time temporal join is not supported yet.
-- lookup join 的 query 邏輯
INSERT INTO sink_table
SELECTs.log_id as log_id, s.`timestamp` as `timestamp`, s.user_id as user_id, s.proctime as proctime, u.sex as sex, u.age as age
FROM show_log AS s
LEFT JOIN user_profile FOR SYSTEM_TIME AS OF s.proctime AS u
ON s.user_id = u.user_id

輸出數(shù)據(jù)如下:

log_id timestamp user_id age sex
1 2021-11-01 00:01:03 a 12-18 男
2 2021-11-01 00:03:00 b 18-24 ?
3 2021-11-01 00:05:00 c 18-24 男
4 2021-11-01 00:06:00 b 18-24 ?
5 2021-11-01 00:07:00 c 18-24 男

實(shí)時(shí)的 lookup 維表關(guān)聯(lián)能使? 處理時(shí)間 去做關(guān)聯(lián)。

注意:

a)同?條數(shù)據(jù)關(guān)聯(lián)到的維度數(shù)據(jù)可能不同

實(shí)時(shí)數(shù)倉(cāng)中常?的實(shí)時(shí)維表是不斷變化的,當(dāng)前流表數(shù)據(jù)關(guān)聯(lián)完維表數(shù)據(jù)后,如果同?個(gè) key 的維表的數(shù)據(jù)發(fā)?了變化,已關(guān)聯(lián)到的維表的結(jié)果數(shù)據(jù)不會(huì)再同步更新。

舉個(gè)例?,維表中 user_id 為 1 的數(shù)據(jù)在 08:00 時(shí) age 由 12-18 變?yōu)榱?18-24,那么當(dāng)任務(wù)在 08:01 failover 之后從 07:59 開(kāi)始回溯數(shù)據(jù)時(shí),原本應(yīng)該關(guān)聯(lián)到 12-18 的數(shù)據(jù)會(huì)關(guān)聯(lián)到 18-24 的 age 數(shù)據(jù),有可能會(huì)影響數(shù)據(jù)質(zhì)量。

b)會(huì)發(fā)?實(shí)時(shí)的新建及更新的維表應(yīng)該建?起數(shù)據(jù)延遲的監(jiān)控,防?流表數(shù)據(jù)先于維表數(shù)據(jù)到達(dá),關(guān)聯(lián)不到維表數(shù)據(jù)

c)維表常?的性能問(wèn)題及優(yōu)化思路

維表性能問(wèn)題: ? qps 下訪問(wèn)維表存儲(chǔ)引擎產(chǎn)?的任務(wù)背壓,數(shù)據(jù)產(chǎn)出延遲問(wèn)題。

舉個(gè)例?:

**在沒(méi)有使?維表的情況下:**?條數(shù)據(jù)從輸? Flink 任務(wù)到輸出 Flink 任務(wù)的時(shí)延假如為 0.1 ms ,那么并?度為 1 的任務(wù)的吞吐可以達(dá)到 1 query / 0.1 ms = 1w qps 。

**在使?維表之后:**每條數(shù)據(jù)訪問(wèn)維表的外部存儲(chǔ)的時(shí)?為 2 ms ,那么?條數(shù)據(jù)從輸? Flink 任務(wù)到輸出 Flink 任務(wù)的時(shí)延就會(huì)變成 2.1 ms ,那么同樣并?度為 1 的任務(wù)的吞吐只能達(dá)到 1 query / 2.1 ms = 476 qps ,兩者的吞吐量相差 21 倍,導(dǎo)致維表 join 的算?會(huì)產(chǎn)?背壓,任務(wù)產(chǎn)出會(huì)延遲。

常?的優(yōu)化?案-DataStream:

  • **按照 redis 維表的 key 分桶 + local cache:**通過(guò)按照 key 分桶的?式,讓?多數(shù)據(jù)的維表關(guān)聯(lián)的數(shù)據(jù)訪問(wèn)?之前訪問(wèn)過(guò)得 local cache 即可,把訪問(wèn)外部存儲(chǔ) 2.1 ms 處理?個(gè) query 變?yōu)樵L問(wèn)內(nèi)存的 0.1 ms 處理?個(gè) query 的時(shí)?。
  • **異步訪問(wèn)外存:**DataStream api 有異步算?,可以利?線程池去同時(shí)多次請(qǐng)求維表外部存儲(chǔ),把 2.1 ms 處理 1 個(gè) query 變?yōu)?2.1 ms 處理 10 個(gè) query,吞吐可變優(yōu)化到 10 / 2.1 ms = 4761 qps。
  • **批量訪問(wèn)外存:**除了異步訪問(wèn)之外,還可以批量訪問(wèn)外部存儲(chǔ),舉例:在訪問(wèn) redis 維表的 1 query 占? 2.1 ms 時(shí)?中,其中可能有 2 ms 都是在?絡(luò)請(qǐng)求上?的耗時(shí) ,其中只有 0.1 ms 是 redis server 處理請(qǐng)求的時(shí)?,可以使? redis 提供的 pipeline 能?,在客戶端(也就是 flink 任務(wù) lookup join 算?中),攢?批數(shù)據(jù),使? pipeline 去同時(shí)訪問(wèn) redis sever,把 2.1 ms 處理 1 個(gè) query 變?yōu)?7ms(2ms + 50 * 0.1ms) 處理 50 個(gè) query,吞吐可變?yōu)?50 query / 7 ms = 7143 qps。

**實(shí)測(cè):**上述優(yōu)化效果中,最好?的是 1 + 3,2 相? 3 還是?條?條發(fā)請(qǐng)求,性能會(huì)差?些。

常?的優(yōu)化?案-Flink SQL:

**按照 redis 維表的 key 分桶 + local cache:**sql 中做分桶,得先做 group by,如果做了 group by 的聚合,就只能在 udaf 中做訪問(wèn) redis 處理,并且 UDAF 產(chǎn)出的結(jié)果只能是?條,實(shí)現(xiàn)復(fù)雜,因此選擇不做 keyby 分桶,直接使? local cache 做本地緩存,雖然【直接緩存】的效果?【先按照 key 分桶再做緩存】的效果差,但是也能減少訪問(wèn) redis 壓?。

**異步訪問(wèn)外存:**官?實(shí)現(xiàn)的 hbase connector ?持異步訪問(wèn),搜索 lookup.async。

https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/hbase/ 

**批量訪問(wèn)外存:**基于 redis 的批量訪問(wèn)外存優(yōu)化功能,參考下?。

https://mp.weixin.qq.com/s/ku11tCZp7CAFzpkqd4J1cQ
5.Regular Join 、Interval Join、Temporal Join、Lookup Join 總結(jié)
a)FlinkSQL 的 Join 按照流的性質(zhì)分為
  • 流與流的 Join:Regular Join+Interval Join+Temporal Join
  • 流于外部存儲(chǔ)的 Join:Lookup Join
b)Inner Join 與 Outer Join 區(qū)別

Inner Join:只有兩條流 Join 上才會(huì)發(fā)出,不涉及回撤流

Outer Join:Join 不上會(huì)發(fā)出 null,如果是 Regular Outer Join 涉及回撤流,Interval Outer Join 不涉及回撤流

c)Regular Join 、Interval Join、Temporal Join 區(qū)別

Regular Join:如果不設(shè)置狀態(tài)的 TTL,兩條流的所有數(shù)據(jù)都會(huì)暫存進(jìn)行 Join,涉及回撤流

Interval Join:可以選定 一條流指定時(shí)間區(qū)間內(nèi)數(shù)據(jù) 進(jìn)行 Join,不涉及回撤流

Temporal Join:根據(jù) 一條流的時(shí)間字段 選擇 另一條流的歷史時(shí)間區(qū)間 進(jìn)行 Join,不涉及回撤流

http://www.risenshineclean.com/news/61726.html

相關(guān)文章:

  • 旅游網(wǎng)站開(kāi)發(fā)團(tuán)隊(duì)百度廣告投放代理商
  • 南充網(wǎng)站建設(shè)公司seo 公司
  • 南通做網(wǎng)站的推廣普通話的文字內(nèi)容
  • 中國(guó)建設(shè)銀行新聞網(wǎng)站最近一周熱點(diǎn)新聞
  • 手機(jī)端企業(yè)網(wǎng)站源碼下載推廣產(chǎn)品的方式有哪些
  • notepad做網(wǎng)站網(wǎng)絡(luò)seo啥意思
  • 局域網(wǎng)網(wǎng)站開(kāi)發(fā)濟(jì)南seo外包公司
  • 外包網(wǎng)站建設(shè)費(fèi)用包括網(wǎng)站備份如何制作網(wǎng)頁(yè)鏈接教程
  • wordpress 制作模板seo優(yōu)化培訓(xùn)多少錢
  • asp網(wǎng)站 seob站推廣入口2023
  • 專做短篇的網(wǎng)站百度站長(zhǎng)工具域名查詢
  • 建網(wǎng)站程序怎么寫中小型企業(yè)網(wǎng)站設(shè)計(jì)與開(kāi)發(fā)
  • 網(wǎng)站開(kāi)發(fā)常見(jiàn)畢業(yè)設(shè)計(jì)題目互聯(lián)網(wǎng)營(yíng)銷顧問(wèn)
  • 建設(shè)銀行網(wǎng)站點(diǎn)擊次數(shù)百度風(fēng)云榜游戲
  • wordpress調(diào)用7天熱門文章seo優(yōu)化交流
  • 網(wǎng)站中文域名好嗎廣州seo推廣培訓(xùn)
  • 完備的網(wǎng)站建設(shè)怎么找百度客服
  • 下載中心免費(fèi)下載seo搜索引擎優(yōu)化方案
  • 公司名被注冊(cè)網(wǎng)站網(wǎng)站seo優(yōu)化檢測(cè)
  • 哪里有免費(fèi)的ppt模板下載網(wǎng)站免費(fèi)seo教程資源
  • 大型自適應(yīng)的網(wǎng)站開(kāi)發(fā)互動(dòng)營(yíng)銷案例100
  • 做旅游的網(wǎng)站的目的和意義什么是引流推廣
  • 網(wǎng)站建設(shè)就問(wèn)山東聚搜網(wǎng)絡(luò)f南寧網(wǎng)絡(luò)推廣有幾家
  • 企業(yè)自己做網(wǎng)站營(yíng)銷培訓(xùn)心得體會(huì)
  • 重慶建網(wǎng)站的公司集中在哪里百度醫(yī)生
  • qq空間認(rèn)證的網(wǎng)站后臺(tái)根目錄青島設(shè)計(jì)優(yōu)化公司
  • 政府網(wǎng)站平臺(tái)建設(shè)情況發(fā)布外鏈的步驟
  • 做音樂(lè)網(wǎng)站首頁(yè)要求雅思培訓(xùn)班價(jià)格一般多少
  • 導(dǎo)航網(wǎng)站開(kāi)發(fā)用戶文檔新站seo優(yōu)化快速上排名
  • 玉林網(wǎng)站制作想做百度推廣找誰(shuí)