博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RocketMQ探索-Producer的start
阅读量:6885 次
发布时间:2019-06-27

本文共 6090 字,大约阅读时间需要 20 分钟。

hot3.png

在RocketMQ中,使用Producer相关类来生产消息,第一次使用的时,会调用producer.start()方法来进行初始化。这里我们来探索一下Producer的start做了些什么。

时序图如下:

165756_9cl1_3134950.png

producer.start()其实是调用DefaultMQProducerImpl.start(),重点看看里面2句代码:

boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);

说明:把DefaultMQProducer对象添加到MQClientInstance的producerTable属性中:

public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {    if (null == group || null == producer) {        return false;    }    MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);    if (prev != null) {        log.warn("the producer group[{}] exist already.", group);        return false;    }    return true;}

producerTable对象里面存储producerGroupName和DefaultMQProducer的映射。key-value:<producerGroupName,DefaultMQProducer>

if (startFactory) {    mQClientFactory.start();}

mQClientFactory.start()调用MQClientInstance(负责启动通信服务和定时任务)的start:

public void start() throws MQClientException {    synchronized (this) {        switch (this.serviceState) {            case CREATE_JUST:                this.serviceState = ServiceState.START_FAILED;                // If not specified,looking address from name server                if (null == this.clientConfig.getNamesrvAddr()) {                    this.clientConfig.setNamesrvAddr(this.mQClientAPIImpl.fetchNameServerAddr());                }                // Start request-response channel 启动MQClientAPIImpl                this.mQClientAPIImpl.start();                // Start various schedule tasks                this.startScheduledTask();                // Start pull service                this.pullMessageService.start();                // Start rebalance service                this.rebalanceService.start();                // Start push service 用于将消费失败的消息发回broker                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);                log.info("the client factory [{}] start OK", this.clientId);                this.serviceState = ServiceState.RUNNING;                break;            case RUNNING:                break;            case SHUTDOWN_ALREADY:                break;            case START_FAILED:                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);            default:                break;        }    }}

在Producer启动的时候,serviceState值为CREATE_JUST。因为在以上的方法中就是调用各种的start方法:

  • this.mQClientAPIImpl.start();

        说明:调用:MQClientAPIImpl.start()--->NettyRemotingClient.start():MQClientAPIImpl(客户端与远程交互的封装,其内部使用了RemotingClient来实现与远程的交互),NettyRemotingClient.start()的方法里面启动了netty的通信客户端(这里就不对netty做介绍了,因为这个是个大工程)。

  • this.startScheduledTask();

         说明:启动各种的定时任务:

  • /** * 启动各种定时任务 */private void startScheduledTask() {    //每两分钟执行一次寻址服务(NameServer地址)    if (null == this.clientConfig.getNamesrvAddr()) {        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {            @Override            public void run() {                try {                    MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();                } catch (Exception e) {                    log.error("ScheduledTask fetchNameServerAddr exception", e);                }            }        }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);    }    //每30秒更新一次所有的topic的路由信息(topicRouteTable)    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {        @Override        public void run() {            try {                MQClientInstance.this.updateTopicRouteInfoFromNameServer();            } catch (Exception e) {                log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);            }        }    }, 10, this.clientConfig.getPollNameServerInteval(), TimeUnit.MILLISECONDS);    //每30秒移除离线的broker    //每30秒发送一次心跳给所有的master broker    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {        @Override        public void run() {            try {                MQClientInstance.this.cleanOfflineBroker();                MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();            } catch (Exception e) {                log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);            }        }    }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);    //更新offset每5秒提交一次消费的offset,broker端为ConsumerOffsetManager负责记录,此offset是逻辑偏移量,比如说,consumerA@consumerAGroup 在broker_a的queue 0的消费队列共有10000条消息,目前消费到888,那么offset就是888.    //因为producer和consumer内部都持有MQClientInstance实例,故MQClientInstance既有生产者逻辑,又有消费者逻辑。    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {        @Override        public void run() {            try {                MQClientInstance.this.persistAllConsumerOffset();            } catch (Exception e) {                log.error("ScheduledTask persistAllConsumerOffset exception", e);            }        }    }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);    //每1分钟调整一次线程池,这也是针对消费者来说的,具体为如果消息堆积超过10W条,则调大线程池,最多64个线程;如果消息堆积少于8W条,则调小线程池,最少20的线程。    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {        @Override        public void run() {            try {                MQClientInstance.this.adjustThreadPool();            } catch (Exception e) {                log.error("ScheduledTask adjustThreadPool exception", e);            }        }    }, 1, 1, TimeUnit.MINUTES);}

         查看方法上面的注释。

  • this.pullMessageService.start();

         说明:consumer的拉取消息线程实现方式:PullMessageService继承ServiceThread(对拉取消息请求进行了封装,使其队列化),start拉取消息线程启动,在run方法里面实现了:不断的从pullRequestQueue中取出请求,并调用消息拉取。

  • this.rebalanceService.start();

         说明:再平衡线程启动

  • this.defaultMQProducer.getDefaultMQProducerImpl().start(false);

         说明:暂时没有搞明白

总结:在Producer启动的时候,做了几件事情:

  1. 把自己保持到producerTable属性中<groupName,Producer>
  2. 启动各种定时任务(对MQClientInstance说明一下,因为生产和消费者都会持有MQClientInstance所以在启动任务的时候会启动生产和消费相关的任务线程)

         启动执行寻址服务的任务(NameServer地址)。

         启动更新所有的topic的路由信息(topicRouteTable)的任务。

         启动移除离线的broker和发送心跳给所有的master broker的任务。

         启动提交消费的offset(逻辑偏移量)到broker(broker端为ConsumerOffsetManager负责记录)            的任务。

         启动调整线程池的任务(针对消费)。

转载于:https://my.oschina.net/u/3134950/blog/1456821

你可能感兴趣的文章
“独角兽”企业都爱选择腾讯云,背后原因值得考究
查看>>
浅析 Vue 2.6 中的 nextTick 方法
查看>>
199. Binary Tree Right Side View
查看>>
配置SpringBoot方便的切换jar和war
查看>>
2018最佳GAN论文回顾(下)
查看>>
Vue使用element-ui所遇BUG与需求集结(二)
查看>>
使用Redis管道提升性能
查看>>
包含Min函数的stack
查看>>
Java 8 常用时间 api
查看>>
联发科技智能家居事业群接管电视业务,下半年发8K电视芯片 ...
查看>>
用AI赋能客服,灵声科技获数千万元A轮融资
查看>>
3月14日云栖精选夜读 | 阿里云成为开源组织CDF创始成员,积极推动软件生态构建 ...
查看>>
弹性公网EIP,让网络更自由、灵活
查看>>
一对一直播源码都实现了哪几种常见的优化技术? ...
查看>>
Unity学习系列一简介
查看>>
利用Python框架pyxxnet_project实现的网络服务
查看>>
一个最简单的WebSocket hello world demo
查看>>
Midway 外部版启动过程分析
查看>>
JDK自带的java.util.Timer定时器的实现原理
查看>>
[Web开发] 检测IE版本号的方法总结
查看>>