一、案例



- 從圖一可以看出provider端提供了兩個服務(wù),并且
JsonDemoService指定序列化協(xié)議為fastjson;DemoService指定序列化協(xié)議為hessian2; - 從圖二可以看出兩個服務(wù)注冊到ZK上的URL中序列化協(xié)議都是對的,證明通過
@DubboService(parameters = {"serialization", "xxx"})這種方式指定序列化協(xié)議是生效的; - 從圖三可以看出調(diào)用
JsonDemoService報錯了,原因是使用了hessian2協(xié)議進行序列化,但是JsonDemoRequest沒有實現(xiàn)Serializable;
到這里就發(fā)現(xiàn)一個問題,provider中JsonDemoService明明指定序列化協(xié)議為fastjson,并且從注冊到ZK上的URL貌似也能印證這一點,但是為什么consumer調(diào)用時卻使用了hessian2協(xié)議???帶著這個問題我們來分析下dubbo provider和consumer之間交互時序列化協(xié)議的實現(xiàn)原理。
二、解析

下面對這六個節(jié)點分別進行分析
1、服務(wù)發(fā)布

以上是服務(wù)端啟動服務(wù)導(dǎo)出時涉及到序列化的粗略流程
- dubbo啟動的入口
DubboBootstrap.start中會先初始化一些基礎(chǔ)組件,例如,配置中心相關(guān)組件、事件監(jiān)聽、元數(shù)據(jù)相關(guān)組件等等,這些組件最終都會被緩存到ConfigManager.configsCache中(Dubbo中凡是實現(xiàn)AbstractConfig接口的類,在被Spring初始化之后,最終都會被緩存到configManager中),其中就包含ServiceBean,@DubboService(parameters = {"serialization", "xxx"})中parameters也會保存在ServiceBean.parameters中。
image.png -
exportServices()方法,它是服務(wù)發(fā)布核心邏輯的入口,會遍歷ConfigManager中每一個ServiceBean進行導(dǎo)出。 -
ServiceConfig.doExportUrlsFor1Protocol主要分為兩部分,一部分是組裝服務(wù)的 URL,另一部分就是服務(wù)發(fā)布。其中組裝服務(wù)URL時會把配置的ServiceBean.parameters拼接到URL中,即&serialization=fastjson。服務(wù)發(fā)布的部分是"registry://" 協(xié)議,會通過RegistryProtocol.export實現(xiàn)。
// JsonDemoService服務(wù)URL
dubbo://192.168.1.15:20880/org.apache.dubbo.demo.JsonDemoService?anyhost=true
&application=dubbo-demo-annotation-provider
&bind.ip=192.168.1.15
&bind.port=20880
&deprecated=false
&dubbo=2.0.2
&dynamic=true
&generic=false
&interface=org.apache.dubbo.demo.JsonDemoService
&methods=sayByeBye
&pid=39221
&release=
&serialization=fastjson
&side=provider
×tamp=1655450646368
-
RegistryProtocol.export主要也是分為兩部分,一部分是通過具體的DubboProtocol創(chuàng)建服務(wù);另一部分就是通過SPI接口RegistryFactory獲取到具體的Registry把服務(wù)注冊到注冊中心(ZK或Nacos等)。 -
DubboProtocol.openServer創(chuàng)建服務(wù)傳入的參數(shù)只有一個URL,最終會創(chuàng)建Netty ServerBootstrap并經(jīng)過層層包裝成ProtocolServer,然后URL會被封裝到Netty ChannelHandler并注冊到Netty ChannelPipeline上(這部分下面會講到)。創(chuàng)建的Server會緩存在serverMap中,其中key是當(dāng)前服務(wù)的IP+dubbo協(xié)議端口號192.168.1.15:20880 -> {DubboProtocolServer@3845}。也就是說同一個dubbo端口只會創(chuàng)建一個Server,并且Server對應(yīng)的URL為該端口上第一個被掃描到的服務(wù)的URL。
/**
* <host:port, ProtocolServer>
* 記錄了全部的 ProtocolServer 實例,其中的 Key 是 host 和 port 組成的字符串,
* Value 是監(jiān)聽該地址的 ProtocolServer。ProtocolServer 就是對 RemotingServer 的一層簡單封裝,表示一個服務(wù)端
*/
protected final Map<String, ProtocolServer> serverMap = new ConcurrentHashMap<>();
private void openServer(URL url) {
String key = url.getAddress();
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
// 只有服務(wù)端才能創(chuàng)建 ProtocolServer 并對外服務(wù)
if (isServer) {
// 檢查是否已有 ProtocolServer 在監(jiān)聽 URL 指定的地址
ProtocolServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
}
}
} else {
// 如果已有ProtocolServer實例,則嘗試根據(jù)URL信息重置ProtocolServer
server.reset(url);
}
}
}
- 服務(wù)注冊在
register方法中,其中參數(shù)registeredProviderUrl就是最終注冊到注冊中心的dubbo協(xié)議URL,最終調(diào)用ZK zkClient.create方法向 Zookeeper 注冊服務(wù);
private void register(URL registryUrl, URL registeredProviderUrl) {
Registry registry = registryFactory.getRegistry(registryUrl);
registry.register(registeredProviderUrl);
}
public void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
2、客戶端引用

