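I. Context
In the post "Kafka broker coarse-grained startup flow" we walked through the broker's overall startup sequence. At that point no broker holds the controller role yet. This post looks at how the controller is elected.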
II. Setting up ZooKeeper
ZooKeeper is an open-source distributed coordination service, used mainly to coordinate and manage the nodes of a distributed system. Kafka's controller election relies on it as well.
    override def startup(): Unit = {
      //....
      initZkClient(time)
      configRepository = new ZkConfigRepository(new AdminZkClient(zkClient))
      //....
    }

    private def initZkClient(time: Time): Unit = {
      // config.zkConnect
      // zookeeper.connect=hostname1:2181,hostname2:2181,hostname3:2181/kafka
      info(s"Connecting to zookeeper on ${config.zkConnect}")
      _zkClient = KafkaZkClient.createZkClient("Kafka server", time, config, zkClientConfig)
      // Pre-create the top-level paths in ZK if needed.
      _zkClient.createTopLevelPaths()
    }

    def createTopLevelPaths(): Unit = {
      // Create the persistent ZK paths
      ZkData.PersistentZkPaths.foreach(makeSurePersistentPathExists)
    }

    // Make sure a persistent path exists in ZK
    def makeSurePersistentPathExists(path: String): Unit = {
      createRecursive(path, data = null, throwIfPathExists = false)
    }
1. KafkaZkClient
KafkaZkClient provides higher-level, Kafka-specific operations on top of kafka.zookeeper.ZooKeeperClient.
Implementation note: this class includes methods for various components (Controller, Configs, Old Consumer, etc.) and in some cases returns instances of classes from the calling packages.

    def createZkClient(name: String, time: Time, config: KafkaConfig, zkClientConfig: ZKClientConfig): KafkaZkClient = {
      //...
      KafkaZkClient(...)
    }
2. AdminZkClient
AdminZkClient provides the admin-related methods for interacting with ZooKeeper.

    class AdminZkClient(...) {
      // Create a topic
      def createTopic(...) {...}
      // Get broker metadata
      def getBrokerMetadatas(...) {...}
      // Create a topic and optionally validate its parameters. Note that TopicCommand also uses this method.
      def createTopicWithAssignment(...) {...}
      // Validate topic creation parameters
      def validateTopicCreate(...) {...}
      // Delete a topic: creates the delete path for the given topic
      def deleteTopic(...) {...}
      // Add partitions to an existing topic, with an optional replica assignment. Note that TopicCommand uses this method.
      def addPartitions(...) {...}
      // Parse a broker from an entity name into an integer id
      def parseBroker(...) {...}
      //.....
    }
3. ZkConfigRepository
ZkConfigRepository is the ZooKeeper-backed configuration repository, that is, the configuration that Kafka keeps in ZooKeeper.

    class ZkConfigRepository(adminZkClient: AdminZkClient) extends ConfigRepository {
      override def config(configResource: ConfigResource): Properties = {
        //....
        // Read the data under the ZooKeeper path and wrap it as an entity config
        // (topic, broker, client-id, user, user/clients/client-id, ip)
        adminZkClient.fetchEntityConfig(configTypeForZk, effectiveName)
      }
    }
4. ZkData

    object ZkData {
      // These are the persistent ZK paths that should exist when a Kafka broker starts.
      val PersistentZkPaths: Seq[String] = Seq(
        ConsumerPathZNode.path, // old consumer path
        BrokerIdsZNode.path,
        TopicsZNode.path,
        ConfigEntityChangeNotificationZNode.path,
        DeleteTopicsZNode.path,
        BrokerSequenceIdZNode.path,
        IsrChangeNotificationZNode.path,
        ProducerIdBlockZNode.path,
        LogDirEventNotificationZNode.path
      ) ++ ConfigType.ALL.asScala.map(ConfigEntityTypeZNode.path)
    }

    // Path of the old consumer in ZK
    object ConsumerPathZNode {
      def path = "/consumers"
    }

    object BrokerIdsZNode {
      def path = s"${BrokersZNode.path}/ids"
      def encode: Array[Byte] = null
    }

    object TopicsZNode {
      def path = s"${BrokersZNode.path}/topics"
    }

    object ConfigEntityChangeNotificationZNode {
      def path = s"${ConfigZNode.path}/changes"
    }

    object DeleteTopicsZNode {
      def path = s"${AdminZNode.path}/delete_topics"
    }

    object BrokerSequenceIdZNode {
      def path = s"${BrokersZNode.path}/seqid"
    }

    object IsrChangeNotificationZNode {
      def path = "/isr_change_notification"
    }

    object ProducerIdBlockZNode {
      val CurrentVersion: Long = 1L
      def path = "/latest_producer_id_block"
      def generateProducerIdBlockJson(producerIdBlock: ProducerIdsBlock): Array[Byte] = {
        Json.encodeAsBytes(Map(
          "version" -> CurrentVersion,
          "broker" -> producerIdBlock.assignedBrokerId,
          "block_start" -> producerIdBlock.firstProducerId.toString,
          "block_end" -> producerIdBlock.lastProducerId.toString
        ).asJava)
      }
    }

    object LogDirEventNotificationZNode {
      def path = "/log_dir_event_notification"
    }
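To make the "create recursively, ignore if it already exists" behaviour concrete, here is a minimal sketch that uses an in-memory set in place of ZooKeeper. `PathSketch`, `ancestorsAndSelf` and `makeSurePathExists` are hypothetical names for illustration only; they are not part of Kafka, and the real createRecursive talks to ZooKeeper rather than to a set.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for a znode tree; not Kafka's API.
object PathSketch {
  // Expands a path into every node that must exist, parents first,
  // e.g. "/brokers/ids" becomes Seq("/brokers", "/brokers/ids").
  def ancestorsAndSelf(path: String): Seq[String] = {
    val parts = path.split('/').filter(_.nonEmpty)
    parts.indices.map(i => parts.take(i + 1).mkString("/", "/", ""))
  }

  // Adds every missing node on the way to `path`. Nodes that already exist are
  // left untouched, which mirrors throwIfPathExists = false.
  def makeSurePathExists(tree: mutable.Set[String], path: String): Unit =
    ancestorsAndSelf(path).foreach(tree += _)
}
```

Calling `makeSurePathExists` for "/brokers/ids" and then "/brokers/topics" leaves a single "/brokers" parent in the set, which is why the broker can run this step on every startup without failing on paths that are already there.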
III. Verifying the metadata properties ensemble
1. Whether every meta.properties file sets cluster.id
2. Whether every meta.properties file sets node.id or broker.id

    initialMetaPropsEnsemble.verify(Optional.of(_clusterId), verificationId, verificationFlags)
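The two checks listed above can be sketched for a single meta.properties file as follows. This is only an illustration of the idea; `MetaPropsSketch.problems` is a hypothetical helper and is not how Kafka's verify method is implemented.

```scala
import java.util.Properties

// Hypothetical check for one meta.properties file; not Kafka's implementation.
object MetaPropsSketch {
  // Returns a description of every problem found; an empty result means the file passes.
  def problems(props: Properties): Seq[String] = {
    val missingCluster =
      if (props.getProperty("cluster.id") == null) Seq("cluster.id is missing") else Nil
    val missingNode =
      if (props.getProperty("node.id") == null && props.getProperty("broker.id") == null)
        Seq("neither node.id nor broker.id is set")
      else Nil
    missingCluster ++ missingNode
  }
}
```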
IV. Initializing the dynamic broker configuration
Dynamic broker configs are stored in ZooKeeper and can be defined at two levels:
1. Per-broker configs are persisted at /configs/brokers/{brokerId}. They can be described or altered through AdminClient using the resource name brokerId.
2. Cluster-wide defaults are persisted at /configs/brokers/<default>. They can be described or altered through AdminClient using an empty resource name.
The order of precedence for broker configs, highest first, is:
1. DYNAMIC_BROKER_CONFIG: stored in ZK at /configs/brokers/{brokerId}
2. DYNAMIC_DEFAULT_BROKER_CONFIG: stored in ZK at /configs/brokers/<default>
3. STATIC_BROKER_CONFIG: the properties the broker is started with, typically from the server.properties file
4. DEFAULT_CONFIG: the default configs defined in KafkaConfig

    config.dynamicConfig.initialize(Some(zkClient), clientMetricsReceiverPluginOpt = None)
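The precedence order above amounts to "the first layer that defines the key wins". A minimal sketch of that lookup, assuming each layer is a plain map, is shown below. `ConfigPrecedenceSketch` and the sample values are hypothetical and only illustrate the ordering; Kafka's DynamicBrokerConfig does considerably more.

```scala
// Hypothetical resolver; layers are listed from highest to lowest precedence.
object ConfigPrecedenceSketch {
  type Layer = (String, Map[String, String])

  // Returns the source name and value from the first layer that defines the key.
  def resolve(key: String, layers: Seq[Layer]): Option[(String, String)] =
    layers.collectFirst {
      case (source, values) if values.contains(key) => (source, values(key))
    }

  // Illustrative values only.
  val exampleLayers: Seq[Layer] = Seq(
    "DYNAMIC_BROKER_CONFIG" -> Map("log.cleaner.threads" -> "4"),
    "DYNAMIC_DEFAULT_BROKER_CONFIG" -> Map("log.cleaner.threads" -> "2", "num.io.threads" -> "16"),
    "STATIC_BROKER_CONFIG" -> Map("num.io.threads" -> "8", "num.network.threads" -> "3"),
    "DEFAULT_CONFIG" -> Map("num.network.threads" -> "3", "num.io.threads" -> "8", "log.cleaner.threads" -> "1")
  )
}
```

With these sample layers, `num.io.threads` resolves from the cluster-wide dynamic default rather than from server.properties, because the dynamic default sits higher in the list.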
V. Starting KafkaController

    _kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, brokerFeatures, metadataCache, threadNamePrefix)
    kafkaController.startup()
1. KafkaController structure

    class KafkaController(...) {
      // Event management
      private[controller] val eventManager = new ControllerEventManager(config.brokerId, this, time,
        controllerContext.stats.rateAndTimeMetrics)

      // True when this broker's id equals the current controller id
      def isActive: Boolean = activeControllerId == config.brokerId

      @volatile private var brokerInfo = initialBrokerInfo
      @volatile private var _brokerEpoch = initialBrokerEpoch

      // Startup
      def startup(): Unit = {
        zkClient.registerStateChangeHandler(new StateChangeHandler {
          // ControllerHandler = "controller-state-change-handler"
          override val name: String = StateChangeHandlers.ControllerHandler
          override def afterInitializingSession(): Unit = {
            eventManager.put(RegisterBrokerAndReelect)
          }
          override def beforeInitializingSession(): Unit = {
            val queuedEvent = eventManager.clearAndPut(Expire)
            // Block initialization of the new session until the Expire event has been processed,
            // which ensures that all pending events are handled before a new session is created
            queuedEvent.awaitProcessing()
          }
        })
        eventManager.put(Startup)
        eventManager.start()
      }

      override def process(event: ControllerEvent): Unit = {
        event match {
          //.....
          case RegisterBrokerAndReelect =>
            processRegisterBrokerAndReelect()
          case Startup =>
            processStartup()
          //.....
        }
      }
    }
(1) ControllerEventManager structure

    class ControllerEventManager(...) {
      // A serialized queue is used instead of locks
      private val queue = new LinkedBlockingQueue[QueuedEvent]
      // ControllerEventThreadName = "controller-event-thread"
      private[controller] var thread = new ControllerEventThread(ControllerEventThreadName)

      def start(): Unit = thread.start()

      def put(event: ControllerEvent): QueuedEvent = inLock(putLock) {
        val queuedEvent = new QueuedEvent(event, time.milliseconds())
        queue.put(queuedEvent)
        queuedEvent
      }

      class ControllerEventThread(name: String) extends ShutdownableThread(...) {
        override def doWork(): Unit = {
          // Take an event from the queue; these are mostly controller-related events
          val dequeued = pollFromEventQueue()
          dequeued.event match {
            case controllerEvent =>
              def process(): Unit = dequeued.process(processor)
              // Metrics and error handling are elided in this excerpt; the event is then processed
              process()
          }
        }
      }
    }
ControllerEventThread, defined inside ControllerEventManager, extends ShutdownableThread. The parent class holds the real run() method, which calls doWork(); doWork() in turn calls process(), so process() is what actually handles each event.
    public abstract class ShutdownableThread extends Thread {
        public abstract void doWork();

        public void run() {
            while (isRunning())
                doWork();
        }
    }
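The loop keeps going for as long as the thread is running, so any event added to the queue later is picked up and handled automatically. KafkaController.startup() involves two events: Startup, which it puts on the queue directly, and RegisterBrokerAndReelect, which the afterInitializingSession callback puts on the queue. Let's look at what happens when RegisterBrokerAndReelect is processed.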
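The pattern described here, one thread draining a blocking queue so that handlers never run concurrently, can be sketched in a few lines. This is a simplified illustration, not Kafka's ControllerEventManager: `EventLoopSketch` and the `Shutdown` marker are hypothetical, and the "handler" just records the order in which events were seen.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ArrayBuffer

// Hypothetical single-threaded event loop; not Kafka's ControllerEventManager.
object EventLoopSketch {
  sealed trait Event
  case object Startup extends Event
  case object RegisterBrokerAndReelect extends Event
  case object Shutdown extends Event // hypothetical stop marker for this sketch

  // Starts one worker thread, feeds it the events, and returns them in the order handled.
  def run(events: Seq[Event]): Seq[Event] = {
    val queue = new LinkedBlockingQueue[Event]()
    val handled = ArrayBuffer.empty[Event]
    val worker = new Thread(() => {
      var running = true
      while (running) {
        val event = queue.take() // blocks until an event is available
        if (event == Shutdown) running = false
        else handled += event    // stands in for process(event)
      }
    }, "controller-event-thread-sketch")
    worker.start()
    events.foreach(e => queue.put(e))
    queue.put(Shutdown)
    worker.join() // after join, the worker's writes to `handled` are visible here
    handled.toSeq
  }
}
```

Because only the worker thread touches the handler state, producers can enqueue from any thread without taking a lock around the controller logic, which is the point of the "queue instead of locks" comment in the excerpt.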
2. Handling the RegisterBrokerAndReelect event

    private def processRegisterBrokerAndReelect(): Unit = {
      _brokerEpoch = zkClient.registerBroker(brokerInfo)
      processReelect()
    }
(1) Registering the broker in ZooKeeper

    class KafkaZkClient private[zk] (...) {
      def registerBroker(brokerInfo: BrokerInfo): Long = {
        // /brokers/ids/{brokerId}
        val path = brokerInfo.path
        // Create the ephemeral znode for this broker id; it disappears when the broker goes down
        val stat = checkedEphemeralCreate(path, brokerInfo.toJsonBytes)
        info(s"Registered broker ${brokerInfo.broker.id} at path $path with addresses: " +
          s"${brokerInfo.broker.endPoints.map(_.connectionString).mkString(",")}, czxid (broker epoch): ${stat.getCzxid}")
        // Return the czxid (broker epoch)
        stat.getCzxid
      }
    }
(2) Starting the election

    class KafkaController(...) {
      private def processReelect(): Unit = {
        maybeResign()
        elect()
      }

      private def maybeResign(): Unit = {
        val wasActiveBeforeChange = isActive
        // Register a znode change handler in ZK that fires when the controller changes;
        // this prepares for the election below
        zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
        activeControllerId = zkClient.getControllerId.getOrElse(-1)
        if (wasActiveBeforeChange && !isActive) {
          // Triggered when this broker gives up the controller role
          onControllerResignation()
        }
      }

      private def elect(): Unit = {
        // Look up the active controller. If the cluster has been running for a while and a new
        // broker joins, this returns the current controller. If the cluster has only just
        // started, there is no active controller yet and the result is -1.
        activeControllerId = zkClient.getControllerId.getOrElse(-1)
        /*
         * We can get here during the initial startup and the handleDeleted ZK callback. Because of
         * the potential race condition, it's possible that the controller has already been elected
         * when we get here. This check will prevent the following createEphemeralPath method from
         * getting into an infinite loop if this broker is already the controller.
         */
        if (activeControllerId != -1) {
          // A controller already exists, so stop the election; otherwise continue below
          debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")
          return
        }

        // Inside the try block:
        // 1. Normal path: this broker is elected controller
        // 2. Exceptions:
        //    a. ControllerMovedException
        //       - another broker was elected controller, or
        //       - a controller was elected but has just resigned, so a new election is needed
        //    b. Throwable: this broker was elected but failed while taking office;
        //       the controller is removed and a new election is triggered
        try {
          val (epoch, epochZkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(config.brokerId)
          controllerContext.epoch = epoch
          controllerContext.epochZkVersion = epochZkVersion
          activeControllerId = config.brokerId
          // Log the successful election together with the new epoch and epoch zk version
          info(s"${config.brokerId} successfully elected as the controller. Epoch incremented to ${controllerContext.epoch} " +
            s"and epoch zk version is now ${controllerContext.epochZkVersion}")
          // Elected as controller; start carrying out the responsibilities of the role
          onControllerFailover()
        } catch {
          case e: ControllerMovedException =>
            // Go back to watching the controller znode for changes
            maybeResign()
            if (activeControllerId != -1)
              debug(s"Broker $activeControllerId was elected as controller instead of broker ${config.brokerId}", e)
            else
              warn("A controller has been elected but just resigned, this will result in another round of election", e)
          case t: Throwable =>
            error(s"Error while electing or becoming controller on broker ${config.brokerId}. " +
              s"Trigger controller movement immediately", t)
            triggerControllerMove()
        }
      }
    }
The controllerChangeHandler that zkClient registers before the election simply watches the controller znode for changes.
    class ControllerChangeHandler(eventManager: ControllerEventManager) extends ZNodeChangeHandler {
      // The controller znode path
      override val path: String = ControllerZNode.path
      override def handleCreation(): Unit = eventManager.put(ControllerChange)
      override def handleDeletion(): Unit = eventManager.put(Reelect)
      override def handleDataChange(): Unit = eventManager.put(ControllerChange)
    }
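The core of the election is "whoever creates the controller znode first wins, and everyone else learns who won". The sketch below models that with an atomic reference in place of ZooKeeper. `ElectionSketch` and `ControllerNode` are hypothetical names; the real code also handles epochs, ControllerMovedException and failover, none of which is modelled here.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical in-memory model of the election; not Kafka's implementation.
object ElectionSketch {
  // Stand-in for the ephemeral controller znode: None means no controller is registered.
  final class ControllerNode {
    private val owner = new AtomicReference[Option[Int]](None)
    def controllerId: Option[Int] = owner.get()
    // Atomic create-if-absent, like creating a znode that may already exist.
    def tryCreate(brokerId: Int): Boolean = owner.compareAndSet(None, Some(brokerId))
    // Models the znode disappearing when the controller's session ends.
    def delete(): Unit = owner.set(None)
  }

  // Returns the active controller id after this broker's election attempt.
  def elect(node: ControllerNode, brokerId: Int): Int =
    node.controllerId match {
      case Some(active) => active                // a controller already exists: stop here
      case None =>
        if (node.tryCreate(brokerId)) brokerId   // this broker won the race
        else node.controllerId.getOrElse(-1)     // lost the race; -1 if the winner already left
    }
}
```

A short usage example: with a fresh `ControllerNode`, `elect(node, 1)` returns 1 and a later `elect(node, 2)` also returns 1. After `node.delete()`, which plays the role of the handleDeletion callback enqueuing Reelect, `elect(node, 2)` returns 2.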
VI. Summary
1. Set up ZooKeeper, e.g. zookeeper.connect=hostname1:2181,hostname2:2181,hostname3:2181/kafka, and create the persistent paths: consumers, brokers/ids, brokers/topics, config/changes, admin/delete_topics, brokers/seqid, isr_change_notification, latest_producer_id_block, log_dir_event_notification
2. Verify that the metadata properties ensemble is valid, mainly checking that every broker has its own id
3. Register each broker's id in ZooKeeper
4. Start KafkaController
5. Start the ControllerEventThread, which keeps consuming events from the LinkedBlockingQueue
6. Put the Startup event on the queue; the state-change handler puts RegisterBrokerAndReelect on the queue after a ZooKeeper session is initialized
7. Handle the RegisterBrokerAndReelect event
8. Register the broker in ZooKeeper by creating an ephemeral znode
9. Register controllerChangeHandler, which simply watches the controller znode for changes
10. Each broker tries to register itself in ZooKeeper as the controller
11. Normally only one broker succeeds; the others get a ControllerMovedException and go back to watching the controller znode
12. If a broker wins the election but fails while taking office, it resigns immediately and a new round of election is triggered