中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當前位置: 首頁 > news >正文

個人網(wǎng)站系統(tǒng)優(yōu)秀網(wǎng)站網(wǎng)頁設(shè)計圖片

個人網(wǎng)站系統(tǒng),優(yōu)秀網(wǎng)站網(wǎng)頁設(shè)計圖片,飄雪影視在線觀看西瓜,手機端的網(wǎng)站首頁該怎么做1.map 特性&#xff1a;接收一個數(shù)據(jù)&#xff0c;經(jīng)過處理之后&#xff0c;就返回一個數(shù)據(jù) 1.1. 源碼分析 我們來看看map的源碼 map需要接收一個MapFunction<T,R>的對象&#xff0c;其中泛型T表示傳入的數(shù)據(jù)類型&#xff0c;R表示經(jīng)過處理之后輸出的數(shù)據(jù)類型我們繼續(xù)往…

1.map

特性:接收一個數(shù)據(jù),經(jīng)過處理之后,就返回一個數(shù)據(jù)
在這里插入圖片描述

1.1. 源碼分析

  • 我們來看看map的源碼
    在這里插入圖片描述
    map需要接收一個MapFunction<T,R>的對象,其中泛型T表示傳入的數(shù)據(jù)類型,R表示經(jīng)過處理之后輸出的數(shù)據(jù)類型
  • 我們繼續(xù)往下點,看看MapFunction<T,R>的源碼
    在這里插入圖片描述
    這是一個接口,那么在代碼中,我們就需要實現(xiàn)這個接口

1.2. 案例

那么我們現(xiàn)在要實現(xiàn)一個功能,就是從給一個文件中讀取數(shù)據(jù),返回每一行的字符串長度。

我們要讀取的文件內(nèi)容如下
在這里插入圖片描述

代碼貼在這里(為了讓打擊不看迷糊,導(dǎo)包什么的我就省略了)

public class TransformTest1_Base {public static void main(String[] args) throws Exception {// 1. 獲取執(zhí)行環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 將并行度設(shè)為1env.setParallelism(1);// 3. 讀取文件夾DataStreamSource<String> inputDataStream = env.readTextFile("C:\\Users\\Administrator\\IdeaProjects\\FlinkTutorial\\src\\main\\resources\\sensor");// 4. 將文件夾每一行的數(shù)據(jù)都返回它的長度// 在這里我們用匿名內(nèi)部類的方式創(chuàng)建了一個MapFunction對象SingleOutputStreamOperator<Integer> dataStream = inputDataStream.map(new MapFunction<String, Integer>() {// 5. 重寫map方法,參數(shù)s是接收到的一個數(shù)據(jù),我們只需要返回它的長度就行了。@Overridepublic Integer map(String s) throws Exception {return s.length();}});// 6. 打印輸出dataStream.print();// 7. 啟動執(zhí)行環(huán)境env.execute();}
}

顯示
在這里插入圖片描述

1.3. 總結(jié)

map的使用范圍就是需要對的那個數(shù)據(jù)進行處理,并且每次返回一個數(shù)據(jù)的時候,map就比較方便了。

2. flatMap

