RPC之美團(tuán)pigeon源碼分析(三)調(diào)用方服務(wù)監(jiān)聽和調(diào)用

在此之前我們理清了pigeon服務(wù)方的初始化、注冊和消息處理邏輯,本篇我們來看看pigeon調(diào)用方的實(shí)現(xiàn)。

第一部分我們先看看服務(wù)調(diào)用的實(shí)現(xiàn)。
服務(wù)調(diào)用示例:

@RestController
@RequestMapping("/common")
public class CommonController {

    @Autowired
    private CommonService commonService;

    @RequestMapping(value = "/hello")
    @ResponseBody
    public String hello(@RequestParam("name") String name) {
        System.out.println("enter hello");
        return commonService.hello(name);
    }
}

CommonService 就是服務(wù)方發(fā)布的服務(wù)接口,可以看到在調(diào)用方只需要引入相應(yīng)服務(wù)的api jar包,就可以像調(diào)用本地方法一樣調(diào)用對應(yīng)的服務(wù)接口,這也是大部分RPC框架的實(shí)現(xiàn)效果。
CommonService 通過@Autowired注解在spring容器中找到對應(yīng)的bean,我們來看看相應(yīng)的bean配置

    <bean id="commonService" class="com.dianping.pigeon.remoting.invoker.config.spring.ReferenceBean" init-method="init">
        <!-- 服務(wù)全局唯一的標(biāo)識url,默認(rèn)是服務(wù)接口類名,必須設(shè)置 -->
        <property name="url" value="http://service.dianping.com/rpcserver/commonService_1.0.0" />
        <!-- 接口名稱,必須設(shè)置 -->
        <property name="interfaceName" value="com.study.rpcserver.api.CommonService" />
        <!-- 超時(shí)時(shí)間,毫秒,默認(rèn)5000,建議自己設(shè)置 -->
        <property name="timeout" value="2000" />
        <!-- 序列化,hessian/fst/protostuff,默認(rèn)hessian,可不設(shè)置-->
        <property name="serialize" value="hessian" />
        <!-- 調(diào)用方式,sync/future/callback/oneway,默認(rèn)sync,可不設(shè)置 -->
        <property name="callType" value="sync" />
        <!-- 失敗策略,快速失敗failfast/失敗轉(zhuǎn)移failover/失敗忽略failsafe/并發(fā)取最快返回forking,默認(rèn)failfast,可不設(shè)置 -->
        <property name="cluster" value="failfast" />
        <!-- 是否超時(shí)重試,默認(rèn)false,可不設(shè)置 -->
        <property name="timeoutRetry" value="false" />
        <!-- 重試次數(shù),默認(rèn)1,可不設(shè)置 -->
        <property name="retries" value="1" />
    </bean>

ReferenceBean繼承了spring的FactoryBean接口,來處理復(fù)雜bean的生成,通過getObject()方法來返回對應(yīng)bean實(shí)例。接下來我們就以ReferenceBean為入口來切入pigeon調(diào)用方的實(shí)現(xiàn)思路。

    public void init() throws Exception {
        if (StringUtils.isBlank(interfaceName)) {
            throw new IllegalArgumentException("invalid interface:" + interfaceName);
        }
        this.objType = ClassUtils.loadClass(this.classLoader, this.interfaceName.trim());
        //服務(wù)調(diào)用相關(guān)的配置信息,就是我們對每一個(gè)接口服務(wù)在xml文件中的配置
        InvokerConfig<?> invokerConfig = new InvokerConfig(this.objType, this.url, this.timeout, this.callType,
                this.serialize, this.callback, this.group, this.writeBufferLimit, this.loadBalance, this.cluster,
                this.retries, this.timeoutRetry, this.vip, this.version, this.protocol);
        invokerConfig.setClassLoader(classLoader);
        invokerConfig.setSecret(secret);
        invokerConfig.setRegionPolicy(regionPolicy);

        if (!CollectionUtils.isEmpty(methods)) {
            Map<String, InvokerMethodConfig> methodMap = new HashMap<String, InvokerMethodConfig>();
            invokerConfig.setMethods(methodMap);
            for (InvokerMethodConfig method : methods) {
                methodMap.put(method.getName(), method);
            }
        }

        checkMock(); // 降級配置檢查
        invokerConfig.setMock(mock);
        checkRemoteAppkey();
        invokerConfig.setRemoteAppKey(remoteAppKey);
        //生成接口的代理對象
        this.obj = ServiceFactory.getService(invokerConfig);
        configLoadBalance(invokerConfig);
    }
    //FactoryBean返回的bean實(shí)例
    public Object getObject() {
        return this.obj;
    }

