中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當前位置: 首頁 > news >正文

佛山網(wǎng)站建設(shè)的首選免費網(wǎng)站外鏈推廣

佛山網(wǎng)站建設(shè)的首選,免費網(wǎng)站外鏈推廣,紀委網(wǎng)站舉報怎么做,程序外包學習目標:三棲合一架構(gòu)師 本文是《大數(shù)據(jù)Flink學習圣經(jīng)》 V1版本,是 《尼恩 大數(shù)據(jù) 面試寶典》姊妹篇。 這里特別說明一下:《尼恩 大數(shù)據(jù) 面試寶典》5個專題 PDF 自首次發(fā)布以來, 已經(jīng)匯集了 好幾百題,大量的大廠面試…

學習目標:三棲合一架構(gòu)師

本文是《大數(shù)據(jù)Flink學習圣經(jīng)》 V1版本,是 《尼恩 大數(shù)據(jù) 面試寶典》姊妹篇。

這里特別說明一下:《尼恩 大數(shù)據(jù) 面試寶典》5個專題 PDF 自首次發(fā)布以來, 已經(jīng)匯集了 好幾百題,大量的大廠面試干貨、正貨 。 《尼恩 大數(shù)據(jù) 面試寶典》面試題集合, 將變成大數(shù)據(jù)學習和面試的必讀書籍。

于是,尼恩架構(gòu)團隊 趁熱打鐵,推出 《大數(shù)據(jù)Flink學習圣經(jīng)》,《大數(shù)據(jù)HBASE學習圣經(jīng)》

《大數(shù)據(jù)Flink學習圣經(jīng)》 后面會不斷升級,不斷 迭代, 變成大數(shù)據(jù)領(lǐng)域 學習和面試的必讀書籍

最終,幫助大家成長為 三棲合一架構(gòu)師,進大廠,拿高薪。

《尼恩 架構(gòu)筆記》《尼恩高并發(fā)三部曲》《尼恩Java面試寶典》的PDF,請到公號【技術(shù)自由圈】取

“Java+大數(shù)據(jù)” 雙棲架構(gòu)成功案例

成功案例1:

驚天大逆襲:失業(yè)4個月,3年小伙1個月喜提架構(gòu)Offer,而且是大齡跨行,超級牛

成功案例2:

極速拿offer:阿里P6被裁后極速上岸,1個月內(nèi)喜提2優(yōu)質(zhì)offer(含滴滴)

文章目錄

    • 學習目標:三棲合一架構(gòu)師
    • “Java+大數(shù)據(jù)” 雙棲架構(gòu)成功案例
    • 1. 什么是Flink
      • 大數(shù)據(jù)
      • 分布式計算
      • 分布式存儲
        • 分布式存儲:
        • 分布式文件系統(tǒng):
      • 批處理和流處理
        • 批處理(Batch Processing):
        • 流處理(Stream Processing):
      • 開源大數(shù)據(jù)技術(shù)
        • Hadoop:
        • YARN(Yet Another Resource Negotiator):
        • Spark:
        • Flink:
    • 2. Flink 部署
      • Master-Worker架構(gòu):
        • 兼容性:
      • Standalone集群的特點:
      • Standalone集群的部署方式:
      • Docker部署flink簡單集群
        • 1、在服務器創(chuàng)建flink目錄
        • 2、docker-compose.yml腳本創(chuàng)建
        • 3、啟動flink
        • 4、瀏覽器上查看頁面dashboard
    • 3. Flink快速應用
      • 實操1: 單詞統(tǒng)計案例(批數(shù)據(jù))
        • 1.1 需求
        • 1.2 代碼實現(xiàn)
      • 實操2: 單詞統(tǒng)計案例(流數(shù)據(jù))
        • 2.1 需求
        • 2.2 代碼實現(xiàn)
      • Flink程序開發(fā)的流程總結(jié)
    • 4. Flink分布式架構(gòu)與核心組件
      • Flink作業(yè)提交過程
      • Flink核心組件
      • Flink組件棧
      • 作業(yè)執(zhí)行階段
    • 5. Flink 開發(fā)
      • 開發(fā)環(huán)境搭建
      • Flink連接器
        • Source
          • 基于本地集合的Source
          • 基于文件的Source
            • 讀取本地文件
            • 讀取HDFS文件數(shù)據(jù)
            • 讀取CSV文件數(shù)據(jù)
            • 讀取壓縮文件
            • 遍歷目錄
            • 讀取kafka
          • 自定義source
        • Sink
          • 基于本地集合的sink
          • 基于文件的sink
            • 將數(shù)據(jù)寫入本地文件
            • 將數(shù)據(jù)寫入HDFS
      • Flink API
        • map
        • flatMap
        • mapPartition
        • filter
        • reduce
        • reduceGroup
        • aggregate
        • distinct
        • join
        • union
        • connect
        • rebalance
        • hashPartition
        • sortPartition
      • 窗口
        • 時間概念
        • 窗口程序
        • 滾動窗口
        • 滑動窗口
        • 會話窗口
        • 基于數(shù)量窗口
      • 觸發(fā)器
      • 清除器
    • 6. Flink程序本地執(zhí)行和集群執(zhí)行
      • 6.1. 本地執(zhí)行
      • 6.2. 集群執(zhí)行
    • 7. Flink的廣播變量
    • 8. Flink的累加器
    • 9. Flink的分布式緩存
    • 說在后面
    • 作者介紹
    • 參考
    • 推薦閱讀

1. 什么是Flink

大數(shù)據(jù)

大數(shù)據(jù)(Big Data)是指規(guī)模龐大、結(jié)構(gòu)多樣且速度快速增長的數(shù)據(jù)集合。這些數(shù)據(jù)集合通常包含傳統(tǒng)數(shù)據(jù)庫管理系統(tǒng)無法有效處理的數(shù)據(jù),具有高度的復雜性和挑戰(zhàn)性。大數(shù)據(jù)的主要特點包括三個維度**:三V**,即Volume(數(shù)據(jù)量大)、Variety(數(shù)據(jù)多樣性)、Velocity(數(shù)據(jù)速度)。

  1. 數(shù)據(jù)量大(Volume):大數(shù)據(jù)的最明顯特征之一是其龐大的數(shù)據(jù)量。傳統(tǒng)的數(shù)據(jù)處理方法和工具在處理這種規(guī)模的數(shù)據(jù)時可能會變得低效或不可行。
  2. 數(shù)據(jù)多樣性(Variety):大數(shù)據(jù)不僅包括結(jié)構(gòu)化數(shù)據(jù)(如表格數(shù)據(jù)),還包括半結(jié)構(gòu)化數(shù)據(jù)(如JSON、XML)和非結(jié)構(gòu)化數(shù)據(jù)(如文本、圖像、音頻、視頻等)。這些數(shù)據(jù)可能來自不同的源頭和不同的格式。
  3. 數(shù)據(jù)速度快(Velocity):大數(shù)據(jù)往往以高速率產(chǎn)生、流動和累積。這要求數(shù)據(jù)處理系統(tǒng)能夠?qū)崟r或近實時地處理數(shù)據(jù),以便從中獲取有價值的信息。

分布式計算

隨著計算機技術(shù)的發(fā)展和數(shù)據(jù)規(guī)模的增大,單臺計算機的處理能力和存儲容量逐漸變得有限,無法滿足大數(shù)據(jù)處理的要求。為了應對這一挑戰(zhàn),分布式計算應運而生,它利用多臺計算機組成集群,將計算任務分割成多個子任務并在不同的計算節(jié)點上并行執(zhí)行,從而提高計算效率和處理能力。

分布式計算的核心思想是將大問題劃分為小問題,將任務分發(fā)給多個計算節(jié)點并行執(zhí)行,最后將結(jié)果合并得到最終的解。這種方式有效地解決了單臺計算機無法處理大規(guī)模數(shù)據(jù)和高并發(fā)計算的問題。同時,分布式計算還具有良好的可擴展性,可以根據(jù)數(shù)據(jù)量的增加靈活地擴展集群規(guī)模,以應對不斷增長的數(shù)據(jù)挑戰(zhàn)。

分布式計算的概念聽起來很高深,其背后的思想?yún)s十分樸素,即分而治之,又稱為分治法(Divide and Conquer)。分治法是一種解決問題的算法設(shè)計策略,它將一個問題分解成多個相同或相似的子問題,然后分別解決這些子問題,最后將子問題的解合并起來得到原問題的解。分治法常用于解決復雜問題,尤其是在大數(shù)據(jù)處理中,可以將大規(guī)模的數(shù)據(jù)集合分割成更小的部分,然后分別處理這些部分,最后合并結(jié)果。

在處理大數(shù)據(jù)問題時,可以使用分治法的思想來提高效率和可擴展性,以下是一些應用分治法處理大數(shù)據(jù)問題的示例:

  1. MapReduce 模式:分治法的經(jīng)典應用是 MapReduce 模式,它將大規(guī)模的數(shù)據(jù)集合分為多個小塊,每個小塊由不同的計算節(jié)點進行處理,然后將結(jié)果合并。這種方法適用于批處理任務,如數(shù)據(jù)清洗、轉(zhuǎn)換、聚合等。

  2. 并行計算:將大規(guī)模的計算任務分解成多個小任務,分配給不同的計算節(jié)點并行處理,最后合并結(jié)果。這適用于需要大量計算的問題,如數(shù)值模擬、圖算法等。

  3. 分布式排序:將大規(guī)模數(shù)據(jù)集合分割成多個部分,每個部分在不同的計算節(jié)點上進行排序,然后使用合并排序算法將這些有序部分合并為整體有序的數(shù)據(jù)集合。

  4. 分區(qū)和分片:在分布式存儲系統(tǒng)中,可以將數(shù)據(jù)分區(qū)和分片存儲在不同的節(jié)點上,通過分區(qū)鍵或哈希函數(shù)將數(shù)據(jù)分配到不同的存儲節(jié)點上,從而實現(xiàn)數(shù)據(jù)的分布式存儲和管理。

  5. 分布式機器學習:將大規(guī)模的機器學習任務分解成多個子任務,在分布式計算環(huán)境中分別進行訓練,然后合并模型參數(shù),如分布式隨機梯度下降算法。

  6. 數(shù)據(jù)分割和合并:對于需要頻繁訪問的大數(shù)據(jù)集合,可以將數(shù)據(jù)分割成多個小塊,每個小塊存儲在不同的存儲節(jié)點上,然后根據(jù)需要進行合并,以減少數(shù)據(jù)訪問的開銷。

分治法在大數(shù)據(jù)處理中的應用不僅有助于提高處理效率,還可以充分利用分布式計算和存儲資源,從而更好地應對大數(shù)據(jù)量和復雜性。然而,在應用分治法時需要考慮合適的數(shù)據(jù)分割策略、任務調(diào)度、結(jié)果合并等問題,以確保分治法的正確性和性能。

然而,分布式計算也帶來了一些挑戰(zhàn),如數(shù)據(jù)一致性、通信開銷、任務調(diào)度等問題,需要綜合考慮各種因素來設(shè)計和優(yōu)化分布式系統(tǒng)。同時,分布式計算也需要開發(fā)者具備分布式系統(tǒng)設(shè)計和調(diào)優(yōu)的知識和技能,以確保系統(tǒng)的性能和穩(wěn)定性。

分布式存儲

當數(shù)據(jù)量巨大且單機存儲已無法滿足需求時,分布式存儲和分布式文件系統(tǒng)成為處理大數(shù)據(jù)的關(guān)鍵技術(shù)。下面我會詳細介紹分布式存儲和分布式文件系統(tǒng)的概念、特點和常見的實現(xiàn)。

分布式存儲:

分布式存儲是將數(shù)據(jù)分散存儲在多個節(jié)點上,以提供高容量、高性能、高可靠性和可擴展性的數(shù)據(jù)存儲解決方案。每個節(jié)點都可以通過網(wǎng)絡訪問數(shù)據(jù),并且多個節(jié)點協(xié)同工作來處理數(shù)據(jù)請求。分布式存儲的核心目標是解決單機存儲的瓶頸,同時提供高可靠性和可用性。

