Java 服務(wù)調(diào)用全流程追蹤 簡易實(shí)現(xiàn)方案

undefined

前言

前段時(shí)間,本人一直協(xié)助項(xiàng)目組在做系統(tǒng)的重構(gòu),系統(tǒng)應(yīng)用被拆分成了多個(gè)服務(wù),部分服務(wù)做了集群部署。隨著上述架構(gòu)的演進(jìn),自然而然的引進(jìn)了ELK + Filebeat 做日志收集。但是在使用Kibana查看日志時(shí),由于缺少TraceID,導(dǎo)致開發(fā)人員很難篩選出指定請求的相關(guān)日志,也很難追蹤應(yīng)用對下游服務(wù)的調(diào)用過程,耗費(fèi)了很多時(shí)間。我自己查過幾次問題之后,實(shí)在受不了每次要花這么久的時(shí)間,就趕緊向主管提了這一次的改造。

本篇文章主要是記錄本人對項(xiàng)目TraceID鏈路追蹤改造的解決方案的研究、遇到的問題和具體的實(shí)現(xiàn),同時(shí)本次改造也加深了我自己對分布式服務(wù)追蹤的一些理解,我也寫在了里面。

本文主要內(nèi)容:

  • 初步實(shí)現(xiàn)
  • 異步線程traceId丟失的問題
  • 面向 Dubbo RPC 鏈路追蹤
  • 面向 HTTP Service 鏈路追蹤
  • 思考 SpringCloud Sleuth 的實(shí)現(xiàn)
  • 小結(jié)

一、初步實(shí)現(xiàn)

大體的思路就是借助slf4j的MDC功能 + Spring Interceptor,當(dāng)外部請求進(jìn)入時(shí)生成一個(gè)traceId放在MDC當(dāng)中。

MDC

這里簡單介紹一下MDC。

MDC(Mapped Diagnostic Context,映射調(diào)試上下文)是 log4j 和 logback 提供的一種方便在多線程條件下記錄日志的功能。MDC 可以看成是一個(gè)與當(dāng)前線程綁定的Map,可以往其中添加鍵值對。MDC 中包含的內(nèi)容可以被同一線程中執(zhí)行的代碼所訪問。當(dāng)前線程的子線程會繼承其父線程中的 MDC 的內(nèi)容。當(dāng)需要記錄日志時(shí),只需要從 MDC 中獲取所需的信息即可。MDC 的內(nèi)容則由程序在適當(dāng)?shù)臅r(shí)候保存進(jìn)去。對于一個(gè) Web 應(yīng)用來說,通常是在請求被處理的最開始保存這些數(shù)據(jù)。

簡單來說,MDC就是日志框架提供的一個(gè)InheritableThreadLocal,項(xiàng)目代碼中可以將鍵值對放入其中,在打印的時(shí)候從ThreadLocal中獲取到對應(yīng)的值然后打印出來。詳細(xì)的原理本文就不贅述了。看下 log4j 和 logback 里面的實(shí)現(xiàn)類就知道了。

實(shí)現(xiàn)

  1. 自定義Spring 攔截器 TraceInterceptor
/**
 * @author Richard_yyf
 */
public class TraceInterceptor extends HandlerInterceptorAdapter {

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        // 清空
        MDC.clear();

        ThreadMdcUtil.setTraceIdIfAbsent();

        //后續(xù)邏輯... ...
        return true;
    }
}
  1. 注冊 攔截器
/**
 * @author Richard_yyf
 */
@Configuration
public class WebMvcConfig implements WebMvcConfigurer {

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(traceInterceptor())
                .addPathPatterns("/**")
                .order(0);
    }

    @Bean
    public TraceInterceptor traceInterceptor() {
        return new TraceInterceptor();
    }

}

ThreadMdcUtil是我自己封裝的一個(gè)工具類,包裝了對 TraceId 的一些操作:

public class ThreadMdcUtil {
    
    public static String createTraceId() {
        String uuid = UUID.randomUUID().toString();
        return DigestUtils.md5Hex(uuid).substring(8, 24);
    }

    public static void setTraceIdIfAbsent() {
        if (MDC.get(TRACE_ID) == null) {
            MDC.put(TRACE_ID, createTraceId());
        }
    }
    // 省略了一些方法在后面會展示出來
}

