通过源码分析RocketMQ主从复制原理
字数 2002 2025-08-11 08:35:44

RocketMQ主从复制原理深入解析

一、主从复制概述

RocketMQ Broker的主从复制主要包括两部分内容:

  1. CommitLog的消息复制:发生在消息写入时,支持同步和异步两种方式
  2. Broker元数据的复制:从服务器每隔10s从主Broker获取并更新配置

重要特征:RocketMQ主从同步不具备主从切换功能,主节点宕机后从节点不会接管消息发送,但可以提供消息读取。

二、CommitLog消息复制

2.1 整体流程

异步复制流程

  1. Producer发送消息到Broker Master进行存储
  2. Broker Master启动并在指定端口监听
  3. Broker Slave启动并主动连接Master
  4. Slave每隔5s向Master拉取消息(首次拉取获取本地最大偏移量)
  5. Master解析请求并返回数据
  6. Slave将消息写入本地CommitLog并汇报拉取进度

核心入口

public void start() throws Exception {
    this.acceptSocketService.beginAccept(); // Master启动接收请求
    this.acceptSocketService.start();
    this.groupTransferService.start();     // 同步复制线程
    this.haClient.start();                 // Slave启动
}

2.2 Master端实现

初始化

public void beginAccept() throws Exception {
    this.serverSocketChannel = ServerSocketChannel.open();
    this.selector = RemotingUtil.openSelector();
    this.serverSocketChannel.socket().setReuseAddress(true);
    this.serverSocketChannel.socket().bind(this.socketAddressListen);
    this.serverSocketChannel.configureBlocking(false);
    this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}

处理连接

  • 使用Java原生NIO(非Netty)
  • 为每个连接创建HAConnection对象
  • HAConnection包含两个核心线程:
    • WriteSocketService:发送CommitLog数据到Slave
    • ReadSocketService:读取Slave上报的offset

2.3 Slave端实现

主循环逻辑

while (!this.isStopped()) {
    if (this.connectMaster()) {
        if (this.isTimeToReportOffset()) { // 每5s上报一次offset
            this.reportSlaveMaxOffset(this.currentReportedOffset);
        }
        this.selector.select(1000);
        this.processReadEvent(); // 处理Master返回的消息数据
        // 检查连接健康状态
    } else {
        this.waitForRunning(1000 * 5); // 等待5s重试
    }
}

关键方法

  • connectMaster():通过NIO连接Master
  • getMaxPhyOffset():获取本地CommitLog最大偏移量
  • reportSlaveMaxOffset():上报offset到Master

2.4 Master数据处理

ReadSocketService

  • 从ByteBuffer解析Slave请求的offset
  • 设置slaveRequestOffsetslaveAckOffset
  • 如果是同步复制,通知GroupTransferService

WriteSocketService

  1. 确定传输起始位置nextTransferFromWhere
  2. 获取CommitLog数据:
    SelectMappedBufferResult selectResult = getCommitLogData(this.nextTransferFromWhere);
    
  3. 构建Header(offset + size)
  4. 通过NIO发送数据

2.5 Slave数据处理

processReadEvent核心逻辑

  1. 从ByteBuffer读取CommitLog数据
  2. 写入本地CommitLog:
    HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
    
  3. 上报新的offset

2.6 同步复制实现

同步复制流程

  1. Producer发送消息到Broker Master
  2. Master写消息线程唤醒WriteSocketService
  3. 创建GroupCommitRequest并等待完成
  4. Slave接收数据并返回新offset
  5. Master更新push2SlaveMaxOffset并判断是否完成

关键代码

// 创建同步请求
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
service.getWaitNotifyObject().wakeupAll(); // 唤醒WriteSocketService

// 等待同步完成
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());

GroupTransferService

  • 检查主从offset差异:
    boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
    
  • 最多等待5s(Slave心跳间隔默认5s)
  • 完成后通过CountDownLatch唤醒

三、元数据复制

Broker从服务器每隔10s从主服务器同步以下元数据:

public void syncAll() {
    this.syncTopicConfig();          // 主题配置
    this.syncConsumerOffset();       // 消费进度
    this.syncDelayOffset();          // 延迟消息偏移量
    this.syncSubscriptionGroupConfig(); // 订阅组配置
}

3.1 Topic配置同步

  • 从Master获取TopicConfigSerializeWrapper
  • 比较DataVersion决定是否更新
  • 持久化到topics.json

3.2 消费进度同步

  • 获取ConsumerOffsetSerializeWrapper
  • 更新本地offsetTable
  • 持久化到consumerOffset.json