分布式存儲的特點包括:

  • 橫向擴展性:可以通過增加節(jié)點來擴展存儲容量和性能,適應不斷增長的數(shù)據(jù)量和負載。
  • 高可靠性和容錯性:數(shù)據(jù)在多個節(jié)點上冗余存儲,當某個節(jié)點出現(xiàn)故障時,數(shù)據(jù)依然可用,不會丟失。
  • 數(shù)據(jù)分布和復制:數(shù)據(jù)按照一定策略分布在不同節(jié)點上,數(shù)據(jù)的復制確保了數(shù)據(jù)的可用性和容錯性。
  • 并發(fā)訪問和高性能:支持多個客戶端同時訪問數(shù)據(jù),實現(xiàn)高并發(fā)和更好的性能。
  • 靈活的數(shù)據(jù)模型:支持多種數(shù)據(jù)類型和訪問方式,如文件系統(tǒng)、對象存儲、鍵值存儲等。

分布式文件系統(tǒng):

分布式文件系統(tǒng)是一種特殊類型的分布式存儲,主要用于存儲和管理文件數(shù)據(jù)。它提供了類似于傳統(tǒng)單機文件系統(tǒng)的接口,但是在底層實現(xiàn)上,數(shù)據(jù)被分散存儲在多個節(jié)點上。分布式文件系統(tǒng)能夠自動處理數(shù)據(jù)的分布、復制、一致性和故障恢復等問題。

常見的分布式文件系統(tǒng)特點包括:

  • 命名空間和路徑:分布式文件系統(tǒng)通過路徑來訪問文件,類似于傳統(tǒng)文件系統(tǒng)的目錄結(jié)構(gòu)。
  • 數(shù)據(jù)分布和復制:文件被切分成塊并分散存儲在多個節(jié)點上,同時進行數(shù)據(jù)復制以實現(xiàn)冗余和高可用性。
  • 一致性和數(shù)據(jù)一致性模型:分布式文件系統(tǒng)需要保證數(shù)據(jù)的一致性,不同節(jié)點上的數(shù)據(jù)副本需要保持同步。
  • 訪問控制和權(quán)限管理:提供用戶和應用程序訪問控制和權(quán)限管理功能,確保數(shù)據(jù)安全性。
  • 高性能:分布式文件系統(tǒng)通常優(yōu)化了數(shù)據(jù)的讀寫性能,以滿足大數(shù)據(jù)場景的需求。
  • 擴展性:可以通過增加節(jié)點來擴展存儲容量和性能。

常見的分布式文件系統(tǒng)包括:

  • Hadoop HDFS(Hadoop Distributed File System):Hadoop生態(tài)系統(tǒng)中的分布式文件系統(tǒng),適用于大數(shù)據(jù)存儲。
  • Ceph:開源的分布式存儲系統(tǒng),提供塊存儲、文件系統(tǒng)和對象存儲。
  • GlusterFS:開源的分布式文件系統(tǒng),可以線性擴展存儲容量和性能。

總之,分布式存儲和分布式文件系統(tǒng)在大數(shù)據(jù)時代扮演著重要角色,幫助我們存儲、管理和訪問海量的數(shù)據(jù),解決了傳統(tǒng)單機存儲無法應對的挑戰(zhàn)。

批處理和流處理

批處理和流處理是大數(shù)據(jù)處理領(lǐng)域中常見的兩種數(shù)據(jù)處理模式,用于不同類型的數(shù)據(jù)處理需求。下面將詳細介紹這兩種模式,并給出相關(guān)的應用場景示例。

批處理(Batch Processing):

批處理是指將一批數(shù)據(jù)集合在一起,在一個固定的時間間隔內(nèi)對這批數(shù)據(jù)進行處理和分析。批處理通常適用于數(shù)據(jù)量較大、處理周期較長、要求高一致性的場景。

特點:

  • 數(shù)據(jù)被集中處理,適合周期性分析和報告生成。
  • 數(shù)據(jù)被切分成小塊,每個小塊在一個作業(yè)中被處理。
  • 數(shù)據(jù)處理時間較長,不適合實時性要求高的場景。

應用場景示例:

  1. 離線數(shù)據(jù)分析:對歷史數(shù)據(jù)進行分析,從中發(fā)現(xiàn)趨勢、模式和規(guī)律,用于業(yè)務決策。例如,銷售數(shù)據(jù)分析、用戶行為分析。
  2. 批量推薦系統(tǒng):基于用戶歷史行為數(shù)據(jù),定期生成推薦結(jié)果。例如,電影推薦、商品推薦。
  3. 數(shù)據(jù)清洗和預處理:對大規(guī)模數(shù)據(jù)進行清洗、過濾和預處理,提高數(shù)據(jù)質(zhì)量和可用性。例如,清理無效數(shù)據(jù)、填充缺失值。
  4. 大規(guī)模ETL(Extract, Transform, Load):將數(shù)據(jù)從源系統(tǒng)中抽取出來,經(jīng)過轉(zhuǎn)換和加工后加載到目標系統(tǒng)。例如,數(shù)據(jù)倉庫的構(gòu)建。

流處理(Stream Processing):

流處理是指在數(shù)據(jù)生成的時候立即進行處理,實現(xiàn)數(shù)據(jù)的實時處理和分析。流處理通常適用于數(shù)據(jù)實時性要求高、需要快速響應的場景。

特點:

  • 數(shù)據(jù)是實時流動的,需要快速處理和響應。
  • 數(shù)據(jù)是持續(xù)不斷地到達,需要實時計算和分析。
  • 可能會遇到延遲和數(shù)據(jù)亂序等問題。

應用場景示例:

  1. 實時監(jiān)控和告警:對實時數(shù)據(jù)進行監(jiān)控和分析,及時發(fā)現(xiàn)異常并觸發(fā)告警。例如,網(wǎng)絡流量監(jiān)控、系統(tǒng)性能監(jiān)控。
  2. 實時數(shù)據(jù)分析:對流式數(shù)據(jù)進行實時分析,從中提取有價值的信息。例如,實時點擊流分析、實時市場行情分析。
  3. 實時推薦系統(tǒng):基于用戶實時行為數(shù)據(jù),實時生成推薦結(jié)果。例如,新聞推薦、廣告推薦。
  4. 實時數(shù)據(jù)倉庫:構(gòu)建實時數(shù)據(jù)倉庫,將實時數(shù)據(jù)集成、加工和分析。例如,實時銷售數(shù)據(jù)分析、實時用戶行為分析。

總之,批處理和流處理分別適用于不同類型的數(shù)據(jù)處理需求,根據(jù)業(yè)務需求和實時性要求選擇合適的處理模式。

開源大數(shù)據(jù)技術(shù)

當談論大數(shù)據(jù)處理時,Hadoop、YARN、Spark和Flink都是重要的技術(shù)。它們都屬于大數(shù)據(jù)領(lǐng)域的分布式計算框架,但在功能和使用方式上有所不同。

Hadoop:

Hadoop是一個開源的分布式存儲和計算框架,最初由Apache開發(fā),用于處理大規(guī)模數(shù)據(jù)集。Hadoop的核心組件包括:

  1. Hadoop Distributed File System(HDFS):HDFS是一種分布式文件系統(tǒng),用于存儲大規(guī)模數(shù)據(jù)。它將數(shù)據(jù)分成多個塊,并將這些塊分散存儲在集群中的不同節(jié)點上。HDFS支持高可靠性、冗余存儲和數(shù)據(jù)復制。

  2. MapReduce:MapReduce是Hadoop的計算模型,用于處理分布式數(shù)據(jù)。它將計算任務分成Map和Reduce兩個階段,分布在集群中的節(jié)點上并行執(zhí)行。Map階段負責數(shù)據(jù)的拆分和處理,Reduce階段負責數(shù)據(jù)的匯總和計算。

YARN(Yet Another Resource Negotiator):

YARN是Hadoop的資源管理器,它負責集群資源的管理和分配。YARN將集群資源劃分為容器(Containers),并分配給不同的應用程序。這種資源的隔離和管理允許多個應用程序同時在同一個Hadoop集群上運行,從而提高了資源利用率和集群的多租戶能力。

Spark:

Apache Spark是一個通用的分布式計算引擎,旨在提供高性能、易用性和多功能性。與傳統(tǒng)的Hadoop MapReduce相比,Spark具有更快的執(zhí)行速度,因為它將數(shù)據(jù)加載到內(nèi)存中并進行內(nèi)存計算。Spark支持多種計算模式,包括批處理、交互式查詢、流處理和機器學習。

Spark的主要特點和組件包括:

  1. RDD(Resilient Distributed Dataset):RDD是Spark的核心數(shù)據(jù)抽象,表示分布式的數(shù)據(jù)集。RDD支持并行操作和容錯性,可以在計算過程中重新計算丟失的分區(qū)。

  2. Spark SQL:Spark SQL是用于處理結(jié)構(gòu)化數(shù)據(jù)的組件,支持SQL查詢和操作。它能夠?qū)DD和傳統(tǒng)的數(shù)據(jù)源(如Hive)無縫集成。

  3. Spark Streaming:Spark Streaming是用于處理實時流數(shù)據(jù)的模塊,支持微批處理模式。它能夠?qū)崟r數(shù)據(jù)流分割成小批次并進行處理。

  4. MLlib:MLlib是Spark的機器學習庫,提供了常見的機器學習算法和工具,用于訓練和評估模型。

  5. GraphX:GraphX是Spark的圖計算庫,用于處理圖數(shù)據(jù)和圖算法。

Flink:

Apache Flink是一個流式處理引擎和分布式批處理框架,具有低延遲、高吞吐量和容錯性。Flink支持流批一體化,能夠?qū)崿F(xiàn)實時流處理和批處理作業(yè)的無縫切換。它的核心特點包括:

  1. DataStream API:Flink的DataStream API用于處理實時流數(shù)據(jù),支持事件時間處理、窗口操作和狀態(tài)管理。它能夠處理高吞吐量的實時數(shù)據(jù)流。
  2. DataSet API:Flink的DataSet API用于批處理作業(yè),類似于Hadoop的MapReduce。它支持豐富的操作符和優(yōu)化技術(shù)。
  3. Stateful Stream Processing:Flink支持有狀態(tài)的流式處理,可以在處理過程中保存和管理狀態(tài)。這對于實現(xiàn)復雜的數(shù)據(jù)處理邏輯很有用。
  4. Event Time Processing:Flink支持事件時間處理,能夠處理亂序事件并準確計算窗口操作的結(jié)果。
  5. Table API和SQL:Flink提供了Table API和SQL查詢,使開發(fā)人員可以使用類似SQL的語法來查詢和分析數(shù)據(jù)。
  6. 可以連接大數(shù)據(jù)生態(tài)圈各類組件,包括Kafka、Elasticsearch、JDBC、HDFS和Amazon S3
  7. 可以運行在Kubernetes、YARN、Mesos和獨立(Standalone)集群上。

Flink在流處理上的幾個主要優(yōu)勢如下:

  1. 真正的流計算引擎:Flink具有更好的streaming計算模型,可以進行非常高效的狀態(tài)運算和窗口操作。Spark Streaming仍然是微批處理引擎。

  2. 更低延遲:Flink可以實現(xiàn)毫秒級的低延遲處理,而Spark Streaming延遲較高。

  3. 更好的容錯機制:Flink支持更細粒度的狀態(tài)管理和檢查點機制,可以實現(xiàn)精確一次的狀態(tài)一致性語義。Spark較難做到確保exactly once。

  4. 支持有限數(shù)據(jù)流和無限數(shù)據(jù)流:Flink可處理有開始和結(jié)束的有限數(shù)據(jù)流,也能處理無限不斷增長的數(shù)據(jù)流。Spark Streaming更適合有限數(shù)據(jù)集。

  5. 更易統(tǒng)一批處理和流處理:Flink提供了DataStream和DataSet API,可以輕松統(tǒng)一批處理和流處理。Spark需要聯(lián)合Spark SQL使用。

  6. 更優(yōu)秀的內(nèi)存管理:Flink具有自己的內(nèi)存管理,可以根據(jù)不同查詢優(yōu)化內(nèi)存使用。Spark依賴Hadoop YARN進行資源調(diào)度。

  7. 更高性能:在部分場景下,Flink擁有比Spark Streaming更高的吞吐和低的延遲。

