中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁 > news >正文

鄭州百度網(wǎng)站優(yōu)化排名百度賬號申訴

鄭州百度網(wǎng)站優(yōu)化排名,百度賬號申訴,網(wǎng)站開發(fā)是做啥的,杭州城鄉(xiāng)建設(shè)委網(wǎng)站目錄 官方API文檔 提交作業(yè)到集群運行 官方示例 環(huán)境 編寫一個 Flink Python Table API 程序 執(zhí)行一個 Flink Python Table API 程序 實例處理Kafka后入庫到Mysql 下載依賴 flink-kafka jar 讀取kafka數(shù)據(jù) 寫入mysql數(shù)據(jù) flink-mysql jar 官方API文檔 https://nigh…

目錄

官方API文檔

提交作業(yè)到集群運行

官方示例

環(huán)境

實例處理Kafka后入庫到Mysql

下載依賴

讀取kafka數(shù)據(jù)

寫入mysql數(shù)據(jù)


官方API文檔

https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/dev/python/overview/

https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/connectors/datastream/kafka/

提交作業(yè)到集群運行

#! /usr/bin/env python
# -*- coding: utf-8 -*-# /opt/test_flink.py
if __name__ == "__main__":print("這是一個簡單的測試用例")

flink 安裝目錄下的 examples 目錄里面已經(jīng)提供了一些測試案例,我們也可以直接拿它來做實驗。

提交至集群

./bin/flink run -py 代碼文件

通過 flink run 即可運行應(yīng)用程序,由于 flink 既可運行 Java 程序、也可以運行 Python 程序,所以這里我們需要指定 -py 參數(shù),表示運行的是 py 文件。但默認情況下解釋器使用的 python2,當(dāng)然如果你終端輸入 python 進入的就是 python3 的話則當(dāng)我沒說,要是我們想指定 flink 使用 python3 解釋器的話,則需要配置一個環(huán)境變量。

export PYFLINK_CLIENT_EXECUTABLE=/usr/bin/python3

下面來測試一下:

./bin/flink run -py /opt/test_flink.py

很明顯結(jié)果是成功的,當(dāng)然這里面沒有涉及到任何與 Flink 有關(guān)的內(nèi)容,只是演示如何提交一個 Python 應(yīng)用程序。當(dāng)然?flink run 是同時支持 Java、Python 等語言的。

不管使用哪種 API 進行編程,最終客戶端都會生成 JobGraph 提交到 JM 上。但畢竟 Flink 的內(nèi)核是采用 Java 語言編寫的,如果 Python 應(yīng)用程序變成 JobGraph 對象被提交到 Flink 集群上運行的話,那么 Python 虛擬機和 Java 虛擬機之間一定有某種方式,使得 Python 可以直接動態(tài)訪問 Java 中的對象、Java 也可以回調(diào) Python 中的對象。沒錯,實現(xiàn)這一點的便是 py4j。

提交單個 py 文件知道怎么做了,但如果該文件還導(dǎo)入了其它文件該怎么辦呢?一個項目中還會涉及到包的存在。其實不管項目里的文件有多少,啟動文件只有一個,只需要把這個啟動文件提交上去即可。舉例說明,當(dāng)然這里仍不涉及具體和 Flink 相關(guān)的內(nèi)容,先把如何提交程序這一步給走通。因為不管編寫的程序多復(fù)雜,提交這一步驟是不會變的。

先來看看編寫的程序:

flink_test 就是主目錄,里面有一個 apps 子目錄和一個 main.py 文件,apps 目錄里面有三個 py 文件,對應(yīng)的內(nèi)容分別如圖所示。然后將其提交到 Flink Standalone 集群上運行,命令和提交單個文件是一樣的

即使是多文件,提交方式也是相似的,輸出結(jié)果表明提交成功了。

官方示例

環(huán)境

  • Java 11
  • Python 3.7, 3.8, 3.9 or 3.10
python -m pip install apache-flink==1.17.1

