RocketMQ技术内幕
字数 3273 2025-08-11 21:26:29

RocketMQ 技术内幕详解

一、RocketMQ 核心架构

1.1 基本模型

RocketMQ 是一个简单的 pub/sub 模型的消息队列系统,采用分区设计支持高并发和水平扩展:

  • 一个 Topic 可以有多个生产者
  • 一个消息可以被多个消费者消费
  • 消费者支持集群和广播两种负载均衡模式

1.2 核心组件

RocketMQ 架构分为四大部分:

1.2.1 Producer(生产者)

  • 根据负载均衡策略选择 broker 集群队列投递消息
  • 支持快速失败和重试机制
  • 消息组成要素:
    • Topic:消息主题(一级分类)
    • Body:消息内容
    • Properties:消息属性
    • Tag:消息标签(二级分类)
    • Keys:业务唯一标识码
    • TransactionId:事务消息ID

1.2.2 Consumer(消费者)

  • 支持 push(推)和 pull(拉)两种消费模式
  • 支持集群和广播两种消费模式
  • 提供实时订阅机制

1.2.3 NameServer(命名服务)

  • 轻量级 Topic 路由注册中心
  • 主要功能:
    • Broker 管理:接受注册和心跳检测
    • 路由信息管理:保存完整路由信息供客户端查询
  • 部署特点:
    • 多实例部署,实例间无通信
    • 每个实例保存完整路由信息
    • 无状态节点,可集群部署

1.2.4 Broker(代理服务器)

  • 核心功能:
    • 消息存储、投递和查询
    • 服务高可用保证
  • 部署架构:
    • Master-Slave 架构
    • 一个 Master 可对应多个 Slave
    • 一个 Slave 只能对应一个 Master
    • BrokerId=0 表示 Master,非0表示 Slave

二、集群工作流程

  1. 启动 NameServer:作为路由控制中心,等待连接
  2. 启动 Broker
    • 与所有 NameServer 保持长连接
    • 定时发送心跳包(包含 Broker 和 Topic 信息)
  3. 创建 Topic
    • 指定存储的 Broker
    • 可配置自动创建(线上建议关闭)
  4. 生产者发送消息
    • 连接 NameServer 获取路由
    • 选择队列并与 Broker 建立连接
  5. 消费者接收消息
    • 连接 NameServer 获取路由
    • 直接与 Broker 建立连接消费

三、消息类型详解

3.1 普通消息

  • 发送方式:
    • 同步发送:可靠,有响应
    • 异步发送:可靠,带回调通知
    • 单向传输:不可靠,无响应(适合日志等场景)

3.2 顺序消息

  • 实现原理:
    • 按 shardingKey 分配到同一队列
    • 保证生产顺序性和消费顺序性
  • 生产顺序性要求:
    • 单一生产者
    • 串行发送
  • 严格顺序配置:
    • 创建 Topic 时指定 -o true
    • NameServer 配置 orderMessageEnable=true
    • NameServer 配置 returnOrderTopicConfigToBroker=true

3.3 延迟消息

  • 支持 18 个延迟等级(1s-2h)
  • 实现逻辑:
    • 先定时存储等待触发
    • 时间到达后投递给消费者
  • 注意事项:
    • 避免大量消息设置相同触发时间
    • 使用 message.setDelayTimeLevel(3) 设置

3.4 批量消息

  • 提升吞吐率,减少调用次数
  • 限制:
    • 单批不超过 1MiB
    • 同一批 Topic 必须相同
  • 使用 send(List<Message> messages)

3.5 事务消息

  • 二阶段提交实现分布式事务
  • 执行流程:
    1. 发送半事务消息到 Broker
    2. Broker 持久化后返回 Ack
    3. 执行本地事务
    4. 提交二次确认(Commit/Rollback)
  • 回查机制:
    • 未决状态会触发回查
    • 默认尝试 15 次
  • 注意事项:
    • 使用 TransactionMQProducer
    • 需实现 TransactionListener
    • ProducerGroupName 不能随意设置

四、消费模式与负载均衡

4.1 消费模式

  • 集群模式
    • 消息只需被组内任一消费者处理
    • 可通过扩缩消费者调整能力
  • 广播模式
    • 消息被组内所有消费者处理
    • 扩缩消费者不影响能力
    • 仅推荐小流量场景使用

4.2 负载均衡策略

  • 平均分配
  • 机房优先分配
  • 一致性 hash 分配

4.3 消费位点

  • 集群模式:客户端提交给服务端保存
  • 广播模式:客户端自己保存
  • 重平衡可能导致少量消息重复

五、消息过滤

5.1 Tag 过滤

  • 简单场景使用
  • 一个消息一个 Tag
  • 订阅示例:
    consumer.subscribe("TagFilterTest", "TagA||TagB");
    consumer.subscribe("TagFilterTest", "*");
    

5.2 SQL92 过滤

  • 一个消息可设置多个属性
  • Broker 需配置 enablePropertyFilter=true
  • 使用示例:
    // 生产端
    msg.putUserProperties("a", "1");
    
    // 消费端
    consumer.subscribe("SqlFilterTest", 
      MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" + 
      "and (a is not null and a between 0 and 3)"));
    

