dubbo剖析:二 服務(wù)引用

注:文章中使用的dubbo源碼版本為2.5.4

零、文章目錄

一、服務(wù)引用的目的
二、關(guān)鍵概念及關(guān)系
三、服務(wù)引用流程詳解
四、整體流程圖總結(jié)
五、后續(xù)系列文章預(yù)告

一、服務(wù)引用的目的

“服務(wù)消費(fèi)者”向注冊中心訂閱“服務(wù)提供者”提供的服務(wù)地址,并生成服務(wù)接口的實(shí)際代理對象。

二、關(guān)鍵概念及關(guān)系

2.1 ReferenceBean & ReferenceConfig

服務(wù)引用Bean和引用配置類,服務(wù)引用信息的承載體。

2.2 RegistryProtocol

協(xié)議抽象接口Protocol的實(shí)現(xiàn)。其<T> Invoker<T> refer(Class<T> type, URL url)方法完成了服務(wù)引用的完整功能。

2.3 RegistryDirectory

同時(shí)具有目錄服務(wù)Directory和監(jiān)聽器NotifyListener兩個(gè)接口的功能,并組合了注冊器Registry。

  • 它的Invoker集合數(shù)據(jù)來源于zk注冊中心;
  • 其對外暴露的subscribe(URL url)方法完成了在注冊中心對服務(wù)提供者地址變更的監(jiān)聽功能;
  • 其同時(shí)實(shí)現(xiàn)了NotifyListener接口的notify(List<URL> urls)方法執(zhí)行監(jiān)聽回調(diào),監(jiān)聽服務(wù)提供者地址變更并創(chuàng)建對應(yīng)的Invoker;

2.4 JavassistProxyFactory

基于javassist的動(dòng)態(tài)代理工廠,其內(nèi)部使用dubbo自己的動(dòng)態(tài)代理機(jī)制Proxy完成動(dòng)態(tài)代理類的創(chuàng)建工作。

2.5 關(guān)系圖

服務(wù)引用相關(guān)接口關(guān)系圖

三、服務(wù)引用流程詳解

3.1 簡潔流程圖

  • 引用配置及初始化
    配置初始化
  • 注冊中心訂閱 & Invoker生成
  • Invoker獲取
  • 生成代理對象
    簡潔流程圖

3.2 服務(wù)引用入口

引用配置:

服務(wù)引用入口

服務(wù)引用方在工程中會(huì)有如上圖中的Spring配置。
在容器啟動(dòng)的時(shí)候會(huì)解析schema元素<dubbo:reference/>轉(zhuǎn)換成dubbo內(nèi)部數(shù)據(jù)結(jié)構(gòu)ReferenceBean。
ReferenceBean就是服務(wù)引用,它繼承了dubbo的引用配置類ReferenceConfig,并實(shí)現(xiàn)了Spring的FactoryBean和InitializingBean接口。

  • 對于服務(wù)引用獲取的demoService,并不是想要ReferenceBean這個(gè)對象實(shí)例本身,而是想獲取對遠(yuǎn)程調(diào)用的 代理,能夠通過這個(gè)代理服務(wù)調(diào)用遠(yuǎn)程dubbo服務(wù);
  • dubbo就是通過factoryBean.getObject()來創(chuàng)建并返回基于DemoService接口的代理對象;

配置初始化:

    public Object getObject() throws Exception {
        return get();
    }

ReferenceBean的getObject方法執(zhí)行其get()方法,最終會(huì)執(zhí)行到ReferenceConfiginit()方法,在該方法中完成代理對象的生成過程。

    private void init() {
        /*...省略部分代碼...*/
        //step1 根據(jù)配置生成Map
        Map<String, String> map = new HashMap<String, String>();
        Map<Object, Object> attributes = new HashMap<Object, Object>();
        map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
        map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
        map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));

        /*...省略部分代碼...*/
        //step2 調(diào)用createProxy生成代理類ref
        ref = createProxy(map);
    }

如上,init()中主要干的事情有兩件。一,根據(jù)配置生成map;二,調(diào)用createProxy生成代理對象ref。

  • 下圖展示了生成的map的信息:


    map的信息結(jié)構(gòu)
  • createProxy()方法主要做了如下三件事:1)加載配置中心拼裝成urls;2)遍歷urls,調(diào)用refProtocol創(chuàng)建遠(yuǎn)程的動(dòng)態(tài)代理Invoker;3)調(diào)用proxyFactory創(chuàng)建服務(wù)代理。

    private T createProxy(Map<String, String> map) {
        /*...省略部分代碼...*/
        //step1 加載配置的所有注冊中心,拼裝成urls
                List<URL> us = loadRegistries(false);
                if (us != null && us.size() > 0) {
                    for (URL u : us) {
                        URL monitorUrl = loadMonitor(u);
                        if (monitorUrl != null) {
                            map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                        }
                        urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                    }
                }
        /*...省略部分代碼...*/
        //step2 遍歷urls,調(diào)用refProtocol.refer創(chuàng)建遠(yuǎn)程的動(dòng)態(tài)代理Invoker
                for (URL url : urls) {
                    invokers.add(refprotocol.refer(interfaceClass, url));
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        registryURL = url; // 用了最后一個(gè)registry url
                    }
                }
                if (registryURL != null) { // 有 注冊中心協(xié)議的URL
                    // 對有注冊中心的Cluster 只用 AvailableCluster
                    URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                    invoker = cluster.join(new StaticDirectory(u, invokers));
                } else { // 不是 注冊中心的URL
                    invoker = cluster.join(new StaticDirectory(invokers));
                }
        /*...省略部分代碼...*/
        // step3 調(diào)用proxyFactory創(chuàng)建服務(wù)代理
        return (T) proxyFactory.getProxy(invoker);

