1.前言

全链路追踪大体原理是在请求中加上一个agent,然后每个系统都读取这个agent进行日志打印,最后收集日志进行分析。

可以使用skywalking,也可以自己通过ThreadLocal自己实现。具体原理后续再做阐述。本文主要解决的是在多线程中,如何保证链路ID继续传递。

最简单的办法,在声明线程池时,将TraceID加入即可。如果使用skywalking可以用RunnableWrapper.of()进行调用。如果使用ThreadLocal或者slf4j的MDC实现的话,直接对应设置就行。

但是业务一般不关心TraceID的设置,有没有一种办法能让业务开发不手动写入TraceID吗?我们参考springboot封装的ThreadPoolTaskExecutor,自己封装一个线程池,对于直接new创建线程的情况不考虑(实际应用中应该避免这种用法)。对于单个线程,对Thread类进行一个代理,在代理中设置。从理论上是能解决问题的。

以下代码仅提供思路,代码没有在实际项目中应用过,如果需要使用请注意是否有bug

2.封装ThreadPoolTaskExecutor

public class ThreadPoolExecutorMdcWrapper extends ThreadPoolTaskExecutor {

    @Override
    public void execute(@NotNull Runnable task) {
        super.execute(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }

    @NotNull
    @Override
    public <T> Future<T> submit(@NotNull Callable<T> task) {
        return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }

    @NotNull
    @Override
    public Future<?> submit(@NotNull Runnable task) {
        return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }
}

ID生成

public class TraceIdUtil {
    public static String getTraceId() {
        return UUID.randomUUID().toString().replaceAll("-", "").toUpperCase();
    }
}

MDC赋值。MDC是slf4j日志框架提供的在多线程环境下进行日志调用链路跟踪的一个类,设置相应的占位符之后即可到logback.xml配置文件中进行配置即可,具体使用可以参考官方文档。

public class ThreadMdcUtil {
    private final static String TRACE_ID = "traceId";

    public static void setTraceIdIfAbsent() {
        if (MDC.get(TRACE_ID) == null) {
            MDC.put(TRACE_ID, TraceIdUtil.getTraceId());
        }
    }

    public static <T> Callable<T> wrap(final Callable<T> callable, final Map<String, String> context) {
        return () -> {
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            setTraceIdIfAbsent();
            try {
                return callable.call();
            } finally {
                MDC.clear();
            }
        };
    }

    public static Runnable wrap(final Runnable runnable, final Map<String, String> context) {
        return () -> {
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            setTraceIdIfAbsent();
            try {
                runnable.run();
            } finally {
                MDC.clear();
            }
        };
    }
}

初始化时,初始化ThreadPoolExecutorMdcWrapper即可。

@Bean
public Executor saveFileThreadPool() {
    ThreadPoolTaskExecutor executor = new ThreadPoolExecutorMdcWrapper();
    executor.setCorePoolSize(2);
    executor.setMaxPoolSize(5);
    executor.setQueueCapacity(20);
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.initialize();
    return executor;
}

ThreadLocal的设置也是类似的,在调用execute的时候取出,然后使用Util去进行设置即可。

3.使用ThreadPoolTaskExecutor提供的方法setTaskDecorator

TaskDecorator是一个执行回调方法的装饰器,主要应用于传递上下文,或者提供任务的监控/统计信息。从源码我们可以看到,就是对线程池的execute方法进行重写,做了一个装饰。这个实现和上面我们自己重写execute方法异曲同工。

注意线程池中有的线程是一直存在一直被复用的,所以线程执行完成后需要在TaskDecorator的finally方法中移除传递的上下文对象。
image-1651064293511

public class ContextCopyingDecorator implements TaskDecorator {
    @Override
    public Runnable decorate(Runnable runnable) {
        try {
			RequestAttributes context = RequestContextHolder.currentRequestAttributes();
			Map<String,String> previous = MDC.getCopyOfContextMap();
			SecurityContext securityContext = SecurityContextHolder.getContext();
			return () -> {
			    try {
			    	RequestContextHolder.setRequestAttributes(context);
			    	MDC.setContextMap(previous);				
			        SecurityContextHolder.setContext(securityContext);
			        runnable.run();
			    } finally {
			        RequestContextHolder.resetRequestAttributes();
			        MDC.clear();
			        SecurityContextHolder.clearContext();
			    }
			};
		} catch (IllegalStateException e) {
			return runnable;
		}
    }
}

实现TaskDecorator接口后,直接new即可。
executor.setTaskDecorator(new ContextCopyingDecorator());

4.skywalking工作机制

skywalking的工作机制,需要三块协同。工作原理图大致如下:

  • 一块是skywalking server,负责接收、存储并展示,所以server模块包含一个展示web子模块;
  • 第二块是agent,负责代理微服务并收集需要的信息,转发给server;
  • 第三块便是微服务本身,需要在启动时指定agent,以便生成代理类。
    image-1650514799904

4.1.SkyWalking 核心模块介绍:

SkyWalking采用组件式开发,易于扩展,主要组件作用如下:

  1. Skywalking Agent:链路数据采集tracing(调用链数据)和metric(指标)信息并上报,上报通过HTTP或者gRPC方式发送数据到Skywalking Collector

  2. Skywalking Collector : 链路数据收集器,对agent传过来的tracing和metric数据进行整合分析通过Analysis Core模块处理并落入相关的数据存储中,同时会通过Query Core模块进行二次统计和监控告警

  3. Storage: Skywalking的存储,支持以ElasticSearch、Mysql、TiDB、H2等主流存储作为存储介质进行数据存储,H2仅作为临时演示单机用。

  4. SkyWalking UI: Web可视化平台,用来展示落地的数据,目前官方采纳了RocketBot作为SkyWalking的主UI

4.2.怎么自动采集 span 数据

SkyWalking 采用了插件化 + javaagent 的形式来实现了 span 数据的自动采集,这样可以做到对代码的 无侵入性,插件化意味着可插拔,扩展性好
image-1650514939388

4.3.如何跨进程传递 context

我们知道数据一般分为 header 和 body, 就像 http 有 header 和 body, RocketMQ 也有 MessageHeader,Message Body, body 一般放着业务数据,所以不宜在 body 中传递 context,应该在 header 中传递 context,如图示
image-1650515049658

参考

SpringBoot+MDC实现全链路调用日志跟踪
分布式链路追踪原理详解