功能特性 #
普通消息 #
mqadmin工具创建主题:
mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=NORMAL
先要手动用管理工具创建普通主题topic,再发送消息:
//普通消息发送。
MessageBuilder messageBuilder = new MessageBuilderImpl();
Message message = messageBuilder.setTopic("topic")
//设置消息索引键,可根据关键字精确查找某条消息。
.setKeys("messageKey")
//设置消息Tag,用于消费端根据指定Tag过滤消息。
.setTag("messageTag")
//消息体。
.setBody("messageBody".getBytes())
.build();
try {
//发送消息,需要关注发送结果,并捕获失败等异常。
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
//消费示例:使用SimpleConsumer消费普通消息,主动获取消息进行消费处理并提交消费结果。
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
//消费处理完成后,需要主动调用ACK提交消费结果。
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
e.printStackTrace();
}
});
} catch (ClientException e) {
//如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
e.printStackTrace();
}
建议设置全局唯一业务索引键(消息的Key),方便问题追踪,例如,订单ID,用户ID,请求ID等。
延时消息 #
- 定时消息设置的定时时间是一个预期触发的系统时间戳
- 定时时间的格式为毫秒级的Unix时间戳
- 定时时长最大值默认为24小时,不支持自定义修改
- 定时时长参数精确到毫秒级,默认秒,要毫秒,延时事件设置成毫秒级即可。
创建主题:
mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=DELAY
使用和普通消息一致,先创建延时类型主题,再在消息上再加上延时时间:setDeliveryTimestamp(deliverTimeStamp)
。
应该避免避免大量相同定时时刻的消息,因为系统压力会导致消息分发延迟,影响定时精度。
顺序消息 #
消费者按照发送消息的先后顺序获取消息,从而实现业务场景中的顺序处理。
顺序消息的顺序关系通过消息组(MessageGroup)判定和识别。
应用场景例子:
- 撮合交易,先出价先交易
- 数据实时增量同步
要保证消息顺序,需要满足一定条件:
- 单一生产者
- 串行发送
- 投递顺序,业务方消费消息时需要严格按照接收—处理—应答的语义处理消息,避免因异步处理导致消息乱序
- 有限重试,顺序消息投递仅在重试次数限定范围内
使用:
创建主题:
mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=FIFO -o true
# 创建顺序消息消费组
mqadmin updateSubGroup -c <cluster_name> -g <consumer_group_name> -n <nameserver_address> -o true
- ‵-o`: 创建顺序消息主题
先创建顺序消息类型主题,和发送普通消息一致,发送到顺序类型的主题里面即可。
//设置顺序消息的排序分组,该分组尽量保持离散,避免热点排序分组。
setMessageGroup("fifoGroup001")
同一个MessageGroup的消息,如果前序消息没有消费完成,再次调用Receive是获取不到后续消息的。
事务消息 #
支持在分布式场景下保障消息生产和本地事务的最终一致性。
应用场景:
分布式系统里面,一个业务操作,需要变更多个系统,通过事务消息通知其他系统变更,和本地事务绑定确保结果一致。
事务消息处理流程:
- 生产者将消息发送至Apache RocketMQ服务端。
- 服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。
- 生产者开始执行本地事务逻辑。
- 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback)。
- 在断网或者是生产者应用重启的特殊情况下,若MQ服务端未收到发送者提交的二次确认结果或收到的二次确认结果为Unknown未知状态,服务端将对消息生产者集群中任一生产者实例发起消息回查。
- 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
- 生产者根据检查到的本地事务的最终状态再次提交二次确认。
创建主题:
mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=TRANSACTION
消息发送重试 #
- RocketMQ 在客户端SDK中内置请求重试逻辑
- 生产者在初始化时设置消息发送最大重试次数
广播消费和共享消费 #
同一条消息支持被多个消费者分组订阅,同时对于每个消费者分组可以有多个消费者。
- 消费组间广播消费 :每个消费者分组只初始化唯一一个消费者,每个消费者可消费到消费者分组内所有的消息,各消费者分组都订阅相同的消息,以此实现单客户端级别的广播一对多推送效果。
- 消费组内共享消费 :每个消费者分组下初始化了多个消费者,这些消费者共同分担消费者分组内的所有消息,实现消费者分组内流量的水平拆分和均衡负载。
消费重试 #
消费重试指的是,消费者在消费某条消息失败后,Apache RocketMQ 服务端会根据重试策略重新消费该消息,超过一定次数后若还未消费成功,则该消息将不再继续重试,直接被发送到死信队列中。
消息重试的触发条件
消费失败,包括消费者返回消息失败状态标识或抛出非预期异常。
消息处理超时,包括在PushConsumer中排队超时。
最大重试次数由消费者分组创建时的元数据控制。