Error message here!

Hide Error message here!

忘记密码?

Error message here!

请输入正确邮箱

Hide Error message here!

密码丢失?请输入您的电子邮件地址。您将收到一个重设密码链接。

Error message here!

返回登录

Close

RocketMq中的traceId重复问题

架构设计之道 2021-02-23 19:12:34 阅读数:1 评论数:0 点赞数:0 收藏数:0

背景

最近发现生产上mq的traceId有重复现象,理论上不同消息消费,tracdId不应该相同,但为什么有一定的概率会出现呢?

查询代码如下:

 protected ConsumeStatus consumeMsgSingle(MessageExt ext) {
log.debug("AbstractMessageListener-consumeMessage() msgId:{}, body:{}", ext.getMsgId(), new String(ext.getBody()));
String message = new String(ext.getBody());
//获取到key
String key = RocketMQUtils.concatKey(ext.getTopic(), ext.getTags());
//根据key从handleMap里获取到我们的处理类
MessageProcessor messageProcessor = handleMap.get(key);
if (Objects.isNull(messageProcessor)) {
messageProcessor = handleMap.get(ext.getTopic());
}
Optional.ofNullable(messageProcessor).orElseThrow(() -> new RRException(String.format("未找到消息处理类, topic:%s, tag:%s", ext.getTopic(), ext.getTags())));
Object obj = null;
try {
//将String类型的message反序列化成对应的对象。
obj = messageProcessor.transferMessage(message);
if (obj instanceof MqMetaInfo) {
MqMetaInfo meta = (MqMetaInfo) obj;
MqMetaInfoConverter.fromExt(meta, ext);
}
generateMDC(ext);
} catch (Exception e) {
StringBuilder errMsg = new StringBuilder("对象反序列化失败, ")
.append("messageId: ")
.append(ext.getMsgId()).append("\n")
.append("msgBody: ")
.append(new String(ext.getBody())).append("\n")
.append("messageExt ")
.append(ext).append("\n")
.append("stackTrace: ")
.append(JSON.toJSONString(e.getStackTrace()));
log.error("AbstractMessageListener-consumeMessage() error:{}, msgId:{}, message:{}, errMsg:{}"
, e, ext.getMsgId(), new String(ext.getBody()), errMsg.toString());
throw new RRException(errMsg.toString());
}
//处理消息
boolean result = messageProcessor.handleMessage(obj);
if (!result) {
if (ext.getReconsumeTimes() > Integer.MAX_VALUE) {
return ConsumeStatus.SUCCESS;
}
return ConsumeStatus.FAIL;
}
return ConsumeStatus.SUCCESS;
}

generateMDC方法如下:

原因分析

可以看到如果message中有traceId,则把traceId关联到该线程,并打印出来。但发现最终该方法执行完成后未做清理traceId的动作,即RocketMq的消费者用的是线程池,而线程回收后traceId依然绑定在该线程上,如果下次有消息过来消费则会有同样traceId出现

重现

消费者

@Slf4j
@Service(value = "multiConsumerDemoProcessor")
public class MultiConsumerDemoProcessor implements MessageProcessor<String> {
@Override
public boolean handleMessage(String orderNo) {
log.info("开始消费:{}", orderNo);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return true;
}
@Override
public Class<String> getClazz() {
return null;
}
@Override
public String transferMessage(String message) {
return message;
}
}

生产者

public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("ip");
producer.start();
for (int i = 0; i < 10; i++)
try {
{
Message msg = new Message("multi-consumer-demo",
"demo",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
//producer.shutdown();
}
}
运行结果:


可以看到traceId是有重复的

解决

加上finally语句,释放traceId

解决结果

版权声明
本文为[架构设计之道]所创,转载请带上原文链接,感谢
https://segmentfault.com/a/1190000039264922

编程之旅,人生之路,不止于编程,还有诗和远方。
阅代码原理,看框架知识,学企业实践;
赏诗词,读日记,踏人生之路,观世界之行;