中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁(yè) > news >正文

網(wǎng)站關(guān)鍵字優(yōu)化軟件免費(fèi)關(guān)鍵詞排名優(yōu)化軟件

網(wǎng)站關(guān)鍵字優(yōu)化軟件,免費(fèi)關(guān)鍵詞排名優(yōu)化軟件,記事本做網(wǎng)站如何排版,產(chǎn)品市場(chǎng)推廣計(jì)劃書Flink 系列文章 1、Flink 部署、概念介紹、source、transformation、sink使用示例、四大基石介紹和示例等系列綜合文章鏈接 13、Flink 的table api與sql的基本概念、通用api介紹及入門示例 14、Flink 的table api與sql之?dāng)?shù)據(jù)類型: 內(nèi)置數(shù)據(jù)類型以及它們的屬性 15、Flink 的ta…

Flink 系列文章

1、Flink 部署、概念介紹、source、transformation、sink使用示例、四大基石介紹和示例等系列綜合文章鏈接

13、Flink 的table api與sql的基本概念、通用api介紹及入門示例
14、Flink 的table api與sql之?dāng)?shù)據(jù)類型: 內(nèi)置數(shù)據(jù)類型以及它們的屬性
15、Flink 的table api與sql之流式概念-詳解的介紹了動(dòng)態(tài)表、時(shí)間屬性配置(如何處理更新結(jié)果)、時(shí)態(tài)表、流上的join、流上的確定性以及查詢配置
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及FileSystem示例(1)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Elasticsearch示例(2)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Apache Kafka示例(3)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及JDBC示例(4)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Apache Hive示例(6)
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
18、Flink的SQL 支持的操作和語(yǔ)法
19、Flink 的Table API 和 SQL 中的內(nèi)置函數(shù)及示例(1)
19、Flink 的Table API 和 SQL 中的自定義函數(shù)及示例(2)
19、Flink 的Table API 和 SQL 中的自定義函數(shù)及示例(3)
19、Flink 的Table API 和 SQL 中的自定義函數(shù)及示例(4)
20、Flink SQL之SQL Client: 不用編寫代碼就可以嘗試 Flink SQL,可以直接提交 SQL 任務(wù)到集群上
21、Flink 的table API與DataStream API 集成(1)- 介紹及入門示例、集成說(shuō)明
21、Flink 的table API與DataStream API 集成(2)- 批處理模式和inser-only流處理
21、Flink 的table API與DataStream API 集成(3)- changelog流處理、管道示例、類型轉(zhuǎn)換和老版本轉(zhuǎn)換示例
21、Flink 的table API與DataStream API 集成(完整版)
22、Flink 的table api與sql之創(chuàng)建表的DDL
24、Flink 的table api與sql之Catalogs(介紹、類型、java api和sql實(shí)現(xiàn)ddl、java api和sql操作catalog)-1
24、Flink 的table api與sql之Catalogs(java api操作數(shù)據(jù)庫(kù)、表)-2
24、Flink 的table api與sql之Catalogs(java api操作視圖)-3
24、Flink 的table api與sql之Catalogs(java api操作分區(qū)與函數(shù))-4
25、Flink 的table api與sql之函數(shù)(自定義函數(shù)示例)
26、Flink 的SQL之概覽與入門示例
27、Flink 的SQL之SELECT (select、where、distinct、order by、limit、集合操作和去重)介紹及詳細(xì)示例(1)
27、Flink 的SQL之SELECT (SQL Hints 和 Joins)介紹及詳細(xì)示例(2)
27、Flink 的SQL之SELECT (窗口函數(shù))介紹及詳細(xì)示例(3)
27、Flink 的SQL之SELECT (窗口聚合)介紹及詳細(xì)示例(4)
27、Flink 的SQL之SELECT (Group Aggregation分組聚合、Over Aggregation Over聚合 和 Window Join 窗口關(guān)聯(lián))介紹及詳細(xì)示例(5)
27、Flink 的SQL之SELECT (Top-N、Window Top-N 窗口 Top-N 和 Window Deduplication 窗口去重)介紹及詳細(xì)示例(6)
27、Flink 的SQL之SELECT (Pattern Recognition 模式檢測(cè))介紹及詳細(xì)示例(7)
28、Flink 的SQL之DROP 、ALTER 、INSERT 、ANALYZE 語(yǔ)句
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(1)
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(2)
30、Flink SQL之SQL 客戶端(通過(guò)kafka和filesystem的例子介紹了配置文件使用-表、視圖等)
32、Flink table api和SQL 之用戶自定義 Sources & Sinks實(shí)現(xiàn)及詳細(xì)示例
33、Flink 的Table API 和 SQL 中的時(shí)區(qū)
41、Flink之Hive 方言介紹及詳細(xì)示例
42、Flink 的table api與sql之Hive Catalog
43、Flink之Hive 讀寫及詳細(xì)驗(yàn)證示例
44、Flink之module模塊介紹及使用示例和Flink SQL使用hive內(nèi)置函數(shù)及自定義函數(shù)詳細(xì)示例–網(wǎng)上有些說(shuō)法好像是錯(cuò)誤的


