RocketMQ技术内幕
字数 3273 2025-08-11 21:26:29
RocketMQ 技术内幕详解
一、RocketMQ 核心架构
1.1 基本模型
RocketMQ 是一个简单的 pub/sub 模型的消息队列系统,采用分区设计支持高并发和水平扩展:
- 一个 Topic 可以有多个生产者
- 一个消息可以被多个消费者消费
- 消费者支持集群和广播两种负载均衡模式
1.2 核心组件
RocketMQ 架构分为四大部分:
1.2.1 Producer(生产者)
- 根据负载均衡策略选择 broker 集群队列投递消息
- 支持快速失败和重试机制
- 消息组成要素:
- Topic:消息主题(一级分类)
- Body:消息内容
- Properties:消息属性
- Tag:消息标签(二级分类)
- Keys:业务唯一标识码
- TransactionId:事务消息ID
1.2.2 Consumer(消费者)
- 支持 push(推)和 pull(拉)两种消费模式
- 支持集群和广播两种消费模式
- 提供实时订阅机制
1.2.3 NameServer(命名服务)
- 轻量级 Topic 路由注册中心
- 主要功能:
- Broker 管理:接受注册和心跳检测
- 路由信息管理:保存完整路由信息供客户端查询
- 部署特点:
- 多实例部署,实例间无通信
- 每个实例保存完整路由信息
- 无状态节点,可集群部署
1.2.4 Broker(代理服务器)
- 核心功能:
- 消息存储、投递和查询
- 服务高可用保证
- 部署架构:
- Master-Slave 架构
- 一个 Master 可对应多个 Slave
- 一个 Slave 只能对应一个 Master
- BrokerId=0 表示 Master,非0表示 Slave
二、集群工作流程
- 启动 NameServer:作为路由控制中心,等待连接
- 启动 Broker:
- 与所有 NameServer 保持长连接
- 定时发送心跳包(包含 Broker 和 Topic 信息)
- 创建 Topic:
- 指定存储的 Broker
- 可配置自动创建(线上建议关闭)
- 生产者发送消息:
- 连接 NameServer 获取路由
- 选择队列并与 Broker 建立连接
- 消费者接收消息:
- 连接 NameServer 获取路由
- 直接与 Broker 建立连接消费
三、消息类型详解
3.1 普通消息
- 发送方式:
- 同步发送:可靠,有响应
- 异步发送:可靠,带回调通知
- 单向传输:不可靠,无响应(适合日志等场景)
3.2 顺序消息
- 实现原理:
- 按 shardingKey 分配到同一队列
- 保证生产顺序性和消费顺序性
- 生产顺序性要求:
- 单一生产者
- 串行发送
- 严格顺序配置:
- 创建 Topic 时指定
-o true - NameServer 配置
orderMessageEnable=true - NameServer 配置
returnOrderTopicConfigToBroker=true
- 创建 Topic 时指定
3.3 延迟消息
- 支持 18 个延迟等级(1s-2h)
- 实现逻辑:
- 先定时存储等待触发
- 时间到达后投递给消费者
- 注意事项:
- 避免大量消息设置相同触发时间
- 使用
message.setDelayTimeLevel(3)设置
3.4 批量消息
- 提升吞吐率,减少调用次数
- 限制:
- 单批不超过 1MiB
- 同一批 Topic 必须相同
- 使用
send(List<Message> messages)
3.5 事务消息
- 二阶段提交实现分布式事务
- 执行流程:
- 发送半事务消息到 Broker
- Broker 持久化后返回 Ack
- 执行本地事务
- 提交二次确认(Commit/Rollback)
- 回查机制:
- 未决状态会触发回查
- 默认尝试 15 次
- 注意事项:
- 使用
TransactionMQProducer - 需实现
TransactionListener - ProducerGroupName 不能随意设置
- 使用
四、消费模式与负载均衡
4.1 消费模式
- 集群模式:
- 消息只需被组内任一消费者处理
- 可通过扩缩消费者调整能力
- 广播模式:
- 消息被组内所有消费者处理
- 扩缩消费者不影响能力
- 仅推荐小流量场景使用
4.2 负载均衡策略
- 平均分配
- 机房优先分配
- 一致性 hash 分配
4.3 消费位点
- 集群模式:客户端提交给服务端保存
- 广播模式:客户端自己保存
- 重平衡可能导致少量消息重复
五、消息过滤
5.1 Tag 过滤
- 简单场景使用
- 一个消息一个 Tag
- 订阅示例:
consumer.subscribe("TagFilterTest", "TagA||TagB"); consumer.subscribe("TagFilterTest", "*");
5.2 SQL92 过滤
- 一个消息可设置多个属性
- Broker 需配置
enablePropertyFilter=true - 使用示例:
// 生产端 msg.putUserProperties("a", "1"); // 消费端 consumer.subscribe("SqlFilterTest", MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" + "and (a is not null and a between 0 and 3)"));
六、消息重试与死信队列
6.1 消息重试
- 仅集群模式支持
- 配置参数:
consumer.setMaxReconsumeTimes(10); consumer.setSuspendCurrentQueueTimeMillis(5000); // 顺序消费有效 - 实现差异:
- 顺序消费:客户端本地重试
- 并发消费:重新投递回服务端
- 重试 Topic:
%RETRY%ConsumerGroupName
6.2 死信队列
- 重试最大次数后进入
- Topic 名:
%DLQ%ConsumerGroupName - 特点:
- 分区数唯一
- 消息不再消费
七、最佳实践
-
Topic 设计:
- 一个应用尽量用一个 Topic
- 用 Tags 区分子类型
-
消息追踪:
- 业务层面设置唯一 Keys
- 日志记录 SendResult 和 Key
-
容错处理:
- 发送失败重试不超过 2 次
- 异步发送不重试
- 重要消息可存储到 DB 定时重试
-
消费优化:
- 保证消费幂等性
- 提高并行度:
- 增加 Consumer 实例
- 调整
consumeThreadMin/Max
- 批量消费:设置
consumeMessageBatchMaxSize - 非重要消息可跳过
-
性能优化:
- 日志类消息使用单向传输
- 合理设置批量消息大小
八、核心实现原理
8.1 消息存储
- 存储文件:
commitLog:消息主体config:运行配置consumerqueue:消费队列index:消息索引abort:异常文件checkpoint:检查点
- 写入流程:
- 追加到 MappedFile 内存映射
- 后台线程近实时分发到 ConsumeQueue 和 IndexFile
- 刷盘机制:
- 同步刷盘
- 异步刷盘(默认)
8.2 高可用实现
- DLedger 模式:
- 基于 Raft 协议
- 自动故障恢复
- 选举新 Leader
- 主从同步:
- Master-Slave 架构
- 数据同步复制
8.3 顺序消息实现
- 生产端:
- 单一生产者
- 串行发送
- 固定队列分配
- 消费端:
- 使用
MessageListenerOrderly - 避免并发消费
- 使用
九、关键配置参数
| 配置项 | 说明 | 建议值 |
|---|---|---|
| autoCreateTopicEnable | 自动创建 Topic | 线上 false |
| orderMessageEnable | 顺序消息开关 | true(如需) |
| returnOrderTopicConfigToBroker | 顺序消息配置 | true(如需) |
| enablePropertyFilter | SQL92 过滤开关 | true(如需) |
| maxReconsumeTimes | 最大重试次数 | 10 |
| consumeThreadMin/Max | 消费线程数 | 根据负载调整 |
| consumeMessageBatchMaxSize | 批量消费大小 | 32(默认) |
十、故障排查指南
-
消息丢失:
- 检查 SendResult 和 Key
- 验证 Broker 存储
- 检查消费位点
-
消息堆积:
- 增加消费者实例
- 提高消费线程
- 跳过非重要消息
-
顺序错乱:
- 验证生产顺序性
- 检查消费模式
- 确认队列分配稳定
-
事务消息异常:
- 检查本地事务
- 验证回查机制
- 确认 ProducerGroup 正确
参考资料
- RocketMQ 官方文档
- 《RocketMQ技术内幕:RocketMQ架构设计与实现原理》
- DLedger 实现原理