中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁 > news >正文

網(wǎng)頁源代碼搜索關(guān)鍵字如何seo推廣

網(wǎng)頁源代碼搜索關(guān)鍵字,如何seo推廣,分類目錄網(wǎng)站有哪些,可以做用戶調(diào)研的網(wǎng)站文章目錄 1、filter算子實(shí)現(xiàn)分流2、分流:使用側(cè)輸出流3、合流:union4、合流:connect5、connect案例 分流,很形象的一個(gè)詞,就像一條大河,遇到岸邊有分叉的,而形成了主流和測(cè)流。對(duì)于數(shù)據(jù)流也一樣…

文章目錄

  • 1、filter算子實(shí)現(xiàn)分流
  • 2、分流:使用側(cè)輸出流
  • 3、合流:union
  • 4、合流:connect
  • 5、connect案例

分流,很形象的一個(gè)詞,就像一條大河,遇到岸邊有分叉的,而形成了主流和測(cè)流。對(duì)于數(shù)據(jù)流也一樣,不過是一個(gè)個(gè)水滴替換成了一條條數(shù)據(jù)。

在這里插入圖片描述

將一條數(shù)據(jù)流拆分成完全獨(dú)立的兩條、甚至多條流。也就是基于一個(gè)DataStream,定義一些篩選條件,將符合條件的數(shù)據(jù)揀選出來放到對(duì)應(yīng)的流里。

在這里插入圖片描述

1、filter算子實(shí)現(xiàn)分流

Demo案例:讀取一個(gè)整數(shù)數(shù)字流,將數(shù)據(jù)流劃分為奇數(shù)流和偶數(shù)流。

實(shí)現(xiàn)思路:針對(duì)同一個(gè)流,多次條用filter算子來拆分

public class SplitStreamByFilter {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();SingleOutputStreamOperator<Integer> ds = env.socketTextStream("node01", 9527).map(Integer::valueOf);//將ds 分為兩個(gè)流 ,一個(gè)是奇數(shù)流,一個(gè)是偶數(shù)流//使用filter 過濾兩次SingleOutputStreamOperator<Integer> ds1 = ds.filter(x -> x % 2 == 0);SingleOutputStreamOperator<Integer> ds2 = ds.filter(x -> x % 2 == 1);ds1.print("偶數(shù)");ds2.print("奇數(shù)");env.execute();}
}

以上實(shí)現(xiàn)的明顯缺陷是,同一條數(shù)據(jù),被多次處理。以上其實(shí)是將原始數(shù)據(jù)流stream復(fù)制兩份,然后對(duì)每一份分別做篩選,冗余且低效。

2、分流:使用側(cè)輸出流

基本步驟為:

  • 使用process算子(Flink分層API中的最底層的處理函數(shù))
  • 定義OutputTag對(duì)象,即輸出標(biāo)簽對(duì)象,用于后面標(biāo)記和提取側(cè)流
  • 調(diào)用上下文ctx的.output()方法
  • 通過主流獲取側(cè)流
案例:實(shí)現(xiàn)將WaterSensor按照Id類型進(jìn)行分流

先定義下MapFunction的轉(zhuǎn)換規(guī)則,用來將輸入的數(shù)據(jù)轉(zhuǎn)為自定義的WaterSensor對(duì)象:

