Dubbo中的服務(wù)引用

本系列主要參考官網(wǎng)文檔、芋道源碼的源碼解讀和《深入理解Apache Dubbo與實(shí)戰(zhàn)》一書。Dubbo版本為2.6.1。

文章內(nèi)容順序:
1.服務(wù)引用的介紹
2.服務(wù)引用的入口方法getObject()=>createProxy()方法介紹

3.本地引用

  • 3.1是否為本地引用的判別方法,InjvmProtocol#isInjvmRefer
  • 3.2createProxy()中的本地引用鏈路
  • 3.3InjvmIvoker介紹,引出proxyFactory

4.proxyFactory擴(kuò)展點(diǎn)

  • 4.1proxyFactory的包裝類StubProxyFactoryWrapper,本地存根的作用
  • 4.2擴(kuò)展類JavassistProxyFactory的getProxy(Invoker)
  • 4.3Proxy 實(shí)例中傳入了InvokerInvocationHandler類的意義
  • 4.4JavassistProxyFactory生成的代碼樣例及作用
  • 4.5服務(wù)引用存在哪?

5.遠(yuǎn)程引用

  • 5.1 createProxy()中的遠(yuǎn)程引用鏈路
  • 5.2只配置了一個(gè)注冊中心的遠(yuǎn)程引用
  • 5.3RegistryProtocol#refer()
  • 5.4直連時(shí)的遠(yuǎn)程引用
  • 5.5new DubboInvoker時(shí)的getClients(url)
  • 5.6getClients(url)中的getSharedClient(url)
  • 5.7getClients(url)中的initClient(url)
  • 5.8initClient(url)中的自適應(yīng)Exchangers#connect()
  • 5.9多注冊中心時(shí)鏈路

1.服務(wù)引用的介紹

來自官網(wǎng)的服務(wù)引用介紹:

  • Dubbo 服務(wù)引用的時(shí)機(jī)有兩個(gè),第一個(gè)是在 Spring 容器調(diào)用 ReferenceBean 的 afterPropertiesSet方法時(shí)引用服務(wù),第二個(gè)是在 ReferenceBean對應(yīng)的服務(wù)被注入到其他類中時(shí)引用。這兩個(gè)引用服務(wù)的時(shí)機(jī)區(qū)別在于,第一個(gè)是餓漢式的,第二個(gè)是懶漢式的。
  • 默認(rèn)情況下,Dubbo 使用懶漢式引用服務(wù)。如果需要使用餓漢式,可通過配置 <dubbo:reference> 的 init 屬性開啟。下面我們按照 Dubbo 默認(rèn)配置進(jìn)行分析,整個(gè)分析過程從 ReferenceBean 的 getObject方法開始。當(dāng)我們的服務(wù)被注入到其他類中時(shí),Spring 會第一時(shí)間調(diào)用 getObject 方法,并由該方法執(zhí)行服務(wù)引用邏輯。
  • 按照慣例,在進(jìn)行具體工作之前,需先進(jìn)行配置檢查與收集工作。接著根據(jù)收集到的信息決定服務(wù)用的方式,有三種,第一種是引用本地 (JVM) 服務(wù),第二是通過直連方式引用遠(yuǎn)程服務(wù),第三是通過注冊中心引用遠(yuǎn)程服務(wù)。不管是哪種引用方式,最后都會得到一個(gè) Invoker 實(shí)例。
  • 如果有多個(gè)注冊中心,多個(gè)服務(wù)提供者,這個(gè)時(shí)候會得到一組 Invoker 實(shí)例,此時(shí)需要通過集群管理類 Cluster 將多個(gè) Invoker 合并成一個(gè)實(shí)例。合并后的 Invoker 實(shí)例已經(jīng)具備調(diào)用本地或遠(yuǎn)程服務(wù)的能力了,但并不能將此實(shí)例暴露給用戶使用,這會對用戶業(yè)務(wù)代碼造成侵入。此時(shí)框架還需要通過代理工廠類 (ProxyFactory) 為服務(wù)接口生成代理類,并讓代理類去調(diào)用 Invoker 邏輯。避免了 Dubbo 框架代碼對業(yè)務(wù)代碼的侵入,同時(shí)也讓框架更容易使用。
image.png

本圖暫不考慮集群容錯(cuò)、網(wǎng)絡(luò)調(diào)用、序列化反序列

2.服務(wù)引用的入口,getObject()=>createProxy()方法介紹

那么就從ReferenceBean的getObject開始吧:

public Object getObject() throws Exception {
    return get();
}

public synchronized T get() {
    if (destroyed) {
        throw new IllegalStateException("Already destroyed!");
    }
    // 檢測 ref (即service對象)是否為空,為空則通過 init 方法創(chuàng)建
    if (ref == null) {
        // init 方法主要用于處理配置,以及調(diào)用 createProxy 生成代理類
        init();
    }
    return ref;
}

注意這個(gè)ReferenceConfig#init(),主要邏輯就是配置解析封裝(實(shí)在太長就不貼了,而且解析配置不在討論的點(diǎn)),該實(shí)例包含了事件通知配置,比如 onreturn、onthrow、oninvoke 等。并在最后調(diào)用 createProxy ()創(chuàng)建代理對象。