編寫 Flink Python Table API 程序的第一步是創(chuàng)建?TableEnvironment。這是 Python Table API 作業(yè)的入口類。

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().set("parallelism.default", "1")

接下來,我們將介紹如何創(chuàng)建源表和結(jié)果表。

t_env.create_temporary_table('source',TableDescriptor.for_connector('filesystem').schema(Schema.new_builder().column('word', DataTypes.STRING()).build()).option('path', input_path).format('csv').build())
tab = t_env.from_path('source')t_env.create_temporary_table('sink',TableDescriptor.for_connector('filesystem').schema(Schema.new_builder().column('word', DataTypes.STRING()).column('count', DataTypes.BIGINT()).build()).option('path', output_path).format(FormatDescriptor.for_format('canal-json').build()).build())

你也可以使用?TableEnvironment.execute_sql()?方法,通過 DDL 語句來注冊源表和結(jié)果表:

my_source_ddl = """create table source (word STRING) with ('connector' = 'filesystem','format' = 'csv','path' = '{}')
""".format(input_path)my_sink_ddl = """create table sink (word STRING,`count` BIGINT) with ('connector' = 'filesystem','format' = 'canal-json','path' = '{}')
""".format(output_path)t_env.execute_sql(my_source_ddl)
t_env.execute_sql(my_sink_ddl)

上面的程序展示了如何創(chuàng)建及注冊表名分別為?source?和?sink?的表。 其中,源表?source?有一列: word,該表代表了從?input_path?所指定的輸入文件中讀取的單詞; 結(jié)果表?sink?有兩列: word 和 count,該表的結(jié)果會輸出到?output_path?所指定的輸出文件中。

接下來,我們介紹如何創(chuàng)建一個作業(yè):該作業(yè)讀取表?source?中的數(shù)據(jù),進行一些變換,然后將結(jié)果寫入表?sink。

最后,需要做的就是啟動 Flink Python Table API 作業(yè)。上面所有的操作,比如創(chuàng)建源表 進行變換以及寫入結(jié)果表的操作都只是構(gòu)建作業(yè)邏輯圖,只有當(dāng)?execute_insert(sink_name)?被調(diào)用的時候, 作業(yè)才會被真正提交到集群或者本地進行執(zhí)行。

@udtf(result_types=[DataTypes.STRING()])
def split(line: Row):for s in line[0].split():yield Row(s)# 計算 word count
tab.flat_map(split).alias('word') \.group_by(col('word')) \.select(col('word'), lit(1).count) \.execute_insert('sink') \.wait()

該教程的完整代碼如下:

import argparse
import logging
import sysfrom pyflink.common import Row
from pyflink.table import (EnvironmentSettings, TableEnvironment, TableDescriptor, Schema,DataTypes, FormatDescriptor)
from pyflink.table.expressions import lit, col
from pyflink.table.udf import udtfword_count_data = ["To be, or not to be,--that is the question:--","Whether 'tis nobler in the mind to suffer","The slings and arrows of outrageous fortune","Or to take arms against a sea of troubles,","And by opposing end them?--To die,--to sleep,--","No more; and by a sleep to say we end","The heartache, and the thousand natural shocks","That flesh is heir to,--'tis a consummation","Devoutly to be wish'd. To die,--to sleep;--","To sleep! perchance to dream:--ay, there's the rub;","For in that sleep of death what dreams may come,","When we have shuffled off this mortal coil,","Must give us pause: there's the respect","That makes calamity of so long life;","For who would bear the whips and scorns of time,","The oppressor's wrong, the proud man's contumely,","The pangs of despis'd love, the law's delay,","The insolence of office, and the spurns","That patient merit of the unworthy takes,","When he himself might his quietus make","With a bare bodkin? who would these fardels bear,","To grunt and sweat under a weary life,","But that the dread of something after death,--","The undiscover'd country, from whose bourn","No traveller returns,--puzzles the will,","And makes us rather bear those ills we have","Than fly to others that we know not of?","Thus conscience does make cowards of us all;","And thus the native hue of resolution","Is sicklied o'er with the pale cast of thought;","And enterprises of great pith and moment,","With this regard, their currents turn awry,","And lose the name of action.--Soft you now!","The fair Ophelia!--Nymph, in thy orisons","Be all my sins remember'd."]def word_count(input_path, output_path):t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())# write all the data to one filet_env.get_config().set("parallelism.default", "1")# define the sourceif input_path is not None:t_env.create_temporary_table('source',TableDescriptor.for_connector('filesystem').schema(Schema.new_builder().column('word', DataTypes.STRING()).build()).option('path', input_path).format('csv').build())tab = t_env.from_path('source')else:print("Executing word_count example with default input data set.")print("Use --input to specify file input.")tab = t_env.from_elements(map(lambda i: (i,), word_count_data),DataTypes.ROW([DataTypes.FIELD('line', DataTypes.STRING())]))# define the sinkif output_path is not None:t_env.create_temporary_table('sink',TableDescriptor.for_connector('filesystem').schema(Schema.new_builder().column('word', DataTypes.STRING()).column('count', DataTypes.BIGINT()).build()).option('path', output_path).format(FormatDescriptor.for_format('canal-json').build()).build())else:print("Printing result to stdout. Use --output to specify output path.")t_env.create_temporary_table('sink',TableDescriptor.for_connector('print').schema(Schema.new_builder().column('word', DataTypes.STRING()).column('count', DataTypes.BIGINT()).build()).build())@udtf(result_types=[DataTypes.STRING()])def split(line: Row):for s in line[0].split():yield Row(s)# compute word counttab.flat_map(split).alias('word') \.group_by(col('word')) \.select(col('word'), lit(1).count) \.execute_insert('sink') \.wait()# remove .wait if submitting to a remote cluster, refer to# https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/faq/#wait-for-jobs-to-finish-when-executing-jobs-in-mini-cluster# for more detailsif __name__ == '__main__':logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")parser = argparse.ArgumentParser()parser.add_argument('--input',dest='input',required=False,help='Input file to process.')parser.add_argument('--output',dest='output',required=False,help='Output file to write results to.')argv = sys.argv[1:]known_args, _ = parser.parse_known_args(argv)word_count(known_args.input, known_args.output)

接下來,可以在命令行中運行作業(yè)(假設(shè)作業(yè)名為 word_count.py):

python word_count.py

上述命令會構(gòu)建 Python Table API 程序,并在本地 mini cluster 中運行。如果想將作業(yè)提交到遠端集群執(zhí)行, 可以參考作業(yè)提交示例。

最后,你可以得到如下運行結(jié)果:

實例處理Kafka后入庫到Mysql

下載依賴

wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.17.1/flink-sql-connector-kafka-1.17.1.jar

讀取kafka數(shù)據(jù)

#! /usr/bin/env python
# -*- coding: utf-8 -*-import sys
import loggingfrom pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import WatermarkStrategy, Types
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializerfrom pyflink.common import Row
from pyflink.datastream import FlatMapFunctiondef read_kafka():env = StreamExecutionEnvironment.get_execution_environment()env.add_jars("file:///D:/安技匯/運營平臺/DataManage/flink-sql-connector-kafka-1.17.1.jar")source = KafkaSource.builder() \.set_bootstrap_servers("172.16.12.128:9092") \.set_topics("test") \.set_group_id("my-group") \.set_starting_offsets(KafkaOffsetsInitializer.earliest()) \.set_value_only_deserializer(SimpleStringSchema()) \.build()# 從消費組提交的位點開始消費,不指定位點重置策略#.set_starting_offsets(KafkaOffsetsInitializer.committed_offsets()) \# 從消費組提交的位點開始消費,如果提交位點不存在,使用最早位點#.set_starting_offsets(KafkaOffsetsInitializer.committed_offsets(KafkaOffsetResetStrategy.EARLIEST)) \# 從時間戳大于等于指定時間戳(毫秒)的數(shù)據(jù)開始消費#.set_starting_offsets(KafkaOffsetsInitializer.timestamp(1657256176000)) \# 從最早位點開始消費#.set_starting_offsets(KafkaOffsetsInitializer.earliest()) \# 從最末尾位點開始消費#.set_starting_offsets(KafkaOffsetsInitializer.latest()) \#.set_property("partition.discovery.interval.ms", "10000")  # 每 10 秒檢查一次新分區(qū)#.set_property("security.protocol", "SASL_PLAINTEXT") \#.set_property("sasl.mechanism", "PLAIN") \#.set_property("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";")kafka_stream = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")kafka_stream.print()env.execute("Source")if __name__ == "__main__":logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")read_kafka()