public class WaterSensorMapFunction implements MapFunction<StringWaterSensor>{@Overridepublic WaterSensor map(String value) throws Exception {String[] strArr = value.split( regex: ",");//String組裝對(duì)象return new WaterSensor(strArr[0],Long.value0f(strArr[1]),Integer.value0f(strArr[2]));}
}

使用側(cè)流:

public class SplitStreamByOutputTag {    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();SingleOutputStreamOperator<WaterSensor> ds = env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction());//定義兩個(gè)輸出標(biāo)簽對(duì)象,用于后面標(biāo)記和提取側(cè)流OutputTag<WaterSensor> s1 = new OutputTag<>("s1", Types.POJO(WaterSensor.class));OutputTag<WaterSensor> s2 = new OutputTag<>("s2", Types.POJO(WaterSensor.class));//返回的都是主流SingleOutputStreamOperator<WaterSensor> ds1 = ds.process(new ProcessFunction<WaterSensor, WaterSensor>(){@Override//形參為別為:流中的一條數(shù)據(jù)、上下文對(duì)象、收集器public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {if ("s1".equals(value.getId())) {ctx.output(s1, value);} else if ("s2".equals(value.getId())) {ctx.output(s2, value);} else {//主流out.collect(value);}}});ds1.print("主流");SideOutputDataStream<WaterSensor> s1DS = ds1.getSideOutput(s1);SideOutputDataStream<WaterSensor> s2DS = ds1.getSideOutput(s2);s1DS.printToErr("側(cè)流s1");  //區(qū)別主流,讓控制臺(tái)輸出標(biāo)紅s2DS.printToErr("側(cè)流s2");env.execute();}
}

相關(guān)傳參說明,首先是創(chuàng)建OutputTag對(duì)象時(shí)的傳參:

  • 第一個(gè)參數(shù)為標(biāo)簽名,用于區(qū)分是哪一個(gè)側(cè)流
  • 第二個(gè)是放入側(cè)流中的數(shù)據(jù)的類型,且必須是Flink的類型(TypeInfomation,借助Types類)
  • OutputTag的泛型,是流到對(duì)應(yīng)的側(cè)流的數(shù)據(jù)類型

ProcessFunction接口的泛型中:

  • 第一個(gè)是輸入的數(shù)據(jù)類型
  • 第二個(gè)是輸出到主流上的數(shù)據(jù)類型

ctx.output方法的形參:

  • 第一個(gè)為outputTag對(duì)象
  • 第二個(gè)為數(shù)據(jù),上面代碼中傳value即直接輸出數(shù)據(jù)本身,也可輸出處理后的數(shù)據(jù),主流側(cè)流數(shù)據(jù)類型不用一致

看下運(yùn)行效果:

在這里插入圖片描述

3、合流:union

將來源不同的多條流,合并成一條來聯(lián)合處理,即合流。最簡(jiǎn)單的合流操作,就是直接將多條流合在一起,叫作流的聯(lián)合(union)

在這里插入圖片描述

union的條件是:

  • 每條流中要合并的數(shù)據(jù)類型必須相同(原始不同,可先借助map,在union)
  • 合并之后的新流會(huì)包括所有流中的元素,數(shù)據(jù)類型不變
stream1.union(stream2, stream3, ...)  //可變長(zhǎng)參數(shù)
public class UnionExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Integer> ds1 = env.fromElements(1, 2, 3);DataStreamSource<Integer> ds2 = env.fromElements(2, 2, 3);DataStreamSource<String> ds3 = env.fromElements("2", "2", "3");ds1.union(ds2,ds3.map(Integer::valueOf)).print();env.execute();}
}
//輸出:
1
2
3
2
2
3
2
2
3

4、合流:connect

union合并流受限于數(shù)據(jù)類型,因此還有另一種合流操作:connect

在這里插入圖片描述

public class ConnectDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//Integer流SingleOutputStreamOperator<Integer> source1 = env.socketTextStream("node01", 9527).map(i -> Integer.parseInt(i));//String流DataStreamSource<String> source2 = env.socketTextStream("node01", 2795);/*** 總結(jié): 使用 connect 合流* 1、一次只能連接 2條流* 2、流的數(shù)據(jù)類型可以不一樣* 3、 連接后可以調(diào)用 map、flatmap、process來處理,但是各處理各的*/ConnectedStreams<Integer, String> connect = source1.connect(source2);SingleOutputStreamOperator<String> result = connect.map(new CoMapFunction<Integer, String, String>() {@Overridepublic String map1(Integer value) throws Exception {return "來源于原source1流:" + value.toString();}@Overridepublic String map2(String value) throws Exception {return "來源于原source2流:" + value;}});result.print();env.execute();    }
}

