TraceId duplication in RocketMQ

The way of architecture design 2021-02-23 19:18:29

Background

Recently we found duplicate traceIds among the MQ messages consumed in production. In theory, the consumption of different messages should never share a traceId, so why does it happen with a certain probability?

The consumer code in question is as follows:

protected ConsumeStatus consumeMsgSingle(MessageExt ext) {
    log.debug("AbstractMessageListener-consumeMessage() msgId:{}, body:{}", ext.getMsgId(), new String(ext.getBody()));
    String message = new String(ext.getBody());
    // Build the lookup key from the topic and tags
    String key = RocketMQUtils.concatKey(ext.getTopic(), ext.getTags());
    // Look up the processor for this key in handleMap, falling back to the topic alone
    MessageProcessor messageProcessor = handleMap.get(key);
    if (Objects.isNull(messageProcessor)) {
        messageProcessor = handleMap.get(ext.getTopic());
    }
    Optional.ofNullable(messageProcessor).orElseThrow(() -> new RRException(String.format("Message handler not found, topic:%s, tag:%s", ext.getTopic(), ext.getTags())));
    Object obj = null;
    try {
        // Deserialize the String message into the corresponding object
        obj = messageProcessor.transferMessage(message);
        if (obj instanceof MqMetaInfo) {
            MqMetaInfo meta = (MqMetaInfo) obj;
            MqMetaInfoConverter.fromExt(meta, ext);
        }
        // Bind the message's traceId (if any) to the current thread
        generateMDC(ext);
    } catch (Exception e) {
        StringBuilder errMsg = new StringBuilder("Object deserialization failed, ")
                .append("messageId: ")
                .append(ext.getMsgId()).append("\n")
                .append("msgBody: ")
                .append(new String(ext.getBody())).append("\n")
                .append("messageExt ")
                .append(ext).append("\n")
                .append("stackTrace: ")
                .append(JSON.toJSONString(e.getStackTrace()));
        log.error("AbstractMessageListener-consumeMessage() error:{}, msgId:{}, message:{}, errMsg:{}"
                , e, ext.getMsgId(), new String(ext.getBody()), errMsg.toString());
        throw new RRException(errMsg.toString());
    }
    // Process the message
    boolean result = messageProcessor.handleMessage(obj);
    if (!result) {
        // NOTE: an int can never exceed Integer.MAX_VALUE, so this branch is never taken
        // and a failed message always returns FAIL
        if (ext.getReconsumeTimes() > Integer.MAX_VALUE) {
            return ConsumeStatus.SUCCESS;
        }
        return ConsumeStatus.FAIL;
    }
    return ConsumeStatus.SUCCESS;
}

The generateMDC method is as follows (listing not preserved in this copy):

Cause analysis

As you can see, if the message carries a traceId, generateMDC binds that traceId to the current thread and prints it. However, nothing clears the traceId after the method finishes. The RocketMQ consumer runs on a thread pool, so after a thread is returned to the pool the traceId is still bound to it, and the next message consumed on that thread can show up with the same traceId.
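The leak described above can be reproduced without RocketMQ at all. The following is a minimal sketch, not the original code: `TraceIdLeakDemo`, `bindTraceId`, and `consume` are hypothetical names, and a plain `ThreadLocal` stands in for the logging MDC. It assumes, as the description suggests, that the traceId is only bound when the message actually carries one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TraceIdLeakDemo {
    // Hypothetical stand-in for the logging MDC: one traceId slot per thread.
    private static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // Mimics the described behavior: bind a traceId only when the message has one.
    static void bindTraceId(String traceIdFromMessage) {
        if (traceIdFromMessage != null) {
            TRACE_ID.set(traceIdFromMessage);
        }
    }

    // Consumes one message and returns the traceId a log line would carry.
    static String consume(String traceIdFromMessage) {
        bindTraceId(traceIdFromMessage);
        return TRACE_ID.get(); // nothing clears the slot afterwards
    }

    // Runs the messages one after another on a single pooled thread.
    static List<String> run(String... traceIds) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        List<String> seen = new ArrayList<>();
        try {
            for (String id : traceIds) {
                seen.add(pool.submit(() -> consume(id)).get());
            }
        } finally {
            pool.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) throws Exception {
        // The second message has no traceId of its own, yet it reports "trace-A".
        System.out.println(run("trace-A", null)); // prints [trace-A, trace-A]
    }
}
```

Because the pool reuses the same worker thread, the value left behind by the first message is still visible when the second one is handled.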

Reproduction

Consumer

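import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

@Slf4j
@Service(value = "multiConsumerDemoProcessor")
public class MultiConsumerDemoProcessor implements MessageProcessor<String> {
    @Override
    public boolean handleMessage(String orderNo) {
        log.info("Start consuming: {}", orderNo);
        try {
            // Simulate some processing time
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt flag for the pool's worker thread
            Thread.currentThread().interrupt();
        }
        return true;
    }

    @Override
    public Class<String> getClazz() {
        return null;
    }

    @Override
    public String transferMessage(String message) {
        // The payload is already a String, so no conversion is needed
        return message;
    }
}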

Producer

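// Package names as in the Apache RocketMQ 4.x client
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        // Placeholder: replace "ip" with the NameServer address
        producer.setNamesrvAddr("ip");
        producer.start();
        // Send ten identical messages to the demo topic
        for (int i = 0; i < 10; i++) {
            try {
                Message msg = new Message("multi-consumer-demo",
                        "demo",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        //producer.shutdown();
    }
}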
Running results: [output not preserved]

The output shows repeated traceId values.

Fix

Add a finally block that releases the traceId once the message has been handled.

The solution
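A minimal sketch of what such a fix could look like follows. It is not the author's actual patch: `TraceIdCleanupDemo` and `consume` are hypothetical names, and a plain `ThreadLocal` again stands in for the MDC. With SLF4J the cleanup call would typically be `MDC.remove("traceId")` or `MDC.clear()`, where the key name `traceId` is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TraceIdCleanupDemo {
    // Hypothetical stand-in for the logging MDC: one traceId slot per thread.
    private static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // Consumes one message and returns the traceId a log line would carry.
    static String consume(String traceIdFromMessage) {
        try {
            if (traceIdFromMessage != null) {
                TRACE_ID.set(traceIdFromMessage);
            }
            return TRACE_ID.get();
        } finally {
            // Always release the traceId before the thread goes back to the pool.
            TRACE_ID.remove();
        }
    }

    // Runs the messages one after another on a single pooled thread.
    static List<String> run(String... traceIds) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        List<String> seen = new ArrayList<>();
        try {
            for (String id : traceIds) {
                seen.add(pool.submit(() -> consume(id)).get());
            }
        } finally {
            pool.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) throws Exception {
        // The second message no longer inherits the first message's traceId.
        System.out.println(run("trace-A", null)); // prints [trace-A, null]
    }
}
```

Putting the cleanup in `finally` means it also runs when message handling throws, so a failed message cannot leave its traceId behind either.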

Copyright statement
Author of this article: [The way of architecture design]. Please include the original link when reprinting, thank you.