文章目錄

  • Flink 系列文章
  • 一、Table API 與 DataStream API集成
    • 1、概述
    • 2、 DataStream 和 Table 相互轉(zhuǎn)換示例
      • 1)、示例1 - toDataStream
      • 2)、示例2 - toChangelogStream
      • 3)、示例3 - 通過(guò)僅切換標(biāo)志來(lái)處理批處理和流數(shù)據(jù)
    • 3、集成說(shuō)明
      • 1)、maven依賴
      • 2)、import
      • 3)、Configuration
      • 4)、執(zhí)行行為
        • 1、DataStream API
        • 2、Table API


本文是Flink table api 與 datastream api的集成的第一篇,主要介紹了集成的概述、table api 與 datastream api相互轉(zhuǎn)換的三個(gè)示例以及其集成的說(shuō)明(即maven依賴、import、配置以及執(zhí)行行為),并以具體的示例進(jìn)行說(shuō)明。
本文依賴flink、kafka集群能正常使用。
本文分為3個(gè)部分,即集成概述、三個(gè)入門示例、集成說(shuō)明。
本文的示例是在Flink 1.17版本中運(yùn)行。

一、Table API 與 DataStream API集成

1、概述

在定義數(shù)據(jù)處理管道時(shí),Table API和DataStream API同樣重要。

DataStream API在一個(gè)相對(duì)低級(jí)的命令式編程API中提供流處理的原語(yǔ)(即時(shí)間、狀態(tài)和數(shù)據(jù)流管理)。Table API抽象了許多內(nèi)部構(gòu)件,并提供了結(jié)構(gòu)化和聲明性API。

這兩個(gè)API都可以處理有界和無(wú)界流。

在處理歷史數(shù)據(jù)時(shí),需要管理有界流。無(wú)界流發(fā)生在實(shí)時(shí)處理場(chǎng)景中,這些場(chǎng)景可能先使用歷史數(shù)據(jù)進(jìn)行初始化。

為了有效執(zhí)行,這兩個(gè)API都以優(yōu)化的批處理執(zhí)行模式提供處理有界流。然而,由于批處理只是流的一種特殊情況,因此也可以在常規(guī)流執(zhí)行模式下運(yùn)行有界流的管道。

一個(gè)API中的管道可以端到端定義,而不依賴于另一個(gè)API。然而,出于各種原因,混合這兩種API可能是有用的:

  • 在DataStream API中實(shí)現(xiàn)主管道(main pipeline)之前,使用表生態(tài)系統(tǒng)(table ecosystem)輕松訪問(wèn)目錄(catalogs )或連接到外部系統(tǒng)。
  • 在DataStream API中實(shí)現(xiàn)主管道之前,訪問(wèn)一些SQL函數(shù)以進(jìn)行無(wú)狀態(tài)數(shù)據(jù)規(guī)范化和清理。
  • 如果table API中不存在更低級(jí)的操作(例如自定義計(jì)時(shí)器處理),則不時(shí)切換到DataStream API。