總體來說,Flink作為新一代流處理引擎,在延遲、容錯、易用性方面優(yōu)于Spark Streaming。但Spark生態(tài)更加完善,也在努力減小與Flink的差距。需要根據(jù)具體場景選擇最優(yōu)的框架。

總的來說,Flink在流處理領(lǐng)域的優(yōu)勢主要體現(xiàn)在事件時間處理、低延遲、精確一次語義和狀態(tài)管理等方面。這些特性使得Flink在處理實時流數(shù)據(jù)時能夠更好地滿足復雜的業(yè)務需求,特別是對于需要高準確性和可靠性的應用場景。

2. Flink 部署

Apache Flink在1.7版本中進行了重大的架構(gòu)重構(gòu),引入了Master-Worker架構(gòu),這使得Flink能夠更好地適應不同的集群基礎(chǔ)設(shè)施,包括Standalone、Hadoop YARN和Kubernetes等。下面會詳細介紹一下Flink 1.7版本引入的Master-Worker架構(gòu)以及其在不同集群基礎(chǔ)設(shè)施中的適應性。

Master-Worker架構(gòu):

Flink 1.7版本中引入的Master-Worker架構(gòu)是為了解決之前版本中存在的一些問題,如資源管理、高可用性等。在這個架構(gòu)中,Flink將任務管理和資源管理分離,引入了JobManager和ResourceManager兩個主要角色。

  • JobManager:負責接受和調(diào)度任務,維護任務的狀態(tài)和元數(shù)據(jù)信息,還負責處理容錯機制。JobManager分為兩種:JobManager(高可用模式)和StandaloneJobManager(非高可用模式)。

  • ResourceManager:負責管理集群中的資源,包括分配任務的資源、維護資源池等。

這種架構(gòu)的優(yōu)勢在于解耦任務的管理和資源的管理,使得Flink能夠更好地適應不同的集群環(huán)境和基礎(chǔ)設(shè)施。

兼容性:

Flink的Master-Worker架構(gòu)設(shè)計使其能夠兼容幾乎所有主流信息系統(tǒng)的基礎(chǔ)設(shè)施,包括:

  • Standalone集群:在Standalone模式下,Flink的JobManager和ResourceManager都運行在同一個進程中,適用于簡單的開發(fā)和測試場景。

  • Hadoop YARN集群:Flink可以部署在現(xiàn)有的Hadoop YARN集群上,通過ResourceManager與YARN ResourceManager進行交互,實現(xiàn)資源管理。

  • Kubernetes集群:Flink還支持在Kubernetes集群中部署,通過Kubernetes提供的資源管理能力來管理任務和資源。

這種兼容性使得Flink可以靈活地在不同的集群環(huán)境中運行,滿足不同場景下的需求。

總之,Flink在1.7版本中引入的Master-Worker架構(gòu)使其在資源管理、高可用性等方面有了更好的表現(xiàn),同時也使得Flink能夠更好地適應各種不同的集群基礎(chǔ)設(shè)施,包括Standalone、Hadoop YARN和Kubernetes等。這為Flink的部署和使用帶來了更多的靈活性和選擇性。

Standalone集群是Apache Flink中一種簡單的部署模式,適用于開發(fā)、測試和小規(guī)模應用場景。下面我將詳細介紹Standalone集群的特點以及部署方式。

Standalone集群的特點:

  1. 簡單部署:Standalone集群是Flink的最簡單部署模式之一,不需要依賴其他集群管理工具,可以在單個機器上部署。

  2. 資源共享:Standalone集群中的JobManager和TaskManager共享同一份資源,例如內(nèi)存和CPU。這使得資源管理相對簡單,但也可能在資源競爭時影響任務的性能。

  3. 適用于開發(fā)和測試:Standalone集群適用于開發(fā)和測試階段,可以在本地機器上模擬Flink集群環(huán)境,方便開發(fā)人員進行調(diào)試和測試。

  4. 不支持高可用性:Standalone集群默認情況下不支持高可用性,即不具備故障恢復和任務遷移的能力。如果需要高可用性,可以通過運行多個JobManager實例來實現(xiàn)。

Standalone集群的部署方式:

  1. 安裝Flink:首先,需要下載并安裝Flink??梢詮墓俜骄W(wǎng)站下載預編譯的二進制文件,解壓到指定目錄。也可以從以下網(wǎng)站下載:

    apache-flink安裝包下載_開源鏡像站-阿里云 (aliyun.com)

  2. 配置Flink:進入Flink的安裝目錄,修改conf/flink-conf.yaml配置文件。主要配置項包括jobmanager.rpc.addresstaskmanager.numberOfTaskSlots等。

  3. 啟動JobManager:打開終端,進入Flink安裝目錄,執(zhí)行以下命令啟動JobManager:

    ./bin/start-cluster.sh
    
  4. 啟動TaskManager:打開終端,進入Flink安裝目錄,執(zhí)行以下命令啟動TaskManager:

    ./bin/taskmanager.sh start
    
  5. 提交作業(yè):使用Flink客戶端工具提交作業(yè)??梢允褂靡韵旅钐峤籎AR文件中的作業(yè):

    ./bin/flink run -c your.main.Class ./path/to/your.jar
    
  6. 停止集群:可以使用以下命令停止整個Standalone集群:

    ./bin/stop-cluster.sh
    

總之,Standalone集群是一個簡單且易于部署的Flink集群模式,適用于開發(fā)、測試和小規(guī)模應用場景。然而,由于其資源共享和不支持高可用性的特點,不適合部署在生產(chǎn)環(huán)境中。

下面提供利用Docker部署flink standalone簡單集群。

Docker部署flink簡單集群

Flink程序可以作為集群內(nèi)的分布式系統(tǒng)運行,也可以以獨立模式或在YARN、Mesos、基于Docker的環(huán)境和其他資源管理框架下進行部署。

1、在服務器創(chuàng)建flink目錄

mkdir  flink

目錄的結(jié)構(gòu)如下:

2、docker-compose.yml腳本創(chuàng)建

docker 容器的編排文件,具體如下

3、啟動flink

(1)后臺運行

一般推薦生產(chǎn)環(huán)境下使用該選項。

docker-compose up -d

(2)前臺運行

控制臺將會同時打印所有容器的輸出信息,可以很方便進行調(diào)試。

docker-compose up

4、瀏覽器上查看頁面dashboard

訪問web界面

http://cdh1:8081/

3. Flink快速應用

? 通過一個單詞統(tǒng)計的案例,快速上手應用Flink,進行流處理(Streaming)和批處理(Batch)

實操1: 單詞統(tǒng)計案例(批數(shù)據(jù))

1.1 需求

統(tǒng)計一個文件中各個單詞出現(xiàn)的次數(shù),把統(tǒng)計結(jié)果輸出到文件

步驟:
1、讀取數(shù)據(jù)源
2、處理數(shù)據(jù)源

a、將讀到的數(shù)據(jù)源文件中的每一行根據(jù)空格切分

b、將切分好的每個單詞拼接1

c、根據(jù)單詞聚合(將相同的單詞放在一起)

d、累加相同的單詞(單詞后面的1進行累加)

3、保存處理結(jié)果

1.2 代碼實現(xiàn)

  • 引入依賴
<!--flink核心包-->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.7.2</version>
</dependency>
<!--flink流處理包-->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.7.2</version><scope>provided</scope>
</dependency>	
  • Java程序
package com.crazymaker.bigdata.wordcount.batch;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;/*** 1、讀取數(shù)據(jù)源* 2、處理數(shù)據(jù)源*  a、將讀到的數(shù)據(jù)源文件中的每一行根據(jù)空格切分*  b、將切分好的每個單詞拼接1*  c、根據(jù)單詞聚合(將相同的單詞放在一起)*  d、累加相同的單詞(單詞后面的1進行累加)* 3、保存處理結(jié)果*/
public class WordCountJavaBatch {public static void main(String[] args) throws Exception {String inputPath="D:\\data\\input\\hello.txt";String outputPath="D:\\data\\output\\hello.txt";//獲取flink的運行環(huán)境ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();DataSet<String> text = executionEnvironment.readTextFile(inputPath);FlatMapOperator<String, Tuple2<String, Integer>> wordOndOnes = text.flatMap(new SplitClz());//0代表第1個元素UnsortedGrouping<Tuple2<String, Integer>> groupedWordAndOne = wordOndOnes.groupBy(0);//1代表第1個元素AggregateOperator<Tuple2<String, Integer>> out = groupedWordAndOne.sum(1);out.writeAsCsv(outputPath, "\n", " ").setParallelism(1);//設(shè)置并行度executionEnvironment.execute();//人為調(diào)用執(zhí)行方法}static class SplitClz implements FlatMapFunction<String,Tuple2<String,Integer>>{public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {String[] s1 = s.split(" ");for (String word:s1) {collector.collect(new Tuple2<String,Integer>(word,1));//發(fā)送到下游}}}
}

源文件的內(nèi)容

統(tǒng)計的結(jié)果

實操2: 單詞統(tǒng)計案例(流數(shù)據(jù))

nc

netcat:

flink開發(fā)時候,經(jīng)常用socket作為source;使用linux/mac環(huán)境開發(fā),可以在終端中開啟 nc -l 9000(開啟netcat程序,作為服務端,發(fā)送數(shù)據(jù));

nc是netcat的縮寫,有著網(wǎng)絡界的瑞士軍刀美譽。因為它短小精悍、功能實用,被設(shè)計為一個簡單、可靠的網(wǎng)絡工具。

nc作用

  • 數(shù)據(jù)傳輸
  • 文件傳輸
  • 機器之間網(wǎng)絡測速

2.1 需求

Socket模擬實時發(fā)送單詞,

使用Flink實時接收數(shù)據(jù),對指定時間窗口內(nèi)(如5s)的數(shù)據(jù)進行聚合統(tǒng)計,每隔1s匯總計算一次,并且把時間窗口內(nèi)計算結(jié)果打印出來。

2.2 代碼實現(xiàn)

package com.crazymaker.bigdata.wordcount.stream;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;/*** 	Socket模擬實時發(fā)送單詞,使用Flink實時接收數(shù)據(jù)*/
public class WordCountStream {public static void main(String[] args) throws Exception {// 監(jiān)聽的ip和端口號,以main參數(shù)形式傳入,約定第一個參數(shù)為ip,第二個參數(shù)為端口
//        String ip = args[0];String ip = "127.0.0.1";
//        int port = Integer.parseInt(args[1]);int port = 9000;// 獲取Flink流執(zhí)行環(huán)境StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();// 獲取socket輸入數(shù)據(jù)DataStreamSource<String> textStream = streamExecutionEnvironment.socketTextStream(ip, port, "\n");SingleOutputStreamOperator<Tuple2<String, Long>> tuple2SingleOutputStreamOperator = textStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {public void flatMap(String s, Collector<Tuple2<String, Long>> collector) throws Exception {String[] splits = s.split("\\s");for (String word : splits) {collector.collect(Tuple2.of(word, 1l));}}});SingleOutputStreamOperator<Tuple2<String, Long>> word = tuple2SingleOutputStreamOperator.keyBy(0).sum(1);// 打印數(shù)據(jù)word.print();// 觸發(fā)任務執(zhí)行streamExecutionEnvironment.execute("wordcount stream process");}
}

Flink程序開發(fā)的流程總結(jié)

Flink程序開發(fā)的流程總結(jié)如下:

1)獲得一個執(zhí)行環(huán)境

2)加載/創(chuàng)建初始化數(shù)據(jù)

3)指定數(shù)據(jù)操作的算子

4)指定結(jié)果數(shù)據(jù)存放位置

5)調(diào)用execute()觸發(fā)執(zhí)行程序

注意:Flink程序是延遲計算的,只有最后調(diào)用execute()方法的時候才會真正觸發(fā)執(zhí)行程序

4. Flink分布式架構(gòu)與核心組件

Flink作業(yè)提交過程

standalone模式下的作業(yè)提交過程如下:

在一個作業(yè)提交前,Master和TaskManager等進程需要先被啟動。

我們可以在Flink主目錄中執(zhí)行腳本來啟動這些進程:

bin/start-cluster.sh。

Master和TaskManager被啟動后,TaskManager需要將自己注冊給Master中的ResourceManager。