從createProxy 我們也將開始真正的引用鏈路。

  //map為 `side`,`dubbo`,`timestamp`,`pid`等 參數(shù)
    private T createProxy(Map<String, String> map) {
        URL tmpUrl = new URL("temp", "localhost", 0, map);
        // 是否本地引用
        final boolean isJvmRefer;
        // injvm 屬性為空,不通過該屬性判斷
        if (isInjvm() == null) {
            // 直連服務(wù)提供者,參見文檔《直連提供者》https://dubbo.gitbooks.io/dubbo-user-book/demos/explicit-target.html
            if (url != null && url.length() > 0) { // if a url is specified, don't do local reference
                isJvmRefer = false;
            // 通過 `tmpUrl` 判斷,是否需要本地引用
            } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
                // by default, reference local service if there is
                isJvmRefer = true;
            // 默認(rèn)不是
            } else {
                isJvmRefer = false;
            }
        // 通過 injvm 屬性。
        } else {
            isJvmRefer = isInjvm();
        }

        // 本地引用
        if (isJvmRefer) {
            // 創(chuàng)建本地服務(wù)引用 URL 對象。
            URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
            // 引用服務(wù),返回 Invoker 對象
            invoker = refprotocol.refer(interfaceClass, url);
            if (logger.isInfoEnabled()) {
                logger.info("Using injvm service " + interfaceClass.getName());
            }
        // 正常流程,一般為遠(yuǎn)程引用
        } else {
            // 定義直連地址,可以是服務(wù)提供者的地址,也可以是注冊中心的地址
            if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
                // 拆分地址成數(shù)組,使用 ";" 分隔。
                String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
                // 循環(huán)數(shù)組,添加到 `url` 中。
                if (us != null && us.length > 0) {
                    for (String u : us) {
                        // 創(chuàng)建 URL 對象
                        URL url = URL.valueOf(u);
                        // 設(shè)置默認(rèn)路徑
                        if (url.getPath() == null || url.getPath().length() == 0) {
                            url = url.setPath(interfaceName);
                        }
                        // 注冊中心的地址,帶上服務(wù)引用的配置參數(shù)
                        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                            urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                        // 服務(wù)提供者的地址
                        } else {
                            urls.add(ClusterUtils.mergeUrl(url, map));
                        }
                    }
                }
            // 注冊中心
            } else { // assemble URL from register center's configuration
                // 加載注冊中心 URL 數(shù)組
                List<URL> us = loadRegistries(false);
                // 循環(huán)數(shù)組,添加到 `url` 中。
                if (us != null && !us.isEmpty()) {
                    for (URL u : us) {
                        // 加載監(jiān)控中心 URL
                        URL monitorUrl = loadMonitor(u);
                        // 服務(wù)引用配置對象 `map`,帶上監(jiān)控中心的 URL
                        if (monitorUrl != null) {
                            map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                        }
                        // 注冊中心的地址,帶上服務(wù)引用的配置參數(shù)
                        urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); // 注冊中心,帶上服務(wù)引用的配置參數(shù)
                    }
                }
                if (urls.isEmpty()) {
                    throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                }
            }

            // 單 `urls` 時(shí),引用服務(wù),返回 Invoker 對象
            if (urls.size() == 1) {
                // 引用服務(wù)
                invoker = refprotocol.refer(interfaceClass, urls.get(0));
            } else {
                // 循環(huán) `urls` ,引用服務(wù),返回 Invoker 對象
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {
                    // 引用服務(wù)
                    invokers.add(refprotocol.refer(interfaceClass, url));
                    // 使用最后一個(gè)注冊中心的 URL
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        registryURL = url; // use last registry url
                    }
                }
                // 有注冊中心
                if (registryURL != null) { // registry url is available
                    // 對有注冊中心的 Cluster 只用 AvailableCluster
                    // use AvailableCluster only when register's cluster is available
                    URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                    invoker = cluster.join(new StaticDirectory(u, invokers));
                // 無注冊中心,全部都是服務(wù)直連
                } else { // not a registry url
                    invoker = cluster.join(new StaticDirectory(invokers));
                }
            }
        }

        // 啟動時(shí)檢查
        Boolean c = check;
        if (c == null && consumer != null) {
            c = consumer.isCheck();
        }
        if (c == null) {
            c = true; // default true
        }
        if (c && !invoker.isAvailable()) {
            throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
        }
        if (logger.isInfoEnabled()) {
            logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
        }

        // 創(chuàng)建 Service 代理對象
        // create service proxy
        return (T) proxyFactory.getProxy(invoker);
    }

3.createProxy()中的本地引用鏈路

3.1是否為本地引用的判別方法,InjvmProtocol#isInjvmRefer

同樣的,服務(wù)引用也分為本地引用和遠(yuǎn)程引用,本地引用還是遠(yuǎn)程引用是從URL來辨別的,先從本地引用來講起。
這邊來簡單介紹下他的判別方法InjvmProtocol#isInjvmRefer,也比較簡單

public boolean isInjvmRefer(URL url) {
        final boolean isJvmRefer;
        String scope = url.getParameter(Constants.SCOPE_KEY);
        // Since injvm protocol is configured explicitly, we don't need to set any extra flag, use normal refer process.
        // 當(dāng) `protocol = injvm` 時(shí),本身已經(jīng)是 jvm 協(xié)議了,走正常流程就是了。
        if (Constants.LOCAL_PROTOCOL.toString().equals(url.getProtocol())) {
            isJvmRefer = false;
        // 當(dāng) `scope = local` 或者 `injvm = true` 時(shí),本地引用
        } else if (Constants.SCOPE_LOCAL.equals(scope) || (url.getParameter("injvm", false))) {
            // if it's declared as local reference
            // 'scope=local' is equivalent to 'injvm=true', injvm will be deprecated in the future release
            isJvmRefer = true;
        // 當(dāng) `scope = remote` 時(shí),遠(yuǎn)程引用
        } else if (Constants.SCOPE_REMOTE.equals(scope)) {
            // it's declared as remote reference
            isJvmRefer = false;
        // 當(dāng) `generic = true` 時(shí),即使用泛化調(diào)用,遠(yuǎn)程引用。
        } else if (url.getParameter(Constants.GENERIC_KEY, false)) {
            // generic invocation is not local reference
            isJvmRefer = false;
        // 當(dāng)本地已經(jīng)有該 Exporter 時(shí),本地引用
        } else if (getExporter(exporterMap, url) != null) {
            // by default, go through local reference if there's the service exposed locally
            isJvmRefer = true;
        // 默認(rèn),遠(yuǎn)程引用
        } else {
            isJvmRefer = false;
        }
        return isJvmRefer;
    }
}

上面的方法沒什么好說的,都已經(jīng)注釋好啦。

