租用阿里云做網(wǎng)站鏈接是什么意思
文章目錄
- 1. Akka基本概念與Actor模型
- 2. Akka相關(guān)demo
- 2.1. 創(chuàng)建Akka系統(tǒng)
- 2.2. 根據(jù)path獲取Actor并與之通訊
- 3. Flink RPC框架與Akka的關(guān)系
- 4.運(yùn)行時(shí)RPC整體架構(gòu)設(shè)計(jì)
- 5. RpcEndpoint的設(shè)計(jì)與實(shí)現(xiàn)
我們從整體的角度看一下Flink RPC通信框架的設(shè)計(jì)與實(shí)現(xiàn),了解其底層Akka通信框架的基礎(chǔ)概念及二者之間的關(guān)系。
?
1. Akka基本概念與Actor模型
Akka是使用Scala語言編寫的庫(kù),用于在JVM上簡(jiǎn)化編寫具有可容錯(cuò)、高可伸縮性的Java或Scala的Actor模型。Akka基于Actor模型,提供了一個(gè)用于構(gòu)建可擴(kuò)展、彈性、快速響應(yīng)的應(yīng)用程序的平臺(tái)。
Actor 模型是一種并發(fā)計(jì)算模型,Actor 模型的核心思想是將計(jì)算單元抽象為獨(dú)立的并發(fā)實(shí)體,稱為 “actors”,這些 actors 之間通過消息傳遞進(jìn)行通信。
以下是 Actor 模型的一些關(guān)鍵概念:
- Actor:Actor 是計(jì)算模型的基本執(zhí)行單元。每個(gè) Actor 都有自己的狀態(tài)、行為和郵箱(用于接收消息)。Actor 之間是相互獨(dú)立的,它們通過消息傳遞進(jìn)行通信。
- 消息傳遞:在 Actor 模型中,通信是通過消息傳遞來實(shí)現(xiàn)的。一個(gè) Actor 可以向另一個(gè) Actor 發(fā)送消息,消息包含了要執(zhí)行的操作或者改變狀態(tài)的請(qǐng)求。這種異步消息傳遞使得系統(tǒng)更具有彈性和可伸縮性。
- 地址:每個(gè) Actor 都有一個(gè)唯一的地址,用于唯一標(biāo)識(shí)該 Actor。其他 Actor 可以使用地址向目標(biāo) Actor 發(fā)送消息。
- 郵箱:每個(gè) Actor 都有一個(gè)郵箱,
用于存儲(chǔ)接收到的消息
。Actor 處理消息的速度可能不同,但由于消息傳遞是異步的,這不會(huì)阻塞發(fā)送者。- 行為:Actor 的行為定義了對(duì)消息的
響應(yīng)方式
,包括狀態(tài)的修改、消息的處理等。行為可以隨著時(shí)間和接收到的消息而動(dòng)態(tài)變化。
?
Actor由狀態(tài)(State)、行為(Behavior)和郵箱(Mailbox)三部分組成。
actors和其他actors通過發(fā)送異步消息通信。Actor模型的強(qiáng)大來自于異步。它也可以顯式等待響應(yīng),這使得可以執(zhí)行同步操作。但是,強(qiáng)烈不建議同步消息,因?yàn)樗鼈兿拗屏讼到y(tǒng)的伸縮性(?怎么實(shí)現(xiàn)的伸縮性)。
actor系統(tǒng)
每個(gè)actor是一個(gè)單一的線程,它不斷地從其郵箱中poll(拉取)消息,并且連續(xù)不斷地處理。對(duì)于已經(jīng)處理過的消息的結(jié)果,actor可以改變它自身的內(nèi)部狀態(tài)或者發(fā)送一個(gè)新消息或者
孵化一個(gè)新的actor
。
?
2. Akka相關(guān)demo
2.1. 創(chuàng)建Akka系統(tǒng)
Akka系統(tǒng)的核心組件包括ActorSystem和Actor,構(gòu)建一個(gè)Akka系統(tǒng),首先需要?jiǎng)?chuàng)建ActorSystem,然后通過ActorSystem創(chuàng)建Actor。
需要注意的是:
- Akka不允許直接創(chuàng)建Actor實(shí)例,只能通過ActorSystem.actorOf和ActorContext.actorOf等特定接口創(chuàng)建Actor。
- 只能通過ActorRef與Actor進(jìn)行通信,ActorRef對(duì)原生Actor實(shí)例做了良好的封裝,外界不能隨意修改其內(nèi)部狀態(tài)。
如代碼所示,Akka系統(tǒng)中包含了創(chuàng)建ActorSystem以及Actor的基本實(shí)例。
// 1. 構(gòu)建ActorSystem
// 使用缺省配置
ActorSystem system = ActorSystem.create("sys");
// 也可顯示指定appsys配置
// ActorSystem system1 = ActorSystem.create("helloakka", ConfigFactory.load("appsys"));// 2. 構(gòu)建Actor,獲取該Actor的引用,即ActorRef
ActorRef helloActor = system.actorOf(Props.create(HelloActor.class), "helloActor");// 3. 給helloActor發(fā)送消息
helloActor.tell("hello helloActor", ActorRef.noSender());// 4. 關(guān)閉ActorSystem
system.terminate();
在Akka中,創(chuàng)建的每個(gè)Actor都有自己的路徑,該路徑遵循 ActorSystem 的層級(jí)結(jié)構(gòu),大致如下:
本地:akka://sys/user/helloActor
遠(yuǎn)程:akka.tcp://sys@l27.0.0.1:2020/user/remoteActor - sys,創(chuàng)建的ActorSystem的名字;
- user,通過ActorSystem#actorOf和ActorContext#actorOf 方法創(chuàng)建的 Actor 都屬于/user下,其是系統(tǒng)層面創(chuàng)建的,與系統(tǒng)整體行為有關(guān),在開發(fā)階段并不需要對(duì)其過多關(guān)注;
- helloActor,我們創(chuàng)建的HelloActor。其中遠(yuǎn)程部分路徑含義如下:- akka.tcp,遠(yuǎn)程通信方式為tcp;
- sys@127.0.0.1:2020,ActorSystem名字及遠(yuǎn)程主機(jī)ip和端口號(hào)。
?
2.2. 根據(jù)path獲取Actor并與之通訊
若提供了Actor的路徑,可以通過路徑獲取到ActorRef,然后與之通信,代碼如下所示:
ActorSystem system = ActorSystem.create("sys");
ActorSelection as = system.actorSelection("/path/to/actor");Timeout timeout = new Timeout(Duration.create(2, "seconds"));
Future<ActorRef> fu = as.resolveOne(timeout);fu.onSuccess(new OnSuccess<ActorRef>() {@Overridepublic void onSuccess(ActorRef actor) {System.out.println("actor:" + actor);actor.tell("hello actor", ActorRef.noSender());}
}, system.dispatcher());fu.onFailure(new OnFailure() {@Overridepublic void onFailure(Throwable failure) {System.out.println("failure:" + failure);}
}, system.dispatcher());
?
3. Flink RPC框架與Akka的關(guān)系
Flink進(jìn)行RPC通信的組件
如圖所示,從Flink RPC節(jié)點(diǎn)關(guān)系中可以看出,集群運(yùn)行時(shí)中實(shí)現(xiàn)了RPC通信節(jié)點(diǎn)功能的主要有
Dispatcher、ResourceManager和TaskManager以及JobMaster
等組件。
借助RPC通信,這些組件共同參與任務(wù)提交及運(yùn)行的整個(gè)流程,例如通過客戶端向Dispatcher服務(wù)提交JobGraph,JobManager向TaskManager提交Task請(qǐng)求,以及TaskManager向JobManager更新Task執(zhí)行狀態(tài)等。
通過AkkaRpcService實(shí)現(xiàn)遠(yuǎn)程通訊能力
從圖中也可以看出,集群的RPC服務(wù)組件是(1)RpcEndpoint,每個(gè)RpcEndpoint包含一個(gè)內(nèi)置的RpcServer負(fù)責(zé)執(zhí)行本地和遠(yuǎn)程的代碼請(qǐng)求,(2)RpcServer對(duì)應(yīng)Akka中的Actor實(shí)例。RpcEndpoint中創(chuàng)建和啟動(dòng)RpcServer主要是基于集群中的(3)RpcService實(shí)現(xiàn),(4)RpcService的主要實(shí)現(xiàn)是AkkaRpcService。
?
從圖可以看出,AkkaRpcService將Akka中的ActorSystem進(jìn)行封裝
,通過AkkaRpcService可以創(chuàng)建RpcEndpoint中的RpcServer,同時(shí)基于AkkaRpcService提供的connect()方法與遠(yuǎn)程RpcServer建立RPC連接,提供遠(yuǎn)程進(jìn)程調(diào)用的能力。
?
4.運(yùn)行時(shí)RPC整體架構(gòu)設(shè)計(jì)
Flink的RPC框架設(shè)計(jì)非常復(fù)雜,除了基于Akka構(gòu)建了底層通信系統(tǒng)之外,還會(huì)使用JDK動(dòng)態(tài)代理構(gòu)建RpcGateway接口的代理類。
Flink RPC UML關(guān)系圖
這里我們簡(jiǎn)單梳理一下RPC架構(gòu)涉及的組件以及每種組件的作用。
- 集群RPC組件的基本實(shí)現(xiàn)類:
RpcEndpoint提供了集群RPC組件的基本實(shí)現(xiàn),所有需要實(shí)現(xiàn)RPC服務(wù)的組件都會(huì)繼承RpcEndpoint抽象類。
RpcEndpoint中包含了endpointId,用于唯一標(biāo)記當(dāng)前的RPC節(jié)點(diǎn)。RpcEndpoint借助RpcService啟動(dòng)內(nèi)部RpcServer,之后通過RpcServer完成本地和遠(yuǎn)程線程執(zhí)行。
- 基本實(shí)現(xiàn)類與FencedToken對(duì)比
對(duì)于RpcEndpoint來講,底層主要有FencedRpcEndpoint基本實(shí)現(xiàn)類。
實(shí)現(xiàn)FencedRpcEndpoint的RPC節(jié)點(diǎn)都會(huì)有自己的FencedToken,當(dāng)進(jìn)行遠(yuǎn)程RPC調(diào)用時(shí),會(huì)對(duì)比訪問者分配的FencedToken和被訪問者的FencedToken,結(jié)果一致才會(huì)進(jìn)行后續(xù)操作。
- RpcEndpoint的實(shí)現(xiàn)類有TaskExecutor組件,FencedRpcEndpoint的實(shí)現(xiàn)類有Dispatcher、JobMaster以及ResourceManager等組件。這些組件可以獲取RpcService中ActorSystem的dispatcher服務(wù),并直接通過
dispatcher創(chuàng)建Task線程實(shí)例
。- RpcService提供了創(chuàng)建和啟動(dòng)RpcServer的方法。
在啟動(dòng)RpcServer的過程中,通過RpcEndpoint的地址創(chuàng)建Akka Actor實(shí)例,并基于Actor實(shí)例構(gòu)建RpcServer接口的動(dòng)態(tài)代理類,向RpcServer的主線程中提交Runnable以及Callable線程等。
同時(shí)在RpcService中提供了連接遠(yuǎn)程RpcEndpoint的方法,并創(chuàng)建了相應(yīng)RpcGateway接口的動(dòng)態(tài)代理類,用于執(zhí)行遠(yuǎn)程RPC請(qǐng)求。
- RpcServer接口通過AkkaInvocationHandler動(dòng)態(tài)代理類實(shí)現(xiàn),所有遠(yuǎn)程或本地的執(zhí)行請(qǐng)求最終都會(huì)轉(zhuǎn)換到AkkaInvocationHandler代理類中執(zhí)行。
AkkaInvocationHandler實(shí)現(xiàn)了MainThreadExecutable接口,提供了
runAsync(Runnable runnable)
以及callAsync(Callable<V> callable, Time callTimeout)
等在主線程中執(zhí)行代碼塊的功能。例如在TaskExecutor中釋放Slot資源時(shí),會(huì)調(diào)用runAsync()方法將freeSlotInternal()方法提交到TaskExecutor對(duì)應(yīng)的RpcServer中運(yùn)行,此時(shí)就會(huì)調(diào)用AkkaInvocationHandler在主線程中執(zhí)行任務(wù).
?
5. RpcEndpoint的設(shè)計(jì)與實(shí)現(xiàn)
RpcEndpoint是集群中RPC組件的端點(diǎn),每個(gè)RpcEndpoint都對(duì)應(yīng)一個(gè)由endpointId和actorSystem確定的路徑,且該路徑對(duì)應(yīng)同一個(gè)Akka Actor。
如圖,所有需要實(shí)現(xiàn)RPC通信的集群組件都會(huì)繼承RpcEndpoint抽象類,例如TaskExecutor、Dispatcher以及ResourceManager組件服務(wù),還包括根據(jù)JobGraph動(dòng)態(tài)創(chuàng)建和啟動(dòng)的JobMaster服務(wù)。
從圖中我們可以看出,RpcEndpoint實(shí)現(xiàn)了RpcGateway和AutoCloseableAsync兩個(gè)接口,其中 RpcGateway
提供了動(dòng)態(tài)獲取RpcEndpoint中Akka地址和HostName的方法。
因?yàn)镴obMaster組件在任務(wù)啟動(dòng)時(shí)才會(huì)獲取Akka中ActorSystem分配的地址信息,所以借助RpcGateway接口提供的方法就能獲取Akka相關(guān)連接信息。
?
RpcEndpoint中包含RpcService、RpcServer以及MainThreadExecutor三個(gè)重要的成員變量,其中
- RpcService是RpcEndpoint的后臺(tái)管理服務(wù)
- RpcServer是RpcEndpoint的內(nèi)部服務(wù)類
- MainThreadExecutor封裝了MainThreadExecutable接口,其主要底層實(shí)現(xiàn)是AkkaInvocationHandler代理類。所有本地和遠(yuǎn)程的RpcGateway執(zhí)行請(qǐng)求都會(huì)通過動(dòng)態(tài)代理的形式轉(zhuǎn)換到AkkaInvocationHandler代理類中執(zhí)行。
?
?