3.3 注冊中心訂閱及Invoker創(chuàng)建

創(chuàng)建動(dòng)態(tài)代理Invoker:
refProtocol為dubbo通過ExtensionLoader動(dòng)態(tài)注入的RegistryProtocol實(shí)例。通過其refer方法創(chuàng)建Invoker
具體進(jìn)行的工作分為如下幾步:

  • 從url的registryKey獲取注冊中心類型:zookeeper
  • 從RegistryFactory獲取注冊器:Registry
  • 構(gòu)建RegistryDirectory,其同時(shí)也是回調(diào)監(jiān)聽器
  • 向注冊中心注冊服務(wù)消費(fèi)者
  • 從注冊中心訂閱服務(wù)提供者(即引用的服務(wù))
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        //step1 從url的registryKey獲取注冊中心類型:zookeeper
        url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
        //step2 從RegistryFactory獲取注冊器
        Registry registry = registryFactory.getRegistry(url);
        //...省略部分代碼...
        //doRefer
        return doRefer(cluster, registry, type, url);
    }
    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {

        //step3 構(gòu)建RegistryDirectory,可以把它理解為注冊資源,其中包含了消費(fèi)者/服務(wù)/路由等相關(guān)信息
        //其同時(shí)也是回調(diào)監(jiān)聽器
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
        if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {

            //step4 向注冊中心注冊服務(wù)消費(fèi)者
            registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                    Constants.CHECK_KEY, String.valueOf(false)));
        }

        //step5 從注冊中心訂閱服務(wù)提供者(即引用的服務(wù))
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                Constants.PROVIDERS_CATEGORY
                        + "," + Constants.CONFIGURATORS_CATEGORY
                        + "," + Constants.ROUTERS_CATEGORY));
        //step6 
        return cluster.join(directory);
    }

訂閱服務(wù)提供者并初始化監(jiān)聽處理:

RegistryDirectory

先按照上圖對RegistryDirectory進(jìn)行說明:

  • RegistryDirectory內(nèi)包含了一個(gè)Registry實(shí)例,即其提供了subscribe方法用來實(shí)現(xiàn)對url的訂閱;
  • RegistryDirectory同時(shí)實(shí)現(xiàn)了NotifyListener接口,即其本身具有回調(diào)監(jiān)聽器的功能;
  • 以上結(jié)合,RegistryDirectory可以完成對服務(wù)引用url的主動(dòng)訂閱和監(jiān)聽,監(jiān)聽會(huì)觸發(fā)其自身notify方法;

所以,執(zhí)行到"step5 從注冊中心訂閱服務(wù)提供者",調(diào)用RegistryDirectorysubscribe(URL url)方法時(shí),完成了對引用url的訂閱并同時(shí)出發(fā)監(jiān)聽流程:

  • 根據(jù)類別將監(jiān)聽urls分類為invoker,router,configurator;
  • 對于有變化的invokerUrls,調(diào)用DubboProtocol的refer方法生成DubboInvoker
  • DubboInvoker即實(shí)際遠(yuǎn)程代理Invoker,其可完成請求遠(yuǎn)程dubbo服務(wù)并獲取響應(yīng)結(jié)果的功能;(具體DubboProtocol的refer方法執(zhí)行過程不展開,后續(xù)有專門章節(jié)講解
  • 最終刷新urlInvokerMap和methodInvokerMap,將對應(yīng)的invokerUrl和dubboInvoker存入map中供后續(xù)使用;
    public synchronized void notify(List<URL> urls) {
        //step1 根據(jù)類別將urls分類
        List<URL> invokerUrls = new ArrayList<URL>();
        List<URL> routerUrls = new ArrayList<URL>();
        List<URL> configuratorUrls = new ArrayList<URL>();
        for (URL url : urls) {
            String protocol = url.getProtocol();
            String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
            if (Constants.ROUTERS_CATEGORY.equals(category)
                    || Constants.ROUTE_PROTOCOL.equals(protocol)) {
                routerUrls.add(url);
            } else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
                    || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
                configuratorUrls.add(url);
            } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
                invokerUrls.add(url);
            } else {
                logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
            }
        }
        //...省略部分代碼...
        //providers
        refreshInvoker(invokerUrls);
    }
    private void refreshInvoker(List<URL> invokerUrls) {
           //...省略部分代碼...
           Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// 將URL列表轉(zhuǎn)成Invoker列表
           //...省略部分代碼...
           //step3 最終刷新urlInvokerMap和methodInvokerMap
           this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
           this.urlInvokerMap = newUrlInvokerMap;
    }
    private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
           //...省略部分代碼...
           //step2 protocol.refer根據(jù)url創(chuàng)建遠(yuǎn)程代理Invoker
         invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);
    }