  • 接收一個數(shù)據(jù),可以返回多條數(shù)據(jù)

2.1. 源碼分析

在這里插入圖片描述
我們發(fā)現(xiàn),它需要傳入一個FlatMapFunction的一個對象
在這里插入圖片描述

我們繼續(xù)點進去,看看FlatMapFunction的源碼,可以發(fā)現(xiàn),FlatMapFunction<T,R>也是一個接口,并且接口里面的方法的返回值是一個Collector,也就是多個值的集合。

2.2. 案例

我們還是讀取那個文件,這次我們要做的處理是,將文件的每一行數(shù)據(jù)按照逗號隔開,給出代碼:

public class TransformTest2_Base {public static void main(String[] args) throws Exception {// 1. 獲取執(zhí)行環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 設(shè)置并行度env.setParallelism(1);// 3. 讀取文件夾DataStreamSource<String> dataStream = env.readTextFile("C:\\Users\\Administrator\\IdeaProjects\\FlinkTutorial\\src\\main\\resources\\sensor");// 4. 用匿名內(nèi)部類的方式重寫FlatMapFuncction,將每行字符按","隔開SingleOutputStreamOperator<String> flatMapStream = dataStream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String s, Collector<String> collector) throws Exception {// 5. 分割一行字符,獲得對應(yīng)的字符串數(shù)組String[] split = s.split(",");for (String slt : split) {// 6. 將這些數(shù)據(jù)返回collector.collect(slt);}}});// 7. 打印輸出處理后的數(shù)據(jù)flatMapStream.print();// 8. 啟動執(zhí)行環(huán)境env.execute();}
}

可以看到執(zhí)行的結(jié)果
在這里插入圖片描述

3. filter

聽這個名字就知道是個過濾器,用來過濾數(shù)據(jù)。
在這里插入圖片描述

3.1. 源碼分析

我們看看filer的源碼,繼承子FilterFunction,可以看到,這次泛型就只有一個值了,因為filter只允許返回的數(shù)據(jù)<=原來的數(shù)據(jù),所以只做過濾,并不能改變數(shù)據(jù)蕾西,沒必要設(shè)置返回的類型
在這里插入圖片描述
我們繼續(xù)點進去,看看FilterFunction的源碼
在這里插入圖片描述
果不其然,也是一個接口,而里面的filter方法只有一個參數(shù),并且返回的是一個boolean類型,若返回true則var1原樣返回,若返回false,則var1會被過濾掉。

3.2. 案例

我們還是讀取以上文件,這一次我們返回以"sensor_1"開頭的字符串,其余的一律不返回,給出代碼

public class TransformTest3_Base {public static void main(String[] args) throws Exception {// 1. 獲取執(zhí)行環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 設(shè)置并行度env.setParallelism(1);// 3. 讀取文件DataStreamSource<String> dataStream = env.readTextFile("C:\\Users\\Administrator\\IdeaProjects\\FlinkTutorial\\src\\main\\resources\\sensor");// 4. 用匿名內(nèi)部類的方式重寫FilterFunctionSingleOutputStreamOperator<String> filterDataStream = dataStream.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String s) throws Exception {// 5. 若s以"sensor_1"開頭,則返回truereturn s.startsWith("\"sensor_1\"");}});// 6. 打印處理后的數(shù)據(jù)filterDataStream.print();// 7. 啟動執(zhí)行環(huán)境env.execute();}
}

4. 分組聚合

  • 注意:任何的聚合操作都有默認的分組,聚合是在分組的基礎(chǔ)上進行的。比如,對整體進行求和,那么分組就是整體。所以,在做聚合操作之前,一定要明確是在哪個分組上進行聚合操作
  • 注意:聚合操作,本質(zhì)上是一個多對一(一對一是多對一的特殊情況)的操作。特別注意的是這個’一‘,可以是一個值(mean, sum等),同樣也可以是一個對象(list, set等對象)

4.1. 分組(keyBy)

在這里插入圖片描述
DataStream → KeyedStream:邏輯地將一個流拆分成不相交的分區(qū),每個分區(qū)包含具有相同 key 的元素,在內(nèi)部以 hash 的形式實現(xiàn)的。

  • 分組就是為了聚合操作做準備的,keyBy方法會將數(shù)據(jù)流按照hash實現(xiàn),分別放在不同的分區(qū),每個分區(qū)都可以進行聚合操作。
  • 我們可以用這個性質(zhì),計算每一個sensor溫度的最大值,我們?yōu)榇藢⑽募薷?#xff1a;
    在這里插入圖片描述
    分組之后的圖就是所有sensor_1在一個分區(qū)里,sensor_6,sensor_7,sensor_10在不同的三個分區(qū),也就是有四個分區(qū),而后三個分區(qū)中只有一條數(shù)據(jù),所以最大值和最小值都只有一個
  • 在flink中,分組操作是由keyBy方法來完成的,我們來看看keyBy的源碼
    在這里插入圖片描述
    可以發(fā)現(xiàn),keyBy可以對對象和元組進行聚合。

4.2. 聚合

這些算子可以針對 KeyedStream 的每一個支流做聚合。
? sum():對每個支流求和
? min():對每個支流求最小值
? max():對每個支流求最大值
? minBy()
? maxBy()
我們來看看max()的源碼
在這里插入圖片描述
這也是傳一個屬性名,也就是求對應(yīng)的屬性名的最大值。

4.3. 實例演示

public class TransformTest1_RollingAggreation {public static void main(String[] args) throws Exception {// 1. 獲取執(zhí)行環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 設(shè)置并行度env.setParallelism(1);// 3. 讀取文件DataStreamSource<String> stringDataStreamSource = env.readTextFile("C:\\Users\\Administrator\\IdeaProjects\\FlinkTutorial\\src\\main\\resources\\sensor");// 4. 用map將每行數(shù)據(jù)變成一個對象SingleOutputStreamOperator<SensorReading> map = stringDataStreamSource.map(new MapFunction<String, SensorReading>() {@Overridepublic SensorReading map(String s) throws Exception {String[] split = s.split(",");return new SensorReading(split[0], new Long(split[1]), new Double(split[2]));}});// 5. 分組操作,以id屬性分組KeyedStream<SensorReading, Tuple> keyedstream = map.keyBy("id");// 6. 聚合操作,求每個分組的溫度最大值SingleOutputStreamOperator<SensorReading> resultStream = keyedstream.max("temperature");// 7. 打印輸出resultStream.print();// 8. 啟動執(zhí)行環(huán)境env.execute();}
}

運行結(jié)果
在這里插入圖片描述
誒,這有人就要問了,不是求每一個分組的溫度最大值么?為什么sensor_1的這個分組所有的數(shù)據(jù)都有?
答:flink是一個流處理分布式框架,這是一條數(shù)據(jù)流,每來一個數(shù)據(jù)就得處理一次,所以輸出的都是當前狀態(tài)下的最大值。

4.4. reduce自定義聚合

在實際生產(chǎn)中,不可能讓我們完成這么簡單的操作就行了,所以我們需要更復(fù)雜的操作,而reduce就是滿足這個條件,它可以讓我們自定義聚合的方式。

