中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁 > news >正文

三亞制作網(wǎng)站軍事新聞?lì)^條最新消息

三亞制作網(wǎng)站,軍事新聞?lì)^條最新消息,企業(yè)網(wǎng)站wap源碼,中原鄭州網(wǎng)站建設(shè)一、說明 時(shí)間屬性是大數(shù)據(jù)中的一個(gè)重要方面,像窗口(在 Table API 和 SQL )這種基于時(shí)間的操作,需要有時(shí)間信息。我們可以通過時(shí)間屬性來更加靈活高效地處理數(shù)據(jù),下面我們通過處理時(shí)間和事件時(shí)間來探討一下Flink SQL …

一、說明

時(shí)間屬性是大數(shù)據(jù)中的一個(gè)重要方面,像窗口(在 Table API 和 SQL )這種基于時(shí)間的操作,需要有時(shí)間信息。我們可以通過時(shí)間屬性來更加靈活高效地處理數(shù)據(jù),下面我們通過處理時(shí)間和事件時(shí)間來探討一下Flink SQL 時(shí)間屬性。

二、處理時(shí)間

2.1、準(zhǔn)備WaterSensor類,方便使用

package com.lyh.bean;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
@AllArgsConstructor
public class WaterSensor {private String id;private Long ts;private Integer vc;
}

2.2、DataStream 到 Table 轉(zhuǎn)換時(shí)定義

處理時(shí)間屬性可以在 schema 定義的時(shí)候用 .proctime 后綴來定義。時(shí)間屬性一定不能定義在一個(gè)已有字段上,所以它新增一個(gè)字段。
代碼段:

package com.lyh.flink12;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;public class Flink_Sql_Proctime {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<WaterSensor> waterSensorStream =env.fromElements(new WaterSensor("sensor_1", 1000L, 10),new WaterSensor("sensor_1", 2000L, 20),new WaterSensor("sensor_2", 3000L, 30),new WaterSensor("sensor_1", 4000L, 40),new WaterSensor("sensor_1", 5000L, 50),new WaterSensor("sensor_2", 6000L, 60));
// 1. 創(chuàng)建表的執(zhí)行環(huán)境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 聲明一個(gè)額外的字段來作為處理時(shí)間字段Table sensorTable = tableEnv.fromDataStream(waterSensorStream, $("id"), $("ts"), $("vc"), $("pt").proctime());sensorTable.execute().print();}
}

執(zhí)行結(jié)果:
在這里插入圖片描述

2.3、創(chuàng)建數(shù)據(jù)文件sensor.txt 數(shù)據(jù),方便使用

sensor_1,1,10
sensor_1,2,20
sensor_2,4,30
sensor_1,4,400
sensor_2,5,50
sensor_2,6,60

2.4、在創(chuàng)建表的 DDL 中定義

package com.lyh.flink12;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class Flink_Sql_ddl_Procetime {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.executeSql("create table sensor(id string,ts bigint,vc int,pt_time as PROCTIME()) with("+ "'connector' = 'filesystem',"+ "'path' = 'input/sensor.txt',"+ "'format' = 'csv'"+ ")");Table table = tableEnv.sqlQuery("select * from sensor");table.execute().print();}
}

運(yùn)行結(jié)果:
在這里插入圖片描述

三、事件時(shí)間

事件時(shí)間允許程序按照數(shù)據(jù)中包含的時(shí)間來處理,這樣可以在有亂序或者晚到的數(shù)據(jù)的情況下產(chǎn)生一致的處理結(jié)果。它可以保證從外部存儲(chǔ)讀取數(shù)據(jù)后產(chǎn)生可以復(fù)現(xiàn)(replayable)的結(jié)果。
除此之外,事件時(shí)間可以讓程序在流式和批式作業(yè)中使用同樣的語法。在流式程序中的事件時(shí)間屬性,在批式程序中就是一個(gè)正常的時(shí)間字段。
為了能夠處理亂序的事件,并且區(qū)分正常到達(dá)和晚到的事件,Flink 需要從事件中獲取事件時(shí)間并且產(chǎn)生 watermark(watermarks)。

3.1、DataStream 到 Table 轉(zhuǎn)換時(shí)定義

事件時(shí)間屬性可以用 .rowtime 后綴在定義 DataStream schema 的時(shí)候來定義。時(shí)間戳和 watermark 在這之前一定是在 DataStream 上已經(jīng)定義好了。
在從 DataStream 到 Table 轉(zhuǎn)換時(shí)定義事件時(shí)間屬性有兩種方式。取決于用 .rowtime 后綴修飾的字段名字是否是已有字段,事件時(shí)間字段可以是:
1、在 schema 的結(jié)尾追加一個(gè)新的字段
2、替換一個(gè)已經(jīng)存在的字段。
不管在哪種情況下,事件時(shí)間字段都表示 DataStream 中定義的事件的時(shí)間戳。
代碼:
援用上面WaterSensor類

