真人做爰網(wǎng)站怎么提交網(wǎng)址讓百度收錄
? Flink SQL支持對動態(tài)表進(jìn)行復(fù)雜而靈活的連接操作。 為了處理不同的場景,需要多種查詢語義,因此有幾種不同類型的 Join。默認(rèn)情況下,joins 的順序是沒有優(yōu)化的。表的 join 順序是在 FROM
從句指定的。可以通過把更新頻率最低的表放在第一個(gè)、頻率最高的放在最后這種方式來微調(diào) join 查詢的性能。需要確保表的順序不會產(chǎn)生笛卡爾積,因?yàn)椴恢С诌@樣的操作并且會導(dǎo)致查詢失敗。
? Flink Join根據(jù)輸入源形式不同可以分為雙流Join、維表Join和其他Join多種形式,下面根據(jù)大類分別介紹各自特點(diǎn)。
一 雙流JOIN
? 在正式進(jìn)入FlinkSQL Join場景研究之前,首先我們先介紹一下在FlinkSQL場景下常見的Kafka數(shù)據(jù)流分類。截止到Flink1.18為止,目前常見的Kafka數(shù)據(jù)流包括不含鍵更新的普通Kafka數(shù)據(jù)流(即Kafka SQL Connector數(shù)據(jù)流)和包含鍵更新的Kafka數(shù)據(jù)流(即Upsert-Kafka SQL Connector數(shù)據(jù)流)兩種。
1 Regular Join
? Regular join 是最通用的 join 類型。在這種 join 下,join 兩側(cè)表的任何新記錄或變更都是可見的,并會影響整個(gè) join 的結(jié)果。對于流式查詢,regular join 的語法是最靈活的,允許任何類型的更新(插入、更新、刪除)輸入表。 然而,這種操作具有重要的操作意義:Flink 需要將 Join 輸入的兩邊數(shù)據(jù)永遠(yuǎn)保持在狀態(tài)中。 因此,計(jì)算查詢結(jié)果所需的狀態(tài)可能會無限增長,這取決于所有輸入表的輸入數(shù)據(jù)量。你可以提供一個(gè)合適的狀態(tài) time-to-live (TTL) 配置來防止?fàn)顟B(tài)過大。注意:這樣做可能會影響查詢的正確性。
? 左右兩邊流數(shù)據(jù)都能驅(qū)動join,左側(cè)流新加入數(shù)據(jù)會和右側(cè)流狀態(tài)中所有匹配記錄join上;同理,右側(cè)流新增數(shù)據(jù)會和左側(cè)流所有匹配記錄join上,外連接不會等待,即使Join不上也會即及時(shí)輸出,待對側(cè)數(shù)據(jù)到來通過回撤修復(fù)數(shù)據(jù)。
-
Inner Join
根據(jù) join 限制條件返回一個(gè)簡單的笛卡爾積。目前只支持 equi-joins,即:至少有一個(gè)等值條件。不支持任意的 cross join 和 theta join。
select t1.order_id as order_id,t2.product_id as product_id,t1.create_time as create_time from tbl_order t1 join tbl_order_product t2 on t1.order_id = t2.order_id ;
Inner join不會產(chǎn)生回撤流,source端可以是Kafka SQL Connector也可以試Upsert-kafka SQL Connector,也可以是混合模式,sink端理論均可以是Kafka Connector,但如果輸入端有重復(fù)輸入,輸出端可以設(shè)置成Upsert-Kafka SQL Connector接收數(shù)據(jù)。Upsert-Kafka SQL Connector注意設(shè)置主鍵。
-
outer join
返回所有符合條件的笛卡爾積(即:所有通過 join 條件連接的行),加上所有外表沒有匹配到的行。Flink 支持 LEFT、RIGHT 和 FULL outer joins。目前只支持 equi-joins,即:至少有一個(gè)等值條件。不支持任意的 cross join 和 theta join。
select t1.order_id as order_id,t2.product_id as product_id,t1.create_time as create_time from tbl_order t1 left join tbl_order_product t2 on t1.order_id = t2.order_id ;select t1.order_id as order_id,t2.product_id as product_id,t1.create_time as create_time from tbl_order t1 right join tbl_order_product t2 on t1.order_id = t2.order_id ;select t1.order_id as order_id,t2.product_id as product_id,t1.create_time as create_time from tbl_order t1 full join tbl_order_product t2 on t1.order_id = t2.order_id ;
Outer Join會產(chǎn)生回撤流,source端可以是Kafka SQL Connector也可以是Upsert-kafka SQL Connector,也可以是混合模式,sink端理僅支持設(shè)置成Upsert-Kafka SQL Connector接收數(shù)據(jù)。Upsert-Kafka SQL Connector注意設(shè)置主鍵。
-
Regular Join總結(jié)應(yīng)用模式如下(a代表Append-Only流,u代表Upsert-Kafka流):
-
a join a => a|u
-
u join u => a|u
-
a join u => a|u
-
a left join a => u
-
u left join u => u
-
a left join u => u
-
2 Interval Join
? 返回一個(gè)符合 join 條件和時(shí)間限制的簡單笛卡爾積。Interval join 需要至少一個(gè) equi-join 條件和一個(gè) join 兩邊都包含的時(shí)間限定 join 條件。范圍判斷可以定義成就像一個(gè)條件(<, <=, >=, >),也可以是一個(gè) BETWEEN 條件,或者兩邊表的一個(gè)相同類型(即:處理時(shí)間 或 事件時(shí)間)的時(shí)間屬性 的等式判斷。
? 下面列舉了一些有效的 interval join 時(shí)間條件:
ltime = rtime
ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
? 對于流式查詢,對比 regular join,interval join 只支持有時(shí)間屬性的Append-Only表。 由于時(shí)間屬性是遞增的,Flink 從狀態(tài)中移除舊值也不會影響結(jié)果的正確性,即interval join會根據(jù)間隔自動維護(hù)狀態(tài)大小,不丟棄狀態(tài)也不會讓狀態(tài)無限增長。
-
Inner join
select * from tbl_order t1 join tbl_shopment t2 on t1.order_id = t2.order_id and t1.create_time between t2.create_time - interval '4' hour and t2.create_time ;
? 輸入源只支持Kafka SQL Connector,不支持任何一方回撤流,這也可以理解,因?yàn)镮nterval Join是有時(shí)間屬性參與Join的。輸出數(shù)據(jù)可以是Kafka SQL Connector也可以試Upsert-kafka SQL Connector。Upsert-kafka SQL Connector要注意鍵設(shè)計(jì)。
-
Outer join
select * from tbl_order t1 left join tbl_shopment t2 on t1.order_id = t2.order_id and t1.create_time between t2.create_time - interval '4' hour and t2.create_time ;select * from tbl_order t1 right join tbl_shopment t2 on t1.order_id = t2.order_id and t1.create_time between t2.create_time - interval '4' hour and t2.create_time ;select * from tbl_order t1 full join tbl_shopment t2 on t1.order_id = t2.order_id and t1.create_time between t2.create_time - interval '4' hour and t2.create_time ;
? 輸入端僅至此Kafka SQL Connector,不支持任何一方回撤流,這也可以理解,因?yàn)镮nterval Join是有時(shí)間屬性參與Outer Join的。輸出數(shù)據(jù)可以是Kafka SQL Connector也可以試Upsert-kafka SQL Connector。Upsert-kafka SQL Connector要注意鍵設(shè)計(jì)。
-
注意點(diǎn)
-
測試要配置并行度為1,否則右表關(guān)聯(lián)不上數(shù)據(jù)因?yàn)樗痪€識別不到會而不超時(shí)輸出;
executionEnvironment.setParallelism(1);
-
left join右表關(guān)聯(lián)不上輸出條件
- 右表關(guān)聯(lián)數(shù)據(jù)出現(xiàn)觸發(fā)輸出
- 超時(shí)觸發(fā)器輸出關(guān)聯(lián)不上數(shù)據(jù)
-
-
Interval Join總結(jié)應(yīng)用模式如下(a代表Append-Only流,u代表Upsert-Kafka流):
-
a join a => a|u
-
a left join a => a|u
-
3 Temporal Join(Snapshot Join)
? 時(shí)態(tài)表(Temporal table)是一個(gè)隨時(shí)間變化的表:在 Flink 中被稱為動態(tài)表。時(shí)態(tài)表中的行與一個(gè)或多個(gè)時(shí)間段相關(guān)聯(lián),所有 Flink 中的表都是時(shí)態(tài)的(Temporal)。 時(shí)態(tài)表包含一個(gè)或多個(gè)版本的表快照,它可以是一個(gè)變化的歷史表,跟蹤變化(例如,數(shù)據(jù)庫變化日志,包含所有快照)或一個(gè)變化的維度表,也可以是一個(gè)將變更物化的維表(例如,存放最終快照的數(shù)據(jù)表)。
-
Inner join
select t1.order_id as order_id,t1.user_id as user_id,t2.user_name as user_name,t1.create_time as create_time from tbl_order t1 join tbl_user for system_time as of t1.create_time t2 on t1.user_id = t2.user_id ;
特點(diǎn):
- 左右兩邊事件時(shí)間屬性,標(biāo)識兩側(cè)流join場景,如果處理時(shí)間請參考Lookup join;
- 只支持event-time,如果是processing-time那么就變成join最新版本數(shù)據(jù),同Lookup Join;
- 左表支持append流和upsert流;
- 右表只支持upsert流;
- 輸出可以是append流或者upsert流;
- 左表觸發(fā)計(jì)算,右表更新不觸發(fā)計(jì)算;
- 設(shè)置超時(shí)時(shí)間:
tableEnvironment.getConfig().set("table.exec.source.idle-timeout","3s");
;
-
Left join
select t1.order_id as order_id,t1.user_id as user_id,t2.user_name as user_name,t1.create_time as create_time from tbl_order t1 left join tbl_user for system_time as of t1.create_time t2 on t1.user_id = t2.user_id ;
特點(diǎn):
- 左右兩邊事件時(shí)間屬性,標(biāo)識兩側(cè)流join場景,如果處理時(shí)間請參考Lookup join;
- 只支持event-time,如果是processing-time那么就變成join最新版本數(shù)據(jù),同Lookup Join;
- 左表支持append流和upsert流;
- 右表只支持upsert流;
- 輸出可以是append流或者upsert流;
- 左表觸發(fā)計(jì)算,右表更新不觸發(fā)計(jì)算;
- 設(shè)置超時(shí)時(shí)間:
tableEnvironment.getConfig().set("table.exec.source.idle-timeout","3s");
;
-
Snapshot Join總結(jié)應(yīng)用模式如下(a代表Append-Only流,u代表Upsert-Kafka流):
-
a join u => a|u
-
u join u => u
-
a left join u => a|u
-
u left join u => u
-
4 Window Join
? 窗口關(guān)聯(lián)就是增加時(shí)間維度到關(guān)聯(lián)條件中。在此過程中,窗口關(guān)聯(lián)將兩個(gè)流中在同一窗口且符合 join 條件的元素 join 起來。窗口關(guān)聯(lián)的語義和DataStream window join相同。
? 在流式查詢中,與其他連續(xù)表上的關(guān)聯(lián)不同,窗口關(guān)聯(lián)不產(chǎn)生中間結(jié)果,只在窗口結(jié)束產(chǎn)生一個(gè)最終的結(jié)果。另外,窗口關(guān)聯(lián)會清除不需要的中間狀態(tài)。
? 通常,窗口關(guān)聯(lián)和窗口表值函數(shù)一起使用。而且,窗口關(guān)聯(lián)可以在其他基于窗口表值函數(shù)的操作后使用,例如窗口聚合,窗口 Top-N和窗口關(guān)聯(lián)。
? 目前,窗口關(guān)聯(lián)需要在 join on 條件中包含兩個(gè)輸入表的 window_start
等值條件和 window_end
等值條件。
? 窗口關(guān)聯(lián)支持 INNER/LEFT/RIGHT/FULL OUTER/ANTI/SEMI JOIN。
-
語法
select ... from l [left|right|full outer] join r -- l and r are relations applied windowing TVF on l.window_start = r.window_start and l.window_end = r.window_end and ...
-
注意
-
當(dāng)前版本窗口Join必須同時(shí)指定window_start和window_end等值條件
-
窗口Join不支持源是upsert流的情況
-
-
限制
- Join 子句的限制
? 目前,窗口關(guān)聯(lián)需要在 join on 條件中包含兩個(gè)輸入表的
window_start
等值條件和window_end
等值條件。未來,如果是滾動或滑動窗口,只需要在 join on 條件中包含窗口開始相等即可。- 輸入的窗口表值函數(shù)的限制
? 目前,關(guān)聯(lián)的左右兩邊必須使用相同的窗口表值函數(shù)。這個(gè)規(guī)則在未來可以擴(kuò)展,比如:滾動和滑動窗口在窗口大小相同的情況下 join。
- 窗口表值函數(shù)之后直接使用窗口關(guān)聯(lián)的限制
? 目前窗口關(guān)聯(lián)支持作用在滾動(TUMBLE)、滑動(HOP)和累積(CUMULATE)窗口表值函數(shù)之上,但是還不支持會話窗口(SESSION)。
-
Snapshot Join總結(jié)應(yīng)用模式如下(a代表Append-Only流,u代表Upsert-Kafka流):
-
a join a => a|u
-
a left join a => a|u
-
二 維表JOIN
5 Lookup Join(processing-time temporal join)
? lookup join 通常用于使用從外部系統(tǒng)查詢的數(shù)據(jù)來豐富表。join 要求一個(gè)表具有處理時(shí)間屬性,另一個(gè)表由查找源連接器(lookup source connnector)支持。通常使用基于處理時(shí)間的流表與外部版本表(例如 mysql、hbase)的最新版本相關(guān)聯(lián)(即processing-time temporal join 常常用在使用外部系統(tǒng)來豐富流的數(shù)據(jù))。
? 通過定義一個(gè)處理時(shí)間屬性,這個(gè) join 總是返回最新的值??梢詫?build side 中被查找的表想象成一個(gè)存儲所有記錄簡單的 HashMap<K,V>
。 這種 join 的強(qiáng)大之處在于,當(dāng)無法在 Flink 中將表具體化為動態(tài)表時(shí),它允許 Flink 直接針對外部系統(tǒng)工作。
? Join操作由流端觸發(fā),當(dāng)新增一個(gè)流數(shù)據(jù),會查詢外部DB映射,獲取數(shù)據(jù)補(bǔ)全后發(fā)出結(jié)果數(shù)據(jù)。
-
inner join
select t1.order_id as order_id,t1.user_id as user_id,t2.user_name as user_name,t1.create_time as create_time from tbl_order t1 join tbl_user for system_time as of t1.create_time t2 on t1.user_id = t2.user_id ;
特點(diǎn):
- Lookup join只支持inner join和left join;
- 源必須聲明處理時(shí)間,即row_time as proctime(),如果源聲明為事件時(shí)間,那么要走Snapshot join方式;
- 源支持kafka和upsert-kafka連接器
- 輸出支持kafka和upsert-kafka連接器
- 查詢外部表注意使用異步IO/Cache特性優(yōu)化外表查詢性能
-
Left join
select t1.order_id as order_id,t1.user_id as user_id,t2.user_name as user_name,t1.create_time as create_time from tbl_order t1 left join tbl_user for system_time as of t1.create_time t2 on t1.user_id = t2.user_id ;
特點(diǎn):
- Lookup join只支持inner join和left join;
- 源必須聲明處理時(shí)間,即row_time as proctime(),如果源聲明為事件時(shí)間,那么要走Snapshot join方式;
- 源支持kafka和upsert-kafka連接器
- 輸出支持kafka和upsert-kafka連接器
- 查詢外部表注意使用異步IO/Cache特性優(yōu)化外表查詢性能
-
Lookup Join總結(jié)應(yīng)用模式如下(a代表Append-Only流,s代表外表靜態(tài)表):
-
a join s => a|u
-
u join s => a|u
-
a left join s => a|u
-
u left join s => a|u
-
三 其他JOIN
6 Array Expansion
? 對于輸入的包含數(shù)組列的單行數(shù)據(jù),返回給定數(shù)組中每個(gè)元素的新行,拆分后的數(shù)據(jù)除解析數(shù)組元素外,其他元素與原始行數(shù)據(jù)一致。
selectorder_id,order_tag,tag
from tbl_order_source cross join unnest(order_tag) as t(tag)
;
特征:
- 輸入數(shù)據(jù)可以是Append或者Upsert
- 輸出數(shù)據(jù)可以是Append或者Upsert
7 Table Function
? 將表與表函數(shù)的結(jié)果聯(lián)接。左側(cè)(外部)表的每一行都與表函數(shù)的相應(yīng)調(diào)用產(chǎn)生的所有行相連接。用戶自定義表函數(shù)必須在使用前注冊。
? 對于是inner join,如果表函數(shù)調(diào)用返回一個(gè)空結(jié)果,那么左表的這行數(shù)據(jù)將不會輸出。對于left join,如果表函數(shù)調(diào)用返回了一個(gè)空結(jié)果,則保留相應(yīng)的行,并用空值填充未關(guān)聯(lián)到的結(jié)果。當(dāng)前,針對 lateral table 的 left outer join 需要 ON 子句中有一個(gè)固定的 TRUE 連接條件。
select order_id,order_tag,tag
from tbl_order_source
left join lateral table(table_func(order_tag)) t(tag) on true
;
特征:
- 輸入數(shù)據(jù)可以是Append或者Upsert
- 輸出數(shù)據(jù)可以是Append或者Upsert