Flink提供了特殊的橋接功能,以使與DataStream API的集成盡可能順利。

在DataStream 和Table API之間切換會(huì)增加一些轉(zhuǎn)換開(kāi)銷。例如,部分處理二進(jìn)制數(shù)據(jù)的表運(yùn)行時(shí)(即RowData)的內(nèi)部數(shù)據(jù)結(jié)構(gòu)需要轉(zhuǎn)換為更用戶友好的數(shù)據(jù)結(jié)構(gòu)(即Row)。通常,這個(gè)開(kāi)銷可以忽略。

  • maven依賴
    本篇文章,如果沒(méi)有特殊說(shuō)明,將使用如下maven依賴
<properties><encoding>UTF-8</encoding><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><java.version>1.8</java.version><scala.version>2.12</scala.version><flink.version>1.17.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-sql-gateway</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-uber --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-uber</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.0-1.17</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-hive_2.12</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.hive</groupId><artifactId>hive-exec</artifactId><version>3.1.2</version></dependency><!-- flink連接器 --><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-sql-connector-kafka</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress --><dependency><groupId>org.apache.commons</groupId><artifactId>commons-compress</artifactId><version>1.24.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.2</version><!-- <scope>provided</scope> --></dependency></dependencies>

2、 DataStream 和 Table 相互轉(zhuǎn)換示例

Flink提供了專門的StreamTableEnvironment,用于與DataStream API集成。這些環(huán)境使用其他方法擴(kuò)展常規(guī)TableEnvironment,并將DataStream API中使用的StreamExecutionEnvironments作為參數(shù)。

1)、示例1 - toDataStream

下面的代碼展示了如何在兩個(gè)API之間來(lái)回切換的示例。表的列名和類型自動(dòng)從DataStream的TypeInformation派生。由于DataStream API本機(jī)不支持變更日志處理,因此代碼假設(shè)在流到表和表到流轉(zhuǎn)換期間僅附加/僅插入語(yǔ)義。

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;/*** @author alanchan**/
public class ConvertingDataStreamAndTableDemo {/*** @param args* @throws Exception*/public static void main(String[] args) throws Exception {// 1、創(chuàng)建運(yùn)行環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv = StreamTableEnvironment.create(env);// 2、創(chuàng)建輸入流DataStream<String> dataStream = env.fromElements("alan", "alanchan", "alanchanchn");// 3、將datastream 轉(zhuǎn)為 tableTable inputTable = tenv.fromDataStream(dataStream);// 4、創(chuàng)建視圖,該步驟不是必須,將姓名轉(zhuǎn)為大寫tenv.createTemporaryView("InputTable", inputTable);Table resultTable = tenv.sqlQuery("SELECT UPPER(f0) FROM InputTable");// 5、將table轉(zhuǎn)成datastream進(jìn)行輸出DataStream<Row> resultStream = tenv.toDataStream(resultTable);resultStream.print();env.execute();}}
  • 示例輸出
12> +I[ALAN]
14> +I[ALANCHANCHN]
13> +I[ALANCHAN]

fromDataStream和toDataStream的完整語(yǔ)義可以在下面的部分中找到。特別是,本節(jié)討論了如何使用更復(fù)雜的嵌套類型來(lái)影響模式派生。它還包括使用事件時(shí)間和水印。

根據(jù)查詢的類型,在許多情況下,生成的動(dòng)態(tài)表是一個(gè)管道,它不僅在將表轉(zhuǎn)換為數(shù)據(jù)流時(shí)產(chǎn)生僅插入的更改,而且還產(chǎn)生收回和其他類型的更新。在表到流轉(zhuǎn)換期間,這可能會(huì)導(dǎo)致類似于以下內(nèi)容的異常

Table sink 'Unregistered_DataStream_Sink_1' doesn't support consuming update changes [...].

在這種情況下,需要再次修改查詢或切換到ChangelogStream。

2)、示例2 - toChangelogStream

下面的示例顯示如何轉(zhuǎn)換更新表。
每個(gè)結(jié)果行表示更改日志中的一個(gè)條目,該條目具有更改標(biāo)志,可以通過(guò)對(duì)其調(diào)用row.getKind()來(lái)查詢。在本例中,alan的第二個(gè)分?jǐn)?shù)在更改之前(-U)創(chuàng)建更新,在更改之后(+U)創(chuàng)建更新。

