
熱詞統(tǒng)計(jì)案例:

用flink中的窗口函數(shù)(apply)讀取kafka中數(shù)據(jù),并對(duì)熱詞進(jìn)行統(tǒng)計(jì)。

apply:全量聚合函數(shù),指在窗口觸發(fā)的時(shí)候才會(huì)對(duì)窗口內(nèi)的所有數(shù)據(jù)進(jìn)行一次計(jì)算(等窗口的數(shù)據(jù)到齊,才開始進(jìn)行聚合計(jì)算,可實(shí)現(xiàn)對(duì)窗口內(nèi)的數(shù)據(jù)進(jìn)行排序等需求)。
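The difference between this full-window style and element-at-a-time (incremental) aggregation can be sketched in plain Java: nothing is computed while elements arrive; the function runs once over the complete buffer when the window fires. (ApplyWindowSketch and fireWindow are illustrative names for this sketch, not Flink API.)

```java
import java.util.List;
import java.util.function.Function;

public class ApplyWindowSketch {
    // Full-window ("apply"-style) aggregation: the function sees the
    // window's entire contents in one call, only when the window fires.
    public static <IN, OUT> OUT fireWindow(List<IN> buffered, Function<List<IN>, OUT> windowFn) {
        return windowFn.apply(buffered);
    }

    public static void main(String[] args) {
        // Elements buffered while the window was open
        List<Integer> window = List.of(3, 1, 2);
        // Because the whole window is visible at once, sorting is possible,
        // which an element-at-a-time aggregator could not do:
        List<Integer> sorted = fireWindow(window, in -> in.stream().sorted().toList());
        System.out.println(sorted); // [1, 2, 3]
    }
}
```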

Code demonstration:

Kafka producer side:

package com.bigdata.Day04;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.Random;

public class Demo01_windows_kafka發消息 {
    public static void main(String[] args) throws Exception {
        // Properties is a kind of Map
        Properties properties = new Properties();
        // IP and port for connecting to the Kafka cluster
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "bigdata01:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Create a message producer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        String[] arr = {"聯通換貓", "遙遙領先", "恒大歌舞團", "恒大足球隊", "鄭州爛尾樓"};
        Random random = new Random();
        for (int i = 0; i < 500; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("topic1", arr[random.nextInt(arr.length)]);
            // Send the record
            kafkaProducer.send(record);
            Thread.sleep(50);
        }
        kafkaProducer.close();
    }
}

Kafka consumer side:

package com.bigdata.Day04;

import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class Demo02_kafka收消息 {
    public static void main(String[] args) throws Exception {
        // 1. env - set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 2. source - load data from Kafka
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "bigdata01:9092");
        properties.setProperty("group.id", "g2");
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>("topic1", new SimpleStringSchema(), properties);
        DataStreamSource<String> dataStreamSource = env.addSource(kafkaSource);

        // 3. transformation - process the data
        DataStream<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1);
            }
        });
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = mapStream.keyBy(tuple2 -> tuple2.f0);
        keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                // generics: input type, output type, key type, window type
                .apply(new WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {
                    @Override
                    public void apply(String key,                              // grouping key, e.g. {"俄烏戰爭", [1,1,1,1,1]}
                                      TimeWindow window,                       // the window object
                                      Iterable<Tuple2<String, Integer>> input, // all data for this key in the window
                                      Collector<String> out                    // collector used to emit results
                    ) throws Exception {
                        long start = window.getStart();
                        long end = window.getEnd();
                        // utility class from commons-lang3
                        String startStr = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss");
                        String endStr = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss");
                        int sum = 0;
                        for (Tuple2<String, Integer> tuple2 : input) {
                            sum += tuple2.f1;
                        }
                        out.collect(key + "," + startStr + "," + endStr + ",sum=" + sum);
                    }
                }).print(); // 4. sink - print the results

        // 5. execute - run the job
        env.execute();
    }
}

When the Kafka consumer side is executed, it reports an error (shown in the original post only as a screenshot, which is not preserved here).

Cause of the error: a lambda expression was used for the keyBy grouping of the Kafka data. Because of Java type erasure, Flink cannot infer the KeySelector's generic types from a lambda.

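The root cause can be reproduced with plain Java reflection: an anonymous inner class records its generic supertype in the class file, while a lambda compiles to a synthetic class that implements only the raw interface. A minimal sketch follows; the KeySelector interface here is a stand-in for illustration, not Flink's.

```java
import java.lang.reflect.Type;

public class ErasureDemo {
    // Stand-in for Flink's KeySelector (hypothetical, for illustration only)
    public interface KeySelector<IN, KEY> { KEY getKey(IN in); }

    // What reflection can see about the interface the object implements
    public static String describe(KeySelector<?, ?> selector) {
        Type iface = selector.getClass().getGenericInterfaces()[0];
        return iface.toString();
    }

    public static void main(String[] args) {
        // Anonymous inner class: the generic arguments survive compilation
        KeySelector<String, Integer> anon = new KeySelector<String, Integer>() {
            @Override
            public Integer getKey(String s) { return s.length(); }
        };
        System.out.println(describe(anon));   // type arguments are visible

        // Lambda: compiled to a synthetic class implementing the raw interface
        KeySelector<String, Integer> lambda = s -> s.length();
        System.out.println(describe(lambda)); // type arguments are gone
    }
}
```

This is why spelling out the types, as the fix below does, gives Flink the information the lambda loses.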

Solution:

When using keyBy, write out all of the function's parameter types explicitly (an anonymous KeySelector instead of the lambda). The corrected code:

package com.bigdata.Day04;

import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class Demo02_kafka收消息 {
    public static void main(String[] args) throws Exception {
        // 1. env - set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 2. source - load data from Kafka
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "bigdata01:9092");
        properties.setProperty("group.id", "g2");
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>("topic1", new SimpleStringSchema(), properties);
        DataStreamSource<String> dataStreamSource = env.addSource(kafkaSource);

        // 3. transformation - process the data
        DataStream<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1);
            }
        });
        // Anonymous KeySelector: the generic types are written out explicitly,
        // so Flink can infer them despite type erasure
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = mapStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
                return tuple2.f0;
            }
        });
        keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                // generics: input type, output type, key type, window type
                .apply(new WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {
                    @Override
                    public void apply(String key,                              // grouping key, e.g. {"俄烏戰爭", [1,1,1,1,1]}
                                      TimeWindow window,                       // the window object
                                      Iterable<Tuple2<String, Integer>> input, // all data for this key in the window
                                      Collector<String> out                    // collector used to emit results
                    ) throws Exception {
                        long start = window.getStart();
                        long end = window.getEnd();
                        // utility class from commons-lang3
                        String startStr = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss");
                        String endStr = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss");
                        int sum = 0;
                        for (Tuple2<String, Integer> tuple2 : input) {
                            sum += tuple2.f1;
                        }
                        out.collect(key + "," + startStr + "," + endStr + ",sum=" + sum);
                    }
                }).print(); // 4. sink - print the results

        // 5. execute - run the job
        env.execute();
    }
}
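A note on the timestamps printed by window.getStart() and window.getEnd(): the 5-second tumbling windows are aligned to the epoch, not to the first event. A sketch of the alignment rule, mirroring what Flink computes (assuming a zero window offset):

```java
public class WindowAlignDemo {
    // Tumbling-window alignment with zero offset: round the timestamp
    // down to the nearest multiple of the window size.
    public static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    public static void main(String[] args) {
        long size = 5_000L;           // 5-second tumbling window, as in the example above
        long ts = 1_700_000_003_250L; // an arbitrary event timestamp
        long start = windowStart(ts, size);
        long end = start + size;      // getEnd() is exclusive: the window covers [start, end)
        System.out.println(start + " .. " + end); // 1700000000000 .. 1700000005000
    }
}
```

This is why every key in the same 5-second window prints the same start and end strings in the output above.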
