在RocketMQ中,使用Producer相关类来生产消息,第一次使用的时,会调用producer.start()方法来进行初始化。这里我们来探索一下Producer的start做了些什么。
时序图如下:
producer.start()其实是调用DefaultMQProducerImpl.start(),重点看看里面2句代码:
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
说明:把DefaultMQProducer对象添加到MQClientInstance的producerTable属性中:
public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) { if (null == group || null == producer) { return false; } MQProducerInner prev = this.producerTable.putIfAbsent(group, producer); if (prev != null) { log.warn("the producer group[{}] exist already.", group); return false; } return true;}
producerTable对象里面存储producerGroupName和DefaultMQProducer的映射。key-value:<producerGroupName,DefaultMQProducer>
if (startFactory) { mQClientFactory.start();}
mQClientFactory.start()调用MQClientInstance(负责启动通信服务和定时任务)的start:
public void start() throws MQClientException { synchronized (this) { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; // If not specified,looking address from name server if (null == this.clientConfig.getNamesrvAddr()) { this.clientConfig.setNamesrvAddr(this.mQClientAPIImpl.fetchNameServerAddr()); } // Start request-response channel 启动MQClientAPIImpl this.mQClientAPIImpl.start(); // Start various schedule tasks this.startScheduledTask(); // Start pull service this.pullMessageService.start(); // Start rebalance service this.rebalanceService.start(); // Start push service 用于将消费失败的消息发回broker this.defaultMQProducer.getDefaultMQProducerImpl().start(false); log.info("the client factory [{}] start OK", this.clientId); this.serviceState = ServiceState.RUNNING; break; case RUNNING: break; case SHUTDOWN_ALREADY: break; case START_FAILED: throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null); default: break; } }}
在Producer启动的时候,serviceState值为CREATE_JUST。因为在以上的方法中就是调用各种的start方法:
-
this.mQClientAPIImpl.start();
说明:调用:MQClientAPIImpl.start()--->NettyRemotingClient.start():MQClientAPIImpl(客户端与远程交互的封装,其内部使用了RemotingClient来实现与远程的交互),NettyRemotingClient.start()的方法里面启动了netty的通信客户端(这里就不对netty做介绍了,因为这个是个大工程)。
-
this.startScheduledTask();
说明:启动各种的定时任务:
-
/** * 启动各种定时任务 */private void startScheduledTask() { //每两分钟执行一次寻址服务(NameServer地址) if (null == this.clientConfig.getNamesrvAddr()) { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr(); } catch (Exception e) { log.error("ScheduledTask fetchNameServerAddr exception", e); } } }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS); } //每30秒更新一次所有的topic的路由信息(topicRouteTable) this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.updateTopicRouteInfoFromNameServer(); } catch (Exception e) { log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e); } } }, 10, this.clientConfig.getPollNameServerInteval(), TimeUnit.MILLISECONDS); //每30秒移除离线的broker //每30秒发送一次心跳给所有的master broker this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.cleanOfflineBroker(); MQClientInstance.this.sendHeartbeatToAllBrokerWithLock(); } catch (Exception e) { log.error("ScheduledTask sendHeartbeatToAllBroker exception", e); } } }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS); //更新offset每5秒提交一次消费的offset,broker端为ConsumerOffsetManager负责记录,此offset是逻辑偏移量,比如说,consumerA@consumerAGroup 在broker_a的queue 0的消费队列共有10000条消息,目前消费到888,那么offset就是888. //因为producer和consumer内部都持有MQClientInstance实例,故MQClientInstance既有生产者逻辑,又有消费者逻辑。 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.persistAllConsumerOffset(); } catch (Exception e) { log.error("ScheduledTask persistAllConsumerOffset exception", e); } } }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS); //每1分钟调整一次线程池,这也是针对消费者来说的,具体为如果消息堆积超过10W条,则调大线程池,最多64个线程;如果消息堆积少于8W条,则调小线程池,最少20的线程。 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.adjustThreadPool(); } catch (Exception e) { log.error("ScheduledTask adjustThreadPool exception", e); } } }, 1, 1, TimeUnit.MINUTES);}
查看方法上面的注释。
-
this.pullMessageService.start();
说明:consumer的拉取消息线程实现方式:PullMessageService继承ServiceThread(对拉取消息请求进行了封装,使其队列化),start拉取消息线程启动,在run方法里面实现了:不断的从pullRequestQueue中取出请求,并调用消息拉取。
-
this.rebalanceService.start();
说明:再平衡线程启动
-
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
说明:暂时没有搞明白
总结:在Producer启动的时候,做了几件事情:
- 把自己保持到producerTable属性中<groupName,Producer>
- 启动各种定时任务(对MQClientInstance说明一下,因为生产和消费者都会持有MQClientInstance所以在启动任务的时候会启动生产和消费相关的任务线程)
启动执行寻址服务的任务(NameServer地址)。
启动更新所有的topic的路由信息(topicRouteTable)的任务。
启动移除离线的broker和发送心跳给所有的master broker的任务。
启动提交消费的offset(逻辑偏移量)到broker(broker端为ConsumerOffsetManager负责记录) 的任务。
启动调整线程池的任务(针对消费)。