那些網(wǎng)站做的比較好公眾號推廣一個6元
1、什么是CEP?
Flink CEP 即 Flink Complex Event Processing,是基於 DataStream 流式數據提供的一套複雜事件處理編程模型。你可以把它理解為基於無界流的一套正則匹配模型,即對於無界流中的各種數據(稱為事件),提供一種組合匹配的功能。
上圖中,以不同形狀代表一個 DataStream 中不同屬性的事件。以一個圓圈和一個三角組成一個 Pattern 後,就可以快速過濾出原來的 DataStream 中符合規律的數據。舉個例子,比如很多網站需要對惡意登錄的用戶進行屏蔽,如果用戶連續三次輸入錯誤的密碼,那就要鎖定當前用戶。在這個場景下,所有用戶的登錄行為就構成了一個無界的數據流 DataStream,而連續三次登錄失敗就是一個匹配模型 Pattern。CEP 編程模型的功能就是從用戶登錄行為這個無界數據流 DataStream 中,找出符合這個匹配模型 Pattern 的所有數據。這種場景下,使用我們前面介紹的各種 DataStream API 其實也是可以實現的,不過相對就麻煩很多;而 CEP 編程模型則提供了非常簡單靈活的功能實現方式。
2、代碼實現(xiàn)
2.1 引入maven依賴:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.roy</groupId>
    <artifactId>FlinkDemo</artifactId>
    <version>1.0</version>

    <properties>
        <flink.version>1.12.5</flink.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <log4j.version>2.12.1</log4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- The CEP library itself is provided by the following dependency. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.8.3-10.0</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.14</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Builds a single "jar-with-dependencies" artifact during the package phase. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
2.2 基本流程
// Skeleton of a Flink CEP job; the "......" placeholders must be filled in by the reader.

// 1. Obtain the raw event stream.
DataStream<Event> input = ......;

// 2. Define the pattern (the matcher).
Pattern<Event, ?> pattern = .......;

// 3. Apply the pattern to the input to obtain the stream of matches.
PatternStream<Event> patternStream = CEP.pattern(input, pattern);

// 4. Turn every match into records of the result stream.
DataStream<Result> resultStream = patternStream.process(
        new PatternProcessFunction<Event, Result>() {
            @Override
            public void processMatch(
                    Map<String, List<Event>> pattern,
                    Context ctx,
                    Collector<Result> out) throws Exception {
                // "pattern" maps each pattern stage name to the events matched by that stage;
                // emit results through "out".
            }
        });
