中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁(yè) > news >正文

網(wǎng)站建設(shè)知識(shí)百度官方網(wǎng)站下載安裝

網(wǎng)站建設(shè)知識(shí),百度官方網(wǎng)站下載安裝,個(gè)人網(wǎng)站 數(shù)據(jù)庫(kù)如何上傳到空間,學(xué)校網(wǎng)站的建設(shè)目標(biāo)目錄 一、消息存儲(chǔ)概覽 二、Broker接收消息 三、消息存儲(chǔ)流程 1. DefaultMessageStore類 2. 存儲(chǔ)流程 1):同步與異步存儲(chǔ) 2):CommitLog異步存儲(chǔ)消息 3):提交消息(Commit) 四、參考資料 一、消息存儲(chǔ)概覽 如下圖所…

目錄

一、消息存儲(chǔ)概覽

二、Broker接收消息

三、消息存儲(chǔ)流程

1.?DefaultMessageStore類

2. 存儲(chǔ)流程

????????1):同步與異步存儲(chǔ)

????????2):CommitLog異步存儲(chǔ)消息

????????3):提交消息(Commit)

四、參考資料


一、消息存儲(chǔ)概覽

????????如下圖所示,是消息從生產(chǎn)者發(fā)送消息到消費(fèi)者消費(fèi)消息的大致流程。

  • step1:生產(chǎn)者發(fā)送消息到消息存儲(chǔ)Broker端;
  • step2:單一文件Commitlog存儲(chǔ)所有主題消息,確保順序?qū)懭?#xff0c;提高吞吐量;
  • step3:消息通過(guò)堆外緩存,Commit消息寫(xiě)入文件內(nèi)存映射,然后Flush寫(xiě)入磁盤(pán);
  • step4:消息Flush磁盤(pán)后,把消息轉(zhuǎn)發(fā)到ConsumeQueue、IndexFile供消費(fèi)者消費(fèi);
  • step5:主題下消費(fèi)隊(duì)列內(nèi)容相同,但是一個(gè)消費(fèi)隊(duì)列在同一時(shí)刻只能被一個(gè)消費(fèi)者消費(fèi);
  • step6:消費(fèi)者根據(jù)集群/廣播模式、PUSH/PULL模式來(lái)消費(fèi)消息。

????????如何實(shí)現(xiàn)順序存儲(chǔ)的呢?通過(guò)org.apache.rocketmq.store.PutMessageLock接口,在消息追加文件內(nèi)存映射時(shí),加鎖實(shí)現(xiàn)存儲(chǔ)消息串行。

? ? ? ? 消息存儲(chǔ)模式:同步、異步。默認(rèn)異步存儲(chǔ),但是無(wú)論同步還是異步,最終執(zhí)行存儲(chǔ)方法是org.apache.rocketmq.store.CommitLog#asyncPutMessage(異步執(zhí)行,提高存儲(chǔ)效率),而同步需要等待存儲(chǔ)結(jié)果才能返回。

? ? ? ? 本章主要介紹生產(chǎn)者發(fā)送消息,Broker如何接收消息,如何Commit寫(xiě)入文件內(nèi)存映射,并沒(méi)有介紹如何刷盤(pán)、轉(zhuǎn)發(fā)到ConsumeQueue和IndexFile、HA主從同步等內(nèi)容。

二、Broker接收消息

????????org.apache.rocketmq.broker.processor.SendMessageProcessor是生產(chǎn)者發(fā)送消息后,Broker接收消息的核心實(shí)現(xiàn)類。 發(fā)送消息請(qǐng)求碼是RequestCode.SEND_MESSAGE。發(fā)送消息參考《RocketMQ5.0.0消息發(fā)送》。

????????org.apache.rocketmq.broker.processor.SendMessageProcessor#processRequest不僅處理生產(chǎn)者發(fā)來(lái)的消息,同時(shí)還是處理消費(fèi)端消費(fèi)ACK的處理請(qǐng)求。其核心邏輯是sendMessage或sendBatchMessage處理方法,都是Broker端存儲(chǔ)消息。

