中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁 > news >正文

扁平風(fēng)網(wǎng)站哪家培訓(xùn)機構(gòu)學(xué)校好

扁平風(fēng)網(wǎng)站,哪家培訓(xùn)機構(gòu)學(xué)校好,公司網(wǎng)絡(luò)推廣公司,阿里云ace+wordpress1.前言 RouteInfoManager 是 RocketMQ 中 NameServer 的核心組件之一,主要負(fù)責(zé)管理和維護整個 RocketMQ 集群的路由元數(shù)據(jù)信息。里面包含一些非常核心的功能:存儲和管理 Broker 信息(broker的注冊,broker心跳的維護)&…

1.前言

RouteInfoManager?是 RocketMQ 中?NameServer?的核心組件之一,主要負(fù)責(zé)管理和維護整個 RocketMQ 集群的路由元數(shù)據(jù)信息。里面包含一些非常核心的功能:存儲和管理 Broker 信息(broker的注冊,broker心跳的維護);維護 Topic 的路由信息(topic的創(chuàng)建和更新,topic路由信息的查詢);管理隊列信息,管理集群信息等。

2.內(nèi)部數(shù)據(jù)結(jié)構(gòu)

public class RouteInfoManager {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);// broker長連接過期時間 長連接的空閑時間是2分鐘private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;//讀寫鎖private final ReadWriteLock lock = new ReentrantReadWriteLock();// 創(chuàng)建topic 以后 topic是邏輯上的概念 一個topic會有多個Queue Queue會分散到不同的broker上private final HashMap<String/* topic */, Map<String /* brokerName */ , QueueData>> topicQueueTable;//  代表的broker組的信息 BrokerData包含了一組Broker的信息private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;// 一個NameServer可以管理多個broker組 通常來說一個Cluster就可以了// 有可能會有很多復(fù)雜的業(yè)務(wù)場景 多個Clusterprivate final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;//管理Broker的長連接心跳 是否還有心跳private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;// Filter Server 是rocketMQ的一個高級功能,用來過濾消息//一般情況下 我們是可以基于tag進行數(shù)據(jù)篩選的操作,比較簡單,沒有辦法進行更加細化的過濾//這個Filter Server是在每臺Broker機器上啟動一個(或者多個)Filter Server//我們可以把一個自定義的消息篩選的class 上傳到Filter server上,在進行數(shù)據(jù)消費的時候讓Broker把數(shù)據(jù)先傳輸?shù)紽ilter Server// Filter Server會根據(jù)你自定義的class來進行細粒度的數(shù)據(jù)篩選,把篩選后的數(shù)據(jù)回傳給你的消費端private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}

3.核心方法

? ? 3.1?getAllClusterInfo

  /*** 返回的是 broker的cluster信息* 里面包含的是HashMap<String //brokerName//  BrokerData> brokerAddrTable* HashMap<String  //clusterName// , Set<String //brokerName// >> clusterAddrTable* @return*/public ClusterInfo getAllClusterInfo() {ClusterInfo clusterInfoSerializeWrapper = new ClusterInfo();clusterInfoSerializeWrapper.setBrokerAddrTable(this.brokerAddrTable);clusterInfoSerializeWrapper.setClusterAddrTable(this.clusterAddrTable);return clusterInfoSerializeWrapper;}

3.2 deleteTopic

  /*** 刪除某個topic 直接操作topicQueueTable的hashMap* @param topic*/public void deleteTopic(final String topic) {try {try {this.lock.writeLock().lockInterruptibly();this.topicQueueTable.remove(topic);} finally {this.lock.writeLock().unlock();}} catch (Exception e) {log.error("deleteTopic Exception", e);}}public void deleteTopic(final String topic, final String clusterName) {try {try {this.lock.writeLock().lockInterruptibly();Set<String> brokerNames = this.clusterAddrTable.get(clusterName);if (brokerNames != null&& !brokerNames.isEmpty()) {Map<String, QueueData> queueDataMap = this.topicQueueTable.get(topic);if (queueDataMap != null) {for (String brokerName : brokerNames) {final QueueData removedQD = queueDataMap.remove(brokerName);if (removedQD != null) {log.info("deleteTopic, remove one broker's topic {} {} {}", brokerName, topic,removedQD);}}if (queueDataMap.isEmpty()) {log.info("deleteTopic, remove the topic all queue {} {}", clusterName, topic);this.topicQueueTable.remove(topic);}}}} finally {this.lock.writeLock().unlock();}} catch (Exception e) {log.error("deleteTopic Exception", e);}}

