dubbo多序列化協(xié)議配置原理

一、案例

序列化代碼截圖.png

zk-providers.png

報錯截圖.png
  1. 從圖一可以看出provider端提供了兩個服務(wù),并且JsonDemoService指定序列化協(xié)議為fastjsonDemoService指定序列化協(xié)議為hessian2;
  2. 從圖二可以看出兩個服務(wù)注冊到ZK上的URL中序列化協(xié)議都是對的,證明通過@DubboService(parameters = {"serialization", "xxx"})這種方式指定序列化協(xié)議是生效的;
  3. 從圖三可以看出調(diào)用JsonDemoService報錯了,原因是使用了hessian2協(xié)議進行序列化,但是JsonDemoRequest沒有實現(xiàn)Serializable;

到這里就發(fā)現(xiàn)一個問題,provider中JsonDemoService明明指定序列化協(xié)議為fastjson,并且從注冊到ZK上的URL貌似也能印證這一點,但是為什么consumer調(diào)用時卻使用了hessian2協(xié)議???帶著這個問題我們來分析下dubbo provider和consumer之間交互時序列化協(xié)議的實現(xiàn)原理。

二、解析

涉及序列化的環(huán)節(jié) (1).png

下面對這六個節(jié)點分別進行分析

1、服務(wù)發(fā)布

服務(wù)導(dǎo)出.png

以上是服務(wù)端啟動服務(wù)導(dǎo)出時涉及到序列化的粗略流程

  1. dubbo啟動的入口DubboBootstrap.start中會先初始化一些基礎(chǔ)組件,例如,配置中心相關(guān)組件、事件監(jiān)聽、元數(shù)據(jù)相關(guān)組件等等,這些組件最終都會被緩存到ConfigManager.configsCache中(Dubbo中凡是實現(xiàn)AbstractConfig接口的類,在被Spring初始化之后,最終都會被緩存到configManager中),其中就包含ServiceBean@DubboService(parameters = {"serialization", "xxx"})中parameters也會保存在ServiceBean.parameters中。
    image.png
  2. exportServices() 方法,它是服務(wù)發(fā)布核心邏輯的入口,會遍歷ConfigManager中每一個ServiceBean進行導(dǎo)出。
  3. ServiceConfig.doExportUrlsFor1Protocol主要分為兩部分,一部分是組裝服務(wù)的 URL,另一部分就是服務(wù)發(fā)布。其中組裝服務(wù)URL時會把配置的ServiceBean.parameters拼接到URL中,即&serialization=fastjson。服務(wù)發(fā)布的部分是"registry://" 協(xié)議,會通過RegistryProtocol.export實現(xiàn)。
// JsonDemoService服務(wù)URL
dubbo://192.168.1.15:20880/org.apache.dubbo.demo.JsonDemoService?anyhost=true
&application=dubbo-demo-annotation-provider
&bind.ip=192.168.1.15
&bind.port=20880
&deprecated=false
&dubbo=2.0.2
&dynamic=true
&generic=false
&interface=org.apache.dubbo.demo.JsonDemoService
&methods=sayByeBye
&pid=39221
&release=
&serialization=fastjson
&side=provider
&timestamp=1655450646368
  1. RegistryProtocol.export主要也是分為兩部分,一部分是通過具體的DubboProtocol創(chuàng)建服務(wù);另一部分就是通過SPI接口RegistryFactory獲取到具體的Registry把服務(wù)注冊到注冊中心(ZK或Nacos等)。
  2. DubboProtocol.openServer創(chuàng)建服務(wù)傳入的參數(shù)只有一個URL,最終會創(chuàng)建Netty ServerBootstrap并經(jīng)過層層包裝成ProtocolServer,然后URL會被封裝到Netty ChannelHandler并注冊到Netty ChannelPipeline上(這部分下面會講到)。創(chuàng)建的Server會緩存在serverMap中,其中key是當(dāng)前服務(wù)的IP+dubbo協(xié)議端口號192.168.1.15:20880 -> {DubboProtocolServer@3845}也就是說同一個dubbo端口只會創(chuàng)建一個Server,并且Server對應(yīng)的URL為該端口上第一個被掃描到的服務(wù)的URL。
