Flink Basics
1. Processing Time vs. Event Time
Processing time (system time)
Spark Streaming jobs compute with processing time, i.e., the system clock.
Suppose we use a tumbling window of 5 minutes: every five minutes, a batch is submitted over the data received.
There is one thing to watch out for here: time-axis alignment. If we start receiving data at 12:12, we might expect the batch to be submitted at 12:17. In practice it is submitted at 12:20, because the day is divided into 5-minute buckets and the trigger is aligned to the bucket boundary at 12:20; from then on, every batch fires on one of those aligned 5-minute boundaries.
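As a rough sketch of this alignment (an assumption for illustration: the epoch-based formula with zero offset that Flink's own TimeWindow uses, not Spark's exact internals), the aligned boundary a timestamp falls into can be computed like this:

// Hedged sketch: epoch-aligned window boundaries. Assumes alignment is
// relative to the epoch with no offset, so triggers always land on fixed
// 5-minute boundaries regardless of when the job started.
public class WindowAlignmentSketch {
    // Returns the start of the window that `timestamp` falls into.
    static long windowStart(long timestamp, long windowSizeMs) {
        return timestamp - (timestamp % windowSizeMs);
    }

    public static void main(String[] args) {
        long fiveMinutes = 5 * 60 * 1000L;
        long ts = 1_722_233_813_000L; // an arbitrary event timestamp (ms)
        long start = windowStart(ts, fiveMinutes);
        System.out.println("window: [" + start + ", " + (start + fiveMinutes) + ")");
    }
}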
Event time
Flink supports window operations driven by event time.
Event time differs from processing time, as the following example shows:
Flink processes streaming data record by record. Suppose we set a 5-minute event-time window and start receiving data at 12:00: the first record carries event time 12:01 and the second 12:02. If no more data arrives after that, the window is never triggered. **At 12:06 a third record arrives; its event time is more than five minutes past 12:00,** so the window now fires.
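As a minimal preview of how this looks in Flink code (assumptions on my part: input arrives on the same master:8888 socket used throughout these notes, formatted as word,timestampMillis, with a monotonically increasing watermark; section 9 walks through this properly):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class EventTimeSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Input lines look like "a,1722233813000" (word, epoch millis)
        env.socketTextStream("master", 8888)
           .map(line -> Tuple2.of(line.split(",")[0], Long.parseLong(line.split(",")[1])),
                Types.TUPLE(Types.STRING, Types.LONG))
           .assignTimestampsAndWatermarks(WatermarkStrategy
                   .<Tuple2<String, Long>>forMonotonousTimestamps()
                   .withTimestampAssigner((t2, ts) -> t2.f1))
           .map(t2 -> Tuple2.of(t2.f0, 1), Types.TUPLE(Types.STRING, Types.INT))
           .keyBy(t2 -> t2.f0)
           // The [12:00, 12:05) window only fires once a record with an event
           // time past 12:05 (e.g. the 12:06 record) pushes the watermark forward.
           .window(TumblingEventTimeWindows.of(Time.minutes(5)))
           .sum(1)
           .print();
        env.execute();
    }
}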
2. A Simple WordCount Example
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class Demo01StreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1. Build the Flink environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Simulate an unbounded stream via a socket so Flink has something to process
        // On the VM, start: nc -lk 8888
        // Build the first DataStream from the Source
        // TODO C:\Windows\System32\drivers\etc\hosts maps "master" to an IP, so "master" works here
        DataStream<String> lineDS = env.socketTextStream("master", 8888);

        // Count each word
        // Step 1: split each line into words and flatten
        DataStream<String> wordsDS = lineDS.flatMap(new FlatMapFunction<String, String>() {
            /**
             * FlatMapFunction<String, String>: the input and output data types
             * @param line one record from the DataStream
             * @param out  sends data downstream via its collect method
             * @throws Exception
             */
            @Override
            public void flatMap(String line, Collector<String> out) throws Exception {
                for (String word : line.split(",")) {
                    // Send each word downstream
                    out.collect(word);
                }
            }
        });

        // Step 2: turn each word into a KV pair with V = 1; the result is a Tuple2
        DataStream<Tuple2<String, Integer>> wordKVDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1);
            }
        });

        /*
         * Step 3: group by word. The result can no longer be typed as the parent DataStream.
         * KeyedStream<T, K> is a specialization of DataStream<T> that adds key-related
         * operations (reduce, aggregate, window, ...). Because KeyedStream offers extra
         * functionality, implements additional interfaces, and may override behavior to
         * support keyed operations, it is not interchangeable with a plain DataStream.
         */
        KeyedStream<Tuple2<String, Integer>, String> keyedDS = wordKVDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
                // Group by the key
                return tuple2.f0;
            }
        });

        // Step 4: sum the 1s; field indexes start at 0
        DataStream<Tuple2<String, Integer>> wordCntDS = keyedDS.sum(1);

        // 3. Print the result: sink the DataStream to the console
        wordCntDS.print();

        // Run the job
        env.execute();
    }
}
3. Setting Job Parallelism
This machine has 8 cores, i.e., 16 hardware threads.
Change the job parallelism manually. If it is not set, the printed subtask indexes range over 1-16; once set, only 1-2 appear:
env.setParallelism(2);
setBufferTimeout(): the maximum interval (in milliseconds) at which output buffers are flushed:
env.setBufferTimeout(200);
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class Demo01StreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1. Build the Flink environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Change the job parallelism manually; by default it is the maximum
        env.setParallelism(2);
        // setBufferTimeout(): the maximum interval (ms) at which output buffers are flushed
        env.setBufferTimeout(200);

        // 2. Simulate an unbounded stream via a socket
        // On the VM, start: nc -lk 8888
        // Build the first DataStream from the Source
        DataStream<String> lineDS = env.socketTextStream("master", 8888);
        System.out.println("lineDS parallelism: " + lineDS.getParallelism());

        // Count each word
        // Step 1: split each line into words and flatten
        DataStream<String> wordsDS = lineDS.flatMap(new FlatMapFunction<String, String>() {
            /**
             * @param line one record from the DataStream
             * @param out  sends data downstream via its collect method
             * @throws Exception
             */
            @Override
            public void flatMap(String line, Collector<String> out) throws Exception {
                for (String word : line.split(",")) {
                    // Send each word downstream
                    out.collect(word);
                }
            }
        });
        System.out.println("wordsDS parallelism: " + wordsDS.getParallelism());

        // Step 2: turn each word into a KV pair with V = 1
        DataStream<Tuple2<String, Integer>> wordKVDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1);
            }
        });
        System.out.println("wordKVDS parallelism: " + wordKVDS.getParallelism());

        // Step 3: group by word
        // After keyBy the stream is partitioned: records with the same key go to the same subtask
        // Routing rule: hash of the key modulo the number of subtasks (default: the CPU's thread count)
        KeyedStream<Tuple2<String, Integer>, String> keyedDS = wordKVDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
                return tuple2.f0;
            }
        });
        System.out.println("keyedDS parallelism: " + keyedDS.getParallelism());

        // Step 4: sum the 1s
        DataStream<Tuple2<String, Integer>> wordCntDS = keyedDS.sum(1);
        System.out.println("wordCntDS parallelism: " + wordCntDS.getParallelism());

        // 3. Print the result: sink the DataStream to the console
        // (the original printed keyedDS here, but the computed counts are in wordCntDS)
        wordCntDS.print();

        env.execute();
    }
}
4. Batch vs. Streaming Mode, Lambda Expressions, and Implementing the Interface with a Custom Class
package com.shujia.flink.core;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class Demo02BatchWordCount {
    public static void main(String[] args) throws Exception {
        // 1. Build the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set the execution mode; the default is streaming
        /*
         * BATCH: batch mode, only for bounded streams; MapReduce-style model, allows pre-aggregation
         * STREAMING: streaming mode, for unbounded and bounded streams; continuous-flow model, record-at-a-time
         * AUTOMATIC: BATCH if all sources are bounded, STREAMING if any source is unbounded
         */
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // 2. Get the first DataStream
        // readTextFile builds a bounded stream from a file
        DataStream<String> wordsFileDS = env.readTextFile("flink/data/words.txt");

        // 3. Transformations between DataStreams
        // Count each word
        // Step 1: split each line into words and flatten
        // Ways to pass processing logic to Flink:
        // new XXXFunction — an anonymous inner class
//        DataStream<String> wordsDS = wordsFileDS.flatMap(new FlatMapFunction<String, String>() {
//            /**
//             * @param line one record from the DataStream
//             * @param out  sends data downstream via its collect method
//             * @throws Exception
//             * Type parameters:
//             * FlatMapFunction<T, O>
//             * <T> – type of the input elements. <O> – type of the returned elements.
//             */
//            @Override
//            public void flatMap(String line, Collector<String> out) throws Exception {
//                for (String word : line.split(",")) {
//                    // Send each word downstream
//                    out.collect(word);
//                }
//            }
//        });

        /*
         * A lambda expression:
         * you need to know what the two parameters of FlatMapFunction's abstract flatMap method mean.
         * () -> {}: parameters to the left of ->, the function body to the right.
         * The output type of flatMap must be given explicitly: Types.STRING
         * line: the input record, out: the output collector
         */
        DataStream<String> wordsDS = wordsFileDS.flatMap((line, out) -> {
            for (String word : line.split(",")) {
                out.collect(word);
            }
        }, Types.STRING);

        // TODO A custom class implementing the interface's abstract method; rarely used in practice
        wordsFileDS.flatMap(new MyFunction()).print();

        // Step 2: turn each word into a KV pair with V = 1
//        DataStream<Tuple2<String, Integer>> wordKVDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
//            @Override
//            public Tuple2<String, Integer> map(String word) throws Exception {
//                return Tuple2.of(word, 1);
//            }
//        });

        // TODO Here the output type of map must be given: Types.TUPLE(Types.STRING, Types.INT), a 2-tuple
        DataStream<Tuple2<String, Integer>> wordKVDS = wordsDS.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));

        /*
         * Step 3: group by word
         * After keyBy the stream is partitioned: records with the same key go to the same subtask
         * Routing rule: hash of the key modulo the number of subtasks (default: the CPU's thread count, 16 on this machine)
         */
//        KeyedStream<Tuple2<String, Integer>, String> keyedDS = wordKVDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
//            @Override
//            public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
//                return tuple2.f0;
//            }
//        });

        // TODO Here Types.STRING is not the output type of some method but the type of the key in keyBy. It can be omitted!
        KeyedStream<Tuple2<String, Integer>, String> keyedDS = wordKVDS.keyBy(kv -> kv.f0, Types.STRING);

        // Step 4: sum the 1s; no return type needs to be given
        DataStream<Tuple2<String, Integer>> wordCntDS = keyedDS.sum(1);

        // 4. Handle the final result (save / print)
        wordCntDS.print();

        env.execute();
    }
}

class MyFunction implements FlatMapFunction<String, String> {
    @Override
    public void flatMap(String line, Collector<String> out) throws Exception {
        for (String word : line.split(",")) {
            // Send each word downstream
            out.collect(word);
        }
    }
}
5. Sources
Flink's sources for stream and batch processing fall into roughly 4 categories:
collection-based sources,
file-based sources,
socket-based sources,
custom sources. Common custom sources include Apache Kafka, Amazon Kinesis Streams, RabbitMQ, Twitter Streaming API, Apache NiFi, and so on; you can also define your own. A hedged Kafka example is sketched below.
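As an illustration of such a connector source, a Kafka source might be wired up roughly as follows (a sketch, not from the original: it assumes the flink-connector-kafka dependency is on the classpath, and the broker address, topic, and group id are placeholders):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DemoKafkaSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Hedged sketch: broker, topic and group id below are placeholders
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("master:9092")            // placeholder broker
                .setTopics("words")                            // placeholder topic
                .setGroupId("flink-demo")                      // placeholder group id
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        // Like the FileSource shown below, a connector source is attached with fromSource
        DataStream<String> kafkaDS = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkaSource");
        kafkaDS.print();
        env.execute();
    }
}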
1. Reading data from a local collection source
package com.shujia.flink.source;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;

public class Demo01ListSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Local-collection source
        ArrayList<String> arrList = new ArrayList<>();
        arrList.add("flink");
        arrList.add("flink");
        arrList.add("flink");
        arrList.add("flink");

        // TODO Bounded stream: fromCollection
        DataStream<String> listDS = env.fromCollection(arrList);
        listDS.print();

        env.execute();
    }
}
2. Reading a local file with the newer API: bounded and unbounded variants
package com.shujia.flink.source;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.File;
import java.time.Duration;

public class Demo02FileSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // TODO The legacy way to read a file: a bounded stream
        DataStream<String> oldFileDS = env.readTextFile("flink/data/words.txt");
//        oldFileDS.print();

        // TODO Case 1: the new way to load a file: FileSource, bounded by default
        FileSource<String> fileSource = FileSource
                .forRecordStreamFormat(new TextLineInputFormat(), new Path("flink/data/words.txt"))
                .build();

        // TODO Build a DS from the Source; built-in source classes are attached with fromSource
        DataStream<String> fileSourceDS = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");
        fileSourceDS.print();

        // TODO Case 2: turn file reading into an unbounded stream
        FileSource<String> fileSource2 = FileSource
                .forRecordStreamFormat(new TextLineInputFormat(), new Path("flink/data/words"))
                // TODO Unbounded: read a directory, similar to Flume's spooling dir — monitors the directory for changes
                // Duration.ofSeconds(5): keep monitoring at 5-second intervals
                .monitorContinuously(Duration.ofSeconds(5))
                .build();
        DataStream<String> fileSourceDS2 = env.fromSource(fileSource2, WatermarkStrategy.noWatermarks(), "fileSource2");
        fileSourceDS2.print();

        env.execute();
    }
}
3. A custom source class: bounded vs. unbounded streams
The run method executes only once, when the Source starts.
If run returns, the Source yields a bounded stream.
If run never returns, the Source yields an unbounded stream.
package com.shujia.flink.source;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class Demo03MySource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // TODO Attach a custom source class with addSource
        DataStream<String> mySourceDS = env.addSource(new MySource());
        mySourceDS.print();

        env.execute();
    }
}

class MySource implements SourceFunction<String> {
    /*
     * Executed only once, when the Source starts.
     * If run returns, the Source yields a bounded stream;
     * if run never returns, the Source yields an unbounded stream.
     * The example below yields an unbounded stream.
     */
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        System.out.println("run method started");
        // ctx sends data downstream via its collect method
        long cnt = 0L;
        while (true) {
            ctx.collect(cnt + "");
            cnt++;
            // Sleep for a moment
            Thread.sleep(1000);
        }
    }

    // Executed when the Source is cancelled
    @Override
    public void cancel() {
        System.out.println("Source finished");
    }
}
4. A custom source class that reads data from MySQL and processes it
package com.shujia.flink.source;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class Demo04MyMySQLSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Students> studentDS = env.addSource(new MyMySQLSource());

        // Count students per class
        DataStream<Tuple2<String, Integer>> clazzCntDS = studentDS
                .map(stu -> Tuple2.of(stu.clazz, 1), Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t2 -> t2.f0)
                .sum(1);
        clazzCntDS.print();

        // Count students per gender
        DataStream<Tuple2<String, Integer>> genderCntDS = studentDS
                .map(stu -> Tuple2.of(stu.gender, 1), Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t2 -> t2.f0)
                .sum(1);
        genderCntDS.print();

        env.execute();
    }
}

// TODO A custom source class that reads from MySQL
class MyMySQLSource implements SourceFunction<Students> {
    @Override
    public void run(SourceContext<Students> ctx) throws Exception {
        // TODO run executes only once, so the objects below are created only once
        // Open the connection
        Connection conn = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata_30", "root", "123456");
        // Create the Statement
        Statement st = conn.createStatement();
        // Run the query
        ResultSet rs = st.executeQuery("select * from students2");
        // Iterate over the ResultSet, one row at a time
        while (rs.next()) {
            long id = rs.getLong("id");
            String name = rs.getString("name");
            int age = rs.getInt("age");
            String gender = rs.getString("gender");
            String clazz = rs.getString("clazz");
            Students stu = new Students(id, name, age, gender, clazz);
            ctx.collect(stu);
            /*
             * Sample output:
             * 16> (文科四班,1)
             * 15> (女,1)
             * 15> (女,2)
             * 2> (男,1)
             * 7> (文科六班,1)
             * 15> (女,3)
             * 2> (男,2)
             * 17> (理科六班,1)
             * 17> (理科六班,2)
             * 13> (理科五班,1)
             * 20> (理科二班,1)
             * 13> (理科四班,1)
             */
        }
        rs.close();
        st.close();
        conn.close();
    }

    @Override
    public void cancel() {
    }
}

// TODO A class holding the rows fetched from MySQL
class Students {
    Long id;
    String name;
    Integer age;
    String gender;
    String clazz;

    public Students(Long id, String name, Integer age, String gender, String clazz) {
        this.id = id;
        this.name = name;
        this.age = age;
        this.gender = gender;
        this.clazz = clazz;
    }
}
6. Sinks
A sink is the destination to which Flink sends data after its transformations.
Common Flink sinks fall roughly into these categories:
writing to files,
printing to the console,
writing to sockets,
custom sinks. Common custom sinks include Apache Kafka, RabbitMQ, MySQL, ElasticSearch, Apache Cassandra, Hadoop FileSystem, and so on; likewise, you can define your own. A hedged Kafka example is sketched below.
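Symmetrically to the Kafka source sketch above, a Kafka sink might look roughly like this (again a sketch assuming the flink-connector-kafka dependency; broker and topic are placeholders):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DemoKafkaSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lineDS = env.socketTextStream("master", 8888);
        // Hedged sketch: broker and topic below are placeholders
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers("master:9092")              // placeholder broker
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("wordsOut")                    // placeholder topic
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .build();
        // Like the FileSink shown below, a connector sink is attached with sinkTo
        lineDS.sinkTo(kafkaSink);
        env.execute();
    }
}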
1. Building a FileSink: watch a port and write its data into a local directory
package com.shujia.flink.sink;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;

public class Demo01FileSink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> lineDS = env.socketTextStream("master", 8888);

        // Build the FileSink
        FileSink<String> fileSink = FileSink
                .<String>forRowFormat(new Path("flink/data/fileSink"), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                // The rollover interval: how often part files are rolled
                                .withRolloverInterval(Duration.ofSeconds(10))
                                // The inactivity interval: roll when no data arrives for this long
                                .withInactivityInterval(Duration.ofSeconds(10))
                                // The maximum size a single part file may grow to; here each file rolls at 1 MB
                                .withMaxPartSize(MemorySize.ofMebiBytes(1))
                                .build())
                .build();

        lineDS.sinkTo(fileSink);

        env.execute();
    }
}
2. A custom sink class
package com.shujia.flink.sink;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.util.ArrayList;

public class Demo02MySink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        ArrayList<String> arrList = new ArrayList<>();
        arrList.add("flink");
        arrList.add("flink");
        arrList.add("flink");
        arrList.add("flink");

        DataStreamSource<String> ds = env.fromCollection(arrList);
        ds.addSink(new MySinkFunction());

        env.execute();
        /*
         * Output:
         * entered the invoke method
         * flink
         * entered the invoke method
         * flink
         * entered the invoke method
         * flink
         * entered the invoke method
         * flink
         */
    }
}

class MySinkFunction implements SinkFunction<String> {
    @Override
    public void invoke(String value, Context context) throws Exception {
        System.out.println("entered the invoke method");
        // invoke runs once per record
        // Wherever the data needs to be sunk to, process value accordingly
        System.out.println(value);
    }
}
7. Transformations: Common Data-Transformation Operations
1. Map
package com.shujia.flink.tf;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo01Map {
    public static void main(String[] args) throws Exception {
        // One record in, one record out
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> ds = env.socketTextStream("master", 8888);

        // 1. Anonymous inner class
        DataStream<Tuple2<String, Integer>> mapDS = ds.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1);
            }
        });
//        mapDS.print();

        // 2. Lambda expression
        DataStream<Tuple2<String, Integer>> mapDS2 =
                ds.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));
        mapDS2.print();

        env.execute();
    }
}
2. FlatMap
package com.shujia.flink.tf;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class Demo02FlatMap {
    public static void main(String[] args) throws Exception {
        // One record in, several records out — similar to a UDTF
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> ds = env.socketTextStream("master", 8888);

        // 1. Anonymous inner class
        SingleOutputStreamOperator<Tuple2<String, Integer>> flatMapDS01 = ds.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                for (String word : line.split(",")) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
        flatMapDS01.print();

        // 2. Lambda expression
        SingleOutputStreamOperator<Tuple> flatMapDS02 = ds.flatMap((line, out) -> {
            for (String word : line.split(",")) {
                out.collect(Tuple2.of(word, 1));
            }
        }, Types.TUPLE(Types.STRING, Types.INT));
        flatMapDS02.print();

        env.execute();
    }
}
3. Filter
package com.shujia.flink.tf;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo03Filter {
    public static void main(String[] args) throws Exception {
        // Filters data. The return value must be a boolean: true keeps the record, false drops it
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> ds = env.socketTextStream("master", 8888);

        /*
         * Integer.valueOf: converts the string into an Integer object (an instance of java.lang.Integer).
         * Integer.parseInt: parses the string into the primitive int, not an object.
         * No return type needs to be given.
         */
        // Only pass through numbers greater than 10
        SingleOutputStreamOperator<String> filterDS = ds.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return Integer.parseInt(value) > 10;
            }
        });
        filterDS.print();

        ds.filter(value -> Integer.parseInt(value) > 10).print();

        env.execute();
    }
}
4. KeyBy
// Two equivalent shorthand forms
ds.keyBy(value -> value.toLowerCase(), Types.STRING).print();
ds.keyBy(String::toLowerCase, Types.STRING).print();
package com.shujia.flink.tf;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo04KeyBy {
    public static void main(String[] args) throws Exception {
        // Groups the stream so records with the same key go to the same subtask; aggregations can follow
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> ds = env.socketTextStream("master", 8888);

        KeyedStream<String, String> keyByDS = ds.keyBy(new KeySelector<String, String>() {
            @Override
            public String getKey(String value) throws Exception {
                return value;
            }
        });
        keyByDS.print();

        // Two equivalent shorthand forms
        ds.keyBy(value -> value.toLowerCase(), Types.STRING).print();
        ds.keyBy(String::toLowerCase, Types.STRING).print();

        env.execute();
    }
}
5. Reduce
package com.shujia.flink.tf;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo05Reduce {
    public static void main(String[] args) throws Exception {
        // Aggregates a stream after keyBy
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> ds = env.socketTextStream("master", 8888);

        // Compute the average age per class
        /*
         * Sample input:
         * 文科一班,20
         * 文科一班,22
         * 文科一班,21
         * 文科一班,20
         * 文科一班,22
         *
         * 理科一班,20
         * 理科一班,21
         * 理科一班,20
         * 理科一班,21
         * 理科一班,20
         */
        SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> kvDS = ds.map(line -> {
            String[] split = line.split(",");
            String clazz = split[0];
            int age = Integer.parseInt(split[1]);
            return Tuple3.of(clazz, age, 1);
        }, Types.TUPLE(Types.STRING, Types.INT, Types.INT));

        KeyedStream<Tuple3<String, Integer, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0, Types.STRING);

        keyByDS.reduce(new ReduceFunction<Tuple3<String, Integer, Integer>>() {
                    @Override
                    public Tuple3<String, Integer, Integer> reduce(Tuple3<String, Integer, Integer> value1, Tuple3<String, Integer, Integer> value2) throws Exception {
                        return Tuple3.of(value1.f0, value1.f1 + value2.f1, value1.f2 + value2.f2);
                    }
                })
                .map(t3 -> Tuple2.of(t3.f0, (double) t3.f1 / t3.f2), Types.TUPLE(Types.STRING, Types.DOUBLE))
                .print();

        // The same logic as a lambda
        keyByDS.reduce((v1, v2) -> Tuple3.of(v1.f0, v1.f1 + v2.f1, v1.f2 + v2.f2))
                .map(t3 -> Tuple2.of(t3.f0, (double) t3.f1 / t3.f2), Types.TUPLE(Types.STRING, Types.DOUBLE))
                .print();

        env.execute();
    }
}
6. Window
package com.shujia.flink.tf;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class Demo06Window {
    public static void main(String[] args) throws Exception {
        // Flink window operations: time, count, session
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> ds = env.socketTextStream("master", 8888);

        SingleOutputStreamOperator<Tuple2<String, Integer>> kvDS = ds.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));

        // Count each word every 5 s — a tumbling window (same definition as in Spark)
        SingleOutputStreamOperator<Tuple2<String, Integer>> outputDS01 = kvDS
                // Group by the first element of the Tuple2
                .keyBy(kv -> kv.f0, Types.STRING)
                // Set the tumbling interval
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                // Sum the second element of the Tuple2 (index 1, the Integer)
                .sum(1);
//        outputDS01.print();

        // Every 5 s, count each word over the last 10 s — a sliding window (same definition as in Spark)
        kvDS.keyBy(kv -> kv.f0, Types.STRING)
                // Set the window size and the slide size
                .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .sum(1)
                .print();

        env.execute();
    }
}
7. Union
package com.shujia.flink.tf;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo07Union {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> ds01 = env.socketTextStream("master", 8888);
        DataStream<String> ds02 = env.socketTextStream("master", 9999);

        // union merges two DataStreams of the same structure into one (stacked vertically)
        DataStream<String> unionDS = ds01.union(ds02);
        unionDS.print();

        env.execute();
    }
}
8. Process
processElement can emulate the Map operator, the FlatMap operator (flattening), and the Filter operator:
package com.shujia.flink.tf;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class Demo08Process {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> ds01 = env.socketTextStream("master", 8888);

        ds01.process(new ProcessFunction<String, Object>() {
            /*
             * Runs once per incoming record.
             * value: one record
             * ctx:   Flink's context object — gives access to runtime information
             * out:   used to emit data downstream
             */
            @Override
            public void processElement(String value, ProcessFunction<String, Object>.Context ctx, Collector<Object> out) throws Exception {
                // Emulate the Map operator with processElement
                out.collect(Tuple2.of(value, 1));

                // Emulate the flatMap operator (flattening) with processElement
                for (String word : value.split(",")) {
                    out.collect(word);
                }

                // Emulate the filter operator with processElement
                if ("java".equals(value)) {
                    out.collect("java ok");
                }
            }
        }).print();

        env.execute();
    }
}
Implementing KeyBy-style aggregation with processElement
package com.shujia.flink.tf;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.util.HashMap;

public class Demo09KeyByProcess {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> ds01 = env.socketTextStream("master", 8888);

        KeyedStream<Tuple2<String, Integer>, String> keyedDS = ds01.process(new ProcessFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void processElement(String value, ProcessFunction<String, Tuple2<String, Integer>>.Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
                for (String word : value.split(",")) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        }).keyBy(t2 -> t2.f0, Types.STRING);

        // A keyed stream can also call process
        /*
         * KeyedProcessFunction<K, I, O>
         * Type parameters:
         * <K> – type of the key. <I> – type of the input elements. <O> – type of the output elements.
         */
        keyedDS.process(new KeyedProcessFunction<String, Tuple2<String, Integer>, String>() {
            HashMap<String, Integer> wordCntMap;

            // Runs only once, when the KeyedProcessFunction is set up — avoids recreating the HashMap
            @Override
            public void open(Configuration parameters) throws Exception {
                wordCntMap = new HashMap<String, Integer>();
            }

            // Runs once per record
            @Override
            public void processElement(Tuple2<String, Integer> value, KeyedProcessFunction<String, Tuple2<String, Integer>, String>.Context ctx, Collector<String> out) throws Exception {
                // Implement word count with process:
                // check whether the word has been seen before by looking up its count in the HashMap
                String word = value.f0;
                int cnt = 1;
                if (wordCntMap.containsKey(word)) {
                    // get the current count for this word from the map
                    int newCnt = wordCntMap.get(word) + 1;
                    wordCntMap.put(word, newCnt);
                    cnt = newCnt;
                } else {
                    wordCntMap.put(word, 1);
                }
                out.collect(word + ":" + cnt);
            }
        }).print();

        env.execute();
    }
}
8. Flink Parallelism
How should parallelism be set?
1. Consider throughput:
jobs with aggregations: ~10,000 records/s per parallel instance
jobs without aggregations: ~100,000 records/s per parallel instance
2. Consider the cluster's own resources.
Notes:
The number of Tasks is determined by the parallelism together with whether there is a shuffle (tasks that can be chained before the shuffle reduce the Task count).
The number of Task Slots is determined by the job's maximum parallelism.
The number of TaskManagers is determined by the slots-per-TaskManager setting in the config file together with the number of slots the job needs.
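A worked example with assumed numbers (not from the original): if the largest operator parallelism in a job is 4, the job needs 4 slots; with taskmanager.numberOfTaskSlots: 2 configured in flink-conf.yaml, ceil(4 / 2) = 2 TaskManagers are requested.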
Ways to set parallelism in Flink:
1. Via env — not recommended: changing the parallelism means editing the code, repackaging, and resubmitting the job.
2. Per operator — decided case by case; not commonly used.
3. At submit time — the most common and recommended way.
Command line: flink run takes a -p flag that sets the global parallelism (an example follows below).
4. In the flink-conf.yaml config file.
Web UI: fill in the parallelism input box. Priority: operator-level setting > env global setting > submit-time setting > flink-conf.yaml.
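For instance, a submit command might look like this (the main class and jar name here are hypothetical):
flink run -p 4 -c com.shujia.flink.core.Demo03Parallelism flink-demo.jar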
package com.shujia.flink.core;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo03Parallelism {
    public static void main(String[] args) throws Exception {
        /*
         * How should parallelism be set?
         * 1. Consider throughput:
         *    jobs with aggregations:    ~10,000 records/s per parallel instance
         *    jobs without aggregations: ~100,000 records/s per parallel instance
         * 2. Consider the cluster's own resources.
         *
         * The number of Tasks is determined by the parallelism together with whether there
         * is a shuffle (tasks that can be chained before the shuffle reduce the Task count).
         * The number of Task Slots is determined by the job's maximum parallelism.
         * The number of TaskManagers is determined by the slots-per-TaskManager setting in
         * the config file together with the number of slots the job needs.
         */
        // Ways to set parallelism in Flink
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1. Via env — not recommended: changing the parallelism means editing the code,
        //    repackaging, and resubmitting the job
        env.setParallelism(3);

        // socketTextStream's parallelism is 1 and cannot be changed
        DataStreamSource<String> ds = env.socketTextStream("master", 8888);

        // 2. Per operator — decided case by case; not commonly used
        SingleOutputStreamOperator<Tuple2<String, Integer>> kvDS = ds
                .map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT))
                .setParallelism(4);

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordCntDS2P = kvDS
                .keyBy(kv -> kv.f0)
                .sum(1)
                .setParallelism(2);

        // Operators without an explicit setting use the global parallelism
        wordCntDS2P.print();

        /*
         * 3. Parallelism can also be given at submit time — the most common and recommended way.
         *    Command line: flink run takes a -p flag for the global parallelism.
         *
         *    Web UI: fill in the parallelism input box.
         *    Priority: operator-level setting > env global setting > submit-time setting > flink-conf.yaml
         */
        env.execute();
    }
}
9. Event Time
Event time is the time at which the data was produced, i.e., when the event actually happened. It is time information carried by the data itself. In Flink, event time is represented by a timestamp embedded in each element; this timestamp has business meaning and is independent of the system clock.
1. Case 1: a tumbling window on event time
Window trigger conditions:
1. The watermark is greater than or equal to the window's end time.
2. The window contains data.
Watermark: the largest timestamp seen so far among the data received by a given subtask.
Watermark strategy 1: monotonically increasing timestamps; out-of-order data is not tolerated. The largest event time received so far becomes the watermark:
.<Tuple2<String, Long>>forMonotonousTimestamps()
Watermark strategy 2: hold the watermark back to tolerate up to 5 s of out-of-order data; this effectively shifts the watermark back by 5 s, at the cost of higher job latency:
.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
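A worked example of strategy 2 (assuming Flink's convention that the emitted watermark is maxTimestamp - outOfOrderness - 1 ms): after a,1722233820000 arrives with a 5 s bound, the watermark is 1722233820000 - 5000 - 1 = 1722233814999, so an event-time window ending at 1722233815000 can now fire.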
package com.shujia.flink.core;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;

public class Demo04EventTime {
    public static void main(String[] args) throws Exception {
        // Event time: the timestamp carried by the data itself
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the global parallelism
        env.setParallelism(1);

        /*
         * Data format: word,timestamp (a large integer, Long)
         * a,1722233813000
         * a,1722233814000
         * a,1722233815000
         * a,1722233816000
         * a,1722233817000
         * a,1722233818000
         * a,1722233819000
         * a,1722233820000
         * a,1722233822000
         * a,1722233827000
         */
        DataStreamSource<String> wordTsDS = env.socketTextStream("master", 8888);

        SingleOutputStreamOperator<Tuple2<String, Long>> mapDS = wordTsDS
                .map(line -> Tuple2.of(line.split(",")[0], Long.parseLong(line.split(",")[1])),
                        Types.TUPLE(Types.STRING, Types.LONG));

        // Point Flink at the data's timestamp so it is used as the event time
        SingleOutputStreamOperator<Tuple2<String, Long>> assDS = mapDS
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                // Watermark: the largest timestamp seen so far among the data received by a subtask
//                                // Watermark strategy 1: monotonically increasing timestamps; out-of-order data
//                                // is not tolerated — the largest event time seen becomes the watermark
//                                .<Tuple2<String, Long>>forMonotonousTimestamps()
                                // TODO Watermark strategy 2: hold the watermark back to tolerate up to 5 s of
                                // out-of-order data; the downside is added job latency
                                .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                // Assign the event time: any part of the record can be extracted as the timestamp
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                    @Override
                                    public long extractTimestamp(Tuple2<String, Long> t2, long recordTimestamp) {
                                        return t2.f1;
                                    }
                                }));

        // Event time (like processing time) is used together with window operations
        assDS.map(kv -> Tuple2.of(kv.f0, 1), Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t2 -> t2.f0)
                /*
                 * Window trigger conditions:
                 * 1. the watermark is >= the window's end time
                 * 2. the window contains data
                 * TumblingEventTimeWindows: a tumbling window
                 */
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum(1)
                .print();

        env.execute();
    }
}
2. Case 2: a custom watermark strategy
Multiple parallelism, with the watermark strategy assigned after map.
Note: the watermark in every subtask must pass the window end before the window fires (the effective watermark downstream is the minimum across subtasks).
The window fires (process is called once) when:
1. The watermark in every parallel subtask has passed the window's end time.
2. The window contains data.
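The examples below also rely on a MyEvent class (com.shujia.flink.event.MyEvent) that is never shown in these notes. A minimal version, reconstructed from how it is used (a (word, ts) pair with getters), might look like:

package com.shujia.flink.event;

// Hedged sketch of the MyEvent POJO the following examples rely on;
// the original class isn't shown, so this is reconstructed from usage.
public class MyEvent {
    private String word;
    private long ts;

    public MyEvent() {}  // no-arg constructor so Flink can treat it as a POJO

    public MyEvent(String word, long ts) {
        this.word = word;
        this.ts = ts;
    }

    public String getWord() { return word; }
    public long getTs() { return ts; }
}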
package tfTest;

import com.shujia.flink.event.MyEvent;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class Demo05WaterMarkStrategy {
    public static void main(String[] args) throws Exception {
        // A custom watermark strategy
        // Reference: https://blog.csdn.net/zznanyou/article/details/121666563
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataStreamSource<String> eventDS = env.socketTextStream("master", 8888);

        // Turn each record into a MyEvent
        eventDS.map(new MapFunction<String, MyEvent>() {
                    @Override
                    public MyEvent map(String value) throws Exception {
                        String[] split = value.split(",");
                        return new MyEvent(split[0], Long.parseLong(split[1]));
                    }
                })
                // TODO Assign the event time and the custom watermark strategy
                .assignTimestampsAndWatermarks(new WatermarkStrategy<MyEvent>() {
                    @Override
                    public TimestampAssigner<MyEvent> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
                        return new SerializableTimestampAssigner<MyEvent>() {
                            @Override
                            public long extractTimestamp(MyEvent element, long recordTimestamp) {
                                return element.getTs();
                            }
                        };
                    }

                    @Override
                    public WatermarkGenerator<MyEvent> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                        return new MyMapWatermarkGenerator();
                    }
                })
                .keyBy(my -> my.getWord())
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // The window fires process once when: 1. the watermark in every subtask has
                // passed the window's end time; 2. the window contains data
                .process(new ProcessWindowFunction<MyEvent, String, String, TimeWindow>() {
                    @Override
                    public void process(String s, ProcessWindowFunction<MyEvent, String, String, TimeWindow>.Context context, Iterable<MyEvent> elements, Collector<String> out) throws Exception {
                        System.out.println("window fired");
                        System.out.println("current watermark: " + context.currentWatermark()
                                + ", window start: " + context.window().getStart()
                                + ", window end: " + context.window().getEnd());
                        // Aggregate over elements; send results downstream via out
                    }
                })
                .print();

        env.execute();
    }
}

// A watermark generator for use after map
class MyMapWatermarkGenerator implements WatermarkGenerator<MyEvent> {
    private final long maxOutOfOrderness = 0;
    private long currentMaxTimeStamp;

    // TODO Runs once per record. With maxOutOfOrderness = 0 this is the monotonically
    // increasing strategy; with a non-zero value it becomes the bounded out-of-orderness strategy
    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        currentMaxTimeStamp = Math.max(currentMaxTimeStamp, eventTimestamp);
        System.out.println("thread id: " + Thread.currentThread().getId()
                + ", current watermark: " + (currentMaxTimeStamp - maxOutOfOrderness));
    }

    // Runs periodically: env.getConfig().getAutoWatermarkInterval(), 200 ms by default
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Emit the watermark
        output.emitWatermark(new Watermark(currentMaxTimeStamp - maxOutOfOrderness));
    }
}
Multiple parallelism, with the watermark strategy assigned right after the source.
The behavior matches the parallelism-1 case: socketTextStream runs with parallelism 1, so the watermarks are generated in a single thread before being sent downstream.
package com.shujia.flink.core;

import com.shujia.flink.event.MyEvent;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class Demo05WaterMarkStrategy {
    public static void main(String[] args) throws Exception {
        // A custom watermark strategy
        // Reference: https://blog.csdn.net/zznanyou/article/details/121666563
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataStreamSource<String> eventDS = env.socketTextStream("master", 8888);

        // Assign the watermark strategy right after the Source
        eventDS.assignTimestampsAndWatermarks(new WatermarkStrategy<String>() {
                    // Define how to extract the timestamp
                    @Override
                    public TimestampAssigner<String> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
                        return new SerializableTimestampAssigner<String>() {
                            @Override
                            public long extractTimestamp(String element, long recordTimestamp) {
                                return Long.parseLong(element.split(",")[1]);
                            }
                        };
                        // Shorthand form:
//                        return (ele, ts) -> Long.parseLong(ele.split(",")[1]);
                    }

                    // Define the watermark strategy
                    @Override
                    public WatermarkGenerator<String> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                        return new MyWatermarkGenerator();
                    }
                })
                // Turn each record into KV format: word,1
                .map(line -> Tuple2.of(line.split(",")[0], 1), Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t2 -> t2.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // The window fires process once when: 1. the watermark has passed the
                // window's end time; 2. the window contains data
                .process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
                    @Override
                    public void process(String s, ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>.Context context, Iterable<Tuple2<String, Integer>> elements, Collector<Tuple2<String, Integer>> out) throws Exception {
                        System.out.println("window fired");
                        System.out.println("current watermark: " + context.currentWatermark()
                                + ", window start: " + context.window().getStart()
                                + ", window end: " + context.window().getEnd());
                        // Aggregate over elements; send results downstream via out
                    }
                })
                .print();

        env.execute();
    }
}

// A watermark generator for use right after the Source
class MyWatermarkGenerator implements WatermarkGenerator<String> {
    private final long maxOutOfOrderness = 0;
    private long currentMaxTimeStamp;

    // Runs once per record
    @Override
    public void onEvent(String event, long eventTimestamp, WatermarkOutput output) {
        currentMaxTimeStamp = Math.max(currentMaxTimeStamp, eventTimestamp);
        System.out.println("thread id: " + Thread.currentThread().getId()
                + ", current watermark: " + (currentMaxTimeStamp - maxOutOfOrderness));
    }

    // Runs periodically: env.getConfig().getAutoWatermarkInterval(), 200 ms by default
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(currentMaxTimeStamp - maxOutOfOrderness));
    }
}
10. Windows
1. Time windows: tumbling and sliding
Time windows: tumbling, sliding
Time semantics: processing time, event time
package com.shujia.flink.window;

import com.shujia.flink.event.MyEvent;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;

public class Demo01TimeWindow {
    public static void main(String[] args) throws Exception {
        /*
         * Time windows: tumbling, sliding
         * Time semantics: processing time, event time
         */
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<MyEvent> myDS = env.socketTextStream("master", 8888)
                .map(new MapFunction<String, MyEvent>() {
                    @Override
                    public MyEvent map(String value) throws Exception {
                        String[] split = value.split(",");
                        return new MyEvent(split[0], Long.parseLong(split[1]));
                    }
                });

        // Tumbling and sliding windows on processing time
        SingleOutputStreamOperator<Tuple2<String, Integer>> processDS = myDS
                .map(e -> Tuple2.of(e.getWord(), 1), Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t2 -> t2.f0)
                // Tumbling window: count every 5 s
//                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                // Sliding window: every 5 s, count the last 10 s of data
                .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .sum(1);

        // Tumbling and sliding windows on event time
        SingleOutputStreamOperator<MyEvent> assDS = myDS.assignTimestampsAndWatermarks(
                // Set the watermark strategy and assign the event time
                WatermarkStrategy
                        // Duration.ofSeconds(5): hold the watermark back by 5 s
                        .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((event, ts) -> event.getTs()));

        SingleOutputStreamOperator<Tuple2<String, Integer>> eventDS = assDS
                .map(e -> Tuple2.of(e.getWord(), 1), Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t2 -> t2.f0)
                // Tumbling window; since the watermark is held back 5 s, everything is delayed by 5 s
//                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // Sliding window
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .sum(1);

//        processDS.print();
        eventDS.print();

        env.execute();
    }
}
2. Session windows
A processing-time session window fires once no data has arrived for a period: the session is considered over.
An event-time session window fires only when the event times of two consecutive records differ by more than 5 s (the gap size).
package com.shujia.flink.window;

import com.shujia.flink.event.MyEvent;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class Demo02Session {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<MyEvent> myDS = env.socketTextStream("master", 8888)
                .map(new MapFunction<String, MyEvent>() {
                    @Override
                    public MyEvent map(String value) throws Exception {
                        String[] split = value.split(",");
                        return new MyEvent(split[0], Long.parseLong(split[1]));
                    }
                });

        // Processing-time session window: when no data arrives for a while,
        // the session is considered over and the window fires
        SingleOutputStreamOperator<Tuple2<String, Integer>> processSessionDS = myDS
                .map(e -> Tuple2.of(e.getWord(), 1), Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t2 -> t2.f0)
                // No data for 10 s ends the session and fires the window
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
                .sum(1);

        // TODO Event-time session window: the event times of two consecutive records must
        // differ by more than 5 s (the gap size) before the window fires
        // Set the watermark strategy and how to parse the timestamp out of the data
        SingleOutputStreamOperator<MyEvent> assDS = myDS.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<MyEvent>forMonotonousTimestamps()
                        .withTimestampAssigner((e, ts) -> e.getTs()));

        SingleOutputStreamOperator<Tuple2<String, Integer>> eventSessionDS = assDS
                .map(e -> Tuple2.of(e.getWord(), 1), Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t2 -> t2.f0)
                .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
                .sum(1);

//        processSessionDS.print();
        eventSessionDS.print();

        env.execute();
    }
}
3. Count windows: tumbling and sliding
Tumbling: a count is emitted after every 5 records with the same key.
Sliding: after every 5 records with the same key, count over that key's most recent 10 records.
package com.shujia.flink.window;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo03CountWindow {
    public static void main(String[] args) throws Exception {
        // Count windows: tumbling, sliding
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> ds = env.socketTextStream("master", 8888);

        ds.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t2 -> t2.f0)
//                .countWindow(5)     // tumbling: count after every 5 records with the same key
                .countWindow(10, 5)   // sliding: after every 5 records with the same key, count that key's last 10 records
                .sum(1)
                .print();

        env.execute();
        /*
         * Sliding: after every 5 records with the same key, count that key's last 10 records.
         * Input:
         * a a a a a b b b a a a a a  (one word per line)
         * Output:
         * 13> (a,5)
         * 13> (a,10)
         */
    }
}