DigestUtils來自于第三方依賴:

<dependency>
    <groupId>commons-codec</groupId>
    <artifactId>commons-codec</artifactId>
    <version>***</version>
</dependency>

TRACE_ID放在 Constant類中方便引用:

public class Constant {
    ...
   public static final String TRACE_ID = "traceId";
    ...
}
  1. 在日志配置文件中修改輸出格式,增加TraceID字段的打印

    取值方式:%X{traceid}

image.png

結(jié)果

通過上面的步驟之后,你的web應(yīng)用接收到請求后打印的日志就會帶上TraceId。

image.png

二、遇上線程池 TraceID 丟失的問題

前面的方案只是簡單實(shí)現(xiàn)了我們的最基礎(chǔ)的需求。但是如果你真的使用起來,會發(fā)現(xiàn)異步的任務(wù)線程是沒有獲取到TraceID的。

一個(gè)成熟的應(yīng)用肯定會用到很多的線程池。常見的有@Async異步調(diào)用的線程池,應(yīng)用自身定義的一些線程池等等。

前面有稍微提到過,MDC是通過InheritableThreadLocal實(shí)現(xiàn)的,創(chuàng)建子線程時(shí),會復(fù)制父線程的inheritableThreadLocals屬性。但是在線程池中,線程是復(fù)用的,而不是新創(chuàng)建的,所以MDC內(nèi)容就無法傳遞進(jìn)去。

所以我們就需要曲線救國,既然線程是復(fù)用的,那我們理所當(dāng)然的就能想到在任務(wù)提交至線程池的時(shí)候做一些“騷”操作,來講MDC的內(nèi)容傳遞下去。

改造

這里就直接放上代碼:

/**
 * @author Richard_yyf
 */
public class ThreadMdcUtil {
    
    public static String createTraceId() {
        String uuid = UUID.randomUUID().toString();
        return DigestUtils.md5Hex(uuid).substring(8, 24);
    }

    public static void setTraceIdIfAbsent() {
        if (MDC.get(TRACE_ID) == null) {
            MDC.put(TRACE_ID, createTraceId());
        }
    }

    public static void setTraceId() {
        MDC.put(TRACE_ID, createTraceId());
    }

    public static void setTraceId(String traceId) {
        MDC.put(TRACE_ID, traceId);
    }

    public static <T> Callable<T> wrap(final Callable<T> callable, final Map<String, String> context) {
        return () -> {
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            setTraceIdIfAbsent();
            try {
                return callable.call();
            } finally {
                MDC.clear();
            }
        };
    }

    public static Runnable wrap(final Runnable runnable, final Map<String, String> context) {
        return () -> {
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            setTraceIdIfAbsent();
            try {
                runnable.run();
            } finally {
                MDC.clear();
            }
        };
    }
}

自己包裝擴(kuò)展 ThreadPoolExecutor

/**
 * @author Richard_yyf
 */
public class ThreadPoolExecutorMdcWrapper extends ThreadPoolExecutor {

    public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
                                        RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    @Override
    public void execute(Runnable task) {
        super.execute(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()), result);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }

    @Override
    public Future<?> submit(Runnable task) {
        return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }
}

使用

具體的使用就是把你原來executor = new ThreadPoolExecutor(...)改成executor = new ThreadPoolExecutorMdcWrapper(...)即可。

比如你是用Spring @Async異步方法的,在配置線程池的時(shí)候就這樣聲明:

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @EnableAsync
    @Configuration
    class TaskPoolConfig {

        @Bean("taskExecutor")
        public Executor taskExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolExecutorMdcWrapper();
            executor.setCorePoolSize(10);
            executor.setMaxPoolSize(20);
            executor.setQueueCapacity(200);
            executor.setKeepAliveSeconds(60);
            return executor;
        }
    }

}

結(jié)果

按照上述步驟,你的異步任務(wù)在打印日志的時(shí)候,就會帶上原本請求的TraceID了。

image.png

三、面向 Dubbo RPC 鏈路追蹤

我們項(xiàng)目組主要使用Dubbo進(jìn)行微服務(wù)框架的開發(fā)。我們想在服務(wù)調(diào)用之間,傳遞上游服務(wù)的TraceID,來達(dá)到鏈路追蹤的效果。

