RocketMQ Source Code Analysis: Client Management and Heartbeat Detection (Part 2)

天河聊架构 2019-08-03 23:03:27

Preface

This post continues from the previous part.

Source code analysis

Step into this method, which gets a cached channel or creates one: org.apache.rocketmq.remoting.netty.NettyRemotingClient#getAndCreateChannel

private Channel getAndCreateChannel(final String addr) throws InterruptedException {
    if (null == addr) {
        // Get the channel used to talk to the name server =>
        return getAndCreateNameserverChannel();
    }

    ChannelWrapper cw = this.channelTables.get(addr);
    if (cw != null && cw.isOK()) {
        return cw.getChannel();
    }

    // Create the channel =>
    return this.createChannel(addr);
}

Step into this method, which gets the channel used to talk to the name server: org.apache.rocketmq.remoting.netty.NettyRemotingClient#getAndCreateNameserverChannel

private Channel getAndCreateNameserverChannel() throws InterruptedException {
    String addr = this.namesrvAddrChoosed.get();
    if (addr != null) {
        ChannelWrapper cw = this.channelTables.get(addr);
        if (cw != null && cw.isOK()) {
            return cw.getChannel();
        }
    }

    // Try the address in namesrvAddrChoosed first; if it has no usable channel,
    // take the lock and pick an address from namesrvAddrList in round-robin order
    final List<String> addrList = this.namesrvAddrList.get();
    if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
        try {
            addr = this.namesrvAddrChoosed.get();
            if (addr != null) {
                ChannelWrapper cw = this.channelTables.get(addr);
                if (cw != null && cw.isOK()) {
                    return cw.getChannel();
                }
            }

            if (addrList != null && !addrList.isEmpty()) {
                for (int i = 0; i < addrList.size(); i++) {
                    int index = this.namesrvIndex.incrementAndGet();
                    index = Math.abs(index);
                    index = index % addrList.size();
                    String newAddr = addrList.get(index);

                    this.namesrvAddrChoosed.set(newAddr);
                    log.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);
                    // Create the channel synchronously =>
                    Channel channelNew = this.createChannel(newAddr);
                    if (channelNew != null) {
                        return channelNew;
                    }
                }
            }
        } catch (Exception e) {
            log.error("getAndCreateNameserverChannel: create name server channel exception", e);
        } finally {
            this.lockNamesrvChannel.unlock();
        }
    } else {
        log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
    }

    return null;
}
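The round-robin pick inside the loop can be isolated into a small sketch. The class `RoundRobinPicker` and the sample addresses are made up for illustration; only the `incrementAndGet` / `Math.abs` / `%` sequence comes from the method above. One arithmetic detail is worth knowing: `Math.abs(Integer.MIN_VALUE)` is still negative, so after a counter wrap-around that sequence can produce a negative index once. The sketch shows `Math.floorMod` as one possible alternative, not as what RocketMQ does.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper for illustration; not a RocketMQ class.
class RoundRobinPicker {
    private final AtomicInteger counter;

    RoundRobinPicker(int start) {
        this.counter = new AtomicInteger(start);
    }

    // Same arithmetic as the loop above: increment, abs, modulo.
    int nextIndexAbs(int size) {
        int index = counter.incrementAndGet();
        index = Math.abs(index);
        return index % size;
    }

    // Alternative: floorMod is never negative for a positive size.
    int nextIndexFloorMod(int size) {
        return Math.floorMod(counter.incrementAndGet(), size);
    }

    public static void main(String[] args) {
        // Made-up addresses, used only to show the rotation.
        List<String> addrs = Arrays.asList("ns-a:9876", "ns-b:9876", "ns-c:9876");
        RoundRobinPicker picker = new RoundRobinPicker(0);
        for (int i = 0; i < 4; i++) {
            System.out.println(addrs.get(picker.nextIndexAbs(addrs.size())));
        }
    }
}
```

Because the counter is shared and atomic, concurrent callers each get a distinct slot in the rotation without extra locking.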

Step into this method, which creates the channel synchronously: org.apache.rocketmq.remoting.netty.NettyRemotingClient#createChannel

private Channel createChannel(final String addr) throws InterruptedException {
    ChannelWrapper cw = this.channelTables.get(addr);
    // Normally this branch is not reached; it is kept as a defensive check
    if (cw != null && cw.isOK()) {
        cw.getChannel().close();
        channelTables.remove(addr);
    }

    if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
        try {
            boolean createNewConnection;
            // Check once more under the lock, again for rigor
            cw = this.channelTables.get(addr);
            if (cw != null) {

                if (cw.isOK()) {
                    cw.getChannel().close();
                    this.channelTables.remove(addr);
                    createNewConnection = true;
                } else if (!cw.getChannelFuture().isDone()) {
                    // A connect attempt for this address is still in progress, so do not start another
                    createNewConnection = false;
                } else {
                    this.channelTables.remove(addr);
                    createNewConnection = true;
                }
            } else {
                createNewConnection = true;
            }

            if (createNewConnection) {
                // Establish a new connection
                ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
                log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
                cw = new ChannelWrapper(channelFuture);
                // Put the new channel wrapper into the local cache as well
                this.channelTables.put(addr, cw);
            }
        } catch (Exception e) {
            log.error("createChannel: create channel exception", e);
        } finally {
            this.lockChannelTables.unlock();
        }
    } else {
        log.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
    }

    if (cw != null) {
        ChannelFuture channelFuture = cw.getChannelFuture();
        if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {
            // Check the channel again once the connect attempt has finished
            if (cw.isOK()) {
                log.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
                return cw.getChannel();
            } else {
                log.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString(), channelFuture.cause());
            }
        } else {
            log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(), channelFuture.toString());
        }
    }

    return null;
}
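The overall shape of createChannel (take a lock with a timeout, decide whether a new connect is needed, then wait for the result outside the lock) can be sketched with plain JDK types. Everything here is an assumption made for illustration: `ConnectionCache` is a hypothetical class, a `String` stands in for a Netty `Channel`, and a `CompletableFuture` stands in for `ChannelFuture`. Unlike the method above, this simplified version reuses a healthy entry instead of closing it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

// Hypothetical stand-in for the channel table; not RocketMQ code.
class ConnectionCache {
    private static final long LOCK_TIMEOUT_MILLIS = 3000;
    private final ConcurrentMap<String, CompletableFuture<String>> table = new ConcurrentHashMap<>();
    private final Lock lock = new ReentrantLock();
    int connectCalls = 0; // only modified while holding the lock

    String getOrConnect(String addr,
                        Function<String, CompletableFuture<String>> connector,
                        long connectTimeoutMillis) {
        CompletableFuture<String> future = null;
        try {
            if (lock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    future = table.get(addr);
                    // Reuse a pending or healthy entry; replace one that failed.
                    if (future == null || future.isCompletedExceptionally()) {
                        connectCalls++;
                        future = connector.apply(addr);
                        table.put(addr, future);
                    }
                } finally {
                    lock.unlock();
                }
            }
            if (future == null) {
                return null; // could not get the lock in time
            }
            // Wait outside the lock, bounded by the connect timeout.
            return future.get(connectTimeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException | TimeoutException e) {
            return null; // connect failed or timed out
        }
    }
}
```

Waiting outside the lock matters: a slow connect to one address would otherwise block lookups for every other address.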

Back up to this method, which creates a channel for the given address: org.apache.rocketmq.remoting.netty.NettyRemotingClient#createChannel. It was covered above.


Step into this method, which performs the synchronous request: org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#invokeSyncImpl

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
    final int opaque = request.getOpaque();

    try {
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
        // Cache the in-flight response future, keyed by the request's opaque id
        this.responseTable.put(opaque, responseFuture);
        final SocketAddress addr = channel.remoteAddress();
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                if (f.isSuccess()) {
                    responseFuture.setSendRequestOK(true);
                    return;
                } else {
                    responseFuture.setSendRequestOK(false);
                }

                responseTable.remove(opaque);
                responseFuture.setCause(f.cause());
                // putResponse releases the CountDownLatch that the caller is blocked on =>
                responseFuture.putResponse(null);
                log.warn("send a request command to channel <" + addr + "> failed.");
            }
        });

        // The blocking wait is implemented with a CountDownLatch =>
        RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
        if (null == responseCommand) {
            if (responseFuture.isSendRequestOK()) {
                throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis, responseFuture.getCause());
            } else {
                throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
            }
        }

        return responseCommand;
    } finally {
        this.responseTable.remove(opaque);
    }
}
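The blocking mechanism is easier to see in isolation. The following is a minimal sketch, assuming a `String` reply in place of `RemotingCommand`; `PendingReply` and `SyncCaller` are hypothetical names, and the real `ResponseFuture` carries more state (send status, failure cause, callbacks) than is shown here.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified future; not the RocketMQ ResponseFuture.
class PendingReply {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile String reply;

    void putReply(String value) {
        this.reply = value;
        latch.countDown(); // releases the waiting caller
    }

    String waitReply(long timeoutMillis) {
        try {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return reply; // still null if nothing arrived in time
    }
}

class SyncCaller {
    final ConcurrentMap<Integer, PendingReply> pending = new ConcurrentHashMap<>();

    // Called by the I/O side when a reply with this opaque id arrives.
    void onReply(int opaque, String value) {
        PendingReply p = pending.get(opaque);
        if (p != null) {
            p.putReply(value);
        }
    }

    String call(int opaque, Runnable send, long timeoutMillis) {
        PendingReply p = new PendingReply();
        pending.put(opaque, p); // register before sending so a fast reply is not lost
        try {
            send.run();
            return p.waitReply(timeoutMillis);
        } finally {
            pending.remove(opaque); // always clean up, as the finally block above does
        }
    }
}
```

Registering the pending entry before sending, and removing it in `finally`, mirrors the order of operations in invokeSyncImpl.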

Back up to this method, which updates the order topic configuration: org.apache.rocketmq.broker.topic.TopicConfigManager#updateOrderTopicConfig

public void updateOrderTopicConfig(final KVTable orderKVTableFromNs) {

    if (orderKVTableFromNs != null && orderKVTableFromNs.getTable() != null) {
        boolean isChange = false;
        Set<String> orderTopics = orderKVTableFromNs.getTable().keySet();
        for (String topic : orderTopics) {
            TopicConfig topicConfig = this.topicConfigTable.get(topic);
            if (topicConfig != null && !topicConfig.isOrder()) {
                topicConfig.setOrder(true);
                isChange = true;
                log.info("update order topic config, topic={}, order={}", topic, true);
            }
        }

        for (Map.Entry<String, TopicConfig> entry : this.topicConfigTable.entrySet()) {
            String topic = entry.getKey();
            if (!orderTopics.contains(topic)) {
                TopicConfig topicConfig = entry.getValue();
                if (topicConfig.isOrder()) {
                    topicConfig.setOrder(false);
                    isChange = true;
                    log.info("update order topic config, topic={}, order={}", topic, false);
                }
            }
        }

        if (isChange) {
            // Bump the data version
            this.dataVersion.nextVersion();
            // Persist =>
            this.persist();
        }
    }
}
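The method boils down to reconciling a local flag per topic against an authoritative set, and bumping a version only when something actually changed. A compact sketch of that idea follows; `OrderFlagSync`, its `Boolean` map and its `AtomicLong` version are assumptions for illustration, and the two loops of the original are merged into one pass.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical simplification: topic -> "is order topic" flag.
class OrderFlagSync {
    final Map<String, Boolean> orderFlags = new HashMap<>();
    final AtomicLong version = new AtomicLong();

    // Returns true if any flag changed; the version is bumped at most once per call.
    boolean sync(Set<String> orderTopics) {
        boolean changed = false;
        for (Map.Entry<String, Boolean> e : orderFlags.entrySet()) {
            boolean shouldBeOrdered = orderTopics.contains(e.getKey());
            if (e.getValue() != shouldBeOrdered) {
                e.setValue(shouldBeOrdered);
                changed = true;
            }
        }
        if (changed) {
            version.incrementAndGet();
        }
        return changed;
    }
}
```

As in the original, topics that appear only in the incoming set and not in the local table are ignored rather than created.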

Back up to this method, which registers a consumer: org.apache.rocketmq.broker.client.ConsumerManager#registerConsumer

public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo, ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere, final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {

    // Get the consumer group info, creating it if absent
    ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
    if (null == consumerGroupInfo) {
        ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
        ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
        consumerGroupInfo = prev != null ? prev : tmp;
    }

    // Update the channel info =>
    boolean r1 = consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere);
    // Update the subscription info =>
    boolean r2 = consumerGroupInfo.updateSubscription(subList);

    if (r1 || r2) {
        if (isNotifyConsumerIdsChangedEnable) {
            // Notify every consumer in the group that the channels or subscriptions changed =>
            this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
        }
    }

    this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);

    return r1 || r2;
}
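The first few lines of registerConsumer use the classic get / putIfAbsent idiom to create a per-group record without a lock. Here is that idiom on its own, next to the `computeIfAbsent` form available since Java 8. `GroupRegistry` and `GroupInfo` are hypothetical names used only for this sketch.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical registry for illustration; not RocketMQ code.
class GroupRegistry {
    static class GroupInfo {
        final String name;

        GroupInfo(String name) {
            this.name = name;
        }
    }

    private final ConcurrentMap<String, GroupInfo> table = new ConcurrentHashMap<>();

    // Same idiom as above: if two threads race, both end up with the instance that won.
    GroupInfo getOrCreate(String group) {
        GroupInfo info = table.get(group);
        if (info == null) {
            GroupInfo tmp = new GroupInfo(group);
            GroupInfo prev = table.putIfAbsent(group, tmp);
            info = prev != null ? prev : tmp;
        }
        return info;
    }

    // Equivalent result with less code.
    GroupInfo getOrCreateCompact(String group) {
        return table.computeIfAbsent(group, GroupInfo::new);
    }
}
```

The putIfAbsent form may construct a throwaway object when two threads race, which is harmless as long as the constructor has no side effects.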

Step into this method, which updates the channel info: org.apache.rocketmq.broker.client.ConsumerGroupInfo#updateChannel

public boolean updateChannel(final ClientChannelInfo infoNew, ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere) {
    boolean updated = false;
    this.consumeType = consumeType;
    this.messageModel = messageModel;
    this.consumeFromWhere = consumeFromWhere;

    // Get the existing channel info
    ClientChannelInfo infoOld = this.channelInfoTable.get(infoNew.getChannel());
    if (null == infoOld) {
        ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getChannel(), infoNew);
        if (null == prev) {
            log.info("new consumer connected, group: {} {} {} channel: {}", this.groupName, consumeType, messageModel, infoNew.toString());
            updated = true;
        }

        infoOld = infoNew;
    } else {
        if (!infoOld.getClientId().equals(infoNew.getClientId())) {
            log.error("[BUG] consumer channel exist in broker, but clientId not equal. GROUP: {} OLD: {} NEW: {} ", this.groupName, infoOld.toString(), infoNew.toString());
            this.channelInfoTable.put(infoNew.getChannel(), infoNew);
        }
    }

    this.lastUpdateTimestamp = System.currentTimeMillis();
    infoOld.setLastUpdateTimestamp(this.lastUpdateTimestamp);

    return updated;
}

Step into this method, which updates the subscription info: org.apache.rocketmq.broker.client.ConsumerGroupInfo#updateSubscription

public boolean updateSubscription(final Set<SubscriptionData> subList) {
    boolean updated = false;

    for (SubscriptionData sub : subList) {
        // Get the previous subscription for this topic
        SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
        if (old == null) {
            SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
            if (null == prev) {
                updated = true;
                log.info("subscription changed, add new topic, group: {} {}", this.groupName, sub.toString());
            }
        } else if (sub.getSubVersion() > old.getSubVersion()) {
            if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {
                log.info("subscription changed, group: {} OLD: {} NEW: {}", this.groupName, old.toString(), sub.toString());
            }

            this.subscriptionTable.put(sub.getTopic(), sub);
        }
    }

    Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, SubscriptionData> next = it.next();
        String oldTopic = next.getKey();

        boolean exist = false;
        for (SubscriptionData sub : subList) {
            if (sub.getTopic().equals(oldTopic)) {
                exist = true;
                break;
            }
        }

        if (!exist) {
            log.warn("subscription changed, group: {} remove topic {} {}", this.groupName, oldTopic, next.getValue().toString());

            // Remove a previously subscribed topic that is no longer in the new list
            it.remove();
            updated = true;
        }
    }

    this.lastUpdateTimestamp = System.currentTimeMillis();

    return updated;
}
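The two phases of updateSubscription (add or upgrade entries by version, then drop entries missing from the new list) can be shown on a plain map. This sketch assumes a `topic -> version` map as a stand-in for `SubscriptionData`; `SubscriptionSync` is a hypothetical name. Note that, as in the method above, a version upgrade replaces the entry but does not by itself set the `updated` result.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical simplification: topic -> subscription version.
class SubscriptionSync {
    final Map<String, Long> versions = new ConcurrentHashMap<>();

    boolean update(Map<String, Long> incoming) {
        boolean updated = false;

        // Phase 1: add new topics, replace entries that carry a newer version.
        for (Map.Entry<String, Long> e : incoming.entrySet()) {
            Long old = versions.get(e.getKey());
            if (old == null) {
                versions.put(e.getKey(), e.getValue());
                updated = true;
            } else if (e.getValue() > old) {
                versions.put(e.getKey(), e.getValue()); // replaced, but not counted as a change
            }
        }

        // Phase 2: remove topics that are no longer subscribed.
        Iterator<Map.Entry<String, Long>> it = versions.entrySet().iterator();
        while (it.hasNext()) {
            if (!incoming.containsKey(it.next().getKey())) {
                it.remove();
                updated = true;
            }
        }
        return updated;
    }
}
```

Removing through the iterator, rather than through the map, is what keeps the second loop safe while it is still walking the entry set.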

Back up to this method, which notifies every consumer in the group that the channels or subscriptions changed: org.apache.rocketmq.broker.client.DefaultConsumerIdsChangeListener#handle

@Override
public void handle(ConsumerGroupEvent event, String group, Object... args) {
    if (event == null) {
        return;
    }
    switch (event) {
        case CHANGE:
            if (args == null || args.length < 1) {
                return;
            }
            List<Channel> channels = (List<Channel>) args[0];
            if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) {
                for (Channel chl : channels) {
                    // The broker notifies each client by channel and consumer group =>
                    this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group);
                }
            }
            break;
        case UNREGISTER:
            // Unregister by group =>
            this.brokerController.getConsumerFilterManager().unRegister(group);
            break;
        case REGISTER:
            if (args == null || args.length < 1) {
                return;
            }
            Collection<SubscriptionData> subscriptionDataList = (Collection<SubscriptionData>) args[0];
            // Register the subscription info by group =>
            this.brokerController.getConsumerFilterManager().register(group, subscriptionDataList);
            break;
        default:
            throw new RuntimeException("Unknown event " + event);
    }
}

Step into this method, in which the broker notifies a client by channel and consumer group: org.apache.rocketmq.broker.client.net.Broker2Client#notifyConsumerIdsChanged

public void notifyConsumerIdsChanged(final Channel channel, final String consumerGroup) {
    if (null == consumerGroup) {
        log.error("notifyConsumerIdsChanged consumerGroup is null");
        return;
    }

    NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader();
    requestHeader.setConsumerGroup(consumerGroup);
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);

    try {
        // One-way notification =>
        this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
    } catch (Exception e) {
        log.error("notifyConsumerIdsChanged exception, " + consumerGroup, e.getMessage());
    }
}

Step into this method, which sends the one-way notification: org.apache.rocketmq.remoting.netty.NettyRemotingServer#invokeOneway. It was covered earlier.


Step into this method, which unregisters by group: org.apache.rocketmq.broker.filter.ConsumerFilterManager#unRegister

public void unRegister(final String consumerGroup) {
    for (String topic : filterDataByTopic.keySet()) {
        // Unregister =>
        this.filterDataByTopic.get(topic).unRegister(consumerGroup);
    }
}

Step into this method, which performs the unregistration: org.apache.rocketmq.broker.filter.ConsumerFilterManager.FilterDataMapByTopic#unRegister

public void unRegister(String consumerGroup) {
    if (!this.groupFilterData.containsKey(consumerGroup)) {
        return;
    }

    // Get the consumer filter data
    ConsumerFilterData data = this.groupFilterData.get(consumerGroup);

    if (data == null || data.isDead()) {
        return;
    }

    long now = System.currentTimeMillis();

    log.info("Unregister consumer filter: {}, deadTime: {}", data, now);

    data.setDeadTime(now);
}
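Note that unRegister does not delete anything: it stamps the entry with a dead time, which lets a later registration revive it. A minimal sketch of this soft-delete pattern follows. `SoftDeleteTable` is a hypothetical class, and the convention that a dead time of 0 means "alive" is an assumption of this sketch, not necessarily how `ConsumerFilterData#isDead` is defined.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical soft-delete table for illustration; not RocketMQ code.
class SoftDeleteTable {
    static class Entry {
        volatile long deadTime; // assumption of this sketch: 0 means alive

        boolean isDead() {
            return deadTime > 0;
        }
    }

    final Map<String, Entry> byGroup = new ConcurrentHashMap<>();

    // Creates the entry if needed and revives it if it was marked dead.
    void register(String group) {
        Entry e = byGroup.computeIfAbsent(group, g -> new Entry());
        e.deadTime = 0;
    }

    // Marks the entry dead; returns false if it is missing or already dead.
    boolean unregister(String group, long now) {
        Entry e = byGroup.get(group);
        if (e == null || e.isDead()) {
            return false;
        }
        e.deadTime = now;
        return true;
    }
}
```

Keeping the entry around avoids rebuilding it when a consumer group reconnects shortly after going away.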

Back up to this method, which registers subscription info by group: org.apache.rocketmq.broker.filter.ConsumerFilterManager#register(java.lang.String, java.util.Collection<org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData>)

public void register(final String consumerGroup, final Collection<SubscriptionData> subList) {
    for (SubscriptionData subscriptionData : subList) {
        // Register the subscription info =>
        register(
            subscriptionData.getTopic(),
            consumerGroup,
            subscriptionData.getSubString(),
            subscriptionData.getExpressionType(),
            subscriptionData.getSubVersion()
        );
    }

    // make illegal topic dead. Get the consumer filter data by group =>
    Collection<ConsumerFilterData> groupFilterData = getByGroup(consumerGroup);

    Iterator<ConsumerFilterData> iterator = groupFilterData.iterator();
    while (iterator.hasNext()) {
        ConsumerFilterData filterData = iterator.next();

        boolean exist = false;
        for (SubscriptionData subscriptionData : subList) {
            if (subscriptionData.getTopic().equals(filterData.getTopic())) {
                exist = true;
                break;
            }
        }

        if (!exist && !filterData.isDead()) {
            // The topic is absent from the new subscription list, so mark the previous filter data dead
            filterData.setDeadTime(System.currentTimeMillis());
            log.info("Consumer filter changed: {}, make illegal topic dead:{}", consumerGroup, filterData);
        }
    }
}

Step into this method, which registers the subscription info for one topic: org.apache.rocketmq.broker.filter.ConsumerFilterManager#register(java.lang.String, java.lang.String, java.lang.String, java.lang.String, long)

public boolean register(final String topic, final String consumerGroup, final String expression, final String type, final long clientVersion) {
    if (ExpressionType.isTagType(type)) {
        return false;
    }

    if (expression == null || expression.length() == 0) {
        return false;
    }

    // Get the filter data map for this topic, creating it if absent
    FilterDataMapByTopic filterDataMapByTopic = this.filterDataByTopic.get(topic);

    if (filterDataMapByTopic == null) {
        FilterDataMapByTopic temp = new FilterDataMapByTopic(topic);
        FilterDataMapByTopic prev = this.filterDataByTopic.putIfAbsent(topic, temp);
        filterDataMapByTopic = prev != null ? prev : temp;
    }

    BloomFilterData bloomFilterData = bloomFilter.generate(consumerGroup + "#" + topic);

    // Register the filter data =>
    return filterDataMapByTopic.register(consumerGroup, expression, type, bloomFilterData, clientVersion);
}

Step into this method, which registers the filter data: org.apache.rocketmq.broker.filter.ConsumerFilterManager.FilterDataMapByTopic#register

public boolean register(String consumerGroup, String expression, String type, BloomFilterData bloomFilterData, long clientVersion) {
    // Get the previous consumer filter data
    ConsumerFilterData old = this.groupFilterData.get(consumerGroup);

    if (old == null) {
        // Build the consumer filter data =>
        ConsumerFilterData consumerFilterData = build(topic, consumerGroup, expression, type, clientVersion);
        if (consumerFilterData == null) {
            return false;
        }
        consumerFilterData.setBloomFilterData(bloomFilterData);

        old = this.groupFilterData.putIfAbsent(consumerGroup, consumerFilterData);
        if (old == null) {
            log.info("New consumer filter registered: {}", consumerFilterData);
            return true;
        } else {
            // The incoming client version is less than or equal to the stored one
            if (clientVersion <= old.getClientVersion()) {
                if (!type.equals(old.getExpressionType()) || !expression.equals(old.getExpression())) {
                    log.warn("Ignore consumer({} : {}) filter(concurrent), because of version {} <= {}, but maybe info changed!old={}:{}, ignored={}:{}", consumerGroup, topic, clientVersion, old.getClientVersion(), old.getExpressionType(), old.getExpression(), type, expression);
                }
                // Same version and the stored entry is dead
                if (clientVersion == old.getClientVersion() && old.isDead()) {
                    // Revive the previously unregistered entry
                    reAlive(old);
                    return true;
                }

                return false;
            } else {
                this.groupFilterData.put(consumerGroup, consumerFilterData);
                log.info("New consumer filter registered(concurrent): {}, old: {}", consumerFilterData, old);
                return true;
            }
        }
    } else {
        if (clientVersion <= old.getClientVersion()) {
            if (!type.equals(old.getExpressionType()) || !expression.equals(old.getExpression())) {
                log.info("Ignore consumer({}:{}) filter, because of version {} <= {}, but maybe info changed!old={}:{}, ignored={}:{}", consumerGroup, topic, clientVersion, old.getClientVersion(), old.getExpressionType(), old.getExpression(), type, expression);
            }
            if (clientVersion == old.getClientVersion() && old.isDead()) {
                reAlive(old);
                return true;
            }

            return false;
        }

        boolean change = !old.getExpression().equals(expression) || !old.getExpressionType().equals(type);
        if (old.getBloomFilterData() == null && bloomFilterData != null) {
            change = true;
        }
        if (old.getBloomFilterData() != null && !old.getBloomFilterData().equals(bloomFilterData)) {
            change = true;
        }

        // if subscribe data is changed, or consumer is died too long.
        if (change) {
            // Build the consumer filter data
            ConsumerFilterData consumerFilterData = build(topic, consumerGroup, expression, type, clientVersion);
            if (consumerFilterData == null) {
                // new expression compile error, remove old, let client report error.
                this.groupFilterData.remove(consumerGroup);
                return false;
            }
            consumerFilterData.setBloomFilterData(bloomFilterData);

            this.groupFilterData.put(consumerGroup, consumerFilterData);

            log.info("Consumer filter info change, old: {}, new: {}, change: {}", old, consumerFilterData, change);

            return true;
        } else {
            old.setClientVersion(clientVersion);
            if (old.isDead()) {
                reAlive(old);
            }
            return true;
        }
    }
}

Back up to this method, which registers a producer: org.apache.rocketmq.broker.client.ProducerManager#registerProducer

public void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) {
    try {
        ClientChannelInfo clientChannelInfoFound = null;

        if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                // Get the channel table of the producer group
                HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
                if (null == channelTable) {
                    channelTable = new HashMap<>();
                    this.groupChannelTable.put(group, channelTable);
                }

                clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel());
                if (null == clientChannelInfoFound) {
                    channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);
                    log.info("new producer connected, group: {} channel: {}", group, clientChannelInfo.toString());
                }
            } finally {
                this.groupChannelLock.unlock();
            }

            if (clientChannelInfoFound != null) {
                clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis());
            }
        } else {
            log.warn("ProducerManager registerProducer lock timeout");
        }
    } catch (InterruptedException e) {
        log.error("", e);
    }
}
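In effect registerProducer is a heartbeat table: the first heartbeat from a channel inserts it, later ones only refresh a timestamp. A minimal sketch of that behavior follows, using a `String` channel id instead of a Netty `Channel`. `ProducerRegistry`, its method names and the 3000 ms lock timeout are all assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical heartbeat table for illustration; not RocketMQ code.
class ProducerRegistry {
    private static final long LOCK_TIMEOUT_MILLIS = 3000; // assumed value
    private final Lock lock = new ReentrantLock();
    // group -> (channel id -> last heartbeat timestamp), guarded by lock
    private final Map<String, Map<String, Long>> groups = new HashMap<>();

    // Returns true if the channel is new; false on a refresh or a lock timeout.
    boolean heartbeat(String group, String channelId, long now) {
        boolean locked = false;
        try {
            locked = lock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
            if (!locked) {
                return false;
            }
            Map<String, Long> channels = groups.computeIfAbsent(group, g -> new HashMap<>());
            return channels.put(channelId, now) == null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            if (locked) {
                lock.unlock();
            }
        }
    }

    // Returns the last heartbeat timestamp, or null if the channel is unknown.
    Long lastSeen(String group, String channelId) {
        lock.lock();
        try {
            Map<String, Long> channels = groups.get(group);
            return channels == null ? null : channels.get(channelId);
        } finally {
            lock.unlock();
        }
    }
}
```

A separate sweep can then compare each stored timestamp with the current time to evict channels that stopped sending heartbeats.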

Back up to this method, org.apache.rocketmq.broker.processor.ClientManageProcessor#heartBeat, which ends here.



Closing remarks

This analysis reflects only my personal views and is for reference only.




Copyright notice
This article was written by [天河聊架构]. Please include a link to the original when reposting. Thank you.
https://mp.weixin.qq.com/s/GbwSWmRpgAUPHhUkZk-Tyw