中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當前位置: 首頁 > news >正文

那些網(wǎng)站做的比較好公眾號推廣一個6元

那些網(wǎng)站做的比較好,公眾號推廣一個6元,科技特長生,太原做網(wǎng)站頁面的1、什么是CEP? Flink CEP即 Flink Complex Event Processing,是基于DataStream流式數(shù)據(jù)提供的一套復雜事件處理編程模型。你可以把他理解為基于無界流的一套正則匹配模型,即對于無界流中的各種數(shù)據(jù)(稱為事件),提供一種組合匹配的…

1、什么是CEP?

Flink CEP即 Flink Complex Event Processing,是基于DataStream流式數(shù)據(jù)提供的一套復雜事件處理編程模型。你可以把他理解為基于無界流的一套正則匹配模型,即對于無界流中的各種數(shù)據(jù)(稱為事件),提供一種組合匹配的功能。

在這里插入圖片描述
上圖中,以不同形狀代表一個DataStream中不同屬性的事件。以一個圓圈和一個三角組成一個Pattern后,就可以快速過濾出原來的DataStream中符合規(guī)律的數(shù)據(jù)。舉個例子,比如很多網(wǎng)站需要對惡意登錄的用戶進行屏蔽,如果用戶連續(xù)三次輸入錯誤的密碼,那就要鎖定當前用戶。在這個場景下,所有用戶的登錄行為就構(gòu)成了一個無界的數(shù)據(jù)流DataStream。而連續(xù)三次登錄失敗就是一個匹配模型Pattern。CEP編程模型的功能就是從用戶登錄行為這個無界數(shù)據(jù)流DataStream中,找出符合這個匹配模Pattern的所有數(shù)據(jù)。這種場景下,使用我們前面介紹的各種DataStream API其實也是可以實現(xiàn)的,不過相對就麻煩很多。而CEP編程模型則提供了非常簡單靈活的功能實現(xiàn)方式。

2、代碼實現(xiàn)

2.1 引入maven依賴:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.roy</groupId><artifactId>FlinkDemo</artifactId><version>1.0</version><properties><flink.version>1.12.5</flink.version><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><log4j.version>2.12.1</log4j.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version></dependency><!-- CEP主要是下面這個依賴 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop-2-uber</artifactId><version>2.8.3-10.0</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.14</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.1.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build></project>

2.2 基本流程

//1、獲取原始事件流
DataStream<Event> input = ......; 
//2、定義匹配器
Pattern<Event,?> pattern = .......; 
//3、獲取匹配流
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
//4、將匹配流中的數(shù)據(jù)處理形成結(jié)果數(shù)據(jù)流
DataStream<Result> resultStream = patternStream.process(new PatternProcessFunction<Event, Result>() {@Overridepublic void processMatch(Map<String, List<Event>> pattern,Context ctx,Collector<Result> out) throws Exception {}
});

2.3 完整代碼

注意:代碼運行前,先啟動2.4 nlk socket服務

package com.roy.flink.project.userlogin;import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.List;
import java.util.Map;/*** @desc 十秒內(nèi)連續(xù)登錄失敗的用戶分析。使用Flink CEP進行快速模式匹配*/
public class MyUserLoginAna {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// //BoundedOutOfOrdernessWatermarks定時提交Watermark的間隔env.getConfig().setAutoWatermarkInterval(1000L);// 使用Socket測試env.setParallelism(1);// 1、獲取原始事件流(10.86.97.206改為實際地址)final DataStreamSource<String> dataStreamSource = env.socketTextStream("10.86.97.206",7777);final SingleOutputStreamOperator<UserLoginRecord> userLoginRecordStream = dataStreamSource.map(new MapFunction<String, UserLoginRecord>() {@Overridepublic UserLoginRecord map(String s) throws Exception {final String[] splitVal = s.split(",");return new UserLoginRecord(splitVal[0], Integer.parseInt(splitVal[1]), Long.parseLong(splitVal[2]));}}).assignTimestampsAndWatermarks(WatermarkStrategy.<UserLoginRecord>forBoundedOutOfOrderness(Duration.ofSeconds(1))// 主要針對亂序流,由于亂序流中需要等待遲到數(shù)據(jù)到齊,所以必須設置一個固定量的延遲時間.withTimestampAssigner((SerializableTimestampAssigner<UserLoginRecord>) (element, recordTimestamp) -> element.getLoginTime()));// 2、定義匹配器// 2.1:10秒內(nèi)出現(xiàn)3次登錄失敗的記錄(不一定連續(xù))// Flink CEP定義消息匹配器。
//        final Pattern<UserLoginRecord, UserLoginRecord> pattern = Pattern.<UserLoginRecord>begin("start").where(new SimpleCondition<UserLoginRecord>() {
//            @Override
//            public boolean filter(UserLoginRecord userLoginRecord) throws Exception {
//                return 1 == userLoginRecord.getLoginRes();
//            }
//        }).times(3).within(Time.seconds(10));// 2.2:連續(xù)三次登錄失敗。next表示連續(xù)匹配。 不連續(xù)匹配使用followByfinal Pattern<UserLoginRecord, UserLoginRecord> pattern = Pattern.<UserLoginRecord>begin("one").where(new SimpleCondition<UserLoginRecord>() {@Overridepublic boolean filter(UserLoginRecord value) throws Exception {return 1 == value.getLoginRes();}}).next("two").where(new SimpleCondition<UserLoginRecord>() {@Overridepublic boolean filter(UserLoginRecord value) throws Exception {return 1 == value.getLoginRes();}}).next("three").where(new SimpleCondition<UserLoginRecord>() {@Overridepublic boolean filter(UserLoginRecord value) throws Exception {return 1 == value.getLoginRes();}}).within(Time.seconds(10));// 3、獲取匹配流final PatternStream<UserLoginRecord> badUser = CEP.pattern(userLoginRecordStream, pattern);final MyProcessFunction myProcessFunction = new MyProcessFunction();// 4、將匹配流中的數(shù)據(jù)處理成結(jié)果數(shù)據(jù)流final SingleOutputStreamOperator<UserLoginRecord> badUserStream = badUser.process(myProcessFunction);badUserStream.print("badUser");env.execute("UserLoginAna");}// mainpublic static class MyProcessFunction extends PatternProcessFunction<UserLoginRecord,UserLoginRecord>{@Overridepublic void processMatch(Map<String, List<UserLoginRecord>> match, Context ctx, Collector<UserLoginRecord> out) throws Exception {// 針對2.1 連續(xù)3次登錄失敗
//            final List<UserLoginRecord> records = match.get("start");
//            for(UserLoginRecord record : records){
//                out.collect(record);
//            }// 針對2.2 非連續(xù)3次登錄失敗final List<UserLoginRecord> records = match.get("three");for(UserLoginRecord record : records){out.collect(record);}}// processMarch}// MyProcessFunction
}

