江門網(wǎng)站推廣軟件關(guān)鍵詞排名提高
返回目錄
PING / PONG / MEET 的發(fā)送
過程
- 計(jì)算freshNodes。freshNodes表示在消息中能攜帶的,在cluster節(jié)點(diǎn)字典中的節(jié)點(diǎn)總數(shù),但需要減去myself和對(duì)端節(jié)點(diǎn),因?yàn)閙yself的信息會(huì)存儲(chǔ)在消息頭中。實(shí)際上,并非所有在cluster節(jié)點(diǎn)字典中的節(jié)點(diǎn)都需要帶出去,對(duì)于那些處于handshake,disconnected狀態(tài)的節(jié)點(diǎn)是不用考慮的。
freshNodes = size(cluster nodes) - 2 - 計(jì)算wanted。wanted表示準(zhǔn)備在消息中攜帶的gossip數(shù)。個(gè)數(shù)不能少于3,不能超過freshNodes。
wanted = floor(size(cluster nodes) / 10)
wanted = max(wanted, 3)
wanted = min(wanted, freshNodes) - 獲取pfailWanted。pfailWanted表示處于PFAIL狀態(tài)的節(jié)點(diǎn)數(shù)。
- 如果當(dāng)前l(fā)ink不是inbound,而且消息類型為PING,則更新對(duì)端實(shí)體的pingSent(發(fā)送ping的時(shí)間)為當(dāng)前時(shí)間。
- 構(gòu)建消息頭部
- 計(jì)算maxIterations。maxIterations表示在遍歷cluster節(jié)點(diǎn)字典時(shí)的最大遍歷次數(shù)。
maxIterations = wanted * 3 - 編寫gossip
gossipCount = 0
while freshNodes > 0 gossipCount < wanted && maxIterations:從cluster節(jié)點(diǎn)字典中隨機(jī)抽取一個(gè)node /* 注意:是隨機(jī)地抽取 */if node是myself || 處于PFAIL狀態(tài): /* myself會(huì)在消息頭,PFAIL會(huì)在最后追加 */continueif (node處于handshake或者NOADDR(地址不可知)) ||(node還沒有outbound link && node沒有slots):freshNodes-- /* 節(jié)省CPU */continueif node已被添加到gossip:continue把node添加到gossipfreshNodes--gossipCount++
- 如果pfailCount > 0,則把PFAIL節(jié)點(diǎn)追加到gossip
while cluster節(jié)點(diǎn)字典還沒遍歷完 and pfailWanted > 0:獲取一個(gè)nodeif (node處于handshake或者NOADDR) || node并非PFAIL: /* 只追加PFAIL的節(jié)點(diǎn) */continuegossipCount++pfailWanted--
- 如果配置了cluster-announce-hostname,則
- 給消息的mflags添加EXT_DATA標(biāo)記,表示消息附帶extension
- 把cluster-announce-hostname寫入pingExtension
- 把pingExtension追加到消息末尾
- 計(jì)算消息的總長(zhǎng)度
- 設(shè)置消息的count = gossipCount
- 設(shè)置消息的extensions = extension的個(gè)數(shù)
- 設(shè)置消息的totalLen = 上面計(jì)算出的總長(zhǎng)度
- 發(fā)送消息
- PING和MEET通過outbound link發(fā)送
- PONG通過inbound link發(fā)送
gossip內(nèi)容
每個(gè)要加入到gossip中的節(jié)點(diǎn)都會(huì)在其中生成一個(gè)對(duì)應(yīng)條目,包含的信息:
- 節(jié)點(diǎn)ID
- 最近一次ping此節(jié)點(diǎn)的時(shí)間(如果已經(jīng)收到此節(jié)點(diǎn)的PONG,會(huì)重置為0)
- 最近一次收到此節(jié)點(diǎn)PONG的時(shí)間
- 節(jié)點(diǎn)IP
- 節(jié)點(diǎn)的port,pport,cport
- 節(jié)點(diǎn)的flags
PING / PONG / MEET的接收處理
過程
第1 ~ 3步是涵蓋所有類型的消息。
- 合法性檢查
- 消息類型(type)的檢查
- 消息長(zhǎng)度檢查。有的消息會(huì)攜帶gossip,有的會(huì)附帶extension,所以消息的總長(zhǎng)度是可變的,需要結(jié)合消息頭部的元數(shù)據(jù)和消息的內(nèi)容進(jìn)行檢查。
- 根據(jù)link和消息頭,從cluster節(jié)點(diǎn)字典中查找實(shí)體sender
if link關(guān)聯(lián)了node && node不是處于handshake:sender = node
else:sender = 根據(jù)消息頭中的sender,從cluster節(jié)點(diǎn)字典中查找實(shí)體if sender存在 && link沒有關(guān)聯(lián)node:/* inbound link在創(chuàng)建時(shí)還不知道節(jié)點(diǎn)的真實(shí)ID,所以會(huì)找不到sender,因此要到達(dá)這里才能關(guān)聯(lián)上sender */把link設(shè)置為sender的inbound link/* 關(guān)聯(lián)后,下次就可以從link獲取sender,而不需要再?gòu)腸luster節(jié)點(diǎn)字典查找了 */把sender關(guān)聯(lián)到link
- 通用處理,適用于所有消息類型
if sender存在:/* 更新數(shù)據(jù)接收時(shí)間,以免因?yàn)閟ender一直在發(fā)送數(shù)據(jù)而誤認(rèn)為它timeout */sender的dataReceived = 當(dāng)前時(shí)間if sender沒有處于handshake:/* 更新currentEpoch和configEpoch */if 消息頭的currentEpoch > server的currentEpoch:server的currentEpoch = 消息頭的currentEpochif 消息頭的configEpoch > sender的configEpoch:sender的configEpoch = 消息頭的configEpoch更新sender的復(fù)制偏移和相應(yīng)時(shí)間if server正在做manual failover && myself是sender的slave &&消息頭的mflags包含PAUSED && server的mf_master_offset == -1:更新server的mf_server_offset = sender的復(fù)制偏移
- 處理PING,MEET消息
if 消息類型是PING或者M(jìn)EET:/* 獲取myself的IP */if (消息類型是MEET || myself還沒有IP) && 沒有配置cluster-announce-ip:myself的IP = 從socket中獲取自己的IP/* 在cluster節(jié)點(diǎn)字典中創(chuàng)建實(shí)體 */if sender不存在 && 消息類型為MEET:為sender創(chuàng)建node實(shí)體,生成隨機(jī)ID/* 獲取sender的IP和ports */if 消息頭的myip不為全0:node的ip = myipelse:node的ip = 從socket獲取對(duì)端的ip把消息頭的port,pport, cport復(fù)制到node把node加入cluster節(jié)點(diǎn)字典/* 解析gossip */if sender不存在 && 消息類型為MEET:處理消息的gossip回復(fù)PONG消息給對(duì)端節(jié)點(diǎn)
- 處理PING,PONG,MEET消息
if 消息類型是PING或者PONG或者M(jìn)EET:node = link關(guān)聯(lián)的nodeif link是outbound:if node處于handshake:/* sender已經(jīng)代表了對(duì)端節(jié)點(diǎn),接下去node的ID也會(huì)被更新成跟sender的一樣,同一個(gè)ID不能對(duì)應(yīng)2個(gè)實(shí)體,所以需要把node刪掉 */if sender存在: 檢查ip和ports的變化,更新sender /* Point-1 */從cluster節(jié)點(diǎn)字典中刪除node /* 會(huì)釋放node的inbound和outbound link*/return更新node的ID = 消息頭的sender /* 隨機(jī)ID -> 真實(shí)的ID */從node中刪除HANDSHAKE標(biāo)記 /* 表示handshake過程結(jié)束 */根據(jù)消息頭的flags,把node設(shè)置為master或slaveelse if node的ID != 消息頭的sender:/* 可能對(duì)端變更了ID,也可能是由于網(wǎng)絡(luò)的變化連錯(cuò)了節(jié)點(diǎn) */標(biāo)記node為NOADDR重置node的ip,port,pport,cport為0釋放linkreturn/* 檢查NOFAILOVER,更新sender */if sender存在: /* 我們假定對(duì)端發(fā)來的信息是最新的,所以直接更新.NOFAILOVER對(duì)應(yīng)的是cluster-replica-no-failover配置項(xiàng) */if 消息頭的flags帶了NOFAILOVER:標(biāo)記sender為NOFAILOVERelse:移除sender的NOFAILOVER/* 如果ip和各個(gè)端口發(fā)生了變化,則需要更新 */if sender存在 && 消息類型為PING && sender沒有處于handshake:檢查ip和ports的變化,更新sender/* 如果我們收到了PONG,需要清除PFAIL和FAIL狀態(tài) */if link是outbound && 消息類型為PONG:更新node的pongReceived = 當(dāng)前時(shí)間重置node的pingSent = 0 if node處于PFAIL:清除PFAIL狀態(tài) /* 簡(jiǎn)單地移除PFAIL標(biāo)記 */else if node處于FAIL:清除node的FAIL狀態(tài) /* Point-2 *//* 檢查是否發(fā)生了角色切換: slave -> master or master -> slave */if sender存在:if 消息頭的slaveof為全0: /* 對(duì)端現(xiàn)在是master */把sender轉(zhuǎn)換為master /* Point-3: 如果sender原本就是master,就無(wú)需轉(zhuǎn)換 */else: /* 對(duì)端現(xiàn)在是slave */if sender需要從master變?yōu)閟lave:把sender轉(zhuǎn)換為slave /* Point-4 */if sender的master發(fā)生了變化: 把sender從舊master遷移到新master /* Point-5 *//* 更新slots */if sender是master && sender的slots跟消息頭的myslots存在不同:重新綁定消息頭的myslots到sender /* Point-6 *//* 如果sender聲稱slot屬于它,但事實(shí)并非如此,則需要通知sender */if sender存在 && sender的slots跟消息頭的myslots存在不同:查找消息頭的myslots中是否存在這樣一個(gè)slot:1. 在myself看來,它不屬于sender2. 它的configEpoch > 消息頭的configEpochif 存在這樣的slot:發(fā)送UPDATE消息給sender/* 解決configEpoch沖突 */ if sender是master && myself也是master &&消息頭的configEpoch == myself的configEpoch:if myself的ID < sender的ID: /* 字符串比較 */myself的configEpoch = (++server的currentEpoch)保存配置/* 處理gossip */ if sender存在:處理消息中的gossip /* Point-7 *//* 處理消息中的pingExtension */if 消息頭的extensions > 0: /* 消息帶有extension *//* 當(dāng)前只有一種extension: pingExtension */sender的hostname = pingExtension中的hostname
Points 解釋
Point-1:檢查ip和ports的變化,更新sender
/* 檢查是否使用同一條連接,若是,則不可能有變化 */
if link跟sender的link一樣: return
/* 檢查IP和ports是否有變化 */
if sender的ports跟消息頭的ports相同 &&: /* port, pport, cport */sender的ip跟消息頭的myip相同: /* 如果myip為全0,則使用socket上的對(duì)端IP */return
更新sender的IP和ports
if sender的link存在: /* IP和ports變更了,需要重新建立連接 */釋放掉sender的link /* 釋放后會(huì)使用新的IP和port自動(dòng)重連 */
if sender是myself的master:更新server復(fù)制時(shí)使用的IP和port,重新開啟復(fù)制
Point-2:清除node的FAIL狀態(tài)
/* 對(duì)于slave節(jié)點(diǎn),只要連接上,都會(huì)認(rèn)為它恢復(fù)了;對(duì)于沒有slot的master節(jié)點(diǎn),只要連接上,就不需要在看它是否fail了足夠長(zhǎng)的時(shí)間 */
if sender是slave || sender沒有slots:清除它的FAIL標(biāo)記/* 對(duì)于master節(jié)點(diǎn),如果它fail的時(shí)間足夠長(zhǎng),而且還有slots屬于它,說明它沒有被failover,現(xiàn)在連接上了,則可以認(rèn)為它恢復(fù)了 */
if sender是master && sender有slots &&當(dāng)前時(shí)間 - sender的failTime > 2 * cluster-node-timeout:清除它的FAIL標(biāo)記
Point-3:把sender轉(zhuǎn)換為master
解除sender和oldMaster之間的關(guān)聯(lián)
if oldMaster沒有其他slave:清除oldMaster的MIGRATE_TO標(biāo)記
if sender != myself:添加MIGRATE_TO標(biāo)記到sender
設(shè)置sender為master
Point-4:把sender轉(zhuǎn)換為slave
把原本屬于sender的slots,全部設(shè)置為沒有owner
清除sender的MIGRATE_TO標(biāo)記
設(shè)置sender為slave
Point-5:把sender從原本的master遷移到新的master
解除sender和oldMaster之間的關(guān)聯(lián)
if oldMaster沒有其他slave:清除oldMaster的MIGRATE_TO標(biāo)記
建立sender和newMaster之間的關(guān)聯(lián)
添加MIGRATE_TO標(biāo)記到newMaster
Point-6:重新綁定消息頭的myslots到sender
if sender == myself: /* 不需要對(duì)自己進(jìn)行更新 */return
if myself是master:currentMaster = myself
elsecurrentMaster = myself的master
newMaster = null
migratedOurSlots = 0
遍歷消息頭中的myslots:跳過那些已經(jīng)屬于sender的slot跳過處于importing狀態(tài)的slot/* 如果slot沒有owner,或者sender聲稱slot屬于它 */if slot沒有owner || slot的configEpoch < 消息頭的configEpoch:if slot的owner == myself && slot上有key: /* slot是myself的,需要標(biāo)識(shí)它是dirty */記錄slot到dirtySlotsif slot的owner == currentMaster:/* 存在slot從currentMaster遷移到sender */newMaster = sender++migratedOurSlots /* 記錄遷移的slot數(shù) */把slot從舊的owner刪除把slot添加到senderif newMaster != null && /* 存在slot從currentMaster遷移到sender */currentMaster沒有slot了 && (cluster-allow-replica-migration || /* 配置了允許副本遷移 */消息頭的myslots數(shù)量 == migratedOurSlots): /* myslots全部從currentMaster遷移到sender *//* 如果myself是master,當(dāng)前沒有slot,說明myself被failover,它需要成為newMaster的副本;如果myself是slave,它的master沒有slot,說明它當(dāng)前沒有slot可以復(fù)制,myself需要成為myslots的副本 *//* sender成為myself的master */if myself是master:清除它的MIGRATE_TO標(biāo)記設(shè)置它為slave清除它的那些處于importing和migrating的slotselse:解除它跟oldMaster之間的關(guān)聯(lián)把myself的master設(shè)置為sender更新server復(fù)制時(shí)使用的IP和port,重新開啟復(fù)制重置server的manual failover狀態(tài)
else if dirtySlots數(shù)量 > 0:/* 消息表明這些slots跟我們沒有關(guān)系了,但是我們還有key在上面,為了維持key和slot之間的一致性,需要清除掉這些key */遍歷dirtySlots,刪除每個(gè)slot上所有的key
Point-7:處理消息中的gossip
遍歷消息中的每個(gè)gossip:根據(jù)gossip的nodename,從cluster節(jié)點(diǎn)字典中查找nodeif node存在: /* 找到關(guān)聯(lián)這個(gè)gossip的node */if sender是master && node != myself:if gossip表示node處于PFAIL或FAIL: /* 對(duì)端節(jié)點(diǎn)連接不上node */把sender發(fā)起的failureReport添加到node /* 如果已經(jīng)存在,則更新時(shí)間 *//* 檢查node是否需要標(biāo)記為FAIL */neededQuorum = clusterSize / 2 + 1 /* 計(jì)算仲裁所需要的票數(shù) */if node處于PFAIL狀態(tài) && /* 我們無(wú)法連接上它 */node沒有處于FAIL狀態(tài): /* 沒FAIL才需要處理 */清除那些已經(jīng)失效的failureReport /* 時(shí)間超過了 2 * cluster-node-timeout */failures = 有效的failureReport數(shù)if myself是master: /* 如果我們是master,我們也需要投票 */++failuresif falures >= neededQuorum: /* 多數(shù)通過, FAIL成立 */把node從PFAIL轉(zhuǎn)換成FAIL,并設(shè)置failTime為當(dāng)前時(shí)間廣播FAIL消息到其他節(jié)點(diǎn),迫使它們標(biāo)記此node已FAILelse: /* sender能連接上node */從node上刪除sender發(fā)起的failureReport /* 如果有的話 */清除那些已經(jīng)失效的failureReport/* 檢查是否需要更新node的pongReceived */if gossip表示node沒有處于PFAIL或FAIL && /* 在對(duì)端節(jié)點(diǎn)看來,node沒有PFAIL或FAIL */node的pingSent == 0 && /* 我們沒有處于等待回復(fù)的ping */node上沒有failureReport: /* 在我們看來,node沒有FAIL */if gossip的pongReceived > node的pongReceived && /* 對(duì)端節(jié)點(diǎn)收到node的PONG比我們更新 */gossip的pongReceived < 當(dāng)前時(shí)間 + 500ms: /* 各個(gè)節(jié)點(diǎn)的時(shí)鐘可能不同步,500ms是容錯(cuò) */ node的pongReceived = gossip的pongReceived/* 檢查IP和ports是否發(fā)生了變化 */ if node原本處于PFAIL或FAIL && /* 我們連接不上它 */但對(duì)端節(jié)點(diǎn)能連接上node && 我們和對(duì)端節(jié)點(diǎn)拿到的node IP,port和cport不一樣:釋放掉我們對(duì)node的link更新node上的IP和ports,等待下次自動(dòng)重連else: /* node在我們這還不存在 */ /* 可以通過"cluster forget"命令把node從server的cluster節(jié)點(diǎn)字典中刪掉,為了避免node的地址和端口被重用,錯(cuò)誤連接到別的cluster,于是需要把刪掉的node加入到server的黑名單中 */if sender存在 && gossip表示node是有地址的 &&gossip的nodename不在我們的黑名單中:創(chuàng)建node實(shí)體填入gossip的IP和ports加入cluster節(jié)點(diǎn)字典