Dubbo 提供了這樣的機(jī)制,可以通過Dubbo RPC + Dubbo Filter 來設(shè)置和傳遞消費(fèi)者的TraceID。

詳見官網(wǎng)對于這兩個(gè)概念的說明。

Dubbo RPC
Dubbo Filter

這邊我直接給出代碼和擴(kuò)展點(diǎn)配置。

Dubbo Filter for Consumer

消費(fèi)者應(yīng)用端:

/**
 * @author Richard_yyf
 */
@Activate(group = {Constants.CONSUMER})
public class ConsumerRpcTraceFilter implements Filter {

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        //如果MDC上下文有追蹤ID,則原樣傳遞給provider端
        String traceId = MDC.get(TRACE_ID);
        if (StringUtils.isNotEmpty(traceId)) {
            RpcContext.getContext().setAttachment(TRACE_ID, traceId);
        }
        return invoker.invoke(invocation);
    }

}

SPI 配置

resources目錄下,創(chuàng)建/META-INF/dubbo/com.alibaba.dubbo.rpc.Filter文件.

consumerRpcTraceFilter=com.xxx.xxx.filter.ConsumerRpcTraceFilter
image.png

Dubbo Filter for Provider

服務(wù)提供者應(yīng)用端:

/**
 * @author Richard_yyf
 */
@Activate(group = {Constants.PROVIDER})
public class ProviderRpcTraceFilter implements Filter {
    
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        // 接收消費(fèi)端的traceId
        String traceId = RpcContext.getContext().getAttachment(TRACE_ID);
        if (StringUtils.isBlank(traceId)) {
            traceId = ThreadMdcUtil.createTraceId();
        }

        // 設(shè)置日志traceId
        ThreadMdcUtil.setTraceId(traceId);

        // TODO 如果這個(gè)服務(wù)還會調(diào)用下一個(gè)服務(wù),需要再次設(shè)置下游參數(shù)
        // RpcContext.getContext().setAttachment("trace_id", traceId);

        try {
            return invoker.invoke(invocation);
        } finally {
            // 調(diào)用完成后移除MDC屬性
            MDC.remove(TRACE_ID);
        }
    }

}

SPI 配置:

providerRpcTraceFilter=com.xxx.xxx.filter.ProviderRpcTraceFilter

四、面向 HTTP Service 鏈路追蹤

除了Dubbo RPC 的這種方式,常見微服務(wù)之間的調(diào)用也有通過 HTTP REST 來完成調(diào)用的。這種場景下就需要在上游服務(wù)在發(fā)起HTTP調(diào)用的時(shí)候自動將 TraceID添加到 HTTP Header 中。

以常用的 Spring RestTemplate 為例,使用攔截器來包裝 HTTP Header。

        RestTemplate restTemplate = new RestTemplate();

        List<ClientHttpRequestInterceptor> list = new ArrayList<>();
        list.add((request, body, execution) -> {
            String traceId = MDC.get(TRACE_ID);
            if (StringUtils.isNotEmpty(traceId)) {
                request.getHeaders().add(TRACE_ID, traceId);
            }
            return execution.execute(request, body);
        });

        restTemplate.setInterceptors(list);

下游服務(wù)由于是通過HTTP 接口暴露的服務(wù),就添加一個(gè)攔截器來獲取就好。

public class TraceInterceptor extends HandlerInterceptorAdapter {

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
        MDC.clear();
        String traceId = request.getHeader(TRACE_ID);
        if (StringUtils.isEmpty(traceId)) {
            ThreadMdcUtil.setTraceId();
        } else {
            MDC.put(TRACE_ID, traceId);
        }
        return true;
    }
}

五、思考 Spring Cloud Sleuth 的實(shí)現(xiàn)

經(jīng)過上面的幾個(gè)步驟,我們相當(dāng)于是自己形成了一個(gè)比較基礎(chǔ)的服務(wù)追蹤的解決方案。

Spring Cloud 作為一個(gè)一站式 微服務(wù)開發(fā)框架,提供了Spring Cloud Sleuth 作為 該技術(shù)體系下分布式跟蹤的解決方案。這里我想拿出來講一講。

