<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-sleuth</artifactId>
<version>2.0.1.RELEASE</version>
</dependency>
业务系统logback.xml表达式修改,traceId记录在:X-B3-TraceId,由于spanId很少关注,因此未添加。
<!-- 文件输出格式 -->
<property name="PATTERN" value="%-5level [%d{HH:mm:ss.SSS}] [traceId:%yellow(%X{X-B3-TraceId})] [%thread] %logger{36} - %msg%n"/>
import lombok.Data;
/**
* @author baiyan
* @time 2020/11/13 13:17
*/
@Data
public class BaseResult {
/**
* httpCode
*/
private Integer code;
/**
* 业务code
*/
private String errorCode;
/**
* 业务信息
*/
private String message;
/**
* 链路id
*/
private String traceId;
public BaseResult() {
}
public BaseResult(Integer code, String message) {
this.code = code;
this.message = message;
}
public BaseResult(Integer code, String errorCode, String message) {
this.code = code;
this.errorCode = errorCode;
this.message = message;
}
protected static final Integer CODE_SUCCESS = 200;
protected static final Integer CODE_SYSTEM_ERROR = 500;
protected static final Integer CODE_CLIENT_ERROR = 400;
protected static final String MESSAGE_SUCCESS = "请求成功";
}
/**
* 链路追踪
*
* @author baiyan
* @date 2020/12/03
*/
@ControllerAdvice
public class AddTraceIdResponseBodyAdvice implements ResponseBodyAdvice<BaseResult> {
@Autowired
private Tracer tracer;
@Override
public boolean supports(MethodParameter returnType, Class<? extends HttpMessageConverter<?>> converterType) {
return BaseResult.class.isAssignableFrom(returnType.getParameterType());
}
@Override
public BaseResult beforeBodyWrite(BaseResult body, MethodParameter returnType, MediaType selectedContentType, Class<? extends HttpMessageConverter<?>> selectedConverterType, ServerHttpRequest request, ServerHttpResponse response) {
body.setTraceId(tracer.currentSpan().context().traceIdString());
return body;
}
}
/**
* spring上下文工具类
*
* @author baiyan
* @date 2020/11/13
*/
@Component
public class SpringContextUtil implements ApplicationContextAware {
/**
* spring应用上下文
*/
private static ApplicationContext applicationContext;
/**
* 实现ApplicationContextAware接口的回调方法。设置上下文环境
* @param applicationContext
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) {
SpringContextUtil.applicationContext = applicationContext;
}
/**
* @return ApplicationContext
*/
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
/**
* 获取对象
* @param name
* @return Object
* @throws BeansException
*/
public static Object getBean(String name) throws BeansException {
return applicationContext.getBean(name);
}
/**
* 获取对象
* @param clazz
* @return Object
* @throws BeansException
*/
public static <T> T getBean(Class<T> clazz) throws BeansException {
return applicationContext.getBean(clazz);
}
}
public class TraceUtil {
public static String getTraceId(){
Tracer tracer=(Tracer)SpringContextUtil.getBean("tracer");
if(tracer!=null){
Span span=tracer.currentSpan();
TraceContext traceContext=span.context();
return traceContext.traceIdString();
}
return null;
}
}
前端示例:
日志打印示例:
配置完成,只要控制台打印的日志都会带上此次线程的日志【内部传递通过ThreadLocal】,包括feign调用也能查询到对应的日志【feign之间的调用通过header参数传递】
单线程内traceId可以进行传递,多线程传递参数问题. 目前Zipkin类CurrentTraceContext给出对线程及线程池的的处理方法就是实现了Runnable重新实现了run方法,这样就解决了线程池的问题,当然不只提供了创建线程的方法,还包括线程池和Callable 【个人测试过,此种方法线程池无法获取到父线程traceId】
线程池解决 经过多次测试,使用LazyTraceThreadPoolTaskExecutor即可实现traceId的。下面给出在base包中应对traceId包装使用的线程池
/**
* 静态阻塞线程池
* @author baiyan
* @since 2020/3/23
*/
public interface BlockThreadPoolService {
/**
* 向线程池中添加任务
* @param task
* @return 任务(必须实现Runnable接口)
*/
Future<?> addTask(Runnable task);
/**
* 异步执行一批任务,直到任务执行完成
* @param task
*/
void runTasksUntilEnd(List<Runnable> task);
/**
* 向线程池中添加循环运行的任务
* @param task 任务(必须实现Runnable接口)
* @param interval 时间间隔,单位毫秒
*/
void loopTask(Runnable task, long interval);
/**
* 向线程池中添加循环运行的任务
* @param task 任务(必须实现Runnable接口)
* @param interval 时间间隔,单位毫秒
* @param delay 延迟执行的时间,单位毫秒,表示任务在delay ms后开始被定时调度
*/
void loopTask(Runnable task, long interval, long delay);
/**
* 停止线程池
*/
public void stop();
}
/**
* 线程池基类
* @author baiyan
* @since 2020/3/24
*/
@Slf4j
public class BlockThreadPoolServiceBase implements BlockThreadPoolService {
/** 主线程数 */
@Setter
private int corePoolSize = 20;
/** 最大线程数 */
@Setter
private int maximumPoolSize = 150;
/** 线程池维护线程所允许的空闲时间 */
@Setter
private int keepAliveTime = 60;
/** 线程池所使用的队列的大小 */
@Setter
private int queueSize = 100;
/** 是否已被初始化 */
@Setter
private boolean inited = false;
/** 单例延时线程池 */
private ScheduledExecutorService scheduledExecutorService;
/** trace跟踪线程池 */
private LazyTraceThreadPoolTaskExecutor lazyTraceThreadPoolTaskExecutor;
/**
* 初始化单例线程池
*/
public void init() {
if(inited) {
return;
}
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(corePoolSize);
threadPoolTaskExecutor.setMaxPoolSize(maximumPoolSize);
threadPoolTaskExecutor.setQueueCapacity(queueSize);
threadPoolTaskExecutor.setKeepAliveSeconds(keepAliveTime);
threadPoolTaskExecutor.setThreadNamePrefix("nssa-Thread-");
threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);
threadPoolTaskExecutor.initialize();
this.lazyTraceThreadPoolTaskExecutor = new LazyTraceThreadPoolTaskExecutor(SpringContextUtil.getApplicationContext(),threadPoolTaskExecutor);
inited = true;
}
/**
* 添加任务
* @param task
* @return
*/
@Override
public Future<?> addTask(Runnable task) {
if(!inited) {
init();
}
return this.lazyTraceThreadPoolTaskExecutor.submit(TtlRunnable.get(task));
}
@Override
public void stop() {
lazyTraceThreadPoolTaskExecutor.shutdown();
if(scheduledExecutorService != null) {
scheduledExecutorService.shutdownNow();
}
}
@Override
public synchronized void loopTask(Runnable task, long interval) {
loopTask(task, interval, 0);
}
@Override
public void loopTask(Runnable task, long interval, long delay) {
if(scheduledExecutorService == null) {
ThreadFactory threadFactory = new ScheduledThreadFactory("schedule-pool-%d-%s");
scheduledExecutorService = Executors.newScheduledThreadPool(1, threadFactory);
}
int minInterval=100;
if(interval < minInterval) {
throw new IllegalArgumentException("不允许调度100ms以内的循环任务");
}
scheduledExecutorService.scheduleAtFixedRate(TtlRunnable.get(task), delay, interval, TimeUnit.MILLISECONDS);
}
@Override
public void runTasksUntilEnd(List<Runnable> tasks) {
List<Future<?>> futures = new ArrayList<Future<?>>();
for(Runnable task : tasks) {
futures.add(addTask(task));
}
for(Future<?> f : futures) {
try {
f.get();
} catch (Exception e) {
log.warn("", e);
}
}
}
/**
* 获取单例线程池实例
* @return
*/
protected LazyTraceThreadPoolTaskExecutor getExecutorService() {
return lazyTraceThreadPoolTaskExecutor;
}
/**
* 动态生成一个定时任务线程池
*/
static class ScheduledThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
ScheduledThreadFactory(String namePrefix) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
this.namePrefix = String.format(namePrefix, poolNumber.getAndIncrement(), "%d");
}
String getThreadName() {
return String.format(namePrefix,
threadNumber.getAndIncrement());
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, getThreadName(), 0);
if (!t.isDaemon()){
t.setDaemon(true);
}
if (t.getPriority() != Thread.NORM_PRIORITY){
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
}
}
原生jdk提供的InheritableThreadLocal,可以解决父子线程的变量传递,但是父线程传递到线程池时会存在数据丢失的情况。为了解决此问题,在此线程池中任务提交到线程池时使用了阿里的开源组件ttl,用于解决父子线程变量传递
感兴趣的可以看一下对应的源码,设计很巧妙,附上链接:https://github.com/alibaba/transmittable-thread-local/issues/123
业务应用base包线程池实例,新建一个单例线程池
/**
* 消息发送线程池
* @author baiyan
* @date 2020/5/19 10:01 上午
*/
public class MessageThreadPool {
private volatile static BlockThreadPoolService blockThreadPoolService;
public static BlockThreadPoolService getThreadPool() {
if(blockThreadPoolService == null){
synchronized (MessageThreadPool.class){
if(blockThreadPoolService == null) {
blockThreadPoolService = new BlockThreadPoolServiceBase();
}
}
}
return blockThreadPoolService ;
}
}
MessageThreadPool.getThreadPool().addTask(
()-> System.out.println("测试")
);
Runnable r1=()->log.info("hello1:");
Runnable r2=()->log.info("hello1:");
Runnable r3=()->log.info("hello1:");
Runnable r4=()->log.info("hello1:");
MessageThreadPool.getThreadPool().runTasksUntilEnd(
Lists.newArrayList(r1,r2,r3,r4)
);
发送消息:发送消息时头部加入当前线程的traceId即可【可使用TraceUtil.getTraceId()获取】 接收消息:消息接收之后参数使用@header去除对应traceId,然后调用slf4j的工具类,MDC.put("X-B3-TraceId",traceId),即可跟踪到对应链路信息。
上面这个只是一种解决思路,但是接收消息的步骤存在问题,如果有线程池的话就无法传递进去了,有待探索。