链路日志中追踪traceId
mdc就是基于Threadlocal进行的一个流程周转的标志物的传递,就是根据这种标志,可以追踪到日志的整体请求记录,便于进行定位到问题所在,而且对于用户的影响极小。
一,使用traceId概述
平时出现问题查询日志是程序员的解决方式,日志一般会从服务器的日志文件,然后查找自己需要的日志,或者日志输出到es中,在es中进行搜索日志,可是对于目前流行的微服务或者单体服务,将日志串起来去查看并不是一件容易的事情,一般微服务会调用多个系统,有http请求的,有mq的等会产生大量的日志,根据日志快速定位到具体的问题才是我们想要的解决方案,毕竟要用最短的时间找到问题所在,并快速解决。目前的elk搜集日志,也只是把所有的日志搜集起来,并没有将具体的日志按照请求串起来,所以这个目前需要在日志中添加traceId进行日志的追踪。
二,请求的源头
1,http请求
思路
在最开始请求系统时候生成一个全局唯一的traceId,放在http 请求header中,系统接收到请求后,从header中取出这个traceId,放入MDC中,这个traceId伴随着这整个请求的调用周期,即当一个服务调用另外一个服务的时候,需要将traceId往下传递,从而形成一条链路。
实现
@Service
public class TraceInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
String traceId = request.getHeader(ElkConstants.TRACE_ID);
if (StringUtils.isNotEmpty(traceId)) {
MDC.put(ElkConstants.TRACE_ID, traceId);
} else {
MDC.put(ElkConstants.TRACE_ID, UUID.randomUUID().toString());
}
return true;
}
@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
MDC.remove(ElkConstants.TRACE_ID);
}
}
@Configuration
public class InterceptorConfig implements WebMvcConfigurer {
@Resource
private TraceInterceptor traceInterceptor;
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(traceInterceptor).addPathPatterns("/**");
}
}
public class ElkConstants {
/**
* TRACE_ID
*/
public static final String TRACE_ID = "traceId";
}
2,定时任务Task
思路
在定时任务的时候生成一个全局唯一的traceId,放入MDC中,这个traceId伴随着这整个请求的调用周期
实现
@Component
@Slf4j
public class SyncTask extends BaseTask {
/**
* 定时任务
*/
public void sync() {
//设置traceId
setTraceId();
//自己的业务逻辑
}
}
public abstract class BaseTask {
public void setTraceId() {
MDC.put(ElkConstants.TRACE_ID, UUID.randomUUID().toString());
}
}
3,微服务(Feign)
思路
在请求的时候生成一个全局唯一的traceId,放入MDC中,这个traceId伴随着这整个请求的调用周期
实现
@Configuration
public class FeignInterceptor implements RequestInterceptor {
@Override
public void apply(RequestTemplate requestTemplate) {
requestTemplate.header(ElkConstants.TRACE_ID, (String) MDC.get(ElkConstants.TRACE_ID));
}
}
4,Mq的方式
思路
使用mq进行消息发送的时候,可以将请求发送消息的traceId从mdc中取出来,我们可以放到消息体中,当做一个字段,然后在消息端消费的时候,从消息体中获取到traceId,并放入到MDC中,伴随着此次的请求
实现(kafka为列子,其他也可以按照思路进行不同的实现)
@Component
public class KafkaSender implements MessageSender {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
@Autowired
private KafkaConfig kafkaConfig;
private static final Logger log = LoggerFactory.getLogger(KafkaSender.class);
/**
* 异步发送
*
* @param mqSendEvent
* @return
*/
@Override
public boolean sendAsyncMessage(MqSendEvent mqSendEvent) {
try {
mqSendEvent.setTraceId(MDC.get("traceId"));
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(mqSendEvent.getTopic(),
JSON.toJSONString(mqSendEvent));
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable ex) {
log.error("kafka sendAsyncMessage error, ex = {}, topic = {}, data = {}", ex, mqSendEvent.getTopic(),
JSON.toJSONString(mqSendEvent));
}
@Override
public void onSuccess(SendResult<String, Object> result) {
log.info("kafka sendAsyncMessage success topic = {}, data = {}", mqSendEvent.getTopic(),
JSON.toJSONString(mqSendEvent));
}
});
} finally {
MDC.clear();
}
return true;
}
}
public interface MessageSender {
/**
* 异步发送
*
* @param mqSendEvent
*/
public boolean sendAsyncMessage(MqSendEvent mqSendEvent);
}
public class MqSendEvent implements Serializable {
//topic
private String topic;
//数据
private String data;
//traceId数据
private String traceId;
public MqSendEvent(String topic, String data) {
this.topic = topic;
this.data = data;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getData() {
return data;
}
public void setData(String data) {
this.data = data;
}
public String getTraceId() {
return traceId;
}
public void setTraceId(String traceId) {
this.traceId = traceId;
}
}
@Component
@Slf4j
public class UserRegisterListener extends MessageBaseListener {
/**
* 监听消息
*
* @param consumerRecord
* @param ack
*/
@Override
@KafkaListener(topics = {MessageConstants.REGISTER_USER_TOPIC}, containerFactory = MessageConstants.CONSUMER_CONTAINER_FACTORY_NAME)
protected void receiverMessage(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
try {
log.info("consumer message record:{}", consumerRecord);
Optional<?> optional = Optional.ofNullable(consumerRecord.value());
if (optional.isPresent()) {
Object value = optional.get();
MqSendEvent mqSendEvent = JSON.parseObject((String) value, MqSendEvent.class);
JSONObject jsonObject = JSON.parseObject(mqSendEvent.getData());
MDC.put(ElkConstants.TRACE_ID, mqSendEvent.getTraceId());
//具体业务逻辑
}
} finally {
MDC.clear();
}
ack.acknowledge();
}
}
5,多线程方式
思路
在定时任务的时候生成一个全局唯一的traceId,放入MDC中,这个traceId伴随着这整个请求的调用周期
实现
三,MDC实现链路追踪的原理
1,概述
MDC(Mapped Diagnostic Contexts),是Slf4J类日志系统中实现分布式多线程日志数据传递的重要工具,用户可利用MDC将一些运行时的上下文数据打印出来。目前只有log4j和logback提供原生的MDC支持
2,使用(具体不同场景使用见(二,请求源头))
在logback配置文件中配置MDC容器中的变量%X{trackId}
%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level %X{trackId} [%15.15t] %class{39}.%method[%L] : %m%n
UTF-8f
3,源码分析
MDC中的put方法
public static void put(String key, String val) throws IllegalArgumentException {
if (key == null) {
throw new IllegalArgumentException(“key parameter cannot be null”);
} else if (mdcAdapter == null) {
throw new IllegalStateException(“MDCAdapter cannot be null. See also http://www.slf4j.org/codes.html#null_MDCA”);
} else {
mdcAdapter.put(key, val);
}
}
MDCAdapter接口
public interface MDCAdapter {
//设置当前线程MDC上下文中指定key的值和value的值
void put(String var1, String var2);
// 获取当前线程MDC上下文中指定key的值
String get(String var1);
// 移除当前线程MDC上下文中指定key的值
void remove(String var1);
// 清空MDC上下文
void clear();
// 获取MDC上下文
Map<String, String> getCopyOfContextMap();
// 设置MDC上下文
void setContextMap(Map<String, String> var1);
}
LogbackMDCAdapter实现
public class LogbackMDCAdapter implements MDCAdapter {
final ThreadLocal<Map<String, String>> copyOnThreadLocal = new ThreadLocal();
private static final int WRITE_OPERATION = 1;
private static final int MAP_COPY_OPERATION = 2;
//threadlocal线程级别的,这就是为什么需要remove或者clear的原因所在,
//防止内存泄露,具体为啥,可以研究一下ThreadLocal底层原理就明白了
final ThreadLocal lastOperation = new ThreadLocal();
public LogbackMDCAdapter() {
}
......
可以看到LogbackMDCAdapter声明了类型为ThreadLocal的map。ThreadLocal 提供了线程本地的实例。ThreadLocal变量在线程之间隔离而在方法或类间能够共享,是属于线程单独私有的,线程之间相互隔离,到这里就可以大致理解为其实使用的就是ThreadLocal的私有线程的特性,大概就可以明白其中的原理了。
LogbackMDCAdapter 的put方法
public void put(String key, String val) throws IllegalArgumentException {
if (key == null) {
throw new IllegalArgumentException(“key cannot be null”);
} else {
//复制当前线程的threadLocal
Map<String, String> oldMap = (Map)this.copyOnThreadLocal.get();
//设置当前操作为写操作,并返回上一次的操作
Integer lastOp = this.getAndSetLastOperation(1);
//上一次不是读操作或者null,已经初始化了,有内容的话在里面设置内容
if (!this.wasLastOpReadOrNull(lastOp) && oldMap != null) {
oldMap.put(key, val);
} else {
// 复制一个当前线程的副本
Map<String, String> newMap = this.duplicateAndInsertNewMap(oldMap);
newMap.put(key, val);
}
}
}
private Integer getAndSetLastOperation(int op) {
//设置写操作,并返回上一次操作
Integer lastOp = (Integer)this.lastOperation.get();
this.lastOperation.set(op);
return lastOp;
}
private boolean wasLastOpReadOrNull(Integer lastOp) {
//判断操作类型 是null或者读操作
return lastOp == null || lastOp == 2;
}
创建线程安全的map放到threadLocal中
private Map<String, String> duplicateAndInsertNewMap(Map<String, String> oldMap) {
Map<String, String> newMap = Collections.synchronizedMap(new HashMap());
if (oldMap != null) {
synchronized(oldMap) {
newMap.putAll(oldMap);
}
}
this.copyOnThreadLocal.set(newMap);
return newMap;
}
get方法
public String get(String key) {
//从当前线程获取map
Map<String, String> map = (Map)this.copyOnThreadLocal.get();
return map != null && key != null ? (String)map.get(key) : null;
}
remove方法
public void remove(String key) {
if (key != null) {
Map<String, String> oldMap = (Map)this.copyOnThreadLocal.get();
if (oldMap != null) {
//设置为写操作
Integer lastOp = this.getAndSetLastOperation(1);
if (this.wasLastOpReadOrNull(lastOp)) {
//读操作或者null的时候,复制新map并移除当前key
Map<String, String> newMap = this.duplicateAndInsertNewMap(oldMap);
newMap.remove(key);
} else {
oldMap.remove(key);
}
}
}
}
clear方法
public void clear() {
//设置为写操作
this.lastOperation.set(1);
//移除复制
this.copyOnThreadLocal.remove();
}
四,MDC使用总结
mdc就是基于Threadlocal进行的一个流程周转的标志物的传递,就是根据这种标志,可以追踪到日志的整体请求记录,便于进行定位到问题所在,而且对于用户的影响极小
更多推荐
所有评论(0)