3.3 延迟消息偏移量

  • 获取delayOffset字符串
  • 写入到指定文件路径

3.4 订阅组配置

  • 获取SubscriptionGroupWrapper
  • 比较DataVersion决定是否更新
  • 持久化订阅组信息

四、设计思想总结

  1. 分离设计:元数据与程序数据分开管理
  2. 拉取模式:从节点主动拉取数据(带offset)
  3. 复制策略:支持同步/异步两种方式,可配置
  4. 线程隔离:复制线程与主业务线程分离
  5. 健康检查:通过offset差异判断Slave可用性

五、关键配置参数

参数名 默认值 说明
haSendHeartbeatInterval 5000ms Slave上报offset间隔
haHousekeepingInterval 20000ms 连接健康检查间隔
haTransferBatchSize 32KB 每次传输数据量
syncFlushTimeout 5000ms 同步复制超时时间
mappedFileSizeCommitLog 1GB CommitLog文件大小
RocketMQ主从复制原理深入解析 一、主从复制概述 RocketMQ Broker的主从复制主要包括两部分内容: CommitLog的消息复制 :发生在消息写入时,支持同步和异步两种方式 Broker元数据的复制 :从服务器每隔10s从主Broker获取并更新配置 重要特征:RocketMQ主从同步 不具备主从切换功能 ,主节点宕机后从节点不会接管消息发送,但可以提供消息读取。 二、CommitLog消息复制 2.1 整体流程 异步复制流程 : Producer发送消息到Broker Master进行存储 Broker Master启动并在指定端口监听 Broker Slave启动并主动连接Master Slave每隔5s向Master拉取消息(首次拉取获取本地最大偏移量) Master解析请求并返回数据 Slave将消息写入本地CommitLog并汇报拉取进度 核心入口 : 2.2 Master端实现 初始化 : 处理连接 : 使用Java原生NIO(非Netty) 为每个连接创建HAConnection对象 HAConnection包含两个核心线程: WriteSocketService :发送CommitLog数据到Slave ReadSocketService :读取Slave上报的offset 2.3 Slave端实现 主循环逻辑 : 关键方法 : connectMaster() :通过NIO连接Master getMaxPhyOffset() :获取本地CommitLog最大偏移量 reportSlaveMaxOffset() :上报offset到Master 2.4 Master数据处理 ReadSocketService : 从ByteBuffer解析Slave请求的offset 设置 slaveRequestOffset 和 slaveAckOffset 如果是同步复制,通知 GroupTransferService WriteSocketService : 确定传输起始位置 nextTransferFromWhere 获取CommitLog数据: 构建Header(offset + size) 通过NIO发送数据 2.5 Slave数据处理 processReadEvent核心逻辑 : 从ByteBuffer读取CommitLog数据 写入本地CommitLog: 上报新的offset 2.6 同步复制实现 同步复制流程 : Producer发送消息到Broker Master Master写消息线程唤醒WriteSocketService 创建GroupCommitRequest并等待完成 Slave接收数据并返回新offset Master更新 push2SlaveMaxOffset 并判断是否完成 关键代码 : GroupTransferService : 检查主从offset差异: 最多等待5s(Slave心跳间隔默认5s) 完成后通过CountDownLatch唤醒 三、元数据复制 Broker从服务器每隔10s从主服务器同步以下元数据: 3.1 Topic配置同步 从Master获取 TopicConfigSerializeWrapper 比较DataVersion决定是否更新 持久化到 topics.json 3.2 消费进度同步 获取 ConsumerOffsetSerializeWrapper 更新本地offsetTable 持久化到 consumerOffset.json 3.3 延迟消息偏移量 获取delayOffset字符串 写入到指定文件路径 3.4 订阅组配置 获取 SubscriptionGroupWrapper 比较DataVersion决定是否更新 持久化订阅组信息 四、设计思想总结 分离设计 :元数据与程序数据分开管理 拉取模式 :从节点主动拉取数据(带offset) 复制策略 :支持同步/异步两种方式,可配置 线程隔离 :复制线程与主业务线程分离 健康检查 :通过offset差异判断Slave可用性 五、关键配置参数 | 参数名 | 默认值 | 说明 | |--------|--------|------| | haSendHeartbeatInterval | 5000ms | Slave上报offset间隔 | | haHousekeepingInterval | 20000ms | 连接健康检查间隔 | | haTransferBatchSize | 32KB | 每次传输数据量 | | syncFlushTimeout | 5000ms | 同步复制超时时间 | | mappedFileSizeCommitLog | 1GB | CommitLog文件大小 |