中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁 > news >正文

競價網(wǎng)站托管濟(jì)南seo優(yōu)化外包服務(wù)公司

競價網(wǎng)站托管,濟(jì)南seo優(yōu)化外包服務(wù)公司,做網(wǎng)站樹立品牌形象,網(wǎng)頁制作教程案例🚀 作者 :“大數(shù)據(jù)小禪” 🚀 文章簡介 :【Flink實(shí)戰(zhàn)】玩轉(zhuǎn)Flink里面核心的Source Operator實(shí)戰(zhàn) 🚀 歡迎小伙伴們 點(diǎn)贊👍、收藏?、留言💬 目錄導(dǎo)航 Flink 的API層級介紹Source Operator速覽Flin…

🚀 作者 :“大數(shù)據(jù)小禪”

🚀 文章簡介 :【Flink實(shí)戰(zhàn)】玩轉(zhuǎn)Flink里面核心的Source Operator實(shí)戰(zhàn)

🚀 歡迎小伙伴們 點(diǎn)贊👍、收藏?、留言💬


目錄導(dǎo)航

      • Flink 的API層級介紹Source Operator速覽
      • Flink 預(yù)定義的Source 數(shù)據(jù)源 案例實(shí)戰(zhàn)
      • Flink自定義的Source 數(shù)據(jù)源案例-訂單來源實(shí)戰(zhàn)

Flink 的API層級介紹Source Operator速覽

  • Flink的API層級 為流式/批式處理應(yīng)用程序的開發(fā)提供了不同級別的抽象

    • 第一層是最底層的抽象為有狀態(tài)實(shí)時流處理,抽象實(shí)現(xiàn)是 Process Function,用于底層處理

    • 第二層抽象是 Core APIs,許多應(yīng)用程序不需要使用到上述最底層抽象的 API,而是使用 Core APIs 進(jìn)行開發(fā)

      • 例如各種形式的用戶自定義轉(zhuǎn)換(transformations)、聯(lián)接(joins)、聚合(aggregations)、窗口(windows)和狀態(tài)(state)操作等,此層 API 中處理的數(shù)據(jù)類型在每種編程語言中都有其對應(yīng)的類。
    • 第三層抽象是 Table API。 是以表Table為中心的聲明式編程API,Table API 使用起來很簡潔但是表達(dá)能力差

      • 類似數(shù)據(jù)庫中關(guān)系模型中的操作,比如 select、project、join、group-by 和 aggregate 等
      • 允許用戶在編寫應(yīng)用程序時將 Table API 與 DataStream/DataSet API 混合使用
    • 第四層最頂層抽象是 SQL,這層程序表達(dá)式上都類似于 Table API,但是其程序?qū)崿F(xiàn)都是 SQL 查詢表達(dá)式

      • SQL 抽象與 Table API 抽象之間的關(guān)聯(lián)是非常緊密的
    • 注意:Table和SQL層變動多,還在持續(xù)發(fā)展中,大致知道即可,核心是第一和第二層
      在這里插入圖片描述

  • Flink編程模型

在這里插入圖片描述

  • Source來源

    • 元素集合

      • env.fromElements
      • env.fromColletion
      • env.fromSequence(start,end);
    • 文件/文件系統(tǒng)

      • env.readTextFile(本地文件);
      • env.readTextFile(HDFS文件);
    • 基于Socket

      • env.socketTextStream(“ip”, 8888)
    • 自定義Source,實(shí)現(xiàn)接口自定義數(shù)據(jù)源,rich相關(guān)的api更豐富

      • 并行度為1

        • SourceFunction
        • RichSourceFunction
      • 并行度大于1

        • ParallelSourceFunction
        • RichParallelSourceFunction
  • Connectors與第三方系統(tǒng)進(jìn)行對接(用于source或者sink都可以)

    • Flink本身提供Connector例如kafka、RabbitMQ、ES等
    • 注意:Flink程序打包一定要將相應(yīng)的connetor相關(guān)類打包進(jìn)去,不然就會失敗
  • Apache Bahir連接器

    • 里面也有kafka、RabbitMQ、ES的連接器更多
  • 總結(jié) 和外部系統(tǒng)進(jìn)行讀取寫入的

    • 第一種 Flink 里面預(yù)定義的 source 和 sink。
    • 第二種 Flink 內(nèi)部也提供部分 Boundled connectors。
    • 第三種是第三方 Apache Bahir 項(xiàng)目中的連接器。
    • 第四種是通過異步 IO 方式
      • 異步I/O是Flink提供的非常底層的與外部系統(tǒng)交互