/**
* <host:port, ProtocolServer>
* 記錄了全部的 ProtocolServer 實例,其中的 Key 是 host 和 port 組成的字符串,
* Value 是監(jiān)聽該地址的 ProtocolServer。ProtocolServer 就是對 RemotingServer 的一層簡單封裝,表示一個服務(wù)端
*/
protected final Map<String, ProtocolServer> serverMap = new ConcurrentHashMap<>();

private void openServer(URL url) {
    String key = url.getAddress();
    boolean isServer = url.getParameter(IS_SERVER_KEY, true);
    // 只有服務(wù)端才能創(chuàng)建 ProtocolServer 并對外服務(wù)
    if (isServer) {
        // 檢查是否已有 ProtocolServer 在監(jiān)聽 URL 指定的地址
        ProtocolServer server = serverMap.get(key);
        if (server == null) {
            synchronized (this) {
                server = serverMap.get(key);
                if (server == null) {
                    serverMap.put(key, createServer(url));
                }
            }
        } else {
            // 如果已有ProtocolServer實例,則嘗試根據(jù)URL信息重置ProtocolServer
            server.reset(url);
        }
    }
}
  1. 服務(wù)注冊在register方法中,其中參數(shù)registeredProviderUrl就是最終注冊到注冊中心的dubbo協(xié)議URL,最終調(diào)用ZK zkClient.create方法向 Zookeeper 注冊服務(wù);
private void register(URL registryUrl, URL registeredProviderUrl) {
        Registry registry = registryFactory.getRegistry(registryUrl);
        registry.register(registeredProviderUrl);
}

public void doRegister(URL url) {
    try {
        zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

2、客戶端引用

服務(wù)引用.png

以上是服務(wù)引用時涉及到序列化的粗略流程

  1. DubboBootstrap.start()方法除了會調(diào)用exportServices()方法完成服務(wù)發(fā)布之外,還會調(diào)用referServices()方法完成服務(wù)引用,其中會遍歷掃描到的所有ReferenceConfig進行服務(wù)引用(主要就是創(chuàng)建代理類);

  2. RegistryProtocol.doRefer中,首先會根據(jù) URL 初始化RegistryDirectory實例,然后生成 consumer協(xié)議 Subscribe URL 并進行注冊,之后會把URL作為參數(shù)通過具體的 Registry訂閱服務(wù);

consumer://192.168.1.15/org.apache.dubbo.demo.DemoService?application=dubbo-demo-annotation-consumer
&cluster=failfast
&dubbo=2.0.2
&init=false
&interface=org.apache.dubbo.demo.DemoService
&methods=sayHello,sayHelloAsync
&pid=47044&side=consumer
&sticky=false
&timestamp=1655733277885
  1. ZookeeperRegistry.doSubscribe這里以ZK為例,首先根據(jù)consumer URL生成路徑,這里有3個路徑providers、routers、configurators;然后針對每個路徑注冊子節(jié)點監(jiān)聽器,同時返回子節(jié)點(providers子節(jié)點就是當(dāng)前引用服務(wù)的所有服務(wù)端URL);最后因為是首次引用,會觸發(fā)一次監(jiān)聽,其中providers監(jiān)聽便會創(chuàng)建對應(yīng)的客戶端NettyClient。
public void doSubscribe(final URL url, final NotifyListener listener) {
        ...
        List<URL> urls = new ArrayList<>();
        // 針對每一種category路徑進行監(jiān)聽(providers、routers、configurators)
        for (String path : toCategoriesPath(url)) {
            ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
            // 封裝監(jiān)聽邏輯
            ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds)));
            zkClient.create(path, false);
            // 添加監(jiān)聽,并返回該目錄當(dāng)前所有的子節(jié)點,providers時為所有服務(wù)端URL
            List<String> children = zkClient.addChildListener(path, zkListener);
            if (children != null) {
                urls.addAll(toUrlsWithEmpty(url, path, children));
            }
        }
        // 如果有子節(jié)點直接觸發(fā)一次監(jiān)聽,urls就是服務(wù)端URL
        notify(url, listener, urls);
}
  1. DubboProtocol.getSharedClient創(chuàng)建Client傳入的只有URL和連接數(shù)兩個參數(shù),最終會創(chuàng)建Netty Bootstrap并經(jīng)過層層包裝成ReferenceCountExchangeClient,然后URL會被封裝到Netty ChannelHandler并注冊到Netty ChannelPipeline上。

