多条告白如次剧本只需引入一次
本文主要分析RocketMQ中如何保证消息有序。
RocketMQ的版本为:4.2.0 release。
一.时序图
还是老规矩,先把分析过程的时序图摆出来:
1.Producer发送顺序消息
2.Consumer接收顺序消息(一)
3.Consumer接收顺序消息(二)
二.源码分析 – Producer发送顺序消息
1 DefaultMQProducer#send:发送消息,入参中有自定义的消息队列选择器。
// DefaultMQProducer#send
public SendResult send(Message msg, MessageQueueSelector selector, Object arg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQProducerImpl.send(msg, selector, arg); }
1.1 DefaultMQProducerImpl#makeSureStateOK:确保Producer的状态是运行状态-ServiceState.RUNNING。
// DefaultMQProducerImpl#makeSureStateOK
private void makeSureStateOK() throws MQClientException { if (this.serviceState != ServiceState.RUNNING) { throw new MQClientException("The producer service state not OK, "+ this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); } }
1.2 DefaultMQProducerImpl#tryToFindTopicPublishInfo:根据Topic获取发布Topic用到的路由信息。
// DefaultMQProducerImpl#tryToFindTopicPublishInfo
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); if (null == topicPublishInfo || !topicPublishInfo.ok()) { this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);// 为空则从 NameServer更新获取,false,不传入 defaultMQProducer
topicPublishInfo = this.topicPublishInfoTable.get(topic); } if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {// 有了路由信息或者状态OK,则返回
return topicPublishInfo; } else { this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); topicPublishInfo = this.topicPublishInfoTable.get(topic); return topicPublishInfo; } }
1.3 调用自定义消息队列选择器的select方法。
// DefaultMQProducerImpl#sendSelectImpl
MessageQueue mq = null; try { mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg); } catch (Throwable e) { throw new MQClientException("select message queue throwed exception.", e); }
// Producer#main
SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, orderId);
1.4 DefaultMQProducerImpl#sendKernelImpl:发送消息的核心实现方法。
// DefaultMQProducerImpl#sendKernelImpl
...... switch (communicationMode) { case SYNC: long costTimeSync = System.currentTimeMillis() - beginStartTime; if (timeout < costTimeSync) { throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); } sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), msg, requestHeader, timeout - costTimeSync, communicationMode, context, this); break; ......
1.4.1 MQClientAPIImpl#sendMessage:发送消息。
// MQClientAPIImpl#sendMessage
...... switch (communicationMode) {// 根据发送消息的模式(同步/异步)选择不同的方式,默认是同步
case SYNC: long costTimeSync = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTimeSync) { throw new RemotingTooMuchRequestException("sendMessage call timeout"); } return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request); ......
1.4.1.1 MQClientAPIImpl#sendMessageSync:发送同步消息。
// MQClientAPIImpl#sendMessageSync
private SendResult sendMessageSync( final String addr, final String brokerName, final Message msg, final long timeoutMillis, final RemotingCommand request ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); assert response != null; return this.processSendResponse(brokerName, msg, response); }
1.4.1.1.1 NettyRemotingClient#invokeSync:构造RemotingCommand,调用的方式是同步。
// NettyRemotingClient#invokeSync
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime); if (this.rpcHook != null) { this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response); } return response;
三.源码分析 – Consumer接收顺序消息(一)
1 DefaultMQPushConsumer#registerMessageListener:把Consumer传入的消息监听器加入到messageListener中。
// DefaultMQPushConsumer#registerMessageListener
public void registerMessageListener(MessageListenerOrderly messageListener) { this.messageListener = messageListener; this.defaultMQPushConsumerImpl.registerMessageListener(messageListener); }
1.1 DefaultMQPushConsumerImpl#registerMessageListener:把Consumer传入的消息监听器加入到messageListenerInner中。
// DefaultMQPushConsumerImpl#registerMessageListener
public void registerMessageListener(MessageListener messageListener) { this.messageListenerInner = messageListener; }
2 DefaultMQPushConsumer#start:启动Consumer。
// DefaultMQPushConsumer#start
public void start() throws MQClientException { this.defaultMQPushConsumerImpl.start(); }
2.1 DefaultMQPushConsumerImpl#start:启动ConsumerImpl。
// DefaultMQPushConsumerImpl#start
switch (this.serviceState) { case CREATE_JUST:// 刚刚创建
...... if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {// 有序消息服务
this.consumeOrderly = true; this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {// 并发无序消息服务
this.consumeOrderly = false; this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); } ...... this.consumeMessageService.start();// 启动消息服务
...... mQClientFactory.start();// 启动MQClientInstance
......
2.1.1 new ConsumeMessageOrderlyService():构造顺序消息服务。
// ConsumeMessageOrderlyService#ConsumeMessageOrderlyService
public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, MessageListenerOrderly messageListener) { this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl; this.messageListener = messageListener; this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer(); this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup(); this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>(); this.consumeExecutor = new ThreadPoolExecutor(// 主消息消费线程池,正常执行收到的ConsumeRequest。多线程
this.defaultMQPushConsumer.getConsumeThreadMin(), this.defaultMQPushConsumer.getConsumeThreadMax(), 1000 * 60, TimeUnit.MILLISECONDS, this.consumeRequestQueue, new ThreadFactoryImpl("ConsumeMessageThread_")); this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_")); }
2.1.2 ConsumeMessageOrderlyService#start:启动消息队列客户端实例。
// DefaultMQPushConsumerImpl#start
this.consumeMessageService.start();
// ConsumeMessageOrderlyService#start
public void start() { if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { ConsumeMessageOrderlyService.this.lockMQPeriodically();// 定时向broker发送批量锁住当前正在消费的队列集合的消息
} }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS); } }
2.1.2.1 ConsumeMessageOrderlyService#lockMQPeriodically:定时向broker发送批量锁住当前正在消费的队列集合的消息。
2.1.2.1.1 RebalanceImpl#lockAll:锁住所有正在消费的队列。
// ConsumeMessageOrderlyService#lockMQPeriodically
if (!this.stopped) { this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll(); }
// RebalanceImpl#lockAll
HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();// 根据brokerName从processQueueTable获取正在消费的队列集合
...... Set<MessageQueue> lockOKMQSet = this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);// 向Broker发送锁住消息队列的指令
for (MessageQueue mq : lockOKMQSet) { ProcessQueue processQueue = this.processQueueTable.get(mq); if (processQueue != null) { if (!processQueue.isLocked()) { log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq); } processQueue.setLocked(true); processQueue.setLastLockTimestamp(System.currentTimeMillis()); } } ......
2.1.3 MQClientInstance#start:启动MQClientInstance。过程较复杂,放到大标题四中分析。
// DefaultMQPushConsumerImpl#start
mQClientFactory.start();
四.源码分析 – Consumer接收顺序消息(二)
1 MQClientInstance#start:启动客户端实例MQClientInstance。
// MQClientInstance#start
synchronized (this) { switch (this.serviceState) { case CREATE_JUST: ......
// Start pull service 启动拉取消息服务
this.pullMessageService.start();
// Start rebalance service 启动消费端负载均衡服务
this.rebalanceService.start(); ......
1.1 PullMessageService#run:启动拉取消息服务。实际调用的是DefaultMQPushConsumerImpl的pullMessage方法。
// PullMessageService#run
public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { PullRequest pullRequest = this.pullRequestQueue.take(); this.pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } } log.info(this.getServiceName() + " service end"); }
// PullMessageService#pullMessage
private void pullMessage(final PullRequest pullRequest) { final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup()); if (consumer != null) { DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer; impl.pullMessage(pullRequest);// 调用DefaultMQPushConsumerImpl的pullMessage
} else { log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest); } }
1.1.1.1 DefaultMQPushConsumerImpl#pullMessage:拉取消息。提交到ConsumeMessageOrderlyService的线程池consumeExecutor中执行。
// DefaultMQPushConsumerImpl#pullMessage ...... PullCallback pullCallback = new PullCallback() { @Override public void onSuccess(PullResult pullResult) { switch (pullResult.getPullStatus()) { case FOUND: long prevRequestOffset = pullRequest.getNextOffset(); pullRequest.setNextOffset(pullResult.getNextBeginOffset()); long pullRT = System.currentTimeMillis() - beginTimestamp; ...... DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume); ......1.1.1.1.1.1.1