本示例僅僅以一個(gè)方法來(lái)展示,避免沒(méi)有必要的代碼,運(yùn)行框架參考上述示例。

	public static void test2() throws Exception {// 1、創(chuàng)建運(yùn)行環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv = StreamTableEnvironment.create(env);// 2、創(chuàng)建輸入流DataStream<Row> dataStream = env.fromElements(Row.of("alan", 18), Row.of("alanchan", 19), Row.of("alanchanchn", 20), Row.of("alan", 20));// 3、將datastream 轉(zhuǎn)為 tableTable inputTable = tenv.fromDataStream(dataStream).as("name", "salary");// 4、創(chuàng)建視圖,該步驟不是必須tenv.createTemporaryView("InputTable", inputTable);Table resultTable = tenv.sqlQuery("SELECT name, SUM(salary) FROM InputTable GROUP BY name");// 5、將table轉(zhuǎn)成datastream進(jìn)行輸出DataStream<Row> resultStream = tenv.toChangelogStream(resultTable);resultStream.print();env.execute();}
  • 運(yùn)行結(jié)果
2> +I[alan, 18]
16> +I[alanchan, 19]
16> +I[alanchanchn, 20]
2> -U[alan, 18]
2> +U[alan, 38]

fromChangelogStream和toChangelogStream的完整語(yǔ)義可以在下面的部分中找到。特別是,本節(jié)討論了如何使用更復(fù)雜的嵌套類型來(lái)影響模式派生。它包括使用事件時(shí)間和水印。它討論了如何為輸入和輸出流聲明主鍵和變更日志模式。

上面的示例顯示了如何通過(guò)為每個(gè)傳入記錄連續(xù)發(fā)出逐行更新來(lái)增量計(jì)算最終結(jié)果。然而,在輸入流有限(即有界)的情況下,通過(guò)利用批處理原理可以更有效地計(jì)算結(jié)果。

在批處理中,可以在連續(xù)的階段中執(zhí)行運(yùn)算符,這些階段在發(fā)出結(jié)果之前使用整個(gè)輸入表。例如,連接操作符可以在執(zhí)行實(shí)際連接之前對(duì)兩個(gè)有界輸入進(jìn)行排序(即排序合并連接算法),或者在使用另一個(gè)輸入之前從一個(gè)輸入構(gòu)建哈希表(即哈希連接算法的構(gòu)建/探測(cè)階段)。

DataStream API和Table API都提供專門的批處理運(yùn)行時(shí)模式。

3)、示例3 - 通過(guò)僅切換標(biāo)志來(lái)處理批處理和流數(shù)據(jù)

下面的示例說(shuō)明了統(tǒng)一管道能夠通過(guò)僅切換標(biāo)志來(lái)處理批處理和流數(shù)據(jù)。

