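RocketMQ Brokers run in one of two roles, Master or Slave. To ensure high availability, a Master that receives a message must replicate it to its Slave, so that the Slave can keep serving if the Master goes down. This section walks through the source code that implements synchronization between Master and Slave machines.
1 Synchronizing metadata
A Slave has to synchronize more than the messages themselves; it also needs some metadata from the Master, namely TopicConfig, ConsumerOffset, DelayOffset, and SubscriptionGroupConfig. At startup, a Broker checks whether its role is Slave and, if so, starts a scheduled synchronization task, as shown in Listing 12-1.
Listing 12-1 A Slave periodically synchronizes metadata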
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
    // Use the configured Master HA address if one is present; otherwise
    // mark it to be updated periodically.
    if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
        this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
        this.updateMasterHAServerAddrPeriodically = false;
    } else {
        this.updateMasterHAServerAddrPeriodically = true;
    }
    // Synchronize metadata every 60 seconds, after an initial delay of 10 seconds.
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                BrokerController.this.slaveSynchronize.syncAll();
            } catch (Throwable e) {
                log.error("ScheduledTask syncAll slave exception", e);
            }
        }
    }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}
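The try/catch around syncAll() in Listing 12-1 matters because scheduleAtFixedRate suppresses all later executions once a run throws. The following is a minimal sketch of that guarded pattern, not RocketMQ code: the GuardedSyncDemo class, the runGuarded method, and the simulated failure are all hypothetical stand-ins for the real task.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of RocketMQ.
final class GuardedSyncDemo {

    // Schedules a task whose first run fails, waits until at least
    // `expectedRuns` executions have happened, and returns the run count.
    static int runGuarded(int expectedRuns) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(expectedRuns);
        try {
            executor.scheduleAtFixedRate(() -> {
                try {
                    int n = runs.incrementAndGet();
                    done.countDown();
                    if (n == 1) {
                        // Stand-in for a failed synchronization with the Master.
                        throw new IllegalStateException("simulated sync failure");
                    }
                } catch (Throwable e) {
                    // Log and swallow, as Listing 12-1 does. Without this catch
                    // the executor would cancel every later run.
                    System.err.println("sync failed: " + e.getMessage());
                }
            }, 0, 10, TimeUnit.MILLISECONDS);
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("observed runs: " + runGuarded(3));
    }
}
```

Even though the first run fails, the task keeps firing, which is the behavior a Slave needs when the Master is temporarily unreachable.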
syncAll() calls syncTopicConfig(), syncConsumerOffset(), syncDelayOffset(), and syncSubscriptionGroupConfig() to synchronize the metadata. Taking syncConsumerOffset() as an example, it relies on getAllConsumerOffset(), whose implementation is shown in Listing 12-2.
Listing 12-2 Implementation of getAllConsumerOffset
public ConsumerOffsetSerializeWrapper getAllConsumerOffset(
    final String addr) throws InterruptedException, RemotingTimeoutException,
    RemotingSendRequestException, RemotingConnectException, MQBrokerException {
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null);
    RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            return ConsumerOffsetSerializeWrapper.decode(response.getBody(), ConsumerOffsetSerializeWrapper.class);
        }
        default:
            break;
    }
    throw new MQBrokerException(response.getCode(), response.getRemark());
}
The basic logic of getAllConsumerOffset() is to build a RemotingCommand, send it through Netty to the Master Broker, and read the Offset information from the response.
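The request/response flow of Listing 12-2 can be sketched without Netty. In the sketch below, Command, Transport, the numeric codes, and the returned string are hypothetical stand-ins for RemotingCommand, the remoting client, RequestCode/ResponseCode, and the decoded wrapper; only the control flow mirrors the listing.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-ins; none of these types exist in RocketMQ.
final class OffsetFetchSketch {
    static final int GET_ALL_CONSUMER_OFFSET = 1; // assumed request code
    static final int SUCCESS = 0;                 // assumed response code

    // Minimal command: a code, an optional remark, and an optional body.
    static final class Command {
        final int code;
        final String remark;
        final byte[] body;

        Command(int code, String remark, byte[] body) {
            this.code = code;
            this.remark = remark;
            this.body = body;
        }
    }

    // Stand-in for a synchronous remote call with a timeout.
    interface Transport {
        Command invokeSync(String addr, Command request, long timeoutMillis);
    }

    // Builds the request, sends it, and returns the body on success;
    // any other response code is turned into an exception.
    static String fetchOffsets(Transport transport, String masterAddr) {
        Command request = new Command(GET_ALL_CONSUMER_OFFSET, null, null);
        Command response = transport.invokeSync(masterAddr, request, 3000);
        if (response == null) {
            throw new IllegalStateException("no response from " + masterAddr);
        }
        if (response.code == SUCCESS) {
            return new String(response.body, StandardCharsets.UTF_8);
        }
        throw new IllegalStateException(
            "master returned code " + response.code + ": " + response.remark);
    }

    public static void main(String[] args) {
        // A fake Master that always answers with an empty offset table.
        Transport fakeMaster = (addr, req, timeout) -> new Command(
            SUCCESS, null, "{\"offsetTable\":{}}".getBytes(StandardCharsets.UTF_8));
        System.out.println(fetchOffsets(fakeMaster, "127.0.0.1:10911"));
    }
}
```

The sketch uses an explicit null check instead of assert, because Java assertions are disabled unless the JVM is started with -ea.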
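2 Synchronizing message bodies
This part covers how the Master and Slave synchronize message bodies, that is, the contents of the CommitLog. The CommitLog differs from metadata in two ways: first, it holds far more data; second, its requirements for timeliness and reliability are different. Metadata is synchronized on a timer, so a failure between two runs can leave the Master and Slave metadata inconsistent, but that can still be remedied (by adjusting the Offset manually, restarting the Consumer, and so on). If the CommitLog is not synchronized promptly in a high-reliability scenario, messages are lost for good once the Master machine fails. That is why dedicated code handles message-body synchronization between Master and Slave.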
The main implementation lives in the org.apache.rocketmq.store.ha package of the store module, which contains three classes: HAService, HAConnection, and WaitNotifyObject.
HAService is the core of CommitLog synchronization. It runs different logic on Master and Slave machines and behaves as a Master by default; see Listing 12-3.
Listing 12-3 Setting HaMasterAddress according to the Broker role
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
    if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig
        .getHaMasterAddress().length() >= 6) {
        this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
        this.updateMasterHAServerAddrPeriodically = false;
    } else {
        this.updateMasterHAServerAddrPeriodically = true;
    }
}
When the Broker role is Slave, the Master address is set correctly, so when HAService starts, connectMaster in the HAClient inner class can run successfully, as shown in Listing 12-4.
Listing 12-4 The Slave connects to the Master
private boolean connectMaster() throws ClosedChannelException {
    if (null == socketChannel) {
        String addr = this.masterAddress.get();
        if (addr != null) {
            SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
            if (socketAddress != null) {
                this.socketChannel = RemotingUtil.connect(socketAddress);
                if (this.socketChannel != null) {
                    this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                }
            }
        }
        this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
        this.lastWriteTimestamp = System.currentTimeMillis();
    }
    return this.socketChannel != null;
}
As the code shows, HAClient uses Java NIO to connect to the Master Broker. The Master has matching listener code, shown in Listing 12-5.
Listing 12-5 Listening for HA connections from Slaves
public void beginAccept() throws Exception {
    this.serverSocketChannel = ServerSocketChannel.open();
    this.selector = RemotingUtil.openSelector();
    this.serverSocketChannel.socket().setReuseAddress(true);
    this.serverSocketChannel.socket().bind(this.socketAddressListen);
    this.serverSocketChannel.configureBlocking(false);
    this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}
CommitLog synchronization does not go through Netty commands; it uses a direct TCP connection, which is more efficient. Once connected, the two sides compare the Master and Slave Offsets and synchronize continuously.
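The offset comparison can be illustrated with plain ByteBuffer code, leaving the sockets out. The frame layout used here (an 8-byte offset report from the Slave, and a Master frame of 8-byte start offset, 4-byte body length, then the body) is an assumption made for illustration, and HaFrameSketch with all of its methods is hypothetical rather than the HAService API.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of offset-based replication framing.
final class HaFrameSketch {
    // Slave side: the local copy of the log.
    private final ByteArrayOutputStream log = new ByteArrayOutputStream();

    long maxOffset() {
        return log.size();
    }

    byte[] contents() {
        return log.toByteArray();
    }

    // Slave -> Master: report how far the local log reaches.
    ByteBuffer encodeReport() {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.putLong(maxOffset());
        buf.flip();
        return buf;
    }

    // Master side: build a frame holding the bytes the Slave is missing.
    static ByteBuffer encodeTransfer(byte[] masterLog, ByteBuffer report) {
        long slaveOffset = report.getLong();
        if (slaveOffset < 0 || slaveOffset > masterLog.length) {
            throw new IllegalArgumentException("slave offset out of range: " + slaveOffset);
        }
        int size = (int) (masterLog.length - slaveOffset);
        ByteBuffer frame = ByteBuffer.allocate(8 + 4 + size);
        frame.putLong(slaveOffset);                     // where the body starts
        frame.putInt(size);                             // body length
        frame.put(masterLog, (int) slaveOffset, size);  // body
        frame.flip();
        return frame;
    }

    // Slave side: append the body only if it starts exactly at the local
    // max offset; otherwise reject the frame.
    boolean apply(ByteBuffer frame) {
        long startOffset = frame.getLong();
        int size = frame.getInt();
        if (startOffset != maxOffset()) {
            return false;
        }
        byte[] body = new byte[size];
        frame.get(body);
        log.write(body, 0, size);
        return true;
    }

    public static void main(String[] args) {
        HaFrameSketch slave = new HaFrameSketch();
        byte[] masterLog = "hello world".getBytes(StandardCharsets.US_ASCII);
        slave.apply(encodeTransfer(masterLog, slave.encodeReport()));
        System.out.println("slave max offset: " + slave.maxOffset());
    }
}
```

Because each frame carries its start offset, the Slave can detect a gap or a duplicate and refuse to append, rather than silently corrupting its copy of the log.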
3 sync_master和async_master
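sync_master and async_master are Broker role settings written in the Broker configuration file, and the choice determines how the Master replicates to the Slave. As the names suggest, sync_master is synchronous: messages on the Master Broker must be replicated to the Slave right away. async_master is asynchronous: messages on the Master Broker are replicated to the Slave machine through asynchronous processing. The code makes this concrete; Listing 12-6 shows message synchronization under sync_master.
Listing 12-6 Message synchronization under sync_master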
public void handleHA(AppendMessageResult result,
    PutMessageResult putMessageResult, MessageExt messageExt) {
    if (BrokerRole.SYNC_MASTER == this.defaultMessageStore
        .getMessageStoreConfig().getBrokerRole()) {
        HAService service = this.defaultMessageStore.getHaService();
        if (messageExt.isWaitStoreMsgOK()) {
            // Determine whether to wait
            if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
                GroupCommitRequest request = new GroupCommitRequest(
                    result.getWroteOffset() + result.getWroteBytes());
                service.putRequest(request);
                service.getWaitNotifyObject().wakeupAll();
                boolean flushOK = request.waitForFlush(this.defaultMessageStore
                    .getMessageStoreConfig().getSyncFlushTimeout());
                if (!flushOK) {
                    log.error("do sync transfer other node, wait return, "
                        + "but failed, topic: " + messageExt.getTopic()
                        + " tags: " + messageExt.getTags()
                        + " client address: " + messageExt.getBornHostNameString());
                    putMessageResult.setPutMessageStatus(
                        PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                }
            }
            // Slave problem
            else {
                // Tell the producer, slave not available
                putMessageResult.setPutMessageStatus(
                    PutMessageStatus.SLAVE_NOT_AVAILABLE);
            }
        }
    }
}
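handleHA is called at the end of the putMessage function in the CommitLog class. The key calls in the code are wakeupAll and waitForFlush: in synchronous mode, every time the Master writes a message it waits for the message to be synchronized to the Slave and returns only once that completes, as shown in Listing 12-7 (putMessage is long, so only the key code is listed).
Listing 12-7 putMessage calls handleHA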
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    // Set the storage time
    msg.setStoreTimestamp(System.currentTimeMillis());
    // Set the message body BODY CRC (consider the most appropriate setting
    // on the client)
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    // Back to Results
    AppendMessageResult result = null;
    StoreStatsService storeStatsService = this.defaultMessageStore
        .getStoreStatsService();
    String topic = msg.getTopic();
    int queueId = msg.getQueueId();
    ……
    handleDiskFlush(result, putMessageResult, msg);
    handleHA(result, putMessageResult, msg);
    return putMessageResult;
}
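The wait-then-return behavior of sync_master can be sketched with a CountDownLatch. This is an illustration under stated assumptions, not the RocketMQ implementation: SyncWaitSketch, CommitRequest, and ackFromSlave are hypothetical names, and the real GroupCommitRequest and HAService differ in detail.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a writer waiting for a Slave acknowledgement.
final class SyncWaitSketch {

    // One pending write; the writer blocks on it until the Slave catches up.
    static final class CommitRequest {
        final long nextOffset;
        private final CountDownLatch latch = new CountDownLatch(1);

        CommitRequest(long nextOffset) {
            this.nextOffset = nextOffset;
        }

        void wakeup() {
            latch.countDown();
        }

        // Returns true if the Slave acknowledged in time, false on timeout.
        boolean waitForFlush(long timeoutMillis) {
            try {
                return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    private final Queue<CommitRequest> pending = new ConcurrentLinkedQueue<>();

    void putRequest(CommitRequest request) {
        pending.add(request);
    }

    // Called by a single transfer thread when the Slave reports that it
    // holds everything below ackOffset; releases all satisfied writers.
    void ackFromSlave(long ackOffset) {
        CommitRequest head;
        while ((head = pending.peek()) != null && head.nextOffset <= ackOffset) {
            pending.poll();
            head.wakeup();
        }
    }

    public static void main(String[] args) {
        SyncWaitSketch service = new SyncWaitSketch();
        CommitRequest request = new CommitRequest(100);
        service.putRequest(request);
        // Simulate the Slave acknowledging offset 100 from another thread.
        new Thread(() -> service.ackFromSlave(100)).start();
        System.out.println("flushed to slave: " + request.waitForFlush(1000));
    }
}
```

A request that times out corresponds to the FLUSH_SLAVE_TIMEOUT status in Listing 12-6: the message is stored on the Master, but the producer is told that replication was not confirmed.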