createProxy()中的本地引用鏈路

這邊我們先走本地引用的鏈路,再貼一下createProxy()的部分代碼如下,注意到在創(chuàng)建本地服務(wù)引用 URL 對象時(shí),已經(jīng)把Protocol設(shè)置成InjvmProtocol了,

  // 本地引用
        if (isJvmRefer) {
            // 創(chuàng)建本地服務(wù)引用 URL 對象。
            URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
            // 引用服務(wù),返回 Invoker 對象
            invoker = refprotocol.refer(interfaceClass, url);
            if (logger.isInfoEnabled()) {
                logger.info("Using injvm service " + interfaceClass.getName());
            }

看到直接調(diào)用了Protocol#refer(interface, url),根據(jù)url獲得對應(yīng) Protocol 拓展實(shí)現(xiàn)為 InjvmProtocol 。同樣的還是SPI機(jī)制,調(diào)用鏈路是:
Protocol$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => InjvmProtocol,已經(jīng)在SPI和服務(wù)暴露一文提過很多次了,包裝類里export()refer()的邏輯都差不多,就不再介紹。直接來看Protocol#refer(interface, url)做了什么吧。

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
        return new InjvmInvoker<T>(serviceType, url, url.getServiceKey(), exporterMap);
    }

非常樸素的new了InjvmInvoker,在進(jìn)去看看他干了什么。

3.3InjvmIvoker介紹,引出proxyFactory

    /**
     * Exporter 集合
     *
     * key: 服務(wù)鍵
     *
     * 該值實(shí)際就是 {@link com.alibaba.dubbo.rpc.protocol.AbstractProtocol#exporterMap}
     */
    private final Map<String, Exporter<?>> exporterMap;

    InjvmInvoker(Class<T> type, URL url, String key, Map<String, Exporter<?>> exporterMap) {
        super(type, url);
        this.key = key;
        this.exporterMap = exporterMap;
    }
  • 最后返回的就是這個(gè)InjvmIvoker,Invoker 是 Dubbo 的核心模型,代表一個(gè)可執(zhí)行體,這個(gè)InjvmIvoker他將一些屬性和本地的緩存聚合到一起形成了一個(gè)Invoker。
    每個(gè)InjvmInvoker都會持有一個(gè)指向本地緩存的指針。
  • 拿到這個(gè)Invoker后,在ReferenceConfig#createProxy最后調(diào)用
// 創(chuàng)建 Service 代理對象
       // create service proxy
       return (T) proxyFactory.getProxy(invoker);

調(diào)用 ProxyFactory#getProxy(invoker)方法,創(chuàng)建 Service 代理對象。這邊的proxyFactory同樣也是擴(kuò)展點(diǎn),由傳入的invoker.url來決定調(diào)用哪個(gè)。
順帶一提 Service 代理對象的內(nèi)部,可以調(diào)用 Invoker#invoke(Invocation)方法,進(jìn)行 Dubbo 服務(wù)的調(diào)用。
那么最后,getProxy()到底是怎么創(chuàng)建代理對象的呢?

4.proxyFactory擴(kuò)展點(diǎn)

image.png

ProxyFactory同樣也是個(gè)擴(kuò)展類,有Stub包裝類,還有兩個(gè)擴(kuò)展類實(shí)現(xiàn),可以自由決定用哪個(gè)Factory。Invoker通過Javassist動態(tài)代理或者JDK動態(tài)代理,兩者都是通過生成字節(jié)碼實(shí)現(xiàn)的。

而技術(shù)的選型可以看下這張圖作者的解釋


image.png

4.1proxyFactory的包裝類StubProxyFactoryWrapper

  • ProxyFactory#getProxy(invoker)`鏈路其實(shí)是這樣的,StubProxyFactoryWrapper的作用就是生成本地存根。


    image.png

在此之前我們先要知道本地存根的作用(也就是為什么要調(diào)用StubProxyFactoryWrapper):
遠(yuǎn)程服務(wù)后,客戶端通常只剩下接口,而實(shí)現(xiàn)全在服務(wù)器端,但提供方有些時(shí)候想在客戶端也執(zhí)行部分邏輯,比如:做 ThreadLocal 緩存,提前驗(yàn)證參數(shù),調(diào)用失敗后偽造容錯(cuò)數(shù)據(jù)等等,此時(shí)就需要在 API 中帶上 Stub,客戶端生成 Proxy 實(shí)例,會把 Proxy 通過構(gòu)造函數(shù)傳給 Stub ,然后把 Stub 暴露給用戶,Stub 可以決定要不要去調(diào) Proxy。
說多了不如看一個(gè)簡單的實(shí)例:Dubbo本地存根

StubProxyFactoryWrapper#getProxy(invoker)代碼如下