寫入mysql數(shù)據(jù)

沒通,待補充。。

http://www.risenshineclean.com/news/57166.html

相關(guān)文章:

  • 如何建設(shè)網(wǎng)站效果好百度發(fā)布信息的免費平臺
  • 深圳市多語言網(wǎng)站建設(shè)公司廣州百度關(guān)鍵詞排名
  • 人才網(wǎng)站的會計賬如何做網(wǎng)絡(luò)推廣渠道公司
  • 美女做暖暖視頻免費網(wǎng)站網(wǎng)絡(luò)營銷圖片素材
  • 供應(yīng)邯鄲專業(yè)做網(wǎng)站網(wǎng)站seo入門基礎(chǔ)教程書籍
  • 南昌網(wǎng)站建設(shè)哪家好哈爾濱seo推廣
  • 軟件首頁設(shè)計圖微博seo營銷
  • 運營最好的網(wǎng)站西安網(wǎng)站關(guān)鍵詞優(yōu)化推薦
  • 南京網(wǎng)站設(shè)計公司推薦企業(yè)策劃推廣公司
  • 動態(tài)網(wǎng)站開發(fā)教案網(wǎng)頁模版
  • 武漢網(wǎng)站設(shè)計廠家網(wǎng)頁版百度云
  • 對政府網(wǎng)站有關(guān)標(biāo)準(zhǔn)規(guī)范建設(shè)的需求中國進入一級戰(zhàn)備狀態(tài)了嗎
  • 如何做營銷型單頁網(wǎng)站十大營銷模式
  • 網(wǎng)站一級導(dǎo)航怎么做在線seo診斷
  • 如何選擇網(wǎng)站開發(fā)語言seo監(jiān)控
  • 支付網(wǎng)站開發(fā)費可以做無形資產(chǎn)2345王牌瀏覽器
  • 做裝修廣告網(wǎng)站好百度網(wǎng)盟
  • 網(wǎng)站審查備案萬網(wǎng)的app叫什么
  • 用花生棒做網(wǎng)站快嗎百度seo公司哪家最好
  • 射陽做企業(yè)網(wǎng)站多少錢百度地圖優(yōu)化
  • wordpress 5.1.1主題網(wǎng)絡(luò)搜索引擎優(yōu)化
  • 公司網(wǎng)站服務(wù)器租用營銷策劃主要做些什么
  • 做網(wǎng)站包括哪些網(wǎng)絡(luò)科技有限公司
  • 企業(yè)網(wǎng)站建設(shè)方案教程網(wǎng)站批量收錄
  • 地方門戶東莞seo建站推廣費用
  • 做網(wǎng)站好用的軟件seo搜索引擎優(yōu)化薪資
  • phpstudy 網(wǎng)站空白百度怎么推廣廣告
  • 網(wǎng)絡(luò)公司經(jīng)營范圍能寫建材嗎鄭州網(wǎng)站優(yōu)化seo
  • 碼云可以做博客網(wǎng)站嗎百度客服投訴中心
  • 大連科技網(wǎng)站制作網(wǎng)絡(luò)營銷的常用工具