競價網(wǎng)站托管濟(jì)南seo優(yōu)化外包服務(wù)公司
🚀 作者 :“大數(shù)據(jù)小禪”
🚀 文章簡介 :【Flink實(shí)戰(zhàn)】玩轉(zhuǎn)Flink里面核心的Source Operator實(shí)戰(zhàn)
🚀 歡迎小伙伴們 點(diǎn)贊👍、收藏?、留言💬
目錄導(dǎo)航
- Flink 的API層級介紹Source Operator速覽
- Flink 預(yù)定義的Source 數(shù)據(jù)源 案例實(shí)戰(zhàn)
- Flink自定義的Source 數(shù)據(jù)源案例-訂單來源實(shí)戰(zhàn)
Flink 的API層級介紹Source Operator速覽
-
Flink的API層級 為流式/批式處理應(yīng)用程序的開發(fā)提供了不同級別的抽象
-
第一層是最底層的抽象為有狀態(tài)實(shí)時流處理,抽象實(shí)現(xiàn)是 Process Function,用于底層處理
-
第二層抽象是 Core APIs,許多應(yīng)用程序不需要使用到上述最底層抽象的 API,而是使用 Core APIs 進(jìn)行開發(fā)
- 例如各種形式的用戶自定義轉(zhuǎn)換(transformations)、聯(lián)接(joins)、聚合(aggregations)、窗口(windows)和狀態(tài)(state)操作等,此層 API 中處理的數(shù)據(jù)類型在每種編程語言中都有其對應(yīng)的類。
-
第三層抽象是 Table API。 是以表Table為中心的聲明式編程API,Table API 使用起來很簡潔但是表達(dá)能力差
- 類似數(shù)據(jù)庫中關(guān)系模型中的操作,比如 select、project、join、group-by 和 aggregate 等
- 允許用戶在編寫應(yīng)用程序時將 Table API 與 DataStream/DataSet API 混合使用
-
第四層最頂層抽象是 SQL,這層程序表達(dá)式上都類似于 Table API,但是其程序?qū)崿F(xiàn)都是 SQL 查詢表達(dá)式
- SQL 抽象與 Table API 抽象之間的關(guān)聯(lián)是非常緊密的
-
注意:Table和SQL層變動多,還在持續(xù)發(fā)展中,大致知道即可,核心是第一和第二層
-
-
Flink編程模型
-
Source來源
-
元素集合
- env.fromElements
- env.fromColletion
- env.fromSequence(start,end);
-
文件/文件系統(tǒng)
- env.readTextFile(本地文件);
- env.readTextFile(HDFS文件);
-
基于Socket
- env.socketTextStream(“ip”, 8888)
-
自定義Source,實(shí)現(xiàn)接口自定義數(shù)據(jù)源,rich相關(guān)的api更豐富
-
并行度為1
- SourceFunction
- RichSourceFunction
-
并行度大于1
- ParallelSourceFunction
- RichParallelSourceFunction
-
-
-
Connectors與第三方系統(tǒng)進(jìn)行對接(用于source或者sink都可以)
- Flink本身提供Connector例如kafka、RabbitMQ、ES等
- 注意:Flink程序打包一定要將相應(yīng)的connetor相關(guān)類打包進(jìn)去,不然就會失敗
-
Apache Bahir連接器
- 里面也有kafka、RabbitMQ、ES的連接器更多
-
總結(jié) 和外部系統(tǒng)進(jìn)行讀取寫入的
- 第一種 Flink 里面預(yù)定義的 source 和 sink。
- 第二種 Flink 內(nèi)部也提供部分 Boundled connectors。
- 第三種是第三方 Apache Bahir 項(xiàng)目中的連接器。
- 第四種是通過異步 IO 方式
- 異步I/O是Flink提供的非常底層的與外部系統(tǒng)交互
Flink 預(yù)定義的Source 數(shù)據(jù)源 案例實(shí)戰(zhàn)
- Source來源
- 元素集合
- env.fromElements
- env.fromColletion
- env.fromSequence(start,end);
- 元素集合
public static void main(String [] args) throws Exception {//構(gòu)建執(zhí)行任務(wù)環(huán)境以及任務(wù)的啟動的入口, 存儲全局相關(guān)的參數(shù)StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//相同類型元素的數(shù)據(jù)流 sourceDataStream<String> stringDS1 = env.fromElements("java,SpringBoot", "spring cloud,redis", "kafka,小滴課堂");stringDS1.print("stringDS1");DataStream<String> stringDS2 = env.fromCollection(Arrays.asList("微服務(wù)項(xiàng)目大課,java","alibabacloud,rabbitmq","hadoop,hbase"));stringDS2.print("stringDS2");DataStreamSource<Long> longDS3 = env.fromSequence(0,10);longDS3.print("longDS3");//DataStream需要調(diào)用execute,可以取個名稱env.execute("xdclass job");}
- 文件/文件系統(tǒng)
- env.readTextFile(本地文件);
- env.readTextFile(HDFS文件);
public static void main(String [] args) throws Exception {//構(gòu)建執(zhí)行任務(wù)環(huán)境以及任務(wù)的啟動的入口, 存儲全局相關(guān)的參數(shù)StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<String> textDS = env.readTextFile("/Users/xdclass/Desktop/xdclass_access.log");//DataStream<String> textDS = env.readTextFile("hdfs://xdclass_node:8010/file/log/words.txt");textDS.print();env.execute("xdclass job");
}
- 基于Socket
- env.socketTextStream(“ip”, 8888)
public static void main(String [] args) throws Exception {//構(gòu)建執(zhí)行任務(wù)環(huán)境以及任務(wù)的啟動的入口, 存儲全局相關(guān)的參數(shù)StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<String> stringDataStream = env.socketTextStream("127.0.0.1",8888);stringDataStream.print();env.execute(" job");
}
Flink自定義的Source 數(shù)據(jù)源案例-訂單來源實(shí)戰(zhàn)
-
自定義Source,實(shí)現(xiàn)接口自定義數(shù)據(jù)源
-
并行度為1
- SourceFunction
- RichSourceFunction
-
并行度大于1
- ParallelSourceFunction
- RichParallelSourceFunction
-
Rich相關(guān)的api更豐富,多了Open、Close方法,用于初始化連接等
-
-
創(chuàng)建接口
@Data
@AllArgsConstructor
@NoArgsConstructor
public class VideoOrder {private String tradeNo;private String title;private int money;private int userId;private Date createTime;}public class VideoOrderSource extends RichParallelSourceFunction<VideoOrder> {private volatile Boolean flag = true;private Random random = new Random();private static List<String> list = new ArrayList<>();static {list.add("spring boot2.x課程");list.add("微服務(wù)SpringCloud課程");list.add("RabbitMQ消息隊列");list.add("Kafka課程");list.add("Flink流式技術(shù)課程");list.add("工業(yè)級微服務(wù)項(xiàng)目大課訓(xùn)練營");list.add("Linux課程");}@Overridepublic void run(SourceContext<VideoOrder> ctx) throws Exception {while (flag){Thread.sleep(1000);String id = UUID.randomUUID().toString();int userId = random.nextInt(10);int money = random.nextInt(100);int videoNum = random.nextInt(list.size());String title = list.get(videoNum);ctx.collect(new VideoOrder(id,title,money,userId,new Date()));}}/*** 取消任務(wù)*/@Overridepublic void cancel() {flag = false;}
}
- 案例
public static void main(String [] args) throws Exception {//構(gòu)建執(zhí)行任務(wù)環(huán)境以及任務(wù)的啟動的入口, 存儲全局相關(guān)的參數(shù)StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<VideoOrder> videoOrderDataStream = env.addSource(new VideoOrderSource());videoOrderDataStream.print();//DataStream需要調(diào)用execute,可以取個名稱env.execute("custom source job");}
不斷產(chǎn)生很多訂單