1.前言
全链路追踪大体原理是在请求中加上一个agent,然后每个系统都读取这个agent进行日志打印,最后收集日志进行分析。
可以使用skywalking,也可以自己通过ThreadLocal自己实现。具体原理后续再做阐述。本文主要解决的是在多线程中,如何保证链路ID继续传递。
最简单的办法,在声明线程池时,将TraceID加入即可。如果使用skywalking可以用RunnableWrapper.of()
进行调用。如果使用ThreadLocal或者slf4j的MDC实现的话,直接对应设置就行。
但是业务一般不关心TraceID的设置,有没有一种办法能让业务开发不手动写入TraceID吗?我们参考springboot封装的ThreadPoolTaskExecutor,自己封装一个线程池,对于直接new创建线程的情况不考虑(实际应用中应该避免这种用法)。对于单个线程,对Thread类进行一个代理,在代理中设置。从理论上是能解决问题的。
以下代码仅提供思路,代码没有在实际项目中应用过,如果需要使用请注意是否有bug
2.封装ThreadPoolTaskExecutor
public class ThreadPoolExecutorMdcWrapper extends ThreadPoolTaskExecutor {
@Override
public void execute(@NotNull Runnable task) {
super.execute(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
}
@NotNull
@Override
public <T> Future<T> submit(@NotNull Callable<T> task) {
return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
}
@NotNull
@Override
public Future<?> submit(@NotNull Runnable task) {
return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
}
}
ID生成
public class TraceIdUtil {
public static String getTraceId() {
return UUID.randomUUID().toString().replaceAll("-", "").toUpperCase();
}
}
MDC赋值。MDC是slf4j日志框架提供的在多线程环境下进行日志调用链路跟踪的一个类,设置相应的占位符之后即可到logback.xml配置文件中进行配置即可,具体使用可以参考官方文档。
public class ThreadMdcUtil {
private final static String TRACE_ID = "traceId";
public static void setTraceIdIfAbsent() {
if (MDC.get(TRACE_ID) == null) {
MDC.put(TRACE_ID, TraceIdUtil.getTraceId());
}
}
public static <T> Callable<T> wrap(final Callable<T> callable, final Map<String, String> context) {
return () -> {
if (context == null) {
MDC.clear();
} else {
MDC.setContextMap(context);
}
setTraceIdIfAbsent();
try {
return callable.call();
} finally {
MDC.clear();
}
};
}
public static Runnable wrap(final Runnable runnable, final Map<String, String> context) {
return () -> {
if (context == null) {
MDC.clear();
} else {
MDC.setContextMap(context);
}
setTraceIdIfAbsent();
try {
runnable.run();
} finally {
MDC.clear();
}
};
}
}
初始化时,初始化ThreadPoolExecutorMdcWrapper即可。
@Bean
public Executor saveFileThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolExecutorMdcWrapper();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(5);
executor.setQueueCapacity(20);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
ThreadLocal的设置也是类似的,在调用execute
的时候取出,然后使用Util去进行设置即可。
3.使用ThreadPoolTaskExecutor提供的方法setTaskDecorator
TaskDecorator是一个执行回调方法的装饰器,主要应用于传递上下文,或者提供任务的监控/统计信息。从源码我们可以看到,就是对线程池的execute方法进行重写,做了一个装饰。这个实现和上面我们自己重写execute方法异曲同工。
注意线程池中有的线程是一直存在一直被复用的,所以线程执行完成后需要在TaskDecorator的finally方法中移除传递的上下文对象。
public class ContextCopyingDecorator implements TaskDecorator {
@Override
public Runnable decorate(Runnable runnable) {
try {
RequestAttributes context = RequestContextHolder.currentRequestAttributes();
Map<String,String> previous = MDC.getCopyOfContextMap();
SecurityContext securityContext = SecurityContextHolder.getContext();
return () -> {
try {
RequestContextHolder.setRequestAttributes(context);
MDC.setContextMap(previous);
SecurityContextHolder.setContext(securityContext);
runnable.run();
} finally {
RequestContextHolder.resetRequestAttributes();
MDC.clear();
SecurityContextHolder.clearContext();
}
};
} catch (IllegalStateException e) {
return runnable;
}
}
}
实现TaskDecorator接口后,直接new即可。
executor.setTaskDecorator(new ContextCopyingDecorator());
4.skywalking工作机制
skywalking的工作机制,需要三块协同。工作原理图大致如下:
- 一块是skywalking server,负责接收、存储并展示,所以server模块包含一个展示web子模块;
- 第二块是agent,负责代理微服务并收集需要的信息,转发给server;
- 第三块便是微服务本身,需要在启动时指定agent,以便生成代理类。
4.1.SkyWalking 核心模块介绍:
SkyWalking采用组件式开发,易于扩展,主要组件作用如下:
-
Skywalking Agent:链路数据采集tracing(调用链数据)和metric(指标)信息并上报,上报通过HTTP或者gRPC方式发送数据到Skywalking Collector
-
Skywalking Collector : 链路数据收集器,对agent传过来的tracing和metric数据进行整合分析通过Analysis Core模块处理并落入相关的数据存储中,同时会通过Query Core模块进行二次统计和监控告警
-
Storage: Skywalking的存储,支持以ElasticSearch、Mysql、TiDB、H2等主流存储作为存储介质进行数据存储,H2仅作为临时演示单机用。
-
SkyWalking UI: Web可视化平台,用来展示落地的数据,目前官方采纳了RocketBot作为SkyWalking的主UI
4.2.怎么自动采集 span 数据
SkyWalking 采用了插件化 + javaagent 的形式来实现了 span 数据的自动采集,这样可以做到对代码的 无侵入性,插件化意味着可插拔,扩展性好
4.3.如何跨进程传递 context
我们知道数据一般分为 header 和 body, 就像 http 有 header 和 body, RocketMQ 也有 MessageHeader,Message Body, body 一般放着业务数据,所以不宜在 body 中传递 context,应该在 header 中传递 context,如图示
参考