濰坊網(wǎng)站建設(shè)最新報(bào)價(jià)steam交易鏈接在哪看
Nacos集群數(shù)據(jù)同步
? 當(dāng)我們有服務(wù)進(jìn)行注冊(cè)以后,會(huì)寫(xiě)入注冊(cè)信息同時(shí)會(huì)觸發(fā)ClientChangedEvent事件,通過(guò)這個(gè)事件,就會(huì)開(kāi)始進(jìn)行Nacos的集群數(shù)據(jù)同步,當(dāng)然這其中只有有一個(gè)Nacos節(jié)點(diǎn)來(lái)處理對(duì)應(yīng)的客戶端請(qǐng)求,其實(shí)這其中還涉及到一個(gè)負(fù)責(zé)節(jié)點(diǎn)和非負(fù)責(zé)節(jié)點(diǎn)
負(fù)責(zé)節(jié)點(diǎn)
? 這是首先我們要查看的是DistroClientDataProcessor(客戶端數(shù)據(jù)一致性處理器)類型,這個(gè)類型會(huì)處理當(dāng)前節(jié)點(diǎn)負(fù)責(zé)的Client,那我們要查看其中的syncToAllServer方法。
private void syncToAllServer(ClientEvent event) {Client client = event.getClient();// 判斷客戶端是否為空,是否是臨時(shí)實(shí)例,判斷是否是負(fù)責(zé)節(jié)點(diǎn)if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {return;}if (event instanceof ClientEvent.ClientDisconnectEvent) {// 客戶端斷開(kāi)連接DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);distroProtocol.sync(distroKey, DataOperation.DELETE);} else if (event instanceof ClientEvent.ClientChangedEvent) {// 客戶端新增/修改DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);distroProtocol.sync(distroKey, DataOperation.CHANGE);}
}
? distroProtocol會(huì)循環(huán)所有其他nacos節(jié)點(diǎn),提交一個(gè)異步任務(wù),這個(gè)異步任務(wù)會(huì)延遲1s,其實(shí)這里我們就可以看到這里涉及到客戶端的斷開(kāi)和客戶端的新增和修改,對(duì)于Delete操作,由DistroSyncDeleteTask處理,對(duì)于Change操作,由DistroSyncChangeTask處理,這里我們從DistroSyncChangeTask來(lái)看
public class DistroSyncChangeTask extends AbstractDistroExecuteTask {private static final DataOperation OPERATION = DataOperation.CHANGE;public DistroSyncChangeTask(DistroKey distroKey, DistroComponentHolder distroComponentHolder) {super(distroKey, distroComponentHolder);}@Overrideprotected DataOperation getDataOperation() {return OPERATION;}// 無(wú)回調(diào)@Overrideprotected boolean doExecute() {String type = getDistroKey().getResourceType();DistroData distroData = getDistroData(type);if (null == distroData) {Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());return true;}return getDistroComponentHolder().findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());}// 有回調(diào)@Overrideprotected void doExecuteWithCallback(DistroCallback callback) {String type = getDistroKey().getResourceType();DistroData distroData = getDistroData(type);if (null == distroData) {Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());return;}getDistroComponentHolder().findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer(), callback);}@Overridepublic String toString() {return "DistroSyncChangeTask for " + getDistroKey().toString();}// 從DistroClientDataProcessor獲取DistroDataprivate DistroData getDistroData(String type) {DistroData result = getDistroComponentHolder().findDataStorage(type).getDistroData(getDistroKey());if (null != result) {result.setType(OPERATION);}return result;}
}
? 獲取到的DistroData,其實(shí)是從ClientManager實(shí)時(shí)獲取Client。
// DistroClientDataProcessor
@Override
public DistroData getDistroData(DistroKey distroKey) {Client client = clientManager.getClient(distroKey.getResourceKey());if (null == client) {return null;}// 把生成的同步數(shù)據(jù)放入到數(shù)組中byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(client.generateSyncData());return new DistroData(distroKey, data);
}
? AbstractClient繼承了Client,同時(shí)給DistroClientDataProcessorClient提供Client的注冊(cè)信息,包括客戶端注冊(cè)了哪些namespace,哪些group,哪些service,哪些instance。
@Override
public ClientSyncData generateSyncData() {List<String> namespaces = new LinkedList<>();List<String> groupNames = new LinkedList<>();List<String> serviceNames = new LinkedList<>();List<InstancePublishInfo> instances = new LinkedList<>();for (Map.Entry<Service, InstancePublishInfo> entry : publishers.entrySet()) {namespaces.add(entry.getKey().getNamespace());groupNames.add(entry.getKey().getGroup());serviceNames.add(entry.getKey().getName());instances.add(entry.getValue());}return new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances);
}
? 這里我們?cè)诨剡^(guò)頭來(lái)看syncData方法,這個(gè)方法實(shí)際上是由DistroClientTransportAgent封裝為DistroDataRequest調(diào)用其他Nacos節(jié)點(diǎn)。
@Override
public boolean syncData(DistroData data, String targetServer) {if (isNoExistTarget(targetServer)) {return true;}DistroDataRequest request = new DistroDataRequest(data, data.getType());Member member = memberManager.find(targetServer);if (checkTargetServerStatusUnhealthy(member)) {Loggers.DISTRO.warn("[DISTRO] Cancel distro sync caused by target server {} unhealthy", targetServer);return false;}try {Response response = clusterRpcClientProxy.sendRequest(member, request);return checkResponse(response);} catch (NacosException e) {Loggers.DISTRO.error("[DISTRO-FAILED] Sync distro data failed! ", e);}return false;
}
非負(fù)責(zé)節(jié)點(diǎn)
? 當(dāng)負(fù)責(zé)節(jié)點(diǎn)將數(shù)據(jù)發(fā)送給非負(fù)責(zé)節(jié)點(diǎn)以后,將要處理發(fā)送過(guò)來(lái)的Client數(shù)據(jù)。這里我們要看DistroClientDataProcessor.processData方法
@Override
public boolean processData(DistroData distroData) {switch (distroData.getType()) {case ADD:case CHANGE:ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class).deserialize(distroData.getContent(), ClientSyncData.class);//處理同步數(shù)據(jù)handlerClientSyncData(clientSyncData);return true;case DELETE:String deleteClientId = distroData.getDistroKey().getResourceKey();Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId);clientManager.clientDisconnected(deleteClientId);return true;default:return false;}
}
? 然后來(lái)查看具體處理方法handlerClientSyncData
private void handlerClientSyncData(ClientSyncData clientSyncData) {Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}", clientSyncData.getClientId());// 同步客戶端連接clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());// 獲取Client(此時(shí)注冊(cè)到的是ConnectionBasedClient)Client client = clientManager.getClient(clientSyncData.getClientId());// 更新Client數(shù)據(jù)upgradeClient(client, clientSyncData);
}
? DistroClientDataProcessor的upgradeClient方法,更新Client里的注冊(cè)表信息,發(fā)布對(duì)應(yīng)事件
private void upgradeClient(Client client, ClientSyncData clientSyncData) {List<String> namespaces = clientSyncData.getNamespaces();List<String> groupNames = clientSyncData.getGroupNames();List<String> serviceNames = clientSyncData.getServiceNames();List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();Set<Service> syncedService = new HashSet<>();for (int i = 0; i < namespaces.size(); i++) {Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));Service singleton = ServiceManager.getInstance().getSingleton(service);syncedService.add(singleton);InstancePublishInfo instancePublishInfo = instances.get(i);if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {client.addServiceInstance(singleton, instancePublishInfo);NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));}}for (Service each : client.getAllPublishedService()) {if (!syncedService.contains(each)) {client.removeServiceInstance(each);NotifyCenter.publishEvent(new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));}}
}
? **注意:**這里要注意下此時(shí)的Client實(shí)現(xiàn)類ConnectionBasedClient,只不過(guò)它的isNative屬性為false,這是非負(fù)責(zé)節(jié)點(diǎn)和負(fù)責(zé)節(jié)點(diǎn)的主要區(qū)別。
? 其實(shí)判斷當(dāng)前nacos節(jié)點(diǎn)是否為負(fù)責(zé)節(jié)點(diǎn)的依據(jù)就是這個(gè)isNative屬性,如果是客戶端直接注冊(cè)在這個(gè)nacos節(jié)點(diǎn)上的ConnectionBasedClient,它的isNative屬性為true;如果是由Distro協(xié)議,同步到這個(gè)nacos節(jié)點(diǎn)上的ConnectionBasedClient,它的isNative屬性為false。
? 那其實(shí)我們都知道2.x的版本以后使用了長(zhǎng)連接,所以通過(guò)長(zhǎng)連接建立在哪個(gè)節(jié)點(diǎn)上,哪個(gè)節(jié)點(diǎn)就是責(zé)任節(jié)點(diǎn),客戶端也只會(huì)向這個(gè)責(zé)任節(jié)點(diǎn)發(fā)送請(qǐng)求。
Distro協(xié)議負(fù)責(zé)集群數(shù)據(jù)統(tǒng)一
? Distro為了確保集群間數(shù)據(jù)一致,不僅僅依賴于數(shù)據(jù)發(fā)生改變時(shí)的實(shí)時(shí)同步,后臺(tái)有定時(shí)任務(wù)做數(shù)據(jù)同步。
? 在1.x版本中,責(zé)任節(jié)點(diǎn)每5s同步所有Service的Instance列表的摘要(md5)給非責(zé)任節(jié)點(diǎn),非責(zé)任節(jié)點(diǎn)用對(duì)端傳來(lái)的服務(wù)md5比對(duì)本地服務(wù)的md5,如果發(fā)生改變,需要反查責(zé)任節(jié)點(diǎn)。
? 在2.x版本中,對(duì)這個(gè)流程做了改造,責(zé)任節(jié)點(diǎn)會(huì)發(fā)送Client全量數(shù)據(jù),非責(zé)任節(jié)點(diǎn)定時(shí)檢測(cè)同步過(guò)來(lái)的Client是否過(guò)期,減少1.x版本中的反查。
? 責(zé)任節(jié)點(diǎn)每5s向其他節(jié)點(diǎn)發(fā)送DataOperation=VERIFY類型的DistroData,來(lái)維持非責(zé)任節(jié)點(diǎn)的Client數(shù)據(jù)不過(guò)期。
//DistroVerifyTimedTask
@Override
public void run() {try {// 所有其他節(jié)點(diǎn)List<Member> targetServer = serverMemberManager.allMembersWithoutSelf();if (Loggers.DISTRO.isDebugEnabled()) {Loggers.DISTRO.debug("server list is: {}", targetServer);}for (String each : distroComponentHolder.getDataStorageTypes()) {// 遍歷想這些節(jié)點(diǎn)發(fā)送Client.isNative=true的DistroData,type = VERIFYverifyForDataStorage(each, targetServer);}} catch (Exception e) {Loggers.DISTRO.error("[DISTRO-FAILED] verify task failed.", e);}
}
? 非責(zé)任節(jié)點(diǎn)每5s掃描isNative=false的client,如果client30s內(nèi)沒(méi)有被VERIFY的DistroData更新過(guò)續(xù)租時(shí)間,會(huì)刪除這個(gè)同步過(guò)來(lái)的Client數(shù)據(jù)。
//ConnectionBasedClientManager->ExpiredClientCleaner
private static class ExpiredClientCleaner implements Runnable {private final ConnectionBasedClientManager clientManager;public ExpiredClientCleaner(ConnectionBasedClientManager clientManager) {this.clientManager = clientManager;}@Overridepublic void run() {long currentTime = System.currentTimeMillis();for (String each : clientManager.allClientId()) {ConnectionBasedClient client = (ConnectionBasedClient) clientManager.getClient(each);if (null != client && client.isExpire(currentTime)) {clientManager.clientDisconnected(each);}}}
}
-------------------------------------------------------------------------------------------
@Override
public boolean isExpire(long currentTime) {// 判斷30s內(nèi)沒(méi)有續(xù)租 認(rèn)為過(guò)期return !isNative() && currentTime - getLastRenewTime() > ClientConfig.getInstance().getClientExpiredTime();
}