Table of Contents
Official API Documentation
Submitting a Job to the Cluster
Official Example
Environment
Writing a Flink Python Table API Program
Executing a Flink Python Table API Program
Example: Processing Kafka Data and Writing It to MySQL
Downloading Dependencies
flink-kafka jar
Reading Kafka Data
Writing Data to MySQL
flink-mysql jar
Official API Documentation
https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/dev/python/overview/
https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/connectors/datastream/kafka/
Submitting a Job to the Cluster
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# /opt/test_flink.py

if __name__ == "__main__":
    print("This is a simple test case")
The examples directory under the Flink installation directory already ships with some test cases, which you can also use directly for experiments.
Submit to the cluster:
./bin/flink run -py <path/to/script.py>
An application is run with flink run. Since Flink can run both Java and Python programs, we have to pass the -py parameter to indicate that a .py file is being executed. By default the client uses the python2 interpreter (unless typing python in your terminal already launches python3). To make Flink use the python3 interpreter, set an environment variable:
export PYFLINK_CLIENT_EXECUTABLE=/usr/bin/python3
Let's test it:
./bin/flink run -py /opt/test_flink.py
The result is clearly successful. Nothing here involves Flink itself yet; it only demonstrates how to submit a Python application. flink run supports Java, Python, and other languages alike.
No matter which API you program against, the client ultimately generates a JobGraph and submits it to the JobManager. Since Flink's core is written in Java, turning a Python application into a JobGraph and submitting it to a Flink cluster requires some bridge between the Python and Java virtual machines, so that Python can dynamically access Java objects and Java can call back into Python objects. That bridge is py4j.
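For the curious, this bridge can be poked at directly. The snippet below is only an exploratory sketch: get_gateway() is an internal PyFlink helper rather than public API, so its behaviour may change between versions.

# Exploratory sketch: inspect the py4j bridge that PyFlink manages internally.
from pyflink.java_gateway import get_gateway   # internal helper, not public API

gateway = get_gateway()   # starts (or attaches to) the JVM behind PyFlink
jvm = gateway.jvm         # dynamic view onto the Java side
# An ordinary JVM call made from Python through py4j:
print(jvm.java.lang.System.getProperty("java.version"))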
We now know how to submit a single .py file, but what if that file imports other files, or the project contains packages? No matter how many files a project has, there is only one entry file, and that entry file is all that needs to be submitted. Let's walk through an example; again, it does not involve anything Flink-specific, the goal is just to get the submission step working, because however complex the program is, the submission step stays the same.
First, let's look at the program:
flink_test is the project root; it contains an apps subdirectory and a main.py entry file, and the apps directory holds three .py files. The project is then submitted to the Flink Standalone cluster with essentially the same command as for a single file, as sketched below.
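A minimal sketch of such a project layout and the submit command; the module names are illustrative stand-ins (the original screenshots are not reproduced here), and --pyFiles is only needed when the dependent files are not already present on the cluster nodes.

flink_test/
├── apps/
│   ├── __init__.py     # illustrative package marker
│   ├── module1.py      # illustrative helper imported by main.py
│   └── module2.py      # illustrative helper imported by main.py
└── main.py             # entry file, the only file passed to -py

# Submit the entry file; ship the helper package with --pyFiles if required:
./bin/flink run -py /opt/flink_test/main.py --pyFiles /opt/flink_test/apps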
Even with multiple files, submission works the same way, and the output indicates that the job was submitted successfully.
Official Example
Environment
- Java 11
- Python 3.7, 3.8, 3.9 or 3.10
python -m pip install apache-flink==1.17.1
Writing a Flink Python Table API Program
The first step in writing a Flink Python Table API program is to create a TableEnvironment. It is the entry class for Python Table API jobs.
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().set("parallelism.default", "1")
Next, we will show how to create the source and sink tables.
t_env.create_temporary_table(
    'source',
    TableDescriptor.for_connector('filesystem')
                   .schema(Schema.new_builder()
                           .column('word', DataTypes.STRING())
                           .build())
                   .option('path', input_path)
                   .format('csv')
                   .build())
tab = t_env.from_path('source')

t_env.create_temporary_table(
    'sink',
    TableDescriptor.for_connector('filesystem')
                   .schema(Schema.new_builder()
                           .column('word', DataTypes.STRING())
                           .column('count', DataTypes.BIGINT())
                           .build())
                   .option('path', output_path)
                   .format(FormatDescriptor.for_format('canal-json')
                           .build())
                   .build())
You can also use the TableEnvironment.execute_sql() method to register the source and sink tables with DDL statements:
my_source_ddl = """
    create table source (
        word STRING
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '{}'
    )
""".format(input_path)

my_sink_ddl = """
    create table sink (
        word STRING,
        `count` BIGINT
    ) with (
        'connector' = 'filesystem',
        'format' = 'canal-json',
        'path' = '{}'
    )
""".format(output_path)

t_env.execute_sql(my_source_ddl)
t_env.execute_sql(my_sink_ddl)
The program above shows how to create and register tables named source and sink. The source table has a single column, word, and represents the words read from the input file specified by input_path. The sink table has two columns, word and count, and its results are written to the output file specified by output_path.

Next, we create a job that reads the data from the source table, applies some transformations, and writes the results to the sink table.
Finally, all that remains is to launch the Flink Python Table API job. All of the operations above, such as creating the source table, applying transformations, and writing to the sink table, only build up the job's logical graph; the job is actually submitted to the cluster or executed locally only when execute_insert(sink_name) is called.
@udtf(result_types=[DataTypes.STRING()])
def split(line: Row):
    for s in line[0].split():
        yield Row(s)

# compute word count
tab.flat_map(split).alias('word') \
   .group_by(col('word')) \
   .select(col('word'), lit(1).count) \
   .execute_insert('sink') \
   .wait()
The complete code for this tutorial is as follows:
import argparse
import logging
import sys

from pyflink.common import Row
from pyflink.table import (EnvironmentSettings, TableEnvironment, TableDescriptor, Schema,
                           DataTypes, FormatDescriptor)
from pyflink.table.expressions import lit, col
from pyflink.table.udf import udtf

word_count_data = ["To be, or not to be,--that is the question:--",
                   "Whether 'tis nobler in the mind to suffer",
                   "The slings and arrows of outrageous fortune",
                   "Or to take arms against a sea of troubles,",
                   "And by opposing end them?--To die,--to sleep,--",
                   "No more; and by a sleep to say we end",
                   "The heartache, and the thousand natural shocks",
                   "That flesh is heir to,--'tis a consummation",
                   "Devoutly to be wish'd. To die,--to sleep;--",
                   "To sleep! perchance to dream:--ay, there's the rub;",
                   "For in that sleep of death what dreams may come,",
                   "When we have shuffled off this mortal coil,",
                   "Must give us pause: there's the respect",
                   "That makes calamity of so long life;",
                   "For who would bear the whips and scorns of time,",
                   "The oppressor's wrong, the proud man's contumely,",
                   "The pangs of despis'd love, the law's delay,",
                   "The insolence of office, and the spurns",
                   "That patient merit of the unworthy takes,",
                   "When he himself might his quietus make",
                   "With a bare bodkin? who would these fardels bear,",
                   "To grunt and sweat under a weary life,",
                   "But that the dread of something after death,--",
                   "The undiscover'd country, from whose bourn",
                   "No traveller returns,--puzzles the will,",
                   "And makes us rather bear those ills we have",
                   "Than fly to others that we know not of?",
                   "Thus conscience does make cowards of us all;",
                   "And thus the native hue of resolution",
                   "Is sicklied o'er with the pale cast of thought;",
                   "And enterprises of great pith and moment,",
                   "With this regard, their currents turn awry,",
                   "And lose the name of action.--Soft you now!",
                   "The fair Ophelia!--Nymph, in thy orisons",
                   "Be all my sins remember'd."]


def word_count(input_path, output_path):
    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    # write all the data to one file
    t_env.get_config().set("parallelism.default", "1")

    # define the source
    if input_path is not None:
        t_env.create_temporary_table(
            'source',
            TableDescriptor.for_connector('filesystem')
                           .schema(Schema.new_builder()
                                   .column('word', DataTypes.STRING())
                                   .build())
                           .option('path', input_path)
                           .format('csv')
                           .build())
        tab = t_env.from_path('source')
    else:
        print("Executing word_count example with default input data set.")
        print("Use --input to specify file input.")
        tab = t_env.from_elements(map(lambda i: (i,), word_count_data),
                                  DataTypes.ROW([DataTypes.FIELD('line', DataTypes.STRING())]))

    # define the sink
    if output_path is not None:
        t_env.create_temporary_table(
            'sink',
            TableDescriptor.for_connector('filesystem')
                           .schema(Schema.new_builder()
                                   .column('word', DataTypes.STRING())
                                   .column('count', DataTypes.BIGINT())
                                   .build())
                           .option('path', output_path)
                           .format(FormatDescriptor.for_format('canal-json')
                                   .build())
                           .build())
    else:
        print("Printing result to stdout. Use --output to specify output path.")
        t_env.create_temporary_table(
            'sink',
            TableDescriptor.for_connector('print')
                           .schema(Schema.new_builder()
                                   .column('word', DataTypes.STRING())
                                   .column('count', DataTypes.BIGINT())
                                   .build())
                           .build())

    @udtf(result_types=[DataTypes.STRING()])
    def split(line: Row):
        for s in line[0].split():
            yield Row(s)

    # compute word count
    tab.flat_map(split).alias('word') \
       .group_by(col('word')) \
       .select(col('word'), lit(1).count) \
       .execute_insert('sink') \
       .wait()
    # remove .wait if submitting to a remote cluster, refer to
    # https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/faq/#wait-for-jobs-to-finish-when-executing-jobs-in-mini-cluster
    # for more details


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        required=False,
        help='Input file to process.')
    parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='Output file to write results to.')

    argv = sys.argv[1:]
    known_args, _ = parser.parse_known_args(argv)

    word_count(known_args.input, known_args.output)
Executing a Flink Python Table API Program
Next, you can run the job from the command line (assuming the script is named word_count.py):
python word_count.py
The command above builds the Python Table API program and runs it on a local mini cluster. To submit the job to a remote cluster for execution, refer to the job submission example; a minimal sketch follows.
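As a rough sketch, the JobManager address and the script path below are placeholders to adapt to your environment; as the comment in the full example notes, the .wait() call should be removed when targeting a remote cluster.

# Submit word_count.py to an existing Flink cluster:
./bin/flink run -m <jobmanager-host>:8081 -py /opt/word_count.py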
Finally, you will see the word-count results, written to the file under output_path if --output was given, or printed to stdout via the print connector otherwise.
Example: Processing Kafka Data and Writing It to MySQL
Downloading Dependencies
flink-kafka jar
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.17.1/flink-sql-connector-kafka-1.17.1.jar
Reading Kafka Data
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import logging

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import WatermarkStrategy, Types
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer
from pyflink.common import Row
from pyflink.datastream import FlatMapFunction


def read_kafka():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///D:/安技匯/運營平臺/DataManage/flink-sql-connector-kafka-1.17.1.jar")
    source = KafkaSource.builder() \
        .set_bootstrap_servers("172.16.12.128:9092") \
        .set_topics("test") \
        .set_group_id("my-group") \
        .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
        .set_value_only_deserializer(SimpleStringSchema()) \
        .build()
    # Start from the offsets committed by the consumer group, with no reset strategy:
    # .set_starting_offsets(KafkaOffsetsInitializer.committed_offsets()) \
    # Start from the committed offsets, falling back to the earliest offset if none exist:
    # .set_starting_offsets(KafkaOffsetsInitializer.committed_offsets(KafkaOffsetResetStrategy.EARLIEST)) \
    # Start from records whose timestamp (in milliseconds) is greater than or equal to the given one:
    # .set_starting_offsets(KafkaOffsetsInitializer.timestamp(1657256176000)) \
    # Start from the earliest offset:
    # .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
    # Start from the latest offset:
    # .set_starting_offsets(KafkaOffsetsInitializer.latest()) \
    # .set_property("partition.discovery.interval.ms", "10000")  # check for new partitions every 10 s
    # .set_property("security.protocol", "SASL_PLAINTEXT") \
    # .set_property("sasl.mechanism", "PLAIN") \
    # .set_property("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";")

    kafka_stream = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")
    kafka_stream.print()
    env.execute("Source")


if __name__ == "__main__":
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    read_kafka()
Writing Data to MySQL
flink-mysql jar
This part is not working yet and remains to be completed.
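In the meantime, here is a hedged sketch of one way to finish this step using PyFlink's JDBC sink from the DataStream API. It assumes the flink-connector-jdbc jar and the MySQL driver jar have been downloaded (the version numbers in the comments are assumptions; check Maven Central for the builds matching Flink 1.17), and the message format, table name, connection URL, and credentials are all placeholders.

# Assumed additional jars (verify the exact versions on Maven Central):
#   flink-connector-jdbc-3.1.1-1.17.jar
#   mysql-connector-j-8.0.33.jar
import json

from pyflink.common import Row, Types, WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer
from pyflink.datastream.connectors.jdbc import (JdbcSink, JdbcConnectionOptions,
                                                JdbcExecutionOptions)


def kafka_to_mysql():
    env = StreamExecutionEnvironment.get_execution_environment()
    # Jar paths are placeholders; adjust them to where the jars were downloaded.
    env.add_jars("file:///opt/jars/flink-sql-connector-kafka-1.17.1.jar",
                 "file:///opt/jars/flink-connector-jdbc-3.1.1-1.17.jar",
                 "file:///opt/jars/mysql-connector-j-8.0.33.jar")

    source = KafkaSource.builder() \
        .set_bootstrap_servers("172.16.12.128:9092") \
        .set_topics("test") \
        .set_group_id("my-group") \
        .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
        .set_value_only_deserializer(SimpleStringSchema()) \
        .build()
    stream = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")

    # Assume each Kafka message is a JSON object such as {"word": "hello", "count": 1}
    row_type_info = Types.ROW_NAMED(["word", "count"], [Types.STRING(), Types.LONG()])

    def to_row(raw):
        record = json.loads(raw)
        return Row(record["word"], int(record["count"]))

    rows = stream.map(to_row, output_type=row_type_info)

    # A table word_count(word VARCHAR, `count` BIGINT) is assumed to exist in MySQL.
    rows.add_sink(
        JdbcSink.sink(
            "INSERT INTO word_count (word, `count`) VALUES (?, ?)",
            row_type_info,
            JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .with_url("jdbc:mysql://172.16.12.128:3306/test")
                .with_driver_name("com.mysql.cj.jdbc.Driver")
                .with_user_name("root")
                .with_password("password")
                .build(),
            JdbcExecutionOptions.builder()
                .with_batch_interval_ms(1000)
                .with_batch_size(200)
                .with_max_retries(3)
                .build()))

    env.execute("kafka_to_mysql")


if __name__ == "__main__":
    kafka_to_mysql()

An alternative under the same jar assumptions is to register a JDBC table in the Table API ('connector' = 'jdbc') and write to it with an INSERT INTO statement.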