????????以下代碼是org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage存儲(chǔ)之前對(duì)消息的預(yù)處理。 核心邏輯如下:

  • randomQueueId():發(fā)送時(shí)是否指定消費(fèi)隊(duì)列,若沒(méi)有指定,則隨機(jī)選擇;
  • handleRetryAndDLQ():消息是否延遲或重試消息,并處理;
  • sendTransactionPrepareMessage變量:判定是否事務(wù)消息,true事務(wù)消息;
  • 異步存儲(chǔ)消息(默認(rèn))

????????????????事務(wù)消息存儲(chǔ):TransactionalMessageServiceImpl#asyncPrepareMessage

? ? ? ? ? ? ? ? 普通消息存儲(chǔ):DefaultMessageStore#asyncPutMessage

  • 同步存儲(chǔ)消息:

????????????????事務(wù)消息存儲(chǔ):TransactionalMessageServiceImpl#prepareMessage

????????????????普通消息存儲(chǔ):DefaultMessageStore#putMessage

/*** 存儲(chǔ)之前,對(duì)消息的處理* step1:預(yù)發(fā)送處理,如:檢查消息、主題是否符合規(guī)范* step2:發(fā)送消息時(shí),是否指定消費(fèi)隊(duì)列,若沒(méi)有則隨機(jī)選擇* step3:消息是否進(jìn)入重試或延遲隊(duì)列中(重試次數(shù)失敗)* step4:消息是否是事務(wù)消息,若是則存儲(chǔ)為prepare消息* step5:BrokerConfig#asyncSendEnable是否開(kāi)啟異步存儲(chǔ),默認(rèn)開(kāi)啟true*        (異步存儲(chǔ)、同步存儲(chǔ))*/
public RemotingCommand sendMessage(final ChannelHandlerContext ctx,final RemotingCommand request,final SendMessageContext sendMessageContext,final SendMessageRequestHeader requestHeader,final TopicQueueMappingContext mappingContext,final SendMessageCallback sendMessageCallback) throws RemotingCommandException {// 預(yù)發(fā)送處理,如:檢查消息、主題是否符合規(guī)范final RemotingCommand response = preSend(ctx, request, requestHeader);if (response.getCode() != -1) {return response;}final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();// 獲取消息體final byte[] body = request.getBody();// 發(fā)送消息時(shí),是否指定消費(fèi)隊(duì)列,若沒(méi)有則隨機(jī)選擇int queueIdInt = requestHeader.getQueueId();// 獲取主題配置屬性TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());if (queueIdInt < 0) { // 隊(duì)列ID不符合,則在寫(xiě)隊(duì)列隨機(jī)找個(gè)queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());}MessageExtBrokerInner msgInner = new MessageExtBrokerInner();msgInner.setTopic(requestHeader.getTopic());msgInner.setQueueId(queueIdInt);// 消息擴(kuò)展屬性Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());// 消息是否進(jìn)入重試或延遲隊(duì)列中(重試次數(shù)失敗)if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig, oriProps)) {return response;}msgInner.setBody(body);msgInner.setFlag(requestHeader.getFlag());String uniqKey = oriProps.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);if (uniqKey == null || uniqKey.length() <= 0) {uniqKey = MessageClientIDSetter.createUniqID();oriProps.put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, uniqKey);}MessageAccessor.setProperties(msgInner, oriProps);msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), msgInner.getTags()));msgInner.setBornTimestamp(requestHeader.getBornTimestamp());msgInner.setBornHost(ctx.channel().remoteAddress());msgInner.setStoreHost(this.getStoreHost());msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));// Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());// 事務(wù)標(biāo)簽String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);boolean sendTransactionPrepareMessage = false;if (Boolean.parseBoolean(traFlag)&& !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1// Broker禁止事務(wù)消息存儲(chǔ)if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()+ "] sending transaction message is forbidden");return response;}sendTransactionPrepareMessage = true;}long beginTimeMillis = this.brokerController.getMessageStore().now();// 消息是否異步存儲(chǔ)if (brokerController.getBrokerConfig().isAsyncSendEnable()) {CompletableFuture<PutMessageResult> asyncPutMessageFuture;if (sendTransactionPrepareMessage) { // 事務(wù)prepare操作asyncPutMessageFuture = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);} else {asyncPutMessageFuture = this.brokerController.getMessageStore().asyncPutMessage(msgInner);}final int finalQueueIdInt = queueIdInt;final MessageExtBrokerInner finalMsgInner = msgInner;asyncPutMessageFuture.thenAcceptAsync(putMessageResult -> {RemotingCommand responseFuture =handlePutMessageResult(putMessageResult, response, request, finalMsgInner, responseHeader, sendMessageContext,ctx, finalQueueIdInt, beginTimeMillis, mappingContext);if (responseFuture != null) {doResponse(ctx, request, responseFuture);}sendMessageCallback.onComplete(sendMessageContext, response);}, this.brokerController.getPutMessageFutureExecutor());// Returns null to release the send message threadreturn null;}// 消息同步存儲(chǔ)else {PutMessageResult putMessageResult = null;// 事務(wù)消息存儲(chǔ)if (sendTransactionPrepareMessage) {putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);} else {// 同步存儲(chǔ)消息putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);}handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt, beginTimeMillis, mappingContext);sendMessageCallback.onComplete(sendMessageContext, response);return response;}
}

