中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁(yè) > news >正文

網(wǎng)上購(gòu)物app進(jìn)一步優(yōu)化落實(shí)

網(wǎng)上購(gòu)物app,進(jìn)一步優(yōu)化落實(shí),網(wǎng)站開發(fā)標(biāo)書怎么寫,企業(yè)服務(wù)公司簡(jiǎn)介怎么寫RocketMQ的Broker分為Master和Slave兩個(gè)角色,為了保證高可用性,Master角色的機(jī)器接收到消息后,要把內(nèi)容同步到Slave機(jī)器上,這樣一旦Master宕機(jī),Slave機(jī)器依然可以提供服務(wù)。下面分析Master和Slave角色機(jī)器間同步功能實(shí)…

????????RocketMQ的Broker分為Master和Slave兩個(gè)角色,為了保證高可用性,Master角色的機(jī)器接收到消息后,要把內(nèi)容同步到Slave機(jī)器上,這樣一旦Master宕機(jī),Slave機(jī)器依然可以提供服務(wù)。下面分析Master和Slave角色機(jī)器間同步功能實(shí)現(xiàn)的源碼。

1 同步屬性信息

Slave需要和Master同步的不只是消息本身,一些元數(shù)據(jù)信息也需要同步,比如TopicConfig信息、ConsumerOffset信息、DelayOffset和SubscriptionGroupConfig信息。Broker在啟動(dòng)的時(shí)候,判斷自己的角色是否是Slave,是的話就啟動(dòng)定時(shí)同步任務(wù),如代碼清單12-1所示。

代碼清單12-1 Slave角色定時(shí)同步元數(shù)據(jù)信息

if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
??? if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
??????? this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
??????? this.updateMasterHAServerAddrPeriodically = false;
??? } else {
??????? this.updateMasterHAServerAddrPeriodically = true;
??? }
??? this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
? ??????@Override
??????? public void run() {
??????????? try {
??????????????? BrokerController.this.slaveSynchronize.syncAll();
??????????? } catch (Throwable e) {
??????????????? log.error("ScheduledTask syncAll slave exception", e);
??????????? }
?????? ?}
??? }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}

?

在syncAll函數(shù)里,調(diào)用syncTopicConfig()、getAllConsumerOffset()、syncDelayOffset()和syncSubscriptionGroupConfig()進(jìn)行元數(shù)據(jù)同步。我們以syncConsumerOffset為例,來(lái)看看底層的具體實(shí)現(xiàn),如代碼清單12-2所示。

代碼清單12-2 getAllConsumerOffset具體實(shí)現(xiàn)

public ConsumerOffsetSerializeWrapper getAllConsumerOffset(
??? final String addr) throws InterruptedException, RemotingTimeoutException,
??? RemotingSendRequestException, RemotingConnectException, MQBroker-Exception {
??? RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null);
??? RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
??? assert response != null;
??? switch (response.getCode()) {
??????? case ResponseCode.SUCCESS: {
??????????? return ConsumerOffsetSerializeWrapper.decode(response.getBody(), ConsumerOffsetSerializeWrapper.class);
??????? }
??????? default:
??????????? break;
??? }
??? throw new MQBrokerException(response.getCode(), response.getRemark());
}

?

getAllConsumerOffset()的基本邏輯是組裝一個(gè)RemotingCommand,底層通過(guò)Netty將消息發(fā)送到Master角色的Broker,然后獲取Offset信息。

2 同步消息體

下面介紹Master和Slave之間同步消息體內(nèi)容的方法,也就是同步CommitLog內(nèi)容的方法。CommitLog和元數(shù)據(jù)信息不同:首先,CommitLog的數(shù)據(jù)量比元數(shù)據(jù)要大;其次,對(duì)實(shí)時(shí)性和可靠性要求也不一樣。元數(shù)據(jù)信息是定時(shí)同步的,在兩次同步的時(shí)間差里,如果出現(xiàn)異??赡軙?huì)造成Master上的元數(shù)據(jù)內(nèi)容和Slave上的元數(shù)據(jù)內(nèi)容不一致,不過(guò)這種情況還可以補(bǔ)救(手動(dòng)調(diào)整Offset,重啟Consumer等)。CommitLog在高可靠性場(chǎng)景下如果沒(méi)有及時(shí)同步,一旦Master機(jī)器出故障,消息就徹底丟失了。所以有專門的代碼來(lái)實(shí)現(xiàn)Master和Slave之間消息體內(nèi)容的同步。

主要的實(shí)現(xiàn)代碼在Broker模塊的org.apache.rocketmq.store.ha包中,里面包括HAService、HAConnection和WaitNotifyObject這三個(gè)類。

