深圳網(wǎng)站建設(shè)網(wǎng)站制作網(wǎng)站推廣濰坊seo網(wǎng)絡(luò)推廣
基于Dubbo 3.1,詳細(xì)介紹了Dubbo服務(wù)的發(fā)布與引用的源碼。
此前我們學(xué)習(xí)了接口級的服務(wù)引入訂閱的refreshInterfaceInvoker方法,當(dāng)時(shí)還有最為關(guān)鍵的notify服務(wù)通知更新的部分源碼沒有學(xué)習(xí),本次我們來學(xué)習(xí)notify通知本地服務(wù)更新的源碼。
Dubbo 3.x服務(wù)引用源碼:
- Dubbo 3.x源碼(11)—Dubbo服務(wù)的發(fā)布與引用的入口
- Dubbo 3.x源碼(18)—Dubbo服務(wù)引用源碼(1)
- Dubbo 3.x源碼(19)—Dubbo服務(wù)引用源碼(2)
- Dubbo 3.x源碼(20)—Dubbo服務(wù)引用源碼(3)
- Dubbo 3.x源碼(21)—Dubbo服務(wù)引用源碼(4)
- Dubbo 3.x源碼(22)—Dubbo服務(wù)引用源碼(5)服務(wù)引用bean的獲取以及懶加載原理
- Dubbo 3.x源碼(23)—Dubbo服務(wù)引用源碼(6)MigrationRuleListener遷移規(guī)則監(jiān)聽器
- Dubbo 3.x源碼(24)—Dubbo服務(wù)引用源碼(7)接口級服務(wù)發(fā)現(xiàn)訂閱refreshInterfaceInvoker
- Dubbo 3.x源碼(25)—Dubbo服務(wù)引用源碼(8)notify訂閱服務(wù)通知更新
Dubbo 3.x服務(wù)發(fā)布源碼:
- Dubbo 3.x源碼(11)—Dubbo服務(wù)的發(fā)布與引用的入口
- Dubbo 3.x源碼(12)—Dubbo服務(wù)發(fā)布導(dǎo)出源碼(1)
- Dubbo 3.x源碼(13)—Dubbo服務(wù)發(fā)布導(dǎo)出源碼(2)
- Dubbo 3.x源碼(14)—Dubbo服務(wù)發(fā)布導(dǎo)出源碼(3)
- Dubbo 3.x源碼(15)—Dubbo服務(wù)發(fā)布導(dǎo)出源碼(4)
- Dubbo 3.x源碼(16)—Dubbo服務(wù)發(fā)布導(dǎo)出源碼(5)
- Dubbo 3.x源碼(17)—Dubbo服務(wù)發(fā)布導(dǎo)出源碼(6)
1 notify服務(wù)通知更新
當(dāng)?shù)谝淮斡嗛喎?wù)節(jié)點(diǎn),或者服務(wù)節(jié)點(diǎn)目錄的子節(jié)點(diǎn)更新時(shí),例如新的producer上下線,將會(huì)調(diào)用notify服務(wù)通知更新的方法,會(huì)更新本地緩存的數(shù)據(jù)。
notify方法的入口是FailbackRegistry的notify方法。
/*** FailbackRegistry的方法* <p>* 服務(wù)通知** @param url consumer side url* @param listener listener* @param urls provider latest urls*/
@Override
protected void notify(URL url, NotifyListener listener, List<URL> urls) {if (url == null) {throw new IllegalArgumentException("notify url == null");}if (listener == null) {throw new IllegalArgumentException("notify listener == null");}try {/** 調(diào)用doNotify方法更新*/doNotify(url, listener, urls);} catch (Exception t) {// Record a failed registration request to a failed listlogger.error("Failed to notify addresses for subscribe " + url + ", cause: " + t.getMessage(), t);}
}
/*** FailbackRegistry的方法* <p>* 服務(wù)通知*/
protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {//調(diào)用父類AbstractRegistry的方法super.notify(url, listener, urls);
}
2 AbstractRegistry#notify通知更新
該方法涉及兩個(gè)重要知識點(diǎn):
- 一是對于拉取到的服務(wù)節(jié)點(diǎn)url按照類別providers、configurators 、routers進(jìn)行分類,然后遍歷每個(gè)類別,依次調(diào)用RegistryDirectory#notify方法觸發(fā)監(jiān)聽回調(diào),進(jìn)行服務(wù)數(shù)據(jù)的更新。
- 二是RegistryDirectory#notify方法通知執(zhí)行完畢之后,調(diào)用saveProperties方法更新緩存文件。當(dāng)注冊中心由于網(wǎng)絡(luò)抖動(dòng)而訂閱失敗時(shí),至少可以返回現(xiàn)有的緩存的URL。
/*** AbstractRegistry的方法* <p>* 通知更新** @param url consumer side url* @param listener listener* @param urls provider latest urls*/
protected void notify(URL url, NotifyListener listener, List<URL> urls) {if (url == null) {throw new IllegalArgumentException("notify url == null");}if (listener == null) {throw new IllegalArgumentException("notify listener == null");}if ((CollectionUtils.isEmpty(urls)) && !ANY_VALUE.equals(url.getServiceInterface())) {// 1-4 Empty address.logger.warn("1-4", "", "", "Ignore empty notify urls for subscribe url " + url);return;}if (logger.isInfoEnabled()) {logger.info("Notify urls for subscribe url " + url + ", url size: " + urls.size());}//根據(jù)節(jié)點(diǎn)類別對url進(jìn)行分類Map<String, List<URL>> result = new HashMap<>();//遍歷url,進(jìn)行分類for (URL u : urls) {//服務(wù)消費(fèi)者和服務(wù)提供者的服務(wù)接口名匹配if (UrlUtils.isMatch(url, u)) {//獲取url的category類別,默認(rèn)providers,同時(shí)服務(wù)提供者urlServiceAddressURL固定返回providersString category = u.getCategory(DEFAULT_CATEGORY);//將url加入到對應(yīng)類別的categoryList中List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());categoryList.add(u);}}//result,一般有三個(gè)元素,即三個(gè)類別,providers、configurators 、routersif (result.size() == 0) {return;}Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());//遍歷每一個(gè)類別for (Map.Entry<String, List<URL>> entry : result.entrySet()) {//獲取類別String category = entry.getKey();List<URL> categoryList = entry.getValue();//存入categoryNotifiedcategoryNotified.put(category, categoryList);//執(zhí)行l(wèi)eitener的notify方法進(jìn)行通知,listener可以是RegistryDirectory/** RegistryDirectory#notify通知*/listener.notify(categoryList);/** 本地緩存*/// We will update our cache file after each notification.// When our Registry has a subscribed failure due to network jitter, we can return at least the existing cache URL.//將在每次通知后更新緩存文件。當(dāng)注冊中心由于網(wǎng)絡(luò)抖動(dòng)而訂閱失敗時(shí),至少可以返回現(xiàn)有的緩存的URL。//本地緩存,默認(rèn)支持if (localCacheEnabled) {saveProperties(url);}}
}
3 RegistryDirectory#notify更新本地內(nèi)存信息
該方法根據(jù)url更新RegistryDirectory對象的內(nèi)存信息,將可能會(huì)更新RegistryDirectory 內(nèi)部的configurators配置信息集合,routerChain路由鏈以及urlInvokerMap緩存。
在最后,會(huì)專門調(diào)用refreshOverrideAndInvoker方法,將服務(wù)提供者url轉(zhuǎn)換為invoker,進(jìn)行服務(wù)提供者的更新。
/*** RegistryDirectory的方法* * 服務(wù)變更通知* @param urls 服務(wù)提供者注冊信息列表*/
@Override
public synchronized void notify(List<URL> urls) {if (isDestroyed()) {return;}Map<String, List<URL>> categoryUrls = urls.stream().filter(Objects::nonNull)//類別合法性過濾.filter(this::isValidCategory).filter(this::isNotCompatibleFor26x)//根據(jù)類別分組.collect(Collectors.groupingBy(this::judgeCategory));//獲取配置信息url集合,可以為空List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());//將配置信息url轉(zhuǎn)換為Configurator集合,并賦值給configurators屬性,可以為空this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);//獲取路由信息url集合,可以為空List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());//將配置信息url轉(zhuǎn)換為Router集合,并加入routerChain路由鏈,可以為空toRouters(routerURLs).ifPresent(this::addRouters);// providers//獲取服務(wù)提供者url集合,可以為空List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());// 3.x added for extend URL address//添加擴(kuò)展URL地址 3.x的特性ExtensionLoader<AddressListener> addressListenerExtensionLoader = getUrl().getOrDefaultModuleModel().getExtensionLoader(AddressListener.class);//獲取AddressListener,默認(rèn)空集合List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);if (supportedListeners != null && !supportedListeners.isEmpty()) {for (AddressListener addressListener : supportedListeners) {providerURLs = addressListener.notify(providerURLs, getConsumerUrl(), this);}}/** 將服務(wù)提供者url轉(zhuǎn)換為invoker,進(jìn)行服務(wù)提供者的更新*/refreshOverrideAndInvoker(providerURLs);
}
3.1 refreshOverrideAndInvoker刷新invoker
該方法將服務(wù)提供者url轉(zhuǎn)換為invoker,進(jìn)行服務(wù)提供者的更新,這在consumer對producer的信息更新部分是非常重要的一個(gè)方法。
url轉(zhuǎn)換規(guī)則為:
- 如果URL已轉(zhuǎn)換為invoker,則不再重新引用它并直接從緩存獲取它,請注意,URL中的任何參數(shù)更改都將被重新引用。
- 如果傳入invoker列表不為空,則表示它是最新的invoker列表。
- 如果傳入invokerUrl的列表為空,則意味著該規(guī)則只是一個(gè)覆蓋規(guī)則或路由規(guī)則,需要重新對比以決定是否重新引用。
/*** RegistryDirectory的方法* <p>* 將服務(wù)提供者url轉(zhuǎn)換為invoker,進(jìn)行服務(wù)提供者的更新** @param urls 服務(wù)提供者url*/
private synchronized void refreshOverrideAndInvoker(List<URL> urls) {// mock zookeeper://xxx?mock=return nullrefreshInvoker(urls);
}/*** 將invokerURL列表轉(zhuǎn)換為Invoker Map** @param invokerUrls this parameter can't be null*/
private void refreshInvoker(List<URL> invokerUrls) {Assert.notNull(invokerUrls, "invokerUrls should not be null");//如果只有一個(gè)協(xié)議為empty的url,表示最新注冊中心沒有任何該服務(wù)提供者url信息if (invokerUrls.size() == 1&& invokerUrls.get(0) != null&& EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {//設(shè)置為禁止訪問this.forbidden = true; // Forbid to access//設(shè)置routerChain的服務(wù)提供者invoker集合為一個(gè)空集合routerChain.setInvokers(BitList.emptyList());//關(guān)閉urlInvokerMap中的所有服務(wù)提供者invokerdestroyAllInvokers(); // Close all invokers}//表明可能存在服務(wù)提供者urlelse {//允許訪問this.forbidden = false; // Allow to accessif (invokerUrls == Collections.<URL>emptyList()) {invokerUrls = new ArrayList<>();}// use local reference to avoid NPE as this.cachedInvokerUrls will be set null by destroyAllInvokers().//使用本地引用來避免NPE。cachedInvokerUrls將被destroyAllInvokers()方法設(shè)置為空。Set<URL> localCachedInvokerUrls = this.cachedInvokerUrls;//空的服務(wù)提供者url集合if (invokerUrls.isEmpty() && localCachedInvokerUrls != null) {// 1-4 Empty address.logger.warn("1-4", "configuration ", "","Service" + serviceKey + " received empty address list with no EMPTY protocol set, trigger empty protection.");invokerUrls.addAll(localCachedInvokerUrls);} else {//緩存的invoker url,便于比較localCachedInvokerUrls = new HashSet<>();localCachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparisonthis.cachedInvokerUrls = localCachedInvokerUrls;}if (invokerUrls.isEmpty()) {return;}// use local reference to avoid NPE as this.urlInvokerMap will be set null concurrently at destroyAllInvokers().//使用本地引用來避免NPE。urlInvokerMap將在destroyAllInvokers()方法設(shè)置為空。Map<URL, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap;// can't use local reference as oldUrlInvokerMap's mappings might be removed directly at toInvokers().//不能使用本地引用,因?yàn)閛ldUrlInvokerMap的映射可能會(huì)直接在toInvokers()中刪除。Map<URL, Invoker<T>> oldUrlInvokerMap = null;if (localUrlInvokerMap != null) {// the initial capacity should be set greater than the maximum number of entries divided by the load factor to avoid resizing.oldUrlInvokerMap = new LinkedHashMap<>(Math.round(1 + localUrlInvokerMap.size() / DEFAULT_HASHMAP_LOAD_FACTOR));localUrlInvokerMap.forEach(oldUrlInvokerMap::put);}/** 將URL轉(zhuǎn)換為Invoker*/Map<URL, Invoker<T>> newUrlInvokerMap = toInvokers(oldUrlInvokerMap, invokerUrls);// Translate url list to Invoker map/** If the calculation is wrong, it is not processed.** 1. The protocol configured by the client is inconsistent with the protocol of the server.* eg: consumer protocol = dubbo, provider only has other protocol services(rest).* 2. The registration center is not robust and pushes illegal specification data.**/if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {// 3-1 - Failed to convert the URL address into Invokers.logger.error("3-1", "inconsistency between the client protocol and the protocol of the server","", "urls to invokers error",new IllegalStateException("urls to invokers error. invokerUrls.size :" +invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));return;}List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));this.setInvokers(multiGroup ? new BitList<>(toMergeInvokerList(newInvokers)) : new BitList<>(newInvokers));// pre-route and build cache//invoker集合存入routerChain的invokers屬性routerChain.setInvokers(this.getInvokers());//設(shè)置urlInvokerMap為新的urlInvokerMapthis.urlInvokerMap = newUrlInvokerMap;try {//銷毀無用 InvokerdestroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker} catch (Exception e) {logger.warn("destroyUnusedInvokers error. ", e);}// 通知invoker刷新this.invokersChanged();}
}
3.2 toInvokers將URL轉(zhuǎn)換為Invoker
將url轉(zhuǎn)換為Invoker,如果url已被引用,將不會(huì)重新引用。將放入newUrlInvokeMap的項(xiàng)將從oldUrlInvokerMap中刪除。
該方法的大概邏輯為:
- 獲取獲取消費(fèi)者需要查詢過濾的協(xié)議,遍歷全部最新服務(wù)提供者url,依次進(jìn)行如下操作:
- 調(diào)用checkProtocolValid方法,校驗(yàn)當(dāng)前提供者url協(xié)議是否支持當(dāng)前服務(wù)消費(fèi)者調(diào)用,如果不支持則跳過該提供者。服務(wù)消費(fèi)者可以手動(dòng)指定消費(fèi)某些協(xié)議的服務(wù)提供者,其他的服務(wù)提供者將被丟棄。
- 調(diào)用mergeUrl方法,合并服務(wù)提供者url的配置,合并覆蓋順序是:override > -D參數(shù) >Consumer配置 > Provider配置,從這里可以知道消費(fèi)者的配置優(yōu)先級大于提供者的配置。
- 從原來的緩存中獲取該url對應(yīng)的invoker:
- 如果已經(jīng)存在該緩存,那么直接將緩存的invoker加入到新的invoker map緩存中,不再從新引用。
- 如果緩存沒有該url對應(yīng)的invoker,那么將會(huì)重新引用該invoker,并將新引入的invoker加入到新的invoker map緩存中。
- 返回最新的url到invoker的緩存map。
/*** RegistryDirectory的的方法** 將url轉(zhuǎn)換為Invoker,如果url已被引用,將不會(huì)重新引用。將放入newUrlInvokeMap的項(xiàng)將從oldUrlInvokerMap中刪除。** @param oldUrlInvokerMap 此前的url到invoker的映射* @param urls 最新服務(wù)提供者url集合* @return invokers 最新的url到invoker的映射*/
private Map<URL, Invoker<T>> toInvokers(Map<URL, Invoker<T>> oldUrlInvokerMap, List<URL> urls) {//新的映射mapMap<URL, Invoker<T>> newUrlInvokerMap = new ConcurrentHashMap<>(urls == null ? 1 : (int) (urls.size() / 0.75f + 1));if (urls == null || urls.isEmpty()) {return newUrlInvokerMap;}//獲取消費(fèi)者需要查詢過濾的協(xié)議String queryProtocols = this.queryMap.get(PROTOCOL_KEY);//遍歷最新服務(wù)提供者url集合for (URL providerUrl : urls) {//校驗(yàn)當(dāng)前提供者url協(xié)議是否支持當(dāng)前服務(wù)消費(fèi)者調(diào)用,如果不支持則跳過該提供者//服務(wù)消費(fèi)者可以手動(dòng)指定消費(fèi)某些協(xié)議的服務(wù)提供者,其他的服務(wù)提供者將被丟棄if (!checkProtocolValid(queryProtocols, providerUrl)) {continue;}//合并服務(wù)提供者url的配置,合并覆蓋順序是:override > -D參數(shù) >Consumer配置 > Provider配置//從這里可以知道消費(fèi)者的配置優(yōu)先級大于提供者的配置URL url = mergeUrl(providerUrl);// Cache key is url that does not merge with consumer side parameters,// regardless of how the consumer combines parameters,// if the server url changes, then refer again//從原來的緩存中獲取該url對應(yīng)的invokerInvoker<T> invoker = oldUrlInvokerMap == null ? null : oldUrlInvokerMap.remove(url);//如果緩存沒有該url對應(yīng)的invoker,那么將會(huì)重新引用該invokerif (invoker == null) { // Not in the cache, refer againtry {boolean enabled = true;if (url.hasParameter(DISABLED_KEY)) {enabled = !url.getParameter(DISABLED_KEY, false);} else {enabled = url.getParameter(ENABLED_KEY, true);}//如果啟用服務(wù)if (enabled) {//再次通過Protocol$Adaptive的refer方法引用該服務(wù)提供者//在最開始我們就是通過refer方法引用服務(wù)的,在再次見到這個(gè)方法,只不過這里的url已經(jīng)變成了某個(gè)服務(wù)提供者的url了invoker = protocol.refer(serviceType, url);}} catch (Throwable t) {// Thrown by AbstractProtocol.optimizeSerialization()if (t instanceof RpcException && t.getMessage().contains("serialization optimizer")) {// 4-2 - serialization optimizer class initialization failed.logger.error("4-2", "typo in optimizer class", "","Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);} else {// 4-3 - Failed to refer invoker by other reason.logger.error("4-3", "", "","Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);}}//加入到新的invoker map緩存中if (invoker != null) { // Put new invoker in cachenewUrlInvokerMap.put(url, invoker);}} else {//如果已經(jīng)存在該緩存,那么直接將緩存的invoker加入到新的invoker map緩存中,不再從新引用newUrlInvokerMap.put(url, invoker);}}//返回新的invoker mapreturn newUrlInvokerMap;
}
在上面的步驟中,如果是首次啟動(dòng)消費(fèi)者,將會(huì)統(tǒng)一走Protocol$Adaptive的refer方法引用該服務(wù)提供者的邏輯。還記得在最開始講consumer服務(wù)引入的時(shí)候嗎,那時(shí)候我們就是通過這個(gè)refer方法引用服務(wù)的,現(xiàn)在再次見到這個(gè)方法,只不過此前的url則是注冊中心協(xié)議url,對應(yīng)著RegistryProtocol,而這里的url已經(jīng)變成了某個(gè)服務(wù)提供者的url了,對應(yīng)著具體的協(xié)議實(shí)現(xiàn),例如DubboProtocol、RestProtocol。
我們此前就講過了Protocol$Adaptive的refer方法實(shí)際上返回的是被wrapper包裝的Protocol,這里我們直接看最底層的Protocol的refer方法,以默認(rèn)協(xié)議dubbo協(xié)議的Protocol實(shí)現(xiàn)DubboProtocol為例子!
4 DubboProtocol#refer dubbo協(xié)議服務(wù)引入
該方法執(zhí)行基于dubbo序列化協(xié)議的服務(wù)引入,最終會(huì)創(chuàng)建一個(gè)DubboInvoker,內(nèi)部包含一個(gè)nettyClient,已經(jīng)與對應(yīng)的服務(wù)提供者的nettyServer建立了連接,可用于發(fā)起rpc遠(yuǎn)程調(diào)用請求。
/*** DubboProtocol的方法** @param type 服務(wù)類型* @param url 遠(yuǎn)程服務(wù)提供者url* @return* @param <T>* @throws RpcException*/
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {//銷毀檢測checkDestroyed();//協(xié)議綁定引用return protocolBindingRefer(type, url);
}@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {//銷毀檢測checkDestroyed();//序列化優(yōu)化optimizeSerialization(url);// create rpc invoker.//創(chuàng)建一個(gè)DubboInvoker,可用于發(fā)起rpc遠(yuǎn)程調(diào)用DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);//加入?yún)f(xié)議緩存invokersinvokers.add(invoker);return invoker;
}
4.1 getClients獲取服務(wù)客戶端
該方法獲取服務(wù)提供者網(wǎng)絡(luò)調(diào)用客戶端。這里會(huì)判斷是否使用共享連接,因?yàn)橐粋€(gè)服務(wù)提供者根提供了很多的服務(wù)接口,這個(gè)的是否共享連接,實(shí)際上就是指的消費(fèi)者引入時(shí)候,是這些服務(wù)接口是否共用一些客戶端連接(默認(rèn)一個(gè)),或者說不同的服務(wù)接口使用獨(dú)立的客戶端連接(默認(rèn)一個(gè)服務(wù)一個(gè)連接)。默認(rèn)是共享連接。
/*** DubboProtocol的方法* 獲取服務(wù)客戶端** @param url 服務(wù)提供者url* @return ExchangeClient數(shù)組*/
private ExchangeClient[] getClients(URL url) {//獲取配置的連接數(shù),默認(rèn)為0int connections = url.getParameter(CONNECTIONS_KEY, 0);// whether to share connection// if not configured, connection is shared, otherwise, one connection for one service//是否共享連接,如果沒有配置connections,那么連接是共享的,否則,一個(gè)服務(wù)連接一個(gè)服務(wù)if (connections == 0) {/** The xml configuration should have a higher priority than properties.* 共享連接配置,xml配置的優(yōu)先級應(yīng)該高于屬性*/String shareConnectionsStr = StringUtils.isBlank(url.getParameter(SHARE_CONNECTIONS_KEY, (String) null))? ConfigurationUtils.getProperty(url.getOrDefaultApplicationModel(), SHARE_CONNECTIONS_KEY, DEFAULT_SHARE_CONNECTIONS): url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);connections = Integer.parseInt(shareConnectionsStr);//獲取共享客戶端List<ReferenceCountExchangeClient> shareClients = getSharedClient(url, connections);//設(shè)置到ExchangeClient數(shù)組中ExchangeClient[] clients = new ExchangeClient[connections];Arrays.setAll(clients, shareClients::get);return clients;}//非共享連接,表示當(dāng)前服務(wù)接口使用單獨(dú)的連接ExchangeClient[] clients = new ExchangeClient[connections];for (int i = 0; i < clients.length; i++) {//初始化新的客戶端clients[i] = initClient(url);}return clients;
}
4.2 getSharedClient獲取共享客戶端連接
如果是共享連接配置,那么調(diào)用getSharedClient方法獲取共享客戶端連接,默認(rèn)連接數(shù)為1。該方法的大概步驟為:
- 首先獲取服務(wù)提供者ip:port 作為共享連接的key,即共享連接情況下,同一個(gè)服務(wù)提供者實(shí)例下的所有服務(wù)接口共享某些連接。
- 從緩存referenceClientMap獲取key對應(yīng)的共享客戶端連接。
- 如果存在緩存,并且客戶端連接全部可用,那么增加連接技術(shù),然后返回即可。否則,只要有一個(gè)客戶端不可用,就需要用可用的客戶端替換不可用的客戶端。
- 如果此前沒有該key的客戶端連接緩存或者連接不是全部可用,都要走下面的步驟,嘗試新創(chuàng)建連接。
- 加synchronized鎖,在鎖代碼中再次雙重檢測,注意這里還有線程等待喚醒機(jī)制。
- 最后判斷如果客戶端連接為空,那么調(diào)用buildReferenceCountExchangeClientList方法構(gòu)建指定數(shù)量的客戶端連接。如果連接不為空,那么遍歷連接,判斷如果該連接不可用,那么新創(chuàng)建一個(gè)連接補(bǔ)充進(jìn)來。
- 最后的處理仍需要加synchronized鎖,判斷如果最終沒建立連接,那么移除無效緩存,否則將最終的客戶端連接存入緩存,最后喚醒其他等待的線程。
該方法的核心知識點(diǎn)有兩個(gè),一個(gè)是buildReferenceCountExchangeClientList方法構(gòu)建指定數(shù)量的客戶端連接,另一個(gè)就是方法中的synchronized鎖以及等待喚醒機(jī)制。
為什么需要等待喚醒呢?因?yàn)檫@是共享客戶端,那么可能有多個(gè)線程都在初始化同一個(gè)ip:port的多個(gè)客戶端,為了避免沖突,需要加鎖。
/*** DubboProtocol的方法* <p>* 獲取共享客戶端連接** @param url 服務(wù)提供者url* @param connectNum 共享連接數(shù)量,默認(rèn)1*/
@SuppressWarnings("unchecked")
private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {//獲取 服務(wù)提供者ip:port 作為共享連接的keyString key = url.getAddress();//從緩存獲取key對應(yīng)的共享客戶端連接Object clients = referenceClientMap.get(key);if (clients instanceof List) {//轉(zhuǎn)換為ReferenceCountExchangeClient集合,帶有引用計(jì)數(shù)的功能List<ReferenceCountExchangeClient> typedClients = (List<ReferenceCountExchangeClient>) clients;//檢測客戶端連接是否全部可用//只要有一個(gè)客戶端不可用,就需要用可用的客戶端替換不可用的客戶端。if (checkClientCanUse(typedClients)) {//如果可用//增加連接的引用計(jì)數(shù),如果我們創(chuàng)建新的調(diào)用者共享相同的連接,連接將關(guān)閉,沒有任何引用batchClientRefIncr(typedClients);return typedClients;}}//如果此前沒有該key的連接緩存,那么新創(chuàng)建List<ReferenceCountExchangeClient> typedClients = null;synchronized (referenceClientMap) {//死循環(huán)for (; ; ) {// guarantee just one thread in loading condition. And Other is waiting It had finished.//雙重檢測鎖clients = referenceClientMap.get(key);if (clients instanceof List) {typedClients = (List<ReferenceCountExchangeClient>) clients;if (checkClientCanUse(typedClients)) {batchClientRefIncr(typedClients);return typedClients;} else {//如果共享連接不是全部可用,那么緩存值先設(shè)置為為一個(gè)object對象,跳出循環(huán)referenceClientMap.put(key, PENDING_OBJECT);break;}}//如果客戶端連接PENDING_OBJECT,那么表示有其他線程正在初始化當(dāng)前客戶端連接,那么當(dāng)前線程等待直到被通知else if (clients == PENDING_OBJECT) {try {referenceClientMap.wait();} catch (InterruptedException ignored) {}}//如果沒有共享連接,那么緩存值先設(shè)置為為一個(gè)object對象,跳出循環(huán)else {referenceClientMap.put(key, PENDING_OBJECT);break;}}}try {//連接數(shù)量必須大于等于1connectNum = Math.max(connectNum, 1);// If the clients is empty, then the first initialization is//如果客戶端連接為空if (CollectionUtils.isEmpty(typedClients)) {/** 構(gòu)建客戶端連接*/typedClients = buildReferenceCountExchangeClientList(url, connectNum);}//如果連接不為空else {//遍歷連接for (int i = 0; i < typedClients.size(); i++) {//如果該連接不可用,那么新創(chuàng)建一個(gè)連接補(bǔ)充進(jìn)來ReferenceCountExchangeClient referenceCountExchangeClient = typedClients.get(i);// If there is a client in the list that is no longer available, create a new one to replace him.if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) {typedClients.set(i, buildReferenceCountExchangeClient(url));continue;}referenceCountExchangeClient.incrementAndGetCount();}}} finally {synchronized (referenceClientMap) {//如果最終沒建立連接,那么移除無效緩存if (typedClients == null) {referenceClientMap.remove(key);} else {//將最終的客戶端連接存入緩存referenceClientMap.put(key, typedClients);}//喚醒其他線程referenceClientMap.notifyAll();}}return typedClients;
}
4.3 buildReferenceCountExchangeClientList構(gòu)建客戶端連接
該方法構(gòu)建指定數(shù)量的引用計(jì)數(shù)交換器客戶端,內(nèi)部循環(huán)調(diào)用buildReferenceCountExchangeClient方法構(gòu)建耽單個(gè)客戶端連接,內(nèi)部調(diào)用initClient方法,初始化交換器客戶端,啟動(dòng)一個(gè)nettyClient并與服務(wù)端建立了連接。
/*** DubboProtocol的方法* 構(gòu)建指定數(shù)量的引用計(jì)數(shù)交換器客戶端** @param url 服務(wù)提供者url* @param connectNum 客戶端數(shù)量* @return*/
private List<ReferenceCountExchangeClient> buildReferenceCountExchangeClientList(URL url, int connectNum) {List<ReferenceCountExchangeClient> clients = new ArrayList<>();//循環(huán)調(diào)用buildReferenceCountExchangeClient方法for (int i = 0; i < connectNum; i++) {clients.add(buildReferenceCountExchangeClient(url));}return clients;
}/*** 構(gòu)建一個(gè)引用計(jì)數(shù)交換器客戶端** @param url* @return*/
private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {//初始化交換器客戶端,啟動(dòng)一個(gè)nettyClient并與服務(wù)端建立了連接ExchangeClient exchangeClient = initClient(url);//創(chuàng)建ReferenceCountExchangeClientReferenceCountExchangeClient client = new ReferenceCountExchangeClient(exchangeClient, DubboCodec.NAME);// read configs//獲取服務(wù)器關(guān)閉等待超時(shí)時(shí)間,默認(rèn)10000msint shutdownTimeout = ConfigurationUtils.getServerShutdownTimeout(url.getScopeModel());client.setShutdownWaitTime(shutdownTimeout);return client;
}
4.4 initClient建立客戶端連接
該方法創(chuàng)建客戶端連接,大概步驟為:
- 首先獲取客戶端底層通信框架類型,應(yīng)該和服務(wù)端的底層通信框統(tǒng)一,默認(rèn)netty。
- 用ServiceConfigURL替換InstanceAddressURL,協(xié)議為dubbo協(xié)議。
- 獲取lazy參數(shù),判斷連接是否懶加載,默認(rèn)false,即餓加載。如果懶加載,那么只有在第一次調(diào)用服務(wù)時(shí)才會(huì)創(chuàng)建與服務(wù)端的連接,否則立即調(diào)用Exchangers.connect(url, requestHandler)方法與服務(wù)端建立底層通信客戶端連接。
默認(rèn)情況下,客戶端為餓加載,客戶端與服務(wù)端的連接,在消費(fèi)者客戶端啟動(dòng)引用服務(wù)的時(shí)候就已經(jīng)建立了,即服務(wù)提供者url轉(zhuǎn)換為invoker的時(shí)候,就已經(jīng)建立了連接。
/*** DubboProtocol的方法* 創(chuàng)建一個(gè)新的連接** @param url 服務(wù)提供者url*/
private ExchangeClient initClient(URL url) {/** Instance of url is InstanceAddressURL, so addParameter actually adds parameters into ServiceInstance,* which means params are shared among different services. Since client is shared among services this is currently not a problem.*///獲取客戶端底層通信框架類型,應(yīng)該和服務(wù)端的底層通信框統(tǒng)一,默認(rèn)nettyString str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));// BIO is not allowed since it has severe performance issue.//不允許使用BIO,因?yàn)樗袊?yán)重的性能問題,目前都是使用netty4if (StringUtils.isNotEmpty(str) && !url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).hasExtension(str)) {throw new RpcException("Unsupported client type: " + str + "," +" supported client type is " + StringUtils.join(url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));}try {// Replace InstanceAddressURL with ServiceConfigURL.//用ServiceConfigURL替換InstanceAddressURL,協(xié)議為dubbo協(xié)議url = new ServiceConfigURL(DubboCodec.NAME, url.getUsername(), url.getPassword(), url.getHost(), url.getPort(), url.getPath(), url.getAllParameters());url = url.addParameter(CODEC_KEY, DubboCodec.NAME);// enable heartbeat by default//默認(rèn)啟用心跳url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));//連接是否懶加載,默認(rèn)false,即餓加載return url.getParameter(LAZY_CONNECT_KEY, false)//如果懶加載,那么只有在第一次調(diào)用服務(wù)時(shí)才會(huì)創(chuàng)建與服務(wù)端的連接? new LazyConnectExchangeClient(url, requestHandler)//餓加載,與服務(wù)端建立底層通信客戶端連接: Exchangers.connect(url, requestHandler);} catch (RemotingException e) {throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);}
}
4.5 Exchangers#connect建立連接
該方法和我們此前學(xué)習(xí)的服務(wù)提供者的Exchangers#bind方法類型,只不過bind方法創(chuàng)建服務(wù)端,該方法創(chuàng)建客戶端。
該方法內(nèi)部基于Dubbo SPI獲取Exchanger,默認(rèn)HeaderExchanger,然后調(diào)用HeaderExchanger#connect方法。
/*** Exchangers的方法** 客戶端建立與服務(wù)端的連接** @param url 服務(wù)提供者url* @param handler 請求處理器* @return 客戶端連接*/
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {if (url == null) {throw new IllegalArgumentException("url == null");}if (handler == null) {throw new IllegalArgumentException("handler == null");}//基于Dubbo SPI獲取Exchanger,默認(rèn)HeaderExchanger,然后調(diào)用HeaderExchanger#connect方法return getExchanger(url).connect(url, handler);
}
HeaderExchanger#connect方法中,首先對handler進(jìn)行包裝:DecodeHandler -> HeaderExchangeHandler -> requestHandler。
- DecodeHandler用于負(fù)責(zé)內(nèi)部的dubbo協(xié)議的請求解碼。
- HeaderExchangeHandler用于完成請求響應(yīng)的映射。
- requestHandler用于nettyHandler真正處理請求。
隨后調(diào)用Transporters#connect方法啟動(dòng)底層遠(yuǎn)程網(wǎng)絡(luò)通信客戶端,返回Client。Transporter是Dubbo對網(wǎng)絡(luò)傳輸層的抽象接口,Exchanger依賴于Transporter。
最后基于Client構(gòu)建HeaderExchangeClient返回。
@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {//包裝handler:DecodeHandler -> HeaderExchangeHandler -> handler//調(diào)用Transporters#connect方法啟動(dòng)底層遠(yuǎn)程網(wǎng)絡(luò)通信客戶端,返回Client//基于Client構(gòu)建HeaderExchangeClient返回return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
Transporters#connect方法將會(huì)在handler的最外層繼續(xù)包裝一層ChannelHandlerDispatcher,它所有的 ChannelHandler 接口實(shí)現(xiàn)都會(huì)調(diào)用其中每個(gè) ChannelHandler 元素的相應(yīng)方法。隨后基于Dubbo SPI機(jī)制獲取Transporter的實(shí)現(xiàn),并調(diào)用connect方法完成綁定,目前僅NettyTransporter,基于netty4。
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {if (url == null) {throw new IllegalArgumentException("url == null");}//繼續(xù)包裝一層ChannelHandlerDispatcherChannelHandler handler;if (handlers == null || handlers.length == 0) {handler = new ChannelHandlerAdapter();} else if (handlers.length == 1) {handler = handlers[0];} else {handler = new ChannelHandlerDispatcher(handlers);}//基于Dubbo SPI機(jī)制獲取Transporter的實(shí)現(xiàn),并調(diào)用connect方法完成綁定return getTransporter(url).connect(url, handler);
}
4.6 NettyTransporter#connect創(chuàng)建NettyClient
該方法很簡單,就是根據(jù)url和handler創(chuàng)建一個(gè)NettyClient實(shí)例,在NettyClient的構(gòu)造器中,會(huì)調(diào)用doOpen()開啟客戶端,創(chuàng)建Bootstrap,設(shè)置EventLoopGroup,編配ChannelHandlerPipeline,隨后調(diào)用connect方法連接服務(wù)提供者所在服務(wù)端。
@Override
public Client connect(URL url, ChannelHandler handler) throws RemotingException {//基于url和handler創(chuàng)建NettyClientreturn new NettyClient(url, handler);
}
NettyClient的構(gòu)造器如下,將會(huì)調(diào)用父類構(gòu)造器啟動(dòng)客戶端。
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.// the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler//可通過CommonConstants中的THREAD_NAME_KEY和THREAD_POOL_KEY自定義客戶端線程池的名稱和類型//繼續(xù)包裝handler: MultiMessageHandler->HeartbeatHandler->handlersuper(url, wrapChannelHandler(url, handler));
}
AbstractClient的構(gòu)造器如下,將會(huì)獲取綁定的ip和端口以及其他參數(shù),然后調(diào)用doOpen方法真正的開啟netty客戶端,最后調(diào)用connect方法連接服務(wù)提供者所在服務(wù)端。
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {super(url, handler);// set default needReconnect true when channel is not connected//當(dāng)通道未連接時(shí)設(shè)置默認(rèn)needReconnect為trueneedReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, true);//初始化執(zhí)行器,消費(fèi)者的執(zhí)行程序是全局共享的,提供者ip不需要是線程名的一部分。initExecutor(url);try {/** 創(chuàng)建netty客戶端*/doOpen();} catch (Throwable t) {close();throw new RemotingException(url.toInetSocketAddress(), null,"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);}try {// connect./** 連接服務(wù)提供者所在服務(wù)端*/connect();if (logger.isInfoEnabled()) {logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());}} catch (RemotingException t) {// If lazy connect client fails to establish a connection, the client instance will still be created,// and the reconnection will be initiated by ReconnectTask, so there is no need to throw an exceptionif (url.getParameter(LAZY_CONNECT_KEY, false)) {logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() +" connect to the server " + getRemoteAddress() +" (the connection request is initiated by lazy connect client, ignore and retry later!), cause: " +t.getMessage(), t);return;}if (url.getParameter(Constants.CHECK_KEY, true)) {close();throw t;} else {logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()+ " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);}} catch (Throwable t) {close();throw new RemotingException(url.toInetSocketAddress(), null,"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);}
}
4.7 doOpen初始化NettyClient
該方法用于初始化并啟動(dòng)netty客戶端,是非常標(biāo)準(zhǔn)的netty客戶端啟動(dòng)代碼,如果你們使用過Netty,看過Netty源碼,一定就會(huì)感到非常熟悉。
創(chuàng)建Bootstrap,設(shè)置eventGroup,編配ChannelHandler。至此成功初始化了Bootstrap,但是并沒有連接服務(wù)端。
/*** NettyClient的方法** 初始化 bootstrap*/
@Override
protected void doOpen() throws Throwable {//創(chuàng)建NettyClientHandlerfinal NettyClientHandler nettyClientHandler = createNettyClientHandler();//創(chuàng)建Bootstrap,說明這是一個(gè)netty客戶端bootstrap = new Bootstrap();//初始化NettyClientinitBootstrap(nettyClientHandler);
}protected NettyClientHandler createNettyClientHandler() {//創(chuàng)建NettyClientHandler,當(dāng)前NettyClient對象本身也是一個(gè)ChannelHandler實(shí)例,其received方法委托給創(chuàng)建實(shí)例時(shí)傳遞的內(nèi)部的handler處理return new NettyClientHandler(getUrl(), this);
}protected void initBootstrap(NettyClientHandler nettyClientHandler) {//配置線程組bootstrap.group(EVENT_LOOP_GROUP.get())//設(shè)置Socket 參數(shù).option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)//.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())//IO模型.channel(socketChannelClass());bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(DEFAULT_CONNECT_TIMEOUT, getConnectTimeout()));//設(shè)置處理器bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {ch.pipeline().addLast("negotiation", new SslClientTlsHandler(getUrl()));}NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);//自定義客戶端消息的業(yè)務(wù)處理邏輯Handlerch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug//解碼.addLast("decoder", adapter.getDecoder())//編碼.addLast("encoder", adapter.getEncoder())//心跳檢測.addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))//最后是此前創(chuàng)建的nettyClientHandler.addLast("handler", nettyClientHandler);String socksProxyHost = ConfigurationUtils.getProperty(getUrl().getOrDefaultApplicationModel(), SOCKS_PROXY_HOST);if(socksProxyHost != null && !isFilteredAddress(getUrl().getHost())) {int socksProxyPort = Integer.parseInt(ConfigurationUtils.getProperty(getUrl().getOrDefaultApplicationModel(), SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));ch.pipeline().addFirst(socks5ProxyHandler);}}});
}
4.8 connect連接服務(wù)端
在初始化Bootstrap之后,將調(diào)用connect方法真正的連接服務(wù)提供者所在的服務(wù)端,內(nèi)部調(diào)用doConnect方法執(zhí)行連接,該方法由子類實(shí)現(xiàn)。
/*** AbstractClient的方法* <p>* 連接服務(wù)提供者所在服務(wù)端*/
protected void connect() throws RemotingException {//加鎖connectLock.lock();try {//如果已連接則返回if (isConnected()) {return;}//如果已關(guān)閉則返回if (isClosed() || isClosing()) {logger.warn("No need to connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion() + ", cause: client status is closed or closing.");return;}/** 執(zhí)行連接*/doConnect();if (!isConnected()) {throw new RemotingException(this, "Failed to connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()+ ", cause: Connect wait timeout: " + getConnectTimeout() + "ms.");} else {if (logger.isInfoEnabled()) {logger.info("Successfully connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()+ ", channel is " + this.getChannel());}}} catch (RemotingException e) {throw e;} catch (Throwable e) {throw new RemotingException(this, "Failed to connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()+ ", cause: " + e.getMessage(), e);} finally {connectLock.unlock();}
}
NettyClient的doConnect方法如下,主要邏輯就是調(diào)用bootstrap.connect方法連接服務(wù)端:
/*** NettyClient的方法* 連接服務(wù)端*/
@Override
protected void doConnect() throws Throwable {long start = System.currentTimeMillis();//通過bootstrap連接服務(wù)端ChannelFuture future = bootstrap.connect(getConnectAddress());try {//等待連接超時(shí)事件boolean ret = future.awaitUninterruptibly(getConnectTimeout(), MILLISECONDS);//如果連接成功if (ret && future.isSuccess()) {//獲取通道Channel newChannel = future.channel();try {// Close old channel// copy reference//關(guān)閉舊的ChannelChannel oldChannel = NettyClient.this.channel;if (oldChannel != null) {try {if (logger.isInfoEnabled()) {logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);}oldChannel.close();} finally {NettyChannel.removeChannelIfDisconnected(oldChannel);}}} finally {if (NettyClient.this.isClosed()) {try {if (logger.isInfoEnabled()) {logger.info("Close new netty channel " + newChannel + ", because the client closed.");}newChannel.close();} finally {NettyClient.this.channel = null;NettyChannel.removeChannelIfDisconnected(newChannel);}} else {NettyClient.this.channel = newChannel;}}} else if (future.cause() != null) {Throwable cause = future.cause();// 6-1 Failed to connect to provider server by other reason.RemotingException remotingException = new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "+ getRemoteAddress() + ", error message is:" + cause.getMessage(), cause);logger.error("6-1", "network disconnected", "","Failed to connect to provider server by other reason.", cause);throw remotingException;} else {// 6-2 Client-side timeoutRemotingException remotingException = new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "+ getRemoteAddress() + " client-side timeout "+ getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());logger.error("6-2", "provider crash", "","Client-side timeout.", remotingException);throw remotingException;}} finally {// just add new valid channel to NettyChannel's cacheif (!isConnected()) {//future.cancel(true);}}
}
5 saveProperties更新本地文件信息
在每次通知內(nèi)存數(shù)據(jù)更新之后,更新緩存文件。當(dāng)注冊中心由于網(wǎng)絡(luò)抖動(dòng)而訂閱失敗時(shí),至少可以返回現(xiàn)有的緩存的URL。
/*** AbstractRegistry的方法** @param url 服務(wù)消費(fèi)者url*/
private void saveProperties(URL url) {//服務(wù)緩存文件路徑為 {user.home}/.dubbo/dubbo-registry-{dubbo.application.name}-{ip}-{post}.cacheif (file == null) {return;}try {//需要存儲(chǔ)的url字符串StringBuilder buf = new StringBuilder();//獲取該url的不同類別節(jié)點(diǎn)到對應(yīng)url列表的mapMap<String, List<URL>> categoryNotified = notified.get(url);//遍歷所有的節(jié)點(diǎn)urlif (categoryNotified != null) {for (List<URL> us : categoryNotified.values()) {for (URL u : us) {if (buf.length() > 0) {//追加空格buf.append(URL_SEPARATOR);}//追加url字符串buf.append(u.toFullString());}}}//消費(fèi)者url key以及對應(yīng)的節(jié)點(diǎn)url字符串存入propertiesproperties.setProperty(url.getServiceKey(), buf.toString());//版本自增long version = lastCacheChanged.incrementAndGet();//保存properties到本地文件if (syncSaveFile) {doSaveProperties(version);} else {registryCacheExecutor.schedule(() -> doSaveProperties(version), DEFAULT_INTERVAL_SAVE_PROPERTIES, TimeUnit.MILLISECONDS);}} catch (Throwable t) {logger.warn(t.getMessage(), t);}
}
本地緩存文件路徑為:{user.home}/.dubbo/dubbo-registry-{dubbo.application.name}-{ip}-{post}.cache,里面緩存的內(nèi)容如下,每一個(gè)服務(wù)接口占據(jù)一行,它的所有url字符串都追加在后面,通過空格分隔。
6 總結(jié)
本次我們學(xué)習(xí)了接口級別服務(wù)發(fā)現(xiàn)訂閱refreshInterfaceInvoker方法的具體實(shí)現(xiàn),大概步驟為:
- 第一次調(diào)用refreshInterfaceInvoker方法的時(shí)候,由于MigrationInvoker內(nèi)部的真實(shí)消費(fèi)者Invoker為null,那么需要?jiǎng)?chuàng)建一個(gè)消費(fèi)者Invoker。
- 首先創(chuàng)建動(dòng)態(tài)注冊心中目錄DynamicDirectory,隨后調(diào)用doCreateInvoker方法創(chuàng)建服務(wù)消費(fèi)者Invoker。
- 首先根據(jù)消費(fèi)者信息轉(zhuǎn)換為消費(fèi)者注冊信息url,內(nèi)部包括消費(fèi)者ip、指定引用的protocol(默認(rèn)consumer協(xié)議)、指定引用的服務(wù)接口、指定引用的方法以及其他消費(fèi)者信息。
- 調(diào)用registry.register方法將消費(fèi)者注冊信息url注冊到注冊中心。
- 調(diào)用directory.buildRouterChain方法構(gòu)建服務(wù)調(diào)用路由鏈RouterChain,賦給directory的routerChain屬性。
- 調(diào)用directory.subscribe方法進(jìn)行服務(wù)發(fā)現(xiàn)、引入并訂閱服務(wù)。
- directory本身是一個(gè)監(jiān)聽器,directory將會(huì)訂閱zookeeper對應(yīng)的服務(wù)接口節(jié)點(diǎn)下的dubbo/[service name]/providers,服務(wù)提供者目錄,以及dubbo/[service name]/configurators,即配置目錄,以及dubbo/[service name]/routers,即服務(wù)路由目錄。
- 依靠著zookeeper的watch監(jiān)聽回調(diào)機(jī)制,當(dāng)這些節(jié)點(diǎn)下的子節(jié)點(diǎn)發(fā)生變化時(shí)會(huì)觸發(fā)回調(diào)通知RegistryDirectory執(zhí)行notify方法,進(jìn)而完成本地服務(wù)列表的動(dòng)態(tài)更新功能。實(shí)際上服務(wù)提供者也會(huì)訂閱,只不過只會(huì)訂閱configurators節(jié)點(diǎn)。
- 在執(zhí)行訂閱的時(shí)候,將會(huì)進(jìn)行一次providers,configurators,routers節(jié)點(diǎn)目錄下字節(jié)點(diǎn)的獲取,這樣就獲取到了當(dāng)前的服務(wù)提供者url、配置信息url、服務(wù)路由url。
- 在subscribe方法的最后,也是最關(guān)鍵的一步,主動(dòng)調(diào)用notify方法通知數(shù)據(jù)變更。這里實(shí)際上會(huì)動(dòng)態(tài)更新本地內(nèi)存和文件中的服務(wù)提供者緩存,可能會(huì)更新RegistryDirectory 內(nèi)部的configurators配置信息集合,routerChain路由鏈以及urlInvokerMap緩存,這里面存放著服務(wù)提供者url到對應(yīng)的Invoker的映射。
- 如果沒有在本地緩存中找到某個(gè)服務(wù)提供者url的緩存,那么會(huì)將url轉(zhuǎn)換為對應(yīng)協(xié)議的Invoker,默認(rèn)DubboInvoker,DubboInvoker的內(nèi)部還會(huì)創(chuàng)建NettyClient客戶端,并與服務(wù)提供者所在的服務(wù)端建立連接。
- 將url轉(zhuǎn)換為Invoker之前,將會(huì)進(jìn)行配置的合并,合并覆蓋順序是:override > -D參數(shù) >Consumer配置 > Provider配置,從這里可以知道消費(fèi)者的配置優(yōu)先級大于提供者的配置。
- 調(diào)用cluster.join方法傳入directory進(jìn)行集群容錯(cuò)能力包裝,最終返回一個(gè)ClusterInvoker作為消費(fèi)者Invoker,即MockClusterInvoker,這是一個(gè)包裝類,內(nèi)部包含真正的集群容錯(cuò)Invoker,默認(rèn)是FailoverClusterInvoker。
到此我們可以知道上面的各種對象的關(guān)系(注意MockClusterInvoker上面還有一個(gè)MigrationInvoker沒畫出來):
到此接口級服務(wù)引入學(xué)習(xí)完畢,實(shí)際上Dubbo2就是采用的接口級別服務(wù)注冊和引入。后面我們將繼續(xù)學(xué)習(xí)應(yīng)用級服務(wù)引入,實(shí)際上這才是Dubbo3升級的一個(gè)重點(diǎn),非常值得學(xué)習(xí)!