Stateful operations and operators need to store intermediate results while processing the elements/events of a DataStream, which gives state a very important role in Flink's fine-grained computation:
- It records information about the data from some point in the past up to the present.
- When aggregating events per minute/hour/day, state holds the pending aggregates.
- When training a machine learning model, state holds the current version of the model parameters.
For state management, Flink uses Checkpoints and Savepoints to achieve state fault tolerance. When the scale of the computation changes, Flink can automatically redistribute state across parallel instances. Under the hood, state is stored according to a State Backend strategy; the State Backend determines how and where state is stored (covered in a later section).
Flink divides all manageable state into Keyed State and Operator State. Keyed State is bound one-to-one to a key and can only be used on a KeyedStream; all state operations on non-keyed streams are Operator State. Internally, Flink associates Keyed State with <parallel-operator-instance, key>, and since a given key lands in exactly one operator instance, Keyed State can simply be understood as being bound to <operator, key>. Keyed State is managed (partitioned) through the Key Group mechanism, so a keyed operator may interact with 1~n Key Groups when operating on state.
When redistributing Keyed State, Flink does not redistribute key by key; the Key Group is the smallest unit of redistribution.
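The mapping from a key to its Key Group, and from a Key Group to an operator instance, can be sketched with Flink's internal KeyGroupRangeAssignment utility (a runtime-internal class whose API may change between versions; shown here purely for illustration):
import org.apache.flink.runtime.state.KeyGroupRangeAssignment

val maxParallelism = 128 // maximum parallelism = number of Key Groups
val parallelism = 4      // hypothetical operator parallelism

val keyGroup = KeyGroupRangeAssignment.assignToKeyGroup("flink", maxParallelism)
val operatorIndex = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup)
println(s"key group $keyGroup is handled by operator instance $operatorIndex")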
Operator State (also called non-keyed state): each operator state is bound to one parallel operator instance. Both Keyed State and Operator State exist in two forms, managed and raw. All built-in Flink operators support managed state, whereas raw state is only used in user-defined operators and does not support redistribution when the parallelism changes. So although Flink supports raw state, the vast majority of applications use managed state.
Keyed State
The keyed-state interfaces provide access to different types of state, all of which are scoped to the key of the current input element.
Type | Description | Methods |
---|---|---|
ValueState | Holds a single value that can be updated | update(T) T value() clear() |
ListState | Holds a list of elements | add(T) addAll(List) Iterable get() update(List) clear() |
ReducingState | Holds a single value representing the aggregate of all values added to the state; requires a user-supplied ReduceFunction | add(T) T get() clear() |
AggregatingState<IN, OUT> | Holds a single value representing the aggregate of all values added to the state; requires a user-supplied AggregateFunction | add(IN) OUT get() clear() |
FoldingState<T, ACC> | Holds a single value representing the aggregate of all values added to the state; requires a user-supplied FoldFunction | add(T) T get() clear() |
MapState<UK, UV> | Holds a map of elements | put(UK, UV) putAll(Map<UK, UV>) entries() keys() values() clear() |
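ReducingState is not demonstrated in the examples below, but it follows the same RichMapFunction pattern; a minimal sketch for the same word-count stream used in the ValueState example (the descriptor name is arbitrary):
var env=StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos",9999)
  .flatMap(_.split("\\s+"))
  .map((_,1))
  .keyBy(0)
  .map(new RichMapFunction[(String,Int),(String,Int)] {
    var vs: ReducingState[Int] = _

    override def open(parameters: Configuration): Unit = {
      val rsd = new ReducingStateDescriptor[Int]("reduceCount",
        new ReduceFunction[Int] {
          override def reduce(v1: Int, v2: Int): Int = v1 + v2
        },
        createTypeInformation[Int])
      vs = getRuntimeContext.getReducingState(rsd)
    }

    override def map(value: (String, Int)): (String, Int) = {
      vs.add(value._2) // the ReduceFunction folds the new value into the stored aggregate
      (value._1, vs.get())
    }
  }).print()
env.execute("wordcount")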
ValueState
var env=StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos",9999)
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.map(new RichMapFunction[(String,Int),(String,Int)] {
  var vs: ValueState[Int] = _

  override def open(parameters: Configuration): Unit = {
    val vsd = new ValueStateDescriptor[Int]("valueCount", createTypeInformation[Int])
    vs = getRuntimeContext.getState[Int](vsd)
  }

  override def map(value: (String, Int)): (String, Int) = {
    val historyCount = vs.value()
    val currentCount = historyCount + value._2
    vs.update(currentCount)
    (value._1, currentCount)
  }
}).print()
env.execute("wordcount")
AggregatingState<IN, OUT>
var env=StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos",9999)
.map(_.split("\\s+"))
.map(ts=>(ts(0),ts(1).toInt))
.keyBy(0)
.map(new RichMapFunction[(String,Int),(String,Double)] {
  var vs: AggregatingState[Int, Double] = _

  override def open(parameters: Configuration): Unit = {
    val vsd = new AggregatingStateDescriptor[Int, (Double, Int), Double]("avgCount",
      new AggregateFunction[Int, (Double, Int), Double] {
        override def createAccumulator(): (Double, Int) = (0.0, 0)

        override def add(value: Int, accumulator: (Double, Int)): (Double, Int) =
          (accumulator._1 + value, accumulator._2 + 1)

        override def merge(a: (Double, Int), b: (Double, Int)): (Double, Int) =
          (a._1 + b._1, a._2 + b._2)

        override def getResult(accumulator: (Double, Int)): Double =
          accumulator._1 / accumulator._2
      },
      createTypeInformation[(Double, Int)])
    vs = getRuntimeContext.getAggregatingState(vsd)
  }

  override def map(value: (String, Int)): (String, Double) = {
    vs.add(value._2)
    val avgCount = vs.get()
    (value._1, avgCount)
  }
}).print()
env.execute("wordcount")
MapState<UK, UV>
var env=StreamExecutionEnvironment.getExecutionEnvironment
// sample input: 001 zs 202.15.10.12 Japan 2019-10-10
// the Login case class is not defined in the original; its fields are inferred from the usage below
case class Login(id: String, name: String, ip: String, city: String, loginTime: String)
env.socketTextStream("centos",9999)
.map(_.split("\\s+"))
.map(ts=>Login(ts(0),ts(1),ts(2),ts(3),ts(4)))
.keyBy("id","name")
.map(new RichMapFunction[Login,String] {
  var vs: MapState[String, String] = _

  override def open(parameters: Configuration): Unit = {
    val msd = new MapStateDescriptor[String, String]("mapstate",
      createTypeInformation[String], createTypeInformation[String])
    vs = getRuntimeContext.getMapState(msd)
  }

  override def map(value: Login): String = {
    println("login history")
    for (k <- vs.keys().asScala) {
      println(k + " " + vs.get(k))
    }
    var result = ""
    if (vs.keys().iterator().asScala.isEmpty) {
      result = "ok"
    } else {
      if (!value.city.equalsIgnoreCase(vs.get("city"))) {
        result = "error"
      } else {
        result = "ok"
      }
    }
    vs.put("ip", value.ip)
    vs.put("city", value.city)
    vs.put("loginTime", value.loginTime)
    result
  }
}).print()
env.execute("wordcount")
Summary
new Rich[Map|FlatMap]Function {
  var vs: XxxState = _ // declare the state

  override def open(parameters: Configuration): Unit = {
    val xxd = new XxxStateDescriptor // initialize the state
    vs = getRuntimeContext.getXxxState(xxd)
  }

  override def xxx(value: Xx): Xxx = {
    // operate on the state
  }
}
ValueState<T> getState(ValueStateDescriptor<T>)
ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
ListState<T> getListState(ListStateDescriptor<T>)
AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)
MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)
State Time-To-Live (TTL)
Basic usage
A time-to-live (TTL) can be assigned to keyed state of any type. If a TTL is configured and a state value has expired, Flink makes a best effort to clear the stored historical value.
import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.time.Time

val ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1))
  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  .build
val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
stateDescriptor.enableTimeToLive(ttlConfig)
- Example
var env=StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos",9999)
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.map(new RichMapFunction[(String,Int),(String,Int)] {
  var vs: ValueState[Int] = _

  override def open(parameters: Configuration): Unit = {
    val vsd = new ValueStateDescriptor[Int]("valueCount", createTypeInformation[Int])
    val ttlConfig = StateTtlConfig.newBuilder(Time.seconds(5))   // expire after 5s
      .setUpdateType(UpdateType.OnCreateAndWrite)                // refresh the TTL on create and write
      .setStateVisibility(StateVisibility.NeverReturnExpired)    // never return expired values
      .build()
    vsd.enableTimeToLive(ttlConfig)
    vs = getRuntimeContext.getState[Int](vsd)
  }

  override def map(value: (String, Int)): (String, Int) = {
    val historyCount = vs.value()
    val currentCount = historyCount + value._2
    vs.update(currentCount)
    (value._1, currentCount)
  }
}).print()
env.execute("wordcount")
Note: once TTL is enabled, the system consumes extra storage for a per-entry timestamp (processing time). If a job previously ran without TTL and the code is changed to enable TTL before a restart, state restore will fail at startup with a compatibility failure and a StateMigrationException.
Clearing Expired State
在默認(rèn)情況下,僅當(dāng)明確讀出過期狀態(tài)時(shí),通過調(diào)用ValueState.value()方法才會(huì)清除過期的數(shù)據(jù),這意味著,如果系統(tǒng)一直未讀取過期的狀態(tài),則不會(huì)將其刪除,可能會(huì)導(dǎo)致存儲(chǔ)狀態(tài)數(shù)據(jù)的文件持續(xù)增長(zhǎng)。
Cleanup in full snapshot
系統(tǒng)會(huì)從上一次狀態(tài)恢復(fù)的時(shí)間點(diǎn),加載所有的State快照,在加載過程中會(huì)剔除那些過期的數(shù)據(jù),這并不會(huì)影響磁盤已存儲(chǔ)的狀態(tài)數(shù)據(jù),該狀態(tài)數(shù)據(jù)只會(huì)在Checkpoint的時(shí)候被覆蓋,但是依然解決不了在運(yùn)行時(shí)自動(dòng)清除過期且沒有用過的數(shù)據(jù)。
import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.time.Time
val ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1))
  .cleanupFullSnapshot
  .build
This only works for backends that keep state in memory or snapshot form; it is not supported by the RocksDB State Backend.
Cleanup in background
A background cleanup strategy can be enabled; the default strategy depends on the State Backend (different backends use different cleanup strategies):
import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupInBackground
.build
import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlConfig.newBuilder(Time.seconds(5))
.setUpdateType(UpdateType.OnCreateAndWrite)
.setStateVisibility(StateVisibility.NeverReturnExpired)
.cleanupIncrementally(100,true) // defaults: 5 | false
.build()
第一個(gè)參數(shù)表示每一次觸發(fā)cleanup的時(shí)候,系統(tǒng)會(huì)一次處理100個(gè)元素。第二個(gè)參數(shù)是false,表示只要用戶對(duì)任意一個(gè)state進(jìn)行操作,系統(tǒng)都會(huì)觸發(fā)cleanup策略;第二個(gè)參數(shù)是true,表示只要系統(tǒng)接收到記錄數(shù)(即使用戶沒有操作狀態(tài))就會(huì)觸發(fā)cleanup策略。
RocksDB是一個(gè)嵌入式的key-value存儲(chǔ),其中key和value是任意的字節(jié)流,底層進(jìn)行異步壓縮,會(huì)將key相同的數(shù)據(jù)進(jìn)行compact(壓縮),以減少state文件大小,但是并不對(duì)過期的state進(jìn)行清理,因此可以通過配置compactFilter,讓RocksDB在compact的時(shí)候?qū)^期的state進(jìn)行排除,RocksDB數(shù)據(jù)庫的這種過濾特性,默認(rèn)關(guān)閉,如果想要開啟,可以在flink-conf.yaml中配置 state.backend.rocksdb.ttl.compaction.filter.enabled:true 或者在應(yīng)用程序的API里設(shè)置RocksDBStateBackend::enableTtlCompactionFilter。
import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlConfig.newBuilder(Time.seconds(5))
.setUpdateType(UpdateType.OnCreateAndWrite)
.setStateVisibility(StateVisibility.NeverReturnExpired)
.cleanupInRocksdbCompactFilter(1000) // default: 1000
.build()
Here 1000 means that while compacting, the system checks entries for expiration in batches of 1000 and clears the expired ones.
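As a sketch of the second option (API as of Flink 1.8; enableTtlCompactionFilter was deprecated in later versions once the filter became enabled by default, and the checkpoint URI below is a placeholder):
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend

val backend = new RocksDBStateBackend("hdfs:///flink/checkpoints")
backend.enableTtlCompactionFilter()
env.setStateBackend(backend)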
Operator State
To use Operator State, implement the generic CheckpointedFunction interface or ListCheckpointed<T extends Serializable>. Note that operator state currently only supports list-style state: the stored state must be a List whose elements are serializable.
CheckpointedFunction provides two state redistribution schemes, Even-split and Union:
void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;
- snapshotState(): invoked when a checkpoint is taken, to snapshot the state
- initializeState(): invoked when the function is first started or when it is restored from an earlier checkpoint/savepoint

Even-split: on failure recovery, the elements of the operator state are split evenly across all operator instances; each instance receives a sub-list of the whole operator state.
Union: on failure recovery, each operator instance receives the complete operator state.
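Which scheme is used is determined by the accessor called on the OperatorStateStore during initialization; a minimal sketch inside initializeState() (the descriptor name is arbitrary):
override def initializeState(context: FunctionInitializationContext): Unit = {
  val lsd = new ListStateDescriptor[(String, Int)]("buffered-elements", createTypeInformation[(String, Int)])
  listState = context.getOperatorStateStore.getListState(lsd)        // Even-split redistribution
  // listState = context.getOperatorStateStore.getUnionListState(lsd) // Union redistribution
}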
Example
class BufferingSink(threshold: Int = 0) extends SinkFunction[(String, Int)] with CheckpointedFunction {
  var listState: ListState[(String, Int)] = _
  val bufferedElements = ListBuffer[(String, Int)]()

  // writes the buffered data to the external system
  override def invoke(value: (String, Int)): Unit = {
    bufferedElements += value
    if (bufferedElements.size == threshold) {
      for (ele <- bufferedElements) {
        println(ele)
      }
      bufferedElements.clear()
    }
  }

  // persists the buffer on savepoint/checkpoint
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    listState.clear()
    for (ele <- bufferedElements) {
      listState.add(ele)
    }
  }

  // state restore/initialization: creates the state
  override def initializeState(context: FunctionInitializationContext): Unit = {
    val lsd = new ListStateDescriptor[(String, Int)]("buffered-elements", createTypeInformation[(String, Int)])
    listState = context.getOperatorStateStore.getListState(lsd)
    if (context.isRestored) {
      for (element <- listState.get().asScala) {
        bufferedElements += element
      }
    }
  }
}
var env=StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos",9999)
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.addSink(new BufferingSink(5))
env.execute("testoperatorstate")
- Start the netcat service
[root@centos ~]# nc -lk 9999
- Submit the job

Note: set the parallelism to 1 to make testing easier; a sample submit command is shown below.
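For example (the main class and jar path are placeholders for your own build):
[root@centos flink-1.8.1]# ./bin/flink run -m centos:8081 -p 1 -c com.example.TestOperatorState /root/flink-examples-1.0.jar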
- Enter the following data in netcat
[root@centos ~]# nc -lk 9999
a1 b1 c1 d1
- Cancel the job and create a savepoint
[root@centos flink-1.8.1]# ./bin/flink list -m centos:8081
------------------ Running/Restarting Jobs -------------------
17.10.2019 09:49:20 : f21795e74312eb06fbf0d48cb8d90489 : testoperatorstate (RUNNING)
--------------------------------------------------------------
[root@centos flink-1.8.1]# ./bin/flink cancel -m centos:8081 -s hdfs:///savepoints f21795e74312eb06fbf0d48cb8d90489
Cancelling job f21795e74312eb06fbf0d48cb8d90489 with savepoint to hdfs:///savepoints.
Cancelled job f21795e74312eb06fbf0d48cb8d90489. Savepoint stored in hdfs://centos:9000/savepoints/savepoint-f21795-38e7beefe07b.
Note: if Flink needs to integrate with Hadoop, HADOOP_HOME or HADOOP_CLASSPATH must be present in the current environment variables:
[root@centos flink-1.8.1]# cat /root/.bashrc
HADOOP_HOME=/usr/hadoop-2.9.2
JAVA_HOME=/usr/java/latest
PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
CLASSPATH=.
export JAVA_HOME
export PATH
export CLASSPATH
export HADOOP_HOME
HADOOP_CLASSPATH=`hadoop classpath`
export HADOOP_CLASSPATH
- 測(cè)試狀態(tài)
ListCheckpointed
The ListCheckpointed interface is a variant of CheckpointedFunction that only supports the Even-split redistribution scheme.
List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
void restoreState(List<T> state) throws Exception;
- snapshotState(): invoked when a checkpoint is taken, to snapshot the state
- restoreState(): the counterpart of initializeState() declared in CheckpointedFunction above; used to restore the state
Example
import java.lang.{Long => JLong} // alias for the Java Long class
import scala.{Long => SLong}     // alias for the Scala Long class
class CustomStatefulSourceFunction extends ParallelSourceFunction[SLong] with ListCheckpointed[JLong] {
  @volatile
  var isRunning: Boolean = true
  var offset = 0L

  override def run(ctx: SourceFunction.SourceContext[SLong]): Unit = {
    val lock = ctx.getCheckpointLock
    while (isRunning) {
      Thread.sleep(1000)
      lock.synchronized({
        ctx.collect(offset)
        offset += 1
      })
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }

  override def snapshotState(checkpointId: Long, timestamp: Long): util.List[JLong] = {
    // stores the current source offset; if the state cannot be split,
    // Collections.singletonList can be used
    Collections.singletonList(offset)
  }

  override def restoreState(state: util.List[JLong]): Unit = {
    for (s <- state.asScala) {
      offset = s
    }
  }
}
var env=StreamExecutionEnvironment.getExecutionEnvironment
env.addSource[Long](new CustomStatefulSourceFunction)
.print("offset:")
env.execute("testOffset")
Broadcast State
A third type of supported operator state is Broadcast State. It was introduced to support use cases where some data coming from one stream needs to be broadcast to all downstream tasks, where it is stored locally and used to process all incoming elements on the other stream.
Non-keyed
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._
class UserBuyPathBroadcastProcessFunction(msd: MapStateDescriptor[String, Int]) extends BroadcastProcessFunction[UserBuyPath, Rule, String] {
  // processes UserBuyPath elements, reading the broadcast state
  override def processElement(value: UserBuyPath,
                              ctx: BroadcastProcessFunction[UserBuyPath, Rule, String]#ReadOnlyContext,
                              out: Collector[String]): Unit = {
    val broadcastState = ctx.getBroadcastState(msd)
    if (broadcastState.contains(value.channel)) { // if there is a rule, try to evaluate it
      val threshold = broadcastState.get(value.channel)
      if (value.path >= threshold) {
        // emit the users that match the rule
        out.collect(value.id + " " + value.name + " " + value.channel + " " + value.path)
      }
    }
  }

  // processes Rule elements, updating the broadcast state
  override def processBroadcastElement(value: Rule,
                                       ctx: BroadcastProcessFunction[UserBuyPath, Rule, String]#Context,
                                       out: Collector[String]): Unit = {
    val broadcastState = ctx.getBroadcastState(msd)
    broadcastState.put(value.channel, value.threshold) // update the state
    println("=======rule======")
    for (entry <- broadcastState.entries().asScala) {
      println(entry.getKey + "\t" + entry.getValue)
    }
    println()
  }
}
var env=StreamExecutionEnvironment.getExecutionEnvironment
// id  name channel action
// 001 mack phone   view
// 001 mack phone   view
// 001 mack phone   addToCart
// 001 mack phone   buy
val userStream = env.socketTextStream("centos", 9999)
  .map(line => line.split("\\s+"))
  .map(ts => UserAction(ts(0), ts(1), ts(2), ts(3)))
  .keyBy("id", "name")
  .map(new UserActionRichMapFunction)

val msd = new MapStateDescriptor[String,Int]("broadcast-state", createTypeInformation[String], createTypeInformation[Int])
// channel threshold
// phone   10
val broadcastStream: BroadcastStream[Rule] = env.socketTextStream("centos", 8888)
  .map(line => line.split("\\s+"))
  .map(ts => Rule(ts(0), ts(1).toInt))
  .broadcast(msd)

userStream.connect(broadcastStream)
.process(new UserBuyPathBroadcastProcessFunction(msd))
.print()
env.execute("testoperatorstate")
case class Rule(channel:String,threshold:Int)
case class UserAction(id:String,name:String ,channel:String,action:String)
case class UserBuyPath(id:String,name:String,channel:String,path:Int)
class UserActionRichMapFunction extends RichMapFunction[UserAction, UserBuyPath] {
  var buyPathState: MapState[String, Int] = _

  override def open(parameters: Configuration): Unit = {
    val msd = new MapStateDescriptor[String, Int]("buy-path", createTypeInformation[String], createTypeInformation[Int])
    buyPathState = getRuntimeContext.getMapState(msd)
  }

  override def map(value: UserAction): UserBuyPath = {
    val channel = value.channel
    var path = 0
    if (buyPathState.contains(channel)) {
      path = buyPathState.get(channel)
    }
    if (value.action.equals("buy")) {
      buyPathState.remove(channel) // a purchase resets the path for this channel
    } else {
      buyPathState.put(channel, path + 1)
    }
    UserBuyPath(value.id, value.name, value.channel, buyPathState.get(channel))
  }
}
Keyed
class UserBuyPathKeyedBroadcastProcessFunction(msd: MapStateDescriptor[String, Int]) extends KeyedBroadcastProcessFunction[String, UserAction, Rule, String] {
  override def processElement(value: UserAction,
                              ctx: KeyedBroadcastProcessFunction[String, UserAction, Rule, String]#ReadOnlyContext,
                              out: Collector[String]): Unit = {
    println("value:" + value + " key:" + ctx.getCurrentKey)
    println("=====state======")
    for (entry <- ctx.getBroadcastState(msd).immutableEntries().asScala) {
      println(entry.getKey + "\t" + entry.getValue)
    }
  }

  override def processBroadcastElement(value: Rule,
                                       ctx: KeyedBroadcastProcessFunction[String, UserAction, Rule, String]#Context,
                                       out: Collector[String]): Unit = {
    println("Rule:" + value)
    // update the broadcast state
    ctx.getBroadcastState(msd).put(value.channel, value.threshold)
  }
}
case class Rule(channel:String,threshold:Int)
case class UserAction(id:String,name:String ,channel:String,action:String)
var env=StreamExecutionEnvironment.getExecutionEnvironment
// id  name channel action
// 001 mack phone   view
// 001 mack phone   view
// 001 mack phone   addToCart
// 001 mack phone   buy
val userKeyedStream = env.socketTextStream("centos", 9999)
.map(line => line.split("\\s+"))
.map(ts => UserAction(ts(0), ts(1), ts(2), ts(3)))
.keyBy(0) // only one key position may be passed here

val msd = new MapStateDescriptor[String,Int]("broadcast-state", createTypeInformation[String], createTypeInformation[Int])
// channel     threshold
// phone       10
// electronics 10
val broadcastStream: BroadcastStream[Rule] = env.socketTextStream("centos", 8888)
.map(line => line.split("\\s+"))
.map(ts => Rule(ts(0), ts(1).toInt))
.broadcast(msd)
userKeyedStream.connect(broadcastStream)
.process(new UserBuyPathKeyedBroadcastProcessFunction(msd))
.print()
env.execute("testoperatorstate")
CheckPoint & SavePoint
Checkpointing is Flink's fault-tolerance mechanism: the system automatically backs up the program's computation state according to the configured checkpoint settings. If the program fails during computation, the system recovers from the most recent checkpoint.
A Savepoint is an operational tool: the user manually triggers a state backup of the program; under the hood it is also a checkpoint.
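A savepoint can also be triggered for a running job without cancelling it; for example (reusing the job id shown later in this section, with a hypothetical target directory):
[root@centos flink-1.8.1]# ./bin/flink savepoint -m centos:8081 f21795e74312eb06fbf0d48cb8d90489 hdfs:///savepoints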
實(shí)現(xiàn)故障恢復(fù)的先決條件:
- 持久的數(shù)據(jù)源,可以在一定時(shí)間內(nèi)重播記錄(例如,FlinkKafkaConsumer)
- 狀態(tài)的永久性存儲(chǔ),通常是分布式文件系統(tǒng)(例如,HDFS)
var env=StreamExecutionEnvironment.getExecutionEnvironment
// enable the checkpointing mechanism
env.enableCheckpointing(5000,CheckpointingMode.EXACTLY_ONCE)
// a checkpoint must complete within 2s, otherwise it is aborted
env.getCheckpointConfig.setCheckpointTimeout(2000)
// minimum pause between two checkpoints (must be <= the checkpoint interval)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(5)
// maximum number of concurrent checkpoints, defaults to 1
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
// if a checkpoint fails, the task fails as well
env.getCheckpointConfig.setFailOnCheckpointingErrors(true)
// store checkpoints in an external system (filesystem, rocksdb); configures whether the
// checkpoint is retained when the job is cancelled
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
val props = new Properties()
props.setProperty("bootstrap.servers", "centos:9092")
props.setProperty("group.id", "g1")
env.addSource(new FlinkKafkaConsumer[String]("topic01",new SimpleStringSchema(),props))
.flatMap(line => line.split("\\s+"))
.map((_,1))
.keyBy(0) // only one key position may be passed here
.sum(1)
.print()
env.execute("testoperatorstate")
State Backend
The State Backend determines how and where Flink stores the system state (in the form of checkpoints). Flink currently provides three State Backend implementations:
- Memory (JobManager): the default implementation, usually used for testing. The computation state is kept in the JobManager's memory; in real production environments, where there is usually a lot of state, this easily leads to OOM (out of memory).
- FileSystem: the computation state is kept in the TaskManager's memory, so it is generally suitable for production; following the checkpoint mechanism, the TaskManager's state data is backed up to a file system. On very large clusters the TaskManager memory can still overflow.
- RocksDB: the computation state is kept in the TaskManager's memory first; when memory is insufficient, RocksDB can be configured to spill state to local disk, and the local state data can also be backed up to a remote file system. RocksDB is therefore the recommended backend.
See: https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/state_backends.html
每一個(gè)Job都可以配置自己狀態(tài)存儲(chǔ)的后端實(shí)現(xiàn)
var env=StreamExecutionEnvironment.getExecutionEnvironment
val fsStateBackend:StateBackend = new FsStateBackend("hdfs:///xxx") // MemoryStateBackend, FsStateBackend, RocksDBStateBackend
env.setStateBackend(fsStateBackend)
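A sketch of the recommended RocksDB backend (requires the flink-statebackend-rocksdb dependency; the checkpoint URI is a placeholder, and the second constructor argument enables incremental checkpoints):
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend

val rocksBackend: StateBackend = new RocksDBStateBackend("hdfs:///flink/checkpoints", true)
env.setStateBackend(rocksBackend)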
If the user does not configure one, the system uses the default implementation, which can be changed in the flink-conf.yaml file:
[root@centos ~]# cd /usr/flink-1.8.1/
[root@centos flink-1.8.1]# vi conf/flink-conf.yaml
#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================
# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# <class-name-of-factory>.
#state.backend: rocksdb
# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#state.checkpoints.dir: hdfs:///flink-checkpoints
# Default target directory for savepoints, optional.
#state.savepoints.dir: hdfs:///flink-savepoints

# Flag to enable/disable incremental checkpoints for backends that
# support incremental checkpoints (like the RocksDB state backend).
#state.backend.incremental: true
Note: HADOOP_CLASSPATH must be present in the environment variables.
Flink計(jì)算發(fā)布之后是否還能夠修改計(jì)算算子?
首先,這在Spark中是不允許的,因?yàn)镾park會(huì)持久化代碼片段,一旦修改代碼,必須刪除Checkpoint,但是Flink僅僅存儲(chǔ)各個(gè)算子的計(jì)算狀態(tài),如果用戶修改代碼,需要用戶在有狀態(tài)的操作算子上指定uid屬性。
env.addSource(new FlinkKafkaConsumer[String]("topic01",new SimpleStringSchema(),props))
.uid("kakfa-consumer")
.flatMap(line => line.split("\\s+"))
.map((_,1))
.keyBy(0) // only one key position may be passed here
.sum(1)
.uid("word-count") //唯一
.map(t=>t._1+"->"+t._2)
.print()
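When the modified job is resubmitted from a savepoint, state that no longer maps to any operator can be skipped with the -n (--allowNonRestoredState) flag; for example (the main class and jar path are placeholders):
[root@centos flink-1.8.1]# ./bin/flink run -s hdfs://centos:9000/savepoints/savepoint-f21795-38e7beefe07b -n -c com.example.WordCount /root/flink-examples-1.0.jar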
How do Flink and Kafka guarantee exactly-once semantics?
- https://www.cnblogs.com/ooffff/p/9482873.html
- https://www.jianshu.com/p/8cf344bb729a
- https://www.jianshu.com/p/de35bf649293
- https://blog.csdn.net/justlpf/article/details/80292375
- https://www.jianshu.com/p/c0af87078b9c (interview questions)