另外還一個比較重要的是創(chuàng)建Client分為共享連接和獨享連接兩種模式:
當(dāng)使用獨享連接的時候,對每個 Service 建立固定數(shù)量的 Client,每個 Client 維護一個底層連接。如下圖所示,就是針對每個 Service 都啟動了兩個獨享連接:

獨享連接.png

當(dāng)使用共享連接的時候,會區(qū)分不同的網(wǎng)絡(luò)地址(host:port),一個地址只建立固定數(shù)量的共享連接。如下圖所示,Provider 1 暴露了多個服務(wù),Consumer 引用了 Provider 1 中的多個服務(wù),共享連接是說 Consumer 調(diào)用 Provider 1 中的多個服務(wù)時,是通過固定數(shù)量的共享 TCP 長連接進行數(shù)據(jù)傳輸,這樣就可以達到減少服務(wù)端連接數(shù)的目的。
共享連接.png

那怎么去創(chuàng)建共享連接呢?創(chuàng)建共享連接的實現(xiàn)細節(jié)是在getSharedClient()方法中,它首先從referenceClientMap緩存(Map<String, List<ReferenceCountExchangeClient>> 類型)中查詢 Key(host 和 port 拼接成的字符串)對應(yīng)的共享 Client 集合,如果查找到的 Client 集合全部可用,則直接使用這些緩存的 Client,否則要創(chuàng)建新的 Client 來補充替換緩存中不可用的 Client。

private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {
    String key = url.getAddress();
    List<ReferenceCountExchangeClient> clients = referenceClientMap.get(key);
    ...
    locks.putIfAbsent(key, new Object());
    synchronized (locks.get(key)) {
        clients = referenceClientMap.get(key);
        ...
        // If the clients is empty, then the first initialization is
        if (CollectionUtils.isEmpty(clients)) {
            clients = buildReferenceCountExchangeClientList(url, connectNum);
            referenceClientMap.put(key, clients);

        } else {
            for (int i = 0; i < clients.size(); i++) {
                ReferenceCountExchangeClient referenceCountExchangeClient = clients.get(i);
                // If there is a client in the list that is no longer available, create a new one to replace him.
                if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) {
                    clients.set(i, buildReferenceCountExchangeClient(url));
                    continue;
                }

                referenceCountExchangeClient.incrementAndGetCount();
            }
        }
        locks.remove(key);
        return clients;
    }
}

dubbo默認如果沒有設(shè)置connections參數(shù),則使用共享連接,也就是說引用同一個host+port的所有服務(wù)默認只會創(chuàng)建一個Client,并且Client對應(yīng)的服務(wù)端URL為第一個被掃描到的Referance對應(yīng)服務(wù)的URL,該Client后續(xù)與服務(wù)端交互使用的序列化協(xié)議都會由這個服務(wù)端URL上的serialization決定。

3、客戶端請求

前面兩部分已經(jīng)解析了服務(wù)注冊、NettyServer、NettyClient上綁定的URL的由來以及序列化協(xié)議的設(shè)置。到這里已經(jīng)能夠解釋開頭案例中選錯序列化協(xié)議的問題了,但是還有一個疑問就是同一個服務(wù)序列化協(xié)議是如何在服務(wù)端和客戶端之間保證一致的?帶著這個疑問繼續(xù)分析下面幾個部分。

下面幾部分再分析下服務(wù)端與消費端交互過程中序列化協(xié)議是如何傳遞的。

消費端請求過程比較長,這里簡單概括下DubboInvoker之前的流程:
上層業(yè)務(wù) Bean 會被封裝成Invoker對象,然后傳入DubboProtocol.export()方法中,該 Invoker 被封裝成 DubboExporter,并保存到exporterMap集合中緩存。
DubboProtocol暴露的 ProtocolServer收到請求時,經(jīng)過一系列解碼處理,最終會到達 DubboProtocol.requestHandler 這個 ExchangeHandler 對象中,該ExchangeHandler對象會從 exporterMap 集合中取出請求的 Invoker,并調(diào)用其 invoke() 方法處理請求。
DubboProtocol.protocolBindingRefer()方法則會將底層的 ExchangeClient集合封裝成DubboInvoker,然后由上層邏輯封裝成代理對象,這樣業(yè)務(wù)層就可以像調(diào)用本地 Bean 一樣,完成遠程調(diào)用。