ServiceFactory.getService(invokerConfig);根據(jù)配置的interfaceName生成一個(gè)java代理對象

    private static ServiceProxy serviceProxy = ServiceProxyLoader.getServiceProxy();

    public static <T> T getService(InvokerConfig<T> invokerConfig) throws RpcException {
        return serviceProxy.getProxy(invokerConfig);
    }

跟蹤代碼,進(jìn)入AbstractServiceProxy.getProxy方法,核心代碼如下:

    protected final static Map<InvokerConfig<?>, Object> services = new ConcurrentHashMap<InvokerConfig<?>, Object>();
    @Override
    public <T> T getProxy(InvokerConfig<T> invokerConfig) {
        //InvokerConfig實(shí)現(xiàn)了自定義equals和hashCode方法
        service = services.get(invokerConfig);
        if (service == null) {
            synchronized (interner.intern(invokerConfig)) {
                service = services.get(invokerConfig);
                if (service == null) {
                    //此處執(zhí)行調(diào)用方的一些初始化邏輯,包括InvokerProcessHandlerFactory.init();初始化調(diào)用方Filter責(zé)任鏈等
                    InvokerBootStrap.startup();
                    //生成代理對象
                    service = SerializerFactory.getSerializer(invokerConfig.getSerialize()).proxyRequest(invokerConfig);
                    try {
                        //獲取服務(wù)信息,創(chuàng)建Client實(shí)例
                        ClientManager.getInstance().registerClients(invokerConfig);
                    } catch (Throwable t) {
                        logger.warn("error while trying to setup service client:" + invokerConfig, t);
                    }
                    services.put(invokerConfig, service);
                }
        }
        return (T) service;
    }

AbstractSerializer.proxyRequest使用我們熟悉的JDK動態(tài)代理來生成服務(wù)接口的代理對象

    @Override
    public Object proxyRequest(InvokerConfig<?> invokerConfig) throws SerializationException {
        return Proxy.newProxyInstance(ClassUtils.getCurrentClassLoader(invokerConfig.getClassLoader()),
                new Class[] { invokerConfig.getServiceInterface() }, new ServiceInvocationProxy(invokerConfig,
                        InvokerProcessHandlerFactory.selectInvocationHandler(invokerConfig)));
    }
        //InvokerProcessHandlerFactory.selectInvocationHandler獲取調(diào)用方請求責(zé)任鏈
        public static void init() {
        if (!isInitialized) {
            if (Constants.MONITOR_ENABLE) {
                registerBizProcessFilter(new RemoteCallMonitorInvokeFilter());
            }
            registerBizProcessFilter(new TraceFilter());
            registerBizProcessFilter(new DegradationFilter());
                        //關(guān)于ClusterInvokeFilter后文詳細(xì)介紹
            registerBizProcessFilter(new ClusterInvokeFilter());
            registerBizProcessFilter(new GatewayInvokeFilter());
            registerBizProcessFilter(new ContextPrepareInvokeFilter());
            registerBizProcessFilter(new SecurityFilter());
                        //遠(yuǎn)程調(diào)用
            registerBizProcessFilter(new RemoteCallInvokeFilter());
            bizInvocationHandler = createInvocationHandler(bizProcessFilters);
            isInitialized = true;
        }
    }

    public static ServiceInvocationHandler selectInvocationHandler(InvokerConfig<?> invokerConfig) {
        return bizInvocationHandler;
    }

ServiceInvocationProxy繼承了java.lang.reflect.InvocationHandler接口,invoke實(shí)現(xiàn)邏輯如下:

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
                //代理對象的非服務(wù)方法調(diào)用走原有邏輯
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(handler, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return handler.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return handler.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return handler.equals(args[0]);
        }
                //服務(wù)接口執(zhí)行責(zé)任鏈處理邏輯
        return extractResult(handler.handle(new DefaultInvokerContext(invokerConfig, methodName, parameterTypes, args)),
                method.getReturnType());
    }

同服務(wù)端責(zé)任鏈的分析一樣,我們首先重點(diǎn)看下RemoteCallInvokeFilter的處理邏輯,核心代碼如下:

    @Override
    public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext)
            throws Throwable {
        Client client = invocationContext.getClient();
        InvocationRequest request = invocationContext.getRequest();
        InvokerConfig<?> invokerConfig = invocationContext.getInvokerConfig();    
        。。。
        //以同步調(diào)用場景分析下遠(yuǎn)程調(diào)用邏輯
        CallbackFuture future = new CallbackFuture();
        response = InvokerUtils.sendRequest(client, invocationContext.getRequest(), future);
        invocationContext.getTimeline().add(new TimePoint(TimePhase.Q));
        if (response == null) {
            response = future.getResponse(request.getTimeout());
        }        
        return response;
    }    

    public static InvocationResponse sendRequest(Client client, InvocationRequest request, Callback callback) {
        InvocationResponse response = response = client.write(request);
        return response;
    }

