Flink 窗口函數(shù)(apply)讀取 Kafka 數(shù)據(jù)實(shí)現(xiàn)熱詞統(tǒng)計(jì)
熱詞統(tǒng)計(jì)案例:
用flink中的窗口函數(shù)(apply)讀取kafka中數(shù)據(jù),并對(duì)熱詞進(jìn)行統(tǒng)計(jì)。
apply:全量聚合函數(shù),指在窗口觸發(fā)的時(shí)候才會(huì)對(duì)窗口內(nèi)的所有數(shù)據(jù)進(jìn)行一次計(jì)算(等窗口的數(shù)據(jù)到齊,才開始進(jìn)行聚合計(jì)算,可實(shí)現(xiàn)對(duì)窗口內(nèi)的數(shù)據(jù)進(jìn)行排序等需求)。
代碼演示:
kafka發(fā)送消息端:
package com.bigdata.Day04;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;
import java.util.Random;public class Demo01_windows_kafka發(fā)消息 {public static void main(String[] args) throws Exception {// Properties 它是map的一種Properties properties = new Properties();// 設(shè)置連接kafka集群的ip和端口properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 創(chuàng)建了一個(gè)消息生產(chǎn)者對(duì)象KafkaProducer kafkaProducer = new KafkaProducer<>(properties);String[] arr = {"聯(lián)通換貓","遙遙領(lǐng)先","恒大歌舞團(tuán)","恒大足球隊(duì)","鄭州爛尾樓"};Random random = new Random();for (int i = 0; i < 500; i++) {ProducerRecord record = new ProducerRecord<>("topic1",arr[random.nextInt(arr.length)]);// 調(diào)用這個(gè)里面的send方法kafkaProducer.send(record);Thread.sleep(50);}kafkaProducer.close();}
}
kafka接收消息端:
package com.bigdata.Day04;

import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.KafkaConsumer; // NOTE(review): unused import

import java.util.Properties;

/**
 * Windowed hot-word count: consumes words from Kafka topic "topic1" and,
 * every 5 seconds, prints each word's count within that tumbling window.
 *
 * <p>NOTE: this version DELIBERATELY uses a lambda in keyBy and fails at
 * runtime — the lambda's generic type information is lost to Java type
 * erasure, so Flink cannot infer the key type. This is the error the
 * article goes on to explain and fix.
 */
public class Demo02_kafka收消息 {
    public static void main(String[] args) throws Exception {
        // 1. env - prepare the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 2. source - load data from Kafka
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","bigdata01:9092");
        properties.setProperty("group.id", "g2");
        FlinkKafkaConsumer<String> kafkaSource =
                new FlinkKafkaConsumer<String>("topic1",new SimpleStringSchema(),properties);
        DataStreamSource<String> dataStreamSource = env.addSource(kafkaSource);

        // transformation - turn each word into a (word, 1) pair
        DataStream<Tuple2<String,Integer>> mapStream =
                dataStreamSource.map(new MapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public Tuple2<String,Integer> map(String word) throws Exception {
                return Tuple2.of(word,1);
            }
        });

        // BUG (intentional, demonstrated by this article): the lambda below loses
        // its generic types to erasure, so this job fails when Flink tries to
        // infer the keyBy return type.
        KeyedStream<Tuple2<String, Integer>, String> keyedStream =
                mapStream.keyBy(tuple2 -> tuple2.f0);

        keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                // WindowFunction generics: input type, output type, key type, window type
                .apply(new WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {
            @Override
            public void apply(String key,                              // grouping key, e.g. {"word",[1,1,1,1,1]}
                              TimeWindow window,                       // the window object
                              Iterable<Tuple2<String, Integer>> input, // all records for this key in the window
                              Collector<String> out                    // emits results downstream
            ) throws Exception {
                long start = window.getStart();
                long end = window.getEnd();
                // commons-lang3 utility for formatting epoch millis
                String startStr = DateFormatUtils.format(start,"yyyy-MM-dd HH:mm:ss");
                String endStr = DateFormatUtils.format(end,"yyyy-MM-dd HH:mm:ss");
                int sum = 0;
                for(Tuple2<String,Integer> tuple2: input){
                    sum += tuple2.f1;
                }
                out.collect(key +"," + startStr +","+endStr +",sum="+sum);
            }
        }).print();

        //5. execute - run the job
        env.execute();
    }
}
當(dāng)執(zhí)行kafka接收消息端時(shí),會(huì)報(bào)如下錯(cuò)誤(InvalidTypesException:Flink 無(wú)法推斷 keyBy 返回值的泛型類型):
錯(cuò)誤原因:在對(duì)kafka中數(shù)據(jù)進(jìn)行keyBy分組處理時(shí),使用了lambda表達(dá)式。Java 編譯時(shí)的泛型類型擦除使 Flink 無(wú)法從 lambda 推斷出 key 的類型信息。
?
解決方法:
在使用KeyBy時(shí),將函數(shù)的各種參數(shù)類型都寫清楚,修改后的代碼如下:
package com.bigdata.Day04;import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Properties;public class Demo02_kafka收消息 {public static void main(String[] args) throws Exception {//1. env-準(zhǔn)備環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加載數(shù)據(jù)Properties properties = new Properties();properties.setProperty("bootstrap.servers","bigdata01:9092");properties.setProperty("group.id", "g2");FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("topic1",new SimpleStringSchema(),properties);DataStreamSource<String> dataStreamSource = env.addSource(kafkaSource);// transformation-數(shù)據(jù)處理轉(zhuǎn)換DataStream<Tuple2<String,Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String,Integer>>() {@Overridepublic Tuple2<String,Integer> map(String word) throws Exception {return Tuple2.of(word,1);}});KeyedStream<Tuple2<String, Integer>, String> keyedStream = mapStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> tuple2) throws Exception {return tuple2.f0;}});keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))// 第一個(gè)泛型是輸入數(shù)據(jù)的類型,第二個(gè)泛型是返回值類型 第三個(gè)是key 的類型, 第四個(gè)是窗口對(duì)象.apply(new WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {@Overridepublic void apply(String key, // 分組key {"俄烏戰(zhàn)爭(zhēng)",[1,1,1,1,1]}TimeWindow window, // 窗口對(duì)象Iterable<Tuple2<String, Integer>> input, // 分組key在窗口的所有數(shù)據(jù)Collector<String> out // 用于輸出) throws Exception {long start = window.getStart();long end = window.getEnd();// lang3 包下的工具類String startStr = DateFormatUtils.format(start,"yyyy-MM-dd HH:mm:ss");String endStr = DateFormatUtils.format(end,"yyyy-MM-dd HH:mm:ss");int sum = 0;for(Tuple2<String,Integer> tuple2: input){sum += tuple2.f1;}out.collect(key +"," + startStr +","+endStr +",sum="+sum);}}).print();//5. 
execute-執(zhí)行env.execute();}
}