
Preface:

This is an introductory tutorial on custom Flink development. Using Flink's DataStream API with Kafka as the data source, we build a basic test environment consisting of a Kafka producer thread utility, a custom FilterFunction operator, and a custom MapFunction operator, and then use the code of a single Flink job to chain the real-time Kafka read and the processing layers together, so that readers get a feel for building custom functions in Flink.

1. Analysis of Flink's Development Modules

Flink provides four basic modules. The core SDK APIs are DataStream for real-time (stream) processing and DataSet for offline (batch) processing. On top of these two SDKs sits the Table API module, and above the Table API, Flink SQL is defined as a highly abstract layer for developing jobs in SQL. Below the core APIs there are also lower-level interfaces for operating on the finest-grained primitives such as time, state, and operators, e.g. the base functions ProcessWindowFunction and ProcessFunction used to wrap custom operators, and the built-in state configuration object StateTtlConfig.
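As a small illustration of that lowest layer, the sketch below shows how a state TTL is typically attached to a state descriptor with StateTtlConfig. This fragment is not from the original article; it is a minimal sketch against the Flink 1.14 API and needs flink-core on the classpath:

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

public class TtlConfigSketch {
    public static void main(String[] args) {
        // Expire state entries one hour after they are created or updated
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.hours(1))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        // Attach the TTL to a state descriptor used inside a rich function
        ValueStateDescriptor<String> descriptor =
                new ValueStateDescriptor<>("lastValue", String.class);
        descriptor.enableTimeToLive(ttlConfig);
    }
}
```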

The relationship between Flink's development APIs is structured as follows (diagram not reproduced):

2. Custom Development Demo

2.1 Scenario

The common technical architecture for a real-time Flink job is message-queue middleware plus a Flink job:

Data is collected into a topic in queue middleware such as Kafka or Pulsar, and Flink's built-in Kafka source then watches the topic and processes the data in real time.

  1. A Kafka producer thread is provided that lets you construct the data you need and control the upload timing, so you control the data source written to Kafka;
  2. Two basic DataStream operators, FilterFunction and MapFunction, are reimplemented so readers can see how to wrap Flink functions; more fundamental functions follow the same principle. String is used as the data object here to avoid pulling in SDKs for object conversion; in practice you would build data POJOs for your business and swap in your own business objects;
  3. Flink then ties the whole flow together: read from Kafka, pass through the two processing layers, and output the final result.

2.2 Local Demo

2.2.1 The pom file

This example uses Flink 1.14.6 with Scala 2.12:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.example</groupId>
        <artifactId>flinkCDC</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <artifactId>flinkStream</artifactId>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink-version>1.14.6</flink-version>
        <scala-version>2.12</scala-version>
        <hadoop-common-version>2.9.1</hadoop-common-version>
        <elasticsearch.version>7.6.2</elasticsearch.version>
        <target.java.version>1.8</target.java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <log4j.version>2.17.1</log4j.version>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots></snapshots>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink-version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala-version}</artifactId>
            <version>${flink-version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala-version}</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>

            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:flink-shaded-force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                         Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>myflinkml.DataStreamJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>
                <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.1.1,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>
2.2.2 Kafka producer thread

package org.example.util;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.*;

/**
 * Produces test data to Kafka.
 *
 * @author i7楊
 * @date 2024/01/12 13:02:29
 */
public class KafkaProducerUtil extends Thread {

    private String topic;

    public KafkaProducerUtil(String topic) {
        super();
        this.topic = topic;
    }

    private static Producer<String, String> createProducer() {
        // Configure the producer via a Properties object
        Properties properties = new Properties();
        // Test-environment Kafka brokers
        properties.put("bootstrap.servers", "ip2:9092,ip:9092,ip3:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<String, String>(properties);
    }

    @Override
    public void run() {
        Producer<String, String> producer = createProducer();
        Random random = new Random();
        Random random2 = new Random();
        while (true) {
            int nums = random.nextInt(10);
            int nums2 = random.nextInt(50);
            // double nums2 = random2.nextDouble();
            String time = new Date().getTime() / 1000 + 5 + "";
            String type = "pv";
            try {
                if (nums2 % 2 == 0) {
                    type = "pv";
                } else {
                    type = "uv";
                }
                // String info = "{\"user\":" + nums + ",\"item\":" + nums * 10 + ",\"category\":" + nums2 + ",\"pv\":" + nums2 * 5 + ",\"ts\":\"" + time + "\"}";
                String info = nums + "=" + nums2;
                System.out.println("message : " + info);
                producer.send(new ProducerRecord<String, String>(this.topic, info));
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("========= data written ==========");
            try {
                sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        new KafkaProducerUtil("test01").run();
    }

    public static void sendMessage(String topic, String message) {
        Producer<String, String> producer = createProducer();
        producer.send(new ProducerRecord<String, String>(topic, message));
    }
}
2.2.3 Custom basic functions

Two operator functions, filter and map, are customized here; the test logic follows the structure of the data:

Custom FilterFunction operator: messages whose value falls below the threshold of 40 are filtered out.

package org.example.funtion;

import org.apache.flink.api.common.functions.FilterFunction;

/**
 * Custom FilterFunction implementation.
 *
 * @author i7楊
 * @date 2024/01/12 13:02:29
 */
public class InfoFilterFunction implements FilterFunction<String> {

    private double threshold;

    public InfoFilterFunction(double threshold) {
        this.threshold = threshold;
    }

    @Override
    public boolean filter(String value) throws Exception {
        if (value.split("=").length == 2) {
            // Threshold filtering: keep only values above the threshold
            return Double.valueOf(value.split("=")[1]) > threshold;
        } else {
            return false;
        }
    }
}

Custom MapFunction: messages ending in 2 get special information appended.

package org.example.funtion;

import org.apache.flink.api.common.functions.MapFunction;

public class ActionMapFunction implements MapFunction<String, String> {

    @Override
    public String map(String value) throws Exception {
        System.out.println("value:" + value);
        if (value.endsWith("2")) {
            return value.concat(":Special processing information");
        } else {
            return value;
        }
    }
}
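The two operators above can also be exercised in isolation. The sketch below (not from the original article) mirrors their logic in plain Java, with no Flink runtime on the classpath, so the threshold and suffix rules can be checked before wiring them into a job:

```java
public class OperatorLogicCheck {

    // Mirrors InfoFilterFunction: keep "k=v" messages whose value exceeds the threshold
    static boolean passesFilter(String value, double threshold) {
        String[] parts = value.split("=");
        if (parts.length != 2) {
            return false; // malformed messages are dropped
        }
        return Double.parseDouble(parts[1]) > threshold;
    }

    // Mirrors ActionMapFunction: tag messages whose value ends in "2"
    static String mapAction(String value) {
        return value.endsWith("2") ? value.concat(":Special processing information") : value;
    }

    public static void main(String[] args) {
        String[] samples = {"3=42", "7=12", "bad-message", "5=50"};
        for (String s : samples) {
            if (passesFilter(s, 40)) {
                System.out.println(mapAction(s));
            }
        }
        // "3=42" passes the filter and gets tagged; "5=50" passes untagged;
        // "7=12" and "bad-message" are dropped by the filter
    }
}
```

Running this prints `3=42:Special processing information` followed by `5=50`, which matches what the Flink job later prints for those inputs.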
2.2.4 Flink job code

Job logic: use the Kafka utility to produce data, monitor the Kafka topic, chain the functions together, and output the result:

package org.example.service;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.example.funtion.ActionMapFunction;
import org.example.funtion.InfoFilterFunction;

import java.util.*;

public class FlinkTestDemo {

    public static void main(String[] args) throws Exception {
        // Set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka configuration
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip1:9092,ip2:9092,ip3:9092");
        kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-consumer-group");
        kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Create the Kafka consumer
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                "test01",                 // Kafka topic name
                new SimpleStringSchema(),
                kafkaProps);

        // Read the data stream from Kafka
        DataStream<String> kafkaStream = env.addSource(kafkaConsumer);
        env.disableOperatorChaining();

        kafkaStream
                .filter(new InfoFilterFunction(40))
                .map(new ActionMapFunction())
                .print("messages above threshold 40=");

        // Execute the job
        env.execute("This is a testing task");
    }
}

Run output: (screenshot omitted)
