專(zhuān)門(mén)做網(wǎng)站需要敲代碼么旺道seo優(yōu)化軟件怎么用
文章目錄
- 一. create table hints
- 1. 語(yǔ)法
- 2. 示例
- 3. 注意
- 二. 實(shí)戰(zhàn):簡(jiǎn)化hive連接器參數(shù)設(shè)置
- 三. select hints(ing)
SQL 提示(SQL Hints)是和 SQL 語(yǔ)句一起使用來(lái)改變執(zhí)行計(jì)劃的。本章介紹如何使用 SQL 提示來(lái)實(shí)現(xiàn)各種干預(yù)。
SQL 提示一般可以用于以下:
- 增強(qiáng) planner:沒(méi)有完美的 planner, SQL 提示讓用戶(hù)更好地控制執(zhí)行;
- 增加元數(shù)據(jù)(或者統(tǒng)計(jì)信息):如"已掃描的表索引"和"一些混洗鍵(shuffle keys)的傾斜信息"的一些統(tǒng)計(jì)數(shù)據(jù)對(duì)于查詢(xún)來(lái)說(shuō)是動(dòng)態(tài)的,用提示來(lái)配置它們會(huì)非常方便,因?yàn)槲覀儚?planner
獲得的計(jì)劃元數(shù)據(jù)通常不那么準(zhǔn)確;- 算子(Operator)資源約束:在許多情況下,我們會(huì)為執(zhí)行算子提供默認(rèn)的資源配置,即最小并行度或托管內(nèi)存(UDF 資源消耗)或特殊資源需求(GPU 或 SSD 磁盤(pán))等,可以使用 SQL 提示非常靈活地
為每個(gè)查詢(xún)(非作業(yè))配置資源
。
?
一. create table hints
動(dòng)態(tài)表選項(xiàng)允許動(dòng)態(tài)地指定或覆蓋表選項(xiàng)
,不同于用 SQL DDL 或 連接 API 定義的靜態(tài)表選項(xiàng),這些選項(xiàng)可以在每個(gè)查詢(xún)的每個(gè)表范圍內(nèi)靈活地指定。
因此,它非常適合用于交互式終端中的特定查詢(xún),例如,在 SQL-CLI 中,你可以通過(guò)添加動(dòng)態(tài)選項(xiàng)/*+ OPTIONS('csv.ignore-parse-errors'='true') */
來(lái)指定忽略 CSV 源的解析錯(cuò)誤。
?
1. 語(yǔ)法
為了不破壞 SQL 兼容性,我們使用 Oracle 風(fēng)格的 SQL hints 語(yǔ)法:
table_path /*+ OPTIONS(key=val [, key=val]*) */key: string字符
val: string字符
?
2. 示例
CREATE TABLE kafka_table1 (id BIGINT, name STRING, age INT) WITH (...);
CREATE TABLE kafka_table2 (id BIGINT, name STRING, age INT) WITH (...);-- `覆蓋`查詢(xún)語(yǔ)句中源表的選項(xiàng)
select id, name from kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;-- 覆蓋 join 中源表的選項(xiàng)
select * fromkafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t1joinkafka_table2 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t2on t1.id = t2.id;-- 覆蓋插入語(yǔ)句中結(jié)果表的選項(xiàng)
insert into kafka_table1 /*+ OPTIONS('sink.partitioner'='round-robin') */ select * from kafka_table2;
?
3. 注意
create table
hints 傳遞的連接器中catalog
的相關(guān)參數(shù),即create table with
下參數(shù),具體到源代碼是:context.getCatalogTable().getOptions()
。
?
如果傳參無(wú)效且在日志中看到參數(shù)已經(jīng)設(shè)置成功,那
可能將context.getConfiguration()中的參數(shù)傳遞到with參數(shù)下,比如:
hive連接器下:table.exec.hive.sink.statistic-auto-gather.enable
參數(shù)由DefaultDynamicTableContext的configuration
來(lái)接收。此參數(shù)為flink sql的全局參數(shù),此時(shí)可以通過(guò)set table.exec.hive.sink.statistic-auto-gather.enable=false
語(yǔ)法來(lái)設(shè)定參數(shù)。
?
二. 實(shí)戰(zhàn):簡(jiǎn)化hive連接器參數(shù)設(shè)置
對(duì)于hive連接器,Flink實(shí)現(xiàn)了通過(guò)catalog的方式來(lái)管理hive表,在使用hive表時(shí)需要使用hive相關(guān)語(yǔ)法,此時(shí)需要聲明,hive dialect,如下:
CREATE CATALOG myhive WITH ('type' = 'hive','default-database' = 'aaa','hive-conf-dir' = '/usr/bin/hadoop/software/hive/conf'
);SET table.sql-dialect=hive;-- 因?yàn)樾枰褂胔ive連接器中的寫(xiě)特性,所以需要create table ,此時(shí)sql語(yǔ)法為hive語(yǔ)法
CREATE TABLE hive_table (user_id STRING,order_amount DOUBLE
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES ('partition.time-extractor.timestamp-pattern'='$dt $hr:00:00','sink.partition-commit.trigger'='partition-time','sink.partition-commit.delay'='1 h','sink.partition-commit.policy.kind'='metastore,success-file'
);-- 對(duì)于某些框架例如chunjun,此處不能很好的適配:
--
SET table.sql-dialect=default;
CREATE TABLE kafka_table (user_id STRING,order_amount DOUBLE,log_ts TIMESTAMP(3),WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- 在 TIMESTAMP 列聲明 watermark。
) WITH (...);-- streaming sql, insert into hive table
INSERT INTO TABLE myhive.aaa.hive_table
SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH')
FROM kafka_table;
如下可以把寫(xiě)hive的一些行為通過(guò)sql hint方式,放到Flink sql語(yǔ)句中,如下整個(gè)Flink sql 會(huì)清爽很多。
CREATE CATALOG myhive WITH ('type' = 'hive','default-database' = 'database_name','hive-conf-dir' = '/usr/bin/hadoop/software/hive/conf'
);CREATE TABLE source_kafka (`pv` string,`uv` string,`p_day_id` string
) WITH ('connector' = 'kafka-x','topic' = 'hive_kafka','properties.bootstrap.servers' = 'xxx:9092','properties.group.id' = 'luna_g','scan.startup.mode' = 'earliest-offset','json.timestamp-format.standard' = 'SQL','json.ignore-parse-errors' = 'true','format' = 'json','scan.parallelism' = '1');insert into myhive.database_name.table_name /*+ OPTIONS('partition.time-extractor.timestamp-pattern'='$p_day_id:00:00','sink.partition-commit.policy.kind'='metastore,success-file','sink.partition-commit.success-file.name'='_SUCCESS_gao111') */select * from source_kafka;
?
三. select hints(ing)
查詢(xún)提示(Query Hints
)用于為優(yōu)化器修改執(zhí)行計(jì)劃提供建議,該修改只能在當(dāng)前查詢(xún)提示所在的查詢(xún)塊中生效(Query block
, 什么是查詢(xún)塊)。 目前,Flink 查詢(xún)提示只支持聯(lián)接提示(Join Hints
)。
具體見(jiàn):官網(wǎng)
https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/table/sql/queries/hints/#%E6%9F%A5%E8%AF%A2%E6%8F%90%E7%A4%BA
?