3.功能特性

功能特性 #

普通消息 #

mqadmin工具创建主题:

mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=NORMAL

先要手动用管理工具创建普通主题topic,再发送消息:

//普通消息发送。
MessageBuilder messageBuilder = new MessageBuilderImpl();
Message message = messageBuilder.setTopic("topic")
    //设置消息索引键,可根据关键字精确查找某条消息。
    .setKeys("messageKey")
    //设置消息Tag,用于消费端根据指定Tag过滤消息。
    .setTag("messageTag")
    //消息体。
    .setBody("messageBody".getBytes())
    .build();
try {
    //发送消息,需要关注发送结果,并捕获失败等异常。
    SendReceipt sendReceipt = producer.send(message);
    System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
    e.printStackTrace();
}

//消费示例:使用SimpleConsumer消费普通消息,主动获取消息进行消费处理并提交消费结果。
List<MessageView> messageViewList = null;
try {
    messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
    messageViewList.forEach(messageView -> {
        System.out.println(messageView);
        //消费处理完成后,需要主动调用ACK提交消费结果。
        try {
            simpleConsumer.ack(messageView);
        } catch (ClientException e) {
            e.printStackTrace();
        }
    });
} catch (ClientException e) {
    //如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
    e.printStackTrace();
}

建议设置全局唯一业务索引键(消息的Key),方便问题追踪,例如,订单ID,用户ID,请求ID等。

延时消息 #

  • 定时消息设置的定时时间是一个预期触发的系统时间戳
  • 定时时间的格式为毫秒级的Unix时间戳
  • 定时时长最大值默认为24小时,不支持自定义修改
  • 定时时长参数精确到毫秒级,默认秒,要毫秒,延时事件设置成毫秒级即可。

创建主题:

mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=DELAY

使用和普通消息一致,先创建延时类型主题,再在消息上再加上延时时间:setDeliveryTimestamp(deliverTimeStamp)

应该避免避免大量相同定时时刻的消息,因为系统压力会导致消息分发延迟,影响定时精度。

顺序消息 #

消费者按照发送消息的先后顺序获取消息,从而实现业务场景中的顺序处理。

顺序消息的顺序关系通过消息组(MessageGroup)判定和识别。

应用场景例子:

  • 撮合交易,先出价先交易
  • 数据实时增量同步

要保证消息顺序,需要满足一定条件:

  • 单一生产者
  • 串行发送
  • 投递顺序,业务方消费消息时需要严格按照接收—处理—应答的语义处理消息,避免因异步处理导致消息乱序
  • 有限重试,顺序消息投递仅在重试次数限定范围内

使用:

创建主题:

mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=FIFO -o true

# 创建顺序消息消费组
mqadmin updateSubGroup -c <cluster_name> -g <consumer_group_name> -n <nameserver_address> -o true
  • ‵-o`: 创建顺序消息主题

先创建顺序消息类型主题,和发送普通消息一致,发送到顺序类型的主题里面即可。

//设置顺序消息的排序分组,该分组尽量保持离散,避免热点排序分组。
setMessageGroup("fifoGroup001")

同一个MessageGroup的消息,如果前序消息没有消费完成,再次调用Receive是获取不到后续消息的。

事务消息 #

支持在分布式场景下保障消息生产和本地事务的最终一致性。

应用场景:

分布式系统里面,一个业务操作,需要变更多个系统,通过事务消息通知其他系统变更,和本地事务绑定确保结果一致。

事务消息处理流程:

  1. 生产者将消息发送至Apache RocketMQ服务端。
  2. 服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。
  3. 生产者开始执行本地事务逻辑。
  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback)。
  5. 在断网或者是生产者应用重启的特殊情况下,若MQ服务端未收到发送者提交的二次确认结果或收到的二次确认结果为Unknown未知状态,服务端将对消息生产者集群中任一生产者实例发起消息回查。
  6. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  7. 生产者根据检查到的本地事务的最终状态再次提交二次确认。

创建主题:

mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=TRANSACTION

消息发送重试 #

  • RocketMQ 在客户端SDK中内置请求重试逻辑
  • 生产者在初始化时设置消息发送最大重试次数

广播消费和共享消费 #

同一条消息支持被多个消费者分组订阅,同时对于每个消费者分组可以有多个消费者。

  • 消费组间广播消费 :每个消费者分组只初始化唯一一个消费者,每个消费者可消费到消费者分组内所有的消息,各消费者分组都订阅相同的消息,以此实现单客户端级别的广播一对多推送效果。
  • 消费组内共享消费 :每个消费者分组下初始化了多个消费者,这些消费者共同分担消费者分组内的所有消息,实现消费者分组内流量的水平拆分和均衡负载。

消费重试 #

消费重试指的是,消费者在消费某条消息失败后,Apache RocketMQ 服务端会根据重试策略重新消费该消息,超过一定次数后若还未消费成功,则该消息将不再继续重试,直接被发送到死信队列中。

消息重试的触发条件

  • 消费失败,包括消费者返回消息失败状态标识或抛出非预期异常。

  • 消息处理超时,包括在PushConsumer中排队超时。

最大重试次数由消费者分组创建时的元数据控制。