扁平风网站哪家培训机构学校好
1.前言
RouteInfoManager 是 RocketMQ 中 NameServer 的核心组件之一,主要负责管理和维护整个 RocketMQ 集群的路由元数据信息。里面包含一些非常核心的功能:存储和管理 Broker 信息(broker的注册,broker心跳的维护);维护 Topic 的路由信息(topic的创建和更新,topic路由信息的查询);管理队列信息,管理集群信息等。
2.内部数据结构
public class RouteInfoManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    // Expiry time of a broker's long-lived connection: it may stay idle for 2 minutes.
    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;

    // Read-write lock used by the methods that access the tables below.
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    // A topic is a logical concept: one topic has several queues, and those queues
    // are spread over different brokers. Keyed by topic, then by broker name.
    private final HashMap<String/* topic */, Map<String /* brokerName */ , QueueData>> topicQueueTable;

    // Broker group information: each BrokerData describes one group of brokers.
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;

    // One NameServer can manage several broker groups. A single cluster is usually
    // enough, but complex business scenarios may use more than one cluster.
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;

    // Tracks the heartbeat of each broker's long-lived connection (is it still alive?).
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;

    // Filter Server is an advanced RocketMQ feature used to filter messages.
    // Normally messages are filtered by tag, which is simple but does not allow
    // finer-grained filtering.
    // One (or more) Filter Server is started on each broker machine.
    // A custom message-filter class can be uploaded to the Filter Server; when messages
    // are consumed, the broker first passes the data to the Filter Server.
    // The Filter Server applies the custom class for fine-grained filtering and sends
    // the filtered data back to the consumer.
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
3.核心方法
3.1 getAllClusterInfo
/**
 * Returns the cluster information known to this manager.
 *
 * <p>The result carries {@code brokerAddrTable} (brokerName -> BrokerData) and
 * {@code clusterAddrTable} (clusterName -> set of brokerName). Both tables are
 * handed over by reference: no copy is made and no lock is taken here.
 *
 * @return a new {@link ClusterInfo} pointing at the two tables
 */
public ClusterInfo getAllClusterInfo() {
    final ClusterInfo clusterInfo = new ClusterInfo();
    clusterInfo.setClusterAddrTable(this.clusterAddrTable);
    clusterInfo.setBrokerAddrTable(this.brokerAddrTable);
    return clusterInfo;
}
3.2 deleteTopic
/**
 * Deletes a topic by removing its entry from {@code topicQueueTable}.
 *
 * <p>Failures (including interruption while waiting for the write lock) are
 * logged, not propagated. On interruption the thread's interrupt flag is restored.
 *
 * @param topic name of the topic to delete
 */