package com.lyh.flink12;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.time.Duration;import static org.apache.flink.table.api.Expressions.$;public class Flink_Sql_EventTime {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> waterSensorSource = env.fromElements(new WaterSensor("sensor_1", 1000L, 100),new WaterSensor("sensor_1", 1000L, 100),new WaterSensor("sensor_2", 1000L, 200),new WaterSensor("sensor_2", 1000L, 200)).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner((element, recordtime) -> element.getTs()));StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.fromDataStream(waterSensorSource,$("id"),$("ts"),$("vc"),$("pt").rowtime()).execute().print();}
}

運(yùn)行結(jié)果:
在這里插入圖片描述

3.2、使用已有的字段作為時(shí)間屬性

.fromDataStream(waterSensorStream, $("id"), $("ts").rowtime(), $("vc"));

3.3、在創(chuàng)建表的 DDL 中定義

事件時(shí)間屬性可以用 WATERMARK 語句在 CREATE TABLE DDL 中進(jìn)行定義。WATERMARK 語句在一個(gè)已有字段上定義一個(gè) watermark 生成表達(dá)式,同時(shí)標(biāo)記這個(gè)已有字段為時(shí)間屬性字段.

package com.lyh.flink12;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class Flink_Sql_ddl_EventTime {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.executeSql("create table sensor(" +"id string," +"ts bigint," +"vc int, " +"t as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss'))," +"watermark for t as t - interval '5' second)" +"with("+ "'connector' = 'filesystem',"+ "'path' = 'input/sensor.txt',"+ "'format' = 'csv'"+ ")");tableEnv.sqlQuery("select * from sensor").execute().print();}
}

運(yùn)行結(jié)果:
在這里插入圖片描述
說明:
1.把一個(gè)現(xiàn)有的列定義為一個(gè)為表標(biāo)記事件時(shí)間的屬性。該列的類型必須為 TIMESTAMP(3),且是 schema 中的頂層列,它也可以是一個(gè)計(jì)算列。
2.嚴(yán)格遞增時(shí)間戳: WATERMARK FOR rowtime_column AS rowtime_column。
3.遞增時(shí)間戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘0.001’ SECOND。
亂序時(shí)間戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘string’ timeUnit。

http://www.risenshineclean.com/news/29168.html

相關(guān)文章:

  • 北票網(wǎng)站建設(shè)黃金網(wǎng)站軟件免費(fèi)
  • 做娛樂性手機(jī)網(wǎng)站推廣資源網(wǎng)
  • 直播間 網(wǎng)站建設(shè)app關(guān)鍵詞推廣
  • 淘寶客如何做網(wǎng)站今日新聞?lì)^條新聞
  • 聚財(cái)三個(gè)字公司名字哪個(gè)網(wǎng)站學(xué)seo是免費(fèi)的
  • 新網(wǎng)站快速提高排名互換鏈接的方法
  • 如何做網(wǎng)站對比網(wǎng)站優(yōu)化排名網(wǎng)站
  • 網(wǎng)站建設(shè)實(shí)戰(zhàn)視頻教程桂林網(wǎng)站設(shè)計(jì)制作
  • 報(bào)考建設(shè)八大員官方網(wǎng)站seo推廣軟件哪個(gè)好
  • wordpress 文章列表只顯示標(biāo)題外貿(mào)seo
  • 哪個(gè)網(wǎng)站做任務(wù)能賺錢我要看今日頭條
  • 廈門廣告公司有哪些aso優(yōu)化的主要內(nèi)容為
  • wordpress修改谷歌外貿(mào)seo公司
  • 做美團(tuán)網(wǎng)這種網(wǎng)站賺錢嗎亞馬遜關(guān)鍵詞搜索器
  • 英語網(wǎng)站建設(shè)如何制作一個(gè)公司網(wǎng)站
  • 上海網(wǎng)站建設(shè)平臺(tái)站霸網(wǎng)絡(luò)seo學(xué)習(xí)網(wǎng)站
  • 做網(wǎng)站有一個(gè)火箭回頂部網(wǎng)站優(yōu)化關(guān)鍵詞公司
  • 做cpa的博客網(wǎng)站類型博客網(wǎng)
  • 優(yōu)惠券推廣網(wǎng)站怎么做seo怎么搞
  • nas網(wǎng)站怎么做網(wǎng)站時(shí)事新聞最新2022
  • 順德樂從有做阿里巴巴的網(wǎng)站嗎sem競價(jià)專員是干什么的
  • 做網(wǎng)站視頻圖片加載不出來企業(yè)網(wǎng)站模板下載
  • 情感視頻素材網(wǎng)站劉連康seo培訓(xùn)哪家強(qiáng)
  • 網(wǎng)站建設(shè)和網(wǎng)站推廣可以同一家做嗎網(wǎng)站優(yōu)化排名金蘋果系統(tǒng)
  • 企業(yè)網(wǎng)站建設(shè)英文超級(jí)外鏈
  • 手機(jī)網(wǎng)站哪家好西安百度推廣優(yōu)化
  • 網(wǎng)站開發(fā) 英文文章百度收錄快的發(fā)帖平臺(tái)
  • 福州網(wǎng)頁鄭州seo排名優(yōu)化公司
  • 汕頭網(wǎng)站建設(shè)制作公司衡陽seo快速排名
  • 分類信息網(wǎng)站成都搭建如何搭建一個(gè)網(wǎng)站平臺(tái)