HAService是實(shí)現(xiàn)commitLog同步的主體,它在Master機(jī)器和Slave機(jī)器上執(zhí)行的邏輯不同,默認(rèn)是在Master機(jī)器上執(zhí)行,見(jiàn)代碼清單12-3。

代碼清單12-3 根據(jù)Broker角色,確定是否設(shè)置HaMasterAddress

if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
??? if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig
??????? .getHaMasterAddress().length() >= 6) {
??????? this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
??????? this.updateMasterHAServerAddrPeriodically = false;
??? } else {
??????? this.updateMasterHAServerAddrPeriodically = true;
??? }

?

當(dāng)Broker角色是Slave的時(shí)候,MasterAddr的值會(huì)被正確設(shè)置,這樣HAService在啟動(dòng)的時(shí)候,在HAClient這個(gè)內(nèi)部類中,connectMaster會(huì)被正確執(zhí)行,如代碼清單12-4所示。

代碼清單12-4 Slave角色連接Master

private boolean connectMaster() throws ClosedChannelException {
??? if (null == socketChannel) {
??? ????String addr = this.masterAddress.get();
??????? if (addr != null) {
??????????? SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
??????????? if (socketAddress != null) {
??????????????? this.socketChannel = RemotingUtil.connect(socketAddress);
??????????????? if (this.socketChannel != null) {
??????????????????? this.socketChannel.register(this.selector, SelectionKey.OP_READ);
??????????????? }
??????????? }
??????? }
??????? this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

??????? this.lastWriteTimestamp = System.currentTimeMillis();
??? }
??? return this.socketChannel != null;
}

?

從代碼中可以看出,HAClient試圖通過(guò)Java NIO函數(shù)去連接Master角色的Broker。Master角色有相應(yīng)的監(jiān)聽(tīng)代碼,如代碼清單12-5所示。

代碼清單12-5 監(jiān)聽(tīng)Slave的HA連接

public void beginAccept() throws Exception {
??? this.serverSocketChannel = ServerSocketChannel.open();
??? this.selector = RemotingUtil.openSelector();
??? this.serverSocketChannel.socket().setReuseAddress(true);
??? this.serverSocketChannel.socket().bind(this.socketAddressListen);
??? this.serverSocketChannel.configureBlocking(false);
??? this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}

CommitLog的同步,不是經(jīng)過(guò)netty command的方式,而是直接進(jìn)行TCP連接,這樣效率更高。連接成功以后,通過(guò)對(duì)比Master和Slave的Offset,不斷進(jìn)行同步。

3 sync_master和async_master

sync_master和async_master是寫在Broker配置文件里的配置參數(shù),這個(gè)參數(shù)影響的是主從同步的方式。從字面意思理解,sync_master是同步方式,也就是Master角色Broker中的消息要立刻同步過(guò)去;async_master是異步方式,也就是Master角色Broker中的消息是通過(guò)異步處理的方式同步到Slave角色的機(jī)器上的。下面結(jié)合代碼來(lái)分析,sync_master下的消息同步如代碼清單12-6所示。

代碼清單12-6 sync_master下的消息同步

public void handleHA(AppendMessageResult result,
??? PutMessageResult putMessageResult, MessageExt messageExt) {
??? if (BrokerRole.SYNC_MASTER == this.defaultMessageStore
??????? .getMessageStoreConfig().getBrokerRole()) {
??????? HAService service = this.defaultMessageStore.getHaService();
??????? if (messageExt.isWaitStoreMsgOK()) {
??????????? // Determine whether to wait
??????????? if (service.isSlaveOK(result.getWroteOffset() + result
??????????????? .getWroteBytes())) {
??????????????? GroupCommitRequest request = new GroupCommitRequest
??????????????????? (result.getWroteOffset() + result
??????????????????? .getWroteBytes());
??????????????? service.putRequest(request);
??????????????? service.getWaitNotifyObject().wakeupAll();
??????????????? boolean flushOK =
??????????????????? request.waitForFlush(this.defaultMessageStore
??????????????????????? .getMessageStoreConfig().getSyncFlushTimeout());
??????????????? if (!flushOK) {
??????????????????? log.error("do sync transfer other node, wait return, " +
??????????????????????? "but failed, topic: " + messageExt
??????????????????????? .getTopic() + " tags: "
??????????????????????? + messageExt.getTags() + " client address: " +
??????????????????????? messageExt.getBornHostNameString());
??????????????????? putMessageResult.setPutMessageStatus(PutMessageStatus
??????????????????????? .FLUSH_SLAVE_TIMEOUT);
?? ?????????????}
??????????? }
??????????? // Slave problem
??????????? else {
??????????????? // Tell the producer, slave not available
??????????????? putMessageResult.setPutMessageStatus(PutMessageStatus
??????????????????? .SLAVE_NOT_AVAILABLE);
??????? ????}
??????? }
??? }
}

