服务器之家:专注于服务器技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|JAVA教程|ASP教程|

java rocketmq--消息的产生(普通消息)

2019-06-26 13:06有爱jj JAVA教程

这篇文章主要介绍了java rocketmq--消息的产生(普通消息),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,,需要的朋友可以参考下

前言

与消息发送紧密相关的几行代码:

1. DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");

2. producer.start();

3. Message msg = new Message(...)

4. SendResult sendResult = producer.send(msg);

5. producer.shutdown();

那这几行代码执行时,背后都做了什么?

一. 首先是DefaultMQProducer.start 

@Override
public void start() throws MQClientException {
this.defaultMQProducerImpl.start();
}

调用了默认生成消息的实现类 -- DefaultMQProducerImpl

调用defaultMQProducerImpl.start()方法,DefaultMQProducerImpl.start()会初始化得到MQClientInstance实例对象,MQClientInstance实例对象调用它自己的start方法会 ,启动一些服务,如拉去消息服务PullMessageService.Start()、启动负载平衡服务RebalanceService.Start(),比如网络通信服务MQClientAPIImpl.Start()

另外,还会执行与生产消息相关的信息,如注册produceGroup、new一个TopicPublishInfo对象并以默认TopicKey为键值,构成键值对存入DefaultMQProducerImpl的topicPublishInfoTable中。

efaultMQProducerImpl.start()后,获取的MQClientInstance实例对象会调用sendHeartbeatToAllBroker()方法,不断向broker发送心跳包,yin'b可以使用下面一幅图大致描述DefaultMQProducerImpl.start()过程:

java rocketmq--消息的产生(普通消息)

上图中的三个部分中涉及的内容:

1.1 初始化MQClientInstance

一个客户端只能产生一个MQClientInstance实例对象,产生方式使用了工厂模式与单例模式。MQClientInstance.start()方法启动一些服务,源码如下:

public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}

1.2 注册producer

该过程会将这个当前producer对象注册到MQClientInstance实例对象的的producerTable中。一个jvm(一个客户端)中一个producerGroup只能有一个实例,MQClientInstance操作producerTable大概有如下几个方法:

  • -- selectProducer
  • -- updateTopicRouteInfoFromNameServer
  • -- prepareHeartbeatData
  • -- isNeedUpdateTopicRouteInfo
  • -- shutdown

注:

根据不同的clientId,MQClientManager将给出不同的MQClientInstance;

根据不同的group,MQClientInstance将给出不同的MQProducer和MQConsumer

1.3 向路由信息表中添加路由

topicPublishInfoTable定义:

public class DefaultMQProducerImpl implements MQProducerInner {
private final Logger log = ClientLogger.getLog();
private final Random random = new Random();
private final DefaultMQProducer defaultMQProducer;
private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable = new ConcurrentHashMap<String, TopicPublishInfo>();

它是一个以topic为key的Map型数据结构,DefaultMQProducerImpl.start()时会默认创建一个key=MixAll.DEFAULT_TOPIC的TopicPublishInfo存放到topicPublishInfoTable中。

1.4 发送心跳包

MQClientInstance向broker发送心跳包时,调用sendHeartbeatToAllBroker( ),以及从MQClientInstance实例对象的brokerAddrTable中拿到所有broker地址,向这些broker发送心跳包。

sendHeartbeatToAllBroker会涉及到prepareHeartbeatData()方法,该方法会生成heartbeatData数据,发送心跳包时,heartbeatData作为心跳包的body。与producer相关的部分代码如下:

// Producer
for (Map.Entry<String/* group */, MQProducerInner> entry : this.producerTable.entrySet()) {
MQProducerInner impl = entry.getValue();
if (impl != null) {
ProducerData producerData = new ProducerData();
producerData.setGroupName(entry.getKey());
heartbeatData.getProducerDataSet().add(producerData);
}

二、. SendResult sendResult = producer.send(msg)

首先会调用DefaultMQProducer.send(msg) ,继而调用sendDefaultImpl:

public SendResult send(Message msg,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

sendDefaultImpl做了啥?

2.1. 获取topicPublishInfo

根据msg的topic从topicPublishInfoTable获取对应的topicPublishInfo,如果没有则更新路由信息,从nameserver端拉取最新路由信息。从nameserver端拉取最新路由信息大致为:

首先getTopicRouteInfoFromNameServer,然后topicRouteData2TopicPublishInfo。

java rocketmq--消息的产生(普通消息)

2.2 选择消息发送的队列

普通消息:默认方式下,selectOneMessageQueue从topicPublishInfo中的messageQueueList中选择一个队列(MessageQueue)进行发送消息,默认采用长轮询的方式选择队列 。

它的机制如下:正常情况下,顺序选择queue进行发送;如果某一个节点发生了超时,则下次选择queue时,跳过相同的broker。不同的队列选择策略形成了生产消息的几种模式,如顺序消息,事务消息。

顺序消息:将一组需要有序消费的消息发往同一个broker的同一个队列上即可实现顺序消息,假设相同订单号的支付,退款需要放到同一个队列,那么就可以在send的时候,自己实现MessageQueueSelector,根据参数arg字段来选择queue。

private SendResult sendSelectImpl(
Message msg,
MessageQueueSelector selector,
Object arg,
final CommunicationMode communicationMode,
final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { 。。。}

事务消息:只有在消息发送成功,并且本地操作执行成功时,才发送提交事务消息,做事务提交,消息发送失败,直接发送回滚消息,进行回滚,具体如何实现后面会单独成文分析。

2.3 封装消息体通信包,发送数据包

首先,根据获取的MessageQueue中的getBrokerName,调用findBrokerAddressInPublish得到该消息存放对应的broker地址,如果没有找到则跟新路由信息,重新获取地址 :

brokerAddrTable.get(brokerName).get(MixAll.MASTER_ID)

可知获取的broker均为master(id=0)

然后, 将与该消息相关信息打包成RemotingCommand数据包,其RequestCode.SEND_MESSAGE

根据获取的broke地址,将数据包到对应的broker,默认是发送超时时间为3s。

封装消息请求包的包头:

SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);

发送消息包(普通消息默认为同步方式):

SendResult sendResult = null;
switch (communicationMode) {
   case SYNC:
  sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
  brokerAddr,
  mq.getBrokerName(),
   msg,
  requestHeader,
   timeout,
  communicationMode,
  context,
  this);
break;

处理来自broker端的响应数据包:

private SendResult sendMessageSync(
final String addr,
final String brokerName,
final Message msg,
final long timeoutMillis,
final RemotingCommand request
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
return this.processSendResponse(brokerName, msg, response);
}

broker端处理request数据包后会将消息存储到commitLog,具体过程后续分析。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。

延伸 · 阅读

精彩推荐
  • JAVA教程Retrofit+RxJava实现带进度条的文件下载

    Retrofit+RxJava实现带进度条的文件下载

    这篇文章主要为大家详细介绍了Retrofit+RxJava实现带进度条的文件下载,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...

    yw_5_241202019-06-25
  • JAVA教程idea远程调试spark的步骤讲解

    idea远程调试spark的步骤讲解

    今天小编就为大家分享一篇关于idea远程调试spark的步骤讲解,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧...

    双斜杠少年4472019-06-23
  • JAVA教程Java抛出异常与自定义异常类应用示例

    Java抛出异常与自定义异常类应用示例

    这篇文章主要介绍了Java抛出异常与自定义异常类,结合实例形式分析了Java针对错误与异常处理的try、catch、throw等语句相关使用技巧,需要的朋友可以参考下...

    水中鱼之19993412019-06-24
  • JAVA教程Mapper批量插入Oracle数据@InsertProvider注解

    Mapper批量插入Oracle数据@InsertProvider注解

    今天小编就为大家分享一篇关于Mapper批量插入Oracle数据@InsertProvider注解,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧...

    qq_418751474882019-06-20
  • JAVA教程java中ThreadPoolExecutor常识汇总

    java中ThreadPoolExecutor常识汇总

    这篇文章主要介绍了java中ThreadPoolExecutor常识汇总,线程池技术在并发时经常会使用到,java中的线程池的使用是通过调用ThreadPoolExecutor来实现的,需要的朋友可以参考下...

    有爱jj1932019-06-25
  • JAVA教程Java读取.properties配置文件方法示例

    Java读取.properties配置文件方法示例

    这篇文章主要介绍了Java读取.properties配置文件,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...

    黄浩#4452019-06-20
  • JAVA教程详解Java设计模式——迭代器模式

    详解Java设计模式——迭代器模式

    这篇文章主要介绍了Java设计模式——迭代器模式,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...

    No_Game_No_Life_1192019-06-22
  • JAVA教程Zookeeper连接超时问题与拒绝连接的解决方案

    Zookeeper连接超时问题与拒绝连接的解决方案

    今天小编就为大家分享一篇关于Zookeeper连接超时问题与拒绝连接的解决方案,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧...

    qq_238762134322019-06-20