如何查看網(wǎng)站訪問日志文件網(wǎng)頁優(yōu)化公司
文章目錄
- 1. 前言
- 2. 入口
- 3. 分配策略
- 3.1 AllocateMessageQueueAveragely 平均分配
- 3.2 AllocateMessageQueueAveragelyByCircle 負載均衡分配隊列, 環(huán)形分配
- 3.3 AllocateMessageQueueByConfig 根據(jù)配置分配消息隊列
- 3.4 AllocateMessageQueueConsistentHash 一致性 hash 分配
- 3.5 AllocateMachineRoomNearby 通過機房分組
- 3.6 AllocateMachineRoomNearby 通過機房分組
- 4. 小結(jié)
本文章基于 RocketMQ 4.9.3
1. 前言
- 【RocketMQ】- 源碼系列目錄
- 【RocketMQ 生產(chǎn)者消費者】- 同步、異步、單向發(fā)送消費消息
- 【RocketMQ 生產(chǎn)者和消費者】- 消費者啟動源碼
- 【RocketMQ 生產(chǎn)者和消費者】- 消費者重平衡(1)
上一篇講解了消費者重平衡的入口和大致源碼,還剩下一點 RocketMQ 的幾個重平衡策略。
2. 入口
在 rebalanceByTopic 方法中,當消費模式是集群模式的時候,會基于負載均衡策略確定分配給當前消費者的 MessageQueue,分配的方法是 strategy.allocate
。
要想知道 allocateMessageQueueStrategy 到底是什么策略,就需要知道 allocateMessageQueueStrategy 是在哪初始化的,這就要回到 Consumer 的構造器了。
在創(chuàng)建 DefaultMQPushConsumer 的時候我們是只設置了一個 consumerGroup,其他參數(shù)都沒有設置,所以默認創(chuàng)建的就是 AllocateMessageQueueAveragely 策略,也就是平均分配策略。
可以看到最后也是將這個策略設置到了 allocateMessageQueueStrategy
中,而在消費者的 start 方法中又把這個屬性設置到了重平衡服務中。
所以最終重平衡默認就是用的 allocateMessageQueueStrategy,那么下面就來看下除了這個默認策略之外,RocketMQ 還提供了哪些策略。
3. 分配策略
分配的接口是 allocate,接收四個傳參,可以看下面接口定義,就能得知消費者負載均衡是根據(jù)消費者 ID 來負載的。
/*** Allocating by consumer id** @param 消費者組* @param 當前消費者 ID* @param 訂閱的 topic 下面的所有消息隊列* @param 消費者組下面的所有消費者的 clientID 集合* @return The allocate result of given strategy*/
List<MessageQueue> allocate(final String consumerGroup,final String currentCID,final List<MessageQueue> mqAll,final List<String> cidAll
);
3.1 AllocateMessageQueueAveragely 平均分配
/*** 負載均衡分配隊列, 平均分配* @param consumerGroup 當前消費者組* @param currentCID 當前消費者的 clientID* @param mqAll 當前 topic 的所有隊列* @param cidAll 當前消費者組的所有消費者的 clientID* @return*/
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {// 參數(shù)校驗if (currentCID == null || currentCID.length() < 1) {throw new IllegalArgumentException("currentCID is empty");}if (mqAll == null || mqAll.isEmpty()) {throw new IllegalArgumentException("mqAll is null or mqAll empty");}if (cidAll == null || cidAll.isEmpty()) {throw new IllegalArgumentException("cidAll is null or cidAll empty");}// 分配結(jié)果List<MessageQueue> result = new ArrayList<MessageQueue>();if (!cidAll.contains(currentCID)) {// 如果這個消費者不是傳入的消費者組下的, 有可能是剛啟動沒注冊到 brokerlog.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",consumerGroup,currentCID,cidAll);// 就不分配隊列處理了return result;}// 假設現(xiàn)在有 8 個隊列, 要負載均衡給 3 個消費者 [0, 1, 2], 而當前消費者[1]的 index = 1// 那么在分配的時候三個消費者分配的隊列標號就是 [0, 1, 2], [3, 4, 5], [6, 7]// 這里就是求出來當前消費者 ID 所在的位置 = 1int index = cidAll.indexOf(currentCID);// 隊列數(shù) % 消費者數(shù) = 剩余隊列數(shù) = 2int mod = mqAll.size() % cidAll.size();// 這里求平均數(shù), 求出來的結(jié)果就是 8 / 3 + 1 = 3// 1.如果隊列數(shù)小于消費者數(shù), 平均數(shù)就是 1// 2.如果隊列數(shù)大于消費者數(shù), 并且當前隊列的下標在 (0, mod) 這個范圍, 那么平均數(shù)就是 mqAll.size() / cidAll.size() + 1// 3.如果隊列數(shù)大于消費者數(shù), 并且當前隊列的下標在 [mod, cidAll.size()) 這個范圍, 那么平均數(shù)就是 mqAll.size() / cidAll.size()int averageSize =mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()+ 1 : mqAll.size() / cidAll.size());// 計算當前消費者從哪里開始分配隊列, 這里求出來的就是 1 * 3 = 3int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;// 這里求出來當前消費者要分配多少個隊列, 比如當前就是分配 3 個隊列int range = Math.min(averageSize, mqAll.size() - startIndex);for (int i = 0; i < range; i++) {// 開始分配隊列, 分配的起始位是是 startIndex, 分配的隊列數(shù)量是 rangeresult.add(mqAll.get((startIndex + i) % mqAll.size()));}return result;
}
平均分配的策略就是就是將隊列數(shù)量按照消費者數(shù)量平均分配,假設現(xiàn)在有 8 個隊列,要負載均衡給 3 個消費者 [0, 1, 2]
,而當前消費者 [1] 的 index = 1
,那么在分配的時候三個消費者分配的隊列標號就是 [0, 1, 2],[3, 4, 5],[6, 7]
。
根據(jù)下面幾個步驟分配:
- 首先就是求出當前消費者 ID 所在的下標,也就是上面的 1。
- 求出余數(shù),也就是 8 % 3 = 2,余數(shù)是用來求下面的消息隊列數(shù)量和范圍的。
- 求出當前消費者可以分配到的消息隊列數(shù)量 averageSize,過程如下:
- 如果隊列數(shù)小于消費者數(shù),平均數(shù)就是
1
,隊列會從前往后分配給消費者。 - 如果隊列數(shù)大于消費者數(shù),并且當前隊列的下標在 (0, mod) 這個范圍,那么平均數(shù)就是
mqAll.size() / cidAll.size() + 1
,比如當前 index = 1,那么就可以分配到 3 個隊列。 - 如果隊列數(shù)大于消費者數(shù),并且當前隊列的下標在
[mod, cidAll.size())
這個范圍, 那么平均數(shù)就是mqAll.size() / cidAll.size()
,說明當前消費者在靠后的位置。。
- 如果隊列數(shù)小于消費者數(shù),平均數(shù)就是
算出來當前消費者可以分配多少個消息隊列之后,計算當前消費者從哪里開始分配隊列,計算方式就是如果 index 在 mod 之前,就通過 index * averageSize
,否則就是 index * averageSize + mod
,range
就是要分配多少個隊列,求一個最小值,因為如果是靠后的有可能 startIndex + averageSize
超過了數(shù)組,就越界了,最終遍歷分配。
可能看著有點繞,總之記住就是平均分配,只是這種平均分配是求出分配的消息隊列整體數(shù)量,然后求出分配的起始下標,再從原集合中去獲取。
下面我們自己寫一個 main 方法測試下。
public static void main(String[] args) {AllocateMessageQueueAveragely allocateMessageQueueAveragely = new AllocateMessageQueueAveragely();List<MessageQueue> queues = new ArrayList<MessageQueue>();for(int i = 0; i < 8; i++){queues.add(new MessageQueue("topic-test", "broker-a", i));}System.out.println(allocateMessageQueueAveragely.allocate("group", "0", queues, Arrays.asList("0", "1", "2")));System.out.println(allocateMessageQueueAveragely.allocate("group", "1", queues, Arrays.asList("0", "1", "2")));System.out.println(allocateMessageQueueAveragely.allocate("group", "2", queues, Arrays.asList("0", "1", "2")));
}
輸出結(jié)果如下:
[MessageQueue [topic=topic-test, brokerName=broker-a, queueId=0], MessageQueue [topic=topic-test, brokerName=broker-a, queueId=1], MessageQueue [topic=topic-test, brokerName=broker-a, queueId=2]]
[MessageQueue [topic=topic-test, brokerName=broker-a, queueId=3], MessageQueue [topic=topic-test, brokerName=broker-a, queueId=4], MessageQueue [topic=topic-test, brokerName=broker-a, queueId=5]]
[MessageQueue [topic=topic-test, brokerName=broker-a, queueId=6], MessageQueue [topic=topic-test, brokerName=broker-a, queueId=7]]
3.2 AllocateMessageQueueAveragelyByCircle 負載均衡分配隊列, 環(huán)形分配
這個也是平均分配,但是跟上面的不同,這個是一個一個分配,啥意思呢,看下面源碼。
/*** 負載均衡分配隊列, 環(huán)形分配* @param consumerGroup 當前消費者組* @param currentCID 當前消費者的 clientID* @param mqAll 當前 topic 的所有隊列* @param cidAll 當前消費者組的所有消費者的 clientID* @return*/
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {// 參數(shù)校驗if (currentCID == null || currentCID.length() < 1) {throw new IllegalArgumentException("currentCID is empty");}if (mqAll == null || mqAll.isEmpty()) {throw new IllegalArgumentException("mqAll is null or mqAll empty");}if (cidAll == null || cidAll.isEmpty()) {throw new IllegalArgumentException("cidAll is null or cidAll empty");}// 分配結(jié)果List<MessageQueue> result = new ArrayList<MessageQueue>();if (!cidAll.contains(currentCID)) {// 如果這個消費者不是傳入的消費者組下的, 有可能是剛啟動沒注冊到 brokerlog.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",consumerGroup,currentCID,cidAll);// 直接返回, 不分配了return result;}// 獲取當前消費者的下標int index = cidAll.indexOf(currentCID);for (int i = index; i < mqAll.size(); i++) {// 比如當前消息隊列是 0, 1, 2, 3, 4, 5, 6, 7, 8// 當前消費者是 1// 那么分配的隊列就是 1, 4, 7if (i % cidAll.size() == index) {result.add(mqAll.get(i));}}return result;
}
這個分配策略的源碼不多,可以看到分配就直接求出當前消費者的下標,然后開始遍歷,只要符合 i % cidAll.size() == index
就說明這個隊列可以分配給當前消費者,其實從 0 開始遍歷也可以,不過從 index 開始會少幾次判斷。
同樣的,我們使用一個 main 方法來測試下:
public static void main(String[] args) {AllocateMessageQueueAveragelyByCircle averagelyByCircle = new AllocateMessageQueueAveragelyByCircle();List<MessageQueue> queues = new ArrayList<MessageQueue>();for(int i = 0; i < 8; i++){queues.add(new MessageQueue("topic-test", "broker-a", i));}System.out.println(averagelyByCircle.allocate("group", "0", queues, Arrays.asList("0", "1", "2")));System.out.println(averagelyByCircle.allocate("group", "1", queues, Arrays.asList("0", "1", "2")));System.out.println(averagelyByCircle.allocate("group", "2", queues, Arrays.asList("0", "1", "2")));
}
輸出結(jié)果如下:
[MessageQueue [topic=topic-test, brokerName=broker-a, queueId=0], MessageQueue [topic=topic-test, brokerName=broker-a, queueId=3], MessageQueue [topic=topic-test, brokerName=broker-a, queueId=6]]
[MessageQueue [topic=topic-test, brokerName=broker-a, queueId=1], MessageQueue [topic=topic-test, brokerName=broker-a, queueId=4], MessageQueue [topic=topic-test, brokerName=broker-a, queueId=7]]
[MessageQueue [topic=topic-test, brokerName=broker-a, queueId=2], MessageQueue [topic=topic-test, brokerName=broker-a, queueId=5]]
3.3 AllocateMessageQueueByConfig 根據(jù)配置分配消息隊列
/*** 負載均衡分配隊列, 用戶自定義消費者需要消費的隊列* @param consumerGroup 當前消費者組* @param currentCID 當前消費者的 clientID* @param mqAll 當前 topic 的所有隊列* @param cidAll 當前消費者組的所有消費者的 clientID* @return*/
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {return this.messageQueueList;
}
這個負載均衡類可以自己手動設置消息隊列。
3.4 AllocateMessageQueueConsistentHash 一致性 hash 分配
/*** 負載均衡分配隊列, 一致性 hash 分配* @param consumerGroup 當前消費者組* @param currentCID 當前消費者的 clientID* @param mqAll 當前 topic 的所有隊列* @param cidAll 當前消費者組的所有消費者的 clientID* @return*/
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {// 參數(shù)校驗if (currentCID == null || currentCID.length() < 1) {throw new IllegalArgumentException("currentCID is empty");}if (mqAll == null || mqAll.isEmpty()) {throw new IllegalArgumentException("mqAll is null or mqAll empty");}if (cidAll == null || cidAll.isEmpty()) {throw new IllegalArgumentException("cidAll is null or cidAll empty");}List<MessageQueue> result = new ArrayList<MessageQueue>();// 如果這個消費者不是傳入的消費者組下的, 有可能是剛啟動沒注冊到 brokerif (!cidAll.contains(currentCID)) {log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",consumerGroup,currentCID,cidAll);// 先不分配注冊隊列return result;}// 將所有消費者封轉(zhuǎn)為 ClientNodeCollection<ClientNode> cidNodes = new ArrayList<ClientNode>();for (String cid : cidAll) {cidNodes.add(new ClientNode(cid));}// ConsistentHashRouter 就是用來根據(jù) hash 環(huán)算法分配節(jié)點的final ConsistentHashRouter<ClientNode> router; //for building hash ringif (customHashFunction != null) {// 設置了 hash 函數(shù), virtualNodeCnt 是虛擬節(jié)點個數(shù), 默認 10 個router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt, customHashFunction);} else {// 沒有 hash 函數(shù), 就用 MD5Hash 作為 hash 函數(shù)router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt);}List<MessageQueue> results = new ArrayList<MessageQueue>();// 遍歷所有消息隊列for (MessageQueue mq : mqAll) {// 根據(jù)一致性 hash 算法來計算出當前這個 MessageQueue 要分配到哪個消費者ClientNode clientNode = router.routeNode(mq.toString());// 如果分配到的消費者是當前的消費者if (clientNode != null && currentCID.equals(clientNode.getKey())) {// 添加到 results 集合中results.add(mq);}}// 返回分配的隊列結(jié)果return results;}
這個就是一致性 hash 算法,簡單來說就是將消費者 ID 添加到 hash 環(huán)里面,每一個消費者都分配 10 個虛擬節(jié)點,避免分配不均的情況。
3.5 AllocateMachineRoomNearby 通過機房分組
這個分配策略激素hi通過將消息隊列和消費者ID(CID)根據(jù)機房進行分組,然后優(yōu)先分配同一機房的隊列給同一機房的消費者,如果同一機房沒有可用消費者,則將剩余的消息隊列平均分配給其他機房的消費者?。
/*** 負載均衡分配隊列* @param consumerGroup 當前消費者組* @param currentCID 當前消費者的 clientID* @param mqAll 當前 topic 的所有隊列* @param cidAll 當前消費者組的所有消費者的 clientID* @return*/
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {// 參數(shù)校驗if (currentCID == null || currentCID.length() < 1) {throw new IllegalArgumentException("currentCID is empty");}if (mqAll == null || mqAll.isEmpty()) {throw new IllegalArgumentException("mqAll is null or mqAll empty");}if (cidAll == null || cidAll.isEmpty()) {throw new IllegalArgumentException("cidAll is null or cidAll empty");}// 分配給當前消費者的隊列結(jié)果List<MessageQueue> result = new ArrayList<MessageQueue>();if (!cidAll.contains(currentCID)) {// 如果當前消費者不是這個消費者組下的, 就不參與分配了, 有可能剛啟動還沒注冊到 brokerlog.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",consumerGroup,currentCID,cidAll);return result;}// 將所有消費隊列根據(jù)機房分組Map<String/*machine room */, List<MessageQueue>> mr2Mq = new TreeMap<String, List<MessageQueue>>();// 將消息隊列按照機房分組for (MessageQueue mq : mqAll) {// 消息隊列可以分配到不同的 broker, 這里就是從 brokerName 中獲取這個 broker 所屬的機房String brokerMachineRoom = machineRoomResolver.brokerDeployIn(mq);if (StringUtils.isNoneEmpty(brokerMachineRoom)) {// 添加到集合中if (mr2Mq.get(brokerMachineRoom) == null) {mr2Mq.put(brokerMachineRoom, new ArrayList<MessageQueue>());}mr2Mq.get(brokerMachineRoom).add(mq);} else {// 沒找到機房, 拋出異常, 因為這個分配策略就是根據(jù)機房分配的throw new IllegalArgumentException("Machine room is null for mq " + mq);}}// 將所有消費者按照機房分組Map<String/*machine room */, List<String/*clientId*/>> mr2c = new TreeMap<String, List<String>>();for (String cid : cidAll) {// 獲取消費者所屬的機房String consumerMachineRoom = machineRoomResolver.consumerDeployIn(cid);if (StringUtils.isNoneEmpty(consumerMachineRoom)) {if (mr2c.get(consumerMachineRoom) == null) {mr2c.put(consumerMachineRoom, new ArrayList<String>());}mr2c.get(consumerMachineRoom).add(cid);} else {throw new IllegalArgumentException("Machine room is null for consumer id " + cid);}}List<MessageQueue> allocateResults = new ArrayList<MessageQueue>();// 1. allocate the mq that deploy in the same machine room with the current consumer// 1. 開始分配隊列, 主要就是分配和傳入的消費者在同一機房的隊列String currentMachineRoom = machineRoomResolver.consumerDeployIn(currentCID);// 獲取當前機房下面的消息隊列List<MessageQueue> mqInThisMachineRoom = mr2Mq.remove(currentMachineRoom);// 獲取當前機房下面的所有消費者List<String> consumerInThisMachineRoom = mr2c.get(currentMachineRoom);if (mqInThisMachineRoom != null && !mqInThisMachineRoom.isEmpty()) {// 根據(jù)傳入的策略進行分配allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mqInThisMachineRoom, consumerInThisMachineRoom));}// 2. allocate the rest mq to each machine room if there are no consumer alive in that machine room// 2. 遍歷所有剩余的隊列, 這些隊列跟當前消費者不在同一個機房for (Entry<String, List<MessageQueue>> machineRoomEntry : mr2Mq.entrySet()) {// 如果 machineRoomEntry.getKey() 這個機房在 mr2c 中沒有消費者, 也就是說這個機房下面沒有消費者去消費對應的隊列,// 那么這些隊列就應該分配給所有的消費者去消費, 也就是在下面調(diào)用 allocate 去分配if (!mr2c.containsKey(machineRoomEntry.getKey())) { // no alive consumer in the corresponding machine room, so all consumers share these queuesallocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, machineRoomEntry.getValue(), cidAll));}}// 返回當前消費者 ID 分配的結(jié)果return allocateResults;
}
這個分配策略會先將 topic 下面的所有消息隊列按照機房分組,也就是處理 mr2Mq
集合。再將所有消費者按照機房分組,也就是處理 mr2c
集合。
分好組之后,獲取當前消費者所在的機房,然后獲取這個機房下面的消息隊列和消費者,接著調(diào)用 allocateMessageQueueStrategy.allocate
對這個機房下面的消費者進行分配,這個 allocateMessageQueueStrategy 是真正的分配策略,AllocateMachineRoomNearby 就是按機房分組后,按照這個真正的分配策略去分配。
最后遍歷剩余的 mr2Mq,如果 machineRoomEntry.getKey() 這個機房在 mr2c 中沒有消費者,也就是說這個機房下面沒有消費者去消費對應的隊列,那么這些隊列就應該分配給所有的消費者去消費。
所以總的來說就是如果消費者和消息隊列在同一個機房,那么就同一個機房內(nèi)的就用 allocateMessageQueueStrategy 去分配,如果消息隊列所在的機房沒有消費者的,就分配給這個消費者組下面的所有消費者。
下面我們就來模擬一下,還是用一個 main 方法,要注意一下就是創(chuàng)建 AllocateMachineRoomNearby 的時候需要傳入真正的分配策略以及 MachineRoomResolver 的實現(xiàn)類,也就是我們可以自己定義如何獲取消費者以及消息隊列的機房。
/*** A resolver object to determine which machine room do the message queues or clients are deployed in.** AllocateMachineRoomNearby will use the results to group the message queues and clients by machine room.** The result returned from the implemented method CANNOT be null.*/
public interface MachineRoomResolver {String brokerDeployIn(MessageQueue messageQueue);String consumerDeployIn(String clientID);
}
下面就寫一個 main 方法測試下,我們設置真實的分配方式為 AllocateMessageQueueAveragelyByCircle
。
public static void main(String[] args) {// 使用 - 分割, 后面的就是機房MachineRoomResolver resolver = new MachineRoomResolver() {@Overridepublic String brokerDeployIn(MessageQueue messageQueue) {return messageQueue.getBrokerName().split("-")[1];}@Overridepublic String consumerDeployIn(String clientID) {return clientID.split("-")[1];}};// 消息隊列List<MessageQueue> queues = new ArrayList<MessageQueue>();// 機房 A 的queues.add(new MessageQueue("topic", "broker-A", 0));queues.add(new MessageQueue("topic", "broker-A", 1));queues.add(new MessageQueue("topic", "broker-A", 2));// 機房 B 的queues.add(new MessageQueue("topic", "broker-B", 3));queues.add(new MessageQueue("topic", "broker-B", 4));queues.add(new MessageQueue("topic", "broker-B", 5));// 機房 C 的queues.add(new MessageQueue("topic", "broker-C", 6));queues.add(new MessageQueue("topic", "broker-C", 7));// 消費者隊列List<String> cidAll = new ArrayList<String>();cidAll.add("Consumer1-A");cidAll.add("Consumer2-A");cidAll.add("Consumer3-C");cidAll.add("Consumer4-D");AllocateMachineRoomNearby allocateMachineRoomNearby = new AllocateMachineRoomNearby(new AllocateMessageQueueAveragelyByCircle(), resolver);System.out.println(allocateMachineRoomNearby.allocate("groupTest", "Consumer1-A", queues, cidAll));System.out.println(allocateMachineRoomNearby.allocate("groupTest", "Consumer2-A", queues, cidAll));System.out.println(allocateMachineRoomNearby.allocate("groupTest", "Consumer3-C", queues, cidAll));System.out.println(allocateMachineRoomNearby.allocate("groupTest", "Consumer4-D", queues, cidAll));}
}
可以看到,分配的結(jié)果如下:
- Consumer1-A:0,2,3
- Consumer2-A:1,4
- Consumer3-C:6,7,5
- Consumer3-D:
符合上面的解析流程,如果同機房就平均分配給同一機房的消費者,如果找不到對應機房的消費者就平均分配給所有消息隊列。
3.6 AllocateMachineRoomNearby 通過機房分組
/*** 負載均衡分配隊列, 根據(jù)機房平均分配* @param consumerGroup 當前消費者組* @param currentCID 當前消費者的 clientID* @param mqAll 當前 topic 的所有隊列* @param cidAll 當前消費者組的所有消費者的 clientID* @return*/
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {// 參數(shù)校驗if (StringUtils.isBlank(currentCID)) {throw new IllegalArgumentException("currentCID is empty");}if (CollectionUtils.isEmpty(mqAll)) {throw new IllegalArgumentException("mqAll is null or mqAll empty");}if (CollectionUtils.isEmpty(cidAll)) {throw new IllegalArgumentException("cidAll is null or cidAll empty");}List<MessageQueue> result = new ArrayList<MessageQueue>();// 如果這個消費者不在對應消費者組下, 直接返回, 有可能是剛啟動沒注冊到 brokerint currentIndex = cidAll.indexOf(currentCID);if (currentIndex < 0) {// 直接返回空集合, 不消費了return result;}List<MessageQueue> premqAll = new ArrayList<MessageQueue>();// 遍歷所有 MessageQueuefor (MessageQueue mq : mqAll) {// 將 brokerName 根據(jù) @ 分割, 格式就是 "機房名@brokerName", 如果 broker 所在機房在用戶設置的 consumeridcs 集合中String[] temp = mq.getBrokerName().split("@");if (temp.length == 2 && consumeridcs.contains(temp[0])) {// 加入 premqAll 隊列, 然后開始平均分配premqAll.add(mq);}}// 假設當前機房有 [0, 1, 2, 3, 4, 5, 6, 7], 而消費者組[0, 1, 2], 當前消費者 ID = 1// 所以這個消費者分配到的隊列就是 [2, 3, 7]// 是這樣的, 平均分配會先分配 mod 大小的數(shù), 比如 [0, 1, 2] 三個消費者會先分配 [0, 1], [2, 3], [4, 5] 的隊列// 然后剩余 [6, 7] 兩個隊列再分配給 [0, 1] 兩個消費者, 所以最終分配結(jié)果就是:// [0, 1, 2] => 0: [0, 1, 6], 1: [2, 3, 7], 2: [4, 5]// 當前消費者能分到多少隊列, 這里是 8 / 3 = 2int mod = premqAll.size() / cidAll.size();// 剩余的隊列, 這里是 8 % 3 = 2int rem = premqAll.size() % cidAll.size();// 起始位置 = 2 * 1 = 2int startIndex = mod * currentIndex;// 結(jié)束位置 = 2 + 2 = 4int endIndex = startIndex + mod;// 先把平均分配的添加到隊列中, 就是 [2, 3]for (int i = startIndex; i < endIndex; i++) {result.add(premqAll.get(i));}// 如果當前隊列可以多分配一個if (rem > currentIndex) {// 獲取下標 (1 + 2 * 3 = 7)result.add(premqAll.get(currentIndex + mod * cidAll.size()));}// 最終結(jié)果就是 [2, 3, 7]return result;
}
AllocateMachineRoomNearby 分配策略中用戶可以設定需要分配的機房集合 consumeridcs,然后遍歷所有消息隊列,如果消息隊列是用戶設定的機房下面的,就加入 premqAll 集合等待分配。
分配的策略如下:假設當前機房有 [0,1,2,3,4,5,6,7],而消費者組 [0,1,2],當前消費者 ID = 1,所以這個消費者分配到的隊列就是 [2,3,7],平均分配會先分配 mod 大小的數(shù),比如 [0,1,2] 三個消費者會先分配 [0,1],[2,3],[4,5] 的隊列,然后剩余 [6,7] 兩個隊列再分配給 [0,1] 兩個消費者,所以最終分配結(jié)果就是: [0,1,2]。
下面寫個測試用例來測試分配情況。
public static void main(String[] args) {AllocateMessageQueueByMachineRoom room = new AllocateMessageQueueByMachineRoom();// 消息隊列List<MessageQueue> queues = new ArrayList<MessageQueue>();// 機房 A 的queues.add(new MessageQueue("topic", "A@broker", 0));queues.add(new MessageQueue("topic", "A@broker", 1));queues.add(new MessageQueue("topic", "A@broker", 2));queues.add(new MessageQueue("topic", "A@broker", 3));queues.add(new MessageQueue("topic", "A@broker", 4));queues.add(new MessageQueue("topic", "A@broker", 5));queues.add(new MessageQueue("topic", "A@broker", 6));queues.add(new MessageQueue("topic", "A@broker", 7));// 機房 B 的queues.add(new MessageQueue("topic", "B@broker", 8));queues.add(new MessageQueue("topic", "B@broker", 9));queues.add(new MessageQueue("topic", "B@broker", 10));// 消費者隊列List<String> cidAll = new ArrayList<String>();cidAll.add("Consumer1-A");cidAll.add("Consumer2-A");cidAll.add("Consumer3-C");// 只關注 A 機房room.setConsumeridcs(new HashSet<String>(){{add("A");}});System.out.println(room.allocate("groupTest", "Consumer1-A", queues, cidAll));System.out.println(room.allocate("groupTest", "Consumer2-A", queues, cidAll));System.out.println(room.allocate("groupTest", "Consumer3-C", queues, cidAll));
}
結(jié)果如下:
可以看到,最終隊列分配只會分配機房 A 的,而且分配的方式也跟我們上面說的一樣。
那如果是只對 B 感興趣呢?
// 只關注 B 機房
room.setConsumeridcs(new HashSet<String>(){{add("B");}});
可以看到這里就是只分配了機房 B,下面也可以來看下如果 A、B 全部都關注又是怎么分配的。
[MessageQueue [topic=topic, brokerName=A@broker, queueId=0], MessageQueue [topic=topic, brokerName=A@broker, queueId=1], MessageQueue [topic=topic, brokerName=A@broker, queueId=2], MessageQueue [topic=topic, brokerName=B@broker, queueId=9]]
[MessageQueue [topic=topic, brokerName=A@broker, queueId=3], MessageQueue [topic=topic, brokerName=A@broker, queueId=4], MessageQueue [topic=topic, brokerName=A@broker, queueId=5], MessageQueue [topic=topic, brokerName=B@broker, queueId=10]]
[MessageQueue [topic=topic, brokerName=A@broker, queueId=6], MessageQueue [topic=topic, brokerName=A@broker, queueId=7], MessageQueue [topic=topic, brokerName=B@broker, queueId=8]]
4. 小結(jié)
好了,這篇文章就到這里,主要講述了 RocketMQ 的幾種負載均衡策略,當然我們也可以自己寫一個實現(xiàn)類,只要實現(xiàn) AllocateMessageQueueStrategy 接口,實現(xiàn)里面的方法,就可以用我們自定義的分配策略。
如有錯誤,歡迎指出!!!!