中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁 > news >正文

鄭州的網(wǎng)站建設(shè)公司哪家好軟文寫作網(wǎng)站

鄭州的網(wǎng)站建設(shè)公司哪家好,軟文寫作網(wǎng)站,做網(wǎng)站收費(fèi)標(biāo),網(wǎng)站建設(shè)專業(yè)公司哪家好一、上下文 《Kafka-broker粗粒度啟動(dòng)流程》博客中我們分析了broker的大致啟動(dòng)流程,這個(gè)時(shí)候每個(gè)broker都不是controller角色,下面我們就來看下它是如何選舉出來的吧 二、設(shè)置ZooKeeper ?ZooKeeper是一個(gè)開源的分布式協(xié)調(diào)服務(wù),主要用于分…

一、上下文

《Kafka-broker粗粒度啟動(dòng)流程》博客中我們分析了broker的大致啟動(dòng)流程,這個(gè)時(shí)候每個(gè)broker都不是controller角色,下面我們就來看下它是如何選舉出來的吧

二、設(shè)置ZooKeeper

?ZooKeeper是一個(gè)開源的分布式協(xié)調(diào)服務(wù),主要用于分布式系統(tǒng)中各節(jié)點(diǎn)的協(xié)調(diào)和管理。Kafka的Controller選舉也一樣用到了它。

  override def startup(): Unit = {//....initZkClient(time)configRepository = new ZkConfigRepository(new AdminZkClient(zkClient))//....}private def initZkClient(time: Time): Unit = {//config.zkConnect//zookeeper.connect=hostname1:2181,hostname2:2181,hostname2:2181/kafkainfo(s"Connecting to zookeeper on ${config.zkConnect}")_zkClient = KafkaZkClient.createZkClient("Kafka server", time, config, zkClientConfig)//如果需要,在ZK中預(yù)先創(chuàng)建頂級(jí)路徑。_zkClient.createTopLevelPaths()}def createTopLevelPaths(): Unit = {//創(chuàng)建 Persistent 持久化的 zk 路徑ZkData.PersistentZkPaths.foreach(makeSurePersistentPathExists)}//確保ZK中存在持久路徑def makeSurePersistentPathExists(path: String): Unit = {createRecursive(path, data = null, throwIfPathExists = false)}

1、KafkaZkClient

KafkaZkClient是在Kafka.zookeeper.ZooKeeperClient之上提供更高級(jí)別的Kafka特定操作。

實(shí)現(xiàn)說明:此類包括各種組件(Controller, Configs, Old Consumer等)的方法,在某些情況下會(huì)從調(diào)用包中返回類的實(shí)例。

  def createZkClient(name: String, time: Time, config: KafkaConfig, zkClientConfig: ZKClientConfig): KafkaZkClient = {//...KafkaZkClient(...)}

2、AdminZkClient

它提供與ZooKeeper交互的管理員相關(guān)方法。

class AdminZkClient(...){//創(chuàng)建topicdef createTopic(...){...}//獲取broker元數(shù)據(jù)def getBrokerMetadatas(...){...}//創(chuàng)建主題并可選地驗(yàn)證其參數(shù)。請(qǐng)注意,TopicCommand也使用此方法。def createTopicWithAssignment(...){...}//驗(yàn)證主題創(chuàng)建參數(shù)def validateTopicCreate(...){...}//刪除topic //為給定主題創(chuàng)建刪除路徑def deleteTopic(...){...}//使用可選的副本分配向現(xiàn)有主題添加分區(qū)。請(qǐng)注意,TopicCommand使用此方法。def addPartitions(...){...}//將broker從實(shí)體名稱解析為整數(shù)iddef parseBroker(...){...}//.....
}

3、ZkConfigRepository

zookeeper的配置倉庫,也就是kafka在zookeeper中配置信息。

class ZkConfigRepository(adminZkClient: AdminZkClient) extends ConfigRepository {override def config(configResource: ConfigResource): Properties = {//....//從zookeeper的目錄下讀取數(shù)據(jù),并封裝成實(shí)體(topic、broker、client-id、user、user/clients/client-id、ip)adminZkClient.fetchEntityConfig(configTypeForZk, effectiveName)}
}

4、ZkData

object ZkData {//這些是kafka broker 啟動(dòng)時(shí)應(yīng)該存在的持久ZK路徑。val PersistentZkPaths: Seq[String] = Seq(ConsumerPathZNode.path, // old consumer pathBrokerIdsZNode.path,TopicsZNode.path,ConfigEntityChangeNotificationZNode.path,DeleteTopicsZNode.path,BrokerSequenceIdZNode.path,IsrChangeNotificationZNode.path,ProducerIdBlockZNode.path,LogDirEventNotificationZNode.path) ++ ConfigType.ALL.asScala.map(ConfigEntityTypeZNode.path)
}//舊的consumer在zk上的路徑
object ConsumerPathZNode {def path = "/consumers"
}object BrokerIdsZNode {def path = s"${BrokersZNode.path}/ids"def encode: Array[Byte] = null
}object TopicsZNode {def path = s"${BrokersZNode.path}/topics"
}object ConfigEntityChangeNotificationZNode {def path = s"${ConfigZNode.path}/changes"
}object DeleteTopicsZNode {def path = s"${AdminZNode.path}/delete_topics"
}object BrokerSequenceIdZNode {def path = s"${BrokersZNode.path}/seqid"
}object IsrChangeNotificationZNode {def path = "/isr_change_notification"
}object ProducerIdBlockZNode {val CurrentVersion: Long = 1Ldef path = "/latest_producer_id_block"def generateProducerIdBlockJson(producerIdBlock: ProducerIdsBlock): Array[Byte] = {Json.encodeAsBytes(Map("version" -> CurrentVersion,"broker" -> producerIdBlock.assignedBrokerId,"block_start" -> producerIdBlock.firstProducerId.toString,"block_end" -> producerIdBlock.lastProducerId.toString).asJava)}object LogDirEventNotificationZNode {def path = "/log_dir_event_notification"
}

三、驗(yàn)證元數(shù)據(jù)屬性集成是否有效

1、meta.properties文件是否始終設(shè)置了cluster.id

2、meta.properties文件是否始終設(shè)置了node.id或者?broker.id

initialMetaPropsEnsemble.verify(Optional.of(_clusterId), verificationId, verificationFlags)

四、動(dòng)態(tài)broker初始化

動(dòng)態(tài)broker配置存儲(chǔ)在ZooKeeper中,可以在兩個(gè)級(jí)別定義:

1、每個(gè)代理的配置持久化在/configs/brokers/{brokerId} 這些可以使用AdminClient使用資源名稱brokerId進(jìn)行描述/更改

2、整個(gè)集群的默認(rèn)值持續(xù)存在于/configs/brokers/<default> 這些可以使用AdminClient使用空資源名稱進(jìn)行描述/更改。

broker配置的優(yōu)先級(jí)順序?yàn)?

1、DYNAMIC_BROKER_CONFIG:存儲(chǔ)在ZK中的/configs/brokers/{brokerId}

2、DYNAMIC_DEFAULT_BROKER_CONFIG: 存儲(chǔ)在ZK中的//configs/brokers/<default>

3、STATIC_BROKER_CONFIG:啟動(dòng)代理時(shí)使用的屬性,通常來自server.properties文件

4、DEFAULT_CONFIG:KafkaConfig中定義的默認(rèn)配置

config.dynamicConfig.initialize(Some(zkClient), clientMetricsReceiverPluginOpt = None)

五、啟動(dòng)KafkaController

_kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, brokerFeatures, metadataCache, threadNamePrefix)
kafkaController.startup()

1、KafkaController結(jié)構(gòu)

class KafkaController(...){//事件管理private[controller] val eventManager = new ControllerEventManager(config.brokerId, this, time,controllerContext.stats.rateAndTimeMetrics)//如果brokerid = 當(dāng)前的controllerid 那么就返回truedef isActive: Boolean = activeControllerId == config.brokerId@volatile private var brokerInfo = initialBrokerInfo@volatile private var _brokerEpoch = initialBrokerEpoch//啟動(dòng)def startup(): Unit = {zkClient.registerStateChangeHandler(new StateChangeHandler {//ControllerHandler = "controller-state-change-handler"override val name: String = StateChangeHandlers.ControllerHandleroverride def afterInitializingSession(): Unit = {eventManager.put(RegisterBrokerAndReelect)}override def beforeInitializingSession(): Unit = {val queuedEvent = eventManager.clearAndPut(Expire)//阻止新會(huì)話的初始化,直到處理過期事件,這確保在創(chuàng)建新會(huì)話之前已處理所有掛起的事件queuedEvent.awaitProcessing()}})eventManager.put(Startup)eventManager.start()}override def process(event: ControllerEvent): Unit = {event match {//.....case RegisterBrokerAndReelect =>processRegisterBrokerAndReelect()case Startup =>processStartup()//.....}}
}

1、ControllerEventManager結(jié)構(gòu)

class ControllerEventManager(...){//用串行化隊(duì)列代替鎖private val queue = new LinkedBlockingQueue[QueuedEvent]//ControllerEventThreadName = "controller-event-thread"private[controller] var thread = new ControllerEventThread(ControllerEventThreadName)def start(): Unit = thread.start()def put(event: ControllerEvent): QueuedEvent = inLock(putLock) {val queuedEvent = new QueuedEvent(event, time.milliseconds())queue.put(queuedEvent)queuedEvent}class ControllerEventThread(name: String)  extends ShutdownableThread(...){override def doWork(): Unit = {//從隊(duì)列獲取事件,主要是controller相關(guān)的事件val dequeued = pollFromEventQueue()dequeued.event match {case controllerEvent =>def process(): Unit = dequeued.process(processor)}}}}
}

ControllerEventManager中的ControllerEventThread的父類是ShutdownableThread,它里面有真正的run()且調(diào)起了doWork(),doWork()又調(diào)起了process(),因此真正執(zhí)行的是process()

public abstract class ShutdownableThread extends Thread {public abstract void doWork();public void run() {while (isRunning())doWork();}
}

這是一個(gè)死循環(huán),也就是后面只要往隊(duì)列中添加事件,會(huì)自動(dòng)執(zhí)行對(duì)應(yīng)方法。從KafkaController的startup()中我們知道放了兩個(gè)事件:RegisterBrokerAndReelect和Startup,下面我們來看看它們里面做了什么

2、RegisterBrokerAndReelect事件處理

  private def processRegisterBrokerAndReelect(): Unit = {_brokerEpoch = zkClient.registerBroker(brokerInfo)processReelect()}

1、向zookeeper注冊(cè)broker

class KafkaZkClient private[zk] (...{def registerBroker(brokerInfo: BrokerInfo): Long = {//brokers/ids/brokeridval path = brokerInfo.path//創(chuàng)建 對(duì)應(yīng)的 brokerid 的 臨時(shí)znode節(jié)點(diǎn),說明:當(dāng)該brokers掛掉后會(huì)隨之消失val stat = checkedEphemeralCreate(path, brokerInfo.toJsonBytes)info(s"Registered broker ${brokerInfo.broker.id} at path $path with addresses: " +s"${brokerInfo.broker.endPoints.map(_.connectionString).mkString(",")}, czxid (broker epoch): ${stat.getCzxid}")//返回czxid (broker epoch)stat.getCzxid}}

2、開始選舉

class KafkaController(...){private def processReelect(): Unit = {maybeResign()elect()}private def maybeResign(): Unit = {val wasActiveBeforeChange = isActive//在zk上注冊(cè)節(jié)點(diǎn)改變事件,當(dāng)controller改變時(shí)觸發(fā),為下面的選舉做鋪墊zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)activeControllerId = zkClient.getControllerId.getOrElse(-1)if (wasActiveBeforeChange && !isActive) {//當(dāng)當(dāng)前broker辭去controller職務(wù)時(shí)觸發(fā)onControllerResignation()}}private def elect(): Unit = {//獲取 活動(dòng)狀態(tài) contoller ,如果 集群已經(jīng)啟動(dòng)了很長(zhǎng)時(shí)間,新增了一臺(tái)broker,那么此時(shí)會(huì)獲得 當(dāng)下的controller ,//如果此時(shí)集群剛剛啟動(dòng),那么此時(shí)沒有 活動(dòng)狀態(tài)的 controller ,返回的結(jié)果就是  -1activeControllerId = zkClient.getControllerId.getOrElse(-1)/** 我們可以在初始啟動(dòng)和handleDeleted ZK回調(diào)期間到達(dá)這里。由于潛在的競(jìng)爭(zhēng)條件,當(dāng)我們到達(dá)這里時(shí),控制器可能已經(jīng)被選中了。如果此代理已經(jīng)是控制器,則此檢查將防止以下createEphemeralPath方法進(jìn)入無限循環(huán)。*/if (activeControllerId != -1) {//如果當(dāng)下已經(jīng)有 activeControllerId 那么就停止選舉 ,否則繼續(xù)往下走//Broker $activeControllerId 已被選為控制器,因此停止選舉過程debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")return}//try中會(huì)發(fā)生如下情況//1、正常運(yùn)行:當(dāng)選controller//2、異常://    1、ControllerMovedException //       1、其他broker成功當(dāng)選controller//       2、controller已經(jīng)當(dāng)選,但剛剛離職,需要重新選舉//    2、Throwable  該節(jié)點(diǎn)當(dāng)選controller,但是就職時(shí)出錯(cuò)了。刪除該controller,重新選舉//       try {val (epoch, epochZkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(config.brokerId)controllerContext.epoch = epochcontrollerContext.epochZkVersion = epochZkVersionactiveControllerId = config.brokerId//${config.brokerId}已成功當(dāng)選為控制器。Epoch增加到${controllerContext.eepoch},Epoch zk版本現(xiàn)在是${controller Context.eepoch ZkVersion}”info(s"${config.brokerId} successfully elected as the controller. Epoch incremented to ${controllerContext.epoch} " +s"and epoch zk version is now ${controllerContext.epochZkVersion}")//成功當(dāng)選controller,并開始履行作為該角色的責(zé)任onControllerFailover()} catch {case e: ControllerMovedException =>//重新開始監(jiān)聽目錄變化maybeResign()if (activeControllerId != -1)debug(s"代理$activeControllerId被選為控制器,而不是代理${config.brokerId}", e)elsewarn("管制員已經(jīng)當(dāng)選,但剛剛辭職,這將導(dǎo)致另一輪選舉", e)case t: Throwable =>error(s"在代理${config.brokerId}上選擇或成為控制器時(shí)出錯(cuò)。立即觸發(fā)控制器移動(dòng)", t)triggerControllerMove()}}}

在選舉前zkClient注冊(cè)的 controllerChangeHandler 事件其實(shí)就是觀察 controller目錄的變化

class ControllerChangeHandler(eventManager: ControllerEventManager) extends ZNodeChangeHandler {//controller目錄override val path: String = ControllerZNode.pathoverride def handleCreation(): Unit = eventManager.put(ControllerChange)override def handleDeletion(): Unit = eventManager.put(Reelect)override def handleDataChange(): Unit = eventManager.put(ControllerChange)
}

六、總結(jié)

1、設(shè)置zookeeper,如:zookeeper.connect=hostname1:2181,hostname2:2181,hostname2:2181/kafka并創(chuàng)建持久化目錄:consumers、brokers/ids、brokers/topics、config/changes、admin/delete_topics、brokers/seqid、isr_change_notification、latest_producer_id_block、log_dir_event_notification

2、驗(yàn)證元數(shù)據(jù)屬性集成是否有效,主要時(shí)看每個(gè)broker是否有了唯一的id

3、將每個(gè)broker的id注冊(cè)到zookeeper

4、啟動(dòng)KafkaController

5、啟動(dòng)ControllerEventThread線程并不斷消費(fèi)LinkedBlockingQueue中事件

6、向隊(duì)列注冊(cè)RegisterBrokerAndReelect事件、Startup事件

7、首先處理RegisterBrokerAndReelect事件

8、向zookeeper注冊(cè)broker,并建立臨時(shí)znode

9、注冊(cè)controllerChangeHandler 事件其實(shí)就是觀察 controller目錄的變化

10、每個(gè)broker開始向zookeeper將自己注冊(cè)為controller

11、正常情況下只有一個(gè)broker成功注冊(cè)成功,其他broker拋出ControllerMovedException繼續(xù)監(jiān)控controller目錄的變化

12、如果選舉controller成功,但是在就職時(shí)失敗會(huì)里面進(jìn)行卸任工作,并進(jìn)行新一輪選舉

http://www.risenshineclean.com/news/54266.html

相關(guān)文章:

  • 巴中微信網(wǎng)站建設(shè)百度電話號(hào)碼
  • 南昌網(wǎng)站建設(shè)業(yè)務(wù)怎么做app推廣代理
  • python做網(wǎng)站掙錢91永久免費(fèi)海外地域網(wǎng)名
  • 英文網(wǎng)站怎么做seo網(wǎng)絡(luò)營銷策劃書1000字
  • 全flash網(wǎng)站制作制作網(wǎng)頁的流程
  • 可以做ppt的網(wǎng)站有哪些內(nèi)容百度貼吧官網(wǎng)app下載
  • 佛山當(dāng)?shù)鼐W(wǎng)站建設(shè)公司網(wǎng)絡(luò)營銷的四種形式
  • 皮膚自做頭像的網(wǎng)站友情鏈接是啥意思
  • 商場(chǎng)網(wǎng)站開發(fā)教程搜索引擎優(yōu)化seo專員
  • 新浪云 建設(shè)網(wǎng)站發(fā)布推廣信息的網(wǎng)站
  • 科技公司網(wǎng)站源碼百度競(jìng)價(jià)推廣課程
  • 石家莊做淘寶網(wǎng)站百度首頁優(yōu)化排名
  • wap網(wǎng)站 區(qū)別愛站網(wǎng)長(zhǎng)尾關(guān)鍵詞挖掘工具
  • 公司網(wǎng)站模板下載全網(wǎng)軟文推廣
  • 上海網(wǎng)站建設(shè) 億速網(wǎng)絡(luò)推廣合同
  • java語言做網(wǎng)站企業(yè)建站公司熱線電話
  • 電腦網(wǎng)站設(shè)計(jì)頁面抖音關(guān)鍵詞優(yōu)化排名靠前
  • 服務(wù)型網(wǎng)站的營銷特點(diǎn)域名免費(fèi)注冊(cè)
  • 博客和網(wǎng)站有什么不同百度廣告點(diǎn)擊軟件源碼
  • 網(wǎng)站建設(shè)開發(fā)有限公司線下推廣方式有哪些
  • wordpress 4.8中文版高級(jí)seo是什么職位
  • 成都網(wǎng)站優(yōu)化報(bào)價(jià)營銷策劃師
  • 安徽省住房與城鄉(xiāng)建設(shè)網(wǎng)站網(wǎng)絡(luò)推廣怎么做
  • 上海網(wǎng)站建設(shè)的企濟(jì)南網(wǎng)站建設(shè)方案
  • 網(wǎng)站和webapp的區(qū)別網(wǎng)上推廣平臺(tái)
  • 網(wǎng)站建設(shè)詳細(xì)需求文檔東莞做網(wǎng)站哪個(gè)公司好
  • 全屏網(wǎng)站尺寸鄭州seo哪家專業(yè)
  • 如何提升網(wǎng)站速度女生讀網(wǎng)絡(luò)營銷與電商直播
  • 珠江現(xiàn)代建設(shè) 雜志社網(wǎng)站石家莊seo網(wǎng)絡(luò)優(yōu)化的公司
  • 科技打破壟斷全球的霸權(quán)鄭州seo排名優(yōu)化公司