client.write(request);最終調(diào)用NettyClient或HttpInvokerClient的doWrite方法發(fā)送請求消息體。
至此我們理清了服務(wù)調(diào)用的邏輯,簡單來說就是通過JDK動態(tài)代理來生成服務(wù)方接口對應(yīng)的實(shí)例對象,在方法執(zhí)行邏輯中調(diào)用遠(yuǎn)程服務(wù)。

但對于每一個(gè)服務(wù)接口,調(diào)用方是如何知道遠(yuǎn)程服務(wù)的訪問地址的呢?以及新注冊或者下線的服務(wù)地址,調(diào)用方如何得到即時(shí)通知?
接下來進(jìn)入本篇第二部分,遠(yuǎn)程調(diào)用Client的初始化和調(diào)用方對服務(wù)信息的心跳監(jiān)聽。
以請求責(zé)任鏈的ClusterInvokeFilter為入口:

    public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext)
            throws Throwable {
        InvokerConfig<?> invokerConfig = invocationContext.getInvokerConfig();
                //失敗策略cluster可配,默認(rèn)為快速失敗failfast
        Cluster cluster = ClusterFactory.selectCluster(invokerConfig.getCluster());
        if (cluster == null) {
            throw new IllegalArgumentException("Unsupported cluster type:" + cluster);
        }
        return cluster.invoke(handler, invocationContext);
    }

跟蹤代碼進(jìn)入FailfastCluster.invoke方法,核心代碼如下:

    private ClientManager clientManager = ClientManager.getInstance();
    @Override
    public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext)
            throws Throwable {
        InvokerConfig<?> invokerConfig = invocationContext.getInvokerConfig();
        //構(gòu)造請求消息對象
        InvocationRequest request = InvokerUtils.createRemoteCallRequest(invocationContext, invokerConfig);
        //是否超時(shí)重試
        boolean timeoutRetry = invokerConfig.isTimeoutRetry();
        //重試次數(shù)
        int retry = invokerConfig.getRetries(invocationContext.getMethodName());
        //關(guān)于重試和重試次數(shù)的邏輯在此不做過多說明,只摘取主干代碼
        //獲取遠(yuǎn)程客戶端
        Client remoteClient = clientManager.getClient(invokerConfig, request, null);
        //就是在這里設(shè)置的RemoteCallInvokeFilter中用到的客戶端Client
        invocationContext.setClient(remoteClient);   
        try {
            //向后執(zhí)行責(zé)任鏈
            return handler.handle(invocationContext);
        } catch (NetworkException e) {
            remoteClient = clientManager.getClient(invokerConfig, request, null);
            invocationContext.setClient(remoteClient);
            return handler.handle(invocationContext);
        }     
    }

ClientManager 為單例模式,我們看看內(nèi)部實(shí)現(xiàn)

        //私有構(gòu)造函數(shù) 
    private ClientManager() {
        this.providerAvailableListener = new ProviderAvailableListener();
        this.clusterListener = new DefaultClusterListener(providerAvailableListener);
        this.clusterListenerManager.addListener(this.clusterListener);
        providerAvailableThreadPool.execute(this.providerAvailableListener);
        RegistryEventListener.addListener(providerChangeListener);
        RegistryEventListener.addListener(registryConnectionListener);
        RegistryEventListener.addListener(groupChangeListener);
        registerThreadPool.getExecutor().allowCoreThreadTimeOut(true);
    }

    private RouteManager routerManager = DefaultRouteManager.INSTANCE;

    public Client getClient(InvokerConfig<?> invokerConfig, InvocationRequest request, List<Client> excludeClients) {
                //根據(jù)全局唯一標(biāo)識url獲取Client集合
        List<Client> clientList = clusterListener.getClientList(invokerConfig);
        List<Client> clientsToRoute = new ArrayList<Client>(clientList);
        if (excludeClients != null) {
            clientsToRoute.removeAll(excludeClients);
        }
                //根據(jù)負(fù)載均衡策略選取有效的Client
                //此處細(xì)節(jié)比較多,感興趣的朋友可以自行細(xì)致瀏覽下源碼,限于篇幅不一一講解了
        return routerManager.route(clientsToRoute, invokerConfig, request);
    }

距離目標(biāo)越來越近了,我們繼續(xù)跟蹤代碼DefaultClusterListener的實(shí)現(xiàn)

    private ConcurrentHashMap<String, List<Client>> serviceClients = new ConcurrentHashMap<String, List<Client>>();

    public List<Client> getClientList(InvokerConfig<?> invokerConfig) {
        //根據(jù)url獲取對應(yīng)的Client集合
        List<Client> clientList = this.serviceClients.get(invokerConfig.getUrl());
        return clientList;
    }

