Flink provides a rich set of client operations for submitting jobs and interacting with them. This article covers five of them: the Flink command line, the Scala Shell, the SQL Client, the RESTful API, and the Web UI.
In the bin directory of the Flink installation you will find flink, start-scala-shell.sh, sql-client.sh, and similar files; these are the entry points for client operations.
Common flink operations (each action prints its usage with -h):
run: run a job
-d: run the job in detached mode
-c: specifies the entry class; needed when no entry class is set in the jar's manifest
-m: address of the JobManager (master) to connect to; use this flag to target a JobManager other than the one in the configuration file, e.g. a YARN cluster
-p: parallelism of the program; overrides the default value in the configuration file
-s: path of a savepoint to restore the job from (e.g. hdfs:///flink/savepoint-1537)
A combined example appears at the end of this subsection.
[root@hadoop1 flink-1.10.1]# bin/flink run -d examples/streaming/TopSpeedWindowing.jar
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID dce7b69ad15e8756766967c46122736f
The job we just submitted now appears in the JobManager Web UI; by default it runs with a parallelism of 1.
Click into the job to see its details.
Click Task Managers in the left-hand menu and open the Stdout tab to see the job's log output.
Alternatively, the *.out files under the log directory on the TaskManager nodes contain the same output.
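Putting several of these flags together, a fuller submission might look like the following sketch; the JobManager address matches this article's setup, while the jar path and entry class are placeholders:
[root@hadoop1 flink-1.10.1]# bin/flink run -d -m 127.0.0.1:8081 -p 2 -c com.example.MyJob /path/to/my-job.jar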
list: list jobs
-m <arg>: address of the JobManager (master) to connect to.
[root@hadoop1 flink-1.10.1]# bin/flink list -m 127.0.0.1:8081
Waiting for response...
------------------ Running/Restarting Jobs -------------------
09.07.2020 16:44:09 : dce7b69ad15e8756766967c46122736f : CarTopSpeedWindowingExample (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
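flink list also accepts -r (running jobs only), -s (scheduled jobs only), and -a (all jobs, including finished ones); a quick sketch:
[root@hadoop1 flink-1.10.1]# bin/flink list -a -m 127.0.0.1:8081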
stop: stop a job
You must specify the JobManager's ip:port and the JobID. As the error below shows, a job can only be stopped if all of its sources are stoppable, i.e. they implement the StoppableFunction interface.
[root@hadoop1 flink-1.10.1]# bin/flink stop -m 127.0.0.1:8081 dce7b69ad15e8756766967c46122736f
Suspending job "dce7b69ad15e8756766967c46122736f" with a savepoint.
------------------------------------------------------------
 The program finished with the following exception:
org.apache.flink.util.FlinkException: Could not stop with a savepoint job "dce7b69ad15e8756766967c46122736f".
    at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:458)
The StoppableFunction interface is shown below; it is the mechanism for stopping a job gracefully.
/**
 * Functions that need to be stoppable, such as streaming sources, must implement this
 * interface. stop() is called when the job receives a stop signal; after receiving it,
 * the source must stop emitting new data and shut down gracefully.
 */
@PublicEvolving
public interface StoppableFunction {
    /**
     * Stops the source. Unlike cancel(), this is a request for the source to shut down
     * gracefully: pending data may still be emitted, and it need not stop immediately.
     */
    void stop();
}
cancel: cancel a job
If state.savepoints.dir is configured in conf/flink-conf.yaml, a savepoint is taken when the job is cancelled; otherwise no savepoint is saved. (Restart the cluster after changing this setting.)
state.savepoints.dir: file:///tmp/savepoint
Run the cancel command to cancel the job:
[root@hadoop1 flink-1.10.1]# bin/flink cancel -m 127.0.0.1:8081 -s e8ce0d111262c52bf8228d5722742d47
DEPRECATION WARNING: Cancelling a job with savepoint is deprecated. Use "stop" instead.
Cancelling job e8ce0d111262c52bf8228d5722742d47 with savepoint to default savepoint directory.
Cancelled job e8ce0d111262c52bf8228d5722742d47. Savepoint stored in file:/tmp/savepoint/savepoint-e8ce0d-f7fa96a085d8.
You can also explicitly specify the savepoint directory when cancelling:
[root@hadoop1 flink-1.10.1]# bin/flink cancel -m 127.0.0.1:8081 -s /tmp/savepoint f58bb4c49ee5580ab5f27fdb24083353
DEPRECATION WARNING: Cancelling a job with savepoint is deprecated. Use "stop" instead.
Cancelling job f58bb4c49ee5580ab5f27fdb24083353 with savepoint to /tmp/savepoint.
Cancelled job f58bb4c49ee5580ab5f27fdb24083353. Savepoint stored in file:/tmp/savepoint/savepoint-f58bb4-127b7e84910e.
The differences between cancelling and stopping a (streaming) job are as follows:
● cancel(): immediately invokes the cancel() method of the job's operators to cancel them as quickly as possible. If the operators do not stop after the cancel() call, Flink begins periodically interrupting the operator threads until all of them have stopped.
● stop(): a more graceful way to stop a running streaming job. stop() is only available for jobs whose sources all implement the StoppableFunction interface. When the user requests a stop, every source receives a stop() call, and the job finishes only once all sources have shut down cleanly, which lets the job finish processing all in-flight data.
savepoint: trigger a savepoint
When you need a savepoint file, you trigger it manually, specifying the JobID of a running job and the directory to write to. As shown below, the command also returns the file name of the stored savepoint and related information.
[root@hadoop1 flink-1.10.1]# bin/flink run -d examples/streaming/TopSpeedWindowing.jar
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 216c427d63e3754eb757d2cc268a448d
[root@hadoop1 flink-1.10.1]# bin/flink savepoint -m 127.0.0.1:8081 216c427d63e3754eb757d2cc268a448d /tmp/savepoint/
Triggering savepoint for job 216c427d63e3754eb757d2cc268a448d.
Waiting for response...
Savepoint completed. Path: file:/tmp/savepoint/savepoint-216c42-154a34cf6bfd
You can resume your program from this savepoint with the run command.
Differences between savepoints and checkpoints:
● Checkpoints are incremental, so each one is fast and small; once enabled in the program they are triggered automatically and are invisible to the user. Savepoints are full snapshots, so each one takes longer and is larger, and the user must trigger them explicitly.
● Checkpoints are used automatically when a job fails over; the user does not specify them. Savepoints are typically used for version upgrades, bug fixes, A/B tests, and similar scenarios, and must be specified by the user.
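For reference, both storage locations are set in conf/flink-conf.yaml; a sketch with illustrative paths (state.savepoints.dir appears earlier in this article, and state.checkpoints.dir is its checkpoint counterpart):
state.checkpoints.dir: hdfs:///flink/checkpoints
state.savepoints.dir: file:///tmp/savepoint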
Starting from a specified savepoint
[root@hadoop1 flink-1.10.1]# bin/flink run -d -s /tmp/savepoint/savepoint-f58bb4-127b7e84910e/ examples/streaming/TopSpeedWindowing.jar
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 1a5c5ce279e0e4bd8609f541b37652e2
In the JobManager log you can see a Reset the checkpoint ID entry, with the ID taken from the savepoint file we specified.
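To find that entry you can search the JobManager log directly; a quick sketch, assuming Flink's default log file naming under log/:
[root@hadoop1 flink-1.10.1]# grep "Reset the checkpoint ID" log/flink-*-standalonesession-*.log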
modify: change job parallelism
Here we edit conf/flink-conf.yaml on the master to set the number of task slots to 4, then distribute the file to the two slave nodes with xsync.
taskmanager.numberOfTaskSlots: 4
The cluster must be restarted for the new setting to take effect: stop, then start it.
[root@hadoop1 flink-1.10.1]# bin/stop-cluster.sh && bin/start-cluster.sh
Stopping taskexecutor daemon (pid: 8236) on host hadoop2.
Stopping taskexecutor daemon (pid: 8141) on host hadoop3.
Stopping standalonesession daemon (pid: 22633) on host hadoop1.
Starting cluster.
Starting standalonesession daemon on host hadoop1.
Starting taskexecutor daemon on host hadoop2.
Starting taskexecutor daemon on host hadoop3.
Start the job:
[root@hadoop1 flink-1.10.1]# bin/flink run -d examples/streaming/TopSpeedWindowing.jar
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 2e833a438da7d8052f14d5433910515a
The Web UI now shows 8 Task Slots in total, with 1 slot in use and 7 slots available.
At this point the default parallelism is 1.
The flink modify action has been removed from the command line in Flink 1.10; it still worked in Flink 1.7.
[root@hadoop1 flink-1.10.1]# bin/flink modify -p 4 cc22cc3d09f5d65651d637be6fb0a1c3
"modify" is not a valid action.
info: show the program's execution plan
[root@hadoop1 flink-1.10.1]# bin/flink info examples/streaming/TopSpeedWindowing.jar
----------------------- Execution Plan -----------------------
{"nodes":[{"id":1,"type":"Source: Custom Source","pact":"Data Source","contents":"Source: Custom Source","parallelism":1},{"id":2,"type":"Timestamps/Watermarks","pact":"Operator","contents":"Timestamps/Watermarks","parallelism":1,"predecessors":[{"id":1,"ship_strategy":"FORWARD","side":"second"}]},{"id":4,"type":"Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction)","pact":"Operator","contents":"Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction)","parallelism":1,"predecessors":[{"id":2,"ship_strategy":"HASH","side":"second"}]},{"id":5,"type":"Sink: Print to Std. Out","pact":"Data Sink","contents":"Sink: Print to Std. Out","parallelism":1,"predecessors":[{"id":4,"ship_strategy":"FORWARD","side":"second"}]}]}
--------------------------------------------------------------
Copy the JSON output and paste it into http://flink.apache.org/visualizer/ to generate an execution graph like the one below, which you can compare against the physical plan of the actual running job.
SQL Client Beta
Start the Flink SQL client:
[root@hadoop1 flink-1.10.1]# bin/sql-client.sh embedded
Run a SELECT query; press Q to exit the result screen shown below.
Flink SQL> select 'hello word';

SQL Query Result (Table)
Table program finished.     Page: Last of 1     Updated: 16:37:04.649

EXPR$0
hello word

Q Quit      + Inc Refresh      G Goto Page      N Next Page      O Open Row
R Refresh   - Dec Refresh      L Last Page      P Prev Page
Open http://hadoop1:8081 and you can see that the query job produced by this SELECT statement has already finished. The query reads a fixed data set through a Custom Source and writes through a Stream Collect Sink, emitting a single row.
explain: show the execution plan of a SQL statement.
Flink SQL> explain SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;
== Abstract Syntax Tree ==
LogicalAggregate(group=[{0}], cnt=[COUNT()])
+- LogicalValues(type=[RecordType(VARCHAR(5) name)], tuples=[[{ _UTF-16LE'Bob' }, { _UTF-16LE'Alice' }, { _UTF-16LE'Greg' }, { _UTF-16LE'Bob' }]])

== Optimized Logical Plan ==
GroupAggregate(groupBy=[name], select=[name, COUNT(*) AS cnt])
+- Exchange(distribution=[hash[name]])
   +- Values(type=[RecordType(VARCHAR(5) name)], tuples=[[{ _UTF-16LE'Bob' }, { _UTF-16LE'Alice' }, { _UTF-16LE'Greg' }, { _UTF-16LE'Bob' }]])

== Physical Execution Plan ==
Stage 13 : Data Source
    content : Source: Values(tuples=[[{ _UTF-16LE'Bob' }, { _UTF-16LE'Alice' }, { _UTF-16LE'Greg' }, { _UTF-16LE'Bob' }]])

Stage 15 : Operator
    content : GroupAggregate(groupBy=[name], select=[name, COUNT(*) AS cnt])
    ship_strategy : HASH
Displaying results
The SQL Client supports two modes for maintaining and displaying query results:
table mode: materializes query results in memory and displays them in a paginated table. It is enabled with the following command, as in the example below:
Flink SQL> SET execution.result-mode=table;
[INFO] Session property has been set.
Flink SQL> SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;

SQL Query Result (Table)
Table program finished.     Page: Last of 1     Updated: 16:55:08.589

name   cnt
Alice  1
Greg   1
Bob    2

Q Quit      + Inc Refresh      G Goto Page      N Next Page      O Open Row
R Refresh   - Dec Refresh      L Last Page      P Prev Page
changelog mode: does not materialize query results; instead it directly displays the additions and retractions produced by the continuous query. In the example below, a leading - marks a retraction message.
Flink SQL> SET execution.result-mode=changelog;
[INFO] Session property has been set.
Flink SQL> SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;

SQL Query Result (Changelog)
Table program finished.     Updated: 16:58:05.777

+/-  name   cnt
+    Bob    1
+    Alice  1
+    Greg   1
-    Bob    1
+    Bob    2

Q Quit      + Inc Refresh      O Open Row
R Refresh   - Dec Refresh
Environment Files
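Environment files let you predefine catalogs, tables, and execution settings (such as the result mode above) in YAML so they are in place when the client starts. A minimal launch sketch, assuming the Flink 1.10 client flags -d (defaults) and -e (session environment); conf/sql-client-defaults.yaml ships with Flink, while the session file path is a placeholder:
[root@hadoop1 flink-1.10.1]# bin/sql-client.sh embedded -d conf/sql-client-defaults.yaml -e /tmp/session-env.yaml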
CREATE TABLE: DDL statement for creating a table:
Flink SQL> CREATE TABLE pvuv_sink (
> dt VARCHAR,
> pv BIGINT,
> uv BIGINT
> ) ;
[INFO] Table has been created.
SHOW TABLES: list all table names
Flink SQL> show tables;
pvuv_sink
DESCRIBE <table name>: show a table's details:
Flink SQL> describe pvuv_sink;
root
 |-- dt: STRING
 |-- pv: BIGINT
 |-- uv: BIGINT
INSERT and the other statements work just like their relational-database counterparts, so the remaining operations are omitted here.
RESTful API
Next we demonstrate how to submit a jar and run a job through the REST API.
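A minimal sketch with curl, assuming the Flink 1.10 REST endpoints and this article's standalone cluster; <jar-id> is a placeholder for the file name returned by the upload call:
# Upload the jar; the multipart form field must be named jarfile.
[root@hadoop1 flink-1.10.1]# curl -X POST -H "Expect:" -F "jarfile=@examples/streaming/TopSpeedWindowing.jar" http://127.0.0.1:8081/jars/upload
# Run the uploaded jar by its id, optionally setting the parallelism.
[root@hadoop1 flink-1.10.1]# curl -X POST "http://127.0.0.1:8081/jars/<jar-id>/run?parallelism=1"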
Show Plan displays the execution graph.
After submitting, you can cancel the job by clicking Cancel Job on the page.