三、消息存儲(chǔ)流程

1.?DefaultMessageStore類

????????org.apache.rocketmq.store.DefaultMessageStore是消息存儲(chǔ)實(shí)現(xiàn)類,也是存儲(chǔ)模塊最重要的一個(gè)類,其UML如下。

????????其關(guān)鍵屬性如下代碼所示。同步與異步存儲(chǔ)的方法:

  • 同步消息:單個(gè)消息putMessage()、批量消息putMessages()
  • 異步消息:單個(gè)消息asyncPutMessage()、批量消息asyncPutMessages()
// Commitlog引用次數(shù)
public final PerfCounter.Ticks perfs = new PerfCounter.Ticks(LOGGER);// 存儲(chǔ)配置屬性
private final MessageStoreConfig messageStoreConfig;
// CommitLog(Commitlog文件存儲(chǔ)實(shí)現(xiàn)類)
private final CommitLog commitLog;
// ConsumeQueue文件存儲(chǔ)實(shí)現(xiàn)類
private final ConsumeQueueStore consumeQueueStore;
// 刷盤(pán)線程
private final FlushConsumeQueueService flushConsumeQueueService;
// 刪除過(guò)期Commitlog文件服務(wù)
private final CleanCommitLogService cleanCommitLogService;
// 刪除過(guò)期ConsumeQueue文件服務(wù)
private final CleanConsumeQueueService cleanConsumeQueueService;
// 矯正邏輯偏移量服務(wù)
private final CorrectLogicOffsetService correctLogicOffsetService;
// index文件實(shí)現(xiàn)類
private final IndexService indexService;
// MappedFile分配服務(wù)
private final AllocateMappedFileService allocateMappedFileService;
// 消息提交到Commitlog時(shí)消息轉(zhuǎn)發(fā),構(gòu)建ConsumeQueue、index文件服務(wù)
private ReputMessageService reputMessageService;
// HA服務(wù)(主從同步服務(wù))
private HAService haService;
// 存儲(chǔ)狀態(tài)服務(wù)
private final StoreStatsService storeStatsService;
// 堆內(nèi)存緩存
private final TransientStorePool transientStorePool;// Broker狀態(tài)管理
private final BrokerStatsManager brokerStatsManager;
// 消息達(dá)到監(jiān)聽(tīng)器(消息拉取長(zhǎng)輪詢模式)
private final MessageArrivingListener messageArrivingListener;
// Broker配置屬性
private final BrokerConfig brokerConfig;
// 存儲(chǔ)刷盤(pán)檢查點(diǎn)
private StoreCheckpoint storeCheckpoint;
// 定時(shí)消息存儲(chǔ)實(shí)現(xiàn)類
private TimerMessageStore timerMessageStore;
// 日志打印次數(shù)
private AtomicLong printTimes = new AtomicLong(0);
// Commitlog文件轉(zhuǎn)發(fā)請(qǐng)求
private final LinkedList<CommitLogDispatcher> dispatcherList;// 延遲消息的延遲級(jí)別
private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =new ConcurrentHashMap<Integer, Long>(32);// 最大延遲級(jí)別
private int maxDelayLevel;

2. 存儲(chǔ)流程