這個初始化和資源注冊過程發(fā)生在單個作業(yè)提交前,我們稱之為第0步。

接下來,我們將逐步解析Flink作業(yè)的提交過程,具體步驟如下:

① 用戶編寫應用程序代碼,并使用Flink客戶端(Client)提交該作業(yè)。通常,這些程序會使用Java或Scala語言編寫,并調(diào)用Flink API 構(gòu)建出邏輯視圖。這些代碼以及相關(guān)配置文件被編譯并打包,然后被提交至Master節(jié)點的Dispatcher,形成一個應用作業(yè)(Application)。

② Dispatcher接收到提交的作業(yè)后,會啟動一個JobManager,該JobManager負責協(xié)調(diào)這個作業(yè)的各項任務。

③ JobManager向ResourceManager申請所需的作業(yè)資源,這些資源可能包括CPU、內(nèi)存等。

④ 由于在前面的步驟中,TaskManager已經(jīng)向ResourceManager注冊了可供使用的資源,這時處于空閑狀態(tài)的TaskManager將被分配給JobManager。

⑤ JobManager將用戶作業(yè)中的邏輯視圖轉(zhuǎn)化為物理執(zhí)行圖,如圖3-3所示,該圖顯示了作業(yè)被并行化后的執(zhí)行過程。JobManager將計算任務分配并部署到多個TaskManager上。此時,一個Flink作業(yè)正式開始執(zhí)行。

在計算任務執(zhí)行過程中,TaskManager可能會與其他TaskManager交換數(shù)據(jù),使用特定的數(shù)據(jù)交換策略。同時,TaskManager還會將任務的狀態(tài)信息傳遞給JobManager,這些狀態(tài)信息包括任務的啟動、執(zhí)行和終止狀態(tài),以及快照的元數(shù)據(jù)等。

Flink核心組件

在這個作業(yè)提交流程的基礎(chǔ)上,我們可以更詳細地介紹涉及的各個組件的功能和角色:

  1. Client(客戶端):用戶通常使用Flink提供的客戶端工具(如位于Flink主目錄下的bin目錄中的命令行工具)來提交作業(yè)??蛻舳藭τ脩籼峤坏腇link作業(yè)進行預處理,并將作業(yè)提交到Flink集群中。在提交作業(yè)時,客戶端需要配置一些必要的參數(shù),例如使用Standalone集群還是YARN集群等。整個作業(yè)會被打包成一個JAR文件,DataStream API會被轉(zhuǎn)換成一個JobGraph,該圖類似于邏輯視圖(如圖3-2所示)。

  2. Dispatcher(調(diào)度器):Dispatcher可以接收多個作業(yè),每次接收作業(yè)時,會為該作業(yè)分配一個JobManager。Dispatcher通過提供表述性狀態(tài)轉(zhuǎn)移(REST)式的接口,使用超文本傳輸協(xié)議(HTTP)來對外提供服務。

  3. JobManager(作業(yè)管理器):JobManager是單個Flink作業(yè)的協(xié)調(diào)者。每個作業(yè)都有一個對應的JobManager負責管理。JobManager將客戶端提交的JobGraph轉(zhuǎn)化為ExecutionGraph,該圖類似于并行物理執(zhí)行圖(如圖3-3所示)。JobManager會向ResourceManager申請所需的資源。一旦獲取足夠的資源,JobManager會將ExecutionGraph及其計算任務分發(fā)到多個TaskManager上。此外,JobManager還管理多個TaskManager,包括收集作業(yè)狀態(tài)信息、生成檢查點、必要時進行故障恢復等。

  4. ResourceManager(資源管理器):Flink可以在Standalone、YARN、Kubernetes等環(huán)境中部署,而不同環(huán)境對計算資源的管理模式有所不同。為了解決資源分配問題,Flink引入了ResourceManager模塊。在Flink中,計算資源的基本單位是TaskManager上的任務槽位(Slot)。ResourceManager的主要職責是從資源提供方(如YARN)獲取計算資源。當JobManager需要計算資源時,ResourceManager會將空閑的Slot分配給JobManager。在計算任務結(jié)束后,ResourceManager會回收這些空閑Slot。

  5. TaskManager(任務管理器):TaskManager是實際執(zhí)行計算任務的節(jié)點。一般來說,一個Flink作業(yè)會分布在多個TaskManager上執(zhí)行,每個TaskManager提供一定數(shù)量的Slot。當一個TaskManager啟動后,相關(guān)的Slot信息會被注冊到ResourceManager中。當Flink作業(yè)提交后,ResourceManager會將空閑的Slot分配給JobManager。一旦JobManager獲取了空閑Slot,它會將具體的計算任務部署到這些Slot上,并在這些Slot上執(zhí)行。在執(zhí)行過程中,TaskManager可能需要與其他TaskManager進行數(shù)據(jù)交換,因此需要進行必要的數(shù)據(jù)通信。總之,TaskManager負責具體計算任務的執(zhí)行,它會在啟動時將Slot資源向ResourceManager注冊。

Flink組件棧

  1. 部署層

    • Local模式:Flink支持本地模式,包括單節(jié)點(SingleNode)和單虛擬機(SingleJVM)模式。在SingleNode模式中,JobManager和TaskManager運行在同一個節(jié)點上;在SingleJVM模式中,所有角色都在同一個JVM中運行。
    • Cluster模式:Flink可以部署在Standalone、YARN、Mesos和Kubernetes集群上。Standalone集群需要配置JobManager和TaskManager的節(jié)點,然后通過Flink提供的腳本啟動。YARN、Mesos和Kubernetes集群提供了更強大的資源管理和集群擴展能力。
    • Cloud模式:Flink還可以部署在各大云平臺上,如AWS、谷歌云和阿里云,使用戶能夠在云環(huán)境中靈活地部署和運行作業(yè)。
  2. 運行時層

    • 運行時層是Flink的核心組件,支持分布式執(zhí)行和處理。該層負責將用戶提交的作業(yè)轉(zhuǎn)化為任務,并分發(fā)到相應的JobManager和TaskManager上執(zhí)行。運行時層還涵蓋了檢查點和故障恢復機制,確保作業(yè)的容錯性和穩(wěn)定性。
  3. API層

    • Flink的API層提供了DataStream API和DataSet API,分別用于流式處理和批處理。這兩個API允許開發(fā)者使用各種操作符和轉(zhuǎn)換來處理數(shù)據(jù),包括轉(zhuǎn)換、連接、聚合、窗口等計算任務。
  4. 上層工具

    • 在API層之上,Flink提供了一些工具來擴展其功能:
      • 復雜事件處理(CEP):面向流處理的庫,用于檢測和處理復雜的事件模式。
      • 圖計算庫(Gelly):面向批處理的圖計算庫,用于執(zhí)行圖算法。
      • Table API和SQL:針對SQL用戶和關(guān)系型數(shù)據(jù)處理場景的接口,允許使用SQL語法和表操作處理流和批數(shù)據(jù)。
      • PyFlink:針對Python用戶的接口,使其能夠使用Flink進行數(shù)據(jù)處理,目前主要基于Table API。

綜上所述,Flink在不同層次上提供了豐富的組件和工具,支持流式處理和批處理,以及與不同環(huán)境(本地、集群、云)的無縫集成,使開發(fā)者能夠靈活地構(gòu)建和部署大規(guī)模數(shù)據(jù)處理應用程序。

作業(yè)執(zhí)行階段

在Apache Flink中,數(shù)據(jù)流作業(yè)的執(zhí)行過程可以劃分為多個階段,從邏輯視圖到物理執(zhí)行圖的轉(zhuǎn)換。這個過程包括從StreamGraph到JobGraph,再到ExecutionGraph,最終映射到實際的物理執(zhí)行圖。下面詳細說明這個過程:

  1. StreamGraph(邏輯視圖):StreamGraph是用戶編寫的流處理應用程序的邏輯表示。它包含了數(shù)據(jù)流的轉(zhuǎn)換操作、算子之間的關(guān)系、事件時間處理策略、容錯配置等。StreamGraph是用戶定義的數(shù)據(jù)流拓撲,是一種高級抽象,用戶可以通過DataStream API構(gòu)建StreamGraph。

  2. JobGraph(作業(yè)圖):JobGraph是從StreamGraph派生而來的,表示一個具體的作業(yè)執(zhí)行計劃。在JobGraph中,StreamGraph中的邏輯算子被映射為具體的物理算子,且有明確的執(zhí)行順序和任務間的依賴關(guān)系。JobGraph還包含了資源配置、任務并行度、優(yōu)化選項等信息。JobGraph是從邏輯視圖轉(zhuǎn)向物理執(zhí)行的關(guān)鍵步驟。

  3. ExecutionGraph(執(zhí)行圖):ExecutionGraph是JobGraph的執(zhí)行時表示,它是實際執(zhí)行計劃的核心。在ExecutionGraph中,JobGraph中的每個任務都會被映射到一個具體的執(zhí)行任務,每個任務可以包含一個或多個子任務,這些子任務被映射到不同的TaskManager上。ExecutionGraph還負責維護作業(yè)的執(zhí)行狀態(tài),以及任務之間的調(diào)度和通信。

  4. 物理執(zhí)行圖:ExecutionGraph被映射到實際的物理執(zhí)行圖,即在TaskManager集群上真正執(zhí)行的任務拓撲。物理執(zhí)行圖包括了任務的并行執(zhí)行、數(shù)據(jù)交換、任務狀態(tài)管理等細節(jié),它是作業(yè)在分布式環(huán)境中實際運行的體現(xiàn)。

總結(jié)起來,StreamGraph到JobGraph到ExecutionGraph的轉(zhuǎn)換是Flink作業(yè)執(zhí)行計劃的關(guān)鍵步驟。從邏輯視圖到物理執(zhí)行圖的轉(zhuǎn)換過程考慮了作業(yè)的拓撲結(jié)構(gòu)、資源分配、任務調(diào)度等方面的問題,確保了作業(yè)可以在分布式環(huán)境中高效執(zhí)行。這一系列轉(zhuǎn)換過程使得用戶可以通過高層次的抽象來描述作業(yè)邏輯,而Flink框架會負責將其轉(zhuǎn)化為可執(zhí)行的任務圖,實現(xiàn)數(shù)據(jù)流的處理和計算。

5. Flink 開發(fā)

Flink 應用程序結(jié)構(gòu)主要包含三部分,Source/Transformation/Sink,如下圖所示:

代碼程序結(jié)構(gòu)

Source: 數(shù)據(jù)源,Flink 在流處理和批處理上的 source 大概有 4 類:

  • 基于本地集合的 source
  • 基于文件的 source
  • 基于網(wǎng)絡套接字的 source
  • 自定義的 source。自定義的 source 常見的有 Apache kafka、Amazon Kinesis Streams、RabbitMQ、Twitter Streaming API、Apache NiFi 等,當然你也可以定義自己的 source。

Transformation:數(shù)據(jù)轉(zhuǎn)換的各種操作,有 Map / FlatMap / Filter / KeyBy / Reduce / Fold / Aggregations / Window / WindowAll / Union / Window join / Split / Select / Project 等,操作很多,可以將數(shù)據(jù)轉(zhuǎn)換計算成你想要的數(shù)據(jù)。

Sink:接收器,Flink 將轉(zhuǎn)換計算后的數(shù)據(jù)發(fā)送的地點 ,你可能需要存儲下來,Flink 常見的 Sink 大概有如下幾類:

  • 寫入文件
  • 打印輸出
  • 寫入 socket
  • 自定義的 sink 。自定義的 sink 常見的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等,同理你也可以定義自己的 Sink。

開發(fā)環(huán)境搭建

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.lagou</groupId><artifactId>flinkdemo</artifactId><version>1.0-SNAPSHOT</version><dependencies><!--flink核心包--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.7.2</version></dependency><!--flink流處理包--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.7.2</version><!--<scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.1.5</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-hadoop-compatibility_2.12</artifactId><version>1.7.2</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>2.7.2</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>2.7.2</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>2.7.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.12</artifactId><version>1.7.2</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.73</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_2.12</artifactId><version>1.7.2</version></dependency></dependencies><build><plugins><!-- 打jar插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.4.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters></configuration></execution></executions></plugin></plugins></build></project>