Flink 預(yù)定義的Source 數(shù)據(jù)源 案例實(shí)戰(zhàn)

  • Source來源
    • 元素集合
      • env.fromElements
      • env.fromColletion
      • env.fromSequence(start,end);
 public static void main(String [] args) throws Exception {//構(gòu)建執(zhí)行任務(wù)環(huán)境以及任務(wù)的啟動的入口, 存儲全局相關(guān)的參數(shù)StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//相同類型元素的數(shù)據(jù)流 sourceDataStream<String> stringDS1 = env.fromElements("java,SpringBoot", "spring cloud,redis", "kafka,小滴課堂");stringDS1.print("stringDS1");DataStream<String> stringDS2 = env.fromCollection(Arrays.asList("微服務(wù)項(xiàng)目大課,java","alibabacloud,rabbitmq","hadoop,hbase"));stringDS2.print("stringDS2");DataStreamSource<Long> longDS3 = env.fromSequence(0,10);longDS3.print("longDS3");//DataStream需要調(diào)用execute,可以取個名稱env.execute("xdclass job");}
  • 文件/文件系統(tǒng)
    • env.readTextFile(本地文件);
    • env.readTextFile(HDFS文件);
public static void main(String [] args) throws Exception {//構(gòu)建執(zhí)行任務(wù)環(huán)境以及任務(wù)的啟動的入口, 存儲全局相關(guān)的參數(shù)StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<String> textDS = env.readTextFile("/Users/xdclass/Desktop/xdclass_access.log");//DataStream<String> textDS = env.readTextFile("hdfs://xdclass_node:8010/file/log/words.txt");textDS.print();env.execute("xdclass job");
}
  • 基于Socket
    • env.socketTextStream(“ip”, 8888)
   public static void main(String [] args) throws Exception {//構(gòu)建執(zhí)行任務(wù)環(huán)境以及任務(wù)的啟動的入口, 存儲全局相關(guān)的參數(shù)StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<String> stringDataStream = env.socketTextStream("127.0.0.1",8888);stringDataStream.print();env.execute(" job");
}

Flink自定義的Source 數(shù)據(jù)源案例-訂單來源實(shí)戰(zhàn)

  • 自定義Source,實(shí)現(xiàn)接口自定義數(shù)據(jù)源

    • 并行度為1

      • SourceFunction
      • RichSourceFunction
    • 并行度大于1

      • ParallelSourceFunction
      • RichParallelSourceFunction
    • Rich相關(guān)的api更豐富,多了Open、Close方法,用于初始化連接等

  • 創(chuàng)建接口

@Data
@AllArgsConstructor
@NoArgsConstructor
public class VideoOrder {private String tradeNo;private String title;private int money;private int userId;private Date createTime;}public class VideoOrderSource extends RichParallelSourceFunction<VideoOrder> {private volatile Boolean flag = true;private  Random random = new Random();private static List<String> list = new ArrayList<>();static {list.add("spring boot2.x課程");list.add("微服務(wù)SpringCloud課程");list.add("RabbitMQ消息隊列");list.add("Kafka課程");list.add("Flink流式技術(shù)課程");list.add("工業(yè)級微服務(wù)項(xiàng)目大課訓(xùn)練營");list.add("Linux課程");}@Overridepublic void run(SourceContext<VideoOrder> ctx) throws Exception {while (flag){Thread.sleep(1000);String id = UUID.randomUUID().toString();int userId = random.nextInt(10);int money = random.nextInt(100);int videoNum = random.nextInt(list.size());String title = list.get(videoNum);ctx.collect(new VideoOrder(id,title,money,userId,new Date()));}}/*** 取消任務(wù)*/@Overridepublic void cancel() {flag = false;}
}
  • 案例
public static void main(String [] args) throws Exception {//構(gòu)建執(zhí)行任務(wù)環(huán)境以及任務(wù)的啟動的入口, 存儲全局相關(guān)的參數(shù)StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<VideoOrder> videoOrderDataStream = env.addSource(new VideoOrderSource());videoOrderDataStream.print();//DataStream需要調(diào)用execute,可以取個名稱env.execute("custom source job");}

不斷產(chǎn)生很多訂單

在這里插入圖片描述

http://www.risenshineclean.com/news/1866.html

相關(guān)文章:

  • 大紅門做網(wǎng)站怎樣交換友情鏈接
  • 做網(wǎng)站用什么配資電腦千鋒教育介紹
  • 做網(wǎng)站 大文件百度怎么創(chuàng)建自己的網(wǎng)站
  • 煙臺高新區(qū)建設(shè)局網(wǎng)站網(wǎng)絡(luò)廣告策劃
  • 無障礙網(wǎng)站建設(shè)方案近期重大新聞事件
  • 上海公司做網(wǎng)站的友情鏈接是什么
  • 網(wǎng)站定制微安電力案例臨沂色度廣告有限公司
  • 中國住建部網(wǎng)站查詢網(wǎng)寧波企業(yè)seo服務(wù)
  • win7 iis默認(rèn)網(wǎng)站設(shè)置張北網(wǎng)站seo
  • 烏魯木齊網(wǎng)站制作百度廣告聯(lián)盟賺廣告費(fèi)
  • 做五金的有哪些外貿(mào)網(wǎng)站媒體軟文推廣平臺
  • 網(wǎng)站開發(fā)聯(lián)系方式百度公司有哪些部門
  • 用前端做的比較酷的網(wǎng)站2022適合小學(xué)生的簡短新聞?wù)?/a>
  • 凡科可以做返利網(wǎng)站嗎移動廣告平臺
  • 手機(jī)網(wǎng)站小程序華為手機(jī)業(yè)務(wù)最新消息
  • 中港建設(shè)集團(tuán)網(wǎng)站百度seo關(guān)鍵詞優(yōu)化電話
  • wordpress前臺注冊登陸網(wǎng)站優(yōu)化推廣排名
  • 網(wǎng)站制作公司哪兒濟(jì)南興田德潤有活動嗎微信軟文模板
  • 在哪些網(wǎng)站做推廣比較好百度搜索名字排名優(yōu)化
  • 做公益的網(wǎng)站有哪些淘寶店鋪運(yùn)營
  • axure怎么做響應(yīng)式網(wǎng)站優(yōu)化大師win10能用嗎
  • 用dreamwever做網(wǎng)站小程序
  • 昆明網(wǎng)站制作計劃威海網(wǎng)站制作
  • wordpress調(diào)用列表頁seo查詢官方網(wǎng)站
  • 建設(shè)網(wǎng)站安全性seo短視頻入口引流
  • 360做網(wǎng)站和推廣怎么樣seo點(diǎn)擊排名軟件營銷工具
  • wordpress是用什么開發(fā)的網(wǎng)站搜索優(yōu)化排名
  • 電商網(wǎng)站多少錢重慶seo
  • 青島外貿(mào)建設(shè)網(wǎng)站制作搜索排名提升
  • 吉安市城鄉(xiāng)規(guī)劃建設(shè)局網(wǎng)站網(wǎng)絡(luò)營銷與策劃