2.3 完整代碼
注意:代碼運行前,請先啟動 2.4 節中的 nc -lk socket 服務。
package com.roy.flink.project.userlogin;import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.List;
import java.util.Map;/*** @desc 十秒內(nèi)連續(xù)登錄失敗的用戶分析。使用Flink CEP進行快速模式匹配*/
public class MyUserLoginAna {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// //BoundedOutOfOrdernessWatermarks定時提交Watermark的間隔env.getConfig().setAutoWatermarkInterval(1000L);// 使用Socket測試env.setParallelism(1);// 1、獲取原始事件流(10.86.97.206改為實際地址)final DataStreamSource<String> dataStreamSource = env.socketTextStream("10.86.97.206",7777);final SingleOutputStreamOperator<UserLoginRecord> userLoginRecordStream = dataStreamSource.map(new MapFunction<String, UserLoginRecord>() {@Overridepublic UserLoginRecord map(String s) throws Exception {final String[] splitVal = s.split(",");return new UserLoginRecord(splitVal[0], Integer.parseInt(splitVal[1]), Long.parseLong(splitVal[2]));}}).assignTimestampsAndWatermarks(WatermarkStrategy.<UserLoginRecord>forBoundedOutOfOrderness(Duration.ofSeconds(1))// 主要針對亂序流,由于亂序流中需要等待遲到數(shù)據(jù)到齊,所以必須設置一個固定量的延遲時間.withTimestampAssigner((SerializableTimestampAssigner<UserLoginRecord>) (element, recordTimestamp) -> element.getLoginTime()));// 2、定義匹配器// 2.1:10秒內(nèi)出現(xiàn)3次登錄失敗的記錄(不一定連續(xù))// Flink CEP定義消息匹配器。
// final Pattern<UserLoginRecord, UserLoginRecord> pattern = Pattern.<UserLoginRecord>begin("start").where(new SimpleCondition<UserLoginRecord>() {
// @Override
// public boolean filter(UserLoginRecord userLoginRecord) throws Exception {
// return 1 == userLoginRecord.getLoginRes();
// }
// }).times(3).within(Time.seconds(10));// 2.2:連續(xù)三次登錄失敗。next表示連續(xù)匹配。 不連續(xù)匹配使用followByfinal Pattern<UserLoginRecord, UserLoginRecord> pattern = Pattern.<UserLoginRecord>begin("one").where(new SimpleCondition<UserLoginRecord>() {@Overridepublic boolean filter(UserLoginRecord value) throws Exception {return 1 == value.getLoginRes();}}).next("two").where(new SimpleCondition<UserLoginRecord>() {@Overridepublic boolean filter(UserLoginRecord value) throws Exception {return 1 == value.getLoginRes();}}).next("three").where(new SimpleCondition<UserLoginRecord>() {@Overridepublic boolean filter(UserLoginRecord value) throws Exception {return 1 == value.getLoginRes();}}).within(Time.seconds(10));// 3、獲取匹配流final PatternStream<UserLoginRecord> badUser = CEP.pattern(userLoginRecordStream, pattern);final MyProcessFunction myProcessFunction = new MyProcessFunction();// 4、將匹配流中的數(shù)據(jù)處理成結(jié)果數(shù)據(jù)流final SingleOutputStreamOperator<UserLoginRecord> badUserStream = badUser.process(myProcessFunction);badUserStream.print("badUser");env.execute("UserLoginAna");}// mainpublic static class MyProcessFunction extends PatternProcessFunction<UserLoginRecord,UserLoginRecord>{@Overridepublic void processMatch(Map<String, List<UserLoginRecord>> match, Context ctx, Collector<UserLoginRecord> out) throws Exception {// 針對2.1 連續(xù)3次登錄失敗
// final List<UserLoginRecord> records = match.get("start");
// for(UserLoginRecord record : records){
// out.collect(record);
// }// 針對2.2 非連續(xù)3次登錄失敗final List<UserLoginRecord> records = match.get("three");for(UserLoginRecord record : records){out.collect(record);}}// processMarch}// MyProcessFunction
}
UserLoginRecord對象,如下:
/**
 * One login attempt of a user.
 *
 * <p>Kept as a mutable bean with a public no-argument constructor and a getter/setter pair per
 * field. NOTE(review): this shape presumably exists so that Flink can treat the class as a
 * POJO type; confirm before making it immutable.
 */
public class UserLoginRecord {

    /** Identifier of the user who attempted to log in. */
    private String userId;

    /** Result of the attempt: 0 = success, 1 = failure. */
    private int loginRes;

    /** Time of the attempt. NOTE(review): presumably epoch milliseconds; confirm with the producer. */
    private long loginTime;

    /** Creates an empty record; fields are filled in through the setters. */
    public UserLoginRecord() {
    }

    /**
     * Creates a fully populated record.
     *
     * @param userId    identifier of the user
     * @param loginRes  0 for success, 1 for failure
     * @param loginTime time of the attempt
     */
    public UserLoginRecord(String userId, int loginRes, long loginTime) {
        this.userId = userId;
        this.loginRes = loginRes;
        this.loginTime = loginTime;
    }

    @Override
    public String toString() {
        return "UserLoginRecord{"
                + "userId='" + userId + '\''
                + ", loginRes=" + loginRes
                + ", loginTime=" + loginTime
                + '}';
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public int getLoginRes() {
        return loginRes;
    }

    public void setLoginRes(int loginRes) {
        this.loginRes = loginRes;
    }

    public long getLoginTime() {
        return loginTime;
    }

    public void setLoginTime(long loginTime) {
        this.loginTime = loginTime;
    }
}