public <T> T getProxy(Invoker<T> invoker) throws RpcException {
        // 獲得 Service Proxy 對象,這邊調(diào)用的就是指定的proxyFactory了
        T proxy = proxyFactory.getProxy(invoker);
        if (GenericService.class != invoker.getInterface()) { // 非泛化引用
            // 獲得 `stub` 配置項(xiàng)
            String stub = invoker.getUrl().getParameter(Constants.STUB_KEY, invoker.getUrl().getParameter(Constants.LOCAL_KEY));
            if (ConfigUtils.isNotEmpty(stub)) {
                Class<?> serviceType = invoker.getInterface();
                // `stub = true` 的情況,使用接口 + `Stub` 字符串。
                if (ConfigUtils.isDefault(stub)) {
                    if (invoker.getUrl().hasParameter(Constants.STUB_KEY)) {
                        stub = serviceType.getName() + "Stub";
                    } else {
                        stub = serviceType.getName() + "Local";
                    }
                }
                try {
                    // 加載 Stub 類
                    Class<?> stubClass = ReflectUtils.forName(stub);
                    if (!serviceType.isAssignableFrom(stubClass)) {
                        throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + serviceType.getName());
                    }
                    try {
                        Constructor<?> constructor = ReflectUtils.findConstructor(stubClass, serviceType);
                          // 創(chuàng)建 Stub 對象,使用帶 Service Proxy 對象的構(gòu)造方法
                        proxy = (T) constructor.newInstance(new Object[]{proxy});

                        // 【TODO 8033】參數(shù)回調(diào)
                        //export stub service
                        URL url = invoker.getUrl();
                        if (url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT)) {
                            url = url.addParameter(Constants.STUB_EVENT_METHODS_KEY, StringUtils.join(Wrapper.getWrapper(proxy.getClass()).getDeclaredMethodNames(), ","));
                            url = url.addParameter(Constants.IS_SERVER_KEY, Boolean.FALSE.toString());
                            try {
                                export(proxy, (Class) invoker.getInterface(), url);
                            } catch (Exception e) {
                                LOGGER.error("export a stub service error.", e);
                            }
                        }
                    } catch (NoSuchMethodException e) {
                        throw new IllegalStateException("No such constructor \"public " + stubClass.getSimpleName() + "(" + serviceType.getName() + ")\" in stub implementation class " + stubClass.getName(), e);
                    }
                } catch (Throwable t) {
                    LOGGER.error("Failed to create stub implementation class " + stub + " in consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", cause: " + t.getMessage(), t);
                    // ignore
                }
            }
        }
        return proxy;
    }
  • 動態(tài)的用proxyFactory(默認(rèn)是JavassistFactory)來獲取Service Proxy 對象
  • 獲取配置中的stub 配置項(xiàng),而后調(diào)用 ReflectUtils#forName(stub) 方法,加載我們自己寫的 Stub 類,注意是加載哦,就是拿到Class,初始化對象是在下面完成的。
  • 之后創(chuàng)建 Stub 對象,使用帶 Service Proxy 對象作為參數(shù)的構(gòu)造方法。例如,public DemoServiceStub(DemoService demoService)。通過這樣的方式,我們就擁有了一個(gè)內(nèi)部有 Proxy Service 對象的 Stub 對象啦,可以實(shí)現(xiàn)各種 OOXX 啦。最后將這個(gè)Stub對象返回。
  • 再次提醒,所以我們有的,是這個(gè)Stub對象,可以用這個(gè)對象來對執(zhí)行方法進(jìn)行一些本地的AOP攔截

4.2擴(kuò)展類JavassistProxyFactory的getProxy(Invoker)

StubProxyFactoryWrapper#getProxy(invoker)第一行就調(diào)用了擴(kuò)展類的getProxy(invoker)
既然默認(rèn)實(shí)現(xiàn)是javassist,那么我們就來看看javassist都干了什么吧
先來看看他的父類,AbstractProxyFactory,

public abstract class AbstractProxyFactory implements ProxyFactory {

    public <T> T getProxy(Invoker<T> invoker) throws RpcException {
        Class<?>[] interfaces = null;
        // TODO 8022 芋艿
        String config = invoker.getUrl().getParameter("interfaces");
        if (config != null && config.length() > 0) {
            String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
            if (types != null && types.length > 0) {
                interfaces = new Class<?>[types.length + 2];
                interfaces[0] = invoker.getInterface();
                interfaces[1] = EchoService.class;
                for (int i = 0; i < types.length; i++) {
                    interfaces[i + 1] = ReflectUtils.forName(types[i]);
                }
            }
        }
        // 增加 EchoService 接口,用于回生測試。參見文檔《回聲測試》https://dubbo.gitbooks.io/dubbo-user-book/demos/echo-service.html
        if (interfaces == null) {
            interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};
        }
        return getProxy(invoker, interfaces);
    }

    public abstract <T> T getProxy(Invoker<T> invoker, Class<?>[] types);

}

可以看到,該抽象類,主要是實(shí)現(xiàn)了 #getProxy(invoker)方法,獲得需要生成代理的接口們,而后調(diào)用了我們的子類JavassistProxyFactory#getProxy(invoker, types)。接著往下看

public class JavassistProxyFactory extends AbstractProxyFactory {

    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
      /// 生成 Proxy 子類(Proxy 是抽象類)。并調(diào)用 Proxy 子類的 newInstance 方法創(chuàng)建 Proxy 實(shí)例
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        // TODO Wrapper類不能正確處理帶$的類名
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

}

JavassistProxyFactory#getProxy(invoker, types)return的時(shí)候生成 Proxy 子類(Proxy 是抽象類)。并調(diào)用 Proxy 子類的 newInstance 方法創(chuàng)建 Proxy 實(shí)例。注意:其中傳入的參數(shù)是 InvokerInvocationHandler類,通過這樣的方式,讓 proxy 和真正的邏輯代碼解耦。
那我們來看看這個(gè)InvokerInvocationHandler類到底是什么

4.3Proxy 實(shí)例中傳入了InvokerInvocationHandler類

public class InvokerInvocationHandler implements InvocationHandler {

    /**
     * Invoker 對象
     */
    private final Invoker<?> invoker;

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        // wait 等Object類的方法,直接反射調(diào)用
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        // 基礎(chǔ)方法,不使用 RPC 調(diào)用
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        // RPC 調(diào)用
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }

}
  • 這個(gè)類就是攔截一些接口類調(diào)用的用途。將一些簡單的方法本地調(diào)用,就不用浪費(fèi)網(wǎng)絡(luò)了通信了。
  • 這里注意InvokerInvocationHandler#invoke return的調(diào)用 invoker.invoke(new RpcInvocation(method, args)).recreate(),如果消費(fèi)者調(diào)用invoke的話,會在這里終于開始了網(wǎng)絡(luò)調(diào)用

4.4JavassistProxyFactory生成的代碼樣例及作用

最終JavassistProxyFactory生成的代碼樣例如下:

package org.apache.dubbo.common.bytecode;

public class proxy0 implements org.apache.dubbo.demo.DemoService {

    public static java.lang.reflect.Method[] methods;

    private java.lang.reflect.InvocationHandler handler;