Flink連接器

在實際生產(chǎn)環(huán)境中,數(shù)據(jù)通常分布在各種不同的系統(tǒng)中,包括文件系統(tǒng)、數(shù)據(jù)庫、消息隊列等。Flink作為一個大數(shù)據(jù)處理框架,需要與這些外部系統(tǒng)進行數(shù)據(jù)交互,以實現(xiàn)數(shù)據(jù)的輸入、處理和輸出。在Flink中,Source和Sink是兩個關(guān)鍵模塊,它們扮演著與外部系統(tǒng)進行數(shù)據(jù)連接和交互的重要角色,被統(tǒng)稱為外部連接器(Connector)。

  1. Source(數(shù)據(jù)源):Source是Flink作業(yè)的輸入模塊,用于從外部系統(tǒng)中讀取數(shù)據(jù)并將其轉(zhuǎn)化為Flink的數(shù)據(jù)流。Source負責實現(xiàn)與不同數(shù)據(jù)源的交互邏輯,將外部數(shù)據(jù)源的數(shù)據(jù)逐條或批量讀取到Flink的數(shù)據(jù)流中,以便后續(xù)的數(shù)據(jù)處理。常見的Source包括從文件中讀取數(shù)據(jù)、從消息隊列(如Kafka、RabbitMQ)中消費數(shù)據(jù)、從數(shù)據(jù)庫中讀取數(shù)據(jù)等。

  2. Sink(數(shù)據(jù)接收器):Sink是Flink作業(yè)的輸出模塊,用于將Flink計算的結(jié)果輸出到外部系統(tǒng)中。Sink負責實現(xiàn)將Flink數(shù)據(jù)流中的數(shù)據(jù)寫入到外部數(shù)據(jù)源,以便后續(xù)的持久化存儲、展示或其他處理。Sink的實現(xiàn)需要考慮數(shù)據(jù)的可靠性、一致性以及可能的事務性要求。常見的Sink包括將數(shù)據(jù)寫入文件、將數(shù)據(jù)寫入數(shù)據(jù)庫、將數(shù)據(jù)寫入消息隊列等。

外部連接器在Flink中的作用非常關(guān)鍵,它們使得Flink作業(yè)可以與各種不同類型的數(shù)據(jù)源和數(shù)據(jù)目的地進行交互,實現(xiàn)了數(shù)據(jù)的流入和流出。這種靈活的連接機制使得Flink在處理大數(shù)據(jù)時能夠更好地集成已有的系統(tǒng)和數(shù)據(jù),實現(xiàn)復雜的數(shù)據(jù)流處理和分析任務。

Source

Flink在批處理中常見的source主要有兩大類。

  • 基于本地集合的source(Collection-based-source)
  • 基于文件的source(File-based-source)
基于本地集合的Source

在Flink中最常見的創(chuàng)建本地集合的DataSet方式有三種。

  1. 使用env.fromElements(),這種方式也支持Tuple,自定義對象等復合形式。
  2. 使用env.fromCollection(),這種方式支持多種Collection的具體類型。
  3. 使用env.generateSequence(),這種方法創(chuàng)建基于Sequence的DataSet。

使用方式如下:

package com.demo.broad;import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.DataSet;import java.util.ArrayList;
import java.util.List;
import java.util.ArrayDeque;
import java.util.Stack;
import java.util.stream.Stream;public class BatchFromCollection {public static void main(String[] args) throws Exception {// 獲取flink執(zhí)行環(huán)境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 0.用element創(chuàng)建DataSet(fromElements)DataSet<String> ds0 = env.fromElements("spark", "flink");ds0.print();// 1.用Tuple創(chuàng)建DataSet(fromElements)DataSet<Tuple2<Integer, String>> ds1 = env.fromElements(new Tuple2<>(1, "spark"),new Tuple2<>(2, "flink"));ds1.print();// 2.用Array創(chuàng)建DataSetDataSet<String> ds2 = env.fromCollection(new ArrayList<String>() {{add("spark");add("flink");}});ds2.print();// 3.用ArrayDeque創(chuàng)建DataSetDataSet<String> ds3 = env.fromCollection(new ArrayDeque<String>() {{add("spark");add("flink");}});ds3.print();// 4.用List創(chuàng)建DataSetDataSet<String> ds4 = env.fromCollection(new ArrayList<String>() {{add("spark");add("flink");}});ds4.print();// 5.用ArrayList創(chuàng)建DataSetDataSet<String> ds5 = env.fromCollection(new ArrayList<String>() {{add("spark");add("flink");}});ds5.print();// 6.用List創(chuàng)建DataSetDataSet<String> ds6 = env.fromCollection(new ArrayList<String>() {{add("spark");add("flink");}});ds6.print();// 7.用List創(chuàng)建DataSetDataSet<String> ds7 = env.fromCollection(new ArrayList<String>() {{add("spark");add("flink");}});ds7.print();// 8.用Stack創(chuàng)建DataSetDataSet<String> ds8 = env.fromCollection(new Stack<String>() {{add("spark");add("flink");}});ds8.print();// 9.用Stream創(chuàng)建DataSet(Stream相當于lazy List,避免在中間過程中生成不必要的集合)DataSet<String> ds9 = env.fromCollection(Stream.of("spark", "flink"));ds9.print();// 10.用List創(chuàng)建DataSetDataSet<String> ds10 = env.fromCollection(new ArrayList<String>() {{add("spark");add("flink");}});ds10.print();// 11.用HashSet創(chuàng)建DataSetDataSet<String> ds11 = env.fromCollection(new HashSet<String>() {{add("spark");add("flink");}});ds11.print();// 12.用Iterable創(chuàng)建DataSetDataSet<String> ds12 = env.fromCollection(new ArrayList<String>() {{add("spark");add("flink");}});ds12.print();// 13.用ArrayList創(chuàng)建DataSetDataSet<String> ds13 = env.fromCollection(new ArrayList<String>() {{add("spark");add("flink");}});ds13.print();// 14.用Stack創(chuàng)建DataSetDataSet<String> ds14 = env.fromCollection(new Stack<String>() {{add("spark");add("flink");}});ds14.print();// 15.用HashMap創(chuàng)建DataSetDataSet<Tuple2<Integer, String>> ds15 = env.fromCollection(new HashMap<Integer, String>() {{put(1, "spark");put(2, "flink");}}.entrySet());ds15.print();// 16.用Range創(chuàng)建DataSetDataSet<Integer> ds16 = env.fromCollection(IntStream.rangeClosed(1, 8).boxed().collect(Collectors.toList()));ds16.print();// 17.用generateSequence創(chuàng)建DataSetDataSet<Long> ds17 = env.generateSequence(1, 9);ds17.print();}
}
基于文件的Source

Flink支持直接從外部文件存儲系統(tǒng)中讀取文件的方式來創(chuàng)建Source數(shù)據(jù)源,Flink支持的方式有以下幾種:

  1. 讀取本地文件數(shù)據(jù)
  2. 讀取HDFS文件數(shù)據(jù)
  3. 讀取CSV文件數(shù)據(jù)
  4. 讀取壓縮文件
  5. 遍歷目錄

下面分別介紹每個數(shù)據(jù)源的加載方式:

讀取本地文件
package com.demo.batch;import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;public class BatchFromFile {public static void main(String[] args) throws Exception {// 使用readTextFile讀取本地文件// 初始化環(huán)境ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();// 加載數(shù)據(jù)DataSet<String> datas = environment.readTextFile("data.txt");// 觸發(fā)程序執(zhí)行datas.print();}
}
讀取HDFS文件數(shù)據(jù)
package com.demo.batch;import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;public class BatchFromFile {public static void main(String[] args) throws Exception {// 使用readTextFile讀取本地文件// 初始化環(huán)境ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();// 加載數(shù)據(jù)DataSet<String> datas = environment.readTextFile("hdfs://node01:8020/README.txt");// 觸發(fā)程序執(zhí)行datas.print();}
}
讀取CSV文件數(shù)據(jù)
package com.demo.batch;import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.common.functions.MapFunction;public class BatchFromCsvFile {public static void main(String[] args) throws Exception {// 初始化環(huán)境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 用于映射CSV文件的POJO classpublic static class Student {public int id;public String name;public Student() {}public Student(int id, String name) {this.id = id;this.name = name;}@Overridepublic String toString() {return "Student(" + id + ", " + name + ")";}}// 讀取CSV文件DataSet<Student> csvDataSet = env.readCsvFile("./data/input/student.csv").ignoreFirstLine().pojoType(Student.class, "id", "name");csvDataSet.print();}
}
讀取壓縮文件

對于以下壓縮類型,不需要指定任何額外的inputformat方法,flink可以自動識別并且解壓。但是,壓縮文件可能不會并行讀取,可能是順序讀取的,這樣可能會影響作業(yè)的可伸縮性。

壓縮格式擴展名并行化
DEFLATE.deflateno
GZIP.gz .gzipno
Bzip2.bz2no
XZ.xzno
package com.demo.batch;import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;public class BatchFromCompressFile {public static void main(String[] args) throws Exception {// 初始化環(huán)境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 加載數(shù)據(jù)DataSet<String> result = env.readTextFile("D:\\BaiduNetdiskDownload\\hbase-1.3.1-bin.tar.gz");// 觸發(fā)程序執(zhí)行result.print();}
}
遍歷目錄

flink支持對一個文件目錄內(nèi)的所有文件,包括所有子目錄中的所有文件的遍歷訪問方式。

對于從文件中讀取數(shù)據(jù),當讀取數(shù)個文件夾的時候,嵌套的文件默認是不會被讀取的,只會讀取第一個文件,其他的都會被忽略。所以我們需要使用recursive.file.enumeration進行遞歸讀取

package com.demo.batch;import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;public class BatchFromCompressFile {public static void main(String[] args) throws Exception {// 初始化環(huán)境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 加載數(shù)據(jù)DataSet<String> result = env.readTextFile("D:\\BaiduNetdiskDownload\\hbase-1.3.1-bin.tar.gz");// 觸發(fā)程序執(zhí)行result.print();}
}
讀取kafka
public class StreamFromKafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers","teacher2:9092");FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>("mytopic2", new SimpleStringSchema(), properties);DataStreamSource<String> data = env.addSource(consumer);SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = data.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {for (String word : s.split(" ")) {collector.collect(Tuple2.of(word, 1));}}});SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne.keyBy(0).sum(1);result.print();env.execute();}
}
自定義source
private static class SimpleSource  
implements SourceFunction<Tuple2<String, Integer>> { private int offset = 0; private boolean isRunning = true; @Override public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception { while (isRunning) { Thread.sleep(500); ctx.collect(new Tuple2<>("" + offset, offset)); offset++; if (offset == 1000) { isRunning = false; } } } @Override public void cancel() { isRunning = false; } 
} 

自定義Source,從0開始計數(shù),將數(shù)字發(fā)送到下游在主邏輯中調(diào)用這個Source。

DataStream<Tuple2<String, Integer>> countStream = env.addSource(new SimpleSource()); 

Sink

flink在批處理中常見的sink

  • 基于本地集合的sink(Collection-based-sink)
  • 基于文件的sink(File-based-sink)
基于本地集合的sink

目標:

基于下列數(shù)據(jù),分別進行打印輸出,error輸出,collect()

(19, "zhangsan", 178.8),
(17, "lisi", 168.8),
(18, "wangwu", 184.8),
(21, "zhaoliu", 164.8)

代碼:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.List;public class BatchSinkCollection {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();List<Tuple3<Integer, String, Double>> stuData = new ArrayList<>();stuData.add(new Tuple3<>(19, "zhangsan", 178.8));stuData.add(new Tuple3<>(17, "lisi", 168.8));stuData.add(new Tuple3<>(18, "wangwu", 184.8));stuData.add(new Tuple3<>(21, "zhaoliu", 164.8));DataSet<Tuple3<Integer, String, Double>> stu = env.fromCollection(stuData);stu.print();stu.printToErr();stu.collect().forEach(System.out::println);env.execute();}
}
基于文件的sink
  • flink支持多種存儲設(shè)備上的文件,包括本地文件,hdfs文件等。
  • flink支持多種文件的存儲格式,包括text文件,CSV文件等。
  • writeAsText():TextOuputFormat - 將元素作為字符串寫入行。字符串是通過調(diào)用每個元素的toString()方法獲得的。
將數(shù)據(jù)寫入本地文件

目標:

基于下列數(shù)據(jù),寫入到文件中

Map(1 -> "spark", 2 -> "flink")

代碼:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;import java.util.HashMap;
import java.util.Map;public class BatchSinkFile {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();Map<Integer, String> data1 = new HashMap<>();data1.put(1, "spark");data1.put(2, "flink");DataSet<Map<Integer, String>> ds1 = env.fromElements(data1);ds1.setParallelism(1).writeAsText("test/data1/aa", FileSystem.WriteMode.OVERWRITE).setParallelism(1);env.execute();}
}
將數(shù)據(jù)寫入HDFS
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;import java.util.HashMap;
import java.util.Map;public class BatchSinkFile {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();Map<Integer, String> data1 = new HashMap<>();data1.put(1, "spark");data1.put(2, "flink");DataSet<Map<Integer, String>> ds1 = env.fromElements(data1);ds1.setParallelism(1).writeAsText("hdfs://bigdata1:9000/a", FileSystem.WriteMode.OVERWRITE).setParallelism(1);env.execute();}
}

Flink API

Flink的API層提供了DataStream API和DataSet API,分別用于流式處理和批處理。這兩個API允許開發(fā)者使用各種操作符和轉(zhuǎn)換來處理數(shù)據(jù),包括轉(zhuǎn)換、連接、聚合、窗口等計算任務。

在Flink中,根據(jù)不同的場景(流處理或批處理),需要設(shè)置不同的執(zhí)行環(huán)境。在批處理場景下,需要使用DataSet API,并設(shè)置批處理執(zhí)行環(huán)境。在流處理場景下,需要使用DataStream API,并設(shè)置流處理執(zhí)行環(huán)境。

以下是在不同場景下設(shè)置執(zhí)行環(huán)境的示例代碼,分別展示了批處理和流處理的情況,包括Scala和Java語言。

批處理場景 - 設(shè)置DataSet API的批處理執(zhí)行環(huán)境(Java)

import org.apache.flink.api.java.ExecutionEnvironment;public class BatchJobExample {public static void main(String[] args) throws Exception {// 創(chuàng)建批處理執(zhí)行環(huán)境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 在這里添加批處理作業(yè)的代碼邏輯// ...// 執(zhí)行作業(yè)env.execute("Batch Job Example");}
}

流處理場景 - 設(shè)置DataStream API的流處理執(zhí)行環(huán)境(Java)

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class StreamJobExample {public static void main(String[] args) throws Exception {// 創(chuàng)建流處理執(zhí)行環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 在這里添加流處理作業(yè)的代碼邏輯// ...// 執(zhí)行作業(yè)env.execute("Stream Job Example");}
}

批處理場景 - 設(shè)置DataSet API的批處理執(zhí)行環(huán)境(Scala)

import org.apache.flink.api.scala._object BatchJobExample {def main(args: Array[String]): Unit = {// 創(chuàng)建批處理執(zhí)行環(huán)境val env = ExecutionEnvironment.getExecutionEnvironment// 在這里添加批處理作業(yè)的代碼邏輯// ...// 執(zhí)行作業(yè)env.execute("Batch Job Example")}
}

流處理場景 - 設(shè)置DataStream API的流處理執(zhí)行環(huán)境(Scala)

import org.apache.flink.streaming.api.scala._object StreamJobExample {def main(args: Array[String]): Unit = {// 創(chuàng)建流處理執(zhí)行環(huán)境val env = StreamExecutionEnvironment.getExecutionEnvironment// 在這里添加流處理作業(yè)的代碼邏輯// ...// 執(zhí)行作業(yè)env.execute("Stream Job Example")}
}

根據(jù)以上示例代碼,可以在不同的場景下選擇合適的執(zhí)行環(huán)境和API來構(gòu)建和執(zhí)行Flink作業(yè)。注意在導入包時,確保使用正確的包名和類名,以適應批處理或流處理的環(huán)境。

以下是一些常用的API函數(shù)和操作,以表格形式提供:

API 類型常用函數(shù)和操作描述
DataStream APImap, flatMap對數(shù)據(jù)流中的每個元素進行映射或扁平化操作。
filter過濾出滿足條件的元素。
keyBy按指定的字段或鍵對數(shù)據(jù)流進行分區(qū)。
window將數(shù)據(jù)流按照時間窗口或計數(shù)窗口劃分。
reduce, fold在窗口內(nèi)對元素進行聚合操作。
union合并多個數(shù)據(jù)流。
connect, coMap, coFlatMap連接兩個不同類型的數(shù)據(jù)流并應用相應的函數(shù)。
timeWindow, countWindow定義時間窗口或計數(shù)窗口。
process自定義處理函數(shù),實現(xiàn)更復雜的流處理邏輯。
DataSet APImap, flatMap對數(shù)據(jù)集中的每個元素進行映射或扁平化操作。
filter過濾出滿足條件的元素。
groupBy按指定的字段或鍵對數(shù)據(jù)集進行分組。
reduce, fold對分組后的數(shù)據(jù)集進行聚合操作。
join, coGroup對兩個數(shù)據(jù)集進行內(nèi)連接或外連接操作。
cross, cartesian對兩個數(shù)據(jù)集進行笛卡爾積操作。
distinct去除數(shù)據(jù)集中的重復元素。
groupBy, aggregate分組并對分組后的數(shù)據(jù)集進行聚合操作。
first, min, max獲取數(shù)據(jù)集中的第一個、最小或最大元素。
sum, avg計算數(shù)據(jù)集中元素的和或平均值。
collect將數(shù)據(jù)集中的元素收集到本地的集合中。

這些API函數(shù)和操作涵蓋了Flink中流處理和批處理的常見操作,可以幫助用戶實現(xiàn)各種復雜的數(shù)據(jù)處理和分析任務。根據(jù)實際需求,可以選擇適合的API函數(shù)和操作來構(gòu)建Flink作業(yè)。

下面是一些參見的API的說明:

map


將DataSet中的每一個元素轉(zhuǎn)換為另外一個元素

示例

使用map操作,將以下數(shù)據(jù)

"1,張三", "2,李四", "3,王五", "4,趙六"

轉(zhuǎn)換為一個scala的樣例類。

步驟

  1. 獲取ExecutionEnvironment運行環(huán)境
  2. 使用fromCollection構(gòu)建數(shù)據(jù)源
  3. 創(chuàng)建一個User樣例類
  4. 使用map操作執(zhí)行轉(zhuǎn)換
  5. 打印測試

參考代碼

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.common.functions.MapFunction;public class User {public String id;public String name;public User() {}public User(String id, String name) {this.id = id;this.name = name;}@Overridepublic String toString() {return "User(" + id + ", " + name + ")";}public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();DataSet<String> textDataSet = env.fromCollection(Arrays.asList("1,張三", "2,李四", "3,王五", "4,趙六"));DataSet<User> userDataSet = textDataSet.map(new MapFunction<String, User>() {@Overridepublic User map(String text) throws Exception {String[] fieldArr = text.split(",");return new User(fieldArr[0], fieldArr[1]);}});userDataSet.print();}
}

flatMap


將DataSet中的每一個元素轉(zhuǎn)換為0…n個元素

示例

分別將以下數(shù)據(jù),轉(zhuǎn)換成國家、省份城市三個維度的數(shù)據(jù)。

將以下數(shù)據(jù)

張三,中國,江西省,南昌市
李四,中國,河北省,石家莊市
Tom,America,NewYork,Manhattan

轉(zhuǎn)換為

(張三,中國)
(張三,中國,江西省)
(張三,中國,江西省,江西省)
(李四,中國)
(李四,中國,河北省)
(李四,中國,河北省,河北省)
(Tom,America)
(Tom,America,NewYork)
(Tom,America,NewYork,NewYork)

思路

  • 以上數(shù)據(jù)為一條轉(zhuǎn)換為三條,顯然,應當使用flatMap來實現(xiàn)

  • 分別在flatMap函數(shù)中構(gòu)建三個數(shù)據(jù),并放入到一個列表中

    姓名, 國家
    姓名, 國家省份
    姓名, 國家省份城市

步驟

  1. 構(gòu)建批處理運行環(huán)境
  2. 構(gòu)建本地集合數(shù)據(jù)源
  3. 使用flatMap將一條數(shù)據(jù)轉(zhuǎn)換為三條數(shù)據(jù)
    • 使用逗號分隔字段
    • 分別構(gòu)建國家、國家省份、國家省份城市三個元組
  4. 打印輸出

參考代碼

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.List;public class UserProcessing {public static class User {public String name;public String country;public String province;public String city;public User() {}public User(String name, String country, String province, String city) {this.name = name;this.country = country;this.province = province;this.city = city;}@Overridepublic String toString() {return "User(" + name + ", " + country + ", " + province + ", " + city + ")";}}public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();DataSet<String> userDataSet = env.fromCollection(new ArrayList<String>() {{add("張三,中國,江西省,南昌市");add("李四,中國,河北省,石家莊市");add("Tom,America,NewYork,Manhattan");}});DataSet<User> resultDataSet = userDataSet.flatMap(new FlatMapFunction<String, User>() {@Overridepublic void flatMap(String text, Collector<User> collector) throws Exception {String[] fieldArr = text.split(",");String name = fieldArr[0];String country = fieldArr[1];String province = fieldArr[2];String city = fieldArr[3];collector.collect(new User(name, country, province, city));collector.collect(new User(name, country, province + city, ""));collector.collect(new User(name, country, province + city, city));}});resultDataSet.print();}
}

mapPartition


將一個分區(qū)中的元素轉(zhuǎn)換為另一個元素

示例

使用mapPartition操作,將以下數(shù)據(jù)

"1,張三", "2,李四", "3,王五", "4,趙六"

轉(zhuǎn)換為一個scala的樣例類。

步驟

  1. 獲取ExecutionEnvironment運行環(huán)境
  2. 使用fromCollection構(gòu)建數(shù)據(jù)源
  3. 創(chuàng)建一個User樣例類
  4. 使用mapPartition操作執(zhí)行轉(zhuǎn)換
  5. 打印測試

參考代碼

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;public class MapPartitionExample {public static class User {public String id;public String name;public User() {}public User(String id, String name) {this.id = id;this.name = name;}@Overridepublic String toString() {return "User(" + id + ", " + name + ")";}}public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();DataSet<String> userDataSet = env.fromCollection(new ArrayList<String>() {{add("1,張三");add("2,李四");add("3,王五");add("4,趙六");}});DataSet<User> resultDataSet = userDataSet.mapPartition(new MapPartitionFunction<String, User>() {@Overridepublic void mapPartition(Iterable<String> iterable, Collector<User> collector) throws Exception {// TODO: 打開連接Iterator<String> iterator = iterable.iterator();while (iterator.hasNext()) {String ele = iterator.next();String[] fieldArr = ele.split(",");collector.collect(new User(fieldArr[0], fieldArr[1]));}// TODO: 關(guān)閉連接}});resultDataSet.print();}
}

mapmapPartition的效果是一樣的,但如果在map的函數(shù)中,需要訪問一些外部存儲。例如:

訪問mysql數(shù)據(jù)庫,需要打開連接, 此時效率較低。而使用mapPartition可以有效減少連接數(shù),提高效率

filter


過濾出來一些符合條件的元素

示例:

過濾出來以下以h開頭的單詞。

"hadoop", "hive", "spark", "flink"

步驟

  1. 獲取ExecutionEnvironment運行環(huán)境
  2. 使用fromCollection構(gòu)建數(shù)據(jù)源
  3. 使用filter操作執(zhí)行過濾
  4. 打印測試

參考代碼

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;import java.util.ArrayList;
import java.util.List;public class FilterExample {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();DataSet<String> wordDataSet = env.fromCollection(new ArrayList<String>() {{add("hadoop");add("hive");add("spark");add("flink");}});DataSet<String> resultDataSet = wordDataSet.filter(word -> word.startsWith("h"));resultDataSet.print();}
}

reduce


可以對一個dataset或者一個group來進行聚合計算,最終聚合成一個元素

示例1

請將以下元組數(shù)據(jù),使用reduce操作聚合成一個最終結(jié)果

("java" , 1) , ("java", 1) ,("java" , 1) 

將上傳元素數(shù)據(jù)轉(zhuǎn)換為("java",3)

步驟

  1. 獲取ExecutionEnvironment運行環(huán)境
  2. 使用fromCollection構(gòu)建數(shù)據(jù)源
  3. 使用redice執(zhí)行聚合操作
  4. 打印測試

參考代碼

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;import java.util.ArrayList;
import java.util.List;public class ReduceExample {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();DataSet<Tuple2<String, Integer>> wordCountDataSet = env.fromCollection(new ArrayList<Tuple2<String, Integer>>() {{add(new Tuple2<>("java", 1));add(new Tuple2<>("java", 1));add(new Tuple2<>("java", 1));}});DataSet<Tuple2<String, Integer>> resultDataSet = wordCountDataSet.reduce((wc1, wc2) ->new Tuple2<>(wc2.f0, wc1.f1 + wc2.f1));resultDataSet.print();}
}

示例2

請將以下元組數(shù)據(jù),下按照單詞使用groupBy進行分組,再使用reduce操作聚合成一個最終結(jié)果

("java" , 1) , ("java", 1) ,("scala" , 1)  

轉(zhuǎn)換為

("java", 2), ("scala", 1)

步驟

  1. 獲取ExecutionEnvironment運行環(huán)境
  2. 使用fromCollection構(gòu)建數(shù)據(jù)源
  3. 使用groupBy按照單詞進行分組
  4. 使用reduce對每個分組進行統(tǒng)計
  5. 打印測試

參考代碼

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple;import java.util.ArrayList;
import java.util.List;public class GroupByReduceExample {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();DataSet<Tuple2<String, Integer>> wordCountDataSet = env.fromCollection(new ArrayList<Tuple2<String, Integer>>() {{add(new Tuple2<>("java", 1));add(new Tuple2<>("java", 1));add(new Tuple2<>("scala", 1));}});DataSet<Tuple2<String, Integer>> groupedDataSet = wordCountDataSet.groupBy(0).reduce((wc1, wc2) ->new Tuple2<>(wc1.f0, wc1.f1 + wc2.f1));groupedDataSet.print();}
}

reduceGroup


可以對一個dataset或者一個group來進行聚合計算,最終聚合成一個元素

reduce和reduceGroup的區(qū)別

reduce和reduceGroup區(qū)別

  • reduce是將數(shù)據(jù)一個個拉取到另外一個節(jié)點,然后再執(zhí)行計算
  • reduceGroup是先在每個group所在的節(jié)點上執(zhí)行計算,然后再拉取

示例

請將以下元組數(shù)據(jù),下按照單詞使用groupBy進行分組,再使用reduceGroup操作進行單詞計數(shù)

("java" , 1) , ("java", 1) ,("scala" , 1)  

步驟

  1. 獲取ExecutionEnvironment運行環(huán)境
  2. 使用fromCollection構(gòu)建數(shù)據(jù)源
  3. 使用groupBy按照單詞進行分組
  4. 使用reduceGroup對每個分組進行統(tǒng)計
  5. 打印測試

參考代碼

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.List;public class GroupByReduceGroupExample {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataSet<Tuple2<String, Integer>> wordCountDataSet = env.fromCollection(new ArrayList<Tuple2<String, Integer>>() {{add(new Tuple2<>("java", 1));add(new Tuple2<>("java", 1));add(new Tuple2<>("scala", 1));}});DataSet<Tuple2<String, Integer>> groupedDataSet = wordCountDataSet.groupBy(0).reduceGroup((Iterable<Tuple2<String, Integer>> iter, Collector<Tuple2<String, Integer>> collector) -> {Tuple2<String, Integer> result = new Tuple2<>();for (Tuple2<String, Integer> wc : iter) {result.f0 = wc.f0;result.f1 += wc.f1;}collector.collect(result);});groupedDataSet.print();}
}

aggregate


按照內(nèi)置的方式來進行聚合, Aggregate只能作用于元組上。例如:SUM/MIN/MAX…

示例

請將以下元組數(shù)據(jù),使用aggregate操作進行單詞統(tǒng)計

("java" , 1) , ("java", 1) ,("scala" , 1)

步驟

  1. 獲取ExecutionEnvironment運行環(huán)境
  2. 使用fromCollection構(gòu)建數(shù)據(jù)源
  3. 使用groupBy按照單詞進行分組
  4. 使用aggregate對每個分組進行SUM統(tǒng)計
  5. 打印測試

參考代碼

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2;import java.util.ArrayList;
import java.util.List;public class GroupByAggregateExample {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();DataSet<Tuple2<String, Integer>> wordCountDataSet = env.fromCollection(new ArrayList<Tuple2<String, Integer>>() {{add(new Tuple2<>("java", 1));add(new Tuple2<>("java", 1));add(new Tuple2<>("scala", 1));}});DataSet<Tuple2<String, Integer>> groupedDataSet = wordCountDataSet.groupBy(0);DataSet<Tuple2<String, Integer>> resultDataSet = groupedDataSet.aggregate(Aggregations.SUM, 1);resultDataSet.print();}
}

注意

要使用aggregate,只能使用字段索引名或索引名稱來進行分組groupBy(0),否則會報一下錯誤:

Exception in thread "main" java.lang.UnsupportedOperationException: Aggregate does not support grouping with KeySelector functions, yet.

distinct


去除重復的數(shù)據(jù)

示例

請將以下元組數(shù)據(jù),使用distinct操作去除重復的單詞

("java" , 1) , ("java", 2) ,("scala" , 1)

去重得到

("java", 1), ("scala", 1)

步驟

  1. 獲取ExecutionEnvironment運行環(huán)境
  2. 使用fromCollection構(gòu)建數(shù)據(jù)源
  3. 使用distinct指定按照哪個字段來進行去重
  4. 打印測試

參考代碼

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;import java.util.ArrayList;
import java.util.List;public class DistinctExample {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();DataSet<Tuple2<String, Integer>> wordCountDataSet = env.fromCollection(new ArrayList<Tuple2<String, Integer>>() {{add(new Tuple2<>("java", 1));add(new Tuple2<>("java", 1));add(new Tuple2<>("scala", 1));}});DataSet<Tuple2<String, Integer>> resultDataSet = wordCountDataSet.distinct(0);resultDataSet.print();}
}

join


使用join可以將兩個DataSet連接起來

示例:

有兩個csv文件,有一個為score.csv,一個為subject.csv,分別保存了成績數(shù)據(jù)以及學科數(shù)據(jù)。

csv樣例

需要將這兩個數(shù)據(jù)連接到一起,然后打印出來。

join結(jié)果

步驟

  1. 分別將兩個文件復制到項目中的data/join/input

  2. 構(gòu)建批處理環(huán)境

  3. 創(chuàng)建兩個樣例類

    * 學科Subject(學科ID、學科名字)
    * 成績Score(唯一ID、學生姓名、學科ID、分數(shù)——Double類型)
    
  4. 分別使用readCsvFile加載csv數(shù)據(jù)源,并制定泛型

  5. 使用join連接兩個DataSet,并使用where、equalTo方法設(shè)置關(guān)聯(lián)條件

  6. 打印關(guān)聯(lián)后的數(shù)據(jù)源

參考代碼

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple4;public class JoinExample {public static class Score {public int id;public String name;public int subjectId;public double score;public Score() {}public Score(int id, String name, int subjectId, double score) {this.id = id;this.name = name;this.subjectId = subjectId;this.score = score;}@Overridepublic String toString() {return "Score(" + id + ", " + name + ", " + subjectId + ", " + score + ")";}}public static class Subject {public int id;public String name;public Subject() {}public Subject(int id, String name) {this.id = id;this.name = name;}@Overridepublic String toString() {return "Subject(" + id + ", " + name + ")";}}public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();DataSet<Score> scoreDataSet = env.readCsvFile("./data/join/input/score.csv").ignoreFirstLine().pojoType(Score.class);DataSet<Subject> subjectDataSet = env.readCsvFile("./data/join/input/subject.csv").ignoreFirstLine().pojoType(Subject.class);DataSet<Tuple4<Integer, String, Integer, Double>> joinedDataSet = scoreDataSet.join(subjectDataSet).where("subjectId").equalTo("id").projectFirst(0, 1, 2, 3).projectSecond(1);joinedDataSet.print();}
}