public static void test3() throws Exception {// 1、創(chuàng)建運(yùn)行環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.BATCH);StreamTableEnvironment tenv = StreamTableEnvironment.create(env);// 2、創(chuàng)建輸入流DataStream<Row> dataStream = env.fromElements(Row.of("alan", 18), Row.of("alanchan", 19), Row.of("alanchanchn", 20), Row.of("alan", 20));// 3、將datastream 轉(zhuǎn)為 tableTable inputTable = tenv.fromDataStream(dataStream).as("name", "salary");// 4、創(chuàng)建視圖,該步驟不是必須tenv.createTemporaryView("InputTable", inputTable);Table resultTable = tenv.sqlQuery("SELECT name, SUM(salary) FROM InputTable GROUP BY name");// 5、將table轉(zhuǎn)成datastream進(jìn)行輸出DataStream<Row> resultStream = tenv.toChangelogStream(resultTable);resultStream.print();env.execute();}
  • 運(yùn)行結(jié)果

注意比較和示例2的輸出區(qū)別

+I[alanchan, 19]
+I[alan, 38]
+I[alanchanchn, 20]

一旦將changelog 應(yīng)用于外部系統(tǒng)(例如鍵值存儲(chǔ)),可以看到兩種模式都能夠產(chǎn)生完全相同的輸出表。通過(guò)在發(fā)出結(jié)果之前使用所有輸入數(shù)據(jù),批處理模式的更改日志僅由僅插入的更改組成。有關(guān)更多細(xì)節(jié),請(qǐng)參閱下面的專用批處理模式部分。

3、集成說(shuō)明

將Table API與DataStream API相結(jié)合的項(xiàng)目需要添加以下橋接模塊之一。
它們包括對(duì) flink-table-api-java或flink-table-api-scala的可傳遞依賴性,以及相應(yīng)的特定于語(yǔ)言的DataStream api模塊。

1)、maven依賴

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>1.17.1</version><scope>provided</scope>
</dependency>

2)、import

使用DataStream API和Table API的Java或Scala版本聲明公共管道需要以下導(dǎo)入。

// imports for Java DataStream API
import org.apache.flink.streaming.api.*;
import org.apache.flink.streaming.api.environment.*;// imports for Table API with bridging to Java DataStream API
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.*;

3)、Configuration

TableEnvironment將采用傳遞的StreamExecutionEnvironment.中的所有配置選項(xiàng)。然而,不能保證對(duì)StreamExecutionEnvironment配置的進(jìn)一步更改在實(shí)例化后傳播到StreamTableEnvironment。在規(guī)劃期間,將選項(xiàng)從Table API傳播到DataStream API。

我們建議在切換到Table API之前盡早在DataStream API中設(shè)置所有配置選項(xiàng)。

import java.time.ZoneId;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;// create Java DataStream API
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// set various configuration earlyenv.setMaxParallelism(256);
env.getConfig().addDefaultKryoSerializer(MyCustomType.class, CustomKryoSerializer.class);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// then switch to Java Table API
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);// set configuration early
tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Europe/Berlin"));// start defining your pipelines in both APIs...

4)、執(zhí)行行為

這兩個(gè)API都提供了執(zhí)行管道的方法。換句話說(shuō):如果被請(qǐng)求,它們將編譯一個(gè)作業(yè)圖( job graph),該作業(yè)圖將提交到集群并觸發(fā)以執(zhí)行。結(jié)果將流式傳輸?shù)铰暶鞯膕inks。

通常,這兩個(gè)API都在方法名稱中使用術(shù)語(yǔ)“執(zhí)行”來(lái)標(biāo)記這種行為。然而,Table API和DataStream API之間的執(zhí)行行為略有不同。

1、DataStream API

DataStream API的StreamExecutionEnvironment使用生成器模式(builder pattern)來(lái)構(gòu)造復(fù)雜的管道。管道可能會(huì)拆分為多個(gè)分支,這些分支可能以sink結(jié)尾,也可能不以sink結(jié)尾。環(huán)境緩沖(environment buffers)所有這些定義的分支,直到提交作業(yè)。

StreamExecutionEnvironment.execute()提交整個(gè)構(gòu)建的管道,然后清除構(gòu)建器。換句話說(shuō):不再聲明sources 和sinks ,并且可以向生成器中添加新的管道。因此,每個(gè)DataStream程序通常以對(duì)StreamExecutionEnvironment.execute()的調(diào)用結(jié)束?;蛘?#xff0c;DataStream.executeAndCollect()隱式定義了一個(gè)sink,用于將結(jié)果流式傳輸?shù)奖镜乜蛻舳恕?/p>

2、Table API

