以太坊区块数据同步方式简易介绍(下)
字数 1079 2025-08-22 12:23:12
以太坊区块数据同步方式详解
文章前言
本文深入解析以太坊区块数据同步的实现机制,重点介绍区块头部(headers)同步、区块体(bodies)同步和收据(receipts)同步的具体实现方式。通过分析Go语言源码,揭示以太坊下载器(Downloader)模块的核心工作原理。
核心同步流程
以太坊区块数据同步主要分为以下几个关键步骤:
- 区块头部同步:获取远程节点的最新区块头和轴块(pivot block),再将下载到的区块头校验、插入本地头部链,并为其调度后续内容的下载
- 区块体同步:下载已调度的区块体数据
- 收据同步:下载交易收据数据
源码分析
1. 区块头部同步
fetchHead方法
fetchHead方法用于从远程节点获取其链的最新区块头;在快速同步模式下,还会一并请求位于该区块头之前 fsMinFullBlocks 个区块处的轴块头(如果存在):
// fetchHead asks peer p for the head of its chain and, in fast sync mode, also
// for the pivot header located fsMinFullBlocks blocks below that head.
//
// The header request is sent asynchronously. The method then waits on
// d.headerCh for a reply originating from p. While waiting it discards header
// packets from other peers and any body/receipt packets. It gives up with
// errCanceled when d.cancelCh fires, or with errTimeout once d.requestTTL()
// has elapsed.
//
// On success it returns the remote head and, when one was delivered, the
// pivot (nil otherwise). Validation failures are reported as:
//   - errBadPeer: the reply is empty or larger than requested, or (fast sync)
//     it lacks a pivot although the head is above fsMinFullBlocks;
//   - errUnsyncedPeer: (fast/light sync) the head is below d.checkpoint;
//   - errInvalidChain: the pivot is not exactly fsMinFullBlocks below the head.
func (d *Downloader) fetchHead(p *peerConnection) (head *types.Header, pivot *types.Header, err error) {
	p.log.Debug("Retrieving remote chain head")
	syncMode := d.getMode()

	// Request the advertised head, walking backwards (reverse=true) with a skip
	// of fsMinFullBlocks-1 so the second header, if any, is the pivot.
	headHash, _ := p.peer.Head()
	want := 1
	if syncMode == FastSync {
		want = 2 // head + pivot
	}
	go p.peer.RequestHeadersByHash(headHash, want, fsMinFullBlocks-1, true)

	ttl := d.requestTTL()
	deadline := time.After(ttl)
	for {
		select {
		case <-d.cancelCh:
			return nil, nil, errCanceled

		case <-deadline:
			p.log.Debug("Waiting for head header timed out", "elapsed", ttl)
			return nil, nil, errTimeout

		case <-d.bodyCh:
			// Unsolicited body delivery while fetching the head: drop it.
		case <-d.receiptCh:
			// Unsolicited receipt delivery while fetching the head: drop it.

		case packet := <-d.headerCh:
			if packet.PeerId() != p.id {
				log.Debug("Received headers from incorrect peer", "peer", packet.PeerId())
				continue
			}
			got := packet.(*headerPack).headers
			if n := len(got); n == 0 || n > want {
				return nil, nil, fmt.Errorf("%w: returned headers %d != requested %d", errBadPeer, n, want)
			}

			remoteHead := got[0]
			headNum := remoteHead.Number.Uint64()
			checkpointed := syncMode == FastSync || syncMode == LightSync
			if checkpointed && headNum < d.checkpoint {
				return nil, nil, fmt.Errorf("%w: remote head %d below checkpoint %d", errUnsyncedPeer, remoteHead.Number, d.checkpoint)
			}

			// Only the head came back: acceptable unless fast sync needed a pivot.
			if len(got) == 1 {
				if syncMode == FastSync && headNum > uint64(fsMinFullBlocks) {
					return nil, nil, fmt.Errorf("%w: no pivot included along head header", errBadPeer)
				}
				return remoteHead, nil, nil
			}

			remotePivot := got[1]
			expected := headNum - uint64(fsMinFullBlocks)
			if remotePivot.Number.Uint64() != expected {
				return nil, nil, fmt.Errorf("%w: remote pivot %d != requested %d", errInvalidChain, remotePivot.Number, expected)
			}
			return remoteHead, remotePivot, nil
		}
	}
}
processHeaders方法
processHeaders方法处理从headerProcCh通道接收到的区块头批次:在快速/轻量同步模式下将其校验并插入头部链,在完全/快速同步模式下为其调度区块体和收据的下载;收到空批次即表示头部同步结束:
// processHeaders consumes header batches from d.headerProcCh, where origin is
// the block number of the first header expected and td is the total
// difficulty the remote peer advertised.
//
// Each batch is split into chunks of at most maxHeadersProcess headers:
//   - fast/light sync: the chunk is inserted into d.lightchain, checking every
//     fsHeaderCheckFrequency-th header, or every header once the chunk comes
//     within fsHeaderForceVerify blocks of the pivot;
//   - full/fast sync: after waiting until the queue has room, the chunk is
//     scheduled for body/receipt retrieval with d.queue.Schedule.
//
// After a batch, the sync statistics are updated and the body/receipt
// fetchers are nudged via bodyWakeCh/receiptWakeCh. An empty batch ends the
// stream. In that case both fetchers are told that no more headers are coming
// (false on the wake channels), and the local total difficulty is compared
// against td.
//
// If the method fails while a rollback point is armed, the deferred handler
// rewinds d.lightchain to rollback-1 and logs rollbackErr as the reason.
//
// Returns nil on a clean end of stream, errCanceled on cancellation,
// errStallingPeer when the local chain is still below td, or an error
// wrapping errInvalidChain (rejected headers) or errBadPeer (stale headers).
func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
	var (
		rollback    uint64 // first header number to undo on failure; 0 disarms the rollback
		rollbackErr error  // reason reported by the deferred rollback
		mode        = d.getMode()
	)
	defer func() {
		if rollback > 0 {
			// -1 targets the parent of the first header being undone.
			if err := d.lightchain.SetHead(rollback - 1); err != nil {
				// Already returning an error; just make this one visible.
				log.Error("Failed to roll back chain segment", "head", rollback-1, "err", err)
			}
			log.Warn("Rolled back chain segment", "reason", rollbackErr)
		}
	}()

	// gotHeaders records whether at least one non-empty batch arrived.
	gotHeaders := false
	for {
		select {
		case <-d.cancelCh:
			rollbackErr = errCanceled
			return errCanceled

		case headers := <-d.headerProcCh:
			if len(headers) == 0 {
				// End of the header stream: tell both content fetchers that no
				// further tasks will be scheduled.
				for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
					select {
					case ch <- false:
					case <-d.cancelCh:
					}
				}
				// If the local chain's total difficulty is still below the td
				// the peer advertised, treat the peer as stalling. Record that
				// as the rollback reason so an armed rollback is not logged
				// with a nil cause.
				if mode != LightSync {
					head := d.blockchain.CurrentBlock()
					if !gotHeaders && td.Cmp(d.blockchain.GetTd(head.Hash(), head.NumberU64())) > 0 {
						rollbackErr = errStallingPeer
						return errStallingPeer
					}
				}
				if mode == FastSync || mode == LightSync {
					head := d.lightchain.CurrentHeader()
					if td.Cmp(d.lightchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 {
						rollbackErr = errStallingPeer
						return errStallingPeer
					}
				}
				// Clean finish: disarm the deferred rollback.
				rollback = 0
				return nil
			}
			gotHeaders = true

			for len(headers) > 0 {
				// Take the next chunk of at most maxHeadersProcess headers.
				limit := maxHeadersProcess
				if limit > len(headers) {
					limit = len(headers)
				}
				chunk := headers[:limit]

				if mode == FastSync || mode == LightSync {
					var pivot uint64
					d.pivotLock.RLock()
					if d.pivotHeader != nil {
						pivot = d.pivotHeader.Number.Uint64()
					}
					d.pivotLock.RUnlock()

					// Spot-check headers far from the pivot; verify every
					// header once the chunk gets within fsHeaderForceVerify.
					frequency := fsHeaderCheckFrequency
					if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
						frequency = 1
					}
					if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil {
						rollbackErr = err
						// Part of the chunk was already written: arm the
						// rollback from the start of this chunk.
						if (mode == FastSync || frequency > 1) && n > 0 && rollback == 0 {
							rollback = chunk[0].Number.Uint64()
						}
						return fmt.Errorf("%w: %v", errInvalidChain, err)
					}
					// Keep the rollback point fsHeaderSafetyNet headers behind
					// the newest inserted header; before that is possible, use 1.
					if mode == FastSync {
						head := chunk[len(chunk)-1].Number.Uint64()
						if head-rollback > uint64(fsHeaderSafetyNet) {
							rollback = head - uint64(fsHeaderSafetyNet)
						} else {
							rollback = 1
						}
					}
				}

				if mode == FullSync || mode == FastSync {
					// Back-pressure: poll once a second until the queue has room.
					for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
						select {
						case <-d.cancelCh:
							rollbackErr = errCanceled
							return errCanceled
						case <-time.After(time.Second):
						}
					}
					// Every header must be accepted by the scheduler; anything
					// else means the peer sent headers the queue considers stale.
					inserts := d.queue.Schedule(chunk, origin)
					if len(inserts) != len(chunk) {
						rollbackErr = fmt.Errorf("stale headers: len inserts %v len(chunk) %v", len(inserts), len(chunk))
						return fmt.Errorf("%w: stale headers", errBadPeer)
					}
				}
				headers = headers[limit:]
				origin += uint64(limit)
			}

			// origin now points one past the last processed header.
			d.syncStatsLock.Lock()
			if d.syncStatsChainHeight < origin {
				d.syncStatsChainHeight = origin - 1
			}
			d.syncStatsLock.Unlock()

			// Non-blocking nudge so the content fetchers pick up new tasks.
			for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
				select {
				case ch <- true:
				default:
				}
			}
		}
	}
}
2. 区块体同步
fetchBodies方法
fetchBodies方法用于迭代下载已调度的区块体:
// fetchBodies downloads the block bodies that have been scheduled in d.queue.
// The from argument is only logged here. The retrieval loop itself is
// fetchParts; this method only wires in the body-specific queue operations,
// peer callbacks and channels, and returns whatever fetchParts returns.
func (d *Downloader) fetchBodies(from uint64) error {
	log.Debug("Downloading block bodies", "origin", from)

	err := d.fetchParts(
		d.bodyCh,
		// deliver: hand a received bodyPack to the queue.
		func(packet dataPack) (int, error) {
			bodies := packet.(*bodyPack)
			return d.queue.DeliverBodies(bodies.peerID, bodies.transactions, bodies.uncles)
		},
		d.bodyWakeCh,
		// expire: time out body requests using the current request TTL.
		func() map[string]int {
			return d.queue.ExpireBodies(d.requestTTL())
		},
		d.queue.PendingBlocks,
		d.queue.InFlightBlocks,
		d.queue.ReserveBodies,
		d.bodyFetchHook,
		// fetch: send the reserved body request to the peer.
		func(p *peerConnection, req *fetchRequest) error {
			return p.FetchBodies(req)
		},
		d.queue.CancelBodies,
		// capacity: how many bodies this peer may be asked for now.
		func(p *peerConnection) int {
			return p.BlockCapacity(d.requestRTT())
		},
		d.peers.BodyIdlePeers,
		// setIdle: mark the peer idle for body retrieval.
		func(p *peerConnection, accepted int, deliveryTime time.Time) {
			p.SetBodiesIdle(accepted, deliveryTime)
		},
		"bodies",
	)
	log.Debug("Block body download terminated", "err", err)
	return err
}
DeliverBodies方法
// DeliverBodies packages the transactions and uncles received from peer id
// into a bodyPack and forwards it through d.deliver to d.bodyCh, using the
// body in/drop meters. The result is d.deliver's result.
func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) error {
	pack := &bodyPack{peerID: id, transactions: transactions, uncles: uncles}
	return d.deliver(d.bodyCh, pack, bodyInMeter, bodyDropMeter)
}
3. 收据同步
fetchReceipts方法
fetchReceipts方法用于迭代下载已调度的收据:
// fetchReceipts downloads the transaction receipts that have been scheduled in
// d.queue. The from argument is only logged here. The retrieval loop itself is
// fetchParts; this method only wires in the receipt-specific queue operations,
// peer callbacks and channels, and returns whatever fetchParts returns.
func (d *Downloader) fetchReceipts(from uint64) error {
	log.Debug("Downloading transaction receipts", "origin", from)

	err := d.fetchParts(
		d.receiptCh,
		// deliver: hand a received receiptPack to the queue.
		func(packet dataPack) (int, error) {
			rp := packet.(*receiptPack)
			return d.queue.DeliverReceipts(rp.peerID, rp.receipts)
		},
		d.receiptWakeCh,
		// expire: time out receipt requests using the current request TTL.
		func() map[string]int {
			return d.queue.ExpireReceipts(d.requestTTL())
		},
		d.queue.PendingReceipts,
		d.queue.InFlightReceipts,
		d.queue.ReserveReceipts,
		d.receiptFetchHook,
		// fetch: send the reserved receipt request to the peer.
		func(p *peerConnection, req *fetchRequest) error {
			return p.FetchReceipts(req)
		},
		d.queue.CancelReceipts,
		// capacity: how many receipts this peer may be asked for now.
		func(p *peerConnection) int {
			return p.ReceiptCapacity(d.requestRTT())
		},
		d.peers.ReceiptIdlePeers,
		// setIdle: mark the peer idle for receipt retrieval.
		func(p *peerConnection, accepted int, deliveryTime time.Time) {
			p.SetReceiptsIdle(accepted, deliveryTime)
		},
		"receipts",
	)
	log.Debug("Transaction receipt download terminated", "err", err)
	return err
}
DeliverReceipts方法
// DeliverReceipts packages the receipts received from peer id into a
// receiptPack and forwards it through d.deliver to d.receiptCh, using the
// receipt in/drop meters. The result is d.deliver's result.
func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) error {
	pack := &receiptPack{peerID: id, receipts: receipts}
	return d.deliver(d.receiptCh, pack, receiptInMeter, receiptDropMeter)
}
通用数据获取框架
fetchParts方法
fetchParts方法是下载器的核心框架,用于处理各种类型数据的获取:
// fetchParts is the generic retrieval loop shared by the body and receipt
// downloaders. It keeps doing the following until every scheduled task is
// done and the header side has signalled completion (false on wakeCh):
//   - hand packets arriving on deliveryCh to deliver;
//   - expire timed-out requests, idling or dropping the peers involved;
//   - reserve new batches for idle peers and send them with fetch.
//
// Parameters:
//   - deliveryCh: channel on which retrieved packets arrive
//   - deliver:    hands a packet to the queue; returns the accepted item count
//   - wakeCh:     header-side signal; true = new tasks, false = no more tasks
//   - expire:     expires stale requests; returns a failure count per peer id
//   - pending:    number of tasks not yet requested
//   - inFlight:   whether any request is still outstanding
//   - reserve:    reserves a batch for a peer; returns (request, progressed, throttled)
//   - fetchHook:  optional callback invoked with each request's headers
//   - fetch:      sends a reserved request to a peer
//   - cancel:     request-cancel hook (not referenced in this body)
//   - capacity:   how many items a peer may be asked for at once
//   - idle:       idle peers plus a total count (presumably all peers)
//   - setIdle:    marks a peer idle with its accepted count and delivery time
//   - kind:       data type label used in log messages
//
// Returns:
//   - nil on completion;
//   - errCanceled on cancellation;
//   - errNoPeers when no peers remain;
//   - errTimeout when the peer recorded in d.cancelPeer stalled and was dropped;
//   - errPeersUnavailable when no idle peer can make progress;
//   - the errInvalidChain error reported by deliver.
func (d *Downloader) fetchParts(
	deliveryCh chan dataPack,
	deliver func(dataPack) (int, error),
	wakeCh chan bool,
	expire func() map[string]int,
	pending func() int,
	inFlight func() bool,
	reserve func(*peerConnection, int) (*fetchRequest, bool, bool),
	fetchHook func([]*types.Header),
	fetch func(*peerConnection, *fetchRequest) error,
	cancel func(*fetchRequest),
	capacity func(*peerConnection) int,
	idle func() ([]*peerConnection, int),
	setIdle func(*peerConnection, int, time.Time),
	kind string) error {
	// The ticker guarantees expiry checks even when nothing is delivered.
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	// update coalesces wake-ups: at most one progress pass is ever pending.
	update := make(chan struct{}, 1)
	requestUpdate := func() {
		select {
		case update <- struct{}{}:
		default:
		}
	}

	finished := false // set once the header side reports no more tasks
	for {
		select {
		case <-d.cancelCh:
			return errCanceled

		case packet := <-deliveryCh:
			deliveryTime := time.Now()
			// Packets from peers that are no longer registered are ignored.
			if peer := d.peers.Peer(packet.PeerId()); peer != nil {
				accepted, err := deliver(packet)
				if errors.Is(err, errInvalidChain) {
					return err
				}
				// A stale delivery answers an already-expired request; the peer
				// was presumably handled at expiry time, so don't idle it again.
				if !errors.Is(err, errStaleDelivery) {
					setIdle(peer, accepted, deliveryTime)
				}
				switch {
				case err == nil && packet.Items() == 0:
					peer.log.Trace("Requested data not delivered", "type", kind)
				case err == nil:
					peer.log.Trace("Delivered new batch of data", "type", kind, "count", packet.Stats())
				default:
					peer.log.Debug("Failed to deliver retrieved data", "type", kind, "err", err)
				}
			}
			requestUpdate()

		case cont := <-wakeCh:
			if !cont {
				finished = true
			}
			requestUpdate()

		case <-ticker.C:
			requestUpdate()

		case <-update:
			if d.peers.Len() == 0 {
				return errNoPeers
			}
			// Handle expired requests. If many items timed out, the peer is
			// only reset to idle; if even a small request (<= 2 items) timed
			// out, the peer is dropped.
			for pid, fails := range expire() {
				peer := d.peers.Peer(pid)
				if peer == nil {
					continue
				}
				if fails > 2 {
					peer.log.Trace("Data delivery timed out", "type", kind)
					setIdle(peer, 0, time.Now())
					continue
				}
				peer.log.Debug("Stalling delivery, dropping", "type", kind)
				if d.dropPeer == nil {
					// Nothing can drop the peer; surface that instead of silently skipping.
					peer.log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", pid)
					continue
				}
				d.dropPeer(pid)

				// cancelPeer is part of the cancellation state guarded by
				// cancelLock (cancelCh is read under the same lock in deliver).
				// Snapshot it under the read lock to avoid a data race, and
				// release the lock before d.cancel(), which may take it itself.
				d.cancelLock.RLock()
				master := pid == d.cancelPeer
				d.cancelLock.RUnlock()
				if master {
					d.cancel()
					return errTimeout
				}
			}

			// Nothing left to request: finish if everything has landed and
			// the header side is done, otherwise wait for the next update.
			if pending() == 0 {
				if !inFlight() && finished {
					log.Debug("Data fetching completed", "type", kind)
					return nil
				}
				break
			}

			// Hand out work to idle peers until throttled or out of tasks.
			progressed, throttled, running := false, false, inFlight()
			idles, total := idle()
			pendCount := pending()
			for _, peer := range idles {
				if throttled {
					break
				}
				if pendCount = pending(); pendCount == 0 {
					break
				}
				request, progress, throttle := reserve(peer, capacity(peer))
				if progress {
					progressed = true
				}
				if throttle {
					throttled = true
				}
				if request == nil {
					continue
				}
				if fetchHook != nil {
					fetchHook(request.Headers)
				}
				if err := fetch(peer, request); err != nil {
					// A fetch failure is treated as a broken internal invariant:
					// crash rather than continue with inconsistent queue state.
					panic(fmt.Sprintf("%v: %s fetch assignment failed", peer, kind))
				}
				running = true
			}
			// Every idle peer was tried and none could make progress.
			if !progressed && !throttled && !running && len(idles) == total && pendCount > 0 {
				return errPeersUnavailable
			}
		}
	}
}
deliver方法
deliver是通用的数据交付方法:
// deliver pushes packet onto destCh, for whichever fetch loop is reading that
// channel.
//
// It always records the packet's item count on inMeter. If the packet is not
// handed over (a non-nil error is returned), the count is also recorded on
// dropMeter. The current cancel channel is read under cancelLock:
//   - if it is nil (no sync running), errNoSyncActive is returned at once;
//   - if it fires while the send is blocked, errNoSyncActive is returned too.
//
// The result is a named return so that the deferred drop-metering closure can
// inspect the error actually being returned. With an unnamed result, the `err`
// referenced in the closure is undefined and the function does not compile.
func (d *Downloader) deliver(destCh chan dataPack, packet dataPack, inMeter, dropMeter metrics.Meter) (err error) {
	inMeter.Mark(int64(packet.Items()))
	defer func() {
		if err != nil {
			dropMeter.Mark(int64(packet.Items()))
		}
	}()
	d.cancelLock.RLock()
	cancel := d.cancelCh
	d.cancelLock.RUnlock()
	if cancel == nil {
		return errNoSyncActive
	}
	select {
	case destCh <- packet:
		return nil
	case <-cancel:
		return errNoSyncActive
	}
}
同步模式
以太坊支持三种同步模式:
- FullSync:完全同步,下载所有区块数据
- FastSync:快速同步,同样为所有区块头调度区块体和收据的下载(见processHeaders中的调度逻辑),但不从创世块开始逐块执行交易,而是直接获取轴块处的状态数据,只有轴块之后的区块才会被完整执行
- LightSync:轻量同步,只下载区块头
关键设计要点
- 分批次处理:所有数据都采用分批次获取和处理的方式,避免内存占用过大
- 超时处理:对每个请求都设置了超时机制,防止网络问题导致同步停滞
- 任务调度:通过队列(queue)统一调度、预留、回收和过期处理各类数据的下载任务
- 错误处理:完善的错误处理机制,包括根据交付情况调整节点的请求容量、丢弃停滞的节点、数据校验以及出错时回滚头部链等
- 并发控制:通过通道(channel)实现高效的并发控制
- 资源管理:合理管理网络和计算资源,避免过度占用
总结
以太坊的区块数据同步机制是一个高度优化的分布式系统,通过上述代码分析可以看出:
- 同步过程分为头部、区块体和收据三个主要部分
- 采用统一的数据获取框架fetchParts处理不同类型数据
- 完善的错误处理和超时机制保证同步的可靠性
- 合理的资源管理和并发控制确保高效运行
- 支持多种同步模式适应不同场景需求
这种设计使得以太坊节点能够在复杂的网络环境中高效、可靠地同步区块链数据,为整个网络的稳定运行提供了坚实基础。