三亞制作網(wǎng)站軍事新聞?lì)^條最新消息
一、說明
時(shí)間屬性是大數(shù)據(jù)中的一個(gè)重要方面,像窗口(在 Table API 和 SQL )這種基于時(shí)間的操作,需要有時(shí)間信息。我們可以通過時(shí)間屬性來更加靈活高效地處理數(shù)據(jù),下面我們通過處理時(shí)間和事件時(shí)間來探討一下Flink SQL 時(shí)間屬性。
二、處理時(shí)間
2.1、準(zhǔn)備WaterSensor類,方便使用
package com.lyh.bean;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
@AllArgsConstructor
public class WaterSensor {private String id;private Long ts;private Integer vc;
}
2.2、DataStream 到 Table 轉(zhuǎn)換時(shí)定義
處理時(shí)間屬性可以在 schema 定義的時(shí)候用 .proctime 后綴來定義。時(shí)間屬性一定不能定義在一個(gè)已有字段上,所以它新增一個(gè)字段。
代碼段:
package com.lyh.flink12;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;public class Flink_Sql_Proctime {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<WaterSensor> waterSensorStream =env.fromElements(new WaterSensor("sensor_1", 1000L, 10),new WaterSensor("sensor_1", 2000L, 20),new WaterSensor("sensor_2", 3000L, 30),new WaterSensor("sensor_1", 4000L, 40),new WaterSensor("sensor_1", 5000L, 50),new WaterSensor("sensor_2", 6000L, 60));
// 1. 創(chuàng)建表的執(zhí)行環(huán)境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 聲明一個(gè)額外的字段來作為處理時(shí)間字段Table sensorTable = tableEnv.fromDataStream(waterSensorStream, $("id"), $("ts"), $("vc"), $("pt").proctime());sensorTable.execute().print();}
}
執(zhí)行結(jié)果:
2.3、創(chuàng)建數(shù)據(jù)文件sensor.txt 數(shù)據(jù),方便使用
sensor_1,1,10
sensor_1,2,20
sensor_2,4,30
sensor_1,4,400
sensor_2,5,50
sensor_2,6,60
2.4、在創(chuàng)建表的 DDL 中定義
package com.lyh.flink12;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class Flink_Sql_ddl_Procetime {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.executeSql("create table sensor(id string,ts bigint,vc int,pt_time as PROCTIME()) with("+ "'connector' = 'filesystem',"+ "'path' = 'input/sensor.txt',"+ "'format' = 'csv'"+ ")");Table table = tableEnv.sqlQuery("select * from sensor");table.execute().print();}
}
運(yùn)行結(jié)果:
三、事件時(shí)間
事件時(shí)間允許程序按照數(shù)據(jù)中包含的時(shí)間來處理,這樣可以在有亂序或者晚到的數(shù)據(jù)的情況下產(chǎn)生一致的處理結(jié)果。它可以保證從外部存儲(chǔ)讀取數(shù)據(jù)后產(chǎn)生可以復(fù)現(xiàn)(replayable)的結(jié)果。
除此之外,事件時(shí)間可以讓程序在流式和批式作業(yè)中使用同樣的語法。在流式程序中的事件時(shí)間屬性,在批式程序中就是一個(gè)正常的時(shí)間字段。
為了能夠處理亂序的事件,并且區(qū)分正常到達(dá)和晚到的事件,Flink 需要從事件中獲取事件時(shí)間并且產(chǎn)生 watermark(watermarks)。
3.1、DataStream 到 Table 轉(zhuǎn)換時(shí)定義
事件時(shí)間屬性可以用 .rowtime 后綴在定義 DataStream schema 的時(shí)候來定義。時(shí)間戳和 watermark 在這之前一定是在 DataStream 上已經(jīng)定義好了。
在從 DataStream 到 Table 轉(zhuǎn)換時(shí)定義事件時(shí)間屬性有兩種方式。取決于用 .rowtime 后綴修飾的字段名字是否是已有字段,事件時(shí)間字段可以是:
1、在 schema 的結(jié)尾追加一個(gè)新的字段
2、替換一個(gè)已經(jīng)存在的字段。
不管在哪種情況下,事件時(shí)間字段都表示 DataStream 中定義的事件的時(shí)間戳。
代碼:
援用上面WaterSensor類
package com.lyh.flink12;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.time.Duration;import static org.apache.flink.table.api.Expressions.$;public class Flink_Sql_EventTime {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> waterSensorSource = env.fromElements(new WaterSensor("sensor_1", 1000L, 100),new WaterSensor("sensor_1", 1000L, 100),new WaterSensor("sensor_2", 1000L, 200),new WaterSensor("sensor_2", 1000L, 200)).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner((element, recordtime) -> element.getTs()));StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.fromDataStream(waterSensorSource,$("id"),$("ts"),$("vc"),$("pt").rowtime()).execute().print();}
}
運(yùn)行結(jié)果:
3.2、使用已有的字段作為時(shí)間屬性
.fromDataStream(waterSensorStream, $("id"), $("ts").rowtime(), $("vc"));
3.3、在創(chuàng)建表的 DDL 中定義
事件時(shí)間屬性可以用 WATERMARK 語句在 CREATE TABLE DDL 中進(jìn)行定義。WATERMARK 語句在一個(gè)已有字段上定義一個(gè) watermark 生成表達(dá)式,同時(shí)標(biāo)記這個(gè)已有字段為時(shí)間屬性字段.
package com.lyh.flink12;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class Flink_Sql_ddl_EventTime {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.executeSql("create table sensor(" +"id string," +"ts bigint," +"vc int, " +"t as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss'))," +"watermark for t as t - interval '5' second)" +"with("+ "'connector' = 'filesystem',"+ "'path' = 'input/sensor.txt',"+ "'format' = 'csv'"+ ")");tableEnv.sqlQuery("select * from sensor").execute().print();}
}
運(yùn)行結(jié)果:
說明:
1.把一個(gè)現(xiàn)有的列定義為一個(gè)為表標(biāo)記事件時(shí)間的屬性。該列的類型必須為 TIMESTAMP(3),且是 schema 中的頂層列,它也可以是一個(gè)計(jì)算列。
2.嚴(yán)格遞增時(shí)間戳: WATERMARK FOR rowtime_column AS rowtime_column。
3.遞增時(shí)間戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘0.001’ SECOND。
亂序時(shí)間戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘string’ timeUnit。