UserLoginRecord對象,如下:


public class UserLoginRecord {private String userId;private int loginRes; // 0-成功, 1-失敗private long loginTime;public UserLoginRecord() {}public UserLoginRecord(String userId, int loginRes, long loginTime) {this.userId = userId;this.loginRes = loginRes;this.loginTime = loginTime;}@Overridepublic String toString() {return "UserLoginRecord{" +"userId='" + userId + '\'' +", loginRes=" + loginRes +", loginTime=" + loginTime +'}';}public String getUserId() {return userId;}public void setUserId(String userId) {this.userId = userId;}public int getLoginRes() {return loginRes;}public void setLoginRes(int loginRes) {this.loginRes = loginRes;}public long getLoginTime() {return loginTime;}public void setLoginTime(long loginTime) {this.loginTime = loginTime;}
}

2.4 nlk模擬socket服務端

在這里插入圖片描述

2.5 IDEA控制臺打印

在這里插入圖片描述

http://www.risenshineclean.com/news/62635.html

相關文章:

  • 網(wǎng)站制作動態(tài)轉(zhuǎn)靜態(tài)怎么做關鍵詞自動優(yōu)化工具
  • 哪家做網(wǎng)站比較好店鋪引流的30種方法
  • 公司介紹模板ppt莆田關鍵詞優(yōu)化報價
  • 各大網(wǎng)址收錄查詢seo的優(yōu)化原理
  • 網(wǎng)站建設項目設計報告如何做好網(wǎng)絡營銷管理
  • 做電商網(wǎng)站注意什么云南網(wǎng)絡推廣
  • wordpress網(wǎng)站出現(xiàn)域名加兩個雙引號的圖片死鏈接怎樣在百度上發(fā)表文章
  • 網(wǎng)站產(chǎn)品詳情用哪個軟件做的seo零基礎視頻教程
  • 自己弄一個網(wǎng)站要多少錢漯河網(wǎng)站推廣公司
  • 頁游和做網(wǎng)站南寧seo排名優(yōu)化
  • 黃頁網(wǎng)站大全免費什么是新媒體運營
  • 做網(wǎng)站需要什么特色全網(wǎng)seo優(yōu)化電話
  • 網(wǎng)站建設維護需要作假嗎域名查詢注冊商
  • 沒得公司可以做網(wǎng)站嘛百度電腦端網(wǎng)頁版入口
  • 電商網(wǎng)站 網(wǎng)站服務內(nèi)容百度店鋪免費入駐
  • 互助網(wǎng)站開發(fā)seo站長查詢
  • 網(wǎng)站必做外鏈濟南今日頭條新聞
  • 做的網(wǎng)站如何防止怕爬蟲新聞早知道
  • 旅游網(wǎng)站開發(fā)方案ppt網(wǎng)絡營銷概述ppt
  • 貴州安順做公司網(wǎng)站搜索引擎優(yōu)化的方法與技巧
  • 邢臺市政建設集團股份有限公司網(wǎng)站百度云網(wǎng)盤入口
  • 制造業(yè)網(wǎng)站建設惠州自動seo
  • 做網(wǎng)站怎么添加背景圖片黃金網(wǎng)站app大全
  • 廣州天美展覽公司網(wǎng)站營銷策劃公司是干什么的
  • 連云港企業(yè)建站 網(wǎng)站36優(yōu)化大師下載安裝
  • 如何給網(wǎng)站死鏈接做404北京seo外包平臺
  • 校園網(wǎng)站建設的請示免費企業(yè)網(wǎng)站管理系統(tǒng)
  • 北京海淀國稅局網(wǎng)站北京seo網(wǎng)站管理
  • 中英文企業(yè)網(wǎng)站怎么做信息流廣告代理商
  • 帶做網(wǎng)站綠標互聯(lián)網(wǎng)營銷師怎么做