union


將兩個DataSet取并集,不會去重。

示例

將以下數(shù)據(jù)進行取并集操作

數(shù)據(jù)集1

"hadoop", "hive", "flume"

數(shù)據(jù)集2

"hadoop", "hive", "spark"

步驟

  1. 構(gòu)建批處理運行環(huán)境
  2. 使用fromCollection創(chuàng)建兩個數(shù)據(jù)源
  3. 使用union將兩個數(shù)據(jù)源關(guān)聯(lián)在一起
  4. 打印測試

參考代碼

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;import java.util.ArrayList;
import java.util.List;public class UnionExample {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();DataSet<String> wordDataSet1 = env.fromCollection(new ArrayList<String>() {{add("hadoop");add("hive");add("flume");}});DataSet<String> wordDataSet2 = env.fromCollection(new ArrayList<String>() {{add("hadoop");add("hive");add("spark");}});DataSet<String> resultDataSet = wordDataSet1.union(wordDataSet2);resultDataSet.print();}
}

connect

connect()提供了和union()類似的功能,即連接兩個數(shù)據(jù)流,它與union()的區(qū)別如下。

  • connect()只能連接兩個數(shù)據(jù)流,union()可以連接多個數(shù)據(jù)流。

  • connect()所連接的兩個數(shù)據(jù)流的數(shù)據(jù)類型可以不一致,union()所連接的兩個或多個數(shù)據(jù)流的數(shù)據(jù)類型必須一致。

  • 兩個DataStream經(jīng)過connect()之后被轉(zhuǎn)化為ConnectedStreams, ConnectedStreams會對兩個流的數(shù)據(jù)應用不同的處理方法,且兩個流之間可以共享狀態(tài)。

DataStream<Integer> intStream  = senv.fromElements(2, 1, 5, 3, 4, 7); 
DataStream<String> stringStream  = senv.fromElements("A", "B", "C", "D"); ConnectedStreams<Integer, String> connectedStream =  
intStream.connect(stringStream); 
DataStream<String> mapResult = connectedStream.map(new MyCoMapFunction()); // CoMapFunction的3個泛型分別對應第一個流的輸入類型、第二個流的輸入類型,輸出類型 
public static class MyCoMapFunction implements CoMapFunction<Integer, String, String> 
{ @Override public String map1(Integer input1) { return input1.toString(); } @Override public String map2(String input2) { return input2; } 
} 

rebalance


Flink也會產(chǎn)生數(shù)據(jù)傾斜的時候,例如:當前的數(shù)據(jù)量有10億條,在處理過程就有可能發(fā)生如下狀況:

數(shù)據(jù)傾斜

rebalance會使用輪詢的方式將數(shù)據(jù)均勻打散,這是處理數(shù)據(jù)傾斜最好的選擇。

rebalance

步驟

  1. 構(gòu)建批處理運行環(huán)境

  2. 使用env.generateSequence創(chuàng)建0-100的并行數(shù)據(jù)

  3. 使用fiter過濾出來大于8的數(shù)字

  4. 使用map操作傳入RichMapFunction,將當前子任務的ID和數(shù)字構(gòu)建成一個元組

    在RichMapFunction中可以使用`getRuntimeContext.getIndexOfThisSubtask`獲取子任務序號
    
  5. 打印測試

參考代碼

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;public class MapWithSubtaskIndexExample {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();DataSet<Long> numDataSet = env.generateSequence(0, 100);DataSet<Long> filterDataSet = numDataSet.filter(num -> num > 8);DataSet<Tuple2<Long, Long>> resultDataSet = filterDataSet.map(new RichMapFunction<Long, Tuple2<Long, Long>>() {@Overridepublic Tuple2<Long, Long> map(Long in) throws Exception {return new Tuple2<>(getRuntimeContext().getIndexOfThisSubtask(), in);}});resultDataSet.print();}
}