六、消息重试与死信队列

6.1 消息重试

  • 仅集群模式支持
  • 配置参数:
    consumer.setMaxReconsumeTimes(10);
    consumer.setSuspendCurrentQueueTimeMillis(5000); // 顺序消费有效
    
  • 实现差异:
    • 顺序消费:客户端本地重试
    • 并发消费:重新投递回服务端
  • 重试 Topic:%RETRY%ConsumerGroupName

6.2 死信队列

  • 重试最大次数后进入
  • Topic 名:%DLQ%ConsumerGroupName
  • 特点:
    • 分区数唯一
    • 消息不再消费

七、最佳实践

  1. Topic 设计

    • 一个应用尽量用一个 Topic
    • 用 Tags 区分子类型
  2. 消息追踪

    • 业务层面设置唯一 Keys
    • 日志记录 SendResult 和 Key
  3. 容错处理

    • 发送失败重试不超过 2 次
    • 异步发送不重试
    • 重要消息可存储到 DB 定时重试
  4. 消费优化

    • 保证消费幂等性
    • 提高并行度:
      • 增加 Consumer 实例
      • 调整 consumeThreadMin/Max
    • 批量消费:设置 consumeMessageBatchMaxSize
    • 非重要消息可跳过
  5. 性能优化

    • 日志类消息使用单向传输
    • 合理设置批量消息大小

八、核心实现原理

8.1 消息存储

  • 存储文件:
    • commitLog:消息主体
    • config:运行配置
    • consumerqueue:消费队列
    • index:消息索引
    • abort:异常文件
    • checkpoint:检查点
  • 写入流程:
    • 追加到 MappedFile 内存映射
    • 后台线程近实时分发到 ConsumeQueue 和 IndexFile
  • 刷盘机制:
    • 同步刷盘
    • 异步刷盘(默认)

8.2 高可用实现

  • DLedger 模式
    • 基于 Raft 协议
    • 自动故障恢复
    • 选举新 Leader
  • 主从同步
    • Master-Slave 架构
    • 数据同步复制

8.3 顺序消息实现

  • 生产端:
    • 单一生产者
    • 串行发送
    • 固定队列分配
  • 消费端:
    • 使用 MessageListenerOrderly
    • 避免并发消费

九、关键配置参数

配置项 说明 建议值
autoCreateTopicEnable 自动创建 Topic 线上 false
orderMessageEnable 顺序消息开关 true(如需)
returnOrderTopicConfigToBroker 顺序消息配置 true(如需)
enablePropertyFilter SQL92 过滤开关 true(如需)
maxReconsumeTimes 最大重试次数 10
consumeThreadMin/Max 消费线程数 根据负载调整
consumeMessageBatchMaxSize 批量消费大小 32(默认)

十、故障排查指南

  1. 消息丢失

    • 检查 SendResult 和 Key
    • 验证 Broker 存储
    • 检查消费位点
  2. 消息堆积

    • 增加消费者实例
    • 提高消费线程
    • 跳过非重要消息
  3. 顺序错乱

    • 验证生产顺序性
    • 检查消费模式
    • 确认队列分配稳定
  4. 事务消息异常

    • 检查本地事务
    • 验证回查机制
    • 确认 ProducerGroup 正确

参考资料

  1. RocketMQ 官方文档
  2. 《RocketMQ技术内幕:RocketMQ架构设计与实现原理》
  3. DLedger 实现原理