客戶端請求.png
  1. DubboInvoke.doInvoke方法中會委托ExchangeClient執(zhí)行Channel的request操作,這個ExchangeClient就是服務(wù)引用時創(chuàng)建的ReferenceCountExchangeClient,它里面包裝了服務(wù)URL和底層進行RPC通信的NettyClient。
protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    ...
    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = calculateTimeout(invocation, methodName);
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            return AsyncRpcResult.newDefaultAsyncResult(invocation);
        } else {
            ExecutorService executor = getCallbackExecutor(getUrl(), inv);
            CompletableFuture<AppResponse> appResponseFuture =
                    currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
            // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
            FutureContext.getContext().setCompatibleFuture(appResponseFuture);
            AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
            result.setExecutor(executor);
            return result;
        }
    } ...
}
  1. HeaderExchangeChannel.request這里會用Request模型把請求內(nèi)容RpcInvocation裝飾起來,然后發(fā)送一個Request類型的消息。
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
    Request req = new Request();
    req.setVersion(Version.getProtocolVersion());
    req.setTwoWay(true);
    req.setData(request);
    // 創(chuàng)建DefaultFuture對象,可以從future中主動獲得請求對應(yīng)的響應(yīng)信息
    DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
    try {
        channel.send(req);
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}
  1. NettyChannel.send這里會委托給Netty 框架中的 Channel(與當(dāng)前的 Dubbo Channel 對象一一對應(yīng))執(zhí)行writeAndFlush方法,看過Netty代碼的話就知道這里會執(zhí)行Netty的ChannelPipeline。
ChannelFuture future = channel.writeAndFlush(message);
  1. ChannelPipeline是在打開Netty客戶端時構(gòu)建的,這里會注冊用于編碼的ChannelHandler->InternalEncoder(實現(xiàn)了Netty的MessageToByteEncoder接口)
protected void doOpen() throws Throwable {
    // 創(chuàng)建NettyClientHandler
    final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
    bootstrap = new Bootstrap();
    bootstrap.group(NIO_EVENT_LOOP_GROUP)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
            .channel(socketChannelClass());

    // 設(shè)置連接超時時間,這里使用到AbstractEndpoint中的connectTimeout字段
    bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(3000, getConnectTimeout()));
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            // 心跳請求的時間間隔
            int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());

            if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
                ch.pipeline().addLast("negotiation", SslHandlerInitializer.sslClientHandler(getUrl(), nettyClientHandler));
            }
            // 通過NettyCodecAdapter創(chuàng)建Netty中的編解碼器
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
            // 注冊ChannelHandler
            ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                    .addLast("decoder", adapter.getDecoder())
                    .addLast("encoder", adapter.getEncoder())
                    .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
                    .addLast("handler", nettyClientHandler);
            ....
        }
    });
}
  1. InternalEncoder最終會通過SPI接口調(diào)用到ExchangeCodec.encodeRequest方法對Request進行序列化,這里可以看到通過Channel上綁定的服務(wù)URL中設(shè)置的序列化協(xié)議獲取Serialization,再獲取ContentType的ID值,是一個byte類型的值,唯一確定一個算法。最后再通過位運算把這個ID值寫到dubbo傳輸協(xié)議頭的16-23位上。
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
    Serialization serialization = getSerialization(channel);
    // header.
    // 創(chuàng)建16字節(jié)的字節(jié)數(shù)組
    byte[] header = new byte[HEADER_LENGTH];
    // set magic number.
    // 設(shè)置前16位數(shù)據(jù),也就是設(shè)置header[0]和header[1]的數(shù)據(jù)為Magic High和Magic Low
    Bytes.short2bytes(MAGIC, header);
    // set request and serialization flag.
    // 16-23位為serialization編號,用到或運算10000000|serialization編號,例如serialization編號為11111,則為00011111
    header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
    ...
}

4、服務(wù)端接收