上述代碼沒有加rebalance,通過觀察,有可能會出現(xiàn)數(shù)據(jù)傾斜。

在filter計算完后,調(diào)用rebalance,這樣,就會均勻地將數(shù)據(jù)分布到每一個分區(qū)中。

hashPartition


按照指定的key進行hash分區(qū)

示例

基于以下列表數(shù)據(jù)來創(chuàng)建數(shù)據(jù)源,并按照hashPartition進行分區(qū),然后輸出到文件。

List(1,1,1,1,1,1,1,2,2,2,2,2)

步驟

  1. 構(gòu)建批處理運行環(huán)境
  2. 設(shè)置并行度為2
  3. 使用fromCollection構(gòu)建測試數(shù)據(jù)集
  4. 使用partitionByHash按照字符串的hash進行分區(qū)
  5. 調(diào)用writeAsText寫入文件到data/partition_output目錄中
  6. 打印測試

參考代碼

 import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.core.fs.FileSystem;import java.util.ArrayList;
import java.util.List;public class PartitionByHashExample {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// Set parallelism to 2env.setParallelism(2);DataSet<Integer> numDataSet = env.fromCollection(new ArrayList<Integer>() {{add(1);add(1);add(1);add(1);add(1);add(1);add(1);add(2);add(2);add(2);add(2);add(2);}});DataSet<Integer> partitionDataSet = numDataSet.partitionByHash(num -> num.toString());partitionDataSet.writeAsText("./data/partition_output", FileSystem.WriteMode.OVERWRITE);partitionDataSet.print();env.execute();}
}

sortPartition


指定字段對分區(qū)中的數(shù)據(jù)進行排序

示例

按照以下列表來創(chuàng)建數(shù)據(jù)集

List("hadoop", "hadoop", "hadoop", "hive", "hive", "spark", "spark", "flink")

對分區(qū)進行排序后,輸出到文件。

步驟

  1. 構(gòu)建批處理運行環(huán)境
  2. 使用fromCollection構(gòu)建測試數(shù)據(jù)集
  3. 設(shè)置數(shù)據(jù)集的并行度為2
  4. 使用sortPartition按照字符串進行降序排序
  5. 調(diào)用writeAsText寫入文件到data/sort_output目錄中
  6. 啟動執(zhí)行

參考代碼

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.core.fs.FileSystem;import java.util.ArrayList;
import java.util.List;public class SortPartitionExample {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();DataSet<String> wordDataSet = env.fromCollection(new ArrayList<String>() {{add("hadoop");add("hadoop");add("hadoop");add("hive");add("hive");add("spark");add("spark");add("flink");}});wordDataSet.setParallelism(2);DataSet<String> sortedDataSet = wordDataSet.sortPartition(str -> str, Order.DESCENDING);sortedDataSet.writeAsText("./data/sort_output/", FileSystem.WriteMode.OVERWRITE);env.execute("App");}
}

窗口

在許多情況下,我們需要解決這樣的問題:針對一個特定的時間段,例如一個小時,我們需要對數(shù)據(jù)進行統(tǒng)計和分析。但是,要實現(xiàn)這種數(shù)據(jù)窗口操作,首先需要確定哪些數(shù)據(jù)應該進入這個窗口。在深入了解窗口操作的定義之前,我們必須先確定作業(yè)將使用哪種時間語義。

換句話說,時間窗口是數(shù)據(jù)處理中的一個關(guān)鍵概念,用于將數(shù)據(jù)劃分為特定的時間段進行計算。然而,在確定如何定義這些窗口之前,我們必須選擇適合的時間語義,即事件時間、處理時間或攝入時間。不同的時間語義在數(shù)據(jù)處理中具有不同的含義和用途,因此在選擇時間窗口之前,我們需要明確作業(yè)所需的時間語義,以便正確地界定和處理數(shù)據(jù)窗口。

時間概念

由于字數(shù)限制,此處省略

完整內(nèi)容,請參見《大數(shù)據(jù)Flink學習圣經(jīng)》,pdf 免費找尼恩獲取

窗口程序

由于字數(shù)限制,此處省略

完整內(nèi)容,請參見《大數(shù)據(jù)Flink學習圣經(jīng)》,pdf 免費找尼恩獲取

滾動窗口

由于字數(shù)限制,此處省略

完整內(nèi)容,請參見《大數(shù)據(jù)Flink學習圣經(jīng)》,pdf 免費找尼恩獲取

滑動窗口

由于字數(shù)限制,此處省略

完整內(nèi)容,請參見《大數(shù)據(jù)Flink學習圣經(jīng)》,pdf 免費找尼恩獲取

會話窗口

由于字數(shù)限制,此處省略

完整內(nèi)容,請參見《大數(shù)據(jù)Flink學習圣經(jīng)》,pdf 免費找尼恩獲取

基于數(shù)量窗口

由于字數(shù)限制,此處省略

完整內(nèi)容,請參見《大數(shù)據(jù)Flink學習圣經(jīng)》,pdf 免費找尼恩獲取

觸發(fā)器

由于字數(shù)限制,此處省略

完整內(nèi)容,請參見《大數(shù)據(jù)Flink學習圣經(jīng)》,pdf 免費找尼恩獲取

清除器

由于字數(shù)限制,此處省略

完整內(nèi)容,請參見《大數(shù)據(jù)Flink學習圣經(jīng)》,pdf 免費找尼恩獲取

6. Flink程序本地執(zhí)行和集群執(zhí)行

6.1. 本地執(zhí)行

由于字數(shù)限制,此處省略

完整內(nèi)容,請參見《大數(shù)據(jù)Flink學習圣經(jīng)》,pdf 免費找尼恩獲取

6.2. 集群執(zhí)行

由于字數(shù)限制,此處省略

完整內(nèi)容,請參見《大數(shù)據(jù)Flink學習圣經(jīng)》,pdf 免費找尼恩獲取

7. Flink的廣播變量

由于字數(shù)限制,此處省略

完整內(nèi)容,請參見《大數(shù)據(jù)Flink學習圣經(jīng)》,pdf 免費找尼恩獲取

8. Flink的累加器

由于字數(shù)限制,此處省略

完整內(nèi)容,請參見《大數(shù)據(jù)Flink學習圣經(jīng)》,pdf 免費找尼恩獲取

9. Flink的分布式緩存

由于字數(shù)限制,此處省略

完整內(nèi)容,請參見《大數(shù)據(jù)Flink學習圣經(jīng)》,pdf 免費找尼恩獲取

關(guān)于TABLE API & SQL , 以及狀態(tài)和檢查點等知識 會陸續(xù)補充!敬請期待!

說在后面

本文是《大數(shù)據(jù)Flink學習圣經(jīng)》 V1版本, 是 《尼恩 大數(shù)據(jù) 面試寶典》 姊妹篇。

這里特別說明一下:《尼恩 大數(shù)據(jù) 面試寶典》5個專題 PDF 自首次發(fā)布以來, 已經(jīng)收集了 好幾百題,大量的大廠面試干貨、正貨 。 《尼恩 大數(shù)據(jù) 面試寶典》面試題集合, 已經(jīng)變成大數(shù)據(jù)學習和面試的必讀書籍。

于是,尼恩架構(gòu)團隊趁熱打鐵,推出 《大數(shù)據(jù)Flink學習圣經(jīng)》。

完整的pdf,可以關(guān)注尼恩的 公眾號【技術(shù)自由圈】領(lǐng)取。

并且,《大數(shù)據(jù)Flink學習圣經(jīng)》、 《尼恩 大數(shù)據(jù) 面試寶典》 都會持續(xù)迭代、不斷更新,以 吸納最新的面試題,最新版本,具體請參見 公眾號【技術(shù)自由圈】

作者介紹

一作:Andy,資深架構(gòu)師, 《Java 高并發(fā)核心編程 加強版》作者之1 。

二作:尼恩,41歲資深老架構(gòu)師, IT領(lǐng)域資深作家、著名博主?!禞ava 高并發(fā)核心編程 加強版 卷1、卷2、卷3》創(chuàng)世作者。 《K8S學習圣經(jīng)》《Docker學習圣經(jīng)》《Go學習圣經(jīng)》等11個PDF 圣經(jīng)的作者。 也是一個 資深架構(gòu)導師、架構(gòu)轉(zhuǎn)化 導師, 成功指導了多個中級Java、高級Java轉(zhuǎn)型架構(gòu)師崗位, 最高的學員年薪拿到近100W。

參考

  • Flink原理與實踐
  • Apache Flink Documentation | Apache Flink
  • Apache Flink 學習網(wǎng) (flink-learning.org.cn)

推薦閱讀

《尼恩大數(shù)據(jù)面試寶典專題1:史上最全Hadoop面試題》

《尼恩大數(shù)據(jù)面試寶典專題2:絕密100個Spark面試題,熟背100遍,猛拿高薪》

《尼恩大數(shù)據(jù)面試寶典專題3:史上最全Hive面試題,不斷迭代,持續(xù)升級》

《尼恩大數(shù)據(jù)面試寶典專題4:史上最全Flink面試題,不斷迭代,持續(xù)升級》

《尼恩大數(shù)據(jù)面試寶典專題5:史上最全HBase面試題,不斷迭代,持續(xù)升級》

《尼恩 架構(gòu)筆記》《尼恩高并發(fā)三部曲》《尼恩Java面試寶典》PDF,請到下面公號【技術(shù)自由圈】取↓↓↓

http://www.risenshineclean.com/news/30276.html

相關(guān)文章:

  • 崇信門戶網(wǎng)個人留言seo技術(shù)交流
  • ui培訓班哪里有谷歌seo招聘
  • 服裝印花圖案網(wǎng)站seo與sem的區(qū)別
  • dw做網(wǎng)站一般設(shè)為什么樣南安網(wǎng)站建設(shè)
  • 網(wǎng)站維護和制作怎么做會計分錄免費搜索引擎入口
  • 深圳做小程序網(wǎng)站開發(fā)百度seo公司興田德潤
  • 淄博網(wǎng)站備案網(wǎng)絡服務提供者收集和使用個人信息應當符合的條件有
  • 天津正規(guī)網(wǎng)站建設(shè)調(diào)試公司霸屏seo服務
  • 廊坊網(wǎng)站建設(shè)解決方案域名注冊查詢官網(wǎng)
  • 響應設(shè)網(wǎng)站多少錢可以做百度推廣賬號注冊
  • 重慶網(wǎng)站快速排名優(yōu)化百度市場應用官方app
  • 做網(wǎng)站的技術(shù)關(guān)鍵佛山網(wǎng)絡推廣培訓
  • 品古典家具網(wǎng)站模板2023疫情最新消息今天
  • 百度關(guān)鍵詞優(yōu)化師有實力的網(wǎng)站排名優(yōu)化軟件
  • 做網(wǎng)站開發(fā)要學什么軟件杭州網(wǎng)站建設(shè)書生商友
  • 簡潔大氣公司網(wǎng)站西安百度關(guān)鍵詞排名服務
  • 西安網(wǎng)站制作公司排給公司做網(wǎng)站的公司
  • 免費建站abc怎樣做好網(wǎng)絡營銷推廣
  • 做網(wǎng)站花都區(qū)百度推廣客戶端
  • 正規(guī)營銷型網(wǎng)站定制seo描述快速排名
  • 做新聞網(wǎng)站需要什么證件云巔seo
  • 網(wǎng)站建設(shè)定金合同淘寶推廣怎么做
  • 讓別人做網(wǎng)站多久開始注冊域名搜索引擎的優(yōu)化方法有哪些
  • 重慶做企業(yè)網(wǎng)站網(wǎng)站流量排行
  • 網(wǎng)站建設(shè)電話咨詢百度詞條搜索排行
  • 網(wǎng)站管理運營網(wǎng)站收錄什么意思
  • 上海網(wǎng)站制作上海網(wǎng)站制作重慶森林壁紙
  • 廣州手機網(wǎng)站建設(shè)黑馬程序員培訓機構(gòu)官網(wǎng)
  • 怎么做有邀請碼的網(wǎng)站五年級上冊語文優(yōu)化設(shè)計答案
  • 電子商務網(wǎng)站開發(fā)代碼常德網(wǎng)站建設(shè)公司