Sleuth 是一個(gè)成熟的技術(shù)解決方案,基于 Google Dapper 為理論基礎(chǔ)實(shí)現(xiàn),里面的一些術(shù)語都來自于那篇論文。在對于TraceID傳遞的問題上,我們上面講的簡單版的解決方案的一些解決問題的思路,實(shí)際上在Sleuth 中也有體現(xiàn)。

首先就是分布式追蹤,Sleuth 會將 SpanIDTraceID添加到 Slf4J MDC 中,這樣在打印出來的日志就會有帶上對應(yīng)的標(biāo)識。

在遇到線程池 TraceID 傳遞失效的問題時(shí),我們相當(dāng)了對提交任務(wù)的操作進(jìn)行包裝,而在Slueth 中,是通過實(shí)現(xiàn)HystrixConcurrencyStrategy接口來解決 TraceID異步傳遞的問題。Hystrix在實(shí)際調(diào)用時(shí),會調(diào)用HystrixConcurrencyStrategywrapCallable 方法。通過實(shí)現(xiàn)這個(gè)接口,在wrapCallable 中將TraceID存放起來(具體參見SleuthHystrixConcurrencyStrategy)。

在面對Dubbo RPC 的調(diào)用方式和 Http Service 的調(diào)用方式中,我們通過Dubbo RpcContext + Filter和 Http Header + Interceptor 的方式,通過協(xié)議或者框架本身提供的擴(kuò)展點(diǎn)和上下文機(jī)制,來傳遞TraceID。而在 Spring Cloud Sleuth中追蹤@Async,RestTemplate,Zuul,Feign等組件時(shí),也是類似的解決思路。比如追蹤RestTemplate就是和上文一樣借用了Spring Client的 Interceptor 機(jī)制 (@See TraceRestTemplateInterceptor)。

上述就是將我們的簡單解決方案和 Spring Cloud Sleuth 的對比,想說明日志追蹤的思想和一些技術(shù)解決思路是共通相近的。

當(dāng)然,Spring Cloud Sleuth 基于 Dapper 實(shí)現(xiàn),提供了一個(gè)比較成熟的分布式系統(tǒng)調(diào)用追蹤架構(gòu),集成ZipKin + spring-cloud-sleuth-zipkin 依賴之后,能夠搭建一個(gè)完整的具有數(shù)據(jù)收集、數(shù)據(jù)存儲和數(shù)據(jù)展示功能的分布式服務(wù)追蹤系統(tǒng)。

通過Sleuth可以很清楚的了解到一個(gè)服務(wù)請求經(jīng)過了哪些服務(wù),每個(gè)服務(wù)處理花費(fèi)了多長。從而讓我們可以很方便的理清各微服務(wù)間的調(diào)用關(guān)系。此外Sleuth可以幫助我們:

  • 耗時(shí)分析: 通過Sleuth可以很方便的了解到每個(gè)采樣請求的耗時(shí),從而分析出哪些服務(wù)調(diào)用比較耗時(shí);
  • 可視化錯誤: 對于程序未捕捉的異常,可以通過集成Zipkin服務(wù)界面上看到;
  • 鏈路優(yōu)化: 對于調(diào)用比較頻繁的服務(wù),可以針對這些服務(wù)實(shí)施一些優(yōu)化措施。

PS:spring-cloud-sleth 2.0 中開始 正式支持 Dubbo,思路的話則是通過Dubbo filter 擴(kuò)展機(jī)制。

小結(jié)

再講講為什么不引入Sleuth + ZipKin 這種解決方案呢?因?yàn)槲覀兿到y(tǒng)的調(diào)用鏈路并不復(fù)雜,一般只有一層的調(diào)用關(guān)系,所以并不希望增加第三方的組件,更傾向于使用簡單的解決方案。

本篇文章到這里就結(jié)束了。實(shí)現(xiàn)一個(gè)簡單的微服務(wù)調(diào)用追蹤的日志方案并沒有太大的難度,重要的還是解決問題的思路,并且觸類旁通,去學(xué)習(xí)一些市面上的已存在的優(yōu)秀技術(shù)解決方案。

如果本文有幫助到你,希望能點(diǎn)個(gè)贊,這是對我的最大動力。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容