哪家公司做網(wǎng)站比較好成都百度seo公司
在《OceanBase 數(shù)據(jù)庫源碼解析》這本書中,關(guān)于SQL執(zhí)行器的深入剖析相對較少,因此,希望增添一些實(shí)用且詳盡的補(bǔ)充內(nèi)容。
上一篇博客《 OceanBase技術(shù)解析: 執(zhí)行器中的自適應(yīng)技術(shù)》中,已初步介紹了執(zhí)行器中幾項(xiàng)典型的自適應(yīng)技術(shù),但其中對于hash group by 中的兩階段下壓技術(shù),我是基于大家已有一定了解的前提展開了闡述。若你在執(zhí)行器的多階段下壓技術(shù)方面尚存疑惑,歡迎閱讀這篇博客,來一起學(xué)習(xí)一下 OceanBase 中比較常見的幾種自適應(yīng)分布式下壓技術(shù)。
什么是分布式下壓
在分布式執(zhí)行的過程中,為了更好地利用并行的能力,降低 CPU 和網(wǎng)絡(luò)的開銷,優(yōu)化器生成計(jì)劃的過程中,往往會(huì)將部分算子下壓到更下層的各個(gè)計(jì)算節(jié)點(diǎn)上。目的是為了充分利用集群的計(jì)算資源,提升執(zhí)行效率。這次就來介紹下 OceanBase 里面最常見的幾種分布式下壓技術(shù)。
LIMIT 下壓
我們先介紹一下 limit 的下壓。舉一個(gè)簡單的例子,這兩條 SQL 是創(chuàng)建一個(gè) orders 表,并從 orders 表中讀 100 行數(shù)據(jù)。
CREATE TABLE `orders` (`o_orderkey` bigint(20) NOT NULL,`o_custkey` bigint(20) NOT NULL,`o_orderdate` date NOT NULL,PRIMARY KEY (`o_orderkey`, `o_orderdate`, `o_custkey`),KEY `o_orderkey` (`o_orderkey`) LOCAL BLOCK_SIZE 16384
) partition by range columns(o_orderdate)subpartition by hash(o_custkey) subpartitions 64
(partition ord1 values less than ('1992-01-01'),
partition ord2 values less than ('1992-02-01'),
partition ord3 values less than ('1992-03-01'),
partition ord77 values less than ('1998-05-01'),
partition ord78 values less than ('1998-06-01'),
partition ord79 values less than ('1998-07-01'),
partition ord80 values less than ('1998-08-01'),
partition ord81 values less than (MAXVALUE));select * from orders limit 100;
圖中的計(jì)劃是分布式下壓的一個(gè)很常見的場景:
explain select * from orders limit 100;
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Query Plan |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
| ================================================================= |
| |ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)| |
| ----------------------------------------------------------------- |
| |0 |LIMIT | |1 |2794 | |
| |1 |└─PX COORDINATOR | |1 |2794 | |
| |2 | └─EXCHANGE OUT DISTR |:EX10000|1 |2793 | |
| |3 | └─LIMIT | |1 |2792 | |
| |4 | └─PX PARTITION ITERATOR| |1 |2792 | |
| |5 | └─TABLE FULL SCAN |orders |1 |2792 | |
| ================================================================= |
| Outputs & filters: |
| ------------------------------------- |
| 0 - output([orders.o_orderkey], [orders.o_custkey], [orders.o_orderdate]), filter(nil) |
| limit(100), offset(nil) |
| 1 - output([orders.o_orderkey], [orders.o_custkey], [orders.o_orderdate]), filter(nil) |
| 2 - output([orders.o_orderkey], [orders.o_custkey], [orders.o_orderdate]), filter(nil) |
| dop=1 |
| 3 - output([orders.o_orderkey], [orders.o_custkey], [orders.o_orderdate]), filter(nil) |
| limit(100), offset(nil) |
| 4 - output([orders.o_orderkey], [orders.o_orderdate], [orders.o_custkey]), filter(nil) |
| force partition granule |
| 5 - output([orders.o_orderkey], [orders.o_orderdate], [orders.o_custkey]), filter(nil) |
| access([orders.o_orderkey], [orders.o_orderdate], [orders.o_custkey]), partitions(p0sp[0-63], p1sp[0-63], p2sp[0-63], p3sp[0-63], p4sp[0-63], p5sp[0-63], |
| p6sp[0-63], p7sp[0-63]) |
| limit(100), offset(nil), is_index_back=false, is_global_index=false, |
| range_key([orders.o_orderkey], [orders.o_orderdate], [orders.o_custkey]), range(MIN,MIN,MIN ; MAX,MAX,MAX)always true |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
可以看到計(jì)劃中有兩個(gè) limit 算子(1 號和 3 號)。通過下壓生成 3 號 limit 算子,可以降低對 5 號 table scan 對 orders 每個(gè)分區(qū)的掃描行數(shù),讓每個(gè) table scan 的線程最多只掃描 100 行數(shù)據(jù),這樣可以降低 table scan 掃描數(shù)據(jù)的開銷以及發(fā)送數(shù)據(jù)到 1 號算子進(jìn)行匯總的網(wǎng)絡(luò)開銷。目前 OB 的一個(gè) exchange 算子是從下層收到 64K 數(shù)據(jù)以后發(fā)一個(gè)包,limit 如果不下壓的話可能會(huì)多掃描很多的數(shù)據(jù),并且?guī)砗艽蟮木W(wǎng)絡(luò)開銷。
真實(shí)的場景中,limit 往往伴隨著 order by。如果在前面的例子中加上 order by 關(guān)鍵字,order by 加 limit 會(huì)在計(jì)劃中生成一個(gè) top-n sort 算子,它的性能是比 sort 要好很多的。
explain select * from orders order by o_orderdate limit 100;
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Query Plan |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
| ================================================================= |
| |ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)| |
| ----------------------------------------------------------------- |
| |0 |LIMIT | |1 |2794 | |
| |1 |└─PX COORDINATOR MERGE SORT | |1 |2794 | |
| |2 | └─EXCHANGE OUT DISTR |:EX10000|1 |2793 | |
| |3 | └─TOP-N SORT | |1 |2792 | |
| |4 | └─PX PARTITION ITERATOR| |1 |2792 | |
| |5 | └─TABLE FULL SCAN |orders |1 |2792 | |
| ================================================================= |
| Outputs & filters: |
| ------------------------------------- |
| 0 - output([orders.o_orderkey], [orders.o_custkey], [orders.o_orderdate]), filter(nil) |
| limit(100), offset(nil) |
| 1 - output([orders.o_orderkey], [orders.o_custkey], [orders.o_orderdate]), filter(nil) |
| sort_keys([orders.o_orderdate, ASC]) |
| 2 - output([orders.o_orderkey], [orders.o_custkey], [orders.o_orderdate]), filter(nil) |
| dop=1 |
| 3 - output([orders.o_orderkey], [orders.o_custkey], [orders.o_orderdate]), filter(nil) |
| sort_keys([orders.o_orderdate, ASC]), topn(100) |
| 4 - output([orders.o_orderkey], [orders.o_orderdate], [orders.o_custkey]), filter(nil) |
| force partition granule |
| 5 - output([orders.o_orderkey], [orders.o_orderdate], [orders.o_custkey]), filter(nil) |
| access([orders.o_orderkey], [orders.o_orderdate], [orders.o_custkey]), partitions(p0sp[0-63], p1sp[0-63], p2sp[0-63], p3sp[0-63], p4sp[0-63], p5sp[0-63], |
| p6sp[0-63], p7sp[0-63]) |
| is_index_back=false, is_global_index=false, |
| range_key([orders.o_orderkey], [orders.o_orderdate], [orders.o_custkey]), range(MIN,MIN,MIN ; MAX,MAX,MAX)always true |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
如果上面的 limit 不下壓的話,3 號算子就會(huì)變成 sort 算子,每個(gè)線程需要將自己掃描的所有數(shù)據(jù)排序以后發(fā)送給上層的 DFO(DFO 就是一個(gè)子計(jì)劃,相鄰的 DFO 之間以 exchange 算子作為分割,詳見:OceanBase分布式數(shù)據(jù)庫-海量數(shù)據(jù) 筆筆算數(shù))。
limit 下壓的作用,就是能夠提前結(jié)束執(zhí)行,減少計(jì)算和網(wǎng)絡(luò)的開銷。
AGGREGATION 下壓
下面介紹一下聚合中的分布式下壓,以這條 group by 語句為例:
select count(o_totalprice), sum(o_totalprice) from orders group by o_orderdate;
這條 SQL 查詢了每一天的訂單數(shù)和銷售額,如果希望并行地執(zhí)行這條 SQL 的話,最直接的想法肯定是讓表中數(shù)據(jù)根據(jù) group by 列(o_orderdate)的 hash 值進(jìn)行數(shù)據(jù)的分發(fā),因?yàn)檫@樣可以確保 o_orderdate 值相同的行都被發(fā)送到了同一個(gè)線程,各個(gè)線程可以并行地對收到的數(shù)據(jù)去進(jìn)行聚合。
但是這個(gè)計(jì)劃的一個(gè)弊端是要對表中所有的數(shù)據(jù)都要做一次 shuffle 網(wǎng)絡(luò)的開銷可能很大;還有一個(gè)問題是如果表中存在數(shù)據(jù)傾斜比如某一天的訂單特別多,那么處理負(fù)責(zé)處理這一天訂單的線程的工作量就會(huì)比其他線程多很多,這個(gè)長尾的任務(wù)可能直接導(dǎo)致這個(gè)查詢的執(zhí)行時(shí)間特別長。
為了解決上述這些問題,我們會(huì)對 group by 算子進(jìn)行下壓,生成這樣一個(gè)計(jì)劃:
explain select count(o_totalprice), sum(o_totalprice) from orders group by o_orderdate;
+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
| Query Plan |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
| ===================================================================== |
| |ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)| |
| --------------------------------------------------------------------- |
| |0 |PX COORDINATOR | |1 |2796 | |
| |1 |└─EXCHANGE OUT DISTR |:EX10001|1 |2795 | |
| |2 | └─HASH GROUP BY | |1 |2795 | |
| |3 | └─EXCHANGE IN DISTR | |1 |2794 | |
| |4 | └─EXCHANGE OUT DISTR (HASH)|:EX10000|1 |2794 | |
| |5 | └─HASH GROUP BY | |1 |2793 | |
| |6 | └─PX PARTITION ITERATOR| |1 |2792 | |
| |7 | └─TABLE FULL SCAN |orders |1 |2792 | |
| ===================================================================== |
| Outputs & filters: |
| ------------------------------------- |
| 0 - output([INTERNAL_FUNCTION(T_FUN_COUNT_SUM(T_FUN_COUNT(orders.o_totalprice)), T_FUN_SUM(T_FUN_SUM(orders.o_totalprice)))]), filter(nil) |
| 1 - output([INTERNAL_FUNCTION(T_FUN_COUNT_SUM(T_FUN_COUNT(orders.o_totalprice)), T_FUN_SUM(T_FUN_SUM(orders.o_totalprice)))]), filter(nil) |
| dop=1 |
| 2 - output([T_FUN_COUNT_SUM(T_FUN_COUNT(orders.o_totalprice))], [T_FUN_SUM(T_FUN_SUM(orders.o_totalprice))]), filter(nil) |
| group([orders.o_orderdate]), agg_func([T_FUN_COUNT_SUM(T_FUN_COUNT(orders.o_totalprice))], [T_FUN_SUM(T_FUN_SUM(orders.o_totalprice))]) |
| 3 - output([orders.o_orderdate], [T_FUN_COUNT(orders.o_totalprice)], [T_FUN_SUM(orders.o_totalprice)]), filter(nil) |
| 4 - output([orders.o_orderdate], [T_FUN_COUNT(orders.o_totalprice)], [T_FUN_SUM(orders.o_totalprice)]), filter(nil) |
| (#keys=1, [orders.o_orderdate]), dop=1 |
| 5 - output([orders.o_orderdate], [T_FUN_COUNT(orders.o_totalprice)], [T_FUN_SUM(orders.o_totalprice)]), filter(nil) |
| group([orders.o_orderdate]), agg_func([T_FUN_COUNT(orders.o_totalprice)], [T_FUN_SUM(orders.o_totalprice)]) |
| 6 - output([orders.o_orderdate], [orders.o_totalprice]), filter(nil) |
| force partition granule |
| 7 - output([orders.o_orderdate], [orders.o_totalprice]), filter(nil) |
| access([orders.o_orderdate], [orders.o_totalprice]), partitions(p0sp[0-63], p1sp[0-63], p2sp[0-63], p3sp[0-63], p4sp[0-63], p5sp[0-63], p6sp[0-63], |
| p7sp[0-63]) |
| is_index_back=false, is_global_index=false, |
| range_key([orders.o_orderkey], [orders.o_orderdate], [orders.o_custkey]), range(MIN,MIN,MIN ; MAX,MAX,MAX)always true |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
在這個(gè)計(jì)劃里面,每個(gè)線程在進(jìn)行數(shù)據(jù)的分發(fā)之前會(huì)先對自己讀取的這部分?jǐn)?shù)據(jù)進(jìn)行預(yù)聚合,也就是計(jì)劃里面的 5 號 group by 算子做的工作。然后 5 號算子將聚合的結(jié)果發(fā)送給上層的算子,之后上層的 2號 group by 算子會(huì)對收到的數(shù)據(jù)再進(jìn)一次聚合。因?yàn)榻?jīng)過這個(gè) 5 號 group by 算子的提前聚合之后,數(shù)據(jù)量一般都會(huì)大幅降低,這樣即可以降低數(shù)據(jù) shuffle 帶來的網(wǎng)絡(luò)開銷,也能降低數(shù)據(jù)傾斜對執(zhí)行時(shí)間的影響。
接下來展示一下具體的執(zhí)行過程來進(jìn)行說明,還是剛才那條熟悉的 SQL,求每一天的訂單數(shù)和銷售額。
select count(o_totalprice), sum(o_totalprice) from orders group by o_orderdate;
原始的數(shù)據(jù)共有 7 行,每筆訂單的銷售額都是 10 元,它分布在 1、2、3 號這三天里面。
下圖中展示了執(zhí)行的過程,這里我們把并行度設(shè)置為 2:
我們從左往右看,可以看到左上方的第一個(gè)線程掃描了 3 行的數(shù)據(jù),左下方的第二個(gè)線程掃描了 4 行數(shù)據(jù)。日期相同的數(shù)據(jù),也就是同一組的數(shù)據(jù),都被標(biāo)成了相同的顏色。
先看左側(cè),第一個(gè)線程對其掃描的 3 行數(shù)據(jù)去進(jìn)行聚合,這 3 行數(shù)據(jù)分布在兩個(gè) group 里面,6 月 1 號有 2 行,6 月 3 號有 1 行。因?yàn)?6 月 1 號有 2 行,所以它的 count 是 2,銷售額是 20。6 月 3 號有 1 行,它的 count 是 1,銷售額是10。第二個(gè)線程掃描的 4 行數(shù)據(jù)也分布在兩個(gè) group 里面,聚合后也生成了兩行數(shù)據(jù),這里不再贅述。這部分工作對應(yīng)計(jì)劃里的 5 號算子。
然后這兩個(gè)線程利用 o_orderdate 列的 hash 值進(jìn)行數(shù)據(jù)的分發(fā),讓同一天的數(shù)據(jù)都發(fā)送到同一個(gè)線程。這部分工作對應(yīng)計(jì)劃里的 3 號和 4 號算子。
右側(cè)的每個(gè)線程對收到的數(shù)據(jù)會(huì)再進(jìn)行一次聚合。可以看到左邊兩個(gè)線程中 6 月 3 號的數(shù)據(jù)(紅色)都被發(fā)送到了右下方這個(gè)線程里,這兩行從左側(cè)不同線程發(fā)過來的 6 月 3 號的數(shù)據(jù)被右側(cè)的算子再次進(jìn)行聚合,count 和 sum 都再次被相加,count 變成了2,sum 變成了 20,最終被聚合成了一行。這部分工作對應(yīng)計(jì)劃里的 2 號算子。
然后所有的數(shù)據(jù)都會(huì)發(fā)送給協(xié)調(diào)者,協(xié)調(diào)者對數(shù)據(jù)進(jìn)行匯總之后,將最終的計(jì)算結(jié)果發(fā)送給客戶端。
JOIN FILTER 下壓
join 算子中也會(huì)把左表的過濾條件 join filter 下壓到右表,對右表的數(shù)據(jù)進(jìn)行提前過濾和分區(qū)裁剪。
提前過濾
hash join 在執(zhí)行的時(shí)候,總是先讀左表的數(shù)據(jù),建立一個(gè)哈希表。然后用右側(cè)的數(shù)據(jù)去探測這個(gè)哈希表,如果探測成功的話就會(huì)把這個(gè)數(shù)據(jù)發(fā)送給上層算子。這里存在的一個(gè)問題就是如果 hash join 的右側(cè)存在一個(gè)數(shù)據(jù) reshuffle(重分布)的話,網(wǎng)絡(luò)的開銷可能比較大,這個(gè)開銷標(biāo)取決于右表的數(shù)據(jù)量大小。在這種情況下,我們可以通過 join filter 來降低數(shù)據(jù) shuffle 的網(wǎng)絡(luò)開銷。
以這個(gè)計(jì)劃為例:
在上面這個(gè)計(jì)劃中,2 號的 hash join 算子從左側(cè)讀數(shù)據(jù),讀的時(shí)候會(huì)使用 t1.c1 這個(gè)連接鍵創(chuàng)建一個(gè) join filter,就是計(jì)劃中的這個(gè) 3 號 join filter create 算子。join filter 最常見的一個(gè)形式是 bloom filter,join filter 創(chuàng)建完成以后會(huì)被發(fā)送到 hash join 右側(cè)這個(gè) DFO(6 號算子以及更下層的算子)。
可以看到,10 號的這個(gè) table scan 上面有一個(gè)過濾條件 sys_op_bloom_filter(t2.c1),表示會(huì)用 bloom_filter 對 hash join 右表 t2.c1 的值去進(jìn)行一個(gè)快速的探測。如果探測失敗的話說明不存在 t2.c1 跟這個(gè) t1.c1 相等,那么這行數(shù)據(jù)可以直接被提前過濾掉,不需要向上發(fā)送給 hash join。
分區(qū)裁剪
join filter 不僅可以對行進(jìn)行過濾,還可以用于分區(qū)裁剪,即對分區(qū)進(jìn)行過濾。如果 t1 是一個(gè)分區(qū)表,并且連接鍵是它的分區(qū)鍵的話,那么可以生成這樣的計(jì)劃:
可以看到這個(gè)計(jì)劃里,3 號是一個(gè) partition join filter create 算子,它會(huì)感知 hash join 右邊的 t1 表的分區(qū)方式,它每從下層獲取一行左表的數(shù)據(jù),就會(huì)用 c1 的值去計(jì)算這行數(shù)據(jù)在右表 t1 表里的哪個(gè)分區(qū)里面,并將這個(gè) partition id 記錄到 join filter 里。最終這個(gè) partition id 的 join filter 會(huì)在 8 號算子上用于 hash join 右表的分區(qū)裁剪。右表掃描每一個(gè)分區(qū)之前都會(huì)檢查這個(gè) partition id 是否存在于 join filter 中,如果不存在的話,可以直接跳過整分區(qū)的掃描。
join filter 可以提前對數(shù)據(jù)進(jìn)行過濾、提前對分區(qū)進(jìn)行裁剪,降低了掃描數(shù)據(jù)、網(wǎng)絡(luò)傳輸和探測 hash 表的開銷。目前 4.2 之前只支持 bloom filter 這一種類型的 join filter。4.2上新支持了 in filter 和 range filter 這兩種類型的 join filter,這兩種新的 join filter 在一些場景中對性能有很好的提升,特別是在左表不同值的個(gè)數(shù)較少或者是左表值連續(xù)的場景。
其他的分布式下壓
除了上述介紹的幾個(gè)比較常見也比較易于理解的分布式下壓技術(shù),OceanBase 還支持更多的自適應(yīng)分布式下壓,例如: window function 的自適應(yīng)兩階段下壓、三階段的聚合下壓等等。
OceanBase 中這些更為復(fù)雜的分布式下壓技術(shù),由于精力所限,就不再一一詳細(xì)介紹。下面會(huì)貼一下剛才提到的兩種分布式下壓的執(zhí)行計(jì)劃,供有興趣的同學(xué)去進(jìn)行更深入的研究。
window function 的自適應(yīng)兩階段下壓:
select /*+parallel(3) */c1, sum(c2) over (partition by c1) from t1 order by c1;
Query Plan
===================================================
|ID|OPERATOR |NAME |
---------------------------------------------------
|0 |PX COORDINATOR MERGE SORT | |
|1 | EXCHANGE OUT DISTR |:EX10001|
|2 | MATERIAL | |
|3 | WINDOW FUNCTION CONSOLIDATOR | |
|4 | EXCHANGE IN MERGE SORT DISTR | |
|5 | EXCHANGE OUT DISTR (HASH HYBRID)|:EX10000|
|6 | WINDOW FUNCTION | |
|7 | SORT | |
|8 | PX BLOCK ITERATOR | |
|9 | TABLE SCAN |t1 |
===================================================
三階段的聚合下壓:
select /*+ parallel(2) */c1, sum(distinct c2),count(distinct c3), sum(c4) from t group by c1;
Query Plan
===========================================================================
|ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)|
---------------------------------------------------------------------------
|0 |PX COORDINATOR | |1 |8 |
|1 |└─EXCHANGE OUT DISTR |:EX10002|1 |7 |
|2 | └─HASH GROUP BY | |1 |6 |
|3 | └─EXCHANGE IN DISTR | |2 |6 |
|4 | └─EXCHANGE OUT DISTR (HASH) |:EX10001|2 |6 |
|5 | └─HASH GROUP BY | |2 |4 |
|6 | └─EXCHANGE IN DISTR | |2 |4 |
|7 | └─EXCHANGE OUT DISTR (HASH)|:EX10000|2 |3 |
|8 | └─HASH GROUP BY | |2 |2 |
|9 | └─PX BLOCK ITERATOR | |1 |1 |
|10| └─TABLE FULL SCAN |t |1 |1 |
===========================================================================
下回預(yù)告
這篇博客給大家介紹了 OceanBase 執(zhí)行器中幾個(gè)比較具有代表性的分布式下壓技術(shù),但是已經(jīng)假設(shè)大家對數(shù)據(jù)庫的分布式執(zhí)行技術(shù)有所了解。如果大家對執(zhí)行器的并行執(zhí)行技術(shù)還不是特別了解,請期待下一篇博客《OceanBase 并行執(zhí)行技術(shù)》。