public void deleteTopic(final String topic) {
    try {
        // Acquire BEFORE the try/finally: if acquisition is interrupted we must not
        // reach unlock(), which would throw IllegalMonitorStateException and hide
        // the real cause.
        this.lock.writeLock().lockInterruptibly();
        try {
            this.topicQueueTable.remove(topic);
        } finally {
            this.lock.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        log.error("deleteTopic Exception", e);
    } catch (Exception e) {
        log.error("deleteTopic Exception", e);
    }
}

/**
 * Deletes a topic's queue data for every broker group of one cluster.
 *
 * <p>For each broker name registered under {@code clusterName}, the topic's
 * {@code QueueData} for that broker is removed. If the topic has no queue data
 * left afterwards, the topic itself is removed from {@code topicQueueTable}.
 * Nothing happens when the cluster is unknown/empty or the topic is unknown.
 *
 * <p>Failures (including interruption while waiting for the write lock) are
 * logged, not propagated. On interruption the thread's interrupt flag is restored.
 *
 * @param topic       name of the topic to delete
 * @param clusterName cluster whose brokers should drop the topic
 */
public void deleteTopic(final String topic, final String clusterName) {
    try {
        // Lock is taken outside the inner try so that unlock() only runs when held.
        this.lock.writeLock().lockInterruptibly();
        try {
            Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
            if (brokerNames != null && !brokerNames.isEmpty()) {
                Map<String, QueueData> queueDataMap = this.topicQueueTable.get(topic);
                if (queueDataMap != null) {
                    for (String brokerName : brokerNames) {
                        final QueueData removedQD = queueDataMap.remove(brokerName);
                        if (removedQD != null) {
                            log.info("deleteTopic, remove one broker's topic {} {} {}", brokerName, topic,
                                removedQD);
                        }
                    }
                    // No broker serves this topic any more: drop the topic entry itself.
                    if (queueDataMap.isEmpty()) {
                        log.info("deleteTopic, remove the topic all queue {} {}", clusterName, topic);
                        this.topicQueueTable.remove(topic);
                    }
                }
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        log.error("deleteTopic Exception", e);
    } catch (Exception e) {
        log.error("deleteTopic Exception", e);
    }
}
3.3 getAllTopicList
/**
 * Lists the names of all topics currently present in {@code topicQueueTable}.
 *
 * <p>Failures (including interruption while waiting for the read lock) are logged,
 * not propagated; in that case the returned list holds whatever was added before
 * the failure (normally nothing). On interruption the interrupt flag is restored.
 *
 * @return a new {@link TopicList} containing the topic names
 */
public TopicList getAllTopicList() {
    TopicList topicList = new TopicList();
    try {
        // Acquire BEFORE the try/finally: if acquisition is interrupted we must not
        // reach unlock(), which would throw IllegalMonitorStateException and hide
        // the real cause.
        this.lock.readLock().lockInterruptibly();
        try {
            topicList.getTopicList().addAll(this.topicQueueTable.keySet());
        } finally {
            this.lock.readLock().unlock();
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        log.error("getAllTopicList Exception", e);
    } catch (Exception e) {
        log.error("getAllTopicList Exception", e);
    }
    return topicList;
}
3.4 registerBroker
详细的注册流程可以看我以前的博客:RocketMQ中的NameServer主要数据结构-CSDN博客
/**
 * Registers a broker.
 *
 * @param clusterName        name of the cluster the broker belongs to
 * @param brokerAddr         address of the broker
 * @param brokerName         name of the broker group the broker belongs to
 * @param brokerId           id of the broker machine
 * @param haServerAddr       HA address of the broker
 * @param topicConfigWrapper topic/queue data present on this broker machine
 * @param filterServerList   list of filter servers deployed on the broker
 * @param channel            the Netty long-lived network connection
 * @return the result of the broker registration
 */
public RegisterBrokerResult registerBroker(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final Channel channel) {
    // Most of the code is omitted here.
}
3.5 unregisterBroker
/*** broker的下線邏輯處理* @param clusterName 集群名* @param brokerAddr 地址* @param brokerName broker組的名字* @param brokerId broker對應(yīng)的id*/public void unregisterBroker(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId) {try {try {//加鎖this.lock.writeLock().lockInterruptibly();//獲取brokerLiveInfo對象 獲取保活信息BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddr);log.info("unregisterBroker, remove from brokerLiveTable {}, {}",brokerLiveInfo != null ? "OK" : "Failed",brokerAddr);//filterServerTable中刪除broker的信息this.filterServerTable.remove(brokerAddr);boolean removeBrokerName = false;//獲取broker組中獲取到brokerData信息BrokerData brokerData = this.brokerAddrTable.get(brokerName);if (null != brokerData) {//根據(jù)brokerId 從brokerData中移除掉BrokerId對應(yīng)的地址String addr = brokerData.getBrokerAddrs().remove(brokerId);log.info("unregisterBroker, remove addr from brokerAddrTable {}, {}",addr != null ? "OK" : "Failed",brokerAddr);//broker組中的機器數(shù)量如果為空的話 就移除掉這個broker組的信息if (brokerData.getBrokerAddrs().isEmpty()) {this.brokerAddrTable.remove(brokerName);log.info("unregisterBroker, remove name from brokerAddrTable OK, {}",brokerName);removeBrokerName = true;}}//如果已經(jīng)移除掉Broker組的信息的話if (removeBrokerName) {//從集群中移除掉這個broker組Set<String> nameSet = this.clusterAddrTable.get(clusterName);if (nameSet != null) {boolean removed = nameSet.remove(brokerName);log.info("unregisterBroker, remove name from clusterAddrTable {}, {}",removed ? 
"OK" : "Failed",brokerName);//集群中的broker組的數(shù)量如果也為空的話 就移除掉這個集群的信息if (nameSet.isEmpty()) {this.clusterAddrTable.remove(clusterName);log.info("unregisterBroker, remove cluster from clusterAddrTable {}",clusterName);}}//根據(jù)broker的名字移除掉topic的信息this.removeTopicByBrokerName(brokerName);}} finally {this.lock.writeLock().unlock();}} catch (Exception e) {log.error("unregisterBroker Exception", e);}}/*** 根據(jù)broker的名字移除掉topic的信息* @param brokerName*/private void removeTopicByBrokerName(final String brokerName) {Set<String> noBrokerRegisterTopic = new HashSet<>();this.topicQueueTable.forEach((topic, queueDataMap) -> {QueueData old = queueDataMap.remove(brokerName);if (old != null) {log.info("removeTopicByBrokerName, remove one broker's topic {} {}", topic, old);}if (queueDataMap.size() == 0) {noBrokerRegisterTopic.add(topic);log.info("removeTopicByBrokerName, remove the topic all queue {}", topic);}});noBrokerRegisterTopic.forEach(topicQueueTable::remove);}//獲取topic的路由信息(broker的地址信息,以及在broker上的filterServer的列表) 針對一個topic里有多個queues來進行路由public TopicRouteData pickupTopicRouteData(final String topic) {TopicRouteData topicRouteData = new TopicRouteData();boolean foundQueueData = false;boolean foundBrokerData = false;Set<String> brokerNameSet = new HashSet<>();List<BrokerData> brokerDataList = new LinkedList<>();topicRouteData.setBrokerDatas(brokerDataList);HashMap<String, List<String>> filterServerMap = new HashMap<>();topicRouteData.setFilterServerTable(filterServerMap);try {try {//加一把讀鎖this.lock.readLock().lockInterruptibly();//從topicQueueTable中獲取到topic對應(yīng)的 QueueDataMap<String, QueueData> queueDataMap = this.topicQueueTable.get(topic);if (queueDataMap != null) {topicRouteData.setQueueDatas(new ArrayList<>(queueDataMap.values()));foundQueueData = true;//從queueData中獲取到broker名字的set集合brokerNameSet.addAll(queueDataMap.keySet());for (String brokerName : brokerNameSet) {BrokerData brokerData = this.brokerAddrTable.get(brokerName);if (null != brokerData) {BrokerData 
brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData.getBrokerAddrs().clone());brokerDataList.add(brokerDataClone);foundBrokerData = true;// skip if filter server table is emptyif (!filterServerTable.isEmpty()) {for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {List<String> filterServerList = this.filterServerTable.get(brokerAddr);// only add filter server list when not nullif (filterServerList != null) {filterServerMap.put(brokerAddr, filterServerList);}}}}}}} finally {this.lock.readLock().unlock();}} catch (Exception e) {log.error("pickupTopicRouteData Exception", e);}log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);if (foundBrokerData && foundQueueData) {return topicRouteData;}return null;}
3.6 scanNotActiveBroker
扫描出心跳超时的broker,并针对超时的broker进行下线的操作
public int scanNotActiveBroker() {// 這塊的方法主要是brokerLiveTable的集合中的所有元素//拿到broker最新一次的心跳時間//broker的最新一次心跳時間+120s 小于 當(dāng)前時間戳//就把這個broker進行移除掉int removeCount = 0;Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();while (it.hasNext()) {Entry<String, BrokerLiveInfo> next = it.next();long last = next.getValue().getLastUpdateTimestamp();if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {//關(guān)閉連接的channel通道信息RemotingUtil.closeChannel(next.getValue().getChannel());it.remove();log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);//從內(nèi)存中進行刪除緩存的channel連接信息this.onChannelDestroy(next.getKey(), next.getValue().getChannel());removeCount++;}}return removeCount;}//從brokerLiveTable中刪除掉broker的?;钚畔⒉⑦M行清理掉內(nèi)存中的?;钚畔ublic void onChannelDestroy(String remoteAddr, Channel channel) {String brokerAddrFound = null;//找到要進行刪除的broker信息if (channel != null) {try {try {this.lock.readLock().lockInterruptibly();Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =this.brokerLiveTable.entrySet().iterator();while (itBrokerLiveTable.hasNext()) {Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();if (entry.getValue().getChannel() == channel) {brokerAddrFound = entry.getKey();break;}}} finally {this.lock.readLock().unlock();}} catch (Exception e) {log.error("onChannelDestroy Exception", e);}}if (null == brokerAddrFound) {brokerAddrFound = remoteAddr;} else {log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);}//下面的代碼開始進行刪除broker的信息if (brokerAddrFound != null && brokerAddrFound.length() > 0) {try {try {this.lock.writeLock().lockInterruptibly();//刪除 brokerLiveTable中的broker信息this.brokerLiveTable.remove(brokerAddrFound);//刪除 filterServerTable中的broker信息this.filterServerTable.remove(brokerAddrFound);String brokerNameFound = null;boolean removeBrokerName = false;//刪除broker組中的broker信息Iterator<Entry<String, BrokerData>> itBrokerAddrTable 
=this.brokerAddrTable.entrySet().iterator();while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {BrokerData brokerData = itBrokerAddrTable.next().getValue();Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();while (it.hasNext()) {Entry<Long, String> entry = it.next();Long brokerId = entry.getKey();String brokerAddr = entry.getValue();if (brokerAddr.equals(brokerAddrFound)) {brokerNameFound = brokerData.getBrokerName();it.remove();log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",brokerId, brokerAddr);break;}}//如果刪除broker完成之后 發(fā)現(xiàn)broker組的信息也為空 那就把broker組進行刪除操作if (brokerData.getBrokerAddrs().isEmpty()) {removeBrokerName = true;itBrokerAddrTable.remove();log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",brokerData.getBrokerName());}}//刪除cluster集群的中的broker組信息if (brokerNameFound != null && removeBrokerName) {Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();while (it.hasNext()) {Entry<String, Set<String>> entry = it.next();String clusterName = entry.getKey();Set<String> brokerNames = entry.getValue();boolean removed = brokerNames.remove(brokerNameFound);if (removed) {log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",brokerNameFound, clusterName);if (brokerNames.isEmpty()) {log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",clusterName);it.remove();}break;}}}//刪除topic組在這個刪除broker組中對應(yīng)的信息也進行刪除的操作if (removeBrokerName) {String finalBrokerNameFound = brokerNameFound;Set<String> needRemoveTopic = new HashSet<>();topicQueueTable.forEach((topic, queueDataMap) -> {QueueData old = queueDataMap.remove(finalBrokerNameFound);log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",topic, old);if (queueDataMap.size() == 0) {log.info("remove topic[{}] all queue, from topicQueueTable, 
because channel destroyed",topic);needRemoveTopic.add(topic);}});needRemoveTopic.forEach(topicQueueTable::remove);}} finally {this.lock.writeLock().unlock();}} catch (Exception e) {log.error("onChannelDestroy Exception", e);}}}