布吉商城網(wǎng)站建設(shè)泰州網(wǎng)站建設(shè)優(yōu)化
目錄
一、Flink簡介
二、為什么選擇Flink
三、與傳統(tǒng)數(shù)據(jù)處理架構(gòu)相比
四、Flink批處理數(shù)據(jù)基礎(chǔ)代碼
五、Flink流處理基礎(chǔ)代碼
一、Flink簡介
Apache Flink 是一個(gè)框架和分布式處理引擎,用于對無界和有界數(shù)據(jù)流進(jìn)行狀態(tài)計(jì)算。
二、為什么選擇Flink
流數(shù)據(jù)更真實(shí)地反映了我們的生活方式
傳統(tǒng)的數(shù)據(jù)架構(gòu)是基于有限數(shù)據(jù)集的
低延遲、高吞吐、結(jié)果的準(zhǔn)確性和良好的容錯性
三、與傳統(tǒng)數(shù)據(jù)處理架構(gòu)相比
傳統(tǒng)分析處理中,將數(shù)據(jù)從業(yè)務(wù)數(shù)據(jù)庫復(fù)制到數(shù)倉,再進(jìn)行分析和查詢
而有狀態(tài)的流式處理則直接在流上維護(hù)應(yīng)用狀態(tài),邊接收數(shù)據(jù)邊計(jì)算
?
四、Flink批處理數(shù)據(jù)基礎(chǔ)代碼
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;//批處理word count
public class WordCount {public static void main(String[] args) throws Exception{//創(chuàng)建執(zhí)行環(huán)境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//從文件里讀取數(shù)據(jù)String inputPath = "D:\\java\\Flink\\src\\main\\webapp\\resource\\hello.txt";DataSource<String> inputDataSet = env.readTextFile(inputPath);//對數(shù)據(jù)集進(jìn)行處理,按空格分詞展開,轉(zhuǎn)換成(word,1)二元組進(jìn)行統(tǒng)計(jì)DataSet<Tuple2<String,Integer>> resultSet = inputDataSet.flatMap(new MyflatMapper()).groupBy(0) //按照第一個(gè)位置的word分組.sum(1); //將第二個(gè)位置上的數(shù)據(jù)求和resultSet.print();}//自定義類,實(shí)現(xiàn)FlatMapFunction接口public static class MyflatMapper implements FlatMapFunction<String, Tuple2<String,Integer>>{@Overridepublic void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {//按空格分詞String[] words = s.split(" ");//遍歷所有的word,包成二元組輸出for (String word: words){collector.collect(new Tuple2<>(word,1));}}}}
五、Flink流處理基礎(chǔ)代碼
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;//流處理(數(shù)據(jù)邊來邊處理)
public class StreamWordCount {public static void main(String[] args) throws Exception{//創(chuàng)建流處理執(zhí)行環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//設(shè)置并行度為8env.setParallelism(8);//從文件中讀取數(shù)據(jù)
// String inputPath = "D:\\java\\Flink\\src\\main\\webapp\\resource\\hello.txt";
// DataStream<String> inputDataStream = env.readTextFile(inputPath);//從KAFKA中讀取流數(shù)據(jù)(監(jiān)聽端口號,邊輸入邊處理)//用parameter tool工具從程序啟動參數(shù)中提取配置項(xiàng)ParameterTool parameterTool = ParameterTool.fromArgs(args);String host = parameterTool.get("host");int port = parameterTool.getInt("port");DataStream<String> inputDataStream = env.socketTextStream(host,port);//基于數(shù)據(jù)流進(jìn)行轉(zhuǎn)換計(jì)算SingleOutputStreamOperator<Tuple2<String,Integer>> resultStream =inputDataStream.flatMap( new WordCount.MyflatMapper()).keyBy(0).sum(1);resultStream.print();//執(zhí)行任務(wù)env.execute();}
}