以上是服務(wù)引用時涉及到序列化的粗略流程
DubboBootstrap.start()方法除了會調(diào)用exportServices()方法完成服務(wù)發(fā)布之外,還會調(diào)用referServices()方法完成服務(wù)引用,其中會遍歷掃描到的所有ReferenceConfig進行服務(wù)引用(主要就是創(chuàng)建代理類);RegistryProtocol.doRefer中,首先會根據(jù) URL 初始化RegistryDirectory實例,然后生成 consumer協(xié)議 Subscribe URL 并進行注冊,之后會把URL作為參數(shù)通過具體的Registry訂閱服務(wù);
consumer://192.168.1.15/org.apache.dubbo.demo.DemoService?application=dubbo-demo-annotation-consumer
&cluster=failfast
&dubbo=2.0.2
&init=false
&interface=org.apache.dubbo.demo.DemoService
&methods=sayHello,sayHelloAsync
&pid=47044&side=consumer
&sticky=false
×tamp=1655733277885
-
ZookeeperRegistry.doSubscribe這里以ZK為例,首先根據(jù)consumer URL生成路徑,這里有3個路徑providers、routers、configurators;然后針對每個路徑注冊子節(jié)點監(jiān)聽器,同時返回子節(jié)點(providers子節(jié)點就是當(dāng)前引用服務(wù)的所有服務(wù)端URL);最后因為是首次引用,會觸發(fā)一次監(jiān)聽,其中providers監(jiān)聽便會創(chuàng)建對應(yīng)的客戶端NettyClient。
public void doSubscribe(final URL url, final NotifyListener listener) {
...
List<URL> urls = new ArrayList<>();
// 針對每一種category路徑進行監(jiān)聽(providers、routers、configurators)
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
// 封裝監(jiān)聽邏輯
ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds)));
zkClient.create(path, false);
// 添加監(jiān)聽,并返回該目錄當(dāng)前所有的子節(jié)點,providers時為所有服務(wù)端URL
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
// 如果有子節(jié)點直接觸發(fā)一次監(jiān)聽,urls就是服務(wù)端URL
notify(url, listener, urls);
}
-
DubboProtocol.getSharedClient創(chuàng)建Client傳入的只有URL和連接數(shù)兩個參數(shù),最終會創(chuàng)建Netty Bootstrap并經(jīng)過層層包裝成ReferenceCountExchangeClient,然后URL會被封裝到Netty ChannelHandler并注冊到Netty ChannelPipeline上。
另外還一個比較重要的是創(chuàng)建Client分為共享連接和獨享連接兩種模式:
當(dāng)使用獨享連接的時候,對每個 Service 建立固定數(shù)量的 Client,每個 Client 維護一個底層連接。如下圖所示,就是針對每個 Service 都啟動了兩個獨享連接:

當(dāng)使用共享連接的時候,會區(qū)分不同的網(wǎng)絡(luò)地址(host:port),一個地址只建立固定數(shù)量的共享連接。如下圖所示,Provider 1 暴露了多個服務(wù),Consumer 引用了 Provider 1 中的多個服務(wù),共享連接是說 Consumer 調(diào)用 Provider 1 中的多個服務(wù)時,是通過固定數(shù)量的共享 TCP 長連接進行數(shù)據(jù)傳輸,這樣就可以達到減少服務(wù)端連接數(shù)的目的。

那怎么去創(chuàng)建共享連接呢?創(chuàng)建共享連接的實現(xiàn)細節(jié)是在
getSharedClient()方法中,它首先從referenceClientMap緩存(Map<String, List<ReferenceCountExchangeClient>> 類型)中查詢 Key(host 和 port 拼接成的字符串)對應(yīng)的共享 Client 集合,如果查找到的 Client 集合全部可用,則直接使用這些緩存的 Client,否則要創(chuàng)建新的 Client 來補充替換緩存中不可用的 Client。
private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {
String key = url.getAddress();
List<ReferenceCountExchangeClient> clients = referenceClientMap.get(key);
...
locks.putIfAbsent(key, new Object());
synchronized (locks.get(key)) {
clients = referenceClientMap.get(key);
...
// If the clients is empty, then the first initialization is
if (CollectionUtils.isEmpty(clients)) {
clients = buildReferenceCountExchangeClientList(url, connectNum);
referenceClientMap.put(key, clients);
} else {
for (int i = 0; i < clients.size(); i++) {
ReferenceCountExchangeClient referenceCountExchangeClient = clients.get(i);
// If there is a client in the list that is no longer available, create a new one to replace him.
if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) {
clients.set(i, buildReferenceCountExchangeClient(url));
continue;
}
referenceCountExchangeClient.incrementAndGetCount();
}
}
locks.remove(key);
return clients;
}
}
dubbo默認如果沒有設(shè)置connections參數(shù),則使用共享連接,也就是說引用同一個host+port的所有服務(wù)默認只會創(chuàng)建一個Client,并且Client對應(yīng)的服務(wù)端URL為第一個被掃描到的Referance對應(yīng)服務(wù)的URL,該Client后續(xù)與服務(wù)端交互使用的序列化協(xié)議都會由這個服務(wù)端URL上的serialization決定。
3、客戶端請求
前面兩部分已經(jīng)解析了服務(wù)注冊、NettyServer、NettyClient上綁定的URL的由來以及序列化協(xié)議的設(shè)置。到這里已經(jīng)能夠解釋開頭案例中選錯序列化協(xié)議的問題了,但是還有一個疑問就是同一個服務(wù)序列化協(xié)議是如何在服務(wù)端和客戶端之間保證一致的?帶著這個疑問繼續(xù)分析下面幾個部分。
下面幾部分再分析下服務(wù)端與消費端交互過程中序列化協(xié)議是如何傳遞的。
消費端請求過程比較長,這里簡單概括下DubboInvoker之前的流程:
上層業(yè)務(wù) Bean 會被封裝成Invoker對象,然后傳入DubboProtocol.export()方法中,該 Invoker 被封裝成DubboExporter,并保存到exporterMap集合中緩存。
在DubboProtocol暴露的ProtocolServer收到請求時,經(jīng)過一系列解碼處理,最終會到達DubboProtocol.requestHandler這個ExchangeHandler對象中,該ExchangeHandler對象會從exporterMap集合中取出請求的 Invoker,并調(diào)用其 invoke() 方法處理請求。
DubboProtocol.protocolBindingRefer()方法則會將底層的ExchangeClient集合封裝成DubboInvoker,然后由上層邏輯封裝成代理對象,這樣業(yè)務(wù)層就可以像調(diào)用本地 Bean 一樣,完成遠程調(diào)用。