RocketMQ 技术内幕详解 一、RocketMQ 核心架构 1.1 基本模型 RocketMQ 是一个简单的 pub/sub 模型的消息队列系统,采用分区设计支持高并发和水平扩展: 一个 Topic 可以有多个生产者 一个消息可以被多个消费者消费 消费者支持集群和广播两种负载均衡模式 1.2 核心组件 RocketMQ 架构分为四大部分: 1.2.1 Producer(生产者) 根据负载均衡策略选择 broker 集群队列投递消息 支持快速失败和重试机制 消息组成要素: Topic:消息主题(一级分类) Body:消息内容 Properties:消息属性 Tag:消息标签(二级分类) Keys:业务唯一标识码 TransactionId:事务消息ID 1.2.2 Consumer(消费者) 支持 push(推)和 pull(拉)两种消费模式 支持集群和广播两种消费模式 提供实时订阅机制 1.2.3 NameServer(命名服务) 轻量级 Topic 路由注册中心 主要功能: Broker 管理:接受注册和心跳检测 路由信息管理:保存完整路由信息供客户端查询 部署特点: 多实例部署,实例间无通信 每个实例保存完整路由信息 无状态节点,可集群部署 1.2.4 Broker(代理服务器) 核心功能: 消息存储、投递和查询 服务高可用保证 部署架构: Master-Slave 架构 一个 Master 可对应多个 Slave 一个 Slave 只能对应一个 Master BrokerId=0 表示 Master,非0表示 Slave 二、集群工作流程 启动 NameServer :作为路由控制中心,等待连接 启动 Broker : 与所有 NameServer 保持长连接 定时发送心跳包(包含 Broker 和 Topic 信息) 创建 Topic : 指定存储的 Broker 可配置自动创建(线上建议关闭) 生产者发送消息 : 连接 NameServer 获取路由 选择队列并与 Broker 建立连接 消费者接收消息 : 连接 NameServer 获取路由 直接与 Broker 建立连接消费 三、消息类型详解 3.1 普通消息 发送方式: 同步发送:可靠,有响应 异步发送:可靠,带回调通知 单向传输:不可靠,无响应(适合日志等场景) 3.2 顺序消息 实现原理: 按 shardingKey 分配到同一队列 保证生产顺序性和消费顺序性 生产顺序性要求: 单一生产者 串行发送 严格顺序配置: 创建 Topic 时指定 -o true NameServer 配置 orderMessageEnable=true NameServer 配置 returnOrderTopicConfigToBroker=true 3.3 延迟消息 支持 18 个延迟等级(1s-2h) 实现逻辑: 先定时存储等待触发 时间到达后投递给消费者 注意事项: 避免大量消息设置相同触发时间 使用 message.setDelayTimeLevel(3) 设置 3.4 批量消息 提升吞吐率,减少调用次数 限制: 单批不超过 1MiB 同一批 Topic 必须相同 使用 send(List<Message> messages) 3.5 事务消息 二阶段提交实现分布式事务 执行流程: 发送半事务消息到 Broker Broker 持久化后返回 Ack 执行本地事务 提交二次确认(Commit/Rollback) 回查机制: 未决状态会触发回查 默认尝试 15 次 注意事项: 使用 TransactionMQProducer 需实现 TransactionListener ProducerGroupName 不能随意设置 四、消费模式与负载均衡 4.1 消费模式 集群模式 : 消息只需被组内任一消费者处理 可通过扩缩消费者调整能力 广播模式 : 消息被组内所有消费者处理 扩缩消费者不影响能力 仅推荐小流量场景使用 4.2 负载均衡策略 平均分配 机房优先分配 一致性 hash 分配 4.3 消费位点 集群模式 :客户端提交给服务端保存 广播模式 :客户端自己保存 重平衡可能导致少量消息重复 五、消息过滤 5.1 Tag 过滤 简单场景使用 一个消息一个 Tag 订阅示例: 5.2 SQL92 过滤 一个消息可设置多个属性 Broker 需配置 enablePropertyFilter=true 使用示例: 六、消息重试与死信队列 6.1 消息重试 仅集群模式支持 配置参数: 实现差异: 顺序消费:客户端本地重试 并发消费:重新投递回服务端 重试 Topic: %RETRY%ConsumerGroupName 6.2 死信队列 重试最大次数后进入 Topic 名: %DLQ%ConsumerGroupName 特点: 分区数唯一 消息不再消费 七、最佳实践 Topic 设计 : 一个应用尽量用一个 Topic 用 Tags 区分子类型 消息追踪 : 业务层面设置唯一 Keys 日志记录 SendResult 和 Key 容错处理 : 发送失败重试不超过 2 次 异步发送不重试 重要消息可存储到 DB 定时重试 消费优化 : 保证消费幂等性 提高并行度: 增加 Consumer 实例 调整 consumeThreadMin/Max 批量消费:设置 consumeMessageBatchMaxSize 非重要消息可跳过 性能优化 : 日志类消息使用单向传输 合理设置批量消息大小 八、核心实现原理 8.1 消息存储 存储文件: commitLog :消息主体 config :运行配置 consumerqueue :消费队列 index :消息索引 abort :异常文件 checkpoint :检查点 写入流程: 追加到 MappedFile 内存映射 后台线程近实时分发到 ConsumeQueue 和 IndexFile 刷盘机制: 同步刷盘 异步刷盘(默认) 8.2 高可用实现 DLedger 模式 : 基于 Raft 协议 自动故障恢复 选举新 Leader 主从同步 : Master-Slave 架构 数据同步复制 8.3 顺序消息实现 生产端: 单一生产者 串行发送 固定队列分配 消费端: 使用 MessageListenerOrderly 避免并发消费 九、关键配置参数 | 配置项 | 说明 | 建议值 | |--------|------|--------| | autoCreateTopicEnable | 自动创建 Topic | 线上 false | | orderMessageEnable | 顺序消息开关 | true(如需) | | returnOrderTopicConfigToBroker | 顺序消息配置 | true(如需) | | enablePropertyFilter | SQL92 过滤开关 | true(如需) | | maxReconsumeTimes | 最大重试次数 | 10 | | consumeThreadMin/Max | 消费线程数 | 根据负载调整 | | consumeMessageBatchMaxSize | 批量消费大小 | 32(默认) | 十、故障排查指南 消息丢失 : 检查 SendResult 和 Key 验证 Broker 存储 检查消费位点 消息堆积 : 增加消费者实例 提高消费线程 跳过非重要消息 顺序错乱 : 验证生产顺序性 检查消费模式 确认队列分配稳定 事务消息异常 : 检查本地事务 验证回查机制 确认 ProducerGroup 正确 参考资料 RocketMQ 官方文档 《RocketMQ技术内幕:RocketMQ架构设计与实现原理》 DLedger 实现原理