    public proxy0() {
    }

    public proxy0(java.lang.reflect.InvocationHandler arg0) {
        handler = $1;
    }

    public java.lang.String sayHello(java.lang.String arg0) {
        Object[] args = new Object[1];
        args[0] = ($w) $1;
        Object ret = handler.invoke(this, methods[0], args);
        return (java.lang.String) ret;
    }
}

  • 這個(gè)proxy實(shí)現(xiàn)了我們的業(yè)務(wù)接口方法,使得我們自己調(diào)用業(yè)務(wù)方法,比如sayHello的時(shí)候,可以進(jìn)行這樣的鏈路:
    InvokerInvocationHandler對象(攔截基本方法,使其不進(jìn)行網(wǎng)絡(luò)調(diào)用,如toString()) => 網(wǎng)絡(luò)調(diào)用
  • 而使用方對此是沒有感覺的,他只需要調(diào)用消費(fèi)者的接口,Dubbo幫他實(shí)現(xiàn)了一切。

4.5服務(wù)引用存在哪?

別急,這就來
XML中的服務(wù)引用配置樣例如下:

<dubbo:reference interface="cn.com.wang.service.UserService" id="userService">
</dubbo:reference>
  • createProxy()方法最終返回的一個(gè)Stub(如果實(shí)現(xiàn)了本地存根的話),getObject()的返回值正是這個(gè)Stub,存儲在ReferenceBean中。
  • ReferenceBean 有個(gè)特殊之處,實(shí)現(xiàn)了FactoryBean ,FactoryBean就是spring的工廠bean,工廠bean也就是說當(dāng)我們要獲取dubbo:reference中interface時(shí)也就是我們的UserService,我們會直接注入到使用類中,spring就得從容器中找。
  • 也因?yàn)樗枪Sbean,才會有上文我們提到的,調(diào)用FactoryBean的getObject()方法,這個(gè)方法返回的對象就會作為標(biāo)簽配置返回的對象。所以我們的引用,也就是最后返回Service 代理對象,的實(shí)際上是存在Spring容器里的。

至此,我們簡單分析了本地引用和proxyFactory的動態(tài)代理以及本地存根的作用,接下來來看看遠(yuǎn)程引用與本地引用的區(qū)別吧。

5.遠(yuǎn)程調(diào)用

5.1 createProx()中的遠(yuǎn)程引入的鏈路

 private T createProxy(Map<String, String> map) {
     URL tmpUrl = new URL("temp", "localhost", 0, map);
     // 【省略代碼】是否本地引用
     final boolean isJvmRefer;

     if (isJvmRefer) {
    // 【省略代碼】本地引用
    } else {// 遠(yuǎn)程引用
        // url 不為空,表明用戶可能想進(jìn)行點(diǎn)對點(diǎn)調(diào)用
        if (url != null && url.length() > 0) {
            // 當(dāng)需要配置多個(gè) url 時(shí),可用分號進(jìn)行分割,這里會進(jìn)行切分
            String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
            if (us != null && us.length > 0) {
                for (String u : us) {
                    URL url = URL.valueOf(u);
                    if (url.getPath() == null || url.getPath().length() == 0) {
                        // 設(shè)置接口全限定名為 url 路徑
                        url = url.setPath(interfaceName);
                    }
                    
                    // 檢測 url 協(xié)議是否為 registry,若是,表明用戶想使用指定的注冊中心
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        // 將 map 轉(zhuǎn)換為查詢字符串,并作為 refer 參數(shù)的值添加到 url 中
                        urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                    } else {
                        // 合并 url,移除服務(wù)提供者的一些配置(這些配置來源于用戶配置的 url 屬性),
                        // 比如線程池相關(guān)配置。并保留服務(wù)提供者的部分配置,比如版本,group,時(shí)間戳等
                        // 最后將合并后的配置設(shè)置為 url 查詢字符串中。
                        urls.add(ClusterUtils.mergeUrl(url, map));
                    }
                }
            }
        } else {
            // url為空,加載注冊中心 url,
            //注意是這里開始將不同的注冊中心循環(huán)添加到urls
            List<URL> us = loadRegistries(false);
            if (us != null && !us.isEmpty()) {
                for (URL u : us) {
                    // 加載監(jiān)控中心 URL
                    URL monitorUrl = loadMonitor(u);
                    if (monitorUrl != null) {
                        // 服務(wù)引用配置對象 `map`,帶上監(jiān)控中心的 URL
                        map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                    }
                    // 添加 refer 參數(shù)到 url 中,并將 url 添加到 urls 中
                    urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                }
            }

            // 未配置注冊中心,拋出異常
            if (urls.isEmpty()) {
                throw new IllegalStateException("No such any registry to reference...");
            }
        }

        // 單個(gè)注冊中心或服務(wù)提供者(服務(wù)直連,下同)
        if (urls.size() == 1) {
            // 調(diào)用 RegistryProtocol 的 refer 構(gòu)建 Invoker 實(shí)例
            invoker = refprotocol.refer(interfaceClass, urls.get(0));
            
        // 多個(gè)注冊中心或多個(gè)服務(wù)提供者,或者兩者混合
        } else {
            List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
            URL registryURL = null;

            // 獲取所有的 Invoker
            for (URL url : urls) {
                // 通過 refprotocol 調(diào)用 refer 構(gòu)建 Invoker,refprotocol 會在運(yùn)行時(shí)
                // 根據(jù) url 協(xié)議頭加載指定的 Protocol 實(shí)例,并調(diào)用實(shí)例的 refer 方法
                invokers.add(refprotocol.refer(interfaceClass, url));
                if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                    registryURL = url;
                }
            }
            if (registryURL != null) {
                // 如果注冊中心鏈接不為空,則將使用 AvailableCluster
                URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                // 創(chuàng)建 StaticDirectory 實(shí)例,并由 Cluster 對多個(gè) Invoker 進(jìn)行合并
                invoker = cluster.join(new StaticDirectory(u, invokers));
            } else {
                invoker = cluster.join(new StaticDirectory(invokers));
            }
        }
    }

    Boolean c = check;
    if (c == null && consumer != null) {
        c = consumer.isCheck();
    }
    if (c == null) {
        c = true;
    }
    
    // invoker 可用性檢查
    if (c && !invoker.isAvailable()) {
        throw new IllegalStateException("No provider available for the service...");
    }

    // 生成代理類
    return (T) proxyFactory.getProxy(invoker);
}
  • 簡單概括下遠(yuǎn)程引用的邏輯:讀取直連配置項(xiàng),或注冊中心 url,并將讀取到的 url 存儲到 urls 中。然后根據(jù) urls 元素?cái)?shù)量進(jìn)行后續(xù)操作。
  • 若 urls 元素?cái)?shù)量為1,則直接通過 Protocol 自適應(yīng)拓展類構(gòu)建 Invoker 實(shí)例接口。
  • 若 urls 元素?cái)?shù)量大于1,即存在多個(gè)注冊中心或服務(wù)直連 url,此時(shí)先根據(jù) url 構(gòu)建 Invoker。然后再通過 Cluster 合并多個(gè) Invoker,最后調(diào)用 ProxyFactory 生成代理類。
  • 對于上述的判斷,可以看下多注冊中心官網(wǎng)的配置樣例來輔助理解。多注冊中心