-
DubboInvoke.doInvoke方法中會委托ExchangeClient執(zhí)行Channel的request操作,這個ExchangeClient就是服務(wù)引用時創(chuàng)建的ReferenceCountExchangeClient,它里面包裝了服務(wù)URL和底層進行RPC通信的NettyClient。
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
...
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = calculateTimeout(invocation, methodName);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
ExecutorService executor = getCallbackExecutor(getUrl(), inv);
CompletableFuture<AppResponse> appResponseFuture =
currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
// save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
FutureContext.getContext().setCompatibleFuture(appResponseFuture);
AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
result.setExecutor(executor);
return result;
}
} ...
}
-
HeaderExchangeChannel.request這里會用Request模型把請求內(nèi)容RpcInvocation裝飾起來,然后發(fā)送一個Request類型的消息。
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
// 創(chuàng)建DefaultFuture對象,可以從future中主動獲得請求對應(yīng)的響應(yīng)信息
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
-
NettyChannel.send這里會委托給Netty 框架中的 Channel(與當(dāng)前的 Dubbo Channel 對象一一對應(yīng))執(zhí)行writeAndFlush方法,看過Netty代碼的話就知道這里會執(zhí)行Netty的ChannelPipeline。
ChannelFuture future = channel.writeAndFlush(message);
- ChannelPipeline是在打開Netty客戶端時構(gòu)建的,這里會注冊用于編碼的
ChannelHandler->InternalEncoder(實現(xiàn)了Netty的MessageToByteEncoder接口)
protected void doOpen() throws Throwable {
// 創(chuàng)建NettyClientHandler
final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
bootstrap = new Bootstrap();
bootstrap.group(NIO_EVENT_LOOP_GROUP)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
//.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
.channel(socketChannelClass());
// 設(shè)置連接超時時間,這里使用到AbstractEndpoint中的connectTimeout字段
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(3000, getConnectTimeout()));
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 心跳請求的時間間隔
int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
ch.pipeline().addLast("negotiation", SslHandlerInitializer.sslClientHandler(getUrl(), nettyClientHandler));
}
// 通過NettyCodecAdapter創(chuàng)建Netty中的編解碼器
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
// 注冊ChannelHandler
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
.addLast("handler", nettyClientHandler);
....
}
});
}
- InternalEncoder最終會通過SPI接口調(diào)用到
ExchangeCodec.encodeRequest方法對Request進行序列化,這里可以看到通過Channel上綁定的服務(wù)URL中設(shè)置的序列化協(xié)議獲取Serialization,再獲取ContentType的ID值,是一個byte類型的值,唯一確定一個算法。最后再通過位運算把這個ID值寫到dubbo傳輸協(xié)議頭的16-23位上。
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
Serialization serialization = getSerialization(channel);
// header.
// 創(chuàng)建16字節(jié)的字節(jié)數(shù)組
byte[] header = new byte[HEADER_LENGTH];
// set magic number.
// 設(shè)置前16位數(shù)據(jù),也就是設(shè)置header[0]和header[1]的數(shù)據(jù)為Magic High和Magic Low
Bytes.short2bytes(MAGIC, header);
// set request and serialization flag.
// 16-23位為serialization編號,用到或運算10000000|serialization編號,例如serialization編號為11111,則為00011111
header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
...
}
4、服務(wù)端接收