使用 connect 合流的總結(jié):

  • 一次只能連接 2條流,因?yàn)閏onnect返回的是一個(gè)ConnectedStreams對(duì)象,不再是DataStreamSource或其子類了
  • 兩條流中的數(shù)據(jù)類型可以不一樣
  • 連接后可以調(diào)用 map、flatmap、process來處理,但是各處理各的

以map為例,其形參是一個(gè)CoMapFuntion接口類型,泛型則分別是流1的數(shù)據(jù)類型、流2的數(shù)據(jù)類型、合并及處理后輸出的數(shù)據(jù)類型。兩個(gè)map方法可以看出,雖然兩個(gè)流合并成一個(gè)了,但處理數(shù)據(jù)時(shí)還是各玩各的。

  • .map1()就是對(duì)第一條流中數(shù)據(jù)的map操作
  • .map2()則是針對(duì)第二條流

在這里插入圖片描述

connect 就類比被逼相親后結(jié)婚,兩個(gè)人看似成一家了,但實(shí)際上各自玩各自的。往大了舉例就相當(dāng)于一國兩制。

5、connect案例

和connect以后的map傳CoMapFunction一樣,process算子也不再傳ProcessFunction,而是CoProcessFunction,實(shí)現(xiàn)兩個(gè)方法:

  • processElement1():針對(duì)第一條流
  • processElement2():針對(duì)第二條流

connect合并后得到的ConnectedStreams也可以直接調(diào)用.keyBy()進(jìn)行按鍵分區(qū),分區(qū)后返回的還是一個(gè)ConnectedStreams

connectedStreams.keyBy(keySelector1, keySelector2);
//keySelector1和keySelector2,是兩條流中各自的鍵選擇器

ConnectedStreams進(jìn)行keyBy操作,其實(shí)就是把兩條流中key相同的數(shù)據(jù)放到了一起,然后針對(duì)來源的流再做各自處理

案例需求:連接兩條流,輸出能根據(jù)id匹配上的數(shù)據(jù),即兩個(gè)流里元組f0相同的數(shù)據(jù)(類似inner join效果)
public class ConnectKeybyDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);//二元組流DataStreamSource<Tuple2<Integer, String>> source1 = env.fromElements(Tuple2.of(1, "a1"),Tuple2.of(1, "a2"),Tuple2.of(2, "b"),Tuple2.of(3, "c"));//三元組流DataStreamSource<Tuple3<Integer, String, Integer>> source2 = env.fromElements(Tuple3.of(1, "aa1", 1),Tuple3.of(1, "aa2", 2),Tuple3.of(2, "bb", 1),Tuple3.of(3, "cc", 1));ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connect = source1.connect(source2);// 多并行度下,需要根據(jù) 關(guān)聯(lián)條件 進(jìn)行keyby,才能保證key相同的數(shù)據(jù)到一起去,才能匹配上ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connectKey = connect.keyBy(s1 -> s1.f0, s2 -> s2.f0);SingleOutputStreamOperator<String> result = connectKey.process(new CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>() {// 定義 HashMap,緩存來過的數(shù)據(jù),key=id,value=list<數(shù)據(jù)>Map<Integer, List<Tuple2<Integer, String>>> s1Cache = new HashMap<>();Map<Integer, List<Tuple3<Integer, String, Integer>>> s2Cache = new HashMap<>();@Overridepublic void processElement1(Tuple2<Integer, String> value, Context ctx, Collector<String> out) throws Exception {Integer id = value.f0;// TODO 1.來過的s1數(shù)據(jù),都存起來if (!s1Cache.containsKey(id)) {// 1.1 第一條數(shù)據(jù),初始化 value的list,放入 hashmapList<Tuple2<Integer, String>> s1Values = new ArrayList<>();s1Values.add(value);s1Cache.put(id, s1Values);} else {// 1.2 不是第一條,直接添加到 list中s1Cache.get(id).add(value);}//TODO 2.根據(jù)id,查找s2的數(shù)據(jù),只輸出 匹配上 的數(shù)據(jù)if (s2Cache.containsKey(id)) {for (Tuple3<Integer, String, Integer> s2Element : s2Cache.get(id)) {out.collect("s1:" + value + "<--------->s2:" + s2Element);}}}@Overridepublic void processElement2(Tuple3<Integer, String, Integer> value, Context ctx, Collector<String> out) throws Exception {Integer id = value.f0;// TODO 1.來過的s2數(shù)據(jù),都存起來if (!s2Cache.containsKey(id)) {// 1.1 第一條數(shù)據(jù),初始化 value的list,放入 hashmapList<Tuple3<Integer, String, Integer>> s2Values = new ArrayList<>();s2Values.add(value);s2Cache.put(id, s2Values);} else {// 1.2 不是第一條,直接添加到 list中s2Cache.get(id).add(value);}//TODO 2.根據(jù)id,查找s1的數(shù)據(jù),只輸出 匹配上 的數(shù)據(jù)if (s1Cache.containsKey(id)) {for (Tuple2<Integer, String> s1Element : s1Cache.get(id)) {out.collect("s1:" + s1Element + "<--------->s2:" + value);}}}});result.print();env.execute();}
}