Invoker 的構(gòu)建過程以及代理類的過程比較重要,就是refprotocol.refer(type, url)這一方法的調(diào)用鏈路。

5.2只配置了一個(gè)注冊中心的遠(yuǎn)程引用:

截取上面的方法部分如下:

  // 單個(gè)注冊中心或服務(wù)提供者
        if (urls.size() == 1) {
                invoker = refprotocol.refer(interfaceClass, urls.get(0));

             // 多個(gè)注冊中心或多個(gè)服務(wù)提供者,或者兩者混合
            } else {
            //…………
        }

在上述方法中,如果只配置了一個(gè)注冊中心的話,urls屬性樣例如下(配置了直連會有不同,下面會介紹)
registry://224.5.6.7:1234/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.0&pid=26540&qos.port=33333&refer=application%3Ddemo-consumer%26check%3Dfalse%26dubbo%3D2.0.0%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D26540%26qos.port%3D33333%26register.ip%3D192.168.1.102%26side%3Dconsumer%26timestamp%3D1594622397172&registry=multicast&timestamp=1594622397254

image.png

調(diào)用 refprotocol.refer(type, url)方法,引用服務(wù),返回 Invoker 對象。又到了我們喜聞樂見的SPI自適應(yīng)特性,自動根據(jù) URL 參數(shù),獲得對應(yīng)的拓展實(shí)現(xiàn)。
在這里我們根據(jù)url可以得到調(diào)用的是RegistryProtocol,#refer(...)方法的調(diào)用順序是:
Protocol$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => RegistryProtocol

與服務(wù)暴露時(shí)類似,ProtocolFilterWrapper 用于創(chuàng)建過濾鏈,ProtocolListenerWrapper返回了一個(gè)ListenerInvokerWrapper對象,ListenerInvokerWrapper裝飾invoker, 在構(gòu)造器中遍歷listeners構(gòu)建referer的監(jiān)聽鏈,這兩個(gè)類都會放行RegistryProtocol。也就是什么都不做。

5.35.3RegistryProtocol#refer()

直接來看RegistryProtocol的refer(type, url)做了什么吧

 public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        // 獲得真實(shí)的注冊中心的 URL
        url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
        // 獲得注冊中心
        Registry registry = registryFactory.getRegistry(url);
        // TODO 芋艿
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }

        // 將 url 查詢字符串轉(zhuǎn)為 Map,獲得服務(wù)引用配置參數(shù)集合
        // group="a,b" or group="*"
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
        String group = qs.get(Constants.GROUP_KEY);
        // 分組聚合,參見文檔 http://dubbo.io/books/dubbo-user-book/demos/group-merger.html
        if (group != null && group.length() > 0) {
            if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
                    || "*".equals(group)) {
                // 通過 SPI 加載 MergeableCluster 實(shí)例,并調(diào)用 doRefer 繼續(xù)執(zhí)行服務(wù)引用邏輯
                return doRefer(getMergeableCluster(), registry, type, url);
            }
        }
        // 執(zhí)行服務(wù)引用
        return doRefer(cluster, registry, type, url);
    }

上面代碼首先為 url 設(shè)置協(xié)議頭,然后根據(jù) url 參數(shù)從registryFactory加載注冊中心實(shí)例,如果用的是zk的協(xié)議,那么注冊器就是zk的注冊器。然后獲取 group 配置,根據(jù) group 配置決定doRefer()第一個(gè)參數(shù)的類型。這里的重點(diǎn)是 doRefer()方法

 private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        // 創(chuàng)建 RegistryDirectory 對象,并設(shè)置注冊中心和協(xié)議
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
       // 服務(wù)引用配置集合
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); 
      // 生成服務(wù)消費(fèi)者鏈接
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
        // 向注冊中心注冊自己(服務(wù)消費(fèi)者)
        if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {
            registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                    Constants.CHECK_KEY, String.valueOf(false))); 
        }
        // 向注冊中心訂閱服務(wù)提供者 + 路由規(guī)則 + 配置規(guī)則
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                        Constants.PROVIDERS_CATEGORY
                        + "," + Constants.CONFIGURATORS_CATEGORY
                        + "," + Constants.ROUTERS_CATEGORY));

        // 創(chuàng)建 Invoker 對象
        Invoker invoker = cluster.join(directory);
        // 向本地注冊表,注冊消費(fèi)者
        ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
        return invoker;
    }
  • doRefer 方法創(chuàng)建一個(gè)RegistryDirectory實(shí)例,然后生成服務(wù)者消費(fèi)者鏈接,并向注冊中心進(jìn)行注冊。
  • 注冊完畢后,緊接著訂閱 providers、configurators、routers 等節(jié)點(diǎn)下的數(shù)據(jù)。完成訂閱后,RegistryDirectory 會收到這幾個(gè)節(jié)點(diǎn)下的子節(jié)點(diǎn)信息。
  • 由于一個(gè)服務(wù)可能部署在多臺服務(wù)器上,這樣就會在 providers 產(chǎn)生多個(gè)節(jié)點(diǎn),這個(gè)時(shí)候就需要 Cluster 將多個(gè)服務(wù)節(jié)點(diǎn)合并為一個(gè),并生成一個(gè) Invoker。
    注意這個(gè)cluster,他也是通過SPI機(jī)制依賴注入來的,(關(guān)于Cluster,RegistryDirectory的內(nèi)容本篇暫不討論,等我下一篇再說! )