3.3?getAllTopicList

   /*** 查詢所有的topic的列表信息* @return*/public TopicList getAllTopicList() {TopicList topicList = new TopicList();try {try {this.lock.readLock().lockInterruptibly();topicList.getTopicList().addAll(this.topicQueueTable.keySet());} finally {this.lock.readLock().unlock();}} catch (Exception e) {log.error("getAllTopicList Exception", e);}return topicList;}

3.4? registerBroker

? ?詳細的注冊流程 可以看我以前的博客:RocketMQ中的NameServer主要數(shù)據(jù)結(jié)構(gòu)-CSDN博客

 /*** broker的注冊方法* @param clusterName broker的集群名稱* @param brokerAddr broker的地址* @param brokerName broker所屬組的名稱* @param brokerId   broker機器的id* @param haServerAddr broker的ha地址* @param topicConfigWrapper 當(dāng)前broker機器上包含的topic隊列上的數(shù)據(jù)* @param filterServerList broker上部署的filterServer的列表* @param channel netty的網(wǎng)絡(luò)長連接* @return broker注冊的結(jié)果*/public RegisterBrokerResult registerBroker(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList,final Channel channel) {//省略大部分代碼}

3.5?unregisterBroker

  /*** broker的下線邏輯處理* @param clusterName 集群名* @param brokerAddr 地址* @param brokerName broker組的名字* @param brokerId broker對應(yīng)的id*/public void unregisterBroker(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId) {try {try {//加鎖this.lock.writeLock().lockInterruptibly();//獲取brokerLiveInfo對象 獲取保活信息BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddr);log.info("unregisterBroker, remove from brokerLiveTable {}, {}",brokerLiveInfo != null ? "OK" : "Failed",brokerAddr);//filterServerTable中刪除broker的信息this.filterServerTable.remove(brokerAddr);boolean removeBrokerName = false;//獲取broker組中獲取到brokerData信息BrokerData brokerData = this.brokerAddrTable.get(brokerName);if (null != brokerData) {//根據(jù)brokerId 從brokerData中移除掉BrokerId對應(yīng)的地址String addr = brokerData.getBrokerAddrs().remove(brokerId);log.info("unregisterBroker, remove addr from brokerAddrTable {}, {}",addr != null ? "OK" : "Failed",brokerAddr);//broker組中的機器數(shù)量如果為空的話 就移除掉這個broker組的信息if (brokerData.getBrokerAddrs().isEmpty()) {this.brokerAddrTable.remove(brokerName);log.info("unregisterBroker, remove name from brokerAddrTable OK, {}",brokerName);removeBrokerName = true;}}//如果已經(jīng)移除掉Broker組的信息的話if (removeBrokerName) {//從集群中移除掉這個broker組Set<String> nameSet = this.clusterAddrTable.get(clusterName);if (nameSet != null) {boolean removed = nameSet.remove(brokerName);log.info("unregisterBroker, remove name from clusterAddrTable {}, {}",removed ? "OK" : "Failed",brokerName);//集群中的broker組的數(shù)量如果也為空的話 就移除掉這個集群的信息if (nameSet.isEmpty()) {this.clusterAddrTable.remove(clusterName);log.info("unregisterBroker, remove cluster from clusterAddrTable {}",clusterName);}}//根據(jù)broker的名字移除掉topic的信息this.removeTopicByBrokerName(brokerName);}} finally {this.lock.writeLock().unlock();}} catch (Exception e) {log.error("unregisterBroker Exception", e);}}/***  根據(jù)broker的名字移除掉topic的信息* @param brokerName*/private void removeTopicByBrokerName(final String brokerName) {Set<String> noBrokerRegisterTopic = new HashSet<>();this.topicQueueTable.forEach((topic, queueDataMap) -> {QueueData old = queueDataMap.remove(brokerName);if (old != null) {log.info("removeTopicByBrokerName, remove one broker's topic {} {}", topic, old);}if (queueDataMap.size() == 0) {noBrokerRegisterTopic.add(topic);log.info("removeTopicByBrokerName, remove the topic all queue {}", topic);}});noBrokerRegisterTopic.forEach(topicQueueTable::remove);}//獲取topic的路由信息(broker的地址信息,以及在broker上的filterServer的列表)  針對一個topic里有多個queues來進行路由public TopicRouteData pickupTopicRouteData(final String topic) {TopicRouteData topicRouteData = new TopicRouteData();boolean foundQueueData = false;boolean foundBrokerData = false;Set<String> brokerNameSet = new HashSet<>();List<BrokerData> brokerDataList = new LinkedList<>();topicRouteData.setBrokerDatas(brokerDataList);HashMap<String, List<String>> filterServerMap = new HashMap<>();topicRouteData.setFilterServerTable(filterServerMap);try {try {//加一把讀鎖this.lock.readLock().lockInterruptibly();//從topicQueueTable中獲取到topic對應(yīng)的 QueueDataMap<String, QueueData> queueDataMap = this.topicQueueTable.get(topic);if (queueDataMap != null) {topicRouteData.setQueueDatas(new ArrayList<>(queueDataMap.values()));foundQueueData = true;//從queueData中獲取到broker名字的set集合brokerNameSet.addAll(queueDataMap.keySet());for (String brokerName : brokerNameSet) {BrokerData brokerData = this.brokerAddrTable.get(brokerName);if (null != brokerData) {BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData.getBrokerAddrs().clone());brokerDataList.add(brokerDataClone);foundBrokerData = true;// skip if filter server table is emptyif (!filterServerTable.isEmpty()) {for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {List<String> filterServerList = this.filterServerTable.get(brokerAddr);// only add filter server list when not nullif (filterServerList != null) {filterServerMap.put(brokerAddr, filterServerList);}}}}}}} finally {this.lock.readLock().unlock();}} catch (Exception e) {log.error("pickupTopicRouteData Exception", e);}log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);if (foundBrokerData && foundQueueData) {return topicRouteData;}return null;}

3.6 scanNotActiveBroker

? 掃描出心跳超時的broker,并針對超時的broker進行下線的操作

  public int scanNotActiveBroker() {// 這塊的方法主要是brokerLiveTable的集合中的所有元素//拿到broker最新一次的心跳時間//broker的最新一次心跳時間+120s 小于 當(dāng)前時間戳//就把這個broker進行移除掉int removeCount = 0;Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();while (it.hasNext()) {Entry<String, BrokerLiveInfo> next = it.next();long last = next.getValue().getLastUpdateTimestamp();if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {//關(guān)閉連接的channel通道信息RemotingUtil.closeChannel(next.getValue().getChannel());it.remove();log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);//從內(nèi)存中進行刪除緩存的channel連接信息this.onChannelDestroy(next.getKey(), next.getValue().getChannel());removeCount++;}}return removeCount;}//從brokerLiveTable中刪除掉broker的?;钚畔⒉⑦M行清理掉內(nèi)存中的?;钚畔ublic void onChannelDestroy(String remoteAddr, Channel channel) {String brokerAddrFound = null;//找到要進行刪除的broker信息if (channel != null) {try {try {this.lock.readLock().lockInterruptibly();Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =this.brokerLiveTable.entrySet().iterator();while (itBrokerLiveTable.hasNext()) {Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();if (entry.getValue().getChannel() == channel) {brokerAddrFound = entry.getKey();break;}}} finally {this.lock.readLock().unlock();}} catch (Exception e) {log.error("onChannelDestroy Exception", e);}}if (null == brokerAddrFound) {brokerAddrFound = remoteAddr;} else {log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);}//下面的代碼開始進行刪除broker的信息if (brokerAddrFound != null && brokerAddrFound.length() > 0) {try {try {this.lock.writeLock().lockInterruptibly();//刪除 brokerLiveTable中的broker信息this.brokerLiveTable.remove(brokerAddrFound);//刪除 filterServerTable中的broker信息this.filterServerTable.remove(brokerAddrFound);String brokerNameFound = null;boolean removeBrokerName = false;//刪除broker組中的broker信息Iterator<Entry<String, BrokerData>> itBrokerAddrTable =this.brokerAddrTable.entrySet().iterator();while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {BrokerData brokerData = itBrokerAddrTable.next().getValue();Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();while (it.hasNext()) {Entry<Long, String> entry = it.next();Long brokerId = entry.getKey();String brokerAddr = entry.getValue();if (brokerAddr.equals(brokerAddrFound)) {brokerNameFound = brokerData.getBrokerName();it.remove();log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",brokerId, brokerAddr);break;}}//如果刪除broker完成之后 發(fā)現(xiàn)broker組的信息也為空 那就把broker組進行刪除操作if (brokerData.getBrokerAddrs().isEmpty()) {removeBrokerName = true;itBrokerAddrTable.remove();log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",brokerData.getBrokerName());}}//刪除cluster集群的中的broker組信息if (brokerNameFound != null && removeBrokerName) {Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();while (it.hasNext()) {Entry<String, Set<String>> entry = it.next();String clusterName = entry.getKey();Set<String> brokerNames = entry.getValue();boolean removed = brokerNames.remove(brokerNameFound);if (removed) {log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",brokerNameFound, clusterName);if (brokerNames.isEmpty()) {log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",clusterName);it.remove();}break;}}}//刪除topic組在這個刪除broker組中對應(yīng)的信息也進行刪除的操作if (removeBrokerName) {String finalBrokerNameFound = brokerNameFound;Set<String> needRemoveTopic = new HashSet<>();topicQueueTable.forEach((topic, queueDataMap) -> {QueueData old = queueDataMap.remove(finalBrokerNameFound);log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",topic, old);if (queueDataMap.size() == 0) {log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",topic);needRemoveTopic.add(topic);}});needRemoveTopic.forEach(topicQueueTable::remove);}} finally {this.lock.writeLock().unlock();}} catch (Exception e) {log.error("onChannelDestroy Exception", e);}}}

http://www.risenshineclean.com/news/49447.html

相關(guān)文章:

  • 高校邦營銷型網(wǎng)站建設(shè)答案semifinal
  • 找人做網(wǎng)站注意哪些女教師遭網(wǎng)課入侵視頻大全播放
  • 微信網(wǎng)站開發(fā)簡單百度如何注冊公司網(wǎng)站
  • 權(quán)大師的網(wǎng)站是哪個公司做的指數(shù)基金是什么意思
  • 西安網(wǎng)站建設(shè)那家強深圳網(wǎng)絡(luò)營銷渠道
  • 鄭州網(wǎng)站制作公司名單外貿(mào)新手怎樣用谷歌找客戶
  • 企業(yè)網(wǎng)站備案在哪個部門seo教學(xué)
  • seo外貿(mào)網(wǎng)站建設(shè)百度下載安裝2021最新版
  • 上海做響應(yīng)式網(wǎng)站的公司江西seo
  • 那個網(wǎng)站做室內(nèi)比較好的網(wǎng)站流量排行
  • 拉新推廣變現(xiàn)app寧德seo推廣
  • 網(wǎng)站建站卡頓怎么辦流量查詢網(wǎng)站
  • 深圳網(wǎng)站建設(shè)易佰訊寧波seo排名外包
  • 烏魯木齊經(jīng)濟開發(fā)區(qū)建設(shè)局網(wǎng)站如何創(chuàng)建自己的網(wǎng)址
  • 有個藍色章魚做標(biāo)志的網(wǎng)站seo和sem的聯(lián)系
  • 蘇寧易購網(wǎng)站建設(shè)的目的競價關(guān)鍵詞排名軟件
  • 大連網(wǎng)站制作師企業(yè)微信scrm
  • Wordpress搜索指定頁面內(nèi)容seo網(wǎng)絡(luò)優(yōu)化推廣
  • 廣東省東莞陽光網(wǎng)seo推廣優(yōu)化外包價格
  • 網(wǎng)站如何做搜索功能的網(wǎng)絡(luò)推廣的途徑有哪些
  • 簡單制作網(wǎng)站的過程網(wǎng)站推廣軟件哪個最好
  • 騰訊微博 wordpressseo搜索引擎優(yōu)化視頻
  • wordpress 通知中心文章優(yōu)化關(guān)鍵詞排名
  • 外國英文設(shè)計網(wǎng)站搜多多搜索引擎入口
  • 學(xué)校網(wǎng)站建設(shè)計劃書什么叫網(wǎng)絡(luò)營銷
  • 網(wǎng)站正在建設(shè)中卡通源碼百度關(guān)鍵詞推廣教程
  • 答題網(wǎng)站怎么做小說網(wǎng)站排名前十
  • 免備案cdn保定關(guān)鍵詞優(yōu)化軟件
  • 重慶的汽車網(wǎng)站建設(shè)2022最新熱點事件及點評
  • 網(wǎng)頁游戲傳奇霸業(yè)seo網(wǎng)站推廣下載