-
InternalDecoder.decode跟客戶端請求時候流程類似,服務(wù)端服務(wù)發(fā)布創(chuàng)建Netty ServerBootstrap時也會向ChannelPipeline注冊用于解碼的ChannelHandler->InternalDecoder(實現(xiàn)了Netty的MessageToByteDecoder接口)
protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
// 將ByteBuf封裝成統(tǒng)一的ChannelBuffer
ChannelBuffer message = new NettyBackedChannelBuffer(input);
// 拿到關(guān)聯(lián)的Channel
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
do {
// 記錄當(dāng)前readerIndex的位置
int saveReaderIndex = message.readerIndex();
// 委托給Codec2進行解碼
Object msg = codec.decode(channel, message);
....
} while (message.readable());
}
}
-
ExchangeCodec.decode可以看到通過位運算從客戶端Request頭中取出序列化協(xié)議ID,然后跟NettyChannel、data字節(jié)流一起包裝成DecodeableRpcInvocation放到Request中data中,后續(xù)通過DecodeableRpcInvocation對請求數(shù)據(jù)進行反序列化。
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
// get request id.
long id = Bytes.bytes2long(header, 4);
.....
// decode request.
Request req = new Request(id);
req.setVersion(Version.getProtocolVersion());
req.setTwoWay((flag & FLAG_TWOWAY) != 0);
if ((flag & FLAG_EVENT) != 0) {
req.setEvent(true);
}
try {
Object data;
DecodeableRpcInvocation inv;
if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
inv = new DecodeableRpcInvocation(channel, req, is, proto);
inv.decode();
} else {
inv = new DecodeableRpcInvocation(channel, req,
new UnsafeByteArrayInputStream(readMessageData(is)), proto);
}
data = inv;
req.setData(data);
} catch (Throwable t) {
if (log.isWarnEnabled()) {
log.warn("Decode request failed: " + t.getMessage(), t);
}
// bad request
req.setBroken(true);
req.setData(t);
}
return req;
}
-
DecodeableRpcInvocation.decode這里通過客戶端傳入的serializationType獲取Serialization對請求數(shù)據(jù)進行反序列化。
ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
.deserialize(channel.getUrl(), input);
到這里就可以解釋同一個服務(wù)序列化協(xié)議是如何在服務(wù)端和客戶端之間保證一致的?這個疑問了,就是服務(wù)端并不會從綁定的NettyServer URL中獲取,也不會從服務(wù)端注冊到注冊中心的URL中獲取,而是從客戶端請求中獲取。
三、總結(jié)
再回到文章開頭的案例,
首先總結(jié)下選錯序列化協(xié)議的原因
dubbo消費端創(chuàng)建底層通信Client時默認使用共享連接模式,也就是說引用同一個host+port的所有服務(wù)默認只會創(chuàng)建一個Client,并且Client對應(yīng)的服務(wù)端URL為第一個被掃描到的Referance對應(yīng)服務(wù)的URL,該Client后續(xù)與服務(wù)端交互使用的序列化協(xié)議都會由這個服務(wù)端URL上的serialization決定。
針對開頭的案例,由于先掃描到了DemoService(指定hessian2序列化),就會創(chuàng)建一個綁定了DemoService URL的共享Client,后續(xù)調(diào)用這個host+port上所有的服務(wù)都會使用這個共享Client,也就是都會使用hessian2序列化。
然后提供兩個解決方案
- 配置多個
dubbo:protocol指定不同的端口號和序列化協(xié)議,然后在注冊服務(wù)時指定具體的dubbo:protocol。
<!-- 新增hessian2協(xié)議,綁定20881端口 -->
<dubbo:protocol id="dubbo2" name="dubbo" port="20881" default="false" serialization="hessian2"/>
<!-- 注冊服務(wù)指定使用dubbo2協(xié)議 -->
@DubboService(protocol="dubbo2")
- 如果只有個別服務(wù)需要使用特殊的序列化協(xié)議,也可以通過配置
connections參數(shù),讓創(chuàng)建Client時使用獨享連接模式。
@DubboService(parameters = {"serialization", "hessian2"}, connections=1)