  • 我們來看看reduce的源碼
    在這里插入圖片描述
    reduce需要傳入的是一個ReduceFunction的對象,我們再來看看ReduceFunction是個什么東西
    在這里插入圖片描述
    var1是當前這個分組的狀態(tài),var2是新加入的值,而reduce函數(shù)體就是我們要進行的操作,返回一個新的狀態(tài)。
    到這我就明白了,要是我們向?qū)崟r獲取最大溫度的話,var1是之前的最大溫度,通過var1和var2的比較就能實現(xiàn)。

4.5. reduce實例

我們這一次要實現(xiàn)一個實時的溫度最大值,也就是返回的數(shù)據(jù)中的時間戳是當前的。

public class TransformTest1_Reduce {public static void main(String[] args) throws Exception {// 1. 獲取執(zhí)行環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 設(shè)置并行度env.setParallelism(1);// 3. 讀取文件DataStreamSource<String> dataStream = env.readTextFile("C:\\Users\\Administrator\\IdeaProjects\\FlinkTutorial\\src\\main\\resources\\sensor");// 4. 通過map將每行數(shù)據(jù)轉(zhuǎn)換為一個對象SingleOutputStreamOperator<SensorReading> map = dataStream.map(new MapFunction<String, SensorReading>() {@Overridepublic SensorReading map(String s) throws Exception {String[] split = s.split(",");return new SensorReading(split[0], new Long(split[1]), new Double(split[2]));}});// 5. 按對象的id分組KeyedStream<SensorReading, Tuple> keyStream = map.keyBy("id");// 6. reduce自定義聚合SingleOutputStreamOperator<SensorReading> reduce = keyStream.reduce(new ReduceFunction<SensorReading>() {@Overridepublic SensorReading reduce(SensorReading sensorReading, SensorReading t1) throws Exception {// 7. 獲取當前時間為止接收到的最大溫度return new SensorReading(sensorReading.getId(), System.currentTimeMillis(), Math.max(sensorReading.getTemperature(),t1.getTemperature()));}});// 8. 打印輸出reduce.print();// 9. 啟動運行環(huán)境env.execute();}
}

這一次的輸出我們就得你好好研究一下了。
在這里插入圖片描述
從這塊可以發(fā)現(xiàn),我們獲取的都是當前的時間戳,而且時間戳也在改變,這一點很好理解,但是下面這個數(shù)據(jù)就很詭異了。
在這里插入圖片描述

  • 這兩塊的時間戳為什么沒有改變呢?這需要我們再來看看reduce方法了,reduce方法是傳入兩個參數(shù),第一個是當前的狀態(tài),第二個是新讀取的值,通過方法體的操作返回一個最新的狀態(tài)。
  • 仔細理解一下這句話,若我剛開始沒有數(shù)據(jù)的時候,那么哪來的狀態(tài)呢?所以reduce把接收到的第一個參數(shù)作為狀態(tài),其中sensor_6,7,8這三個分區(qū)只有一個數(shù)據(jù),所以直接拿來當作狀態(tài)。

5. 多流轉(zhuǎn)換算子

5.1. 分流操作(Split 和 Select)

