網(wǎng)站提交百度收錄怎么簡單制作一個網(wǎng)頁
高性能隊列 Disruptor 在 IM 系統(tǒng)中的實(shí)戰(zhàn)
前三期我們介紹了Disruptor
的典型使用場景和相關(guān)高性能原理,本期我介紹一下Disruptor
在IM系統(tǒng)用的應(yīng)用實(shí)戰(zhàn),IM系統(tǒng)即社交聊天系統(tǒng),對實(shí)時性的要求非常高,非常符合Disruptor
的使用場景。
本篇文章將結(jié)合實(shí)際代碼,介紹如何在 IM
系統(tǒng)中使用 Disruptor
進(jìn)行高效的消息轉(zhuǎn)發(fā)。
1. Disruptor 在 IM 系統(tǒng)中的作用
在 IM 系統(tǒng)中,用戶 A 發(fā)送消息給 B、C、D 時,需要根據(jù) B、C、D 所在的服務(wù)器節(jié)點(diǎn)進(jìn)行分組,并將消息轉(zhuǎn)發(fā)到對應(yīng)的節(jié)點(diǎn)上。為了確保高吞吐量和低延遲,我們使用 Disruptor 作為高性能隊列。
2. 代碼實(shí)現(xiàn)
2.1 初始化 Disruptor
當(dāng)某個節(jié)點(diǎn) nodeId
還沒有對應(yīng)的 RingBuffer 時,我們需要創(chuàng)建一個新的 Disruptor,并將其存入 ringBufferMap
中。
???private?final?Map<String,?RingBuffer<ClusterPublishEvent>>?ringBufferMap?=?new?ConcurrentHashMap<>();
????public?ClusterQueueService(Server?server)?{
????????this.mServer?=?server;
????}
????public?void?publishMessage(String?nodeId,?String?fromUser,?String?clientId,?String?topic,?byte[]?payload)?{
????????if?(!ringBufferMap.containsKey(nodeId))?{
????????????long?st?=?System.currentTimeMillis();
????????????synchronized?(ringBufferMap){
????????????????if(!ringBufferMap.containsKey(nodeId))?{
????????????????????BlockingWaitStrategy?strategy?=?new?BlockingWaitStrategy();
????????????????????Disruptor<ClusterPublishEvent>?disruptor?=?new?Disruptor<>(
????????????????????????new?ClusterPublishEventFactory(),?1024?*?1024,?DaemonThreadFactory.INSTANCE,
????????????????????????ProducerType.SINGLE,?strategy);
????????????????????disruptor.handleEventsWith(new?ClusterPublishEventHandler(mServer,?nodeId));
????????????????????disruptor.setDefaultExceptionHandler(new?IgnoreExceptionHandler());
????????????????????disruptor.start();
????????????????????ringBufferMap.put(nodeId,?disruptor.getRingBuffer());
????????????????}
????????????}
????????????log.info("publishMessage?create?RingBuffer?cost:{}ms,?ringBufferMap:{},size:{}",?System.currentTimeMillis()?-?st,?ringBufferMap,?ringBufferMap.size());
????????}
????????RingBuffer<ClusterPublishEvent>?ringBuffer?=?ringBufferMap.get(nodeId);
????????long?sequence?=?ringBuffer.next();
????????//?當(dāng)環(huán)形緩沖區(qū)未用完時,?返回的是空對象,否則,返回的是緩存的數(shù)據(jù)。
????????ClusterPublishEvent?clusterEvent?=?ringBuffer.get(sequence);
????????clusterEvent.setFromUser(fromUser);
????????clusterEvent.setClientId(clientId);
????????//?此topic,是節(jié)點(diǎn)轉(zhuǎn)發(fā)的topic:?NM2R,?NTF,DESTROYUSER,?只有這三種
????????clusterEvent.setTopic(topic);
????????clusterEvent.setPayload(payload);
????????clusterEvent.setTraceId(MDC.get(ImSvcConstants.TRACE_ID));
????????//?發(fā)布事件,?會觸發(fā)ClusterPublishEventHandler.onEvent方法
????????ringBuffer.publish(sequence);
????}
關(guān)鍵點(diǎn)解析:
-
采用 BlockingWaitStrategy
作為等待策略,確保高效的 CPU 資源利用。 -
采用 DaemonThreadFactory.INSTANCE
創(chuàng)建線程池,避免應(yīng)用程序退出時線程未正常回收。 -
handleEventsWith
設(shè)定事件處理器ClusterPublishEventHandler
,用于消息處理。 -
setDefaultExceptionHandler
避免異常影響消息處理流程。
2.2 按照節(jié)點(diǎn)轉(zhuǎn)發(fā)消息
根據(jù)用戶所在的服務(wù)節(jié)點(diǎn),進(jìn)行消息轉(zhuǎn)發(fā)(發(fā)送消息事件到Disruptor)
????public?void?publish2Receivers(Long?messageId,?Set<String>?receivers,?String?exceptClientId,?int?pullType,?String?topic)?{
????????//未綁定broker的用戶默認(rèn)由本中心處理
????????Map<String,?String>?allReceiverMap?=?new?HashMap<>();
????????for?(String?receiver?:?receivers)?{
????????????allReceiverMap.put(receiver,?localNodeId);
????????}
????????//從分布式緩存獲取獲取用戶路由
????????Map<String,?String>?receiverMap?=?userRouteStore.getAll(receivers);
????????allReceiverMap.putAll(receiverMap);
????????Map<String,?Set<String>>?nodeMap?=?new?HashMap<>();
????????//使用nodeId分組
????????allReceiverMap.forEach((receiver,?nodeId)?->?{
????????????if?(!nodeMap.containsKey(nodeId))?{
????????????????nodeMap.put(nodeId,?new?HashSet<>());
????????????}
????????????nodeMap.get(nodeId).add(receiver);
????????});
????????//獲取可用節(jié)點(diǎn)
????????Cluster?cluster?=?mServer.getHazelcastInstance().getCluster();
????????Set<Member>?members?=?cluster.getMembers();
????????List<String>?collect?=?members.stream().map(member?->?member.getStringAttribute(HZ_Cluster_Node_ID)).collect(Collectors.toList());
????????log.info("hazelcast?node?list:{}",JSON.toJSONString(collect));
????????Map<String,?Member>?memberMap?=?members.stream().collect(Collectors.toMap(
????????????member?->?member.getStringAttribute(HZ_Cluster_Node_ID),?member?->?member,?(k1,?k2?)->k1));
????????//按照節(jié)點(diǎn)分發(fā)
????????nodeMap.forEach((nodeId,?set)?->?{
????????????//?轉(zhuǎn)發(fā)到其他節(jié)點(diǎn)發(fā)送
????????????if?(!nodeId.equals(localNodeId)?&&?memberMap.containsKey(nodeId))?{
????????????????WFCMessage.NotifyMessage2Receivers?notifyMessage2Receivers?=?WFCMessage.NotifyMessage2Receivers.newBuilder()
????????????????????.setMessageId(messageId)
????????????????????.addAllReceivers(set)
????????????????????.setExceptClientId(exceptClientId==null?"":exceptClientId)
????????????????????.setPullType(pullType)
????????????????????.setTopic(topic)
????????????????????.build();
????????????????clusterQueueService.publishMessage(nodeId,nodeId,null,?IMTopic.NotifyMessage2ReceiversTopic,?notifyMessage2Receivers.toByteArray());
????????????}
????????????//?當(dāng)前節(jié)點(diǎn)處理發(fā)送
????????????else?{
????????????????WFCMessage.Message?message?=?mServer.getStore().messagesStore().getMessage(messageId);
????????????????if?(message?!=?null)?{
????????????????????//?Add?By?Youqibin?16:11?2022/3/15?接收通知前置處理
????????????????????preHandle(message,?set);
????????????????????mServer.getImBusinessScheduler().execute(()?->messagesPublisher.publish2Receivers(message,?set,?exceptClientId,?pullType,?localNodeId));
????????????????????//?Add?By?Youqibin?16:11?2022/3/15?接收通知后置處理
????????????????????postHandle(message,?set);
????????????????}
????????????}
????????});
????}
關(guān)鍵點(diǎn)解析:
-
clusterQueueService.publishMessage, 使用Disruptor發(fā)送消息事件,高性能異步處理
2.3 事件處理器 onEvent
當(dāng) Disruptor 事件發(fā)布后,ClusterPublishEventHandler.onEvent
負(fù)責(zé)實(shí)際的消息轉(zhuǎn)發(fā)邏輯。
public?class?ClusterPublishEventHandler?implements?EventHandler<ClusterPublishEvent>?{
????private?final?Server?server;
????private?final?String?nodeId;
????public?ClusterPublishEventHandler(Server?server,?String?nodeId)?{
????????this.server?=?server;
????????this.nodeId?=?nodeId;
????}
????@Override
????public?void?onEvent(ClusterPublishEvent?event,?long?sequence,?boolean?endOfBatch)?{
????????log.info("Processing?event:?{}?on?node:?{}",?event,?nodeId);
????????server.forwardMessage(nodeId,?event.getFromUser(),?event.getClientId(),?event.getTopic(),?event.getPayload());
????}
}
關(guān)鍵點(diǎn)解析:
-
onEvent
方法接收到ClusterPublishEvent
后,調(diào)用server.forwardMessage
進(jìn)行消息轉(zhuǎn)發(fā)。 -
endOfBatch
用于標(biāo)識當(dāng)前事件是否為批處理中的最后一個事件。 -
log.info
記錄消息處理的關(guān)鍵日志,便于后續(xù)排查。
3. 總結(jié)
本文介紹了 Disruptor 在 IM 系統(tǒng)中的應(yīng)用,核心邏輯包括:
-
初始化 Disruptor:為每個 nodeId
創(chuàng)建獨(dú)立的 RingBuffer。 -
按照節(jié)點(diǎn)轉(zhuǎn)發(fā)消息:將用戶消息存入對應(yīng)節(jié)點(diǎn)的 RingBuffer。 -
消息處理: onEvent
方法從 RingBuffer 讀取消息,并執(zhí)行轉(zhuǎn)發(fā)。
通過 Disruptor,可以大幅降低鎖競爭,提高 IM 系統(tǒng)的吞吐量,使其能夠在高并發(fā)環(huán)境下穩(wěn)定運(yùn)行。
4. 最后
歡迎關(guān)注加瓦點(diǎn)燈,每天推送干貨知識,一起進(jìn)步!
本文由 mdnice 多平臺發(fā)布