網(wǎng)站開發(fā)人員的水平滕州今日頭條新聞
前言:
? ? ? ?這是一個Flink自定義開發(fā)的基礎(chǔ)教學。本文將通過flink的DataStream模塊API,以kafka為數(shù)據(jù)源,構(gòu)建一個基礎(chǔ)測試環(huán)境;包含一個kafka生產(chǎn)者線程工具,一個自定義FilterFunction算子,一個自定義MapFunction算子,用一個flink任務(wù)的代碼邏輯,將實時讀kafka并多層處理串起來;讓讀者體會通過Flink構(gòu)建自定義函數(shù)的技巧。
一、Flink的開發(fā)模塊分析
Flink提供四個基礎(chǔ)模塊:核心SDK開發(fā)API分別是處理實時計算的DataStream和處理離線計算的DataSet;基于這兩個SDK,在其上包裝了TableAPI開發(fā)模塊的SDK;在Table API之上,定義了高度抽象可用SQL開發(fā)任務(wù)的FlinkSQL。在核心開發(fā)API之下,還有基礎(chǔ)API的接口,可用于對時間,狀態(tài),算子等最細粒度的特性對象做操作,如包裝自定義算子的ProcessWindowFunction和ProcessFunction等基礎(chǔ)函數(shù)以及內(nèi)置的對象狀態(tài)StateTtlConfig;
FLINK開發(fā)API關(guān)系結(jié)構(gòu)如下:
二、定制化開發(fā)Demo演示
2.1 場景介紹
Flink實時任務(wù)的的通用技術(shù)架構(gòu)是消息隊列中間件+Flink任務(wù):
將數(shù)據(jù)采集到Kafka或pulser這類隊列中間件的Topic,然后使用Flink內(nèi)置的kafkaSource,監(jiān)控Topic的數(shù)據(jù)情況,做實時處理。
- 這里提供一個kafka的生產(chǎn)者線程,可以自定義構(gòu)建需要的數(shù)據(jù)和上傳時間,用于控制寫入kafka的數(shù)據(jù)源;
- 重寫兩個DataStream的基礎(chǔ)算子:FilterFunction和MapFunction,用于讓讀者體會,如何對FLINK函數(shù)的重新包裝,后續(xù)更基礎(chǔ)的函數(shù)原理一樣;我這里用String數(shù)據(jù)對象做處理,減少對象轉(zhuǎn)換的SDK引入,通常要基于業(yè)務(wù)做數(shù)據(jù)polo的加工,這個自己處理,將對象換成業(yè)務(wù)對象;
- 然后使用Flink將整個業(yè)務(wù)串起來,從kafka讀數(shù)據(jù),經(jīng)過兩層處理,最終輸出需要的結(jié)果;
2.2 本地demo演示
2.2.1 pom文件
這里以flink1.14.6+scala1.12版本為例:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.example</groupId><artifactId>flinkCDC</artifactId><version>1.0-SNAPSHOT</version></parent><artifactId>flinkStream</artifactId><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink-version>1.14.6</flink-version><scala-version>2.12</scala-version><hadop-common-version>2.9.1</hadop-common-version><elasticsearch.version>7.6.2</elasticsearch.version><target.java.version>1.8</target.java.version><scala.binary.version>2.12</scala.binary.version><maven.compiler.source>${target.java.version}</maven.compiler.source><maven.compiler.target>${target.java.version}</maven.compiler.target><log4j.version>2.17.1</log4j.version></properties><repositories><repository><id>apache.snapshots</id><name>Apache Development Snapshot Repository</name><url>https://repository.apache.org/content/repositories/snapshots/</url><releases><enabled>false</enabled></releases><snapshots></snapshots></repository></repositories><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink-version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>${flink-version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala-version}</artifactId><version>${flink-version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala-version}</artifactId><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions><version>${flink-version}</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.1</version></dependency></dependencies><build><plugins><!-- Java Compiler --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>${target.java.version}</source><target>${target.java.version}</target></configuration></plugin><!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. --><!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><!-- Run shade goal on package phase --><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><createDependencyReducedPom>false</createDependencyReducedPom><artifactSet><excludes><exclude>org.apache.flink:flink-shaded-force-shading</exclude><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>org.apache.logging.log4j:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>myflinkml.DataStreamJob</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins><pluginManagement><plugins><!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. --><plugin><groupId>org.eclipse.m2e</groupId><artifactId>lifecycle-mapping</artifactId><version>1.0.0</version><configuration><lifecycleMappingMetadata><pluginExecutions><pluginExecution><pluginExecutionFilter><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><versionRange>[3.1.1,)</versionRange><goals><goal>shade</goal></goals></pluginExecutionFilter><action><ignore/></action></pluginExecution><pluginExecution><pluginExecutionFilter><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><versionRange>[3.1,)</versionRange><goals><goal>testCompile</goal><goal>compile</goal></goals></pluginExecutionFilter><action><ignore/></action></pluginExecution></pluginExecutions></lifecycleMappingMetadata></configuration></plugin></plugins></pluginManagement></build>
</project>
2.2.2 kafka生產(chǎn)者線程方法
package org.example.util;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.*;/*** 向kafka生產(chǎn)數(shù)據(jù)** @author i7楊* @date 2024/01/12 13:02:29*/public class KafkaProducerUtil extends Thread {private String topic;public KafkaProducerUtil(String topic) {super();this.topic = topic;}private static Producer<String, String> createProducer() {// 通過Properties類設(shè)置Producer的屬性Properties properties = new Properties();
// 測試環(huán)境 kafka 配置properties.put("bootstrap.servers", "ip2:9092,ip:9092,ip3:9092");properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");return new KafkaProducer<String, String>(properties);}@Overridepublic void run() {Producer<String, String> producer = createProducer();Random random = new Random();Random random2 = new Random();while (true) {int nums = random.nextInt(10);int nums2 = random.nextInt(50);
// double nums2 = random2.nextDouble();String time = new Date().getTime() / 1000 + 5 + "";String type = "pv";try {if (nums2 % 2 == 0) {type = "pv";} else {type = "uv";}
// String info = "{\"user\":" + nums + ",\"item\":" + nums * 10 + ",\"category\":" + nums2 + ",\"pv\":" + nums2 * 5 + ",\"ts\":\"" + time + "\"}";String info = nums + "=" + nums2;System.out.println("message : " + info);producer.send(new ProducerRecord<String, String>(this.topic, info));} catch (Exception e) {e.printStackTrace();}System.out.println("=========數(shù)據(jù)已經(jīng)寫入==========");try {sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}public static void main(String[] args) {new KafkaProducerUtil("test01").run();}public static void sendMessage(String topic, String message) {Producer<String, String> producer = createProducer();producer.send(new ProducerRecord<String, String>(topic, message));}}
2.2.3 自定義基礎(chǔ)函數(shù)
這里自定義了filter和map兩個算子函數(shù),測試邏輯按照數(shù)據(jù)結(jié)構(gòu)變化:
自定義FilterFunction函數(shù)算子:閾值小于40的過濾掉
package org.example.funtion;import org.apache.flink.api.common.functions.FilterFunction;/*** FilterFunction重構(gòu)** @author i7楊* @date 2024/01/12 13:02:29*/public class InfoFilterFunction implements FilterFunction<String> {private double threshold;public InfoFilterFunction(double threshold) {this.threshold = threshold;}@Overridepublic boolean filter(String value) throws Exception {if (value.split("=").length == 2)// 閾值過濾return Double.valueOf(value.split("=")[1]) > threshold;else return false;}
}
自定義MapFunction函數(shù):后綴為2的,添加上特殊信息
package org.example.funtion;import org.apache.flink.api.common.functions.MapFunction;public class ActionMapFunction implements MapFunction<String, String> {@Overridepublic String map(String value) throws Exception {System.out.println("value:" + value);if (value.endsWith("2"))return value.concat(":Special processing information");else return value;}
}
2.2.4 flink任務(wù)代碼
任務(wù)邏輯:使用kafka工具產(chǎn)生數(shù)據(jù),然后監(jiān)控kafka的topic,講幾個函數(shù)串起來,輸出結(jié)果;
package org.example.service;import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.example.funtion.ActionMapFunction;
import org.example.funtion.InfoFilterFunction;import java.util.*;public class FlinkTestDemo {public static void main(String[] args) throws Exception {// 設(shè)置執(zhí)行環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// Kafka 配置Properties kafkaProps = new Properties();kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip1:9092,ip2:9092,ip3:9092");kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-consumer-group");kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// 創(chuàng)建 Kafka 消費者FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("test01",// Kafka 主題名稱new SimpleStringSchema(),kafkaProps);// 從 Kafka 中讀取數(shù)據(jù)流DataStream<String> kafkaStream = env.addSource(kafkaConsumer);env.disableOperatorChaining();kafkaStream.filter(new InfoFilterFunction(40)).map(new ActionMapFunction()).print("閾值大于40以上的message=");// 執(zhí)行任務(wù)env.execute("This is a testing task");}}
運行結(jié)果: