哪里去找做的好看的網(wǎng)站北京自動網(wǎng)絡(luò)營銷推廣
一、引言
? 在 Nacos 后臺管理服務列表中,我們可以看到微服務列表,其中有一欄叫“健康實例數(shù)” ? ?(如下圖),表示對應的客戶端實例信息是否可用狀態(tài)。
?
那Nacos服務端是怎么感知客戶端的狀態(tài)是否可用呢 ?
本章重點:
- 實例心跳接口做了哪些事情 ?
- 服務端是怎么維護不健康的實例的,怎么下線不健康實例的,做了哪些操作 ?
二、目錄?????
目錄
一、引言
二、目錄????????
三、服務端實例心跳接口源碼分析
四、服務端實例心跳健康檢查定時任務源碼分析
五、總結(jié)
???
三、服務端實例心跳接口源碼分析
主線任務:實例心跳接口做了哪些事情 ?
?在客戶端服務發(fā)起注冊的時候 (在第二章節(jié)),會開啟一個心跳任務,每5s發(fā)送一次健康心跳檢查,告訴服務端我這個服務還活著。(前面已經(jīng)講過
public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {if (NAMING_LOGGER.isDebugEnabled()) {NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());}// 組裝請求參數(shù)Map<String, String> params = new HashMap<String, String>(8);Map<String, String> bodyMap = new HashMap<String, String>(2);if (!lightBeatEnabled) {bodyMap.put("beat", JacksonUtils.toJson(beatInfo));}params.put(CommonParams.NAMESPACE_ID, namespaceId);params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());params.put("ip", beatInfo.getIp());params.put("port", String.valueOf(beatInfo.getPort()));// 發(fā)送實例心跳接口請求String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);return JacksonUtils.toObj(result);
}
服務端接受到實例心跳接口,會現(xiàn)在內(nèi)存注冊表中找 Instance,如果找不到會重新注冊。然后提交一個 clientBeatProcessor 異步任務,更改 lastBeat 屬性
@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {// 省略部分代碼// 獲取請求參數(shù)namespaceId、serviceNameString namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);NamingUtils.checkServiceNameFormat(serviceName);Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);// 通過namespaceId、serviceName、ip、port、clusterName 從內(nèi)存注冊表當中獲取對應的 Instance 實例對象Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);// 如果 instance 為空,那么會重新注冊if (instance == null) {if (clientBeat == null) {result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);return result;}instance = new Instance();instance.setPort(clientBeat.getPort());instance.setIp(clientBeat.getIp());instance.setWeight(clientBeat.getWeight());instance.setMetadata(clientBeat.getMetadata());instance.setClusterName(clusterName);instance.setServiceName(serviceName);instance.setInstanceId(instance.getInstanceId());instance.setEphemeral(clientBeat.isEphemeral());// 這里調(diào)用重新注冊的方法serviceManager.registerInstance(namespaceId, serviceName, instance);}// 通過namespaceId、serviceName獲取對應的 ServiceService service = serviceManager.getService(namespaceId, serviceName);if (service == null) {throw new NacosException(NacosException.SERVER_ERROR,"service not found: " + serviceName + "@" + namespaceId);}if (clientBeat == null) {clientBeat = new RsInfo();clientBeat.setIp(ip);clientBeat.setPort(port);clientBeat.setCluster(clusterName);}// 重點:開啟異步任務,更改 lastBeat 屬性service.processClientBeat(clientBeat);// 省略部分代碼return result;
}
接著往下看重點 service.processClientBeat() 任務,這個方法會開啟一個異步任務,異步任務的話肯定會有run 方法,那我們直接看 clientBeatProcessor 對象中的 run 方法
public void processClientBeat(final RsInfo rsInfo) {ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();clientBeatProcessor.setService(this);clientBeatProcessor.setRsInfo(rsInfo);// 立即執(zhí)行HealthCheckReactor.scheduleNow(clientBeatProcessor);
}
在異步任務當中,首先會獲取當前節(jié)點下所有的臨時實例,然后通過 ip+port 找到當前 instance,然后把 instance 中的 lastBeat屬性更改為當前時間,并且如果 該 instance 為不健康狀態(tài),更改為健康狀態(tài)
public class ClientBeatProcessor implements Runnable {public static final long CLIENT_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15);private RsInfo rsInfo;private Service service;@JsonIgnorepublic PushService getPushService() {return ApplicationUtils.getBean(PushService.class);}public RsInfo getRsInfo() {return rsInfo;}public void setRsInfo(RsInfo rsInfo) {this.rsInfo = rsInfo;}public Service getService() {return service;}public void setService(Service service) {this.service = service;}@Overridepublic void run() {Service service = this.service;if (Loggers.EVT_LOG.isDebugEnabled()) {Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());}// 本小節(jié)重點方法// 獲取當前 ip、clusterNameString ip = rsInfo.getIp();String clusterName = rsInfo.getCluster();int port = rsInfo.getPort();Cluster cluster = service.getClusterMap().get(clusterName);// 獲取當前 cluster 下所有的臨時實例List<Instance> instances = cluster.allIPs(true);// 遍歷臨時實例for (Instance instance : instances) {// 通過判斷ip、port,確認是否是當前 instance 的實例if (instance.getIp().equals(ip) && instance.getPort() == port) {if (Loggers.EVT_LOG.isDebugEnabled()) {Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());}// 把 lastBeat屬性更改為當前時間instance.setLastBeat(System.currentTimeMillis());if (!instance.isMarked()) {// 如果 instance 為不健康狀態(tài),更改為健康狀態(tài)if (!instance.isHealthy()) {instance.setHealthy(true);Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",cluster.getService().getName(), ip, port, cluster.getName(),UtilsAndCommons.LOCALHOST_SITE);getPushService().serviceChanged(service);}}}}}
}
小結(jié):
? ? ?首先在 客戶端服務發(fā)起注冊的時候 (在第二章節(jié)),會開啟一個心跳任務,每5s發(fā)送一次健康心跳檢查,告訴服務端我這個服務還活著。(前面已經(jīng)講過)
? ? 那么服務端接受到了 實例心跳接口的請求,會現(xiàn)在內(nèi)存注冊表中找 Instance,如果找不到會重新注冊。然后提交一個 clientBeatProcessor 異步任務,在異步任務當中,首先會找到當前集群下的所有臨時實例,然后通過 ip +port 找到當前instance 實例,把當前instance 中的 lastBeat屬性更改為當前時間,如果 instance 為不健康狀態(tài),更改為健康狀態(tài),到此實例心跳接口就結(jié)束了。
四、服務端實例心跳健康檢查定時任務源碼分析
主線任務:服務端是怎么維護不健康的實例的,怎么下線不健康實例的,做了哪些操作 ?
? ? ?這塊代碼是在服務端 register(注冊)接口當中的,之前分析過 register 注冊邏輯,因為這塊是分支代碼,前面沒細看。
? ?我們來看下 createEmptyService 這個方法了,里面有個異步任務,作用就是:檢查有哪些客戶端是不健康的狀態(tài),如果不健康就需要對它進行處理
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {// 不知道是創(chuàng)建了一個什么服務createEmptyService(namespaceId, serviceName, instance.isEphemeral());// 根據(jù)namespaceId、serviceName獲取 Service服務Service service = getService(namespaceId, serviceName);// service為空就拋出異常if (service == null) {throw new NacosException(NacosException.INVALID_PARAM,"service not found, namespace: " + namespaceId + ", service: " + serviceName);}// 上面都是分支代碼// 主線任務:添加服務實例addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
我們直接看重點代碼,直接跳到開啟異步任務這里。上面的代碼流程:createEmptyService()-> createServiceIfAbsent()-> putServiceAndInit(service) -> service.init();
public void init() {// 開啟異步延時任務 clientBeatCheckTask ,每5s執(zhí)行一次HealthCheckReactor.scheduleCheck(clientBeatCheckTask);for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {entry.getValue().setService(this);entry.getValue().init();}
}
本章重點,開啟了一個 clientBeatCheckTask 異步任務。
@Override
public void run() {try {// 本章重點// 獲取全部臨時實例List<Instance> instances = service.allIPs(true);for (Instance instance : instances) {// 當前時間 - instance中 lastBeat屬性時間 > 15sif (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {if (!instance.isMarked()) {if (instance.isHealthy()) {// 如果這個 instance 實例還是健康狀態(tài),就更改為 "不健康狀態(tài)"!instance.setHealthy(false);Loggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",instance.getIp(), instance.getPort(), instance.getClusterName(),service.getName(), UtilsAndCommons.LOCALHOST_SITE,instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());// 事件發(fā)布監(jiān)聽事件,通過 upd 協(xié)議發(fā)送通知getPushService().serviceChanged(service);ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));}}}}if (!getGlobalConfig().isExpireInstance()) {return;}// 這里還是遍歷 臨時實例for (Instance instance : instances) {if (instance.isMarked()) {continue;}// 當前時間 - instance中 lastBeat屬性時間 > 30sif (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),JacksonUtils.toJson(instance));// 直接從注冊表中刪除當前 instancedeleteIp(instance);}}} catch (Exception e) {Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);}}
小結(jié):
- 第一個循環(huán)的作用,為了篩選出不健康的 Instance 實例,并且把 Instance 中的 healthy? 屬性改為 false。那么怎么篩選出不健康的實例的 ?利用的就是 Instance 中的 lastBeat 屬性。如果是健康的實例,那么客戶端就會每5s調(diào)一次實例心跳接口,更新 lastBeat 屬性為當前時間。如果是不健康的實例,那么 Instance 實例 中的 lastBeat 屬性是不會變化的,一旦 lastBeat 跟當前時間比超過 15s,就會被認定為不健康的實例。
- 第二個循環(huán)的作用,找出那些 Instance 是需要刪除的,如果 lastBeat 跟當前時間比超過 30s,Nacos 會把該 Instance 從注冊表當中進行刪除。
五、總結(jié)
總結(jié):
? ? ?本章講了Nacos怎么維護整個微服務實例健康狀態(tài)的流程,在客戶端發(fā)起注冊服務時會有心跳任務,每5s給服務端發(fā)送一次心態(tài),服務端會把該 Instance 實例中的lastBeat 屬性更新為當前時間。并且在服務端實例注冊的時候,會開啟心跳健康檢查任務,把 lastBeat 跟當前時間比超過 15s,就會被標識為不健康的實例,把lastBeat 跟當前時間比超過 30s,Nacos 會把該 Instance 從注冊表當中進行刪除。
最后的最后,別忘了把源碼分析圖補充完整:?