太原市萬柏林區(qū)疫情防控最新消息專業(yè)seo培訓(xùn)學(xué)校
Flink 系列文章
1、Flink 部署、概念介紹、source、transformation、sink使用示例、四大基石介紹和示例等系列綜合文章鏈接
13、Flink 的table api與sql的基本概念、通用api介紹及入門示例
14、Flink 的table api與sql之?dāng)?shù)據(jù)類型: 內(nèi)置數(shù)據(jù)類型以及它們的屬性
15、Flink 的table api與sql之流式概念-詳解的介紹了動態(tài)表、時(shí)間屬性配置(如何處理更新結(jié)果)、時(shí)態(tài)表、流上的join、流上的確定性以及查詢配置
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及FileSystem示例(1)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Elasticsearch示例(2)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Apache Kafka示例(3)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及JDBC示例(4)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Apache Hive示例(6)
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
18、Flink的SQL 支持的操作和語法
19、Flink 的Table API 和 SQL 中的內(nèi)置函數(shù)及示例(1)
20、Flink SQL之SQL Client: 不用編寫代碼就可以嘗試 Flink SQL,可以直接提交 SQL 任務(wù)到集群上
22、Flink 的table api與sql之創(chuàng)建表的DDL
24、Flink 的table api與sql之Catalogs(介紹、類型、java api和sql實(shí)現(xiàn)ddl、java api和sql操作catalog)-1
24、Flink 的table api與sql之Catalogs(java api操作數(shù)據(jù)庫、表)-2
24、Flink 的table api與sql之Catalogs(java api操作視圖)-3
24、Flink 的table api與sql之Catalogs(java api操作分區(qū)與函數(shù))-4
26、Flink 的SQL之概覽與入門示例
27、Flink 的SQL之SELECT (select、where、distinct、order by、limit、集合操作和去重)介紹及詳細(xì)示例(1)
27、Flink 的SQL之SELECT (SQL Hints 和 Joins)介紹及詳細(xì)示例(2)
27、Flink 的SQL之SELECT (窗口函數(shù))介紹及詳細(xì)示例(3)
27、Flink 的SQL之SELECT (窗口聚合)介紹及詳細(xì)示例(4)
27、Flink 的SQL之SELECT (Group Aggregation分組聚合、Over Aggregation Over聚合 和 Window Join 窗口關(guān)聯(lián))介紹及詳細(xì)示例(5)
27、Flink 的SQL之SELECT (Top-N、Window Top-N 窗口 Top-N 和 Window Deduplication 窗口去重)介紹及詳細(xì)示例(6)
27、Flink 的SQL之SELECT (Pattern Recognition 模式檢測)介紹及詳細(xì)示例(7)
28、Flink 的SQL之DROP 、ALTER 、INSERT 、ANALYZE 語句
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(1)
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(2)
30、Flink SQL之SQL 客戶端(通過kafka和filesystem的例子介紹了配置文件使用-表、視圖等)
32、Flink table api和SQL 之用戶自定義 Sources & Sinks實(shí)現(xiàn)及詳細(xì)示例
41、Flink之Hive 方言介紹及詳細(xì)示例
42、Flink 的table api與sql之Hive Catalog
43、Flink之Hive 讀寫及詳細(xì)驗(yàn)證示例
44、Flink之module模塊介紹及使用示例和Flink SQL使用hive內(nèi)置函數(shù)及自定義函數(shù)詳細(xì)示例–網(wǎng)上有些說法好像是錯(cuò)誤的
文章目錄
- Flink 系列文章
- 一、函數(shù)分類
- 1、分類標(biāo)準(zhǔn)及類別
- 2、函數(shù)引用
- 1)、精確函數(shù)引用
- 2)、模糊函數(shù)引用
- 3、函數(shù)解析順序
- 1)、精確函數(shù)引用
- 2)、模糊函數(shù)引用
- 二、系統(tǒng)(內(nèi)置)函數(shù)
- 1、標(biāo)量函數(shù)
- 1)、比較函數(shù)
- 2)、邏輯函數(shù)
- 3)、算術(shù)函數(shù)
- 4)、字符串函數(shù)
- 5)、時(shí)間函數(shù)
- 6)、條件函數(shù)
- 7)、類型轉(zhuǎn)換函數(shù)
- 8)、集合函數(shù)
- 9)、JSON Functions
- 1、IS JSON
- 2、JSON_EXISTS
- 3、JSON_STRING
- 4、JSON_VALUE
- 5、JSON_QUERY
- 6、JSON_OBJECT
- 7、JSON_ARRAY
- 8、JSON_ARRAYAGG
- 10、JSON_OBJECTAGG
- 10)、值構(gòu)建函數(shù)
- 11)、值獲取函數(shù)
- 12)、分組函數(shù)
- 13)、哈希函數(shù)
- 2、聚合函數(shù)
- 3、時(shí)間間隔單位和時(shí)間點(diǎn)單位標(biāo)識符
- 4、列函數(shù)
本文介紹了flink的函數(shù)分類、內(nèi)置函數(shù)的說明及示例,特別是針對json function函數(shù)每個(gè)均以可運(yùn)行的示例進(jìn)行說明。
本文依賴flink集群能正常使用。
本文分為2個(gè)部分,即函數(shù)分類以及內(nèi)置函數(shù)。
本文的示例均在Flink 1.17版本中運(yùn)行。
一、函數(shù)分類
Flink 允許用戶在 Table API 和 SQL 中使用函數(shù)進(jìn)行數(shù)據(jù)的轉(zhuǎn)換。
Flink 中的函數(shù)有兩個(gè)劃分標(biāo)準(zhǔn)。
1、分類標(biāo)準(zhǔn)及類別
一個(gè)劃分標(biāo)準(zhǔn)是:系統(tǒng)(內(nèi)置)函數(shù)和 Catalog 函數(shù)。系統(tǒng)函數(shù)沒有名稱空間,只能通過其名稱來進(jìn)行引用。 Catalog 函數(shù)屬于 Catalog 和數(shù)據(jù)庫,因此它們擁有 Catalog 和數(shù)據(jù)庫命名空間。 用戶可以通過全/部分限定名(catalog.db.func 或 db.func)或者函數(shù)名 來對 Catalog 函數(shù)進(jìn)行引用。
另一個(gè)劃分標(biāo)準(zhǔn)是:臨時(shí)函數(shù)和持久化函數(shù)。 臨時(shí)函數(shù)始終由用戶創(chuàng)建,它容易改變并且僅在會話的生命周期內(nèi)有效。 持久化函數(shù)不是由系統(tǒng)提供,就是存儲在 Catalog 中,它在會話的整個(gè)生命周期內(nèi)都有效。
這兩個(gè)劃分標(biāo)準(zhǔn)給 Flink 用戶提供了 4 種函數(shù):
- 臨時(shí)性系統(tǒng)函數(shù)
- 系統(tǒng)函數(shù)
- 臨時(shí)性 Catalog 函數(shù)
- Catalog 函數(shù)
系統(tǒng)函數(shù)始終優(yōu)先于 Catalog 函數(shù)解析,臨時(shí)函數(shù)始終優(yōu)先于持久化函數(shù)解析, 函數(shù)解析優(yōu)先級如下所述。
2、函數(shù)引用
用戶在 Flink 中可以通過精確、模糊兩種引用方式引用函數(shù)。
1)、精確函數(shù)引用
精確函數(shù)引用允許用戶跨 Catalog,跨數(shù)據(jù)庫調(diào)用 Catalog 函數(shù)。
例如:select mycatalog.mydb.myfunc(x) from mytable 和 select mydb.myfunc(x) from mytable。
僅 Flink 1.10 以上版本支持。
2)、模糊函數(shù)引用
在模糊函數(shù)引用中,用戶只需在 SQL 查詢中指定函數(shù)名,例如: select myfunc(x) from mytable。
3、函數(shù)解析順序
當(dāng)函數(shù)名相同,函數(shù)類型不同時(shí),函數(shù)解析順序才有意義。
例如:當(dāng)有三個(gè)都名為 “myfunc” 的臨時(shí)性 Catalog 函數(shù),Catalog 函數(shù),和系統(tǒng)函數(shù)時(shí), 如果沒有命名沖突,三個(gè)函數(shù)將會被解析為一個(gè)函數(shù)。
1)、精確函數(shù)引用
由于系統(tǒng)函數(shù)沒有命名空間,Flink 中的精確函數(shù)引用必須 指向臨時(shí)性 Catalog 函數(shù)或 Catalog 函數(shù)。
解析順序如下:
- 臨時(shí)性 catalog 函數(shù)
- Catalog 函數(shù)
2)、模糊函數(shù)引用
解析順序如下:
- 臨時(shí)性系統(tǒng)函數(shù)
- 系統(tǒng)函數(shù)
- 臨時(shí)性 Catalog 函數(shù), 在會話的當(dāng)前 Catalog 和當(dāng)前數(shù)據(jù)庫中
- Catalog 函數(shù), 在會話的當(dāng)前 Catalog 和當(dāng)前數(shù)據(jù)庫中
二、系統(tǒng)(內(nèi)置)函數(shù)
Flink Table API & SQL 為用戶提供了一組內(nèi)置的數(shù)據(jù)轉(zhuǎn)換函數(shù)。
1、標(biāo)量函數(shù)
標(biāo)量函數(shù)將零、一個(gè)或多個(gè)值作為輸入并返回單個(gè)值作為結(jié)果。
1)、比較函數(shù)
2)、邏輯函數(shù)
3)、算術(shù)函數(shù)
4)、字符串函數(shù)
5)、時(shí)間函數(shù)
6)、條件函數(shù)
7)、類型轉(zhuǎn)換函數(shù)
8)、集合函數(shù)
9)、JSON Functions
JSON 函數(shù)使用 SQL 標(biāo)準(zhǔn)的 ISO/IEC TR 19075-6 中所述的 JSON 路徑表達(dá)式(JSON path expressions )。它們的語法受到 ECMAScript 的啟發(fā)并采用了 ECMAScript 的許多功能,但既不是它的子集也不是它的超集。
路徑表達(dá)式有兩種風(fēng)格,寬松和嚴(yán)格( lax and strict.)。省略時(shí),它默認(rèn)為嚴(yán)格模式。
嚴(yán)格模式旨在從架構(gòu)角度檢查數(shù)據(jù),每當(dāng)數(shù)據(jù)不符合路徑表達(dá)式時(shí),就會引發(fā)錯(cuò)誤。但是,像 JSON_VALUE 這樣的函數(shù)允許在遇到錯(cuò)誤時(shí)定義回退行為。
寬松模式更寬容,并將錯(cuò)誤轉(zhuǎn)換為空序列。
特殊字符 $ 表示 JSON 路徑中的根節(jié)點(diǎn)。路徑可以訪問屬性 ( . a )、數(shù)組元素( .a)、數(shù)組元素 ( .a)、數(shù)組元素(.a[0].b) 或分支數(shù)組中的所有元素 ($.a[*].b)。
已知限制:
截至Flink 1.17版本并非正確支持寬松模式的所有功能。這是一個(gè)上游錯(cuò)誤 (CALCITE-4717)。不保證非標(biāo)準(zhǔn)行為。
1、IS JSON
確定給定字符串是否為有效的 JSON。
指定可選的類型參數(shù)會限制允許哪種類型的 JSON 對象。如果字符串是有效的 JSON,但不是該類型,則返回 false。默認(rèn)值為 VALUE。
- SQL語法
IS JSON [ { VALUE | SCALAR | ARRAY | OBJECT } ]
- table api語法
STRING.isJson([JsonType type])
- 示例
-- TRUE
Flink SQL> select '1' IS JSON;
+----+--------+
| op | EXPR$0 |
+----+--------+
| +I | TRUE |
+----+--------+Flink SQL> select '[]' IS JSON;
+----+--------+
| op | EXPR$0 |
+----+--------+
| +I | TRUE |
+----+--------+
-- The following statements return TRUE.
SELECT '1' IS JSON;
SELECT '[]' IS JSON;
SELECT '{}' IS JSON;
SELECT '"abc"' IS JSON;
SELECT '1' IS JSON SCALAR;
SELECT '{}' IS JSON OBJECT;-- The following statements return FALSE.
SELECT 'abc' IS JSON;
SELECT '1' IS JSON ARRAY;
SELECT '1' IS JSON OBJECT;
SELECT '{}' IS JSON SCALAR;
SELECT '{}' IS JSON ARRAY;# 以下示例一樣,不再贅述'1' IS JSON
'[]' IS JSON
'{}' IS JSON-- TRUE
'"abc"' IS JSON
-- FALSE
'abc' IS JSON
NULL IS JSON-- TRUE
'1' IS JSON SCALAR
-- FALSE
'1' IS JSON ARRAY
-- FALSE
'1' IS JSON OBJECT-- FALSE
'{}' IS JSON SCALAR
-- FALSE
'{}' IS JSON ARRAY
-- TRUE
'{}' IS JSON OBJECT
2、JSON_EXISTS
確定 JSON 字符串是否滿足給定的路徑搜索條件。
如果省略錯(cuò)誤行為,則假定 FALSE ON ERROR 為默認(rèn)值。
- SQL語法
JSON_EXISTS(jsonValue, path [ { TRUE | FALSE | UNKNOWN | ERROR } ON ERROR ])
- table api語法
STRING.jsonExists(STRING path [, JsonExistsOnError onError])
- 示例
Flink SQL> SELECT JSON_EXISTS('{"a": true}', 'strict $.b' FALSE ON ERROR);
+----+--------+
| op | EXPR$0 |
+----+--------+
| +I | FALSE |
+----+--------+-- The following statements return TRUE.
SELECT JSON_EXISTS('{"a": true}', '$.a');
SELECT JSON_EXISTS('{"a": [{ "b": 1 }]}', '$.a[0].b');
SELECT JSON_EXISTS('{"a": true}', 'strict $.b' TRUE ON ERROR);
-- The following statements return FALSE.
SELECT JSON_EXISTS('{"a": true}', '$.b');
SELECT JSON_EXISTS('{"a": true}', 'strict $.b' FALSE ON ERROR);-- TRUE
SELECT JSON_EXISTS('{"a": true}', '$.a');
-- FALSE
SELECT JSON_EXISTS('{"a": true}', '$.b');
-- TRUE
SELECT JSON_EXISTS('{"a": [{ "b": 1 }]}','$.a[0].b');-- TRUE
SELECT JSON_EXISTS('{"a": true}','strict $.b' TRUE ON ERROR);
-- FALSE
SELECT JSON_EXISTS('{"a": true}','strict $.b' FALSE ON ERROR);
3、JSON_STRING
將值序列化為 JSON。
此函數(shù)返回包含序列化值的 JSON 字符串。如果值為 NULL,則該函數(shù)返回 NULL。
- SQL語法
JSON_STRING(value)
- table api語法
jsonString(value)
- 示例
Flink SQL> SELECT JSON_STRING(1);
+----+--------------------------------+
| op | EXPR$0 |
+----+--------------------------------+
| +I | 1 |
+----+--------------------------------+-- returns NULL
SELECT JSON_STRING(CAST(NULL AS INT));-- returns '1'
SELECT JSON_STRING(1);-- returns 'true'
SELECT JSON_STRING(TRUE);-- returns '"Hello, World!"'
JSON_STRING('Hello, World!');-- returns '[1,2]'
JSON_STRING(ARRAY[1, 2])-- NULL
JSON_STRING(CAST(NULL AS INT))-- '1'
JSON_STRING(1)
-- 'true'
JSON_STRING(TRUE)
-- '"Hello, World!"'
JSON_STRING('Hello, World!')
-- '[1,2]'
JSON_STRING(ARRAY[1, 2])
4、JSON_VALUE
從 JSON 字符串中提取標(biāo)量。
此方法在 JSON 字符串中搜索給定的路徑表達(dá)式,如果該路徑的值為標(biāo)量,則返回該值。不能返回非標(biāo)量值。
默認(rèn)情況下,該值以 STRING 形式返回。使用 returningType 可以選擇不同的類型,并支持以下類型:
- VARCHAR / STRING
- BOOLEAN
- INTEGER
- DOUBLE
對于空路徑表達(dá)式或錯(cuò)誤,可以將行為定義為返回 null、引發(fā)錯(cuò)誤或返回定義的默認(rèn)值。
省略時(shí),默認(rèn)值分別為 NULL ON EMPTY 或 NULL ON ERROR。
默認(rèn)值可以是文本或表達(dá)式。如果默認(rèn)值本身引發(fā)錯(cuò)誤,則它將下降到 ON EMPTY 的錯(cuò)誤行為,并引發(fā) ON ERROR 的錯(cuò)誤。
對于包含空格等特殊字符的路徑,可以使用 [‘property’] 或 [“property”] 選擇父對象中的指定屬性。
請務(wù)必在屬性名稱兩邊加上單引號或雙引號。
在 SQL 中使用 JSON_VALUE 時(shí),路徑是一個(gè)字符參數(shù),該參數(shù)已經(jīng)是單引號,因此您必須對屬性名稱周圍的單引號進(jìn)行轉(zhuǎn)義,
例如 JSON_VALUE(‘{“a b”: “true”}’, ‘$.[’‘a(chǎn) b’‘]’)。
- SQL語法
JSON_VALUE(jsonValue, path [RETURNING <dataType>] [ { NULL | ERROR | DEFAULT <defaultExpr> } ON EMPTY ] [ { NULL | ERROR | DEFAULT <defaultExpr> } ON ERROR ])
- table api語法
STRING.jsonValue(STRING path [, returnType, onEmpty, defaultOnEmpty, onError, defaultOnError])
- 示例
Flink SQL> SELECT JSON_VALUE('{"a": true}', '$.a');
+----+--------------------------------+
| op | EXPR$0 |
+----+--------------------------------+
| +I | true |
+----+--------------------------------+
Flink SQL> SELECT JSON_VALUE('{"contains blank": "right"}', 'strict $.[''contains blank'']' NULL ON EMPTY DEFAULT 'wrong' ON ERROR);
+----+--------------------------------+
| op | EXPR$0 |
+----+--------------------------------+
| +I | right |
+----+--------------------------------+-- returns "true"
SELECT JSON_VALUE('{"a": true}', '$.a');-- returns TRUE
SELECT JSON_VALUE('{"a": true}', '$.a' RETURNING BOOLEAN);-- returns "false"
SELECT JSON_VALUE('{"a": true}', 'lax $.b' DEFAULT FALSE ON EMPTY);-- returns "false"
SELECT JSON_VALUE('{"a": true}', 'strict $.b' DEFAULT FALSE ON ERROR);-- returns 0.998D
SELECT JSON_VALUE('{"a.b": [0.998,0.996]}','$.["a.b"][0]' RETURNING DOUBLE);-- returns "right"
SELECT JSON_VALUE('{"contains blank": "right"}', 'strict $.[''contains blank'']' NULL ON EMPTY DEFAULT 'wrong' ON ERROR);
5、JSON_QUERY
目前不支持 RETURNING 子句。
wrappingBehavior 確定是否應(yīng)將提取的值包裝到數(shù)組中,以及是無條件地包裝,還是僅在值本身還不是數(shù)組時(shí)才這樣做。
onEmpty 和 onError 分別確定路徑表達(dá)式為空或引發(fā)錯(cuò)誤時(shí)的行為。
默認(rèn)情況下,在這兩種情況下都返回 null。其他選擇是使用空數(shù)組、空對象或引發(fā)錯(cuò)誤。
- SQL語法
JSON_QUERY(jsonValue, path [ { WITHOUT | WITH CONDITIONAL | WITH UNCONDITIONAL } [ ARRAY ] WRAPPER ] [ { NULL | EMPTY ARRAY | EMPTY OBJECT | ERROR } ON EMPTY ] [ { NULL | EMPTY ARRAY | EMPTY OBJECT | ERROR } ON ERROR ])
- table api語法
STRING.jsonQuery(path [, JsonQueryWrapper [, JsonQueryOnEmptyOrError, JsonQueryOnEmptyOrError ] ])
- 示例
Flink SQL> SELECT JSON_QUERY('{ "a": { "b": 1 } }', '$.a');
+----+--------------------------------+
| op | EXPR$0 |
+----+--------------------------------+
| +I | {"b":1} |
+----+--------------------------------+
Flink SQL> SELECT JSON_QUERY('{}', 'lax $.invalid' EMPTY OBJECT ON EMPTY);
+----+--------------------------------+
| op | EXPR$0 |
+----+--------------------------------+
| +I | {} |
+----+--------------------------------+
-- returns '{ "b": 1 }'
SELECT JSON_QUERY('{ "a": { "b": 1 } }', '$.a');-- returns '[1, 2]'
SELECT JSON_QUERY('[1, 2]', '$');-- returns NULL
SELECT JSON_QUERY(CAST(NULL AS STRING), '$');-- returns '["c1","c2"]'
SELECT JSON_QUERY('{"a":[{"c":"c1"},{"c":"c2"}]}', 'lax $.a[*].c');-- Wrap the result into an array.
-- returns '[{}]'
SELECT JSON_QUERY('{}', '$' WITH CONDITIONAL ARRAY WRAPPER);-- returns '[1, 2]'
SELECT JSON_QUERY('[1, 2]', '$' WITH CONDITIONAL ARRAY WRAPPER);-- returns '[[1, 2]]'
SELECT JSON_QUERY('[1, 2]', '$' WITH UNCONDITIONAL ARRAY WRAPPER);-- Scalars must be wrapped to be returned.
-- returns NULL
SELECT JSON_QUERY(1, '$');-- returns '[1]'
SELECT JSON_QUERY(1, '$' WITH CONDITIONAL ARRAY WRAPPER);-- Behavior if the path expression is empty.
-- returns '{}'
SELECT JSON_QUERY('{}', 'lax $.invalid' EMPTY OBJECT ON EMPTY);-- Behavior if the path expression has an error.
-- returns '[]'
SELECT JSON_QUERY('{}', 'strict $.invalid' EMPTY ARRAY ON ERROR);-- '{ "b": 1 }'
JSON_QUERY('{ "a": { "b": 1 } }', '$.a')
-- '[1, 2]'
JSON_QUERY('[1, 2]', '$')
-- NULL
JSON_QUERY(CAST(NULL AS STRING), '$')
-- '["c1","c2"]'
JSON_QUERY('{"a":[{"c":"c1"},{"c":"c2"}]}','lax $.a[*].c')-- Wrap result into an array
-- '[{}]'
JSON_QUERY('{}', '$' WITH CONDITIONAL ARRAY WRAPPER)
-- '[1, 2]'
JSON_QUERY('[1, 2]', '$' WITH CONDITIONAL ARRAY WRAPPER)
-- '[[1, 2]]'
JSON_QUERY('[1, 2]', '$' WITH UNCONDITIONAL ARRAY WRAPPER)-- Scalars must be wrapped to be returned
-- NULL
JSON_QUERY(1, '$')
-- '[1]'
JSON_QUERY(1, '$' WITH CONDITIONAL ARRAY WRAPPER)-- Behavior if path expression is empty / there is an error
-- '{}'
JSON_QUERY('{}', 'lax $.invalid' EMPTY OBJECT ON EMPTY)
-- '[]'
JSON_QUERY('{}', 'strict $.invalid' EMPTY ARRAY ON ERROR)
6、JSON_OBJECT
從鍵值對列表生成 JSON 對象字符串。
請注意,鍵必須是非 NULL 字符串文本,而值可以是任意表達(dá)式。
此函數(shù)返回一個(gè) JSON 字符串。ON NULL 行為定義如何處理 NULL 值。如果省略,則默認(rèn)假定 NULL ON NULL。
從另一個(gè) JSON 構(gòu)造函數(shù)調(diào)用(JSON_OBJECT、JSON_ARRAY)創(chuàng)建的值是直接插入的,而不是作為字符串插入的。這允許構(gòu)建嵌套的 JSON 結(jié)構(gòu)。
- SQL語法
JSON_OBJECT([[KEY] key VALUE value]* [ { NULL | ABSENT } ON NULL ])
- table api語法
jsonObject(JsonOnNull, keyValues...)
- 示例
Flink SQL> SELECT JSON_OBJECT(
> KEY 'K1'
> VALUE JSON_OBJECT(
> KEY 'K2'
> VALUE 'V'
> )
> );
+----+--------------------------------+
| op | EXPR$0 |
+----+--------------------------------+
| +I | {"K1":{"K2":"V"}} |
+----+--------------------------------+Flink SQL> SELECT JSON_OBJECT(KEY 'K1' VALUE CAST(NULL AS STRING) ABSENT ON NULL);
+----+--------------------------------+
| op | EXPR$0 |
+----+--------------------------------+
| +I | {} |
+----+--------------------------------+-- returns '{}'
SELECT JSON_OBJECT();-- returns '{"K1":"V1","K2":"V2"}'
SELECT JSON_OBJECT('K1' VALUE 'V1', 'K2' VALUE 'V2');-- Use an expression as a value.
SELECT JSON_OBJECT('orderNo' VALUE orders.orderId);-- ON NULL
-- '{"K1":null}'
SELECT JSON_OBJECT(KEY 'K1' VALUE CAST(NULL AS STRING) NULL ON NULL);-- ON NULL
-- '{}'
SELECT JSON_OBJECT(KEY 'K1' VALUE CAST(NULL AS STRING) ABSENT ON NULL);-- returns '{"K1":{"K2":"V"}}'
SELECT JSON_OBJECT(KEY 'K1'VALUE JSON_OBJECT(KEY 'K2'VALUE 'V')
);-- '{}'
JSON_OBJECT()-- '{"K1":"V1","K2":"V2"}'
JSON_OBJECT('K1' VALUE 'V1', 'K2' VALUE 'V2')-- Expressions as values
JSON_OBJECT('orderNo' VALUE orders.orderId)-- ON NULL
JSON_OBJECT(KEY 'K1' VALUE CAST(NULL AS STRING) NULL ON NULL) -- '{"K1":null}'
JSON_OBJECT(KEY 'K1' VALUE CAST(NULL AS STRING) ABSENT ON NULL) -- '{}'-- '{"K1":{"K2":"V"}}'
JSON_OBJECT(KEY 'K1'VALUE JSON_OBJECT(KEY 'K2'VALUE 'V')
)
7、JSON_ARRAY
從值列表生成 JSON 數(shù)組字符串。
此函數(shù)返回一個(gè) JSON 字符串。這些值可以是任意表達(dá)式。ON NULL 行為定義如何處理 NULL 值。如果省略,則默認(rèn)假定 ABSENT ON NULL。
從另一個(gè) JSON 構(gòu)造函數(shù)調(diào)用(JSON_OBJECT、JSON_ARRAY)創(chuàng)建的元素是直接插入的,而不是作為字符串插入的。這允許構(gòu)建嵌套的 JSON 結(jié)構(gòu)。
- SQL語法
JSON_ARRAY([value]* [ { NULL | ABSENT } ON NULL ])
- table api語法
jsonArray(JsonOnNull, values...)
- 示例
Flink SQL>
> SELECT JSON_ARRAY(1, '2');
+----+--------------------------------+
| op | EXPR$0 |
+----+--------------------------------+
| +I | [1,"2"] |
+----+--------------------------------+
Received a total of 1 rowFlink SQL> SELECT JSON_ARRAY(CAST(NULL AS STRING) ABSENT ON NULL);
+----+--------------------------------+
| op | EXPR$0 |
+----+--------------------------------+
| +I | [] |
+----+--------------------------------+-- returns '[]'
SELECT JSON_ARRAY();-- returns '[1,"2"]'
SELECT JSON_ARRAY(1, '2');-- Use an expression as a value.
SELECT JSON_ARRAY(orders.orderId);-- ON NULL
-- returns '[null]'
SELECT JSON_ARRAY(CAST(NULL AS STRING) NULL ON NULL);-- ON NULL
-- returns '[]'
SELECT JSON_ARRAY(CAST(NULL AS STRING) ABSENT ON NULL);-- returns '[[1]]'
SELECT JSON_ARRAY(JSON_ARRAY(1));-- '[]'
JSON_ARRAY()
-- '[1,"2"]'
JSON_ARRAY(1, '2')-- Expressions as values
JSON_ARRAY(orders.orderId)-- ON NULL
JSON_ARRAY(CAST(NULL AS STRING) NULL ON NULL) -- '[null]'
JSON_ARRAY(CAST(NULL AS STRING) ABSENT ON NULL) -- '[]'-- '[[1]]'
JSON_ARRAY(JSON_ARRAY(1))
8、JSON_ARRAYAGG
將明細(xì)聚合到 JSON 數(shù)組字符串中。
JSON_ARRAYAGG 函數(shù)通過將指定的項(xiàng)聚合到數(shù)組中來創(chuàng)建 JSON 對象字符串。
item 表達(dá)式可以是任意的,包括其他 JSON 函數(shù)。
如果值為 NULL,則 ON NULL 行為定義要執(zhí)行的操作。如果省略,則 ABSENT ON NULL 為默認(rèn)值。
OVER 窗口、無限會話窗口或 HOP 窗口不支持JSON_ARRAYAGG函數(shù)。
- SQL語法
JSON_ARRAYAGG(items [ { NULL | ABSENT } ON NULL ])
- table api語法
在這里插入代碼片
- 示例
Flink SQL> CREATE TABLE source_table (
> userId INT,
> age INT,
> balance DOUBLE,
> userName STRING,
> t_insert_time AS localtimestamp,
> WATERMARK FOR t_insert_time AS t_insert_time
> ) WITH (
> 'connector' = 'datagen',
> 'rows-per-second'='5',
> 'fields.userId.kind'='sequence',
> 'fields.userId.start'='1',
> 'fields.userId.end'='10',
>
> 'fields.balance.kind'='random',
> 'fields.balance.min'='1',
> 'fields.balance.max'='100',
>
> 'fields.age.min'='1',
> 'fields.age.max'='1000',
>
> 'fields.userName.length'='10'
> );
[INFO] Execute statement succeed.Flink SQL> select * from source_table;
+----+-------------+-------------+--------------------------------+--------------------------------+-------------------------+
| op | userId | age | balance | userName | t_insert_time |
+----+-------------+-------------+--------------------------------+--------------------------------+-------------------------+
| +I | 1 | 555 | 90.45012880441223 | 7e2b6c7beb | 2023-11-06 17:29:05.273 |
| +I | 2 | 209 | 32.07201650494765 | f652baac94 | 2023-11-06 17:29:05.274 |
| +I | 3 | 278 | 24.299962537076734 | 11b4353416 | 2023-11-06 17:29:05.274 |
| +I | 4 | 433 | 58.634356546049574 | 21d5d09603 | 2023-11-06 17:29:05.274 |
| +I | 5 | 55 | 16.20617629075601 | d626f31213 | 2023-11-06 17:29:05.274 |
| +I | 6 | 442 | 98.87803427244727 | 0305c21dc5 | 2023-11-06 17:29:06.267 |
| +I | 7 | 19 | 96.11095443982174 | ea873b2df2 | 2023-11-06 17:29:06.268 |
| +I | 8 | 806 | 36.5775262369553 | f8df556b22 | 2023-11-06 17:29:06.268 |
| +I | 9 | 919 | 69.47517602162831 | 85074390f3 | 2023-11-06 17:29:06.268 |
| +I | 10 | 46 | 47.519467818569815 | 662990446f | 2023-11-06 17:29:06.268 |
+----+-------------+-------------+--------------------------------+--------------------------------+-------------------------+
Received a total of 10 rowsFlink SQL> SELECT
> JSON_ARRAYAGG(userName)
> FROM source_table;
+----+--------------------------------+
| op | EXPR$0 |
+----+--------------------------------+
| +I | ["ee2e4edb32"] |
| -U | ["ee2e4edb32"] |
| +U | ["ee2e4edb32","66e13f3f77"] |
| -U | ["ee2e4edb32","66e13f3f77"] |
| +U | ["ee2e4edb32","66e13f3f77",... |
| -U | ["ee2e4edb32","66e13f3f77",... |
| +U | ["ee2e4edb32","66e13f3f77",... |
| -U | ["ee2e4edb32","66e13f3f77",... |
| +U | ["ee2e4edb32","66e13f3f77",... |
| -U | ["ee2e4edb32","66e13f3f77",... |
| +U | ["ee2e4edb32","66e13f3f77",... |
| -U | ["ee2e4edb32","66e13f3f77",... |
| +U | ["ee2e4edb32","66e13f3f77",... |
| -U | ["ee2e4edb32","66e13f3f77",... |
| +U | ["ee2e4edb32","66e13f3f77",... |
| -U | ["ee2e4edb32","66e13f3f77",... |
| +U | ["ee2e4edb32","66e13f3f77",... |
| -U | ["ee2e4edb32","66e13f3f77",... |
| +U | ["ee2e4edb32","66e13f3f77",... |
+----+--------------------------------+
Received a total of 19 rowsFlink SQL> SELECT
> JSON_ARRAYAGG(userId)
> FROM source_table;
+----+--------------------------------+
| op | EXPR$0 |
+----+--------------------------------+
| +I | [1] |
| -U | [1] |
| +U | [1,2] |
| -U | [1,2] |
| +U | [1,2,3] |
| -U | [1,2,3] |
| +U | [1,2,3,4] |
| -U | [1,2,3,4] |
| +U | [1,2,3,4,5] |
| -U | [1,2,3,4,5] |
| +U | [1,2,3,4,5,6] |
| -U | [1,2,3,4,5,6] |
| +U | [1,2,3,4,5,6,7] |
| -U | [1,2,3,4,5,6,7] |
| +U | [1,2,3,4,5,6,7,8] |
| -U | [1,2,3,4,5,6,7,8] |
| +U | [1,2,3,4,5,6,7,8,9] |
| -U | [1,2,3,4,5,6,7,8,9] |
| +U | [1,2,3,4,5,6,7,8,9,10] |
+----+--------------------------------+
Received a total of 19 rows
10、JSON_OBJECTAGG
將key-value表達(dá)式聚合到 JSON 字符串中。
JSON_OBJECTAGG 函數(shù)通過將key-value表達(dá)式聚合到單個(gè) JSON 對象中來創(chuàng)建 JSON 對象字符串。
key表達(dá)式必須返回不可為 null 的字符串。value表達(dá)式可以是任意的,包括其他 JSON 函數(shù)。
密鑰必須是唯一的。如果一個(gè)key多次出現(xiàn),則會引發(fā)錯(cuò)誤。
如果value為 NULL,則 ON NULL 行為定義要執(zhí)行的操作。如果省略,則 NULL ON NULL 為默認(rèn)值。
OVER 窗口中不支持 JSON_OBJECTAGG 函數(shù)。
- SQL語法
JSON_OBJECTAGG([KEY] key VALUE value [ { NULL | ABSENT } ON NULL ])
- table api語法
在這里插入代碼片
- 示例
Flink SQL> select
> JSON_OBJECTAGG(userName VALUE 'f652baac94' )
> FROM source_table;
+----+--------------------------------+
| op | EXPR$0 |
+----+--------------------------------+
| +I | {"0c3ceeca6f":"f652baac94"} |
| -U | {"0c3ceeca6f":"f652baac94"} |
| +U | {"0c3ceeca6f":"f652baac94",... |
| -U | {"0c3ceeca6f":"f652baac94",... |
| +U | {"0c3ceeca6f":"f652baac94",... |
| -U | {"0c3ceeca6f":"f652baac94",... |
| +U | {"0c3ceeca6f":"f652baac94",... |
| -U | {"0c3ceeca6f":"f652baac94",... |
| +U | {"0c3ceeca6f":"f652baac94",... |
| -U | {"0c3ceeca6f":"f652baac94",... |
| +U | {"0c3ceeca6f":"f652baac94",... |
| -U | {"0c3ceeca6f":"f652baac94",... |
| +U | {"0c3ceeca6f":"f652baac94",... |
| -U | {"0c3ceeca6f":"f652baac94",... |
| +U | {"0c3ceeca6f":"f652baac94",... |
| -U | {"0c3ceeca6f":"f652baac94",... |
| +U | {"0c3ceeca6f":"f652baac94",... |
| -U | {"0c3ceeca6f":"f652baac94",... |
| +U | {"0c3ceeca6f":"f652baac94",... |
+----+--------------------------------+
10)、值構(gòu)建函數(shù)
11)、值獲取函數(shù)
12)、分組函數(shù)
13)、哈希函數(shù)
2、聚合函數(shù)
聚合函數(shù)將所有的行作為輸入,并返回單個(gè)聚合值作為結(jié)果。
3、時(shí)間間隔單位和時(shí)間點(diǎn)單位標(biāo)識符
下表列出了時(shí)間間隔單位和時(shí)間點(diǎn)單位標(biāo)識符。
對于 Table API,請使用 _ 代替空格(例如 DAY_TO_HOUR)
4、列函數(shù)
列函數(shù)用于選擇或丟棄表的列。
列函數(shù)僅在 Table API 中使用。
詳細(xì)語法如下:
//列函數(shù):withColumns(columnExprs)withoutColumns(columnExprs)//多列表達(dá)式:columnExpr [, columnExpr]*//單列表達(dá)式:columnRef | columnIndex to columnIndex | columnName to columnName//列引用:columnName(The field name that exists in the table) | columnIndex(a positive integer starting from 1)
列函數(shù)的用法如下表所示(假設(shè)我們有一個(gè)包含 5 列的表:(a: Int, b: Long, c: String, d:String, e: String)):
列函數(shù)可用于所有需要列字段的地方,例如 select、groupBy、orderBy、UDFs 等函數(shù),例如:
table.groupBy(withColumns(range(1, 3))).select(withColumns(range("a", "b")), myUDAgg(myUDF(withColumns(range(5, 20)))));
以上,介紹了flink的函數(shù)分類、內(nèi)置函數(shù)的說明及示例,特別是針對json function函數(shù)每個(gè)均以可運(yùn)行的示例進(jìn)行說明。