3.4 Invoker獲取及集群

Invoker生成并返回:
執(zhí)行到return cluster.join(directory),即通過FailFastCluster獲取并返回遠(yuǎn)程代理Invoker:

public class FailfastCluster implements Cluster {
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new FailfastClusterInvoker<T>(directory);
    }
}

其實(shí)就是new了一個(gè)FailfastClusterInvoker并返回,構(gòu)造方法參數(shù)為3.3步驟中的RegistryDirectory。

FailfastClusterInvoker干了神馬:
當(dāng)invoke(Invocation invocation)方法被調(diào)用時(shí):

  • 根據(jù)參數(shù)invocation從RegistryDirectory中獲取對應(yīng)遠(yuǎn)程DubboInvocation;
  • 動(dòng)態(tài)創(chuàng)建負(fù)載均衡算法;
  • 根據(jù)負(fù)載均衡算法從Invoker列表中選出一個(gè)invoker執(zhí)行;
    public Result invoke(final Invocation invocation) throws RpcException {
        LoadBalance loadbalance;
        //step1 根據(jù)服務(wù)引用的url從RegistryDirectory中獲取對應(yīng)遠(yuǎn)程DubboInvocation
        List<Invoker<T>> invokers = list(invocation);
        if (invokers != null && invokers.size() > 0) {
            //step2 動(dòng)態(tài)創(chuàng)建負(fù)載均衡算法
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                    .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
        } else {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
        }
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        //step3 根據(jù)負(fù)載均衡算法從Invoker列表中選出一個(gè)invoker執(zhí)行
        return doInvoke(invocation, invokers, loadbalance);
    }
    protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
        List<Invoker<T>> invokers = directory.list(invocation);
        return invokers;
    }

3.5 代理對象的生成

最終回到ReferenceConfig中init的最后一步return (T) proxyFactory.getProxy(invoker);,通過JavassistProxyFactory.getProxy(invoker)將Invoker轉(zhuǎn)換成代理對象ref:

  • 先從invoker中解析出要?jiǎng)?chuàng)建的代理對象所實(shí)現(xiàn)的接口;
  • new一個(gè)InvokerInvocationHandler,它包含了invoker,實(shí)際也會(huì)執(zhí)行invoker的invoke方法;
  • Proxy.getProxy.newInstance
    使用dubbo的動(dòng)態(tài)代理接口創(chuàng)建動(dòng)態(tài)代理對象T ref;
public class JavassistProxyFactory extends AbstractProxyFactory {
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }
}

執(zhí)行完以上步驟后,終于生成了dubbo遠(yuǎn)程動(dòng)態(tài)代理對象T ref,我們可以直接使用該對象來完成RPC調(diào)用了。

四、整體流程圖總結(jié)

整體流程圖

整體看下來,服務(wù)引用主要分為以下幾大步驟:

1、引用配置及配置初始化
該部分以Spring配置及ReferenceBean為入口,主要在ReferenceConfig中進(jìn)行。

  • ReferenceConfig依賴RegistryProtocol完成了 "服務(wù)引用者注冊"、"服務(wù)提供者訂閱"和"Invoker創(chuàng)建" 的工作;
  • ReferenceConfig依賴JavassistProxyFactory完成了 "代理對象生成" 的工作;

2、注冊中心訂閱 & Invoker生成與獲取
該部分主要由RegistryDirectoryFailfastCluster實(shí)現(xiàn)。

  • 通過ReferenceConfig調(diào)用RegistryDirectory的subscribe方法,觸發(fā)了對服務(wù)提供者url的訂閱及監(jiān)聽,在監(jiān)聽過程中RegistryDirectory借助DubboProtocol完成了Invoker的創(chuàng)建工作,并保存了服務(wù)引用url和Invoker的關(guān)系;
  • 通過ReferenceConfig調(diào)用FailfastCluster的join方法,完成了對Invoker對象的獲取;

3、生成代理對象
該部分主要由JavassistProxyFactory完成。

  • ReferenceConfig調(diào)用JavassistProxyFactorygetProxy方法為入口,傳入Invoker;
  • 新創(chuàng)建了InvokerInvocationHandler,并使用dubbo自己的動(dòng)態(tài)代理工具Proxy最終生成代理對象T ref;

五、服務(wù)發(fā)布引用簡潔過程圖

簡潔過程圖

黑線表示方法調(diào)用
虛線表示對象引用
紅線表示請求調(diào)用過程

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容