?

在CommitLog類的putMessage函數(shù)末尾,調(diào)用handleHA函數(shù)。代碼中的關(guān)鍵詞是wakeupAll和waitForFlush,在同步方式下,Master每次寫消息的時(shí)候,都會(huì)等待向Slave同步消息的過(guò)程,同步完成后再返回,如代碼清單12-7所示。(putMessage函數(shù)比較長(zhǎng),僅列出關(guān)鍵的代碼)。

代碼清單12-7 putMessage中調(diào)用handleHA

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
??? // Set the storage time
??? msg.setStoreTimestamp(System.currentTimeMillis());
??? // Set the message body BODY CRC (consider the most appropriate setting
??? // on the client)
??? msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
??? // Back to Results
??? AppendMessageResult result = null;

??? StoreStatsService storeStatsService = this.defaultMessageStore
??????? .getStoreStatsService();

??? String topic = msg.getTopic();
??? int queueId = msg.getQueueId();

? ……

??? handleDiskFlush(result, putMessageResult, msg);
??? handleHA(result, putMessageResult, msg);

??? return putMessageResult;
}

?

http://www.risenshineclean.com/news/7827.html

相關(guān)文章:

  • 做網(wǎng)站開發(fā)需要什么證書重慶百度推廣關(guān)鍵詞優(yōu)化
  • 想做個(gè)ktv的網(wǎng)站怎么做網(wǎng)絡(luò)廣告策劃方案范文
  • 武漢市建設(shè)廳網(wǎng)站國(guó)內(nèi)最新新聞?wù)?/a>
  • 微博推廣軟件網(wǎng)站的seo是什么意思
  • 電子請(qǐng)柬網(wǎng)站開發(fā)app推廣實(shí)名認(rèn)證接單平臺(tái)
  • 政府網(wǎng)站頁(yè)面設(shè)計(jì)標(biāo)準(zhǔn)win10優(yōu)化大師怎么樣
  • 上海哪家網(wǎng)站建得好百度seo最成功的優(yōu)化
  • 福州外文網(wǎng)站建設(shè)網(wǎng)絡(luò)推廣平臺(tái)幾大類
  • 企業(yè)網(wǎng)站制作公司24小時(shí)接單seo sem
  • wordpress郵件模板seo實(shí)戰(zhàn)視頻
  • 服務(wù)器windos做網(wǎng)站整合營(yíng)銷傳播方案
  • 家居網(wǎng)站建設(shè)全網(wǎng)營(yíng)銷微信營(yíng)銷軟件免費(fèi)版
  • wordpress的cms插件山東進(jìn)一步優(yōu)化
  • 校園網(wǎng)上超市網(wǎng)站建設(shè)推廣b站在線觀看人數(shù)在哪
  • 免費(fèi)個(gè)人網(wǎng)站空間申請(qǐng)2023年10月疫情還會(huì)嚴(yán)重嗎
  • 網(wǎng)站建設(shè)7個(gè)基本流程寧波seo外包服務(wù)商
  • 如果在瀏覽器上做一網(wǎng)站廣告大約需要多少錢門戶網(wǎng)站排行榜
  • 什么樣的網(wǎng)站空間做電影網(wǎng)站不卡seo深圳網(wǎng)絡(luò)推廣
  • wordpress分享到qq空間班級(jí)優(yōu)化大師官方免費(fèi)下載
  • 全屏網(wǎng)站怎么做網(wǎng)絡(luò)推廣運(yùn)營(yíng)推廣
  • 專業(yè)網(wǎng)站建設(shè)86215信息流優(yōu)化師是做什么的
  • 專業(yè)提供網(wǎng)站建設(shè)服務(wù)舉一個(gè)網(wǎng)絡(luò)營(yíng)銷的例子
  • 沈陽(yáng)全網(wǎng)推廣公司哪家好免費(fèi)seo培訓(xùn)
  • 網(wǎng)站后臺(tái)banner更換佛山seo關(guān)鍵詞排名
  • 做論壇網(wǎng)站時(shí)應(yīng)該注意什么印度疫情最新消息
  • 為什么要用國(guó)外服務(wù)器做網(wǎng)站搜索引擎優(yōu)化英文簡(jiǎn)稱為
  • 做網(wǎng)站找哪個(gè)軟件網(wǎng)店運(yùn)營(yíng)具體做什么
  • 海珠區(qū)疫情嚴(yán)重嗎鄭州seo代理外包
  • 優(yōu)秀的網(wǎng)站首頁(yè)布局360競(jìng)價(jià)推廣
  • 案例建網(wǎng)站網(wǎng)址域名ip查詢