5.45.4直連時(shí)的遠(yuǎn)程引用

上面講的是沒有配置直連的情況,這次我們改一下配置,將引用改成直連看看會咋樣。

<dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService" url="dubbo://localhost:20880" />

同樣是到這一行代碼,

// 單個(gè)注冊中心或服務(wù)提供者
        if (urls.size() == 1) {
                invoker = refprotocol.refer(interfaceClass, urls.get(0));

             // 多個(gè)注冊中心或多個(gè)服務(wù)提供者,或者兩者混合
            } else {
            //…………
        }

此時(shí)的urls屬性如下
dubbo://localhost:20880/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=2144&qos.port=33333&register.ip=192.168.1.102&side=consumer&timestamp=1594651670644

image.png

注意到頭部已經(jīng)被換成dubbo了,與上文的registry類似,這邊就是調(diào)用DubboProtocol來進(jìn)行引用了,鏈路如下
Protocol$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => DubboProtocol
ProtocolFilterWrapper創(chuàng)建好過濾器鏈,ProtocolListenerWrapper開啟監(jiān)聽后,就開始調(diào)用DubboProtocol#refer()了,來看一下這個(gè)方法做了什么吧

5.5DubboProtocol#refer()

  public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
        // 初始化序列化優(yōu)化器
        optimizeSerialization(url);
        // 獲得遠(yuǎn)程通信客戶端數(shù)組
        // 創(chuàng)建 DubboInvoker 對象
        // create rpc invoker.
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        // 添加到 `invokers`
        invokers.add(invoker);
        return invoker;
    }

上面方法看起來比較簡單,不過這里有一個(gè)調(diào)用需要我們注意一下,即 getClients(url)。這個(gè)方法用于獲取客戶端實(shí)例,實(shí)例類型為ExchangeClientExchangeClient實(shí)際上并不具備通信能力,它需要基于更底層的客戶端實(shí)例進(jìn)行通信。比如 NettyClient、MinaClient 等,默認(rèn)情況下,Dubbo 使用 NettyClient進(jìn)行通信。接下來,我們簡單看一下 getClients() 方法的邏輯。

5.5new DubboInvoker時(shí)的getClients(url)

private ExchangeClient[] getClients(URL url) {
    // 是否共享連接
    boolean service_share_connect = false;
    // 獲取連接數(shù),默認(rèn)為0,表示未配置
    int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
    // 如果未配置 connections,則共享連接
    if (connections == 0) {
        service_share_connect = true;
        connections = 1;
    }

    ExchangeClient[] clients = new ExchangeClient[connections];
    for (int i = 0; i < clients.length; i++) {
        if (service_share_connect) {
            // 獲取共享客戶端
            clients[i] = getSharedClient(url);
        } else {
            // 初始化新的客戶端
            clients[i] = initClient(url);
        }
    }
    return clients;
}

這里根據(jù) connections 數(shù)量決定是獲取共享客戶端還是創(chuàng)建新的客戶端實(shí)例,默認(rèn)情況下,使用共享客戶端實(shí)例。getSharedClient 方法中也會調(diào)用 initClient 方法,因此下面我們一起看一下這兩個(gè)方法,看看他是怎么共享,怎么初始化的。

5.6getClients(url)中的getSharedClient(url)

 private ExchangeClient getSharedClient(URL url) {
        // 從集合中,查找 ReferenceCountExchangeClient 對象
        String key = url.getAddress();
        ReferenceCountExchangeClient client = referenceClientMap.get(key);
        if (client != null) {
            // 若未關(guān)閉,增加指向該 Client 的數(shù)量,并返回它
            if (!client.isClosed()) {
                client.incrementAndGetCount();
                return client;
            // 若已關(guān)閉,移除
            } else {
                referenceClientMap.remove(key);
            }
        }
        // 同步,創(chuàng)建 ExchangeClient 對象。
        synchronized (key.intern()) {
            // 創(chuàng)建 ExchangeClient 對象
            ExchangeClient exchangeClient = initClient(url);
            // 將 `exchangeClient` 包裝,創(chuàng)建 ReferenceCountExchangeClient 對象
            client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
            // 添加到集合
            referenceClientMap.put(key, client);
            // 從 `ghostClientMap`移除
            ghostClientMap.remove(key);
            return client;
        }
    }
  • 上面方法先訪問緩存,若緩存未命中,則通過initClient 方法創(chuàng)建新的 ExchangeClient 實(shí)例,并將該實(shí)例傳給 ReferenceCountExchangeClient構(gòu)造方法創(chuàng)建一個(gè)帶有引用計(jì)數(shù)功能的 ExchangeClient實(shí)例。
  • ReferenceCountExchangeClient 類內(nèi)部使用引用計(jì)數(shù)的方式記錄共享的數(shù)量。
    ghostClientMap,這個(gè)名字說實(shí)話確實(shí)有點(diǎn)難理解,這個(gè)實(shí)際上是一個(gè)存儲LazyConnectExchangeClient的集合。
    這個(gè)時(shí)候我們就要理解LazyConnectExchangeClient的作用,當(dāng)服務(wù)引用時(shí),我們并不想此時(shí)就是開始通信,而是在調(diào)用的時(shí)候再與服務(wù)端通信,LazyConnectExchangeClient就像是一個(gè)緩存,在服務(wù)調(diào)用的時(shí)候才會創(chuàng)建真正的Client去連接,節(jié)省了資源.
  • LazyConnectExchangeClient 在每次數(shù)據(jù)傳輸前,先判斷tcp連接狀態(tài),若連接斷開則先執(zhí)行connect建立連接。