????????1):同步與異步存儲(chǔ)

????????如下圖所示是同步與異步存儲(chǔ)的實(shí)現(xiàn)方法調(diào)用鏈,它們之間區(qū)別與聯(lián)系:

聯(lián)系:a. 同步存儲(chǔ)實(shí)際是調(diào)用異步存儲(chǔ)方法,即:DefaultMessageStore#asyncPutMessage;

? ? ? ? ? ?b. 最終執(zhí)行存儲(chǔ)方法是org.apache.rocketmq.store.CommitLog#asyncPutMessage;

區(qū)別:同步存儲(chǔ)需要等待存儲(chǔ)結(jié)果waitForPutResult()

????????2):CommitLog異步存儲(chǔ)消息

????????org.apache.rocketmq.store.CommitLog#asyncPutMessage是異步執(zhí)行存儲(chǔ)消息。如下代碼所示,關(guān)鍵步驟如下:

  • step1:mappedFileQueue隊(duì)列中,獲取可寫(xiě)入的Commitlog,即:從commitlog目錄下獲取當(dāng)前寫(xiě)入的Commitlog;
  • step2:獲取當(dāng)前寫(xiě)入Commitlog的偏移量 = 文件偏移量 + 該文件寫(xiě)入位置;
  • step3:是否需要HA(主從Broker同步數(shù)據(jù));
  • step4:CommitLog.calMsgLength獲取該消息的總長(zhǎng)度(不定長(zhǎng),總長(zhǎng)度存儲(chǔ)在前4個(gè)字節(jié));
  • step5:加鎖后(串行寫(xiě)),判定再次Commitlog文件沒(méi)有或已被寫(xiě)滿,則創(chuàng)建新的Commitlog文件;
  • step6:Commit操作,即:消息緩存或直接(是否開(kāi)啟堆外內(nèi)存池)追加到Commitlog文件內(nèi)存映射buffer,追加當(dāng)前消息,并沒(méi)有刷寫(xiě)到磁盤(pán),則返回結(jié)果,方法:{@link DefaultMappedFile#appendMessage};
  • step7:執(zhí)行同步或異步刷盤(pán)、HA主從同步復(fù)制等,方法:?{@link CommitLog#handleDiskFlushAndHA}。
/*** 執(zhí)行存儲(chǔ)消息* step1:mappedFileQueue隊(duì)列中,獲取可寫(xiě)入的Commitlog,即:從commitlog目錄下獲取當(dāng)前寫(xiě)入的Commitlog;* step2:獲取當(dāng)前寫(xiě)入Commitlog的偏移量 = 文件偏移量 + 該文件寫(xiě)入位置;* step3:是否需要HA(主從Broker同步數(shù)據(jù));* step4:CommitLog.calMsgLength獲取該消息的總長(zhǎng)度(不定長(zhǎng),總長(zhǎng)度存儲(chǔ)在前4個(gè)字節(jié));* step5:加鎖后(串行寫(xiě)),判定再次Commitlog文件沒(méi)有或已被寫(xiě)滿,則創(chuàng)建新的Commitlog文件;* step6:Commit操作,即:消息緩存或直接(是否開(kāi)啟堆外內(nèi)存池)追加到Commitlog文件內(nèi)存映射buffer,追加當(dāng)前消息,并沒(méi)有刷寫(xiě)到磁盤(pán),則返回結(jié)果*       {@link DefaultMappedFile#appendMessage}* step7:執(zhí)行同步或異步刷盤(pán)、HA主從同步復(fù)制等*       {@link CommitLog#handleDiskFlushAndHA}* @param msg 消息* @return 存儲(chǔ)結(jié)果*/
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {// Set the storage timeif (!defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {msg.setStoreTimestamp(System.currentTimeMillis());}// Set the message body CRC (consider the most appropriate setting on the client)msg.setBodyCRC(UtilAll.crc32(msg.getBody()));// Back to ResultsAppendMessageResult result = null;StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();String topic = msg.getTopic();InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();if (bornSocketAddress.getAddress() instanceof Inet6Address) {msg.setBornHostV6Flag();}InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();if (storeSocketAddress.getAddress() instanceof Inet6Address) {msg.setStoreHostAddressV6Flag();}PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();updateMaxMessageSize(putMessageThreadLocal);String topicQueueKey = generateKey(putMessageThreadLocal.getKeyBuilder(), msg);long elapsedTimeInLock = 0;// mappedFileQueue隊(duì)列中,獲取可寫(xiě)入的Commitlog,即:從commitlog目錄下獲取當(dāng)前寫(xiě)入的CommitlogMappedFile unlockMappedFile = null;MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();// 當(dāng)前寫(xiě)入Commitlog的偏移量long currOffset;if (mappedFile == null) {currOffset = 0;} else {// 當(dāng)前寫(xiě)入Commitlog的偏移量 = 文件偏移量 + 該文件寫(xiě)入位置currOffset = mappedFile.getFileFromOffset() + mappedFile.getWrotePosition();}// 是否需要HAint needAckNums = this.defaultMessageStore.getMessageStoreConfig().getInSyncReplicas();boolean needHandleHA = needHandleHA(msg);if (needHandleHA && this.defaultMessageStore.getBrokerConfig().isEnableControllerMode()) {if (this.defaultMessageStore.getHaService().inSyncReplicasNums(currOffset) < this.defaultMessageStore.getMessageStoreConfig().getMinInSyncReplicas()) {return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH, null));}if (this.defaultMessageStore.getMessageStoreConfig().isAllAckInSyncStateSet()) {// -1 means all ack in SyncStateSetneedAckNums = MixAll.ALL_ACK_IN_SYNC_STATE_SET;}} else if (needHandleHA && this.defaultMessageStore.getBrokerConfig().isEnableSlaveActingMaster()) {int inSyncReplicas = Math.min(this.defaultMessageStore.getAliveReplicaNumInGroup(),this.defaultMessageStore.getHaService().inSyncReplicasNums(currOffset));needAckNums = calcNeedAckNums(inSyncReplicas);if (needAckNums > inSyncReplicas) {// Tell the producer, don't have enough slaves to handle the send requestreturn CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH, null));}}topicQueueLock.lock(topicQueueKey);try {boolean needAssignOffset = true;if (defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()&& defaultMessageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) {needAssignOffset = false;}if (needAssignOffset) {defaultMessageStore.assignOffset(msg, getMessageNum(msg));}/*當(dāng)前消息編碼,計(jì)算byteBuf(字節(jié)緩存)長(zhǎng)度, null則正常處理CommitLog.calMsgLength獲取該消息的總長(zhǎng)度(不定長(zhǎng),總長(zhǎng)度存儲(chǔ)在前4個(gè)字節(jié))*/PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);if (encodeResult != null) {return CompletableFuture.completedFuture(encodeResult);}msg.setEncodedBuff(putMessageThreadLocal.getEncoder().getEncoderBuffer());PutMessageContext putMessageContext = new PutMessageContext(topicQueueKey);putMessageLock.lock(); //spin or ReentrantLock ,depending on store configtry {long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();this.beginTimeInLock = beginLockTimestamp;// Here settings are stored timestamp, in order to ensure an orderly// globalif (!defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {msg.setStoreTimestamp(beginLockTimestamp); // 消息存儲(chǔ)時(shí)間戳,確保消息存儲(chǔ)有序}// Commitlog文件沒(méi)有或已被寫(xiě)滿,則創(chuàng)建新的Commitlog文件if (null == mappedFile || mappedFile.isFull()) {// 創(chuàng)建新的Commitlog文件mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise}if (null == mappedFile) {log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());beginTimeInLock = 0;return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, null));}// Commitlog文件寫(xiě)入消息(Commitlog文件內(nèi)存映射buffer,追加當(dāng)前消息,并沒(méi)有刷寫(xiě)到磁盤(pán),則返回結(jié)果)result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);switch (result.getStatus()) {case PUT_OK:onCommitLogAppend(msg, result, mappedFile);break;case END_OF_FILE:onCommitLogAppend(msg, result, mappedFile);unlockMappedFile = mappedFile;// Create a new file, re-write the messagemappedFile = this.mappedFileQueue.getLastMappedFile(0);if (null == mappedFile) {// XXX: warn and notify melog.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());beginTimeInLock = 0;return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, result));}result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);if (AppendMessageStatus.PUT_OK.equals(result.getStatus())) {onCommitLogAppend(msg, result, mappedFile);}break;case MESSAGE_SIZE_EXCEEDED:case PROPERTIES_SIZE_EXCEEDED:beginTimeInLock = 0;return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));case UNKNOWN_ERROR:beginTimeInLock = 0;return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));default:beginTimeInLock = 0;return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));}elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;beginTimeInLock = 0;} finally {putMessageLock.unlock();}} finally {topicQueueLock.unlock(topicQueueKey);}// putMessage加鎖超時(shí)if (elapsedTimeInLock > 500) {log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);}if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {this.defaultMessageStore.unlockMappedFile(unlockMappedFile);}PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);// StatisticsstoreStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(result.getMsgNum());storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());/*執(zhí)行同步或異步刷盤(pán)、HA主從同步復(fù)制等注意:MappedFile.appendMessage只是將消息追加到Commitlog文件內(nèi)存映射buffer中,并沒(méi)有刷寫(xiě)到磁盤(pán),則返回結(jié)果*/return handleDiskFlushAndHA(putMessageResult, msg, needAckNums, needHandleHA);
}

????????3):提交消息(Commit)

????????org.apache.rocketmq.store.logfile.DefaultMappedFile#appendMessage是文件內(nèi)存映射追加消息方法,目的是把堆外緩存池消息或直接Commit到文件內(nèi)存映射,其調(diào)用鏈如下。

????????org.apache.rocketmq.store.logfile.DefaultMappedFile#appendMessagesInner是追加消息到文件內(nèi)存映射的核心方法,如下代碼所示。

/*** Commit操作,即:消息緩存或直接(是否開(kāi)啟堆外內(nèi)存池)追加到Commitlog文件內(nèi)存映射buffer,追加當(dāng)前消息,并沒(méi)有刷寫(xiě)到磁盤(pán),則返回結(jié)果* step1:當(dāng)前Commitlog的寫(xiě)指針,判斷文件是否寫(xiě)滿;* step2:slice():創(chuàng)建與原ByteBuffer共享的內(nèi)存區(qū),擁有獨(dú)立的position、limit、capacity等指針,并設(shè)置position當(dāng)前寫(xiě)指針* step3:判斷是否是批量消息,并追加消息到Commitlog文件內(nèi)存映射buffer,并沒(méi)有刷寫(xiě)到磁盤(pán),則返回結(jié)果*/
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,PutMessageContext putMessageContext) {assert messageExt != null;assert cb != null;// 當(dāng)前Commitlog的寫(xiě)指針int currentPos = WROTE_POSITION_UPDATER.get(this);// 未寫(xiě)滿,則追加if (currentPos < this.fileSize) {/*slice():創(chuàng)建與原ByteBuffer共享的內(nèi)存區(qū),擁有獨(dú)立的position、limit、capacity等指針并設(shè)置position當(dāng)前寫(xiě)指針*/ByteBuffer byteBuffer = appendMessageBuffer().slice();byteBuffer.position(currentPos);AppendMessageResult result;// 批量消息if (messageExt instanceof MessageExtBatch && !((MessageExtBatch) messageExt).isInnerBatch()) {// traditional batch message// Commitlog文件內(nèi)存映射buffer,追加當(dāng)前消息,并沒(méi)有刷寫(xiě)到磁盤(pán),則返回結(jié)果result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,(MessageExtBatch) messageExt, putMessageContext);}// 單個(gè)消息else if (messageExt instanceof MessageExtBrokerInner) {// traditional single message or newly introduced inner-batch message// Commitlog文件內(nèi)存映射buffer,追加當(dāng)前消息,并沒(méi)有刷寫(xiě)到磁盤(pán),則返回結(jié)果result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,(MessageExtBrokerInner) messageExt, putMessageContext);} else {return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);}WROTE_POSITION_UPDATER.addAndGet(this, result.getWroteBytes());this.storeTimestamp = result.getStoreTimestamp();return result;}log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}

? ? ? ? 需要注意的是:appendMessageBuffer().slice(),創(chuàng)建與原ByteBuffer共享的內(nèi)存區(qū),擁有獨(dú)立的position、limit、capacity等指針,并設(shè)置position當(dāng)前寫(xiě)指針。創(chuàng)建的內(nèi)存追加消息。

四、參考資料

Rocket Mq消息持久化_飛科-程序人生的博客-CSDN博客

百度安全驗(yàn)證

【RocketMQ】同一個(gè)項(xiàng)目中,同一個(gè)topic,可以存在多個(gè)消費(fèi)者么? - N!CE波 - 博客園

RocketMQ5.0.0消息發(fā)送_愛(ài)我所愛(ài)0505的博客-CSDN博客

RocketMQ5.0.0路由中心NameServer_愛(ài)我所愛(ài)0505的博客-CSDN博客

RocketMQ5.0.0消息存儲(chǔ)<一>_存儲(chǔ)文件及內(nèi)存映射_愛(ài)我所愛(ài)0505的博客-CSDN博客?

http://www.risenshineclean.com/news/47056.html

相關(guān)文章:

  • 做除塵環(huán)保的如何推廣自己的網(wǎng)站seo收錄查詢工具
  • 2022年最新血糖標(biāo)準(zhǔn)權(quán)威發(fā)布徐州seo外包
  • 上海平臺(tái)網(wǎng)站制作公司新東方在線網(wǎng)上課程
  • 找做網(wǎng)站的朋友短視頻營(yíng)銷成功的案例
  • 網(wǎng)站正在建設(shè)中 html企業(yè)培訓(xùn)有哪些方面
  • 企業(yè)加盟網(wǎng)站建設(shè)百家號(hào)權(quán)重查詢站長(zhǎng)工具
  • 哈爾濱網(wǎng)站備案手續(xù)費(fèi)廣告聯(lián)盟官網(wǎng)
  • web前端設(shè)計(jì)模板seo優(yōu)化師是什么
  • 郴州建網(wǎng)站百度熱搜seo
  • 哪里網(wǎng)站開(kāi)發(fā)好國(guó)產(chǎn)免費(fèi)crm系統(tǒng)有哪些在線
  • 騰訊云動(dòng)態(tài)加速WordPress網(wǎng)站頁(yè)面怎么優(yōu)化
  • 裝修設(shè)計(jì)網(wǎng)站哪個(gè)好前端性能優(yōu)化有哪些方法
  • 網(wǎng)站建設(shè)教程模板網(wǎng)站排名seo軟件
  • 仿 wordpress簡(jiǎn)述seo的應(yīng)用范圍
  • 電腦關(guān)鍵字被限制 購(gòu)物網(wǎng)站明星百度指數(shù)在線查詢
  • 動(dòng)漫主題WordPressseo服務(wù)運(yùn)用什么技術(shù)
  • 電子商務(wù)網(wǎng)站開(kāi)發(fā)實(shí)戰(zhàn)長(zhǎng)春seo排名扣費(fèi)
  • 網(wǎng)站目錄文件查看在線一鍵免費(fèi)生成網(wǎng)頁(yè)網(wǎng)站
  • 自己的電腦做網(wǎng)站服務(wù)器嗎北京網(wǎng)站建設(shè)制作開(kāi)發(fā)
  • 食品公司名字大全上海網(wǎng)站seo
  • vs做動(dòng)態(tài)網(wǎng)站登錄青島百度快速排名優(yōu)化
  • 西安做網(wǎng)站推廣營(yíng)銷型網(wǎng)站建設(shè)方案
  • 網(wǎng)站介紹詞百度一下瀏覽器
  • 成都網(wǎng)站建設(shè)費(fèi)用免費(fèi)的推廣引流軟件下載
  • 天貓網(wǎng)站設(shè)計(jì)小米口碑營(yíng)銷案例
  • 網(wǎng)站制作驗(yàn)收單大數(shù)據(jù)是干什么的
  • 廣州市建設(shè)企業(yè)網(wǎng)站平臺(tái)什么叫做網(wǎng)絡(luò)營(yíng)銷
  • 有什么網(wǎng)站可以做初中試題sem是什么崗位
  • 知名高端網(wǎng)站設(shè)計(jì)企業(yè)百度網(wǎng)站檢測(cè)
  • 卡盟網(wǎng)站怎么做圖片素材網(wǎng)站seo方法