服務(wù)端接收.png
  1. InternalDecoder.decode跟客戶端請求時候流程類似,服務(wù)端服務(wù)發(fā)布創(chuàng)建Netty ServerBootstrap時也會向ChannelPipeline注冊用于解碼的ChannelHandler->InternalDecoder(實現(xiàn)了Netty的MessageToByteDecoder接口)
protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
        // 將ByteBuf封裝成統(tǒng)一的ChannelBuffer
        ChannelBuffer message = new NettyBackedChannelBuffer(input);
        // 拿到關(guān)聯(lián)的Channel
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        do {
            // 記錄當(dāng)前readerIndex的位置
            int saveReaderIndex = message.readerIndex();
            // 委托給Codec2進行解碼
            Object msg = codec.decode(channel, message);
            ....
        } while (message.readable());
    }
}
  1. ExchangeCodec.decode可以看到通過位運算從客戶端Request頭中取出序列化協(xié)議ID,然后跟NettyChannel、data字節(jié)流一起包裝成DecodeableRpcInvocation放到Request中data中,后續(xù)通過DecodeableRpcInvocation對請求數(shù)據(jù)進行反序列化。
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
    byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
    // get request id.
    long id = Bytes.bytes2long(header, 4);
    .....
    // decode request.
    Request req = new Request(id);
    req.setVersion(Version.getProtocolVersion());
    req.setTwoWay((flag & FLAG_TWOWAY) != 0);
    if ((flag & FLAG_EVENT) != 0) {
        req.setEvent(true);
    }
    try {
        Object data;
        DecodeableRpcInvocation inv;
        if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
            inv = new DecodeableRpcInvocation(channel, req, is, proto);
            inv.decode();
        } else {
            inv = new DecodeableRpcInvocation(channel, req,
                    new UnsafeByteArrayInputStream(readMessageData(is)), proto);
        }
        data = inv;
        req.setData(data);
    } catch (Throwable t) {
        if (log.isWarnEnabled()) {
            log.warn("Decode request failed: " + t.getMessage(), t);
        }
        // bad request
        req.setBroken(true);
        req.setData(t);
    }
    return req;
}
  1. DecodeableRpcInvocation.decode這里通過客戶端傳入的serializationType獲取Serialization對請求數(shù)據(jù)進行反序列化。
ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
                .deserialize(channel.getUrl(), input);

到這里就可以解釋同一個服務(wù)序列化協(xié)議是如何在服務(wù)端和客戶端之間保證一致的?這個疑問了,就是服務(wù)端并不會從綁定的NettyServer URL中獲取,也不會從服務(wù)端注冊到注冊中心的URL中獲取,而是從客戶端請求中獲取。

三、總結(jié)

再回到文章開頭的案例,

首先總結(jié)下選錯序列化協(xié)議的原因

dubbo消費端創(chuàng)建底層通信Client時默認使用共享連接模式,也就是說引用同一個host+port的所有服務(wù)默認只會創(chuàng)建一個Client,并且Client對應(yīng)的服務(wù)端URL為第一個被掃描到的Referance對應(yīng)服務(wù)的URL,該Client后續(xù)與服務(wù)端交互使用的序列化協(xié)議都會由這個服務(wù)端URL上的serialization決定。
針對開頭的案例,由于先掃描到了DemoService(指定hessian2序列化),就會創(chuàng)建一個綁定了DemoService URL的共享Client,后續(xù)調(diào)用這個host+port上所有的服務(wù)都會使用這個共享Client,也就是都會使用hessian2序列化。

然后提供兩個解決方案
  1. 配置多個dubbo:protocol指定不同的端口號和序列化協(xié)議,然后在注冊服務(wù)時指定具體的dubbo:protocol
<!-- 新增hessian2協(xié)議,綁定20881端口 -->
<dubbo:protocol id="dubbo2" name="dubbo" port="20881" default="false" serialization="hessian2"/>
<!-- 注冊服務(wù)指定使用dubbo2協(xié)議 -->
@DubboService(protocol="dubbo2")
  1. 如果只有個別服務(wù)需要使用特殊的序列化協(xié)議,也可以通過配置connections參數(shù),讓創(chuàng)建Client時使用獨享連接模式。
@DubboService(parameters = {"serialization", "hessian2"}, connections=1)
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容