在Table API中,分支管道僅在StatementSet中受支持,其中每個(gè)分支必須聲明一個(gè)最終sink。TableEnvironment和StreamTableEnvironment都不提供專用的通用execute()方法。相反,它們提供了提交單個(gè)source-to-sink管道或語(yǔ)句集的方法:

	final static String sinkSQL = "CREATE TABLE OutputTable (\n" +" userId INT,\r\n" + " age INT,\r\n" + " balance DOUBLE,\r\n" + " userName STRING,\r\n" +" t_insert_time TIMESTAMP(3)\r\n" +") WITH (\n" +"  'connector' = 'print'\n" +")";final static String sinkSQL2 = "CREATE TABLE OutputTable2 (\n" +" userId INT,\r\n" + " age INT,\r\n" + " balance DOUBLE,\r\n" + " userName STRING,\r\n" +" t_insert_time TIMESTAMP(3)\r\n" +") WITH (\n" +"  'connector' = 'print'\n" +")";final static String sourceSQL = "CREATE TABLE InputTable (\r\n" + " userId INT,\r\n" + " age INT,\r\n" + " balance DOUBLE,\r\n" + " userName STRING,\r\n" + " t_insert_time AS localtimestamp,\r\n" + " WATERMARK FOR t_insert_time AS t_insert_time\r\n" + ") WITH (\r\n" + " 'connector' = 'datagen',\r\n" + " 'rows-per-second'='10',\r\n" + " 'fields.userId.kind'='sequence',\r\n" + " 'fields.userId.start'='1',\r\n" + " 'fields.userId.end'='20',\r\n" + " 'fields.balance.kind'='random',\r\n" + " 'fields.balance.min'='1',\r\n" + " 'fields.balance.max'='100',\r\n" + " 'fields.age.min'='1',\r\n" + " 'fields.age.max'='100',\r\n" + " 'fields.userName.length'='6'\r\n" + ");";public static void test4() throws Exception {// 1、創(chuàng)建運(yùn)行環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv = StreamTableEnvironment.create(env);//sinkSQL//sourceSQL// 建表tenv.executeSql(sourceSQL);//tenv.executeSql(sinkSQL);tenv.executeSql(sinkSQL2);//插入表數(shù)據(jù),方式一tenv.from("InputTable").insertInto("OutputTable").execute();tenv.executeSql("select * from OutputTable");tenv.from("InputTable").execute().print();//插入表數(shù)據(jù),方式二tenv.executeSql("INSERT INTO OutputTable SELECT * FROM InputTable");tenv.executeSql("select * from OutputTable");//插入表數(shù)據(jù),方式三tenv.createStatementSet().addInsertSql("INSERT INTO OutputTable SELECT * FROM InputTable").addInsertSql("INSERT INTO OutputTable2 SELECT * FROM InputTable").execute();// 輸出tenv.from("InputTable").execute().print();tenv.executeSql("SELECT * FROM InputTable").print();env.execute();}
  • 輸出結(jié)果
3> +I[3, 99, 36.20987556045243, d23888, 2023-11-13T14:49:58.812]
15> +I[15, 39, 68.30743253178122, 43bec8, 2023-11-13T14:49:58.812]
2> +I[2, 62, 47.280395949976885, 7bae4e, 2023-11-13T14:49:58.812]
16> +I[16, 52, 42.10205629532836, 6baf0e, 2023-11-13T14:49:58.812]
10> +I[10, 25, 58.008035887440094, d43dea, 2023-11-13T14:49:58.812]
13> +I[13, 36, 70.9215559827798, 01bb28, 2023-11-13T14:49:58.812]
12> +I[12, 38, 30.31004698340413, 322ba8, 2023-11-13T14:49:58.812]
6> +I[6, 17, 32.28909358733212, 13bf88, 2023-11-13T14:49:58.812]
9> +I[9, 49, 44.52802246768357, e8280c, 2023-11-13T14:49:58.812]
8> +I[8, 80, 18.03487847824154, 803b2a, 2023-11-13T14:49:58.812]
5> +I[5, 61, 54.43695775227862, 063f08, 2023-11-13T14:49:58.812]
7> +I[7, 64, 33.886576642098404, 443dea, 2023-11-13T14:49:58.812]
14> +I[14, 92, 63.71527772015468, 123848, 2023-11-13T14:49:58.812]
11> +I[11, 22, 30.745102844313315, e62848, 2023-11-13T14:49:58.812]
4> +I[4, 78, 88.60724929598506, 55bca8, 2023-11-13T14:49:58.812]
1> +I[1, 82, 62.50149215989057, 0bba0c, 2023-11-13T14:49:58.812]
3> +I[19, 67, 14.244993215937432, e6c911, 2023-11-13T14:49:59.806]
1> +I[17, 67, 91.05078612782468, 560b6c, 2023-11-13T14:49:59.807]
4> +I[20, 95, 82.12047947156385, 1ac5b2, 2023-11-13T14:49:59.807]
2> +I[18, 81, 25.384055001988084, fe98d1, 2023-11-13T14:49:59.806]
+----+-------------+-------------+--------------------------------+--------------------------------+-------------------------+
| op |      userId |         age |                        balance |                       userName |           t_insert_time |
+----+-------------+-------------+--------------------------------+--------------------------------+-------------------------+
| +I |           1 |          91 |             22.629318048042723 |                         923e08 | 2023-11-13 14:49:59.800 |
| +I |           2 |          67 |              75.26915785038814 |                         342baa | 2023-11-13 14:49:59.803 |
| +I |           3 |          68 |              74.06076023217011 |                         1dbbce | 2023-11-13 14:49:59.803 |
| +I |           4 |          26 |              79.47471729272772 |                         083e2e | 2023-11-13 14:49:59.802 |
| +I |           5 |          97 |              82.56249330491859 |                         4a3c6e | 2023-11-13 14:49:59.804 |
| +I |           6 |          32 |              81.74903214944425 |                         fdac4e | 2023-11-13 14:49:59.800 |
| +I |           7 |          67 |              94.80154136831771 |                         f7acea | 2023-11-13 14:49:59.800 |
| +I |           8 |          53 |              50.85073238739004 |                         cfbd0c | 2023-11-13 14:49:59.800 |
| +I |           9 |          69 |              93.64054547476522 |                         7fa9ec | 2023-11-13 14:49:59.801 |
| +I |          10 |          66 |              61.92366658766452 |                         05b86a | 2023-11-13 14:49:59.803 |
| +I |          11 |          81 |              95.61717698776191 |                         efa8ce | 2023-11-13 14:49:59.797 |
| +I |          12 |           8 |             63.573174957723076 |                         0fbfec | 2023-11-13 14:49:59.802 |
| +I |          13 |          85 |             52.938510850778734 |                         43bfa8 | 2023-11-13 14:49:59.803 |
| +I |          14 |          26 |              5.130287258770441 |                         083c6c | 2023-11-13 14:49:59.797 |
| +I |          15 |          35 |               73.3318749510538 |                         0e3b4c | 2023-11-13 14:49:59.802 |
| +I |          16 |          84 |              16.24326410122912 |                         ac2d6e | 2023-11-13 14:49:59.802 |
| +I |          18 |          41 |              32.38455189801736 |                         b07afb | 2023-11-13 14:50:00.804 |
| +I |          19 |          24 |               77.6947569111452 |                         7f72ac | 2023-11-13 14:50:00.803 |
| +I |          20 |          92 |              82.53929937026987 |                         051fb9 | 2023-11-13 14:50:00.802 |
| +I |          17 |          93 |             12.784194121509948 |                         bce5d9 | 2023-11-13 14:50:00.801 |
+----+-------------+-------------+--------------------------------+--------------------------------+-------------------------+
20 rows in set

為了組合這兩種執(zhí)行行為,對(duì)StreamTableEnvironment.toDataStream或StreamTableEnviron.toChangelogStream的每次調(diào)用都將具體化(materialize )(即編譯)Table API子管道(sub-pipeline),并將其插入DataStream API管道生成器(builder)中。這意味著之后必須調(diào)用StreamExecutionEnvironment.execute()或DataStream.executeAndCollect。Table API中的執(zhí)行不會(huì)觸發(fā)這些“外部部件(external parts)”。

// adds a branch with a printing sink to the StreamExecutionEnvironment
tableEnv.toDataStream(table).print();// (2)// executes a Table API end-to-end pipeline as a Flink job and prints locally,
// thus (1) has still not been executed
table.execute().print();// executes the DataStream API pipeline with the sink defined in (1) as a
// Flink job, (2) was already running before
env.execute();

上述示例中有具體應(yīng)用。

以上,本文是Flink table api 與 datastream api的集成的第一篇,主要介紹了集成的概述、table api 與 datastream api相互轉(zhuǎn)換的三個(gè)示例以及其集成的說(shuō)明(即maven依賴、import、配置以及執(zhí)行行為),并以具體的示例進(jìn)行說(shuō)明。

http://www.risenshineclean.com/news/50319.html

相關(guān)文章:

  • 做網(wǎng)站前段用什么軟件百度seo排名培訓(xùn) 優(yōu)化
  • 做網(wǎng)站建設(shè)一年能賺多少中國(guó)搜索網(wǎng)站排名
  • 青浦手機(jī)網(wǎng)站制作seo實(shí)戰(zhàn)密碼第三版
  • 河北省住房城鄉(xiāng)建設(shè)廳網(wǎng)站防城港網(wǎng)站seo
  • 影響網(wǎng)站速度嗎網(wǎng)站優(yōu)化哪家好
  • 網(wǎng)頁(yè)制作成品網(wǎng)站寧波百度推廣優(yōu)化
  • 鄭州做網(wǎng)站好的公企業(yè)官方網(wǎng)站推廣
  • 自己做外貿(mào)網(wǎng)站站長(zhǎng)平臺(tái)官網(wǎng)
  • 電腦網(wǎng)站開(kāi)發(fā)seo發(fā)包技術(shù)教程
  • 企業(yè)網(wǎng)站特點(diǎn)分析與描述百度收錄時(shí)間
  • 做網(wǎng)站 提要求win7系統(tǒng)優(yōu)化軟件
  • 做網(wǎng)站原型圖是用什么軟件業(yè)務(wù)網(wǎng)站制作
  • 網(wǎng)站后臺(tái)怎么掛廣告 怎么做長(zhǎng)沙百度貼吧
  • 天津建設(shè)合同備案網(wǎng)站特大新聞凌晨剛剛發(fā)生
  • 小型玩具企業(yè)網(wǎng)站建設(shè)初期階段任務(wù)服務(wù)器
  • 做網(wǎng)站開(kāi)發(fā)的網(wǎng)站做外鏈平臺(tái)有哪些
  • 上海裝修網(wǎng)官網(wǎng)長(zhǎng)沙電商優(yōu)化
  • 洛陽(yáng)做網(wǎng)站漢獅網(wǎng)絡(luò)seo優(yōu)化是什么意思
  • 微微網(wǎng)站建設(shè)交換友情鏈接推廣法
  • 東莞專業(yè)做網(wǎng)站的公司有哪些seo外包推廣
  • 深圳網(wǎng)站優(yōu)化教程廣州seo優(yōu)化電話
  • 邯鄲市網(wǎng)站建設(shè)新手怎么學(xué)電商運(yùn)營(yíng)
  • 做網(wǎng)站域名的公司網(wǎng)站模板怎么建站
  • 如何做微信官方網(wǎng)站如何快速推廣
  • 網(wǎng)站全站開(kāi)發(fā)需要學(xué)什么bt櫻桃 磁力島
  • 人與狗做的網(wǎng)站手機(jī)怎么建立網(wǎng)站
  • 如何測(cè)試 網(wǎng)站seo免費(fèi)教程
  • 網(wǎng)站無(wú)障礙建設(shè)規(guī)定北京seo優(yōu)化哪家公司好
  • 網(wǎng)站建設(shè)太金手指六六六免費(fèi)關(guān)鍵詞優(yōu)化工具
  • 做網(wǎng)站哪家南京做網(wǎng)站中國(guó)培訓(xùn)網(wǎng)官網(wǎng)