問題來了,serviceClients是在什么時(shí)候創(chuàng)建的Client實(shí)例呢?
我們回顧下AbstractServiceProxy.getProxy中的一段邏輯:

                    try {
                        ClientManager.getInstance().registerClients(invokerConfig);
                    } catch (Throwable t) {
                        logger.warn("error while trying to setup service client:" + invokerConfig, t);
                    }

從異常信息我們可以清晰的看到,這里就是創(chuàng)建service client的入口,最終調(diào)用到DefaultClusterListener.addConnect添加Client映射關(guān)系到serviceClients。調(diào)用鏈路比較長,在此簡單貼一下線程調(diào)用棧:


image.png

至此我們理清了Client的創(chuàng)建,接下來我們看看調(diào)用方的心跳監(jiān)聽。
我們直接連接注冊中心zookeeper的相關(guān)類CuratorClient,用的是curator-framework-2.7.1.jar,這個(gè)ZK客戶端功能很強(qiáng)大,可以非常方便的對具體的zk節(jié)點(diǎn)添加listener回調(diào)。

    private boolean newCuratorClient() throws InterruptedException {
                //根據(jù)zk地址創(chuàng)建zkClient
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString(address)
                .sessionTimeoutMs(sessionTimeout).connectionTimeoutMs(connectionTimeout)
                .retryPolicy(new MyRetryPolicy(retries, retryInterval)).build();
                //監(jiān)聽連接狀態(tài),掉線重連
        client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                logger.info("zookeeper state changed to " + newState);
                if (newState == ConnectionState.RECONNECTED) {
                    RegistryEventListener.connectionReconnected();
                }
                monitor.logEvent(EVENT_NAME, "zookeeper:" + newState.name().toLowerCase(), "");
            }
        });
                //監(jiān)聽change事件!?。?        client.getCuratorListenable().addListener(new CuratorEventListener(this), curatorEventListenerThreadPool);
        client.start();
        boolean isConnected = client.getZookeeperClient().blockUntilConnectedOrTimedOut();
        CuratorFramework oldClient = this.client;
        this.client = client;
        close(oldClient);
        return isConnected;
    }

CuratorEventListener繼承org.apache.curator.framework.api.CuratorListener,看下事件處理邏輯

    @Override
    public void eventReceived(CuratorFramework client, CuratorEvent curatorEvent) throws Exception {
        WatchedEvent event = (curatorEvent == null ? null : curatorEvent.getWatchedEvent());
                //過濾不敢興趣的EventType
        if (event == null
                || (event.getType() != EventType.NodeCreated && event.getType() != EventType.NodeDataChanged
                        && event.getType() != EventType.NodeDeleted && event.getType() != EventType.NodeChildrenChanged)) {
            return;
        }
        try {
                        //解析節(jié)點(diǎn)路徑并分類
            PathInfo pathInfo = parsePath(event.getPath());
                        
            if (pathInfo.type == ADDRESS) {//服務(wù)地址  
                addressChanged(pathInfo);
            } else if (pathInfo.type == WEIGHT) {//權(quán)重
                weightChanged(pathInfo);
            } else if (pathInfo.type == APP) {
                appChanged(pathInfo);
            } else if (pathInfo.type == VERSION) {
                versionChanged(pathInfo);
            } else if (pathInfo.type == PROTOCOL) {
                protocolChanged(pathInfo);
            } else if (pathInfo.type == HOST_CONFIG) {
                registryConfigChanged(pathInfo);
            }
        } catch (Throwable e) {
            logger.error("Error in ZookeeperWatcher.process()", e);
            return;
        }
    }
    /*
     * 1. Get newest value from ZK and watch again 2. Determine if changed
     * against cache 3. notify if changed 4. pay attention to group fallback
     * notification
     */
    private void addressChanged(PathInfo pathInfo) throws Exception {
        if (shouldNotify(pathInfo)) {
            String hosts = client.get(pathInfo.path);
            logger.info("Service address changed, path " + pathInfo.path + " value " + hosts);
            List<String[]> hostDetail = Utils.getServiceIpPortList(hosts);
            serviceChangeListener.onServiceHostChange(pathInfo.serviceName, hostDetail);
        }
        // Watch again
        client.watch(pathInfo.path);
    }

addressChanged難得加了注釋,判斷是否需要回調(diào),回調(diào)。

本篇到此結(jié)束,內(nèi)容較多,希望能對大家有所助益。

轉(zhuǎn)載請備注原文鏈接。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。
禁止轉(zhuǎn)載,如需轉(zhuǎn)載請通過簡信或評論聯(lián)系作者。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容