網(wǎng)站建立數(shù)據(jù)庫(kù)連接時(shí)出錯(cuò)職業(yè)技能培訓(xùn)機(jī)構(gòu)
1.Flink中的KeyBy
在Flink中,KeyBy作為我們常用的一個(gè)聚合類(lèi)型算子,它可以按照相同的Key對(duì)數(shù)據(jù)進(jìn)行重新分區(qū),分區(qū)之后分配到對(duì)應(yīng)的子任務(wù)當(dāng)中去。
源碼解析
keyBy 得到的結(jié)果將不再是 DataStream,而是會(huì)將 DataStream 轉(zhuǎn)換為 KeyedStream(鍵控流),KeyedStream 可以認(rèn)為是“分區(qū)流”或者“鍵控流”,它是對(duì) DataStream 按照 key 的一個(gè)邏輯分區(qū)。
所以泛型有兩個(gè)類(lèi)型:除去當(dāng)前流中的元素類(lèi)型外,還需要指定 key 的類(lèi)型。
KeyBy是如何實(shí)現(xiàn)分區(qū)的呢
Flink中的KeyBy底層其實(shí)就是通過(guò)Hash實(shí)現(xiàn)的,通過(guò)對(duì)Key的值進(jìn)行Hash,再做一次murmurHash,取模運(yùn)算。
再通過(guò)Job的并行度,就能獲取每個(gè)Key應(yīng)該分配到那個(gè)子任務(wù)中了。
2.分組和分區(qū)在Flink中的區(qū)別
分區(qū):分區(qū)(Partitioning)是將數(shù)據(jù)流劃分為多個(gè)子集,這些子集可以在不同的任務(wù)實(shí)例上進(jìn)行處理,以實(shí)現(xiàn)數(shù)據(jù)的并行處理。
數(shù)據(jù)具體去往哪個(gè)分區(qū),是通過(guò)指定的 key 值先進(jìn)行一次 hash 再進(jìn)行一次 murmurHash,通過(guò)上述計(jì)算得到的值再與并行度進(jìn)行相應(yīng)的計(jì)算得到。
分組:分組(Grouping)是將具有相同鍵值的數(shù)據(jù)元素歸類(lèi)到一起,以便進(jìn)行后續(xù)操作(如聚合、窗口計(jì)算等)。
key值相同的數(shù)據(jù)將進(jìn)入同一個(gè)分組中。
注意:數(shù)據(jù)如果具有相同的key將一定去往同一個(gè)分組和分區(qū),但是同一分區(qū)中的數(shù)據(jù)不一定屬于同一組。
3.代碼示例
package com.flink.DataStream.Aggregation;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class FlinkKeyByDemo {public static void main(String[] args) throws Exception {//TODO 創(chuàng)建Flink上下文執(zhí)行環(huán)境StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();//設(shè)置并行度為1streamExecutionEnvironment.setParallelism(1);//設(shè)置執(zhí)行模式為批處理streamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);//TODO source 從集合中創(chuàng)建數(shù)據(jù)源DataStreamSource<String> dataStreamSource = streamExecutionEnvironment.fromElements("hello word", "hello flink");//TODO 方式一 匿名實(shí)現(xiàn)類(lèi)SingleOutputStreamOperator<Tuple2<String, Integer>> outputStreamOperator1 = dataStreamSource.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String s, Collector<String> collector) throws Exception {String[] s1 = s.split(" ");for (String word : s1) {collector.collect(word);}}}).map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String s) throws Exception {Tuple2<String, Integer> aa = Tuple2.of(s, 1);return aa;}})/*** keyBy 得到的結(jié)果將不再是 DataStream,而是會(huì)將 DataStream 轉(zhuǎn)換為 KeyedStream(鍵控流)* KeyedStream 可以認(rèn)為是“分區(qū)流”或者“鍵控流”,它是對(duì) DataStream 按照 key 的一個(gè)邏輯分區(qū)* 所以泛型有兩個(gè)類(lèi)型:除去當(dāng)前流中的元素類(lèi)型外,還需要指定 key 的類(lèi)型。* *//*** 分組和分區(qū)在Flink 中具有不同的含義和作用:* 分區(qū):分區(qū)(Partitioning)是將數(shù)據(jù)流劃分為多個(gè)子集,這些子集可以在不同的任務(wù)實(shí)例上進(jìn)行處理,以實(shí)現(xiàn)數(shù)據(jù)的并行處理。* 數(shù)據(jù)具體去往哪個(gè)分區(qū),是通過(guò)指定的 key 值先進(jìn)行一次 hash 再進(jìn)行一次 murmurHash,通過(guò)上述計(jì)算得到的值再與并行度進(jìn)行相應(yīng)的計(jì)算得到。* 分組:分組(Grouping)是將具有相同鍵值的數(shù)據(jù)元素歸類(lèi)到一起,以便進(jìn)行后續(xù)操作 (如聚合、窗口計(jì)算等)。* key 值相同的數(shù)據(jù)將進(jìn)入同一個(gè)分組中。* 注意:數(shù)據(jù)如果具有相同的key將一定去往同一個(gè)分組和分區(qū),但是同一分區(qū)中的數(shù)據(jù)不一定屬于同一組。* */.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {return stringIntegerTuple2.f0;}}).sum(1);//TODO 方式二 Lamda表達(dá)式實(shí)現(xiàn)SingleOutputStreamOperator<Tuple2<String, Integer>> outputStreamOperator2 = dataStreamSource.flatMap((String s, Collector<String> collector) -> {String[] s1 = s.split(" ");for (String word : s1) {collector.collect(word);}}).returns(Types.STRING).map((String word) -> {return Tuple2.of(word, 1);})//Java中l(wèi)amda表達(dá)式存在類(lèi)型擦除.returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy((Tuple2<String, Integer> s) -> {return s.f0;}).sum(1);//TODO sinkoutputStreamOperator1.print("方式一");outputStreamOperator2.print("方式二");//TODO 執(zhí)行streamExecutionEnvironment.execute("Flink KeyBy Demo");}
}