5.7getClients(url)中的initClient(url)

DubboProtocol#getSharedClient()也也調(diào)用了DubboProtocol#initClient(),看來是繞不過這方法了,來繼續(xù)看他的代碼。

private ExchangeClient initClient(URL url) {
        // 獲取客戶端類型,默認(rèn)為 netty
        String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
        // 校驗(yàn) Client 的 Dubbo SPI 拓展是否存在,不存在則拋出異常
        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported client type: " + str + "," +
                    " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
        }

        // 設(shè)置編解碼器為 Dubbo ,即 DubboCountCodec
        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);

        // 默認(rèn)開啟 heartbeat
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

        // 連接服務(wù)器,創(chuàng)建客戶端
        ExchangeClient client;
        try {
            // 懶連接,創(chuàng)建 LazyConnectExchangeClient 對象
            if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
                client = new LazyConnectExchangeClient(url, requestHandler);
            // 直接連接,創(chuàng)建 HeaderExchangeClient 對象
            } else {
                client = Exchangers.connect(url, requestHandler);
            }
        } catch (RemotingException e) {
            throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
        }
        return client;
    }
  • DubboProtocol的requestHandlerExchangeHandler的實(shí)現(xiàn),是remoting層接收數(shù)據(jù)后的回調(diào)。只定義了一個(gè)回復(fù)請求結(jié)果的方法,返回的是請求結(jié)果
  • initClient() 方法首先獲取用戶配置的客戶端類型,默認(rèn)為 netty。然后檢測用戶配置的客戶端類型是否存在,不存在則拋出異常。最后根據(jù) lazy 配置決定創(chuàng)建什么類型的客戶端。這里的 LazyConnectExchangeClient 代碼并不是很復(fù)雜,該類會在 request 方法被調(diào)用時(shí)通過 Exchangers#connect 方法創(chuàng)建 ExchangeClient 客戶端,該類的代碼本節(jié)就不分析了。下面我們分析一下 Exchangers#connect() 方法。

5.8initClient(url)中的自適應(yīng)Exchangers#connect()

public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handler == null) {
        throw new IllegalArgumentException("handler == null");
    }
    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
    // 獲取 Exchanger 實(shí)例,默認(rèn)為 HeaderExchangeClient
    return getExchanger(url).connect(url, handler);

Exchangers#getExchanger 會通過 SPI 加載 HeaderExchanger 實(shí)例(在Dubbo中只有這一個(gè)實(shí)例),來就來看看HeaderExchanger#connect()

public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
  // 這里包含了多個(gè)調(diào)用,分別如下:
    // 1. 創(chuàng)建 HeaderExchangeHandler 對象
    // 2. 創(chuàng)建 DecodeHandler 對象
    // 3. 通過 Transporters 構(gòu)建 Client 實(shí)例
    // 4. 創(chuàng)建 HeaderExchangeClient 對象
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

我們這里重點(diǎn)看一下 Transporters#connect 方法

public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    ChannelHandler handler;
    if (handlers == null || handlers.length == 0) {
        handler = new ChannelHandlerAdapter();
    } else if (handlers.length == 1) {
        handler = handlers[0];
    } else {
        // 如果 handler 數(shù)量大于1,則創(chuàng)建一個(gè) ChannelHandler 分發(fā)器
        handler = new ChannelHandlerDispatcher(handlers);
    }
    
    // 獲取 Transporter 自適應(yīng)拓展類,并調(diào)用 connect 方法生成 Client 實(shí)例
    return getTransporter().connect(url, handler);
}

如上,getTransporter() 方法返回的是自適應(yīng)拓展類,該類會在運(yùn)行時(shí)根據(jù)客戶端類型加載指定的 Transporter 實(shí)現(xiàn)類。若用戶未配置客戶端類型,則默認(rèn)加載 NettyTransporter,并調(diào)用該類的 connect() 方法。如下:

public Client connect(URL url, ChannelHandler listener) throws RemotingException {
    // 創(chuàng)建 NettyClient 對象
    return new NettyClient(url, listener);
}

到這里就不繼續(xù)跟下去了,在往下就是通過 Netty 提供的 API 構(gòu)建 Netty 客戶端了。

5.9多注冊中心時(shí)鏈路

else {
                // 循環(huán) `urls` ,引用服務(wù),返回 Invoker 對象
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {
                    // 引用服務(wù)
                    invokers.add(refprotocol.refer(interfaceClass, url));
                    // 使用最后一個(gè)注冊中心的 URL
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        registryURL = url; // use last registry url
                    }
                }
                // 有注冊中心
                if (registryURL != null) { // registry url is available
                    // 對有注冊中心的 Cluster 只用 AvailableCluster
                    // use AvailableCluster only when register's cluster is available
                    URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                    invoker = cluster.join(new StaticDirectory(u, invokers));
                // 無注冊中心,全部都是服務(wù)直連
                } else { // not a registry url
                    invoker = cluster.join(new StaticDirectory(invokers));
                }
            }

可以看到多注冊中心的時(shí)候多了一步 cluster.join(),其他與上文分析的單注冊中心并無區(qū)別,僅僅是Cluster 將多個(gè)服務(wù)節(jié)點(diǎn)合并為一個(gè),并生成一個(gè) Invoker,這個(gè)我們留待下面章節(jié)來主要分析Cluster。

至此,我們的DubboProcotol#refer()也分析完畢告一段落,我們知道了他里面創(chuàng)建了一個(gè)DubboInvoker,其中有我們的Clients實(shí)例,基于此,我們才可以通過它來進(jìn)行遠(yuǎn)程調(diào)用了。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容