建站系統(tǒng)做的網(wǎng)站百度可以搜索到嗎網(wǎng)絡(luò)營(yíng)銷策劃方案書范文
Flink 系列文章
1、Flink 部署、概念介紹、source、transformation、sink使用示例、四大基石介紹和示例等系列綜合文章鏈接
13、Flink 的table api與sql的基本概念、通用api介紹及入門示例
14、Flink 的table api與sql之?dāng)?shù)據(jù)類型: 內(nèi)置數(shù)據(jù)類型以及它們的屬性
15、Flink 的table api與sql之流式概念-詳解的介紹了動(dòng)態(tài)表、時(shí)間屬性配置(如何處理更新結(jié)果)、時(shí)態(tài)表、流上的join、流上的確定性以及查詢配置
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及FileSystem示例(1)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Elasticsearch示例(2)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Apache Kafka示例(3)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及JDBC示例(4)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Apache Hive示例(6)
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
18、Flink的SQL 支持的操作和語(yǔ)法
19、Flink 的Table API 和 SQL 中的內(nèi)置函數(shù)及示例(1)
19、Flink 的Table API 和 SQL 中的自定義函數(shù)及示例(2)
20、Flink SQL之SQL Client: 不用編寫代碼就可以嘗試 Flink SQL,可以直接提交 SQL 任務(wù)到集群上
22、Flink 的table api與sql之創(chuàng)建表的DDL
24、Flink 的table api與sql之Catalogs(介紹、類型、java api和sql實(shí)現(xiàn)ddl、java api和sql操作catalog)-1
24、Flink 的table api與sql之Catalogs(java api操作數(shù)據(jù)庫(kù)、表)-2
24、Flink 的table api與sql之Catalogs(java api操作視圖)-3
24、Flink 的table api與sql之Catalogs(java api操作分區(qū)與函數(shù))-4
26、Flink 的SQL之概覽與入門示例
27、Flink 的SQL之SELECT (select、where、distinct、order by、limit、集合操作和去重)介紹及詳細(xì)示例(1)
27、Flink 的SQL之SELECT (SQL Hints 和 Joins)介紹及詳細(xì)示例(2)
27、Flink 的SQL之SELECT (窗口函數(shù))介紹及詳細(xì)示例(3)
27、Flink 的SQL之SELECT (窗口聚合)介紹及詳細(xì)示例(4)
27、Flink 的SQL之SELECT (Group Aggregation分組聚合、Over Aggregation Over聚合 和 Window Join 窗口關(guān)聯(lián))介紹及詳細(xì)示例(5)
27、Flink 的SQL之SELECT (Top-N、Window Top-N 窗口 Top-N 和 Window Deduplication 窗口去重)介紹及詳細(xì)示例(6)
27、Flink 的SQL之SELECT (Pattern Recognition 模式檢測(cè))介紹及詳細(xì)示例(7)
28、Flink 的SQL之DROP 、ALTER 、INSERT 、ANALYZE 語(yǔ)句
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(1)
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(2)
30、Flink SQL之SQL 客戶端(通過kafka和filesystem的例子介紹了配置文件使用-表、視圖等)
32、Flink table api和SQL 之用戶自定義 Sources & Sinks實(shí)現(xiàn)及詳細(xì)示例
41、Flink之Hive 方言介紹及詳細(xì)示例
42、Flink 的table api與sql之Hive Catalog
43、Flink之Hive 讀寫及詳細(xì)驗(yàn)證示例
44、Flink之module模塊介紹及使用示例和Flink SQL使用hive內(nèi)置函數(shù)及自定義函數(shù)詳細(xì)示例–網(wǎng)上有些說法好像是錯(cuò)誤的
文章目錄
- Flink 系列文章
- 三、自定義函數(shù)
- 1、概述
- 2、開發(fā)指南
- 1)、函數(shù)類
- 2)、求值方法
- 3)、類型推導(dǎo)
- 1、自動(dòng)類型推導(dǎo)
- 2、定制類型推導(dǎo)
- 4)、確定性
- 1、內(nèi)置函數(shù)的確定性
- 5)、運(yùn)行時(shí)集成
- 3、標(biāo)量函數(shù)-自定義函數(shù)說明及示例
- 4、表值函數(shù)-自定義函數(shù)說明及示例
本文介紹了flink的自定義函數(shù)概述、開發(fā)指南以及標(biāo)量函數(shù)、表值函數(shù)的自定義函數(shù)實(shí)現(xiàn)及說明,提供的示例均可運(yùn)行并提供運(yùn)行結(jié)果供參考。
本文依賴flink集群能正常使用。
本文分為4個(gè)部分,即自定義函數(shù)的概述、開發(fā)指南、標(biāo)量自定義函數(shù)的說明及示例、表值自定義函數(shù)的說明及示例。
本文的示例均在Flink 1.17版本中運(yùn)行。
三、自定義函數(shù)
自定義函數(shù)(UDF)是一種擴(kuò)展開發(fā)機(jī)制,可以用來在查詢語(yǔ)句里調(diào)用難以用其他方式表達(dá)的頻繁使用或自定義的邏輯。
自定義函數(shù)可以用 JVM 語(yǔ)言(例如 Java 或 Scala)或 Python 實(shí)現(xiàn),實(shí)現(xiàn)者可以在 UDF 中使用任意第三方庫(kù),本文聚焦于使用 JVM 語(yǔ)言開發(fā)自定義函數(shù)。
1、概述
當(dāng)前 Flink 有如下幾種函數(shù):
- 標(biāo)量函數(shù),將標(biāo)量值轉(zhuǎn)換成一個(gè)新標(biāo)量值;
- 表值函數(shù),將標(biāo)量值轉(zhuǎn)換成新的行數(shù)據(jù);
- 聚合函數(shù),將多行數(shù)據(jù)里的標(biāo)量值轉(zhuǎn)換成一個(gè)新標(biāo)量值;
- 表值聚合函數(shù),將多行數(shù)據(jù)里的標(biāo)量值轉(zhuǎn)換成新的行數(shù)據(jù);
- 異步表值函數(shù),是異步查詢外部數(shù)據(jù)系統(tǒng)的特殊函數(shù)。
標(biāo)量和表值函數(shù)已經(jīng)使用了新的基于數(shù)據(jù)類型的類型系統(tǒng),聚合函數(shù)仍然使用基于 TypeInformation 的舊類型系統(tǒng)。
2、開發(fā)指南
在聚合函數(shù)使用新的類型系統(tǒng)前,本節(jié)僅適用于標(biāo)量和表值函數(shù)。
所有的自定義函數(shù)都遵循一些基本的實(shí)現(xiàn)原則。
1)、函數(shù)類
實(shí)現(xiàn)類必須繼承自合適的基類之一(例如 org.apache.flink.table.functions.ScalarFunction )。
該類必須聲明為 public ,而不是 abstract ,并且可以被全局訪問。不允許使用非靜態(tài)內(nèi)部類或匿名類。
為了將自定義函數(shù)存儲(chǔ)在持久化的 catalog 中,該類必須具有默認(rèn)構(gòu)造器,且在運(yùn)行時(shí)可實(shí)例化。
Table API 中的匿名函數(shù)只有在函數(shù)不是有狀態(tài)的(stateful)(即僅包含瞬態(tài)和靜態(tài)(transient and static)字段)時(shí)才能持久化。
2)、求值方法
基類提供了一組可以被重寫的方法,例如 open()、 close() 或 isDeterministic() 。
但是,除了上述方法之外,作用于每條傳入記錄的主要邏輯還必須通過專門的 求值方法 來實(shí)現(xiàn)。
根據(jù)函數(shù)的種類,后臺(tái)生成的運(yùn)算符會(huì)在運(yùn)行時(shí)調(diào)用諸如 eval()、accumulate() 或 retract() 之類的求值方法。
這些方法必須聲明為 public ,并帶有一組定義明確的參數(shù)。
常規(guī)的 JVM 方法調(diào)用語(yǔ)義是適用的。因此可以:
- 實(shí)現(xiàn)重載的方法,例如 eval(Integer) 和 eval(LocalDateTime);
- 使用變長(zhǎng)參數(shù),例如 eval(Integer…);
- 使用對(duì)象繼承,例如 eval(Object) 可接受 LocalDateTime 和 Integer 作為參數(shù);
- 也可組合使用,例如 eval(Object…) 可接受所有類型的參數(shù)。
示例片段
import org.apache.flink.table.functions.ScalarFunction;// 有多個(gè)重載求值方法的函數(shù)
public static class SumFunction extends ScalarFunction {//兩Integer數(shù)求和public Integer eval(Integer a, Integer b) {return a + b;}//兩String數(shù)轉(zhuǎn)換后求和public Integer eval(String a, String b) {return Integer.valueOf(a) + Integer.valueOf(b);}//多Double數(shù)據(jù)求和public Integer eval(Double... d) {double result = 0;for (double value : d)result += value;return (int) result;}
}
3)、類型推導(dǎo)
Table(類似于 SQL 標(biāo)準(zhǔn))是一種強(qiáng)類型的 API。因此,函數(shù)的參數(shù)和返回類型都必須映射到數(shù)據(jù)類型。
從邏輯角度看,Planner 需要知道數(shù)據(jù)類型、精度和小數(shù)位數(shù);從 JVM 角度來看,Planner 在調(diào)用自定義函數(shù)時(shí)需要知道如何將內(nèi)部數(shù)據(jù)結(jié)構(gòu)表示為 JVM 對(duì)象。
術(shù)語(yǔ) 類型推導(dǎo) 概括了意在驗(yàn)證輸入值、派生出參數(shù)/返回值數(shù)據(jù)類型的邏輯。
Flink 自定義函數(shù)實(shí)現(xiàn)了自動(dòng)的類型推導(dǎo)提取,通過反射從函數(shù)的類及其求值方法中派生數(shù)據(jù)類型。如果這種隱式的反射提取方法不成功,則可以通過使用 @DataTypeHint 和 @FunctionHint 注解相關(guān)參數(shù)、類或方法來支持提取過程,下面展示了有關(guān)如何注解函數(shù)的例子。
如果需要更高級(jí)的類型推導(dǎo)邏輯,實(shí)現(xiàn)者可以在每個(gè)自定義函數(shù)中顯式重寫 getTypeInference() 方法。但是,建議使用注解方式,因?yàn)樗墒棺远x類型推導(dǎo)邏輯保持在受影響位置附近,而在其他位置則保持默認(rèn)狀態(tài)。
1、自動(dòng)類型推導(dǎo)
自動(dòng)類型推導(dǎo)會(huì)檢查函數(shù)的類和求值方法,派生出函數(shù)參數(shù)和結(jié)果的數(shù)據(jù)類型, @DataTypeHint 和 @FunctionHint 注解支持自動(dòng)類型推導(dǎo)。
有關(guān)可以隱式映射到數(shù)據(jù)類型的類的完整列表,請(qǐng)參閱數(shù)據(jù)類型。
- @DataTypeHint
在許多情況下,需要支持以 內(nèi)聯(lián) 方式自動(dòng)提取出函數(shù)參數(shù)、返回值的類型。
以下例子展示了如何使用 @DataTypeHint,詳情可參考該注解類的文檔。
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;// 有多個(gè)重載求值方法的函數(shù)
public static class OverloadedFunction extends ScalarFunction {// no hint requiredpublic Long eval(long a, long b) {return a + b;}// 定義 decimal 的精度和小數(shù)位public @DataTypeHint("DECIMAL(12, 3)") BigDecimal eval(double a, double b) {return BigDecimal.valueOf(a + b);}// 定義嵌套數(shù)據(jù)類型@DataTypeHint("ROW<s STRING, t TIMESTAMP_LTZ(3)>")public Row eval(int i) {return Row.of(String.valueOf(i), Instant.ofEpochSecond(i));}// 允許任意類型的符入,并輸出序列化定制后的值@DataTypeHint(value = "RAW", bridgedTo = ByteBuffer.class)public ByteBuffer eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {return MyUtils.serializeToByteBuffer(o);}
}
- @FunctionHint
有時(shí)我們希望一種求值方法可以同時(shí)處理多種數(shù)據(jù)類型,有時(shí)又要求對(duì)重載的多個(gè)求值方法僅聲明一次通用的結(jié)果類型。
@FunctionHint 注解可以提供從入?yún)?shù)據(jù)類型到結(jié)果數(shù)據(jù)類型的映射,它可以在整個(gè)函數(shù)類或求值方法上注解輸入、累加器和結(jié)果的數(shù)據(jù)類型??梢栽陬愴敳柯暶饕粋€(gè)或多個(gè)注解,也可以為類的所有求值方法分別聲明一個(gè)或多個(gè)注解。所有的 hint 參數(shù)都是可選的,如果未定義參數(shù),則使用默認(rèn)的基于反射的類型提取。在函數(shù)類頂部定義的 hint 參數(shù)被所有求值方法繼承。
以下例子展示了如何使用 @FunctionHint,詳情可參考該注解類的文檔。
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;// 為函數(shù)類的所有求值方法指定同一個(gè)輸出類型
@FunctionHint(output = @DataTypeHint("ROW<s STRING, i INT>"))
public static class OverloadedFunction extends TableFunction<Row> {public void eval(int a, int b) {collect(Row.of("Sum", a + b));}// overloading of arguments is still possiblepublic void eval() {collect(Row.of("Empty args", -1));}
}// 解耦類型推導(dǎo)與求值方法,類型推導(dǎo)完全取決于 FunctionHint
@FunctionHint(input = {@DataTypeHint("INT"), @DataTypeHint("INT")},output = @DataTypeHint("INT")
)
@FunctionHint(input = {@DataTypeHint("BIGINT"), @DataTypeHint("BIGINT")},output = @DataTypeHint("BIGINT")
)
@FunctionHint(input = {},output = @DataTypeHint("BOOLEAN")
)
public static class OverloadedFunction extends TableFunction<Object> {// an implementer just needs to make sure that a method exists that can be called by the JVMpublic void eval(Object... o) {if (o.length == 0) {collect(false);}collect(o[0]);}
}
2、定制類型推導(dǎo)
在大多數(shù)情況下,@DataTypeHint 和 @FunctionHint 足以構(gòu)建自定義函數(shù),然而通過重寫 getTypeInference() 定制自動(dòng)類型推導(dǎo)邏輯,實(shí)現(xiàn)者可以創(chuàng)建任意像系統(tǒng)內(nèi)置函數(shù)那樣有用的函數(shù)。
以下用 Java 實(shí)現(xiàn)的例子展示了定制類型推導(dǎo)的潛力,它根據(jù)字符串參數(shù)來確定函數(shù)的結(jié)果類型。該函數(shù)帶有兩個(gè)字符串參數(shù):第一個(gè)參數(shù)表示要分析的字符串,第二個(gè)參數(shù)表示目標(biāo)類型。
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.types.Row;public static class LiteralFunction extends ScalarFunction {public Object eval(String s, String type) {switch (type) {case "INT":return Integer.valueOf(s);case "DOUBLE":return Double.valueOf(s);case "STRING":default:return s;}}// 禁用自動(dòng)的反射式類型推導(dǎo),使用如下邏輯進(jìn)行類型推導(dǎo)@Overridepublic TypeInference getTypeInference(DataTypeFactory typeFactory) {return TypeInference.newBuilder()// 指定輸入?yún)?shù)的類型,必要時(shí)參數(shù)會(huì)被隱式轉(zhuǎn)換.typedArguments(DataTypes.STRING(), DataTypes.STRING())// specify a strategy for the result data type of the function.outputTypeStrategy(callContext -> {if (!callContext.isArgumentLiteral(1) || callContext.isArgumentNull(1)) {throw callContext.newValidationError("Literal expected for second argument.");}// 基于字符串值返回?cái)?shù)據(jù)類型final String literal = callContext.getArgumentValue(1, String.class).orElse("STRING");switch (literal) {case "INT":return Optional.of(DataTypes.INT().notNull());case "DOUBLE":return Optional.of(DataTypes.DOUBLE().notNull());case "STRING":default:return Optional.of(DataTypes.STRING());}}).build();}
}
4)、確定性
每個(gè)用戶自定義函數(shù)類都可以通過重寫 isDeterministic() 方法來聲明它是否產(chǎn)生確定性的結(jié)果。如果該函數(shù)不是純粹函數(shù)式的(如random(), date(), 或now()),該方法必須返回 false。默認(rèn)情況下,isDeterministic() 返回 true。
此外,重寫 isDeterministic() 方法也可能影響運(yùn)行時(shí)行為。運(yùn)行時(shí)實(shí)現(xiàn)可能會(huì)在兩個(gè)不同的階段被調(diào)用:
-
在生成執(zhí)行計(jì)劃期間:如果一個(gè)函數(shù)是通過常量表達(dá)式調(diào)用的或者常量表達(dá)式可以從給定的語(yǔ)句中推導(dǎo)出來,那么一個(gè)函數(shù)就會(huì)被預(yù)計(jì)算以減少常量表達(dá)式,并且可能不再在集群上執(zhí)行。 除非 isDeterministic() 被重寫為 false 用來在這種情況下禁用常量表達(dá)式簡(jiǎn)化。比如說,以下對(duì) ABS 的調(diào)用在生成執(zhí)行計(jì)劃期間被執(zhí)行:SELECT ABS(-1) FROM t 和 SELECT ABS(field) FROM t WHERE field = -1,而 SELECT ABS(field) FROM t 則不執(zhí)行。
-
在運(yùn)行時(shí)(即在集群執(zhí)行):如果一個(gè)函數(shù)被調(diào)用時(shí)帶有非常量表達(dá)式或 isDeterministic() 返回 false。
1、內(nèi)置函數(shù)的確定性
系統(tǒng)(內(nèi)置)函數(shù)的確定性是不可改變的。存在兩種不具有確定性的函數(shù):動(dòng)態(tài)函數(shù)和非確定性函數(shù),根據(jù) Apache Calcite SqlOperator 的定義:
/*** Returns whether a call to this operator is guaranteed to always return* the same result given the same operands; true is assumed by default.*/public boolean isDeterministic() {return true;}/*** Returns whether it is unsafe to cache query plans referencing this* operator; false is assumed by default.*/public boolean isDynamicFunction() {return false;}
isDeterministic 表示函數(shù)的確定性,聲明返回 false 時(shí)將在運(yùn)行時(shí)對(duì)每個(gè)記錄進(jìn)行計(jì)算。
isDynamicFunction 聲明返回 true 時(shí)意味著該函數(shù)只能在查詢開始時(shí)被計(jì)算,對(duì)于批處理模式,它只在生成執(zhí)行計(jì)劃期間被執(zhí)行, 而對(duì)于流模式,它等效于一個(gè)非確定性的函數(shù),這是因?yàn)椴樵冊(cè)谶壿嬌鲜沁B續(xù)執(zhí)行的(流模式對(duì)動(dòng)態(tài)表的連續(xù)查詢抽象),所以動(dòng)態(tài)函數(shù)在每次查詢執(zhí)行時(shí)也會(huì)被重新計(jì)算(當(dāng)前實(shí)現(xiàn)下等效于每條記錄計(jì)算)。
以下內(nèi)置函數(shù)總是非確定性的(批和流模式下,都在運(yùn)行時(shí)對(duì)每條記錄進(jìn)行計(jì)算)
- UUID
- RAND
- RAND_INTEGER
- CURRENT_DATABASE
- UNIX_TIMESTAMP
- CURRENT_ROW_TIMESTAMP
以下內(nèi)置時(shí)間函數(shù)是動(dòng)態(tài)的,批處理模式下,將在生成執(zhí)行計(jì)劃期間被執(zhí)行(查詢開始),對(duì)于流模式,將在運(yùn)行時(shí)對(duì)每條記錄進(jìn)行計(jì)算
- CURRENT_DATE
- CURRENT_TIME
- CURRENT_TIMESTAMP
- NOW
- LOCALTIME
- LOCALTIMESTAMP
isDynamicFunction 僅適用于內(nèi)置函數(shù)
5)、運(yùn)行時(shí)集成
有時(shí)候自定義函數(shù)需要獲取一些全局信息,或者在真正被調(diào)用之前做一些配置(setup)/清理(clean-up)的工作。自定義函數(shù)也提供了 open() 和 close() 方法,你可以重寫這兩個(gè)方法做到類似于 DataStream API 中 RichFunction 的功能。
open() 方法在求值方法被調(diào)用之前先調(diào)用。close() 方法在求值方法調(diào)用完之后被調(diào)用。
open() 方法提供了一個(gè) FunctionContext,它包含了一些自定義函數(shù)被執(zhí)行時(shí)的上下文信息,比如 metric group、分布式文件緩存,或者是全局的作業(yè)參數(shù)等。
下面的信息可以通過調(diào)用 FunctionContext 的對(duì)應(yīng)的方法來獲得:
方法 | 描述 |
---|---|
getMetricGroup() | 執(zhí)行該函數(shù)的 subtask 的 Metric Group。 |
getCachedFile(name) | 分布式文件緩存的本地臨時(shí)文件副本。 |
getJobParameter(name, defaultValue) | 跟對(duì)應(yīng)的 key 關(guān)聯(lián)的全局參數(shù)值。 |
下面的例子展示了如何在一個(gè)標(biāo)量函數(shù)中通過 FunctionContext 來獲取一個(gè)全局的任務(wù)參數(shù):
import org.apache.flink.table.api.*;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;public static class HashCodeFunction extends ScalarFunction {private int factor = 0;@Overridepublic void open(FunctionContext context) throws Exception {// 獲取參數(shù) "hashcode_factor"// 如果不存在,則使用默認(rèn)值 "12"factor = Integer.parseInt(context.getJobParameter("hashcode_factor", "12"));}public int eval(String s) {return s.hashCode() * factor;}
}TableEnvironment env = TableEnvironment.create(...);// 設(shè)置任務(wù)參數(shù)
env.getConfig().addJobParameter("hashcode_factor", "31");// 注冊(cè)函數(shù)
env.createTemporarySystemFunction("hashCode", HashCodeFunction.class);// 調(diào)用函數(shù)
env.sqlQuery("SELECT myField, hashCode(myField) FROM MyTable");
3、標(biāo)量函數(shù)-自定義函數(shù)說明及示例
自定義標(biāo)量函數(shù)可以把 0 到多個(gè)標(biāo)量值映射成 1 個(gè)標(biāo)量值,數(shù)據(jù)類型里列出的任何數(shù)據(jù)類型都可作為求值方法的參數(shù)和返回值類型。
想要實(shí)現(xiàn)自定義標(biāo)量函數(shù),你需要擴(kuò)展 org.apache.flink.table.functions 里面的 ScalarFunction 并且實(shí)現(xiàn)一個(gè)或者多個(gè)求值方法。標(biāo)量函數(shù)的行為取決于你寫的求值方法。
求值方法必須是 public 的,而且名字必須是 eval。
下面自定義函數(shù)是將balance加上(萬元)以及求balance/age,僅僅示例如何使用,其運(yùn)行結(jié)果在每次輸出的代碼后面注釋的行。
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;import java.util.Arrays;
import java.util.List;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/*** @author alanchan**/
public class TestUDScalarFunctionDemo {@Data@NoArgsConstructor@AllArgsConstructorpublic static class User {private long id;private String name;private int age;private int balance;private Long rowtime;}final static List<User> userList = Arrays.asList(new User(1L, "alan", 18, 20,1698742358391L), new User(2L, "alan", 19, 25,1698742359396L), new User(3L, "alan", 25, 30,1698742360407L),new User(4L, "alanchan", 28,35, 1698742361409L), new User(5L, "alanchan", 29, 35,1698742362424L));public static class TestScalarFunction extends ScalarFunction {// 接受任意類型輸入,返回 String 型輸出public String eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {return o.toString() + " (萬元)";}public double eval(Integer age, Integer balance) {return balance / age *1.0;}}/*** @param args* @throws Exception*/public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv = StreamTableEnvironment.create(env);DataStream<User> users = env.fromCollection(userList);Table usersTable = tenv.fromDataStream(users, $("id"), $("name"), $("age"),$("balance"), $("rowtime"));//1、 在 Table API 里不經(jīng)注冊(cè)直接“內(nèi)聯(lián)”調(diào)用函數(shù)Table result = usersTable.select($("id"), $("name"), call(TestScalarFunction.class, $("balance")));DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);
// resultDS.print();
// 11> (true,+I[2, alan, 25 (萬元)])
// 12> (true,+I[3, alan, 30 (萬元)])
// 13> (true,+I[4, alanchan, 35 (萬元)])
// 10> (true,+I[1, alan, 20 (萬元)])
// 14> (true,+I[5, alanchan, 35 (萬元)])Table result2 = usersTable.select($("id"), $("name"), $("age"), call(TestScalarFunction.class, $("balance")), call(TestScalarFunction.class, $("age"), $("balance")));DataStream<Tuple2<Boolean, Row>> result2DS = tenv.toRetractStream(result2, Row.class);
// result2DS.print();
// 9> (true,+I[2, alan, 19, 25 (萬元), 1.0])
// 10> (true,+I[3, alan, 25, 30 (萬元), 1.0])
// 12> (true,+I[5, alanchan, 29, 35 (萬元), 1.0])
// 11> (true,+I[4, alanchan, 28, 35 (萬元), 1.0])
// 8> (true,+I[1, alan, 18, 20 (萬元), 1.0])//2、 注冊(cè)函數(shù)tenv.createTemporarySystemFunction("TestScalarFunction", TestScalarFunction.class);// 在 Table API 里調(diào)用注冊(cè)好的函數(shù)Table result3 = usersTable.select($("id"), $("name"),call("TestScalarFunction", $("balance")));DataStream<Tuple2<Boolean, Row>> result3DS = tenv.toRetractStream(result3, Row.class);
// result3DS.print();
// 2> (true,+I[4, alanchan, 35 (萬元)])
// 3> (true,+I[5, alanchan, 35 (萬元)])
// 15> (true,+I[1, alan, 20 (萬元)])
// 16> (true,+I[2, alan, 25 (萬元)])
// 1> (true,+I[3, alan, 30 (萬元)])// 在 SQL 里調(diào)用注冊(cè)好的函數(shù)tenv.createTemporaryView("user_view", users);Table result4 = tenv.sqlQuery("SELECT id,name,TestScalarFunction(balance) ,TestScalarFunction(age,balance) FROM user_view");DataStream<Tuple2<Boolean, Row>> result4DS = tenv.toRetractStream(result4, Row.class);result4DS.print();
// 14> (true,+I[1, alan, 20 (萬元), 1.0])
// 1> (true,+I[4, alanchan, 35 (萬元), 1.0])
// 2> (true,+I[5, alanchan, 35 (萬元), 1.0])
// 15> (true,+I[2, alan, 25 (萬元), 1.0])
// 16> (true,+I[3, alan, 30 (萬元), 1.0])env.execute();}}
4、表值函數(shù)-自定義函數(shù)說明及示例
跟自定義標(biāo)量函數(shù)一樣,自定義表值函數(shù)的輸入?yún)?shù)也可以是 0 到多個(gè)標(biāo)量。但是跟標(biāo)量函數(shù)只能返回一個(gè)值不同的是,它可以返回任意多行。返回的每一行可以包含 1 到多列,如果輸出行只包含 1 列,會(huì)省略結(jié)構(gòu)化信息并生成標(biāo)量值,這個(gè)標(biāo)量值在運(yùn)行階段會(huì)隱式地包裝進(jìn)行里。
要定義一個(gè)表值函數(shù),你需要擴(kuò)展 org.apache.flink.table.functions 下的 TableFunction,可以通過實(shí)現(xiàn)多個(gè)名為 eval 的方法對(duì)求值方法進(jìn)行重載。像其他函數(shù)一樣,輸入和輸出類型也可以通過反射自動(dòng)提取出來。表值函數(shù)返回的表的類型取決于 TableFunction 類的泛型參數(shù) T,不同于標(biāo)量函數(shù),表值函數(shù)的求值方法本身不包含返回類型,而是通過 collect(T) 方法來發(fā)送要輸出的行。
在 Table API 中,表值函數(shù)是通過 .joinLateral(…) 或者 .leftOuterJoinLateral(…) 來使用的。joinLateral 算子會(huì)把外表(算子左側(cè)的表)的每一行跟跟表值函數(shù)返回的所有行(位于算子右側(cè))進(jìn)行 (cross)join。leftOuterJoinLateral 算子也是把外表(算子左側(cè)的表)的每一行跟表值函數(shù)返回的所有行(位于算子右側(cè))進(jìn)行(cross)join,并且如果表值函數(shù)返回 0 行也會(huì)保留外表的這一行。
在 SQL 里面用 JOIN 或者 以 ON TRUE 為條件的 LEFT JOIN 來配合 LATERAL TABLE() 的使用。
下面示例中包含表值函數(shù)的四種應(yīng)用方式。
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;import java.util.Arrays;
import java.util.List;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/*** @author alanchan**/
public class TestUDTableFunctionDemo {@Data@NoArgsConstructor@AllArgsConstructorpublic static class User {private long id;private String name;private int age;private int balance;private Long rowtime;}final static List<User> userList = Arrays.asList(new User(1L, "alan,chen", 18, 20,1698742358391L), new User(2L, "alan,chen", 19, 25,1698742359396L), new User(3L, "alan,chen", 25, 30,1698742360407L),new User(4L, "alan,chan", 28,35, 1698742361409L), new User(5L, "alan,chan", 29, 35,1698742362424L));@FunctionHint(output = @DataTypeHint("ROW<firstName STRING, lastName String>"))public static class SplitFunction extends TableFunction<Row> {public void eval(String str) {String[] names = str.split(",");collect(Row.of(names[0],names[1]));
// for (String s : str.split(", ")) {
// // use collect(...) to emit a row
// collect(Row.of(s, s.length()));
// }}}@FunctionHint(output = @DataTypeHint("ROW<id int, name String, age int, balance int, rowtime string>"))public static class OverloadedFunction extends TableFunction<Row> {public void eval(String str) {String[] user = str.split(",");collect(Row.of(Integer.valueOf(user[0]),user[1],Integer.valueOf(user[2]),Integer.valueOf(user[3]),user[4]));}}/*** @param args* @throws Exception */public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv = StreamTableEnvironment.create(env);DataStream<User> users = env.fromCollection(userList);Table usersTable = tenv.fromDataStream(users, $("id"), $("name"), $("age"), $("balance"), $("rowtime"));// 1、 在 Table API 里不經(jīng)注冊(cè)直接“內(nèi)聯(lián)”調(diào)用函數(shù)Table result = usersTable.joinLateral(call(SplitFunction.class, $("name"))).select($("id"), $("name"),$("firstName"),$("lastName"));DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);
// resultDS.print();
// 11> (true,+I[5, alan,chan, alan, chan])
// 7> (true,+I[1, alan,chen, alan, chen])
// 9> (true,+I[3, alan,chen, alan, chen])
// 10> (true,+I[4, alan,chan, alan, chan])
// 8> (true,+I[2, alan,chen, alan, chen])DataStream<String> row = env.fromCollection(//id name age balance rowtimeArrays.asList("11,alan,18,20,1699341167461","12,alan,19,25,1699341168464","13,alan,20,30,1699341169472","14,alanchan,18,22,1699341170479","15,alanchan,19,25,1699341171482"));Table usersTable2 = tenv.fromDataStream(row, $("userString"));Table result2 = usersTable2.joinLateral(call(OverloadedFunction.class, $("userString"))).select($("userString"),$("id"),$("name"),$("age"),$("balance"),$("rowtime")) ; DataStream<Tuple2<Boolean, Row>> result2DS = tenv.toRetractStream(result2, Row.class);
// result2DS.print();
// 15> (true,+I[15,alanchan,19,25,1699341171482, 15, alanchan, 19, 25, 1699341171482])
// 13> (true,+I[13,alan,20,30,1699341169472, 13, alan, 20, 30, 1699341169472])
// 14> (true,+I[14,alanchan,18,22,1699341170479, 14, alanchan, 18, 22, 1699341170479])
// 11> (true,+I[11,alan,18,20,1699341167461, 11, alan, 18, 20, 1699341167461])
// 12> (true,+I[12,alan,19,25,1699341168464, 12, alan, 19, 25, 1699341168464])Table result3 = usersTable2.leftOuterJoinLateral(call(OverloadedFunction.class, $("userString"))).select($("userString"),$("id"),$("name"),$("age"),$("balance"),$("rowtime")) ; DataStream<Tuple2<Boolean, Row>> result3DS = tenv.toRetractStream(result3, Row.class);
// result3DS.print();
// 5> (true,+I[13,alan,20,30,1699341169472, 13, alan, 20, 30, 1699341169472])
// 6> (true,+I[14,alanchan,18,22,1699341170479, 14, alanchan, 18, 22, 1699341170479])
// 3> (true,+I[11,alan,18,20,1699341167461, 11, alan, 18, 20, 1699341167461])
// 4> (true,+I[12,alan,19,25,1699341168464, 12, alan, 19, 25, 1699341168464])
// 7> (true,+I[15,alanchan,19,25,1699341171482, 15, alanchan, 19, 25, 1699341171482])// 在 Table API 里重命名函數(shù)字段Table result4 = usersTable2.leftOuterJoinLateral(call(OverloadedFunction.class, $("userString")).as("t_id","t_name","t_age","t_balance","t_rowtime")).select($("userString"),$("t_id"),$("t_name"),$("t_age"),$("t_balance"),$("t_rowtime")) ; DataStream<Tuple2<Boolean, Row>> result4DS = tenv.toRetractStream(result4, Row.class);
// result4DS.print();
// 10> (true,+I[11,alan,18,20,1699341167461, 11, alan, 18, 20, 1699341167461])
// 13> (true,+I[14,alanchan,18,22,1699341170479, 14, alanchan, 18, 22, 1699341170479])
// 14> (true,+I[15,alanchan,19,25,1699341171482, 15, alanchan, 19, 25, 1699341171482])
// 12> (true,+I[13,alan,20,30,1699341169472, 13, alan, 20, 30, 1699341169472])
// 11> (true,+I[12,alan,19,25,1699341168464, 12, alan, 19, 25, 1699341168464])//2、 注冊(cè)函數(shù)tenv.createTemporarySystemFunction("OverloadedFunction", OverloadedFunction.class);// 在 Table API 里調(diào)用注冊(cè)好的函數(shù)Table result5 = usersTable2.leftOuterJoinLateral(call("OverloadedFunction", $("userString")).as("t_id","t_name","t_age","t_balance","t_rowtime")).select($("userString"),$("t_id"),$("t_name"),$("t_age"),$("t_balance"),$("t_rowtime")) ; DataStream<Tuple2<Boolean, Row>> result5DS = tenv.toRetractStream(result5, Row.class);
// result5DS.print();
// 11> (true,+I[11,alan,18,20,1699341167461, 11, alan, 18, 20, 1699341167461])
// 14> (true,+I[14,alanchan,18,22,1699341170479, 14, alanchan, 18, 22, 1699341170479])
// 15> (true,+I[15,alanchan,19,25,1699341171482, 15, alanchan, 19, 25, 1699341171482])
// 13> (true,+I[13,alan,20,30,1699341169472, 13, alan, 20, 30, 1699341169472])
// 12> (true,+I[12,alan,19,25,1699341168464, 12, alan, 19, 25, 1699341168464])Table result6 = usersTable2.joinLateral(call("OverloadedFunction", $("userString")).as("t_id","t_name","t_age","t_balance","t_rowtime")).select($("userString"),$("t_id"),$("t_name"),$("t_age"),$("t_balance"),$("t_rowtime")) ; DataStream<Tuple2<Boolean, Row>> result6DS = tenv.toRetractStream(result6, Row.class);
// result6DS.print();
// 8> (true,+I[14,alanchan,18,22,1699341170479, 14, alanchan, 18, 22, 1699341170479])
// 9> (true,+I[15,alanchan,19,25,1699341171482, 15, alanchan, 19, 25, 1699341171482])
// 5> (true,+I[11,alan,18,20,1699341167461, 11, alan, 18, 20, 1699341167461])
// 7> (true,+I[13,alan,20,30,1699341169472, 13, alan, 20, 30, 1699341169472])
// 6> (true,+I[12,alan,19,25,1699341168464, 12, alan, 19, 25, 1699341168464])//3、 在 SQL 里調(diào)用注冊(cè)好的函數(shù)tenv.createTemporaryView("user_view", usersTable2);Table result7 = tenv.sqlQuery("SELECT userString, id,name,age,balance,rowtime " +"FROM user_view, LATERAL TABLE(OverloadedFunction(userString))");DataStream<Tuple2<Boolean, Row>> result7DS = tenv.toRetractStream(result7, Row.class);
// result7DS.print();
// 15> (true,+I[13,alan,20,30,1699341169472, 13, alan, 20, 30, 1699341169472])
// 13> (true,+I[11,alan,18,20,1699341167461, 11, alan, 18, 20, 1699341167461])
// 1> (true,+I[15,alanchan,19,25,1699341171482, 15, alanchan, 19, 25, 1699341171482])
// 14> (true,+I[12,alan,19,25,1699341168464, 12, alan, 19, 25, 1699341168464])
// 16> (true,+I[14,alanchan,18,22,1699341170479, 14, alanchan, 18, 22, 1699341170479])Table result8 = tenv.sqlQuery("SELECT userString, id,name,age,balance,rowtime " +"FROM user_view "+" LEFT JOIN LATERAL TABLE( OverloadedFunction(userString)) ON TRUE " );DataStream<Tuple2<Boolean, Row>> result8DS = tenv.toRetractStream(result8, Row.class);
// result8DS.print();
// 13> (true,+I[11,alan,18,20,1699341167461, 11, alan, 18, 20, 1699341167461])
// 1> (true,+I[15,alanchan,19,25,1699341171482, 15, alanchan, 19, 25, 1699341171482])
// 15> (true,+I[13,alan,20,30,1699341169472, 13, alan, 20, 30, 1699341169472])
// 14> (true,+I[12,alan,19,25,1699341168464, 12, alan, 19, 25, 1699341168464])
// 16> (true,+I[14,alanchan,18,22,1699341170479, 14, alanchan, 18, 22, 1699341170479])//4、 在 SQL 里重命名函數(shù)字段Table result9 = tenv.sqlQuery("SELECT userString, t_id, t_name,t_age,t_balance,t_rowtime " +"FROM user_view "+"LEFT JOIN LATERAL TABLE(OverloadedFunction(userString)) AS T(t_id, t_name,t_age,t_balance,t_rowtime) ON TRUE");DataStream<Tuple2<Boolean, Row>> result9DS = tenv.toRetractStream(result9, Row.class);result9DS.print();
// 7> (true,+I[12,alan,19,25,1699341168464, 12, alan, 19, 25, 1699341168464])
// 10> (true,+I[15,alanchan,19,25,1699341171482, 15, alanchan, 19, 25, 1699341171482])
// 9> (true,+I[14,alanchan,18,22,1699341170479, 14, alanchan, 18, 22, 1699341170479])
// 8> (true,+I[13,alan,20,30,1699341169472, 13, alan, 20, 30, 1699341169472])
// 6> (true,+I[11,alan,18,20,1699341167461, 11, alan, 18, 20, 1699341167461])env.execute();}}
以上,介紹了flink的自定義函數(shù)概述、開發(fā)指南以及標(biāo)量函數(shù)、表值函數(shù)的自定義函數(shù)實(shí)現(xiàn)及說明,提供的示例均可運(yùn)行并提供運(yùn)行結(jié)果供參考。