  • Split能將流中的數(shù)據(jù)按條件貼上標簽,比如我把溫度大于30度的對象貼上一個high標簽,把溫度低于30度的貼上一個low標簽,標簽可以貼多個。那么就把流中的數(shù)據(jù),按照標簽分類了(這里并沒有分流)
    在這里插入圖片描述
  • Select是按照標簽來分流
    在這里插入圖片描述
  1. split源碼
    在這里插入圖片描述
    可以發(fā)現(xiàn),返回的是一個SplitStream,需要傳入一個選擇器,我們看看OutputSeclector的源碼
    在這里插入圖片描述
    傳入value,返回這個value對應(yīng)的標簽,實現(xiàn)對這個value進行類似"分類"的操作。
  2. select源碼
    在這里插入圖片描述
    只需要接收一個或者多個標簽就能返回包含那個標簽對象的數(shù)據(jù)流。

5.2. 實例演示

  • 我們這一次要把讀取到的數(shù)據(jù)分成三條流,一條是high(高于30度),一條是low(低于30度),一條是all(所有的數(shù)據(jù))。代碼:
public class TransformTest4_MultipleStreams {public static void main(String[] args) throws Exception {// 1. 獲取執(zhí)行環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 設(shè)置并行度env.setParallelism(1);// 3. 讀取文件DataStreamSource<String> dataStream = env.readTextFile("C:\\Users\\Administrator\\IdeaProjects\\FlinkTutorial\\src\\main\\resources\\sensor");// 4. 通過map將每行數(shù)據(jù)轉(zhuǎn)換為一個對象SingleOutputStreamOperator<SensorReading> map = dataStream.map(new MapFunction<String, SensorReading>() {@Overridepublic SensorReading map(String s) throws Exception {String[] split = s.split(",");return new SensorReading(split[0], new Long(split[1]), new Double(split[2]));}});// 5. 按條件貼標簽SplitStream<SensorReading> split = map.split(new OutputSelector<SensorReading>() {@Overridepublic Iterable<String> select(SensorReading value) {return value.getTemperature() > 30 ? Collections.singletonList("high") : Collections.singletonList("low");}});// 6. 按標簽選擇,生成不同的數(shù)據(jù)流DataStream<SensorReading> high = split.select("high");DataStream<SensorReading> low = split.select("low");DataStream<SensorReading> all = split.select("high", "low");high.print("high");low.print("low");all.print("all");env.execute();}
}

5.3. 合流操作Connect 和 CoMap

在這里插入圖片描述
DataStream,DataStream → ConnectedStreams:連接兩個保持他們類型的數(shù)
據(jù)流,兩個數(shù)據(jù)流被 Connect 之后,只是被放在了一個同一個流中,內(nèi)部依然保持各自的數(shù)據(jù)和形式不發(fā)生任何變化,兩個流相互獨立。
在這里插入圖片描述
ConnectedStreams → DataStream:作用于 ConnectedStreams 上,功能與 map和 flatMap 一樣,對 ConnectedStreams 中的每一個 Stream 分別進行 map 和 flatMap處理。
類似于一國兩制,看似兩條流合并在了一起,其實內(nèi)部依舊是按照自己的約定運行,類型并沒有改變。

  1. connect源碼
    在這里插入圖片描述
    將當前調(diào)用者的流和參數(shù)中的流合并,返回一個ConnectedStreams<T,R>類型
    在這里插入圖片描述
    我們再來看看ConnectionStreams<T,R>中的map方法,其中要傳的是一個CoMapFunction<IN1,IN2,R>的對象,最重要的就是這個類,我們來看看這個類
    在這里插入圖片描述
    這個CoMapFunction<IN1,IN2,R>和之前的MapFunction不太一樣,這里要重寫的方法有兩個,map1和map2,一個是針對IN1的,一個是針對IN2的,R就是返回類型。
    這下全明白了,在這個方法內(nèi)部,對這兩條流分別操作,合成一條流。

5.4. 實例演示

public class TransformTest5_MultipleStreams {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 1. 讀取文件DataStreamSource<String> dataStreamSource = env.readTextFile("C:\\Users\\Administrator\\IdeaProjects\\FlinkTutorial\\src\\main\\resources\\sensor");// 2. 將每行數(shù)據(jù)變成一個對象SingleOutputStreamOperator<SensorReading> map = dataStreamSource.map(new MapFunction<String, SensorReading>() {@Overridepublic SensorReading map(String s) throws Exception {String[] split = s.split(",");return new SensorReading(split[0], new Long(split[1]), new Double(split[2]));}});// 3. 將數(shù)據(jù)打上標簽SplitStream<SensorReading> split = map.split(new OutputSelector<SensorReading>() {@Overridepublic Iterable<String> select(SensorReading value) {return value.getTemperature() > 30 ? Collections.singletonList("high") : Collections.singletonList("low");}});// 4. 按照高溫和低溫的標簽分成兩條流DataStream<SensorReading> high = split.select("high");DataStream<SensorReading> low = split.select("low");// 5. 將high流的數(shù)據(jù)轉(zhuǎn)換為二元組SingleOutputStreamOperator<Tuple2<String, Double>> tuple2SingleOutputStreamOperator = high.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {@Overridepublic Tuple2<String, Double> map(SensorReading sensorReading) throws Exception {return new Tuple2<>(sensorReading.getId(), sensorReading.getTemperature());}});// 6. 將tuple2SingleOutputStreamOperator和low連接ConnectedStreams<Tuple2<String, Double>, SensorReading> connect = tuple2SingleOutputStreamOperator.connect(low);// 7. 調(diào)用map傳參CoMapFunction將兩條流合并成一條流objectSingleOutputStreamOperatorSingleOutputStreamOperator<Object> objectSingleOutputStreamOperator = connect.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {// 這是處理high流的方法@Overridepublic Object map1(Tuple2<String, Double> value) throws Exception {return new Tuple3<>(value.getField(0), value.getField(1), "temp is too high");}// 這是處理low流的方法@Overridepublic Object map2(SensorReading value) throws Exception {return new Tuple2<>(value.getTemperature(), "normal");}});objectSingleOutputStreamOperator.print();env.execute();}
}

5.5. 多條流合并(union)

之前我們只能合并兩條流,那我們要合并多條流呢?這里我們就需要用到union方法。
在這里插入圖片描述

