Ethereum Block Data Synchronization Explained
Preface
As blockchain technology has matured and its applications have spread, Ethereum has become one of the most popular and widely used smart contract platforms. In the Ethereum network, synchronizing block data is a key step in guaranteeing the network's security and consistency. Understanding how Ethereum synchronizes block data is essential both for understanding how the network works and for developing and deploying applications on it.
Node Roles
The Ethereum network distinguishes several node roles, each with its own functions and responsibilities:
Full Node
A full node is the most complete kind of node in the Ethereum network: it stores and maintains a full copy of the entire blockchain. A full node's main functions are:
- Participate in the consensus process, including mining (under Proof of Work) and validating blocks
- Validate transactions and execute smart contract operations
- Propagate transactions and block data by communicating with other full nodes
- Store all blockchain data, including block headers, transactions, and smart contract code
Full nodes play a central role in the Ethereum network: they provide a high degree of security and reliability, but they require substantial storage space and network bandwidth.
Light Node
A light node is a lightweight node that stores only a small fraction of the blockchain data, primarily to support lightweight client applications. A light node's main functions are:
- Store a small amount of blockchain data, such as block headers and a handful of transactions
- Fetch missing block data and state data by communicating with full nodes
- Verify the validity of transactions (e.g. via Merkle proofs against block headers), but do not execute smart contracts
- Support the development and use of client applications
Light nodes consume little storage and bandwidth and sync comparatively quickly, which makes them a good fit for mobile devices and other resource-constrained environments.
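For example, a resource-constrained application can delegate data access to a remote node instead of storing the chain itself. The following sketch uses go-ethereum's ethclient package to query an account balance over JSON-RPC; the endpoint URL and address are placeholders:

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/ethclient"
)

func main() {
    // Placeholder endpoint: any node exposing the JSON-RPC API.
    client, err := ethclient.Dial("https://example-node:8545")
    if err != nil {
        log.Fatal(err)
    }
    addr := common.HexToAddress("0x0000000000000000000000000000000000000000")
    // A nil block number asks for the latest state known to the serving node.
    balance, err := client.BalanceAt(context.Background(), addr, nil)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println("balance (wei):", balance)
}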
Validator Node
A validator node is a node role in the Ethereum 2.0 network; validators take part in the consensus and validation of the Ethereum blockchain. A validator node's main functions are:
- Store and validate a portion of the blockchain data, including block headers and a small amount of transaction data
- Participate in the consensus process, staking funds and voting to decide the validator of the next block
- Verify the validity of transactions, but do not execute smart contract operations
- Propagate blocks and consensus-related messages by communicating with other validator nodes
Validator nodes are part of Ethereum 2.0's Proof of Stake (PoS) consensus, which replaces the Proof of Work (PoW) algorithm of Ethereum 1.0 and offers better scalability and energy efficiency.
Synchronization Modes
Ethereum block data is synchronized in three main ways: full node sync, light node sync, and validator node sync.
Full Node Sync
A full node stores and maintains the complete blockchain data, and it synchronizes by requesting block data from other full nodes. This process is commonly referred to simply as a full sync, and it is implemented by Ethereum client software such as go-ethereum (Geth), whose downloader is analyzed later in this article.
A full sync proceeds in the following steps:
- Initialization: starting from the genesis block, the full node requests blocks one by one to initialize its local chain database.
- Block requests: the node asks other full nodes for the blocks it is missing.
- Block verification: the node checks the validity of each received block, including its hash, the transaction signatures, and the results of contract execution.
- Block application: blocks that pass verification are applied to the local chain database, updating the state and transaction history.
A full sync yields the most complete and trustworthy copy of the chain, but it demands a great deal of storage and bandwidth, and the process is relatively slow.
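To make the loop concrete, here is a minimal sketch of a full sync in Go. Everything in it (the Block type and the fetch/verify/apply callbacks) is an illustrative placeholder, not the actual go-ethereum API:

// Simplified full sync: fetch, verify, and apply blocks one by one,
// starting from genesis. All names here are hypothetical.
type Block struct {
    Number     uint64
    ParentHash [32]byte
}

func fullSync(head uint64,
    fetch func(n uint64) (*Block, error), // request block n from a peer
    verify func(parent, b *Block) error,  // check hashes, signatures, execution results
    apply func(b *Block) error,           // commit to the local chain database
) error {
    var parent *Block
    for n := uint64(0); n <= head; n++ {
        b, err := fetch(n)
        if err != nil {
            return err
        }
        if err := verify(parent, b); err != nil {
            return err
        }
        if err := apply(b); err != nil {
            return err
        }
        parent = b
    }
    return nil
}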
Light Node Sync
A light node stores only a small portion of the blockchain data and synchronizes by communicating with selected full nodes, without ever storing the whole chain.
A light node sync proceeds in the following steps:
- Initialization: the light node connects to one or more full nodes and fetches the initial block data
- Block requests: the light node asks full nodes for missing block data
- Block verification: the light node checks the validity of the received headers, e.g. the hash chain linking them; it cannot re-execute contracts to check execution results
- State retrieval: the light node obtains the state data needed for contract execution by interacting with full nodes
- Transaction verification: the light node verifies the validity of transactions but does not execute contract operations
Light node sync consumes far less storage and bandwidth and completes comparatively quickly, but since the node cannot verify execution results directly, it must rely on full nodes to supply state data.
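A core part of this is checking that the downloaded headers form an unbroken hash chain. A minimal sketch follows, with a simplified Header type and an assumed headerHash callback (in Ethereum this would be the Keccak-256 hash of the RLP-encoded header):

import "fmt"

// Header is a simplified placeholder for a block header.
type Header struct {
    Number     uint64
    ParentHash [32]byte
}

// verifyHeaderChain checks that headers are contiguous and that each
// one's ParentHash matches the hash of its predecessor.
func verifyHeaderChain(headers []*Header, headerHash func(*Header) [32]byte) error {
    for i := 1; i < len(headers); i++ {
        if headers[i].Number != headers[i-1].Number+1 {
            return fmt.Errorf("non-contiguous header %d", headers[i].Number)
        }
        if headers[i].ParentHash != headerHash(headers[i-1]) {
            return fmt.Errorf("broken hash chain at header %d", headers[i].Number)
        }
    }
    return nil
}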
Validator Node Sync
Validator nodes are Ethereum 2.0 nodes whose main job is to take part in consensus and in validating block data. Their sync differs slightly from full node sync because Ethereum 2.0 uses the Proof of Stake (PoS) consensus algorithm.
A validator node sync proceeds in the following steps:
- Initialization: the validator connects to other validator nodes on the network and fetches the initial block data and the validator set
- Block requests: the validator requests missing block data from other validator nodes
- Block verification: the validator checks the validity of received blocks, including the block hashes and transaction signatures
- State retrieval: the validator obtains the state data needed for contract execution by interacting with other validator nodes
- Transaction verification: the validator verifies the validity of transactions but does not execute contract operations
- Consensus participation: the validator takes part in the consensus process, e.g. staking and voting to decide the validator of the next block
Source Code Analysis
Data Structures
The Downloader struct is defined as follows:
type Downloader struct {
    // WARNING: The `rttEstimate` and `rttConfidence` fields are accessed atomically.
    // On 32 bit platforms, only 64-bit aligned fields can be atomic. The struct is
    // guaranteed to be so aligned, so take advantage of that. For more information,
    // see https://golang.org/pkg/sync/atomic/#pkg-note-BUG.
    rttEstimate   uint64 // Round trip time to target for download requests
    rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)

    mode uint32         // Synchronisation mode defining the strategy used (per sync cycle), use d.getMode() to get the SyncMode
    mux  *event.TypeMux // Event multiplexer to announce sync operation events

    checkpoint uint64   // Checkpoint block number to enforce head against (e.g. fast sync)
    genesis    uint64   // Genesis block number to limit sync to (e.g. light client CHT)
    queue      *queue   // Scheduler for selecting the hashes to download
    peers      *peerSet // Set of active peers from which download can proceed

    stateDB    ethdb.Database  // Database to state sync into (and deduplicate via)
    stateBloom *trie.SyncBloom // Bloom filter for fast trie node and contract code existence checks

    // Statistics
    syncStatsChainOrigin uint64 // Origin block number where syncing started at
    syncStatsChainHeight uint64 // Highest block number known when syncing started
    syncStatsState       stateSyncStats
    syncStatsLock        sync.RWMutex // Lock protecting the sync stats fields

    lightchain LightChain
    blockchain BlockChain

    // Callbacks
    dropPeer peerDropFn // Drops a peer for misbehaving

    // Status
    synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
    synchronising   int32
    notified        int32
    committed       int32
    ancientLimit    uint64 // The maximum block number which can be regarded as ancient data.

    // Channels
    headerCh      chan dataPack        // Channel receiving inbound block headers (headers downloaded from the network arrive here)
    bodyCh        chan dataPack        // Channel receiving inbound block bodies (bodies downloaded from the network arrive here)
    receiptCh     chan dataPack        // Channel receiving inbound receipts (receipts downloaded from the network arrive here)
    bodyWakeCh    chan bool            // Channel to signal the block body fetcher of new tasks
    receiptWakeCh chan bool            // Channel to signal the receipt fetcher of new tasks
    headerProcCh  chan []*types.Header // Channel to feed the header processor new tasks

    // State sync
    pivotHeader *types.Header // Pivot block header to dynamically push the syncing state root
    pivotLock   sync.RWMutex  // Lock protecting pivot header reads from updates

    snapSync   bool         // Whether to run state sync over the snap protocol
    SnapSyncer *snap.Syncer // TODO(karalabe): make private! hack for now

    stateSyncStart chan *stateSync // Channel used to start a new state fetcher
    trackStateReq  chan *stateReq
    stateCh        chan dataPack // Channel receiving inbound node state data (state downloaded from the network arrives here)

    // Cancellation and termination
    cancelPeer string         // Identifier of the peer currently being used as the master (cancel on drop)
    cancelCh   chan struct{}  // Channel to cancel mid-flight syncs
    cancelLock sync.RWMutex   // Lock to protect the cancel channel and peer in delivers
    cancelWg   sync.WaitGroup // Make sure all fetcher goroutines have exited.

    quitCh   chan struct{} // Quit channel to signal termination
    quitLock sync.Mutex    // Lock to prevent double closes

    // Testing hooks
    syncInitHook     func(uint64, uint64)  // Method to call upon initiating a new sync run
    bodyFetchHook    func([]*types.Header) // Method to call upon starting a block body fetch
    receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch
    chainInsertHook  func([]*fetchResult)  // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
}
Constructor
The New function creates a new Downloader instance:
func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader {
    if lightchain == nil {
        lightchain = chain
    }
    dl := &Downloader{
        stateDB:        stateDb,
        stateBloom:     stateBloom,
        mux:            mux,
        checkpoint:     checkpoint,
        queue:          newQueue(blockCacheMaxItems, blockCacheInitialItems),
        peers:          newPeerSet(),
        rttEstimate:    uint64(rttMaxEstimate),
        rttConfidence:  uint64(1000000),
        blockchain:     chain,
        lightchain:     lightchain,
        dropPeer:       dropPeer,
        headerCh:       make(chan dataPack, 1),
        bodyCh:         make(chan dataPack, 1),
        receiptCh:      make(chan dataPack, 1),
        bodyWakeCh:     make(chan bool, 1),
        receiptWakeCh:  make(chan bool, 1),
        headerProcCh:   make(chan []*types.Header, 1),
        quitCh:         make(chan struct{}),
        stateCh:        make(chan dataPack),
        SnapSyncer:     snap.NewSyncer(stateDb),
        stateSyncStart: make(chan *stateSync),
        syncStatsState: stateSyncStats{
            processed: rawdb.ReadFastTrieProgress(stateDb),
        },
        trackStateReq: make(chan *stateReq),
    }
    go dl.qosTuner()     // Periodically recomputes rttEstimate and rttConfidence
    go dl.stateFetcher() // Starts the state fetcher's task listener
    return dl
}
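A hedged sketch of how a caller might construct a Downloader from the signature above. Where the db, chain, mux, and dropPeer values come from is assumed here (they would be initialized elsewhere in the node's setup), and passing nil for lightchain makes New fall back to chain, as the code shows:

// Hypothetical caller setup; the variable sources are assumptions.
var (
    db       ethdb.Database // backing database for state sync
    chain    BlockChain     // the node's blockchain instance
    mux      *event.TypeMux // event multiplexer shared with the node
    dropPeer peerDropFn     // callback to disconnect a misbehaving peer
)
d := New(0, db, nil, mux, chain, nil, dropPeer) // checkpoint 0, no bloom, lightchain defaults to chain
_ = d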
Sync and Download
Block synchronization starts from the Synchronise function:
func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error {
    err := d.synchronise(id, head, td, mode)

    switch err {
    case nil, errBusy, errCanceled:
        return err
    }
    if errors.Is(err, errInvalidChain) || errors.Is(err, errBadPeer) || errors.Is(err, errTimeout) ||
        errors.Is(err, errStallingPeer) || errors.Is(err, errUnsyncedPeer) ||
        errors.Is(err, errEmptyHeaderSet) || errors.Is(err, errPeersUnavailable) ||
        errors.Is(err, errTooOld) || errors.Is(err, errInvalidAncestor) {
        log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err)
        if d.dropPeer == nil {
            log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", id)
        } else {
            d.dropPeer(id)
        }
        return err
    }
    log.Warn("Synchronisation failed, retrying", "err", err)
    return err
}
The synchronise function is implemented as follows:
func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode SyncMode) error {
    // Mock out the synchronisation if testing
    if d.synchroniseMock != nil {
        return d.synchroniseMock(id, hash)
    }
    // Make sure only one goroutine is ever allowed past this point at once
    if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) {
        return errBusy
    }
    defer atomic.StoreInt32(&d.synchronising, 0)

    // Post a user notification of the sync (only once per session)
    if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
        log.Info("Block synchronisation started")
    }
    // If we are already full syncing, but have a fast-sync bloom filter laying
    // around, make sure it doesn't use memory any more.
    if mode == FullSync && d.stateBloom != nil {
        d.stateBloom.Close()
    }
    // If snap sync was requested, create the snap scheduler and switch to fast
    // sync mode.
    if mode == SnapSync {
        if !d.snapSync {
            log.Warn("Enabling snapshot sync prototype")
            d.snapSync = true
        }
        mode = FastSync
    }
    // Reset the queue, peer set and wake channels to clean any internal leftover state
    d.queue.Reset(blockCacheMaxItems, blockCacheInitialItems)
    d.peers.Reset()

    for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
        select {
        case <-ch:
        default:
        }
    }
    for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh} {
        for empty := false; !empty; {
            select {
            case <-ch:
            default:
                empty = true
            }
        }
    }
    for empty := false; !empty; {
        select {
        case <-d.headerProcCh:
        default:
            empty = true
        }
    }
    // Create cancel channel for aborting mid-flight and mark the master peer
    d.cancelLock.Lock()
    d.cancelCh = make(chan struct{})
    d.cancelPeer = id
    d.cancelLock.Unlock()

    defer d.Cancel() // No matter what, we can't leave the cancel channel open

    // Atomically set the requested sync mode
    atomic.StoreUint32(&d.mode, uint32(mode))

    // Retrieve the origin peer and initiate the downloading process
    p := d.peers.Peer(id)
    if p == nil {
        return errUnknownPeer
    }
    return d.syncWithPeer(p, hash, td) // Start block sync from the given peer and head hash, based on the hash chain
}
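The channel resets in the middle of synchronise use a common Go idiom: a select with a default case is a non-blocking receive, so any buffered leftovers are discarded without ever waiting. A standalone sketch of the same idiom:

// drain discards any buffered values in ch without blocking.
// The element type is int only for illustration.
func drain(ch chan int) {
    for {
        select {
        case <-ch: // throw away a leftover value
        default:
            return // channel is empty: stop immediately
        }
    }
}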
The syncWithPeer function starts block synchronization from the given peer and head hash, based on the hash chain:
func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) {
    d.mux.Post(StartEvent{})
    defer func() {
        if err != nil {
            d.mux.Post(FailedEvent{err})
        } else {
            latest := d.lightchain.CurrentHeader()
            d.mux.Post(DoneEvent{latest})
        }
    }()
    if p.version < 64 {
        return fmt.Errorf("%w: advertized %d < required %d", errTooOld, p.version, 64)
    }
    mode := d.getMode()

    log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", mode)
    defer func(start time.Time) {
        log.Debug("Synchronisation terminated", "elapsed", common.PrettyDuration(time.Since(start)))
    }(time.Now())

    // Look up the sync boundaries: the common ancestor and the target block
    latest, pivot, err := d.fetchHead(p)
    if err != nil {
        return err
    }
    if mode == FastSync && pivot == nil {
        pivot = d.blockchain.CurrentBlock().Header()
    }
    height := latest.Number.Uint64()

    origin, err := d.findAncestor(p, latest)
    if err != nil {
        return err
    }
    d.syncStatsLock.Lock()
    if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
        d.syncStatsChainOrigin = origin
    }
    d.syncStatsChainHeight = height
    d.syncStatsLock.Unlock()

    // Ensure our origin point is below any fast sync pivot point
    if mode == FastSync {
        if height <= uint64(fsMinFullBlocks) {
            origin = 0
        } else {
            pivotNumber := pivot.Number.Uint64()
            if pivotNumber <= origin {
                origin = pivotNumber - 1
            }
            rawdb.WriteLastPivotNumber(d.stateDB, pivotNumber)
        }
    }
    d.committed = 1
    if mode == FastSync && pivot.Number.Uint64() != 0 {
        d.committed = 0
    }
    if mode == FastSync {
        if d.checkpoint != 0 && d.checkpoint > fullMaxForkAncestry+1 {
            d.ancientLimit = d.checkpoint
        } else if height > fullMaxForkAncestry+1 {
            d.ancientLimit = height - fullMaxForkAncestry - 1
        } else {
            d.ancientLimit = 0
        }
        frozen, _ := d.stateDB.Ancients()
        if origin >= frozen && frozen != 0 {
            d.ancientLimit = 0
            log.Info("Disabling direct-ancient mode", "origin", origin, "ancient", frozen-1)
        } else if d.ancientLimit > 0 {
            log.Debug("Enabling direct-ancient mode", "ancient", d.ancientLimit)
        }
        if origin+1 < frozen {
            if err := d.lightchain.SetHead(origin + 1); err != nil {
                return err
            }
        }
    }
    // Initiate the sync using a concurrent header and content retrieval algorithm
    d.queue.Prepare(origin+1, mode)
    if d.syncInitHook != nil {
        d.syncInitHook(origin, height)
    }
    fetchers := []func() error{
        func() error { return d.fetchHeaders(p, origin+1) },
        func() error { return d.fetchBodies(origin + 1) },
        func() error { return d.fetchReceipts(origin + 1) },
        func() error { return d.processHeaders(origin+1, td) },
    }
    if mode == FastSync {
        d.pivotLock.Lock()
        d.pivotHeader = pivot
        d.pivotLock.Unlock()

        fetchers = append(fetchers, func() error { return d.processFastSyncContent() })
    } else if mode == FullSync {
        fetchers = append(fetchers, d.processFullSyncContent)
    }
    return d.spawnSync(fetchers)
}
spawnSync starts a goroutine for each fetcher and then blocks, waiting for one of them to fail:
func (d *Downloader) spawnSync(fetchers []func() error) error {
    errc := make(chan error, len(fetchers))
    d.cancelWg.Add(len(fetchers))
    for _, fn := range fetchers {
        fn := fn
        go func() {
            defer d.cancelWg.Done()
            errc <- fn()
        }()
    }
    var err error
    for i := 0; i < len(fetchers); i++ {
        if i == len(fetchers)-1 {
            d.queue.Close()
        }
        if err = <-errc; err != nil && err != errCanceled {
            break
        }
    }
    d.queue.Close()
    d.Cancel()
    return err
}
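spawnSync is a hand-rolled fan-out/first-error pattern. For comparison, the same shape can be expressed with golang.org/x/sync/errgroup (a sketch for illustration, not what the downloader actually uses; note that unlike spawnSync, Wait only returns after every goroutine has finished):

package main

import (
    "fmt"

    "golang.org/x/sync/errgroup"
)

func run(fetchers []func() error) error {
    var g errgroup.Group
    for _, fn := range fetchers {
        g.Go(fn) // one goroutine per fetcher, as in spawnSync
    }
    return g.Wait() // first non-nil error from any fetcher, if any
}

func main() {
    err := run([]func() error{
        func() error { return nil },
        func() error { return fmt.Errorf("fetcher failed") },
    })
    fmt.Println(err) // prints: fetcher failed
}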
State Synchronization
The state is the world state, which stores the balances and other data of every account. The stateFetcher loop listens for new state-sync tasks:
func (d *Downloader) stateFetcher() {
    for {
        select {
        case s := <-d.stateSyncStart:
            for next := s; next != nil; {
                next = d.runStateSync(next)
            }
        case <-d.stateCh:
            // Ignore state responses while no sync is running.
        case <-d.quitCh:
            return
        }
    }
}
The runStateSync function drives a single state sync run:
func (d *Downloader) runStateSync(s *stateSync) *stateSync {
    var (
        active   = make(map[string]*stateReq) // Currently in-flight requests
        finished []*stateReq                  // Completed or failed requests
        timeout  = make(chan *stateReq)       // Timed out active requests
    )
    log.Trace("State sync starting", "root", s.root)

    defer func() {
        for _, req := range active {
            req.timer.Stop()
            req.peer.SetNodeDataIdle(int(req.nItems), time.Now())
        }
    }()
    go s.run()
    defer s.Cancel()

    peerDrop := make(chan *peerConnection, 1024)
    peerSub := s.d.peers.SubscribePeerDrops(peerDrop)
    defer peerSub.Unsubscribe()

    for {
        var (
            deliverReq   *stateReq
            deliverReqCh chan *stateReq
        )
        if len(finished) > 0 {
            deliverReq = finished[0]
            deliverReqCh = s.deliver
        }

        select {
        case next := <-d.stateSyncStart:
            d.spindownStateSync(active, finished, timeout, peerDrop)
            return next

        case <-s.done:
            d.spindownStateSync(active, finished, timeout, peerDrop)
            return nil

        case deliverReqCh <- deliverReq:
            copy(finished, finished[1:])
            finished[len(finished)-1] = nil
            finished = finished[:len(finished)-1]

        case pack := <-d.stateCh:
            req := active[pack.PeerId()]
            if req == nil {
                log.Debug("Unrequested node data", "peer", pack.PeerId(), "len", pack.Items())
                continue
            }
            req.timer.Stop()
            req.response = pack.(*statePack).states
            req.delivered = time.Now()

            finished = append(finished, req)
            delete(active, pack.PeerId())

        case p := <-peerDrop:
            req := active[p.id]
            if req == nil {
                continue
            }
            req.timer.Stop()
            req.dropped = true
            req.delivered = time.Now()

            finished = append(finished, req)
            delete(active, p.id)

        case req := <-timeout:
            if active[req.peer.id] != req {
                continue
            }
            req.delivered = time.Now()

            finished = append(finished, req)
            delete(active, req.peer.id)

        case req := <-d.trackStateReq:
            if old := active[req.peer.id]; old != nil {
                log.Warn("Busy peer assigned new state fetch", "peer", old.peer.id)
                old.timer.Stop()
                old.dropped = true
                old.delivered = time.Now()

                finished = append(finished, old)
            }
            req.timer = time.AfterFunc(req.timeout, func() { timeout <- req })
            active[req.peer.id] = req
        }
    }
}
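Note the nil-channel trick at the top of the loop: while finished is empty, deliverReqCh stays nil, and because sending on a nil channel blocks forever, that select case is effectively disabled until there is actually something to deliver. A standalone sketch of the idiom:

// pump sends queued items to sink, but only arms the send case
// when there is something pending: a send on a nil channel blocks
// forever, so select simply skips it.
func pump(pending []int, sink chan int, stop chan struct{}) {
    for {
        var out chan int
        var next int
        if len(pending) > 0 {
            out = sink // enable the send case
            next = pending[0]
        }
        select {
        case out <- next: // only selectable when out != nil
            pending = pending[1:]
        case <-stop:
            return
        }
    }
}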