運(yùn)行效果:

在這里插入圖片描述

http://www.risenshineclean.com/news/39032.html

相關(guān)文章:

  • php網(wǎng)站開發(fā)過程免費(fèi)下載b站視頻軟件
  • 云南昆明做網(wǎng)站西安競(jìng)價(jià)托管公司
  • 做網(wǎng)站需要什么基礎(chǔ)百度開發(fā)者平臺(tái)
  • 網(wǎng)站彈窗客服怎樣搭建自己的網(wǎng)站
  • 鹽城微網(wǎng)站建設(shè)廣州王牌seo
  • 開發(fā)一個(gè)網(wǎng)站的步驟推廣軟件賺錢的app
  • 廣州移動(dòng) 網(wǎng)站設(shè)計(jì)如何在各大平臺(tái)推廣
  • 天津建設(shè)工程信息網(wǎng)如何注冊(cè)網(wǎng)站優(yōu)化推廣招聘
  • 網(wǎng)站搭建搜外友鏈
  • 如何開網(wǎng)店無貨源不需要投資河北seo技術(shù)
  • 在設(shè)計(jì)賺錢的網(wǎng)站有哪些做網(wǎng)站需要多少錢
  • 廣東省農(nóng)業(yè)農(nóng)村廳官方網(wǎng)站成都網(wǎng)站快速開發(fā)
  • 建站源碼程序惠州seo外包服務(wù)
  • 自己的網(wǎng)站怎么開培訓(xùn)心得體會(huì)范文大全2000字
  • 外貿(mào)必看網(wǎng)站湖南百度seo
  • 做3d效果的網(wǎng)站亞馬遜關(guān)鍵詞排名提升
  • 松江品劃做網(wǎng)站云浮新增確診病例30例
  • 易企網(wǎng)站建設(shè)滁州網(wǎng)站seo
  • 系統(tǒng)優(yōu)化的方法知識(shí)點(diǎn)外貿(mào)建站優(yōu)化
  • 深圳網(wǎng)站建設(shè)加q479185700外貿(mào)網(wǎng)絡(luò)營銷推廣
  • 保障性租賃住房管理平臺(tái)優(yōu)化大師班級(jí)優(yōu)化大師
  • 網(wǎng)站建設(shè)與開發(fā)論文谷歌seo是什么意思
  • php外貿(mào)網(wǎng)站制作最快新聞資訊在哪看
  • 購物網(wǎng)站開發(fā)需求文檔百度云登錄入口
  • 石獅網(wǎng)站定制北京seo專業(yè)團(tuán)隊(duì)
  • #NAME?站長(zhǎng)工具seo優(yōu)化系統(tǒng)
  • 網(wǎng)站描述是什么濟(jì)南網(wǎng)站seo優(yōu)化
  • 換接入商網(wǎng)站備案百度指數(shù)搜索
  • 安徽網(wǎng)站建站系統(tǒng)哪家好谷歌網(wǎng)站優(yōu)化推廣
  • 可以兼職做設(shè)計(jì)的網(wǎng)站百度收錄入口