通过源码分析RocketMQ主从复制原理
字数 2002 2025-08-11 08:35:44
RocketMQ主从复制原理深入解析
一、主从复制概述
RocketMQ Broker的主从复制主要包括两部分内容:
- CommitLog的消息复制:发生在消息写入时,支持同步和异步两种方式
- Broker元数据的复制:从服务器每隔10s从主Broker获取并更新配置
重要特征:RocketMQ主从同步不具备主从切换功能,主节点宕机后从节点不会接管消息发送,但可以提供消息读取。
二、CommitLog消息复制
2.1 整体流程
异步复制流程:
- Producer发送消息到Broker Master进行存储
- Broker Master启动并在指定端口监听
- Broker Slave启动并主动连接Master
- Slave每隔5s向Master拉取消息(首次拉取获取本地最大偏移量)
- Master解析请求并返回数据
- Slave将消息写入本地CommitLog并汇报拉取进度
核心入口:
public void start() throws Exception {
this.acceptSocketService.beginAccept(); // Master启动接收请求
this.acceptSocketService.start();
this.groupTransferService.start(); // 同步复制线程
this.haClient.start(); // Slave启动
}
2.2 Master端实现
初始化:
public void beginAccept() throws Exception {
this.serverSocketChannel = ServerSocketChannel.open();
this.selector = RemotingUtil.openSelector();
this.serverSocketChannel.socket().setReuseAddress(true);
this.serverSocketChannel.socket().bind(this.socketAddressListen);
this.serverSocketChannel.configureBlocking(false);
this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}
处理连接:
- 使用Java原生NIO(非Netty)
- 为每个连接创建HAConnection对象
- HAConnection包含两个核心线程:
WriteSocketService:发送CommitLog数据到SlaveReadSocketService:读取Slave上报的offset
2.3 Slave端实现
主循环逻辑:
while (!this.isStopped()) {
if (this.connectMaster()) {
if (this.isTimeToReportOffset()) { // 每5s上报一次offset
this.reportSlaveMaxOffset(this.currentReportedOffset);
}
this.selector.select(1000);
this.processReadEvent(); // 处理Master返回的消息数据
// 检查连接健康状态
} else {
this.waitForRunning(1000 * 5); // 等待5s重试
}
}
关键方法:
connectMaster():通过NIO连接MastergetMaxPhyOffset():获取本地CommitLog最大偏移量reportSlaveMaxOffset():上报offset到Master
2.4 Master数据处理
ReadSocketService:
- 从ByteBuffer解析Slave请求的offset
- 设置
slaveRequestOffset和slaveAckOffset - 如果是同步复制,通知
GroupTransferService
WriteSocketService:
- 确定传输起始位置
nextTransferFromWhere - 获取CommitLog数据:
SelectMappedBufferResult selectResult = getCommitLogData(this.nextTransferFromWhere); - 构建Header(offset + size)
- 通过NIO发送数据
2.5 Slave数据处理
processReadEvent核心逻辑:
- 从ByteBuffer读取CommitLog数据
- 写入本地CommitLog:
HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData); - 上报新的offset
2.6 同步复制实现
同步复制流程:
- Producer发送消息到Broker Master
- Master写消息线程唤醒WriteSocketService
- 创建GroupCommitRequest并等待完成
- Slave接收数据并返回新offset
- Master更新
push2SlaveMaxOffset并判断是否完成
关键代码:
// 创建同步请求
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
service.getWaitNotifyObject().wakeupAll(); // 唤醒WriteSocketService
// 等待同步完成
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
GroupTransferService:
- 检查主从offset差异:
boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(); - 最多等待5s(Slave心跳间隔默认5s)
- 完成后通过CountDownLatch唤醒
三、元数据复制
Broker从服务器每隔10s从主服务器同步以下元数据:
public void syncAll() {
this.syncTopicConfig(); // 主题配置
this.syncConsumerOffset(); // 消费进度
this.syncDelayOffset(); // 延迟消息偏移量
this.syncSubscriptionGroupConfig(); // 订阅组配置
}
3.1 Topic配置同步
- 从Master获取
TopicConfigSerializeWrapper - 比较DataVersion决定是否更新
- 持久化到
topics.json
3.2 消费进度同步
- 获取
ConsumerOffsetSerializeWrapper - 更新本地offsetTable
- 持久化到
consumerOffset.json
3.3 延迟消息偏移量
- 获取delayOffset字符串
- 写入到指定文件路径
3.4 订阅组配置
- 获取
SubscriptionGroupWrapper - 比较DataVersion决定是否更新
- 持久化订阅组信息
四、设计思想总结
- 分离设计:元数据与程序数据分开管理
- 拉取模式:从节点主动拉取数据(带offset)
- 复制策略:支持同步/异步两种方式,可配置
- 线程隔离:复制线程与主业务线程分离
- 健康检查:通过offset差异判断Slave可用性
五、关键配置参数
| 参数名 | 默认值 | 说明 |
|---|---|---|
| haSendHeartbeatInterval | 5000ms | Slave上报offset间隔 |
| haHousekeepingInterval | 20000ms | 连接健康检查间隔 |
| haTransferBatchSize | 32KB | 每次传输数据量 |
| syncFlushTimeout | 5000ms | 同步复制超时时间 |
| mappedFileSizeCommitLog | 1GB | CommitLog文件大小 |