background
Recently, we found that in production mq Of traceId There is repetition , Theoretically, different news consumption ,tracdId It shouldn't be the same , But why is there a certain probability ?
The query code is as follows :
protected ConsumeStatus consumeMsgSingle(MessageExt ext) {
log.debug("AbstractMessageListener-consumeMessage() msgId:{}, body:{}", ext.getMsgId(), new String(ext.getBody()));
String message = new String(ext.getBody());
// Get key
String key = RocketMQUtils.concatKey(ext.getTopic(), ext.getTags());
// according to key from handleMap Get our processing class from
MessageProcessor messageProcessor = handleMap.get(key);
if (Objects.isNull(messageProcessor)) {
messageProcessor = handleMap.get(ext.getTopic());
}
Optional.ofNullable(messageProcessor).orElseThrow(() -> new RRException(String.format(" Message handling class not found , topic:%s, tag:%s", ext.getTopic(), ext.getTags())));
Object obj = null;
try {
// take String Type of message The reverse sequence is transformed into the corresponding object .
obj = messageProcessor.transferMessage(message);
if (obj instanceof MqMetaInfo) {
MqMetaInfo meta = (MqMetaInfo) obj;
MqMetaInfoConverter.fromExt(meta, ext);
}
generateMDC(ext);
} catch (Exception e) {
StringBuilder errMsg = new StringBuilder(" Object deserialization failed , ")
.append("messageId: ")
.append(ext.getMsgId()).append("\n")
.append("msgBody: ")
.append(new String(ext.getBody())).append("\n")
.append("messageExt ")
.append(ext).append("\n")
.append("stackTrace: ")
.append(JSON.toJSONString(e.getStackTrace()));
log.error("AbstractMessageListener-consumeMessage() error:{}, msgId:{}, message:{}, errMsg:{}"
, e, ext.getMsgId(), new String(ext.getBody()), errMsg.toString());
throw new RRException(errMsg.toString());
}
// Process the message
boolean result = messageProcessor.handleMessage(obj);
if (!result) {
if (ext.getReconsumeTimes() > Integer.MAX_VALUE) {
return ConsumeStatus.SUCCESS;
}
return ConsumeStatus.FAIL;
}
return ConsumeStatus.SUCCESS;
}
generateMDC The method is as follows :
Cause analysis
You can see if message There is traceId, Then put traceId Associated to the thread , And print it out . But it is found that the method is not cleaned up after execution traceId The action of , namely RocketMq Our customers use thread pools , And after thread recycling traceId Still bound to the thread , If there is news coming to spend next time, there will be the same traceId appear
repeat
consumer
@Slf4j
@Service(value = "multiConsumerDemoProcessor")
public class MultiConsumerDemoProcessor implements MessageProcessor<String> {
@Override
public boolean handleMessage(String orderNo) {
log.info(" Start spending :{}", orderNo);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return true;
}
@Override
public Class<String> getClazz() {
return null;
}
@Override
public String transferMessage(String message) {
return message;
}
}
producer
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("ip");
producer.start();
for (int i = 0; i < 10; i++)
try {
{
Message msg = new Message("multi-consumer-demo",
"demo",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
//producer.shutdown();
}
}
Running results :
You can see traceId There are repetitions
solve
add finally sentence , Release traceId