網(wǎng)站關(guān)鍵字優(yōu)化軟件免費(fèi)關(guān)鍵詞排名優(yōu)化軟件
Flink 系列文章
1、Flink 部署、概念介紹、source、transformation、sink使用示例、四大基石介紹和示例等系列綜合文章鏈接
13、Flink 的table api與sql的基本概念、通用api介紹及入門示例
14、Flink 的table api與sql之?dāng)?shù)據(jù)類型: 內(nèi)置數(shù)據(jù)類型以及它們的屬性
15、Flink 的table api與sql之流式概念-詳解的介紹了動(dòng)態(tài)表、時(shí)間屬性配置(如何處理更新結(jié)果)、時(shí)態(tài)表、流上的join、流上的確定性以及查詢配置
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及FileSystem示例(1)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Elasticsearch示例(2)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Apache Kafka示例(3)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及JDBC示例(4)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Apache Hive示例(6)
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
18、Flink的SQL 支持的操作和語(yǔ)法
19、Flink 的Table API 和 SQL 中的內(nèi)置函數(shù)及示例(1)
19、Flink 的Table API 和 SQL 中的自定義函數(shù)及示例(2)
19、Flink 的Table API 和 SQL 中的自定義函數(shù)及示例(3)
19、Flink 的Table API 和 SQL 中的自定義函數(shù)及示例(4)
20、Flink SQL之SQL Client: 不用編寫代碼就可以嘗試 Flink SQL,可以直接提交 SQL 任務(wù)到集群上
21、Flink 的table API與DataStream API 集成(1)- 介紹及入門示例、集成說(shuō)明
21、Flink 的table API與DataStream API 集成(2)- 批處理模式和inser-only流處理
21、Flink 的table API與DataStream API 集成(3)- changelog流處理、管道示例、類型轉(zhuǎn)換和老版本轉(zhuǎn)換示例
21、Flink 的table API與DataStream API 集成(完整版)
22、Flink 的table api與sql之創(chuàng)建表的DDL
24、Flink 的table api與sql之Catalogs(介紹、類型、java api和sql實(shí)現(xiàn)ddl、java api和sql操作catalog)-1
24、Flink 的table api與sql之Catalogs(java api操作數(shù)據(jù)庫(kù)、表)-2
24、Flink 的table api與sql之Catalogs(java api操作視圖)-3
24、Flink 的table api與sql之Catalogs(java api操作分區(qū)與函數(shù))-4
25、Flink 的table api與sql之函數(shù)(自定義函數(shù)示例)
26、Flink 的SQL之概覽與入門示例
27、Flink 的SQL之SELECT (select、where、distinct、order by、limit、集合操作和去重)介紹及詳細(xì)示例(1)
27、Flink 的SQL之SELECT (SQL Hints 和 Joins)介紹及詳細(xì)示例(2)
27、Flink 的SQL之SELECT (窗口函數(shù))介紹及詳細(xì)示例(3)
27、Flink 的SQL之SELECT (窗口聚合)介紹及詳細(xì)示例(4)
27、Flink 的SQL之SELECT (Group Aggregation分組聚合、Over Aggregation Over聚合 和 Window Join 窗口關(guān)聯(lián))介紹及詳細(xì)示例(5)
27、Flink 的SQL之SELECT (Top-N、Window Top-N 窗口 Top-N 和 Window Deduplication 窗口去重)介紹及詳細(xì)示例(6)
27、Flink 的SQL之SELECT (Pattern Recognition 模式檢測(cè))介紹及詳細(xì)示例(7)
28、Flink 的SQL之DROP 、ALTER 、INSERT 、ANALYZE 語(yǔ)句
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(1)
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(2)
30、Flink SQL之SQL 客戶端(通過(guò)kafka和filesystem的例子介紹了配置文件使用-表、視圖等)
32、Flink table api和SQL 之用戶自定義 Sources & Sinks實(shí)現(xiàn)及詳細(xì)示例
33、Flink 的Table API 和 SQL 中的時(shí)區(qū)
41、Flink之Hive 方言介紹及詳細(xì)示例
42、Flink 的table api與sql之Hive Catalog
43、Flink之Hive 讀寫及詳細(xì)驗(yàn)證示例
44、Flink之module模塊介紹及使用示例和Flink SQL使用hive內(nèi)置函數(shù)及自定義函數(shù)詳細(xì)示例–網(wǎng)上有些說(shuō)法好像是錯(cuò)誤的
文章目錄
- Flink 系列文章
- 一、Table API 與 DataStream API集成
- 1、概述
- 2、 DataStream 和 Table 相互轉(zhuǎn)換示例
- 1)、示例1 - toDataStream
- 2)、示例2 - toChangelogStream
- 3)、示例3 - 通過(guò)僅切換標(biāo)志來(lái)處理批處理和流數(shù)據(jù)
- 3、集成說(shuō)明
- 1)、maven依賴
- 2)、import
- 3)、Configuration
- 4)、執(zhí)行行為
- 1、DataStream API
- 2、Table API
本文是Flink table api 與 datastream api的集成的第一篇,主要介紹了集成的概述、table api 與 datastream api相互轉(zhuǎn)換的三個(gè)示例以及其集成的說(shuō)明(即maven依賴、import、配置以及執(zhí)行行為),并以具體的示例進(jìn)行說(shuō)明。
本文依賴flink、kafka集群能正常使用。
本文分為3個(gè)部分,即集成概述、三個(gè)入門示例、集成說(shuō)明。
本文的示例是在Flink 1.17版本中運(yùn)行。
一、Table API 與 DataStream API集成
1、概述
在定義數(shù)據(jù)處理管道時(shí),Table API和DataStream API同樣重要。
DataStream API在一個(gè)相對(duì)低級(jí)的命令式編程API中提供流處理的原語(yǔ)(即時(shí)間、狀態(tài)和數(shù)據(jù)流管理)。Table API抽象了許多內(nèi)部構(gòu)件,并提供了結(jié)構(gòu)化和聲明性API。
這兩個(gè)API都可以處理有界和無(wú)界流。
在處理歷史數(shù)據(jù)時(shí),需要管理有界流。無(wú)界流發(fā)生在實(shí)時(shí)處理場(chǎng)景中,這些場(chǎng)景可能先使用歷史數(shù)據(jù)進(jìn)行初始化。
為了有效執(zhí)行,這兩個(gè)API都以優(yōu)化的批處理執(zhí)行模式提供處理有界流。然而,由于批處理只是流的一種特殊情況,因此也可以在常規(guī)流執(zhí)行模式下運(yùn)行有界流的管道。
一個(gè)API中的管道可以端到端定義,而不依賴于另一個(gè)API。然而,出于各種原因,混合這兩種API可能是有用的:
- 在DataStream API中實(shí)現(xiàn)主管道(main pipeline)之前,使用表生態(tài)系統(tǒng)(table ecosystem)輕松訪問(wèn)目錄(catalogs )或連接到外部系統(tǒng)。
- 在DataStream API中實(shí)現(xiàn)主管道之前,訪問(wèn)一些SQL函數(shù)以進(jìn)行無(wú)狀態(tài)數(shù)據(jù)規(guī)范化和清理。
- 如果table API中不存在更低級(jí)的操作(例如自定義計(jì)時(shí)器處理),則不時(shí)切換到DataStream API。
Flink提供了特殊的橋接功能,以使與DataStream API的集成盡可能順利。
在DataStream 和Table API之間切換會(huì)增加一些轉(zhuǎn)換開(kāi)銷。例如,部分處理二進(jìn)制數(shù)據(jù)的表運(yùn)行時(shí)(即RowData)的內(nèi)部數(shù)據(jù)結(jié)構(gòu)需要轉(zhuǎn)換為更用戶友好的數(shù)據(jù)結(jié)構(gòu)(即Row)。通常,這個(gè)開(kāi)銷可以忽略。
- maven依賴
本篇文章,如果沒(méi)有特殊說(shuō)明,將使用如下maven依賴
<properties><encoding>UTF-8</encoding><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><java.version>1.8</java.version><scala.version>2.12</scala.version><flink.version>1.17.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-sql-gateway</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-uber --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-uber</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.0-1.17</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-hive_2.12</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.hive</groupId><artifactId>hive-exec</artifactId><version>3.1.2</version></dependency><!-- flink連接器 --><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-sql-connector-kafka</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress --><dependency><groupId>org.apache.commons</groupId><artifactId>commons-compress</artifactId><version>1.24.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.2</version><!-- <scope>provided</scope> --></dependency></dependencies>
2、 DataStream 和 Table 相互轉(zhuǎn)換示例
Flink提供了專門的StreamTableEnvironment,用于與DataStream API集成。這些環(huán)境使用其他方法擴(kuò)展常規(guī)TableEnvironment,并將DataStream API中使用的StreamExecutionEnvironments作為參數(shù)。
1)、示例1 - toDataStream
下面的代碼展示了如何在兩個(gè)API之間來(lái)回切換的示例。表的列名和類型自動(dòng)從DataStream的TypeInformation派生。由于DataStream API本機(jī)不支持變更日志處理,因此代碼假設(shè)在流到表和表到流轉(zhuǎn)換期間僅附加/僅插入語(yǔ)義。
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;/*** @author alanchan**/
public class ConvertingDataStreamAndTableDemo {/*** @param args* @throws Exception*/public static void main(String[] args) throws Exception {// 1、創(chuàng)建運(yùn)行環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv = StreamTableEnvironment.create(env);// 2、創(chuàng)建輸入流DataStream<String> dataStream = env.fromElements("alan", "alanchan", "alanchanchn");// 3、將datastream 轉(zhuǎn)為 tableTable inputTable = tenv.fromDataStream(dataStream);// 4、創(chuàng)建視圖,該步驟不是必須,將姓名轉(zhuǎn)為大寫tenv.createTemporaryView("InputTable", inputTable);Table resultTable = tenv.sqlQuery("SELECT UPPER(f0) FROM InputTable");// 5、將table轉(zhuǎn)成datastream進(jìn)行輸出DataStream<Row> resultStream = tenv.toDataStream(resultTable);resultStream.print();env.execute();}}
- 示例輸出
12> +I[ALAN]
14> +I[ALANCHANCHN]
13> +I[ALANCHAN]
fromDataStream和toDataStream的完整語(yǔ)義可以在下面的部分中找到。特別是,本節(jié)討論了如何使用更復(fù)雜的嵌套類型來(lái)影響模式派生。它還包括使用事件時(shí)間和水印。
根據(jù)查詢的類型,在許多情況下,生成的動(dòng)態(tài)表是一個(gè)管道,它不僅在將表轉(zhuǎn)換為數(shù)據(jù)流時(shí)產(chǎn)生僅插入的更改,而且還產(chǎn)生收回和其他類型的更新。在表到流轉(zhuǎn)換期間,這可能會(huì)導(dǎo)致類似于以下內(nèi)容的異常
Table sink 'Unregistered_DataStream_Sink_1' doesn't support consuming update changes [...].
在這種情況下,需要再次修改查詢或切換到ChangelogStream。
2)、示例2 - toChangelogStream
下面的示例顯示如何轉(zhuǎn)換更新表。
每個(gè)結(jié)果行表示更改日志中的一個(gè)條目,該條目具有更改標(biāo)志,可以通過(guò)對(duì)其調(diào)用row.getKind()來(lái)查詢。在本例中,alan的第二個(gè)分?jǐn)?shù)在更改之前(-U)創(chuàng)建更新,在更改之后(+U)創(chuàng)建更新。
本示例僅僅以一個(gè)方法來(lái)展示,避免沒(méi)有必要的代碼,運(yùn)行框架參考上述示例。
public static void test2() throws Exception {// 1、創(chuàng)建運(yùn)行環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv = StreamTableEnvironment.create(env);// 2、創(chuàng)建輸入流DataStream<Row> dataStream = env.fromElements(Row.of("alan", 18), Row.of("alanchan", 19), Row.of("alanchanchn", 20), Row.of("alan", 20));// 3、將datastream 轉(zhuǎn)為 tableTable inputTable = tenv.fromDataStream(dataStream).as("name", "salary");// 4、創(chuàng)建視圖,該步驟不是必須tenv.createTemporaryView("InputTable", inputTable);Table resultTable = tenv.sqlQuery("SELECT name, SUM(salary) FROM InputTable GROUP BY name");// 5、將table轉(zhuǎn)成datastream進(jìn)行輸出DataStream<Row> resultStream = tenv.toChangelogStream(resultTable);resultStream.print();env.execute();}
- 運(yùn)行結(jié)果
2> +I[alan, 18]
16> +I[alanchan, 19]
16> +I[alanchanchn, 20]
2> -U[alan, 18]
2> +U[alan, 38]
fromChangelogStream和toChangelogStream的完整語(yǔ)義可以在下面的部分中找到。特別是,本節(jié)討論了如何使用更復(fù)雜的嵌套類型來(lái)影響模式派生。它包括使用事件時(shí)間和水印。它討論了如何為輸入和輸出流聲明主鍵和變更日志模式。
上面的示例顯示了如何通過(guò)為每個(gè)傳入記錄連續(xù)發(fā)出逐行更新來(lái)增量計(jì)算最終結(jié)果。然而,在輸入流有限(即有界)的情況下,通過(guò)利用批處理原理可以更有效地計(jì)算結(jié)果。
在批處理中,可以在連續(xù)的階段中執(zhí)行運(yùn)算符,這些階段在發(fā)出結(jié)果之前使用整個(gè)輸入表。例如,連接操作符可以在執(zhí)行實(shí)際連接之前對(duì)兩個(gè)有界輸入進(jìn)行排序(即排序合并連接算法),或者在使用另一個(gè)輸入之前從一個(gè)輸入構(gòu)建哈希表(即哈希連接算法的構(gòu)建/探測(cè)階段)。
DataStream API和Table API都提供專門的批處理運(yùn)行時(shí)模式。
3)、示例3 - 通過(guò)僅切換標(biāo)志來(lái)處理批處理和流數(shù)據(jù)
下面的示例說(shuō)明了統(tǒng)一管道能夠通過(guò)僅切換標(biāo)志來(lái)處理批處理和流數(shù)據(jù)。
public static void test3() throws Exception {// 1、創(chuàng)建運(yùn)行環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.BATCH);StreamTableEnvironment tenv = StreamTableEnvironment.create(env);// 2、創(chuàng)建輸入流DataStream<Row> dataStream = env.fromElements(Row.of("alan", 18), Row.of("alanchan", 19), Row.of("alanchanchn", 20), Row.of("alan", 20));// 3、將datastream 轉(zhuǎn)為 tableTable inputTable = tenv.fromDataStream(dataStream).as("name", "salary");// 4、創(chuàng)建視圖,該步驟不是必須tenv.createTemporaryView("InputTable", inputTable);Table resultTable = tenv.sqlQuery("SELECT name, SUM(salary) FROM InputTable GROUP BY name");// 5、將table轉(zhuǎn)成datastream進(jìn)行輸出DataStream<Row> resultStream = tenv.toChangelogStream(resultTable);resultStream.print();env.execute();}
- 運(yùn)行結(jié)果
注意比較和示例2的輸出區(qū)別
+I[alanchan, 19]
+I[alan, 38]
+I[alanchanchn, 20]
一旦將changelog 應(yīng)用于外部系統(tǒng)(例如鍵值存儲(chǔ)),可以看到兩種模式都能夠產(chǎn)生完全相同的輸出表。通過(guò)在發(fā)出結(jié)果之前使用所有輸入數(shù)據(jù),批處理模式的更改日志僅由僅插入的更改組成。有關(guān)更多細(xì)節(jié),請(qǐng)參閱下面的專用批處理模式部分。
3、集成說(shuō)明
將Table API與DataStream API相結(jié)合的項(xiàng)目需要添加以下橋接模塊之一。
它們包括對(duì) flink-table-api-java或flink-table-api-scala的可傳遞依賴性,以及相應(yīng)的特定于語(yǔ)言的DataStream api模塊。
1)、maven依賴
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>1.17.1</version><scope>provided</scope>
</dependency>
2)、import
使用DataStream API和Table API的Java或Scala版本聲明公共管道需要以下導(dǎo)入。
// imports for Java DataStream API
import org.apache.flink.streaming.api.*;
import org.apache.flink.streaming.api.environment.*;// imports for Table API with bridging to Java DataStream API
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.*;
3)、Configuration
TableEnvironment將采用傳遞的StreamExecutionEnvironment.中的所有配置選項(xiàng)。然而,不能保證對(duì)StreamExecutionEnvironment配置的進(jìn)一步更改在實(shí)例化后傳播到StreamTableEnvironment。在規(guī)劃期間,將選項(xiàng)從Table API傳播到DataStream API。
我們建議在切換到Table API之前盡早在DataStream API中設(shè)置所有配置選項(xiàng)。
import java.time.ZoneId;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;// create Java DataStream API
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// set various configuration earlyenv.setMaxParallelism(256);
env.getConfig().addDefaultKryoSerializer(MyCustomType.class, CustomKryoSerializer.class);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// then switch to Java Table API
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);// set configuration early
tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Europe/Berlin"));// start defining your pipelines in both APIs...
4)、執(zhí)行行為
這兩個(gè)API都提供了執(zhí)行管道的方法。換句話說(shuō):如果被請(qǐng)求,它們將編譯一個(gè)作業(yè)圖( job graph),該作業(yè)圖將提交到集群并觸發(fā)以執(zhí)行。結(jié)果將流式傳輸?shù)铰暶鞯膕inks。
通常,這兩個(gè)API都在方法名稱中使用術(shù)語(yǔ)“執(zhí)行”來(lái)標(biāo)記這種行為。然而,Table API和DataStream API之間的執(zhí)行行為略有不同。
1、DataStream API
DataStream API的StreamExecutionEnvironment使用生成器模式(builder pattern)來(lái)構(gòu)造復(fù)雜的管道。管道可能會(huì)拆分為多個(gè)分支,這些分支可能以sink結(jié)尾,也可能不以sink結(jié)尾。環(huán)境緩沖(environment buffers)所有這些定義的分支,直到提交作業(yè)。
StreamExecutionEnvironment.execute()提交整個(gè)構(gòu)建的管道,然后清除構(gòu)建器。換句話說(shuō):不再聲明sources 和sinks ,并且可以向生成器中添加新的管道。因此,每個(gè)DataStream程序通常以對(duì)StreamExecutionEnvironment.execute()的調(diào)用結(jié)束?;蛘?#xff0c;DataStream.executeAndCollect()隱式定義了一個(gè)sink,用于將結(jié)果流式傳輸?shù)奖镜乜蛻舳恕?/p>
2、Table API
在Table API中,分支管道僅在StatementSet中受支持,其中每個(gè)分支必須聲明一個(gè)最終sink。TableEnvironment和StreamTableEnvironment都不提供專用的通用execute()方法。相反,它們提供了提交單個(gè)source-to-sink管道或語(yǔ)句集的方法:
final static String sinkSQL = "CREATE TABLE OutputTable (\n" +" userId INT,\r\n" + " age INT,\r\n" + " balance DOUBLE,\r\n" + " userName STRING,\r\n" +" t_insert_time TIMESTAMP(3)\r\n" +") WITH (\n" +" 'connector' = 'print'\n" +")";final static String sinkSQL2 = "CREATE TABLE OutputTable2 (\n" +" userId INT,\r\n" + " age INT,\r\n" + " balance DOUBLE,\r\n" + " userName STRING,\r\n" +" t_insert_time TIMESTAMP(3)\r\n" +") WITH (\n" +" 'connector' = 'print'\n" +")";final static String sourceSQL = "CREATE TABLE InputTable (\r\n" + " userId INT,\r\n" + " age INT,\r\n" + " balance DOUBLE,\r\n" + " userName STRING,\r\n" + " t_insert_time AS localtimestamp,\r\n" + " WATERMARK FOR t_insert_time AS t_insert_time\r\n" + ") WITH (\r\n" + " 'connector' = 'datagen',\r\n" + " 'rows-per-second'='10',\r\n" + " 'fields.userId.kind'='sequence',\r\n" + " 'fields.userId.start'='1',\r\n" + " 'fields.userId.end'='20',\r\n" + " 'fields.balance.kind'='random',\r\n" + " 'fields.balance.min'='1',\r\n" + " 'fields.balance.max'='100',\r\n" + " 'fields.age.min'='1',\r\n" + " 'fields.age.max'='100',\r\n" + " 'fields.userName.length'='6'\r\n" + ");";public static void test4() throws Exception {// 1、創(chuàng)建運(yùn)行環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv = StreamTableEnvironment.create(env);//sinkSQL//sourceSQL// 建表tenv.executeSql(sourceSQL);//tenv.executeSql(sinkSQL);tenv.executeSql(sinkSQL2);//插入表數(shù)據(jù),方式一tenv.from("InputTable").insertInto("OutputTable").execute();tenv.executeSql("select * from OutputTable");tenv.from("InputTable").execute().print();//插入表數(shù)據(jù),方式二tenv.executeSql("INSERT INTO OutputTable SELECT * FROM InputTable");tenv.executeSql("select * from OutputTable");//插入表數(shù)據(jù),方式三tenv.createStatementSet().addInsertSql("INSERT INTO OutputTable SELECT * FROM InputTable").addInsertSql("INSERT INTO OutputTable2 SELECT * FROM InputTable").execute();// 輸出tenv.from("InputTable").execute().print();tenv.executeSql("SELECT * FROM InputTable").print();env.execute();}
- 輸出結(jié)果
3> +I[3, 99, 36.20987556045243, d23888, 2023-11-13T14:49:58.812]
15> +I[15, 39, 68.30743253178122, 43bec8, 2023-11-13T14:49:58.812]
2> +I[2, 62, 47.280395949976885, 7bae4e, 2023-11-13T14:49:58.812]
16> +I[16, 52, 42.10205629532836, 6baf0e, 2023-11-13T14:49:58.812]
10> +I[10, 25, 58.008035887440094, d43dea, 2023-11-13T14:49:58.812]
13> +I[13, 36, 70.9215559827798, 01bb28, 2023-11-13T14:49:58.812]
12> +I[12, 38, 30.31004698340413, 322ba8, 2023-11-13T14:49:58.812]
6> +I[6, 17, 32.28909358733212, 13bf88, 2023-11-13T14:49:58.812]
9> +I[9, 49, 44.52802246768357, e8280c, 2023-11-13T14:49:58.812]
8> +I[8, 80, 18.03487847824154, 803b2a, 2023-11-13T14:49:58.812]
5> +I[5, 61, 54.43695775227862, 063f08, 2023-11-13T14:49:58.812]
7> +I[7, 64, 33.886576642098404, 443dea, 2023-11-13T14:49:58.812]
14> +I[14, 92, 63.71527772015468, 123848, 2023-11-13T14:49:58.812]
11> +I[11, 22, 30.745102844313315, e62848, 2023-11-13T14:49:58.812]
4> +I[4, 78, 88.60724929598506, 55bca8, 2023-11-13T14:49:58.812]
1> +I[1, 82, 62.50149215989057, 0bba0c, 2023-11-13T14:49:58.812]
3> +I[19, 67, 14.244993215937432, e6c911, 2023-11-13T14:49:59.806]
1> +I[17, 67, 91.05078612782468, 560b6c, 2023-11-13T14:49:59.807]
4> +I[20, 95, 82.12047947156385, 1ac5b2, 2023-11-13T14:49:59.807]
2> +I[18, 81, 25.384055001988084, fe98d1, 2023-11-13T14:49:59.806]
+----+-------------+-------------+--------------------------------+--------------------------------+-------------------------+
| op | userId | age | balance | userName | t_insert_time |
+----+-------------+-------------+--------------------------------+--------------------------------+-------------------------+
| +I | 1 | 91 | 22.629318048042723 | 923e08 | 2023-11-13 14:49:59.800 |
| +I | 2 | 67 | 75.26915785038814 | 342baa | 2023-11-13 14:49:59.803 |
| +I | 3 | 68 | 74.06076023217011 | 1dbbce | 2023-11-13 14:49:59.803 |
| +I | 4 | 26 | 79.47471729272772 | 083e2e | 2023-11-13 14:49:59.802 |
| +I | 5 | 97 | 82.56249330491859 | 4a3c6e | 2023-11-13 14:49:59.804 |
| +I | 6 | 32 | 81.74903214944425 | fdac4e | 2023-11-13 14:49:59.800 |
| +I | 7 | 67 | 94.80154136831771 | f7acea | 2023-11-13 14:49:59.800 |
| +I | 8 | 53 | 50.85073238739004 | cfbd0c | 2023-11-13 14:49:59.800 |
| +I | 9 | 69 | 93.64054547476522 | 7fa9ec | 2023-11-13 14:49:59.801 |
| +I | 10 | 66 | 61.92366658766452 | 05b86a | 2023-11-13 14:49:59.803 |
| +I | 11 | 81 | 95.61717698776191 | efa8ce | 2023-11-13 14:49:59.797 |
| +I | 12 | 8 | 63.573174957723076 | 0fbfec | 2023-11-13 14:49:59.802 |
| +I | 13 | 85 | 52.938510850778734 | 43bfa8 | 2023-11-13 14:49:59.803 |
| +I | 14 | 26 | 5.130287258770441 | 083c6c | 2023-11-13 14:49:59.797 |
| +I | 15 | 35 | 73.3318749510538 | 0e3b4c | 2023-11-13 14:49:59.802 |
| +I | 16 | 84 | 16.24326410122912 | ac2d6e | 2023-11-13 14:49:59.802 |
| +I | 18 | 41 | 32.38455189801736 | b07afb | 2023-11-13 14:50:00.804 |
| +I | 19 | 24 | 77.6947569111452 | 7f72ac | 2023-11-13 14:50:00.803 |
| +I | 20 | 92 | 82.53929937026987 | 051fb9 | 2023-11-13 14:50:00.802 |
| +I | 17 | 93 | 12.784194121509948 | bce5d9 | 2023-11-13 14:50:00.801 |
+----+-------------+-------------+--------------------------------+--------------------------------+-------------------------+
20 rows in set
為了組合這兩種執(zhí)行行為,對(duì)StreamTableEnvironment.toDataStream或StreamTableEnviron.toChangelogStream的每次調(diào)用都將具體化(materialize )(即編譯)Table API子管道(sub-pipeline),并將其插入DataStream API管道生成器(builder)中。這意味著之后必須調(diào)用StreamExecutionEnvironment.execute()或DataStream.executeAndCollect。Table API中的執(zhí)行不會(huì)觸發(fā)這些“外部部件(external parts)”。
// adds a branch with a printing sink to the StreamExecutionEnvironment
tableEnv.toDataStream(table).print();// (2)// executes a Table API end-to-end pipeline as a Flink job and prints locally,
// thus (1) has still not been executed
table.execute().print();// executes the DataStream API pipeline with the sink defined in (1) as a
// Flink job, (2) was already running before
env.execute();
上述示例中有具體應(yīng)用。
以上,本文是Flink table api 與 datastream api的集成的第一篇,主要介紹了集成的概述、table api 與 datastream api相互轉(zhuǎn)換的三個(gè)示例以及其集成的說(shuō)明(即maven依賴、import、配置以及執(zhí)行行為),并以具體的示例進(jìn)行說(shuō)明。