  • Connect 與 Union 區(qū)別:
  1. Union 之前兩個流的類型必須是一樣,Connect 可以不一樣,在之后的 coMap中再去調(diào)整成為一樣的。
  2. Connect 只能操作兩個流,Union 可以操作多個。

若我們給出以下代碼:

high.union(low,all);

那么high,low,all三條流都會合并在一起。

http://www.risenshineclean.com/news/64571.html

相關(guān)文章:

  • 代辦公司營業(yè)執(zhí)照長沙seo推廣公司
  • 平度網(wǎng)站建設(shè)網(wǎng)絡(luò)優(yōu)化工程師簡歷
  • 網(wǎng)站建設(shè)入門教程視頻抖音代運營公司
  • 站酷網(wǎng)站的圖是用什么做的百度指數(shù)app下載
  • 做機械的網(wǎng)站有哪些軟文營銷實施背景
  • 海外營銷網(wǎng)站設(shè)計英文seo實戰(zhàn)派
  • 網(wǎng)頁設(shè)計推薦網(wǎng)站百度推廣咨詢
  • 自己買域名可以做網(wǎng)站嗎百度如何做推廣
  • 和碩網(wǎng)站建設(shè)騰訊廣告投放平臺官網(wǎng)
  • php無版權(quán)企業(yè)網(wǎng)站管理系統(tǒng)優(yōu)化軟件下載
  • 淮安神舟建設(shè)招標網(wǎng)站電商平臺怎么做
  • 外匯網(wǎng)站模版網(wǎng)絡(luò)流量分析工具
  • 做任務(wù)賺錢的網(wǎng)站起什么名字好網(wǎng)站建設(shè)百度推廣
  • 網(wǎng)站開發(fā) 上海查排名的軟件有哪些
  • app制作平臺要多少錢seo網(wǎng)站優(yōu)化培訓(xùn)班
  • 網(wǎng)站怎么做移動圖片百度一下網(wǎng)頁
  • 企業(yè)網(wǎng)站建設(shè)基本流程危機公關(guān)處理方案
  • 現(xiàn)在最流行的網(wǎng)站推廣方式有哪些搜索引擎優(yōu)化的簡稱是
  • 網(wǎng)站建設(shè)都包括哪些方面怎么做平臺推廣
  • 國外有哪些網(wǎng)站做推廣的比較好黃頁88網(wǎng)站推廣方案
  • 網(wǎng)站制作專業(yè)的公司叫什么win優(yōu)化大師有用嗎
  • 云南網(wǎng)絡(luò)公司網(wǎng)站萬能瀏覽器
  • wordpress for bae哪里搜索引擎優(yōu)化好
  • 網(wǎng)站需要做實名認證如何做優(yōu)化大師百科
  • 網(wǎng)站制作b s的基本步驟百度公司電話
  • 手機版網(wǎng)站模板網(wǎng)頁優(yōu)化最為重要的內(nèi)容是
  • 京東電子商務(wù)網(wǎng)站建設(shè)目的愛站站長工具
  • 雅思真題有網(wǎng)站做嗎網(wǎng)絡(luò)培訓(xùn)機構(gòu)排名前十
  • 網(wǎng)站開發(fā)注銷代碼搜索引擎營銷的常見方式
  • 常州做的網(wǎng)站的公司哪家好投稿平臺