佛山網(wǎng)站建設(shè)的首選免費網(wǎng)站外鏈推廣
學習目標:三棲合一架構(gòu)師
本文是《大數(shù)據(jù)Flink學習圣經(jīng)》 V1版本,是 《尼恩 大數(shù)據(jù) 面試寶典》姊妹篇。
這里特別說明一下:《尼恩 大數(shù)據(jù) 面試寶典》5個專題 PDF 自首次發(fā)布以來, 已經(jīng)匯集了 好幾百題,大量的大廠面試干貨、正貨 。 《尼恩 大數(shù)據(jù) 面試寶典》面試題集合, 將變成大數(shù)據(jù)學習和面試的必讀書籍。
于是,尼恩架構(gòu)團隊 趁熱打鐵,推出 《大數(shù)據(jù)Flink學習圣經(jīng)》,《大數(shù)據(jù)HBASE學習圣經(jīng)》
《大數(shù)據(jù)Flink學習圣經(jīng)》 后面會不斷升級,不斷 迭代, 變成大數(shù)據(jù)領(lǐng)域 學習和面試的必讀書籍,
最終,幫助大家成長為 三棲合一架構(gòu)師,進大廠,拿高薪。
《尼恩 架構(gòu)筆記》《尼恩高并發(fā)三部曲》《尼恩Java面試寶典》的PDF,請到公號【技術(shù)自由圈】取
“Java+大數(shù)據(jù)” 雙棲架構(gòu)成功案例
成功案例1:
驚天大逆襲:失業(yè)4個月,3年小伙1個月喜提架構(gòu)Offer,而且是大齡跨行,超級牛
成功案例2:
極速拿offer:阿里P6被裁后極速上岸,1個月內(nèi)喜提2優(yōu)質(zhì)offer(含滴滴)
文章目錄
- 學習目標:三棲合一架構(gòu)師
- “Java+大數(shù)據(jù)” 雙棲架構(gòu)成功案例
- 1. 什么是Flink
- 大數(shù)據(jù)
- 分布式計算
- 分布式存儲
- 分布式存儲:
- 分布式文件系統(tǒng):
- 批處理和流處理
- 批處理(Batch Processing):
- 流處理(Stream Processing):
- 開源大數(shù)據(jù)技術(shù)
- Hadoop:
- YARN(Yet Another Resource Negotiator):
- Spark:
- Flink:
- 2. Flink 部署
- Master-Worker架構(gòu):
- 兼容性:
- Standalone集群的特點:
- Standalone集群的部署方式:
- Docker部署flink簡單集群
- 1、在服務器創(chuàng)建flink目錄
- 2、docker-compose.yml腳本創(chuàng)建
- 3、啟動flink
- 4、瀏覽器上查看頁面dashboard
- 3. Flink快速應用
- 實操1: 單詞統(tǒng)計案例(批數(shù)據(jù))
- 1.1 需求
- 1.2 代碼實現(xiàn)
- 實操2: 單詞統(tǒng)計案例(流數(shù)據(jù))
- 2.1 需求
- 2.2 代碼實現(xiàn)
- Flink程序開發(fā)的流程總結(jié)
- 4. Flink分布式架構(gòu)與核心組件
- Flink作業(yè)提交過程
- Flink核心組件
- Flink組件棧
- 作業(yè)執(zhí)行階段
- 5. Flink 開發(fā)
- 開發(fā)環(huán)境搭建
- Flink連接器
- Source
- 基于本地集合的Source
- 基于文件的Source
- 讀取本地文件
- 讀取HDFS文件數(shù)據(jù)
- 讀取CSV文件數(shù)據(jù)
- 讀取壓縮文件
- 遍歷目錄
- 讀取kafka
- 自定義source
- Sink
- 基于本地集合的sink
- 基于文件的sink
- 將數(shù)據(jù)寫入本地文件
- 將數(shù)據(jù)寫入HDFS
- Flink API
- map
- flatMap
- mapPartition
- filter
- reduce
- reduceGroup
- aggregate
- distinct
- join
- union
- connect
- rebalance
- hashPartition
- sortPartition
- 窗口
- 時間概念
- 窗口程序
- 滾動窗口
- 滑動窗口
- 會話窗口
- 基于數(shù)量窗口
- 觸發(fā)器
- 清除器
- 6. Flink程序本地執(zhí)行和集群執(zhí)行
- 6.1. 本地執(zhí)行
- 6.2. 集群執(zhí)行
- 7. Flink的廣播變量
- 8. Flink的累加器
- 9. Flink的分布式緩存
- 說在后面
- 作者介紹
- 參考
- 推薦閱讀
1. 什么是Flink
大數(shù)據(jù)
大數(shù)據(jù)(Big Data)是指規(guī)模龐大、結(jié)構(gòu)多樣且速度快速增長的數(shù)據(jù)集合。這些數(shù)據(jù)集合通常包含傳統(tǒng)數(shù)據(jù)庫管理系統(tǒng)無法有效處理的數(shù)據(jù),具有高度的復雜性和挑戰(zhàn)性。大數(shù)據(jù)的主要特點包括三個維度**:三V**,即Volume(數(shù)據(jù)量大)、Variety(數(shù)據(jù)多樣性)、Velocity(數(shù)據(jù)速度)。
- 數(shù)據(jù)量大(Volume):大數(shù)據(jù)的最明顯特征之一是其龐大的數(shù)據(jù)量。傳統(tǒng)的數(shù)據(jù)處理方法和工具在處理這種規(guī)模的數(shù)據(jù)時可能會變得低效或不可行。
- 數(shù)據(jù)多樣性(Variety):大數(shù)據(jù)不僅包括結(jié)構(gòu)化數(shù)據(jù)(如表格數(shù)據(jù)),還包括半結(jié)構(gòu)化數(shù)據(jù)(如JSON、XML)和非結(jié)構(gòu)化數(shù)據(jù)(如文本、圖像、音頻、視頻等)。這些數(shù)據(jù)可能來自不同的源頭和不同的格式。
- 數(shù)據(jù)速度快(Velocity):大數(shù)據(jù)往往以高速率產(chǎn)生、流動和累積。這要求數(shù)據(jù)處理系統(tǒng)能夠?qū)崟r或近實時地處理數(shù)據(jù),以便從中獲取有價值的信息。
分布式計算
隨著計算機技術(shù)的發(fā)展和數(shù)據(jù)規(guī)模的增大,單臺計算機的處理能力和存儲容量逐漸變得有限,無法滿足大數(shù)據(jù)處理的要求。為了應對這一挑戰(zhàn),分布式計算應運而生,它利用多臺計算機組成集群,將計算任務分割成多個子任務并在不同的計算節(jié)點上并行執(zhí)行,從而提高計算效率和處理能力。
分布式計算的核心思想是將大問題劃分為小問題,將任務分發(fā)給多個計算節(jié)點并行執(zhí)行,最后將結(jié)果合并得到最終的解。這種方式有效地解決了單臺計算機無法處理大規(guī)模數(shù)據(jù)和高并發(fā)計算的問題。同時,分布式計算還具有良好的可擴展性,可以根據(jù)數(shù)據(jù)量的增加靈活地擴展集群規(guī)模,以應對不斷增長的數(shù)據(jù)挑戰(zhàn)。
分布式計算的概念聽起來很高深,其背后的思想?yún)s十分樸素,即分而治之,又稱為分治法(Divide and Conquer)。分治法是一種解決問題的算法設(shè)計策略,它將一個問題分解成多個相同或相似的子問題,然后分別解決這些子問題,最后將子問題的解合并起來得到原問題的解。分治法常用于解決復雜問題,尤其是在大數(shù)據(jù)處理中,可以將大規(guī)模的數(shù)據(jù)集合分割成更小的部分,然后分別處理這些部分,最后合并結(jié)果。
在處理大數(shù)據(jù)問題時,可以使用分治法的思想來提高效率和可擴展性,以下是一些應用分治法處理大數(shù)據(jù)問題的示例:
-
MapReduce 模式:分治法的經(jīng)典應用是 MapReduce 模式,它將大規(guī)模的數(shù)據(jù)集合分為多個小塊,每個小塊由不同的計算節(jié)點進行處理,然后將結(jié)果合并。這種方法適用于批處理任務,如數(shù)據(jù)清洗、轉(zhuǎn)換、聚合等。
-
并行計算:將大規(guī)模的計算任務分解成多個小任務,分配給不同的計算節(jié)點并行處理,最后合并結(jié)果。這適用于需要大量計算的問題,如數(shù)值模擬、圖算法等。
-
分布式排序:將大規(guī)模數(shù)據(jù)集合分割成多個部分,每個部分在不同的計算節(jié)點上進行排序,然后使用合并排序算法將這些有序部分合并為整體有序的數(shù)據(jù)集合。
-
分區(qū)和分片:在分布式存儲系統(tǒng)中,可以將數(shù)據(jù)分區(qū)和分片存儲在不同的節(jié)點上,通過分區(qū)鍵或哈希函數(shù)將數(shù)據(jù)分配到不同的存儲節(jié)點上,從而實現(xiàn)數(shù)據(jù)的分布式存儲和管理。
-
分布式機器學習:將大規(guī)模的機器學習任務分解成多個子任務,在分布式計算環(huán)境中分別進行訓練,然后合并模型參數(shù),如分布式隨機梯度下降算法。
-
數(shù)據(jù)分割和合并:對于需要頻繁訪問的大數(shù)據(jù)集合,可以將數(shù)據(jù)分割成多個小塊,每個小塊存儲在不同的存儲節(jié)點上,然后根據(jù)需要進行合并,以減少數(shù)據(jù)訪問的開銷。
分治法在大數(shù)據(jù)處理中的應用不僅有助于提高處理效率,還可以充分利用分布式計算和存儲資源,從而更好地應對大數(shù)據(jù)量和復雜性。然而,在應用分治法時需要考慮合適的數(shù)據(jù)分割策略、任務調(diào)度、結(jié)果合并等問題,以確保分治法的正確性和性能。
然而,分布式計算也帶來了一些挑戰(zhàn),如數(shù)據(jù)一致性、通信開銷、任務調(diào)度等問題,需要綜合考慮各種因素來設(shè)計和優(yōu)化分布式系統(tǒng)。同時,分布式計算也需要開發(fā)者具備分布式系統(tǒng)設(shè)計和調(diào)優(yōu)的知識和技能,以確保系統(tǒng)的性能和穩(wěn)定性。
分布式存儲
當數(shù)據(jù)量巨大且單機存儲已無法滿足需求時,分布式存儲和分布式文件系統(tǒng)成為處理大數(shù)據(jù)的關(guān)鍵技術(shù)。下面我會詳細介紹分布式存儲和分布式文件系統(tǒng)的概念、特點和常見的實現(xiàn)。
分布式存儲:
分布式存儲是將數(shù)據(jù)分散存儲在多個節(jié)點上,以提供高容量、高性能、高可靠性和可擴展性的數(shù)據(jù)存儲解決方案。每個節(jié)點都可以通過網(wǎng)絡訪問數(shù)據(jù),并且多個節(jié)點協(xié)同工作來處理數(shù)據(jù)請求。分布式存儲的核心目標是解決單機存儲的瓶頸,同時提供高可靠性和可用性。
分布式存儲的特點包括:
- 橫向擴展性:可以通過增加節(jié)點來擴展存儲容量和性能,適應不斷增長的數(shù)據(jù)量和負載。
- 高可靠性和容錯性:數(shù)據(jù)在多個節(jié)點上冗余存儲,當某個節(jié)點出現(xiàn)故障時,數(shù)據(jù)依然可用,不會丟失。
- 數(shù)據(jù)分布和復制:數(shù)據(jù)按照一定策略分布在不同節(jié)點上,數(shù)據(jù)的復制確保了數(shù)據(jù)的可用性和容錯性。
- 并發(fā)訪問和高性能:支持多個客戶端同時訪問數(shù)據(jù),實現(xiàn)高并發(fā)和更好的性能。
- 靈活的數(shù)據(jù)模型:支持多種數(shù)據(jù)類型和訪問方式,如文件系統(tǒng)、對象存儲、鍵值存儲等。
分布式文件系統(tǒng):
分布式文件系統(tǒng)是一種特殊類型的分布式存儲,主要用于存儲和管理文件數(shù)據(jù)。它提供了類似于傳統(tǒng)單機文件系統(tǒng)的接口,但是在底層實現(xiàn)上,數(shù)據(jù)被分散存儲在多個節(jié)點上。分布式文件系統(tǒng)能夠自動處理數(shù)據(jù)的分布、復制、一致性和故障恢復等問題。
常見的分布式文件系統(tǒng)特點包括:
- 命名空間和路徑:分布式文件系統(tǒng)通過路徑來訪問文件,類似于傳統(tǒng)文件系統(tǒng)的目錄結(jié)構(gòu)。
- 數(shù)據(jù)分布和復制:文件被切分成塊并分散存儲在多個節(jié)點上,同時進行數(shù)據(jù)復制以實現(xiàn)冗余和高可用性。
- 一致性和數(shù)據(jù)一致性模型:分布式文件系統(tǒng)需要保證數(shù)據(jù)的一致性,不同節(jié)點上的數(shù)據(jù)副本需要保持同步。
- 訪問控制和權(quán)限管理:提供用戶和應用程序訪問控制和權(quán)限管理功能,確保數(shù)據(jù)安全性。
- 高性能:分布式文件系統(tǒng)通常優(yōu)化了數(shù)據(jù)的讀寫性能,以滿足大數(shù)據(jù)場景的需求。
- 擴展性:可以通過增加節(jié)點來擴展存儲容量和性能。
常見的分布式文件系統(tǒng)包括:
- Hadoop HDFS(Hadoop Distributed File System):Hadoop生態(tài)系統(tǒng)中的分布式文件系統(tǒng),適用于大數(shù)據(jù)存儲。
- Ceph:開源的分布式存儲系統(tǒng),提供塊存儲、文件系統(tǒng)和對象存儲。
- GlusterFS:開源的分布式文件系統(tǒng),可以線性擴展存儲容量和性能。
總之,分布式存儲和分布式文件系統(tǒng)在大數(shù)據(jù)時代扮演著重要角色,幫助我們存儲、管理和訪問海量的數(shù)據(jù),解決了傳統(tǒng)單機存儲無法應對的挑戰(zhàn)。
批處理和流處理
批處理和流處理是大數(shù)據(jù)處理領(lǐng)域中常見的兩種數(shù)據(jù)處理模式,用于不同類型的數(shù)據(jù)處理需求。下面將詳細介紹這兩種模式,并給出相關(guān)的應用場景示例。
批處理(Batch Processing):
批處理是指將一批數(shù)據(jù)集合在一起,在一個固定的時間間隔內(nèi)對這批數(shù)據(jù)進行處理和分析。批處理通常適用于數(shù)據(jù)量較大、處理周期較長、要求高一致性的場景。
特點:
- 數(shù)據(jù)被集中處理,適合周期性分析和報告生成。
- 數(shù)據(jù)被切分成小塊,每個小塊在一個作業(yè)中被處理。
- 數(shù)據(jù)處理時間較長,不適合實時性要求高的場景。
應用場景示例:
- 離線數(shù)據(jù)分析:對歷史數(shù)據(jù)進行分析,從中發(fā)現(xiàn)趨勢、模式和規(guī)律,用于業(yè)務決策。例如,銷售數(shù)據(jù)分析、用戶行為分析。
- 批量推薦系統(tǒng):基于用戶歷史行為數(shù)據(jù),定期生成推薦結(jié)果。例如,電影推薦、商品推薦。
- 數(shù)據(jù)清洗和預處理:對大規(guī)模數(shù)據(jù)進行清洗、過濾和預處理,提高數(shù)據(jù)質(zhì)量和可用性。例如,清理無效數(shù)據(jù)、填充缺失值。
- 大規(guī)模ETL(Extract, Transform, Load):將數(shù)據(jù)從源系統(tǒng)中抽取出來,經(jīng)過轉(zhuǎn)換和加工后加載到目標系統(tǒng)。例如,數(shù)據(jù)倉庫的構(gòu)建。
流處理(Stream Processing):
流處理是指在數(shù)據(jù)生成的時候立即進行處理,實現(xiàn)數(shù)據(jù)的實時處理和分析。流處理通常適用于數(shù)據(jù)實時性要求高、需要快速響應的場景。
特點:
- 數(shù)據(jù)是實時流動的,需要快速處理和響應。
- 數(shù)據(jù)是持續(xù)不斷地到達,需要實時計算和分析。
- 可能會遇到延遲和數(shù)據(jù)亂序等問題。
應用場景示例:
- 實時監(jiān)控和告警:對實時數(shù)據(jù)進行監(jiān)控和分析,及時發(fā)現(xiàn)異常并觸發(fā)告警。例如,網(wǎng)絡流量監(jiān)控、系統(tǒng)性能監(jiān)控。
- 實時數(shù)據(jù)分析:對流式數(shù)據(jù)進行實時分析,從中提取有價值的信息。例如,實時點擊流分析、實時市場行情分析。
- 實時推薦系統(tǒng):基于用戶實時行為數(shù)據(jù),實時生成推薦結(jié)果。例如,新聞推薦、廣告推薦。
- 實時數(shù)據(jù)倉庫:構(gòu)建實時數(shù)據(jù)倉庫,將實時數(shù)據(jù)集成、加工和分析。例如,實時銷售數(shù)據(jù)分析、實時用戶行為分析。
總之,批處理和流處理分別適用于不同類型的數(shù)據(jù)處理需求,根據(jù)業(yè)務需求和實時性要求選擇合適的處理模式。
開源大數(shù)據(jù)技術(shù)
當談論大數(shù)據(jù)處理時,Hadoop、YARN、Spark和Flink都是重要的技術(shù)。它們都屬于大數(shù)據(jù)領(lǐng)域的分布式計算框架,但在功能和使用方式上有所不同。
Hadoop:
Hadoop是一個開源的分布式存儲和計算框架,最初由Apache開發(fā),用于處理大規(guī)模數(shù)據(jù)集。Hadoop的核心組件包括:
-
Hadoop Distributed File System(HDFS):HDFS是一種分布式文件系統(tǒng),用于存儲大規(guī)模數(shù)據(jù)。它將數(shù)據(jù)分成多個塊,并將這些塊分散存儲在集群中的不同節(jié)點上。HDFS支持高可靠性、冗余存儲和數(shù)據(jù)復制。
-
MapReduce:MapReduce是Hadoop的計算模型,用于處理分布式數(shù)據(jù)。它將計算任務分成Map和Reduce兩個階段,分布在集群中的節(jié)點上并行執(zhí)行。Map階段負責數(shù)據(jù)的拆分和處理,Reduce階段負責數(shù)據(jù)的匯總和計算。
YARN(Yet Another Resource Negotiator):
YARN是Hadoop的資源管理器,它負責集群資源的管理和分配。YARN將集群資源劃分為容器(Containers),并分配給不同的應用程序。這種資源的隔離和管理允許多個應用程序同時在同一個Hadoop集群上運行,從而提高了資源利用率和集群的多租戶能力。
Spark:
Apache Spark是一個通用的分布式計算引擎,旨在提供高性能、易用性和多功能性。與傳統(tǒng)的Hadoop MapReduce相比,Spark具有更快的執(zhí)行速度,因為它將數(shù)據(jù)加載到內(nèi)存中并進行內(nèi)存計算。Spark支持多種計算模式,包括批處理、交互式查詢、流處理和機器學習。
Spark的主要特點和組件包括:
-
RDD(Resilient Distributed Dataset):RDD是Spark的核心數(shù)據(jù)抽象,表示分布式的數(shù)據(jù)集。RDD支持并行操作和容錯性,可以在計算過程中重新計算丟失的分區(qū)。
-
Spark SQL:Spark SQL是用于處理結(jié)構(gòu)化數(shù)據(jù)的組件,支持SQL查詢和操作。它能夠?qū)DD和傳統(tǒng)的數(shù)據(jù)源(如Hive)無縫集成。
-
Spark Streaming:Spark Streaming是用于處理實時流數(shù)據(jù)的模塊,支持微批處理模式。它能夠?qū)崟r數(shù)據(jù)流分割成小批次并進行處理。
-
MLlib:MLlib是Spark的機器學習庫,提供了常見的機器學習算法和工具,用于訓練和評估模型。
-
GraphX:GraphX是Spark的圖計算庫,用于處理圖數(shù)據(jù)和圖算法。
Flink:
Apache Flink是一個流式處理引擎和分布式批處理框架,具有低延遲、高吞吐量和容錯性。Flink支持流批一體化,能夠?qū)崿F(xiàn)實時流處理和批處理作業(yè)的無縫切換。它的核心特點包括:
- DataStream API:Flink的DataStream API用于處理實時流數(shù)據(jù),支持事件時間處理、窗口操作和狀態(tài)管理。它能夠處理高吞吐量的實時數(shù)據(jù)流。
- DataSet API:Flink的DataSet API用于批處理作業(yè),類似于Hadoop的MapReduce。它支持豐富的操作符和優(yōu)化技術(shù)。
- Stateful Stream Processing:Flink支持有狀態(tài)的流式處理,可以在處理過程中保存和管理狀態(tài)。這對于實現(xiàn)復雜的數(shù)據(jù)處理邏輯很有用。
- Event Time Processing:Flink支持事件時間處理,能夠處理亂序事件并準確計算窗口操作的結(jié)果。
- Table API和SQL:Flink提供了Table API和SQL查詢,使開發(fā)人員可以使用類似SQL的語法來查詢和分析數(shù)據(jù)。
- 可以連接大數(shù)據(jù)生態(tài)圈各類組件,包括Kafka、Elasticsearch、JDBC、HDFS和Amazon S3
- 可以運行在Kubernetes、YARN、Mesos和獨立(Standalone)集群上。
Flink在流處理上的幾個主要優(yōu)勢如下:
-
真正的流計算引擎:Flink具有更好的streaming計算模型,可以進行非常高效的狀態(tài)運算和窗口操作。Spark Streaming仍然是微批處理引擎。
-
更低延遲:Flink可以實現(xiàn)毫秒級的低延遲處理,而Spark Streaming延遲較高。
-
更好的容錯機制:Flink支持更細粒度的狀態(tài)管理和檢查點機制,可以實現(xiàn)精確一次的狀態(tài)一致性語義。Spark較難做到確保exactly once。
-
支持有限數(shù)據(jù)流和無限數(shù)據(jù)流:Flink可處理有開始和結(jié)束的有限數(shù)據(jù)流,也能處理無限不斷增長的數(shù)據(jù)流。Spark Streaming更適合有限數(shù)據(jù)集。
-
更易統(tǒng)一批處理和流處理:Flink提供了DataStream和DataSet API,可以輕松統(tǒng)一批處理和流處理。Spark需要聯(lián)合Spark SQL使用。
-
更優(yōu)秀的內(nèi)存管理:Flink具有自己的內(nèi)存管理,可以根據(jù)不同查詢優(yōu)化內(nèi)存使用。Spark依賴Hadoop YARN進行資源調(diào)度。
-
更高性能:在部分場景下,Flink擁有比Spark Streaming更高的吞吐和低的延遲。
總體來說,Flink作為新一代流處理引擎,在延遲、容錯、易用性方面優(yōu)于Spark Streaming。但Spark生態(tài)更加完善,也在努力減小與Flink的差距。需要根據(jù)具體場景選擇最優(yōu)的框架。
總的來說,Flink在流處理領(lǐng)域的優(yōu)勢主要體現(xiàn)在事件時間處理、低延遲、精確一次語義和狀態(tài)管理等方面。這些特性使得Flink在處理實時流數(shù)據(jù)時能夠更好地滿足復雜的業(yè)務需求,特別是對于需要高準確性和可靠性的應用場景。
2. Flink 部署
Apache Flink在1.7版本中進行了重大的架構(gòu)重構(gòu),引入了Master-Worker架構(gòu),這使得Flink能夠更好地適應不同的集群基礎(chǔ)設(shè)施,包括Standalone、Hadoop YARN和Kubernetes等。下面會詳細介紹一下Flink 1.7版本引入的Master-Worker架構(gòu)以及其在不同集群基礎(chǔ)設(shè)施中的適應性。
Master-Worker架構(gòu):
Flink 1.7版本中引入的Master-Worker架構(gòu)是為了解決之前版本中存在的一些問題,如資源管理、高可用性等。在這個架構(gòu)中,Flink將任務管理和資源管理分離,引入了JobManager和ResourceManager兩個主要角色。
-
JobManager:負責接受和調(diào)度任務,維護任務的狀態(tài)和元數(shù)據(jù)信息,還負責處理容錯機制。JobManager分為兩種:JobManager(高可用模式)和StandaloneJobManager(非高可用模式)。
-
ResourceManager:負責管理集群中的資源,包括分配任務的資源、維護資源池等。
這種架構(gòu)的優(yōu)勢在于解耦任務的管理和資源的管理,使得Flink能夠更好地適應不同的集群環(huán)境和基礎(chǔ)設(shè)施。
兼容性:
Flink的Master-Worker架構(gòu)設(shè)計使其能夠兼容幾乎所有主流信息系統(tǒng)的基礎(chǔ)設(shè)施,包括:
-
Standalone集群:在Standalone模式下,Flink的JobManager和ResourceManager都運行在同一個進程中,適用于簡單的開發(fā)和測試場景。
-
Hadoop YARN集群:Flink可以部署在現(xiàn)有的Hadoop YARN集群上,通過ResourceManager與YARN ResourceManager進行交互,實現(xiàn)資源管理。
-
Kubernetes集群:Flink還支持在Kubernetes集群中部署,通過Kubernetes提供的資源管理能力來管理任務和資源。
這種兼容性使得Flink可以靈活地在不同的集群環(huán)境中運行,滿足不同場景下的需求。
總之,Flink在1.7版本中引入的Master-Worker架構(gòu)使其在資源管理、高可用性等方面有了更好的表現(xiàn),同時也使得Flink能夠更好地適應各種不同的集群基礎(chǔ)設(shè)施,包括Standalone、Hadoop YARN和Kubernetes等。這為Flink的部署和使用帶來了更多的靈活性和選擇性。
Standalone集群是Apache Flink中一種簡單的部署模式,適用于開發(fā)、測試和小規(guī)模應用場景。下面我將詳細介紹Standalone集群的特點以及部署方式。
Standalone集群的特點:
-
簡單部署:Standalone集群是Flink的最簡單部署模式之一,不需要依賴其他集群管理工具,可以在單個機器上部署。
-
資源共享:Standalone集群中的JobManager和TaskManager共享同一份資源,例如內(nèi)存和CPU。這使得資源管理相對簡單,但也可能在資源競爭時影響任務的性能。
-
適用于開發(fā)和測試:Standalone集群適用于開發(fā)和測試階段,可以在本地機器上模擬Flink集群環(huán)境,方便開發(fā)人員進行調(diào)試和測試。
-
不支持高可用性:Standalone集群默認情況下不支持高可用性,即不具備故障恢復和任務遷移的能力。如果需要高可用性,可以通過運行多個JobManager實例來實現(xiàn)。
Standalone集群的部署方式:
-
安裝Flink:首先,需要下載并安裝Flink??梢詮墓俜骄W(wǎng)站下載預編譯的二進制文件,解壓到指定目錄。也可以從以下網(wǎng)站下載:
apache-flink安裝包下載_開源鏡像站-阿里云 (aliyun.com)
-
配置Flink:進入Flink的安裝目錄,修改
conf/flink-conf.yaml
配置文件。主要配置項包括jobmanager.rpc.address
和taskmanager.numberOfTaskSlots
等。 -
啟動JobManager:打開終端,進入Flink安裝目錄,執(zhí)行以下命令啟動JobManager:
./bin/start-cluster.sh
-
啟動TaskManager:打開終端,進入Flink安裝目錄,執(zhí)行以下命令啟動TaskManager:
./bin/taskmanager.sh start
-
提交作業(yè):使用Flink客戶端工具提交作業(yè)??梢允褂靡韵旅钐峤籎AR文件中的作業(yè):
./bin/flink run -c your.main.Class ./path/to/your.jar
-
停止集群:可以使用以下命令停止整個Standalone集群:
./bin/stop-cluster.sh
總之,Standalone集群是一個簡單且易于部署的Flink集群模式,適用于開發(fā)、測試和小規(guī)模應用場景。然而,由于其資源共享和不支持高可用性的特點,不適合部署在生產(chǎn)環(huán)境中。
下面提供利用Docker部署flink standalone簡單集群。
Docker部署flink簡單集群
Flink程序可以作為集群內(nèi)的分布式系統(tǒng)運行,也可以以獨立模式或在YARN、Mesos、基于Docker的環(huán)境和其他資源管理框架下進行部署。
1、在服務器創(chuàng)建flink目錄
mkdir flink
目錄的結(jié)構(gòu)如下:
2、docker-compose.yml腳本創(chuàng)建
docker 容器的編排文件,具體如下
3、啟動flink
(1)后臺運行
一般推薦生產(chǎn)環(huán)境下使用該選項。
docker-compose up -d
(2)前臺運行
控制臺將會同時打印所有容器的輸出信息,可以很方便進行調(diào)試。
docker-compose up
4、瀏覽器上查看頁面dashboard
訪問web界面
http://cdh1:8081/
3. Flink快速應用
? 通過一個單詞統(tǒng)計的案例,快速上手應用Flink,進行流處理(Streaming)和批處理(Batch)
實操1: 單詞統(tǒng)計案例(批數(shù)據(jù))
1.1 需求
統(tǒng)計一個文件中各個單詞出現(xiàn)的次數(shù),把統(tǒng)計結(jié)果輸出到文件
步驟:
1、讀取數(shù)據(jù)源
2、處理數(shù)據(jù)源
a、將讀到的數(shù)據(jù)源文件中的每一行根據(jù)空格切分
b、將切分好的每個單詞拼接1
c、根據(jù)單詞聚合(將相同的單詞放在一起)
d、累加相同的單詞(單詞后面的1進行累加)
3、保存處理結(jié)果
1.2 代碼實現(xiàn)
- 引入依賴
<!--flink核心包-->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.7.2</version>
</dependency>
<!--flink流處理包-->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.7.2</version><scope>provided</scope>
</dependency>
- Java程序
package com.crazymaker.bigdata.wordcount.batch;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;/*** 1、讀取數(shù)據(jù)源* 2、處理數(shù)據(jù)源* a、將讀到的數(shù)據(jù)源文件中的每一行根據(jù)空格切分* b、將切分好的每個單詞拼接1* c、根據(jù)單詞聚合(將相同的單詞放在一起)* d、累加相同的單詞(單詞后面的1進行累加)* 3、保存處理結(jié)果*/
public class WordCountJavaBatch {public static void main(String[] args) throws Exception {String inputPath="D:\\data\\input\\hello.txt";String outputPath="D:\\data\\output\\hello.txt";//獲取flink的運行環(huán)境ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();DataSet<String> text = executionEnvironment.readTextFile(inputPath);FlatMapOperator<String, Tuple2<String, Integer>> wordOndOnes = text.flatMap(new SplitClz());//0代表第1個元素UnsortedGrouping<Tuple2<String, Integer>> groupedWordAndOne = wordOndOnes.groupBy(0);//1代表第1個元素AggregateOperator<Tuple2<String, Integer>> out = groupedWordAndOne.sum(1);out.writeAsCsv(outputPath, "\n", " ").setParallelism(1);//設(shè)置并行度executionEnvironment.execute();//人為調(diào)用執(zhí)行方法}static class SplitClz implements FlatMapFunction<String,Tuple2<String,Integer>>{public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {String[] s1 = s.split(" ");for (String word:s1) {collector.collect(new Tuple2<String,Integer>(word,1));//發(fā)送到下游}}}
}
源文件的內(nèi)容
統(tǒng)計的結(jié)果
實操2: 單詞統(tǒng)計案例(流數(shù)據(jù))
nc
netcat:
flink開發(fā)時候,經(jīng)常用socket作為source;使用linux/mac環(huán)境開發(fā),可以在終端中開啟 nc -l 9000(開啟netcat程序,作為服務端,發(fā)送數(shù)據(jù));
nc是netcat的縮寫,有著網(wǎng)絡界的瑞士軍刀美譽。因為它短小精悍、功能實用,被設(shè)計為一個簡單、可靠的網(wǎng)絡工具。
nc作用
- 數(shù)據(jù)傳輸
- 文件傳輸
- 機器之間網(wǎng)絡測速
2.1 需求
Socket模擬實時發(fā)送單詞,
使用Flink實時接收數(shù)據(jù),對指定時間窗口內(nèi)(如5s)的數(shù)據(jù)進行聚合統(tǒng)計,每隔1s匯總計算一次,并且把時間窗口內(nèi)計算結(jié)果打印出來。
2.2 代碼實現(xiàn)
package com.crazymaker.bigdata.wordcount.stream;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;/*** Socket模擬實時發(fā)送單詞,使用Flink實時接收數(shù)據(jù)*/
public class WordCountStream {public static void main(String[] args) throws Exception {// 監(jiān)聽的ip和端口號,以main參數(shù)形式傳入,約定第一個參數(shù)為ip,第二個參數(shù)為端口
// String ip = args[0];String ip = "127.0.0.1";
// int port = Integer.parseInt(args[1]);int port = 9000;// 獲取Flink流執(zhí)行環(huán)境StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();// 獲取socket輸入數(shù)據(jù)DataStreamSource<String> textStream = streamExecutionEnvironment.socketTextStream(ip, port, "\n");SingleOutputStreamOperator<Tuple2<String, Long>> tuple2SingleOutputStreamOperator = textStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {public void flatMap(String s, Collector<Tuple2<String, Long>> collector) throws Exception {String[] splits = s.split("\\s");for (String word : splits) {collector.collect(Tuple2.of(word, 1l));}}});SingleOutputStreamOperator<Tuple2<String, Long>> word = tuple2SingleOutputStreamOperator.keyBy(0).sum(1);// 打印數(shù)據(jù)word.print();// 觸發(fā)任務執(zhí)行streamExecutionEnvironment.execute("wordcount stream process");}
}
Flink程序開發(fā)的流程總結(jié)
Flink程序開發(fā)的流程總結(jié)如下:
1)獲得一個執(zhí)行環(huán)境
2)加載/創(chuàng)建初始化數(shù)據(jù)
3)指定數(shù)據(jù)操作的算子
4)指定結(jié)果數(shù)據(jù)存放位置
5)調(diào)用execute()觸發(fā)執(zhí)行程序
注意:Flink程序是延遲計算的,只有最后調(diào)用execute()方法的時候才會真正觸發(fā)執(zhí)行程序
4. Flink分布式架構(gòu)與核心組件
Flink作業(yè)提交過程
standalone模式下的作業(yè)提交過程如下:
在一個作業(yè)提交前,Master和TaskManager等進程需要先被啟動。
我們可以在Flink主目錄中執(zhí)行腳本來啟動這些進程:
bin/start-cluster.sh。
Master和TaskManager被啟動后,TaskManager需要將自己注冊給Master中的ResourceManager。
這個初始化和資源注冊過程發(fā)生在單個作業(yè)提交前,我們稱之為第0步。
接下來,我們將逐步解析Flink作業(yè)的提交過程,具體步驟如下:
① 用戶編寫應用程序代碼,并使用Flink客戶端(Client)提交該作業(yè)。通常,這些程序會使用Java或Scala語言編寫,并調(diào)用Flink API 構(gòu)建出邏輯視圖。這些代碼以及相關(guān)配置文件被編譯并打包,然后被提交至Master節(jié)點的Dispatcher,形成一個應用作業(yè)(Application)。
② Dispatcher接收到提交的作業(yè)后,會啟動一個JobManager,該JobManager負責協(xié)調(diào)這個作業(yè)的各項任務。
③ JobManager向ResourceManager申請所需的作業(yè)資源,這些資源可能包括CPU、內(nèi)存等。
④ 由于在前面的步驟中,TaskManager已經(jīng)向ResourceManager注冊了可供使用的資源,這時處于空閑狀態(tài)的TaskManager將被分配給JobManager。
⑤ JobManager將用戶作業(yè)中的邏輯視圖轉(zhuǎn)化為物理執(zhí)行圖,如圖3-3所示,該圖顯示了作業(yè)被并行化后的執(zhí)行過程。JobManager將計算任務分配并部署到多個TaskManager上。此時,一個Flink作業(yè)正式開始執(zhí)行。
在計算任務執(zhí)行過程中,TaskManager可能會與其他TaskManager交換數(shù)據(jù),使用特定的數(shù)據(jù)交換策略。同時,TaskManager還會將任務的狀態(tài)信息傳遞給JobManager,這些狀態(tài)信息包括任務的啟動、執(zhí)行和終止狀態(tài),以及快照的元數(shù)據(jù)等。
Flink核心組件
在這個作業(yè)提交流程的基礎(chǔ)上,我們可以更詳細地介紹涉及的各個組件的功能和角色:
-
Client(客戶端):用戶通常使用Flink提供的客戶端工具(如位于Flink主目錄下的bin目錄中的命令行工具)來提交作業(yè)??蛻舳藭τ脩籼峤坏腇link作業(yè)進行預處理,并將作業(yè)提交到Flink集群中。在提交作業(yè)時,客戶端需要配置一些必要的參數(shù),例如使用Standalone集群還是YARN集群等。整個作業(yè)會被打包成一個JAR文件,DataStream API會被轉(zhuǎn)換成一個JobGraph,該圖類似于邏輯視圖(如圖3-2所示)。
-
Dispatcher(調(diào)度器):Dispatcher可以接收多個作業(yè),每次接收作業(yè)時,會為該作業(yè)分配一個JobManager。Dispatcher通過提供表述性狀態(tài)轉(zhuǎn)移(REST)式的接口,使用超文本傳輸協(xié)議(HTTP)來對外提供服務。
-
JobManager(作業(yè)管理器):JobManager是單個Flink作業(yè)的協(xié)調(diào)者。每個作業(yè)都有一個對應的JobManager負責管理。JobManager將客戶端提交的JobGraph轉(zhuǎn)化為ExecutionGraph,該圖類似于并行物理執(zhí)行圖(如圖3-3所示)。JobManager會向ResourceManager申請所需的資源。一旦獲取足夠的資源,JobManager會將ExecutionGraph及其計算任務分發(fā)到多個TaskManager上。此外,JobManager還管理多個TaskManager,包括收集作業(yè)狀態(tài)信息、生成檢查點、必要時進行故障恢復等。
-
ResourceManager(資源管理器):Flink可以在Standalone、YARN、Kubernetes等環(huán)境中部署,而不同環(huán)境對計算資源的管理模式有所不同。為了解決資源分配問題,Flink引入了ResourceManager模塊。在Flink中,計算資源的基本單位是TaskManager上的任務槽位(Slot)。ResourceManager的主要職責是從資源提供方(如YARN)獲取計算資源。當JobManager需要計算資源時,ResourceManager會將空閑的Slot分配給JobManager。在計算任務結(jié)束后,ResourceManager會回收這些空閑Slot。
-
TaskManager(任務管理器):TaskManager是實際執(zhí)行計算任務的節(jié)點。一般來說,一個Flink作業(yè)會分布在多個TaskManager上執(zhí)行,每個TaskManager提供一定數(shù)量的Slot。當一個TaskManager啟動后,相關(guān)的Slot信息會被注冊到ResourceManager中。當Flink作業(yè)提交后,ResourceManager會將空閑的Slot分配給JobManager。一旦JobManager獲取了空閑Slot,它會將具體的計算任務部署到這些Slot上,并在這些Slot上執(zhí)行。在執(zhí)行過程中,TaskManager可能需要與其他TaskManager進行數(shù)據(jù)交換,因此需要進行必要的數(shù)據(jù)通信。總之,TaskManager負責具體計算任務的執(zhí)行,它會在啟動時將Slot資源向ResourceManager注冊。
Flink組件棧
-
部署層:
- Local模式:Flink支持本地模式,包括單節(jié)點(SingleNode)和單虛擬機(SingleJVM)模式。在SingleNode模式中,JobManager和TaskManager運行在同一個節(jié)點上;在SingleJVM模式中,所有角色都在同一個JVM中運行。
- Cluster模式:Flink可以部署在Standalone、YARN、Mesos和Kubernetes集群上。Standalone集群需要配置JobManager和TaskManager的節(jié)點,然后通過Flink提供的腳本啟動。YARN、Mesos和Kubernetes集群提供了更強大的資源管理和集群擴展能力。
- Cloud模式:Flink還可以部署在各大云平臺上,如AWS、谷歌云和阿里云,使用戶能夠在云環(huán)境中靈活地部署和運行作業(yè)。
-
運行時層:
- 運行時層是Flink的核心組件,支持分布式執(zhí)行和處理。該層負責將用戶提交的作業(yè)轉(zhuǎn)化為任務,并分發(fā)到相應的JobManager和TaskManager上執(zhí)行。運行時層還涵蓋了檢查點和故障恢復機制,確保作業(yè)的容錯性和穩(wěn)定性。
-
API層:
- Flink的API層提供了DataStream API和DataSet API,分別用于流式處理和批處理。這兩個API允許開發(fā)者使用各種操作符和轉(zhuǎn)換來處理數(shù)據(jù),包括轉(zhuǎn)換、連接、聚合、窗口等計算任務。
-
上層工具:
- 在API層之上,Flink提供了一些工具來擴展其功能:
- 復雜事件處理(CEP):面向流處理的庫,用于檢測和處理復雜的事件模式。
- 圖計算庫(Gelly):面向批處理的圖計算庫,用于執(zhí)行圖算法。
- Table API和SQL:針對SQL用戶和關(guān)系型數(shù)據(jù)處理場景的接口,允許使用SQL語法和表操作處理流和批數(shù)據(jù)。
- PyFlink:針對Python用戶的接口,使其能夠使用Flink進行數(shù)據(jù)處理,目前主要基于Table API。
- 在API層之上,Flink提供了一些工具來擴展其功能:
綜上所述,Flink在不同層次上提供了豐富的組件和工具,支持流式處理和批處理,以及與不同環(huán)境(本地、集群、云)的無縫集成,使開發(fā)者能夠靈活地構(gòu)建和部署大規(guī)模數(shù)據(jù)處理應用程序。
作業(yè)執(zhí)行階段
在Apache Flink中,數(shù)據(jù)流作業(yè)的執(zhí)行過程可以劃分為多個階段,從邏輯視圖到物理執(zhí)行圖的轉(zhuǎn)換。這個過程包括從StreamGraph到JobGraph,再到ExecutionGraph,最終映射到實際的物理執(zhí)行圖。下面詳細說明這個過程:
-
StreamGraph(邏輯視圖):StreamGraph是用戶編寫的流處理應用程序的邏輯表示。它包含了數(shù)據(jù)流的轉(zhuǎn)換操作、算子之間的關(guān)系、事件時間處理策略、容錯配置等。StreamGraph是用戶定義的數(shù)據(jù)流拓撲,是一種高級抽象,用戶可以通過DataStream API構(gòu)建StreamGraph。
-
JobGraph(作業(yè)圖):JobGraph是從StreamGraph派生而來的,表示一個具體的作業(yè)執(zhí)行計劃。在JobGraph中,StreamGraph中的邏輯算子被映射為具體的物理算子,且有明確的執(zhí)行順序和任務間的依賴關(guān)系。JobGraph還包含了資源配置、任務并行度、優(yōu)化選項等信息。JobGraph是從邏輯視圖轉(zhuǎn)向物理執(zhí)行的關(guān)鍵步驟。
-
ExecutionGraph(執(zhí)行圖):ExecutionGraph是JobGraph的執(zhí)行時表示,它是實際執(zhí)行計劃的核心。在ExecutionGraph中,JobGraph中的每個任務都會被映射到一個具體的執(zhí)行任務,每個任務可以包含一個或多個子任務,這些子任務被映射到不同的TaskManager上。ExecutionGraph還負責維護作業(yè)的執(zhí)行狀態(tài),以及任務之間的調(diào)度和通信。
-
物理執(zhí)行圖:ExecutionGraph被映射到實際的物理執(zhí)行圖,即在TaskManager集群上真正執(zhí)行的任務拓撲。物理執(zhí)行圖包括了任務的并行執(zhí)行、數(shù)據(jù)交換、任務狀態(tài)管理等細節(jié),它是作業(yè)在分布式環(huán)境中實際運行的體現(xiàn)。
總結(jié)起來,StreamGraph到JobGraph到ExecutionGraph的轉(zhuǎn)換是Flink作業(yè)執(zhí)行計劃的關(guān)鍵步驟。從邏輯視圖到物理執(zhí)行圖的轉(zhuǎn)換過程考慮了作業(yè)的拓撲結(jié)構(gòu)、資源分配、任務調(diào)度等方面的問題,確保了作業(yè)可以在分布式環(huán)境中高效執(zhí)行。這一系列轉(zhuǎn)換過程使得用戶可以通過高層次的抽象來描述作業(yè)邏輯,而Flink框架會負責將其轉(zhuǎn)化為可執(zhí)行的任務圖,實現(xiàn)數(shù)據(jù)流的處理和計算。
5. Flink 開發(fā)
Flink 應用程序結(jié)構(gòu)主要包含三部分,Source/Transformation/Sink,如下圖所示:
Source: 數(shù)據(jù)源,Flink 在流處理和批處理上的 source 大概有 4 類:
- 基于本地集合的 source
- 基于文件的 source
- 基于網(wǎng)絡套接字的 source
- 自定義的 source。自定義的 source 常見的有 Apache kafka、Amazon Kinesis Streams、RabbitMQ、Twitter Streaming API、Apache NiFi 等,當然你也可以定義自己的 source。
Transformation:數(shù)據(jù)轉(zhuǎn)換的各種操作,有 Map / FlatMap / Filter / KeyBy / Reduce / Fold / Aggregations / Window / WindowAll / Union / Window join / Split / Select / Project 等,操作很多,可以將數(shù)據(jù)轉(zhuǎn)換計算成你想要的數(shù)據(jù)。
Sink:接收器,Flink 將轉(zhuǎn)換計算后的數(shù)據(jù)發(fā)送的地點 ,你可能需要存儲下來,Flink 常見的 Sink 大概有如下幾類:
- 寫入文件
- 打印輸出
- 寫入 socket
- 自定義的 sink 。自定義的 sink 常見的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等,同理你也可以定義自己的 Sink。
開發(fā)環(huán)境搭建
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.lagou</groupId><artifactId>flinkdemo</artifactId><version>1.0-SNAPSHOT</version><dependencies><!--flink核心包--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.7.2</version></dependency><!--flink流處理包--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.7.2</version><!--<scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.1.5</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-hadoop-compatibility_2.12</artifactId><version>1.7.2</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>2.7.2</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>2.7.2</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>2.7.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.12</artifactId><version>1.7.2</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.73</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_2.12</artifactId><version>1.7.2</version></dependency></dependencies><build><plugins><!-- 打jar插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.4.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters></configuration></execution></executions></plugin></plugins></build></project>
Flink連接器
在實際生產(chǎn)環(huán)境中,數(shù)據(jù)通常分布在各種不同的系統(tǒng)中,包括文件系統(tǒng)、數(shù)據(jù)庫、消息隊列等。Flink作為一個大數(shù)據(jù)處理框架,需要與這些外部系統(tǒng)進行數(shù)據(jù)交互,以實現(xiàn)數(shù)據(jù)的輸入、處理和輸出。在Flink中,Source和Sink是兩個關(guān)鍵模塊,它們扮演著與外部系統(tǒng)進行數(shù)據(jù)連接和交互的重要角色,被統(tǒng)稱為外部連接器(Connector)。
-
Source(數(shù)據(jù)源):Source是Flink作業(yè)的輸入模塊,用于從外部系統(tǒng)中讀取數(shù)據(jù)并將其轉(zhuǎn)化為Flink的數(shù)據(jù)流。Source負責實現(xiàn)與不同數(shù)據(jù)源的交互邏輯,將外部數(shù)據(jù)源的數(shù)據(jù)逐條或批量讀取到Flink的數(shù)據(jù)流中,以便后續(xù)的數(shù)據(jù)處理。常見的Source包括從文件中讀取數(shù)據(jù)、從消息隊列(如Kafka、RabbitMQ)中消費數(shù)據(jù)、從數(shù)據(jù)庫中讀取數(shù)據(jù)等。
-
Sink(數(shù)據(jù)接收器):Sink是Flink作業(yè)的輸出模塊,用于將Flink計算的結(jié)果輸出到外部系統(tǒng)中。Sink負責實現(xiàn)將Flink數(shù)據(jù)流中的數(shù)據(jù)寫入到外部數(shù)據(jù)源,以便后續(xù)的持久化存儲、展示或其他處理。Sink的實現(xiàn)需要考慮數(shù)據(jù)的可靠性、一致性以及可能的事務性要求。常見的Sink包括將數(shù)據(jù)寫入文件、將數(shù)據(jù)寫入數(shù)據(jù)庫、將數(shù)據(jù)寫入消息隊列等。
外部連接器在Flink中的作用非常關(guān)鍵,它們使得Flink作業(yè)可以與各種不同類型的數(shù)據(jù)源和數(shù)據(jù)目的地進行交互,實現(xiàn)了數(shù)據(jù)的流入和流出。這種靈活的連接機制使得Flink在處理大數(shù)據(jù)時能夠更好地集成已有的系統(tǒng)和數(shù)據(jù),實現(xiàn)復雜的數(shù)據(jù)流處理和分析任務。
Source
Flink在批處理中常見的source主要有兩大類。
- 基于本地集合的source(Collection-based-source)
- 基于文件的source(File-based-source)
基于本地集合的Source
在Flink中最常見的創(chuàng)建本地集合的DataSet方式有三種。
- 使用env.fromElements(),這種方式也支持Tuple,自定義對象等復合形式。
- 使用env.fromCollection(),這種方式支持多種Collection的具體類型。
- 使用env.generateSequence(),這種方法創(chuàng)建基于Sequence的DataSet。
使用方式如下:
package com.demo.broad;import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.DataSet;import java.util.ArrayList;
import java.util.List;
import java.util.ArrayDeque;
import java.util.Stack;
import java.util.stream.Stream;public class BatchFromCollection {public static void main(String[] args) throws Exception {// 獲取flink執(zhí)行環(huán)境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 0.用element創(chuàng)建DataSet(fromElements)DataSet<String> ds0 = env.fromElements("spark", "flink");ds0.print();// 1.用Tuple創(chuàng)建DataSet(fromElements)DataSet<Tuple2<Integer, String>> ds1 = env.fromElements(new Tuple2<>(1, "spark"),new Tuple2<>(2, "flink"));ds1.print();// 2.用Array創(chuàng)建DataSetDataSet<String> ds2 = env.fromCollection(new ArrayList<String>() {{add("spark");add("flink");}});ds2.print();// 3.用ArrayDeque創(chuàng)建DataSetDataSet<String> ds3 = env.fromCollection(new ArrayDeque<String>() {{add("spark");add("flink");}});ds3.print();// 4.用List創(chuàng)建DataSetDataSet<String> ds4 = env.fromCollection(new ArrayList<String>() {{add("spark");add("flink");}});ds4.print();// 5.用ArrayList創(chuàng)建DataSetDataSet<String> ds5 = env.fromCollection(new ArrayList<String>() {{add("spark");add("flink");}});ds5.print();// 6.用List創(chuàng)建DataSetDataSet<String> ds6 = env.fromCollection(new ArrayList<String>() {{add("spark");add("flink");}});ds6.print();// 7.用List創(chuàng)建DataSetDataSet<String> ds7 = env.fromCollection(new ArrayList<String>() {{add("spark");add("flink");}});ds7.print();// 8.用Stack創(chuàng)建DataSetDataSet<String> ds8 = env.fromCollection(new Stack<String>() {{add("spark");add("flink");}});ds8.print();// 9.用Stream創(chuàng)建DataSet(Stream相當于lazy List,避免在中間過程中生成不必要的集合)DataSet<String> ds9 = env.fromCollection(Stream.of("spark", "flink"));ds9.print();// 10.用List創(chuàng)建DataSetDataSet<String> ds10 = env.fromCollection(new ArrayList<String>() {{add("spark");add("flink");}});ds10.print();// 11.用HashSet創(chuàng)建DataSetDataSet<String> ds11 = env.fromCollection(new HashSet<String>() {{add("spark");add("flink");}});ds11.print();// 12.用Iterable創(chuàng)建DataSetDataSet<String> ds12 = env.fromCollection(new ArrayList<String>() {{add("spark");add("flink");}});ds12.print();// 13.用ArrayList創(chuàng)建DataSetDataSet<String> ds13 = env.fromCollection(new ArrayList<String>() {{add("spark");add("flink");}});ds13.print();// 14.用Stack創(chuàng)建DataSetDataSet<String> ds14 = env.fromCollection(new Stack<String>() {{add("spark");add("flink");}});ds14.print();// 15.用HashMap創(chuàng)建DataSetDataSet<Tuple2<Integer, String>> ds15 = env.fromCollection(new HashMap<Integer, String>() {{put(1, "spark");put(2, "flink");}}.entrySet());ds15.print();// 16.用Range創(chuàng)建DataSetDataSet<Integer> ds16 = env.fromCollection(IntStream.rangeClosed(1, 8).boxed().collect(Collectors.toList()));ds16.print();// 17.用generateSequence創(chuàng)建DataSetDataSet<Long> ds17 = env.generateSequence(1, 9);ds17.print();}
}
基于文件的Source
Flink支持直接從外部文件存儲系統(tǒng)中讀取文件的方式來創(chuàng)建Source數(shù)據(jù)源,Flink支持的方式有以下幾種:
- 讀取本地文件數(shù)據(jù)
- 讀取HDFS文件數(shù)據(jù)
- 讀取CSV文件數(shù)據(jù)
- 讀取壓縮文件
- 遍歷目錄
下面分別介紹每個數(shù)據(jù)源的加載方式:
讀取本地文件
package com.demo.batch;import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;public class BatchFromFile {public static void main(String[] args) throws Exception {// 使用readTextFile讀取本地文件// 初始化環(huán)境ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();// 加載數(shù)據(jù)DataSet<String> datas = environment.readTextFile("data.txt");// 觸發(fā)程序執(zhí)行datas.print();}
}
讀取HDFS文件數(shù)據(jù)
package com.demo.batch;import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;public class BatchFromFile {public static void main(String[] args) throws Exception {// 使用readTextFile讀取本地文件// 初始化環(huán)境ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();// 加載數(shù)據(jù)DataSet<String> datas = environment.readTextFile("hdfs://node01:8020/README.txt");// 觸發(fā)程序執(zhí)行datas.print();}
}
讀取CSV文件數(shù)據(jù)
package com.demo.batch;import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.common.functions.MapFunction;public class BatchFromCsvFile {public static void main(String[] args) throws Exception {// 初始化環(huán)境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 用于映射CSV文件的POJO classpublic static class Student {public int id;public String name;public Student() {}public Student(int id, String name) {this.id = id;this.name = name;}@Overridepublic String toString() {return "Student(" + id + ", " + name + ")";}}// 讀取CSV文件DataSet<Student> csvDataSet = env.readCsvFile("./data/input/student.csv").ignoreFirstLine().pojoType(Student.class, "id", "name");csvDataSet.print();}
}
讀取壓縮文件
對于以下壓縮類型,不需要指定任何額外的inputformat方法,flink可以自動識別并且解壓。但是,壓縮文件可能不會并行讀取,可能是順序讀取的,這樣可能會影響作業(yè)的可伸縮性。
壓縮格式 | 擴展名 | 并行化 |
---|---|---|
DEFLATE | .deflate | no |
GZIP | .gz .gzip | no |
Bzip2 | .bz2 | no |
XZ | .xz | no |
package com.demo.batch;import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;public class BatchFromCompressFile {public static void main(String[] args) throws Exception {// 初始化環(huán)境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 加載數(shù)據(jù)DataSet<String> result = env.readTextFile("D:\\BaiduNetdiskDownload\\hbase-1.3.1-bin.tar.gz");// 觸發(fā)程序執(zhí)行result.print();}
}
遍歷目錄
flink支持對一個文件目錄內(nèi)的所有文件,包括所有子目錄中的所有文件的遍歷訪問方式。
對于從文件中讀取數(shù)據(jù),當讀取數(shù)個文件夾的時候,嵌套的文件默認是不會被讀取的,只會讀取第一個文件,其他的都會被忽略。所以我們需要使用recursive.file.enumeration進行遞歸讀取
package com.demo.batch;import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;public class BatchFromCompressFile {public static void main(String[] args) throws Exception {// 初始化環(huán)境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 加載數(shù)據(jù)DataSet<String> result = env.readTextFile("D:\\BaiduNetdiskDownload\\hbase-1.3.1-bin.tar.gz");// 觸發(fā)程序執(zhí)行result.print();}
}
讀取kafka
public class StreamFromKafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers","teacher2:9092");FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>("mytopic2", new SimpleStringSchema(), properties);DataStreamSource<String> data = env.addSource(consumer);SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = data.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {for (String word : s.split(" ")) {collector.collect(Tuple2.of(word, 1));}}});SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne.keyBy(0).sum(1);result.print();env.execute();}
}
自定義source
private static class SimpleSource
implements SourceFunction<Tuple2<String, Integer>> { private int offset = 0; private boolean isRunning = true; @Override public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception { while (isRunning) { Thread.sleep(500); ctx.collect(new Tuple2<>("" + offset, offset)); offset++; if (offset == 1000) { isRunning = false; } } } @Override public void cancel() { isRunning = false; }
}
自定義Source,從0開始計數(shù),將數(shù)字發(fā)送到下游在主邏輯中調(diào)用這個Source。
DataStream<Tuple2<String, Integer>> countStream = env.addSource(new SimpleSource());
Sink
flink在批處理中常見的sink
- 基于本地集合的sink(Collection-based-sink)
- 基于文件的sink(File-based-sink)
基于本地集合的sink
目標:
基于下列數(shù)據(jù),分別進行打印輸出,error輸出,collect()
(19, "zhangsan", 178.8),
(17, "lisi", 168.8),
(18, "wangwu", 184.8),
(21, "zhaoliu", 164.8)
代碼:
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.List;public class BatchSinkCollection {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();List<Tuple3<Integer, String, Double>> stuData = new ArrayList<>();stuData.add(new Tuple3<>(19, "zhangsan", 178.8));stuData.add(new Tuple3<>(17, "lisi", 168.8));stuData.add(new Tuple3<>(18, "wangwu", 184.8));stuData.add(new Tuple3<>(21, "zhaoliu", 164.8));DataSet<Tuple3<Integer, String, Double>> stu = env.fromCollection(stuData);stu.print();stu.printToErr();stu.collect().forEach(System.out::println);env.execute();}
}
基于文件的sink
- flink支持多種存儲設(shè)備上的文件,包括本地文件,hdfs文件等。
- flink支持多種文件的存儲格式,包括text文件,CSV文件等。
- writeAsText():TextOuputFormat - 將元素作為字符串寫入行。字符串是通過調(diào)用每個元素的toString()方法獲得的。
將數(shù)據(jù)寫入本地文件
目標:
基于下列數(shù)據(jù),寫入到文件中
Map(1 -> "spark", 2 -> "flink")
代碼:
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;import java.util.HashMap;
import java.util.Map;public class BatchSinkFile {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();Map<Integer, String> data1 = new HashMap<>();data1.put(1, "spark");data1.put(2, "flink");DataSet<Map<Integer, String>> ds1 = env.fromElements(data1);ds1.setParallelism(1).writeAsText("test/data1/aa", FileSystem.WriteMode.OVERWRITE).setParallelism(1);env.execute();}
}
將數(shù)據(jù)寫入HDFS
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;import java.util.HashMap;
import java.util.Map;public class BatchSinkFile {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();Map<Integer, String> data1 = new HashMap<>();data1.put(1, "spark");data1.put(2, "flink");DataSet<Map<Integer, String>> ds1 = env.fromElements(data1);ds1.setParallelism(1).writeAsText("hdfs://bigdata1:9000/a", FileSystem.WriteMode.OVERWRITE).setParallelism(1);env.execute();}
}
Flink API
Flink的API層提供了DataStream API和DataSet API,分別用于流式處理和批處理。這兩個API允許開發(fā)者使用各種操作符和轉(zhuǎn)換來處理數(shù)據(jù),包括轉(zhuǎn)換、連接、聚合、窗口等計算任務。
在Flink中,根據(jù)不同的場景(流處理或批處理),需要設(shè)置不同的執(zhí)行環(huán)境。在批處理場景下,需要使用DataSet API,并設(shè)置批處理執(zhí)行環(huán)境。在流處理場景下,需要使用DataStream API,并設(shè)置流處理執(zhí)行環(huán)境。
以下是在不同場景下設(shè)置執(zhí)行環(huán)境的示例代碼,分別展示了批處理和流處理的情況,包括Scala和Java語言。
批處理場景 - 設(shè)置DataSet API的批處理執(zhí)行環(huán)境(Java):
import org.apache.flink.api.java.ExecutionEnvironment;public class BatchJobExample {public static void main(String[] args) throws Exception {// 創(chuàng)建批處理執(zhí)行環(huán)境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 在這里添加批處理作業(yè)的代碼邏輯// ...// 執(zhí)行作業(yè)env.execute("Batch Job Example");}
}
流處理場景 - 設(shè)置DataStream API的流處理執(zhí)行環(huán)境(Java):
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class StreamJobExample {public static void main(String[] args) throws Exception {// 創(chuàng)建流處理執(zhí)行環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 在這里添加流處理作業(yè)的代碼邏輯// ...// 執(zhí)行作業(yè)env.execute("Stream Job Example");}
}
批處理場景 - 設(shè)置DataSet API的批處理執(zhí)行環(huán)境(Scala):
import org.apache.flink.api.scala._object BatchJobExample {def main(args: Array[String]): Unit = {// 創(chuàng)建批處理執(zhí)行環(huán)境val env = ExecutionEnvironment.getExecutionEnvironment// 在這里添加批處理作業(yè)的代碼邏輯// ...// 執(zhí)行作業(yè)env.execute("Batch Job Example")}
}
流處理場景 - 設(shè)置DataStream API的流處理執(zhí)行環(huán)境(Scala):
import org.apache.flink.streaming.api.scala._object StreamJobExample {def main(args: Array[String]): Unit = {// 創(chuàng)建流處理執(zhí)行環(huán)境val env = StreamExecutionEnvironment.getExecutionEnvironment// 在這里添加流處理作業(yè)的代碼邏輯// ...// 執(zhí)行作業(yè)env.execute("Stream Job Example")}
}
根據(jù)以上示例代碼,可以在不同的場景下選擇合適的執(zhí)行環(huán)境和API來構(gòu)建和執(zhí)行Flink作業(yè)。注意在導入包時,確保使用正確的包名和類名,以適應批處理或流處理的環(huán)境。
以下是一些常用的API函數(shù)和操作,以表格形式提供:
API 類型 | 常用函數(shù)和操作 | 描述 |
---|---|---|
DataStream API | map , flatMap | 對數(shù)據(jù)流中的每個元素進行映射或扁平化操作。 |
filter | 過濾出滿足條件的元素。 | |
keyBy | 按指定的字段或鍵對數(shù)據(jù)流進行分區(qū)。 | |
window | 將數(shù)據(jù)流按照時間窗口或計數(shù)窗口劃分。 | |
reduce , fold | 在窗口內(nèi)對元素進行聚合操作。 | |
union | 合并多個數(shù)據(jù)流。 | |
connect , coMap , coFlatMap | 連接兩個不同類型的數(shù)據(jù)流并應用相應的函數(shù)。 | |
timeWindow , countWindow | 定義時間窗口或計數(shù)窗口。 | |
process | 自定義處理函數(shù),實現(xiàn)更復雜的流處理邏輯。 | |
DataSet API | map , flatMap | 對數(shù)據(jù)集中的每個元素進行映射或扁平化操作。 |
filter | 過濾出滿足條件的元素。 | |
groupBy | 按指定的字段或鍵對數(shù)據(jù)集進行分組。 | |
reduce , fold | 對分組后的數(shù)據(jù)集進行聚合操作。 | |
join , coGroup | 對兩個數(shù)據(jù)集進行內(nèi)連接或外連接操作。 | |
cross , cartesian | 對兩個數(shù)據(jù)集進行笛卡爾積操作。 | |
distinct | 去除數(shù)據(jù)集中的重復元素。 | |
groupBy , aggregate | 分組并對分組后的數(shù)據(jù)集進行聚合操作。 | |
first , min , max | 獲取數(shù)據(jù)集中的第一個、最小或最大元素。 | |
sum , avg | 計算數(shù)據(jù)集中元素的和或平均值。 | |
collect | 將數(shù)據(jù)集中的元素收集到本地的集合中。 |
這些API函數(shù)和操作涵蓋了Flink中流處理和批處理的常見操作,可以幫助用戶實現(xiàn)各種復雜的數(shù)據(jù)處理和分析任務。根據(jù)實際需求,可以選擇適合的API函數(shù)和操作來構(gòu)建Flink作業(yè)。
下面是一些參見的API的說明:
map
將DataSet中的每一個元素轉(zhuǎn)換為另外一個元素
示例
使用map操作,將以下數(shù)據(jù)
"1,張三", "2,李四", "3,王五", "4,趙六"
轉(zhuǎn)換為一個scala的樣例類。
步驟
- 獲取
ExecutionEnvironment
運行環(huán)境 - 使用
fromCollection
構(gòu)建數(shù)據(jù)源 - 創(chuàng)建一個
User
樣例類 - 使用
map
操作執(zhí)行轉(zhuǎn)換 - 打印測試
參考代碼
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.common.functions.MapFunction;public class User {public String id;public String name;public User() {}public User(String id, String name) {this.id = id;this.name = name;}@Overridepublic String toString() {return "User(" + id + ", " + name + ")";}public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();DataSet<String> textDataSet = env.fromCollection(Arrays.asList("1,張三", "2,李四", "3,王五", "4,趙六"));DataSet<User> userDataSet = textDataSet.map(new MapFunction<String, User>() {@Overridepublic User map(String text) throws Exception {String[] fieldArr = text.split(",");return new User(fieldArr[0], fieldArr[1]);}});userDataSet.print();}
}
flatMap
將DataSet中的每一個元素轉(zhuǎn)換為0…n個元素
示例
分別將以下數(shù)據(jù),轉(zhuǎn)換成國家
、省份
、城市
三個維度的數(shù)據(jù)。
將以下數(shù)據(jù)
張三,中國,江西省,南昌市
李四,中國,河北省,石家莊市
Tom,America,NewYork,Manhattan
轉(zhuǎn)換為
(張三,中國)
(張三,中國,江西省)
(張三,中國,江西省,江西省)
(李四,中國)
(李四,中國,河北省)
(李四,中國,河北省,河北省)
(Tom,America)
(Tom,America,NewYork)
(Tom,America,NewYork,NewYork)
思路
-
以上數(shù)據(jù)為一條轉(zhuǎn)換為三條,顯然,應當使用
flatMap
來實現(xiàn) -
分別在
flatMap
函數(shù)中構(gòu)建三個數(shù)據(jù),并放入到一個列表中姓名, 國家
姓名, 國家省份
姓名, 國家省份城市
步驟
- 構(gòu)建批處理運行環(huán)境
- 構(gòu)建本地集合數(shù)據(jù)源
- 使用
flatMap
將一條數(shù)據(jù)轉(zhuǎn)換為三條數(shù)據(jù)- 使用逗號分隔字段
- 分別構(gòu)建國家、國家省份、國家省份城市三個元組
- 打印輸出
參考代碼
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.List;public class UserProcessing {public static class User {public String name;public String country;public String province;public String city;public User() {}public User(String name, String country, String province, String city) {this.name = name;this.country = country;this.province = province;this.city = city;}@Overridepublic String toString() {return "User(" + name + ", " + country + ", " + province + ", " + city + ")";}}public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();DataSet<String> userDataSet = env.fromCollection(new ArrayList<String>() {{add("張三,中國,江西省,南昌市");add("李四,中國,河北省,石家莊市");add("Tom,America,NewYork,Manhattan");}});DataSet<User> resultDataSet = userDataSet.flatMap(new FlatMapFunction<String, User>() {@Overridepublic void flatMap(String text, Collector<User> collector) throws Exception {String[] fieldArr = text.split(",");String name = fieldArr[0];String country = fieldArr[1];String province = fieldArr[2];String city = fieldArr[3];collector.collect(new User(name, country, province, city));collector.collect(new User(name, country, province + city, ""));collector.collect(new User(name, country, province + city, city));}});resultDataSet.print();}
}
mapPartition
將一個分區(qū)
中的元素轉(zhuǎn)換為另一個元素
示例
使用mapPartition操作,將以下數(shù)據(jù)
"1,張三", "2,李四", "3,王五", "4,趙六"
轉(zhuǎn)換為一個scala的樣例類。
步驟
- 獲取
ExecutionEnvironment
運行環(huán)境 - 使用
fromCollection
構(gòu)建數(shù)據(jù)源 - 創(chuàng)建一個
User
樣例類 - 使用
mapPartition
操作執(zhí)行轉(zhuǎn)換 - 打印測試
參考代碼
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;public class MapPartitionExample {public static class User {public String id;public String name;public User() {}public User(String id, String name) {this.id = id;this.name = name;}@Overridepublic String toString() {return "User(" + id + ", " + name + ")";}}public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();DataSet<String> userDataSet = env.fromCollection(new ArrayList<String>() {{add("1,張三");add("2,李四");add("3,王五");add("4,趙六");}});DataSet<User> resultDataSet = userDataSet.mapPartition(new MapPartitionFunction<String, User>() {@Overridepublic void mapPartition(Iterable<String> iterable, Collector<User> collector) throws Exception {// TODO: 打開連接Iterator<String> iterator = iterable.iterator();while (iterator.hasNext()) {String ele = iterator.next();String[] fieldArr = ele.split(",");collector.collect(new User(fieldArr[0], fieldArr[1]));}// TODO: 關(guān)閉連接}});resultDataSet.print();}
}
map
和mapPartition
的效果是一樣的,但如果在map的函數(shù)中,需要訪問一些外部存儲。例如:
訪問mysql數(shù)據(jù)庫,需要打開連接
, 此時效率較低。而使用mapPartition
可以有效減少連接數(shù),提高效率
filter
過濾出來
一些符合條件的元素
示例:
過濾出來以下以h
開頭的單詞。
"hadoop", "hive", "spark", "flink"
步驟
- 獲取
ExecutionEnvironment
運行環(huán)境 - 使用
fromCollection
構(gòu)建數(shù)據(jù)源 - 使用
filter
操作執(zhí)行過濾 - 打印測試
參考代碼
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;import java.util.ArrayList;
import java.util.List;public class FilterExample {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();DataSet<String> wordDataSet = env.fromCollection(new ArrayList<String>() {{add("hadoop");add("hive");add("spark");add("flink");}});DataSet<String> resultDataSet = wordDataSet.filter(word -> word.startsWith("h"));resultDataSet.print();}
}
reduce
可以對一個dataset
或者一個group
來進行聚合計算,最終聚合成一個元素
示例1
請將以下元組數(shù)據(jù),使用reduce
操作聚合成一個最終結(jié)果
("java" , 1) , ("java", 1) ,("java" , 1)
將上傳元素數(shù)據(jù)轉(zhuǎn)換為("java",3)
步驟
- 獲取
ExecutionEnvironment
運行環(huán)境 - 使用
fromCollection
構(gòu)建數(shù)據(jù)源 - 使用
redice
執(zhí)行聚合操作 - 打印測試
參考代碼
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;import java.util.ArrayList;
import java.util.List;public class ReduceExample {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();DataSet<Tuple2<String, Integer>> wordCountDataSet = env.fromCollection(new ArrayList<Tuple2<String, Integer>>() {{add(new Tuple2<>("java", 1));add(new Tuple2<>("java", 1));add(new Tuple2<>("java", 1));}});DataSet<Tuple2<String, Integer>> resultDataSet = wordCountDataSet.reduce((wc1, wc2) ->new Tuple2<>(wc2.f0, wc1.f1 + wc2.f1));resultDataSet.print();}
}
示例2
請將以下元組數(shù)據(jù),下按照單詞使用groupBy
進行分組,再使用reduce
操作聚合成一個最終結(jié)果
("java" , 1) , ("java", 1) ,("scala" , 1)
轉(zhuǎn)換為
("java", 2), ("scala", 1)
步驟
- 獲取
ExecutionEnvironment
運行環(huán)境 - 使用
fromCollection
構(gòu)建數(shù)據(jù)源 - 使用
groupBy
按照單詞進行分組 - 使用
reduce
對每個分組進行統(tǒng)計 - 打印測試
參考代碼
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple;import java.util.ArrayList;
import java.util.List;public class GroupByReduceExample {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();DataSet<Tuple2<String, Integer>> wordCountDataSet = env.fromCollection(new ArrayList<Tuple2<String, Integer>>() {{add(new Tuple2<>("java", 1));add(new Tuple2<>("java", 1));add(new Tuple2<>("scala", 1));}});DataSet<Tuple2<String, Integer>> groupedDataSet = wordCountDataSet.groupBy(0).reduce((wc1, wc2) ->new Tuple2<>(wc1.f0, wc1.f1 + wc2.f1));groupedDataSet.print();}
}
reduceGroup
可以對一個dataset或者一個group來進行聚合計算,最終聚合成一個元素
reduce和reduceGroup的區(qū)別
- reduce是將數(shù)據(jù)一個個拉取到另外一個節(jié)點,然后再執(zhí)行計算
- reduceGroup是先在每個group所在的節(jié)點上執(zhí)行計算,然后再拉取
示例
請將以下元組數(shù)據(jù),下按照單詞使用groupBy
進行分組,再使用reduceGroup
操作進行單詞計數(shù)
("java" , 1) , ("java", 1) ,("scala" , 1)
步驟
- 獲取
ExecutionEnvironment
運行環(huán)境 - 使用
fromCollection
構(gòu)建數(shù)據(jù)源 - 使用
groupBy
按照單詞進行分組 - 使用
reduceGroup
對每個分組進行統(tǒng)計 - 打印測試
參考代碼
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.List;public class GroupByReduceGroupExample {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataSet<Tuple2<String, Integer>> wordCountDataSet = env.fromCollection(new ArrayList<Tuple2<String, Integer>>() {{add(new Tuple2<>("java", 1));add(new Tuple2<>("java", 1));add(new Tuple2<>("scala", 1));}});DataSet<Tuple2<String, Integer>> groupedDataSet = wordCountDataSet.groupBy(0).reduceGroup((Iterable<Tuple2<String, Integer>> iter, Collector<Tuple2<String, Integer>> collector) -> {Tuple2<String, Integer> result = new Tuple2<>();for (Tuple2<String, Integer> wc : iter) {result.f0 = wc.f0;result.f1 += wc.f1;}collector.collect(result);});groupedDataSet.print();}
}
aggregate
按照內(nèi)置的方式來進行聚合, Aggregate只能作用于元組
上。例如:SUM/MIN/MAX…
示例
請將以下元組數(shù)據(jù),使用aggregate
操作進行單詞統(tǒng)計
("java" , 1) , ("java", 1) ,("scala" , 1)
步驟
- 獲取
ExecutionEnvironment
運行環(huán)境 - 使用
fromCollection
構(gòu)建數(shù)據(jù)源 - 使用
groupBy
按照單詞進行分組 - 使用
aggregate
對每個分組進行SUM
統(tǒng)計 - 打印測試
參考代碼
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2;import java.util.ArrayList;
import java.util.List;public class GroupByAggregateExample {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();DataSet<Tuple2<String, Integer>> wordCountDataSet = env.fromCollection(new ArrayList<Tuple2<String, Integer>>() {{add(new Tuple2<>("java", 1));add(new Tuple2<>("java", 1));add(new Tuple2<>("scala", 1));}});DataSet<Tuple2<String, Integer>> groupedDataSet = wordCountDataSet.groupBy(0);DataSet<Tuple2<String, Integer>> resultDataSet = groupedDataSet.aggregate(Aggregations.SUM, 1);resultDataSet.print();}
}
注意
要使用aggregate,只能使用字段索引名或索引名稱來進行分組
groupBy(0)
,否則會報一下錯誤:Exception in thread "main" java.lang.UnsupportedOperationException: Aggregate does not support grouping with KeySelector functions, yet.
distinct
去除重復的數(shù)據(jù)
示例
請將以下元組數(shù)據(jù),使用distinct
操作去除重復的單詞
("java" , 1) , ("java", 2) ,("scala" , 1)
去重得到
("java", 1), ("scala", 1)
步驟
- 獲取
ExecutionEnvironment
運行環(huán)境 - 使用
fromCollection
構(gòu)建數(shù)據(jù)源 - 使用
distinct
指定按照哪個字段來進行去重 - 打印測試
參考代碼
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;import java.util.ArrayList;
import java.util.List;public class DistinctExample {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();DataSet<Tuple2<String, Integer>> wordCountDataSet = env.fromCollection(new ArrayList<Tuple2<String, Integer>>() {{add(new Tuple2<>("java", 1));add(new Tuple2<>("java", 1));add(new Tuple2<>("scala", 1));}});DataSet<Tuple2<String, Integer>> resultDataSet = wordCountDataSet.distinct(0);resultDataSet.print();}
}
join
使用join可以將兩個DataSet連接起來
示例:
有兩個csv文件,有一個為score.csv
,一個為subject.csv
,分別保存了成績數(shù)據(jù)以及學科數(shù)據(jù)。
需要將這兩個數(shù)據(jù)連接到一起,然后打印出來。
步驟
-
分別將兩個文件復制到項目中的
data/join/input
中 -
構(gòu)建批處理環(huán)境
-
創(chuàng)建兩個樣例類
* 學科Subject(學科ID、學科名字) * 成績Score(唯一ID、學生姓名、學科ID、分數(shù)——Double類型)
-
分別使用
readCsvFile
加載csv數(shù)據(jù)源,并制定泛型 -
使用join連接兩個DataSet,并使用
where
、equalTo
方法設(shè)置關(guān)聯(lián)條件 -
打印關(guān)聯(lián)后的數(shù)據(jù)源
參考代碼
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple4;public class JoinExample {public static class Score {public int id;public String name;public int subjectId;public double score;public Score() {}public Score(int id, String name, int subjectId, double score) {this.id = id;this.name = name;this.subjectId = subjectId;this.score = score;}@Overridepublic String toString() {return "Score(" + id + ", " + name + ", " + subjectId + ", " + score + ")";}}public static class Subject {public int id;public String name;public Subject() {}public Subject(int id, String name) {this.id = id;this.name = name;}@Overridepublic String toString() {return "Subject(" + id + ", " + name + ")";}}public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();DataSet<Score> scoreDataSet = env.readCsvFile("./data/join/input/score.csv").ignoreFirstLine().pojoType(Score.class);DataSet<Subject> subjectDataSet = env.readCsvFile("./data/join/input/subject.csv").ignoreFirstLine().pojoType(Subject.class);DataSet<Tuple4<Integer, String, Integer, Double>> joinedDataSet = scoreDataSet.join(subjectDataSet).where("subjectId").equalTo("id").projectFirst(0, 1, 2, 3).projectSecond(1);joinedDataSet.print();}
}
union
將兩個DataSet取并集,不會去重。
示例
將以下數(shù)據(jù)進行取并集操作
數(shù)據(jù)集1
"hadoop", "hive", "flume"
數(shù)據(jù)集2
"hadoop", "hive", "spark"
步驟
- 構(gòu)建批處理運行環(huán)境
- 使用
fromCollection
創(chuàng)建兩個數(shù)據(jù)源 - 使用
union
將兩個數(shù)據(jù)源關(guān)聯(lián)在一起 - 打印測試
參考代碼
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;import java.util.ArrayList;
import java.util.List;public class UnionExample {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();DataSet<String> wordDataSet1 = env.fromCollection(new ArrayList<String>() {{add("hadoop");add("hive");add("flume");}});DataSet<String> wordDataSet2 = env.fromCollection(new ArrayList<String>() {{add("hadoop");add("hive");add("spark");}});DataSet<String> resultDataSet = wordDataSet1.union(wordDataSet2);resultDataSet.print();}
}
connect
connect()提供了和union()類似的功能,即連接兩個數(shù)據(jù)流,它與union()的區(qū)別如下。
-
connect()只能連接兩個數(shù)據(jù)流,union()可以連接多個數(shù)據(jù)流。
-
connect()所連接的兩個數(shù)據(jù)流的數(shù)據(jù)類型可以不一致,union()所連接的兩個或多個數(shù)據(jù)流的數(shù)據(jù)類型必須一致。
-
兩個DataStream經(jīng)過connect()之后被轉(zhuǎn)化為ConnectedStreams, ConnectedStreams會對兩個流的數(shù)據(jù)應用不同的處理方法,且兩個流之間可以共享狀態(tài)。
DataStream<Integer> intStream = senv.fromElements(2, 1, 5, 3, 4, 7);
DataStream<String> stringStream = senv.fromElements("A", "B", "C", "D"); ConnectedStreams<Integer, String> connectedStream =
intStream.connect(stringStream);
DataStream<String> mapResult = connectedStream.map(new MyCoMapFunction()); // CoMapFunction的3個泛型分別對應第一個流的輸入類型、第二個流的輸入類型,輸出類型
public static class MyCoMapFunction implements CoMapFunction<Integer, String, String>
{ @Override public String map1(Integer input1) { return input1.toString(); } @Override public String map2(String input2) { return input2; }
}
rebalance
Flink也會產(chǎn)生數(shù)據(jù)傾斜
的時候,例如:當前的數(shù)據(jù)量有10億條,在處理過程就有可能發(fā)生如下狀況:
rebalance
會使用輪詢的方式將數(shù)據(jù)均勻打散,這是處理數(shù)據(jù)傾斜最好的選擇。
步驟
-
構(gòu)建批處理運行環(huán)境
-
使用
env.generateSequence
創(chuàng)建0-100的并行數(shù)據(jù) -
使用
fiter
過濾出來大于8
的數(shù)字 -
使用map操作傳入
RichMapFunction
,將當前子任務的ID和數(shù)字構(gòu)建成一個元組在RichMapFunction中可以使用`getRuntimeContext.getIndexOfThisSubtask`獲取子任務序號
-
打印測試
參考代碼
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;public class MapWithSubtaskIndexExample {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();DataSet<Long> numDataSet = env.generateSequence(0, 100);DataSet<Long> filterDataSet = numDataSet.filter(num -> num > 8);DataSet<Tuple2<Long, Long>> resultDataSet = filterDataSet.map(new RichMapFunction<Long, Tuple2<Long, Long>>() {@Overridepublic Tuple2<Long, Long> map(Long in) throws Exception {return new Tuple2<>(getRuntimeContext().getIndexOfThisSubtask(), in);}});resultDataSet.print();}
}
上述代碼沒有加rebalance,通過觀察,有可能會出現(xiàn)數(shù)據(jù)傾斜。
在filter計算完后,調(diào)用rebalance
,這樣,就會均勻地將數(shù)據(jù)分布到每一個分區(qū)中。
hashPartition
按照指定的key進行hash分區(qū)
示例
基于以下列表數(shù)據(jù)來創(chuàng)建數(shù)據(jù)源,并按照hashPartition進行分區(qū),然后輸出到文件。
List(1,1,1,1,1,1,1,2,2,2,2,2)
步驟
- 構(gòu)建批處理運行環(huán)境
- 設(shè)置并行度為
2
- 使用
fromCollection
構(gòu)建測試數(shù)據(jù)集 - 使用
partitionByHash
按照字符串的hash進行分區(qū) - 調(diào)用
writeAsText
寫入文件到data/partition_output
目錄中 - 打印測試
參考代碼
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.core.fs.FileSystem;import java.util.ArrayList;
import java.util.List;public class PartitionByHashExample {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// Set parallelism to 2env.setParallelism(2);DataSet<Integer> numDataSet = env.fromCollection(new ArrayList<Integer>() {{add(1);add(1);add(1);add(1);add(1);add(1);add(1);add(2);add(2);add(2);add(2);add(2);}});DataSet<Integer> partitionDataSet = numDataSet.partitionByHash(num -> num.toString());partitionDataSet.writeAsText("./data/partition_output", FileSystem.WriteMode.OVERWRITE);partitionDataSet.print();env.execute();}
}
sortPartition
指定字段對分區(qū)中的數(shù)據(jù)進行排序
示例
按照以下列表來創(chuàng)建數(shù)據(jù)集
List("hadoop", "hadoop", "hadoop", "hive", "hive", "spark", "spark", "flink")
對分區(qū)進行排序后,輸出到文件。
步驟
- 構(gòu)建批處理運行環(huán)境
- 使用
fromCollection
構(gòu)建測試數(shù)據(jù)集 - 設(shè)置數(shù)據(jù)集的并行度為
2
- 使用
sortPartition
按照字符串進行降序排序 - 調(diào)用
writeAsText
寫入文件到data/sort_output
目錄中 - 啟動執(zhí)行
參考代碼
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.core.fs.FileSystem;import java.util.ArrayList;
import java.util.List;public class SortPartitionExample {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();DataSet<String> wordDataSet = env.fromCollection(new ArrayList<String>() {{add("hadoop");add("hadoop");add("hadoop");add("hive");add("hive");add("spark");add("spark");add("flink");}});wordDataSet.setParallelism(2);DataSet<String> sortedDataSet = wordDataSet.sortPartition(str -> str, Order.DESCENDING);sortedDataSet.writeAsText("./data/sort_output/", FileSystem.WriteMode.OVERWRITE);env.execute("App");}
}
窗口
在許多情況下,我們需要解決這樣的問題:針對一個特定的時間段,例如一個小時,我們需要對數(shù)據(jù)進行統(tǒng)計和分析。但是,要實現(xiàn)這種數(shù)據(jù)窗口操作,首先需要確定哪些數(shù)據(jù)應該進入這個窗口。在深入了解窗口操作的定義之前,我們必須先確定作業(yè)將使用哪種時間語義。
換句話說,時間窗口是數(shù)據(jù)處理中的一個關(guān)鍵概念,用于將數(shù)據(jù)劃分為特定的時間段進行計算。然而,在確定如何定義這些窗口之前,我們必須選擇適合的時間語義,即事件時間、處理時間或攝入時間。不同的時間語義在數(shù)據(jù)處理中具有不同的含義和用途,因此在選擇時間窗口之前,我們需要明確作業(yè)所需的時間語義,以便正確地界定和處理數(shù)據(jù)窗口。
時間概念
…
由于字數(shù)限制,此處省略
完整內(nèi)容,請參見《大數(shù)據(jù)Flink學習圣經(jīng)》,pdf 免費找尼恩獲取
窗口程序
…
由于字數(shù)限制,此處省略
完整內(nèi)容,請參見《大數(shù)據(jù)Flink學習圣經(jīng)》,pdf 免費找尼恩獲取
滾動窗口
…
由于字數(shù)限制,此處省略
完整內(nèi)容,請參見《大數(shù)據(jù)Flink學習圣經(jīng)》,pdf 免費找尼恩獲取
滑動窗口
…
由于字數(shù)限制,此處省略
完整內(nèi)容,請參見《大數(shù)據(jù)Flink學習圣經(jīng)》,pdf 免費找尼恩獲取
會話窗口
…
由于字數(shù)限制,此處省略
完整內(nèi)容,請參見《大數(shù)據(jù)Flink學習圣經(jīng)》,pdf 免費找尼恩獲取
基于數(shù)量窗口
…
由于字數(shù)限制,此處省略
完整內(nèi)容,請參見《大數(shù)據(jù)Flink學習圣經(jīng)》,pdf 免費找尼恩獲取
觸發(fā)器
…
由于字數(shù)限制,此處省略
完整內(nèi)容,請參見《大數(shù)據(jù)Flink學習圣經(jīng)》,pdf 免費找尼恩獲取
清除器
…
由于字數(shù)限制,此處省略
完整內(nèi)容,請參見《大數(shù)據(jù)Flink學習圣經(jīng)》,pdf 免費找尼恩獲取
6. Flink程序本地執(zhí)行和集群執(zhí)行
6.1. 本地執(zhí)行
…
由于字數(shù)限制,此處省略
完整內(nèi)容,請參見《大數(shù)據(jù)Flink學習圣經(jīng)》,pdf 免費找尼恩獲取
6.2. 集群執(zhí)行
…
由于字數(shù)限制,此處省略
完整內(nèi)容,請參見《大數(shù)據(jù)Flink學習圣經(jīng)》,pdf 免費找尼恩獲取
7. Flink的廣播變量
…
由于字數(shù)限制,此處省略
完整內(nèi)容,請參見《大數(shù)據(jù)Flink學習圣經(jīng)》,pdf 免費找尼恩獲取
8. Flink的累加器
…
由于字數(shù)限制,此處省略
完整內(nèi)容,請參見《大數(shù)據(jù)Flink學習圣經(jīng)》,pdf 免費找尼恩獲取
9. Flink的分布式緩存
…
由于字數(shù)限制,此處省略
完整內(nèi)容,請參見《大數(shù)據(jù)Flink學習圣經(jīng)》,pdf 免費找尼恩獲取
關(guān)于TABLE API & SQL , 以及狀態(tài)和檢查點等知識 會陸續(xù)補充!敬請期待!
說在后面
本文是《大數(shù)據(jù)Flink學習圣經(jīng)》 V1版本, 是 《尼恩 大數(shù)據(jù) 面試寶典》 姊妹篇。
這里特別說明一下:《尼恩 大數(shù)據(jù) 面試寶典》5個專題 PDF 自首次發(fā)布以來, 已經(jīng)收集了 好幾百題,大量的大廠面試干貨、正貨 。 《尼恩 大數(shù)據(jù) 面試寶典》面試題集合, 已經(jīng)變成大數(shù)據(jù)學習和面試的必讀書籍。
于是,尼恩架構(gòu)團隊趁熱打鐵,推出 《大數(shù)據(jù)Flink學習圣經(jīng)》。
完整的pdf,可以關(guān)注尼恩的 公眾號【技術(shù)自由圈】領(lǐng)取。
并且,《大數(shù)據(jù)Flink學習圣經(jīng)》、 《尼恩 大數(shù)據(jù) 面試寶典》 都會持續(xù)迭代、不斷更新,以 吸納最新的面試題,最新版本,具體請參見 公眾號【技術(shù)自由圈】
作者介紹
一作:Andy,資深架構(gòu)師, 《Java 高并發(fā)核心編程 加強版》作者之1 。
二作:尼恩,41歲資深老架構(gòu)師, IT領(lǐng)域資深作家、著名博主?!禞ava 高并發(fā)核心編程 加強版 卷1、卷2、卷3》創(chuàng)世作者。 《K8S學習圣經(jīng)》《Docker學習圣經(jīng)》《Go學習圣經(jīng)》等11個PDF 圣經(jīng)的作者。 也是一個 資深架構(gòu)導師、架構(gòu)轉(zhuǎn)化 導師, 成功指導了多個中級Java、高級Java轉(zhuǎn)型架構(gòu)師崗位, 最高的學員年薪拿到近100W。
參考
- Flink原理與實踐
- Apache Flink Documentation | Apache Flink
- Apache Flink 學習網(wǎng) (flink-learning.org.cn)
推薦閱讀
《尼恩大數(shù)據(jù)面試寶典專題1:史上最全Hadoop面試題》
《尼恩大數(shù)據(jù)面試寶典專題2:絕密100個Spark面試題,熟背100遍,猛拿高薪》
《尼恩大數(shù)據(jù)面試寶典專題3:史上最全Hive面試題,不斷迭代,持續(xù)升級》
《尼恩大數(shù)據(jù)面試寶典專題4:史上最全Flink面試題,不斷迭代,持續(xù)升級》
《尼恩大數(shù)據(jù)面試寶典專題5:史上最全HBase面試題,不斷迭代,持續(xù)升級》
《尼恩 架構(gòu)筆記》《尼恩高并發(fā)三部曲》《尼恩Java面試寶典》PDF,請到下面公號【技術(shù)自由圈】取↓↓↓