以太坊MPT构建(下)
字数 1175 2025-08-22 12:23:12

以太坊MPT构建详解

1. MPT概述

MPT (Merkle Patricia Tree) 是以太坊中用于存储账户状态和交易信息的核心数据结构。它结合了Merkle Tree和Patricia Tree的优点,具有以下特性:

  • 高效存储:通过路径压缩减少存储空间
  • 快速验证:通过Merkle证明可以快速验证数据完整性
  • 确定性:相同的数据集总是生成相同的树结构

2. MPT核心组件

2.1 节点类型

MPT包含四种节点类型:

  1. 空节点:表示空树
  2. 叶子节点(shortNode):存储键值对,键是16进制编码
  3. 分支节点(fullNode):有17个元素的数组,前16个对应16进制字符,第17个存储值
  4. 哈希节点(hashNode):指向其他节点的哈希引用

2.2 辅助结构

2.2.1 hasher结构

type hasher struct {
    sha      crypto.Keccak256
    tmp      []byte
    parallel bool // 是否并行处理
}

2.2.2 committer结构

type committer struct {
    onleaf LeafCallback
    leafCh chan *leaf
    // ...其他字段
}

3. MPT构建过程

3.1 树的提交(Commit)

树的提交通过Commit函数实现,主要流程:

  1. 检查Trie的DB和Root是否存在
  2. 计算Root的Hash值
  3. 检查是否需要提交变更
  4. 将节点折叠为哈希节点并插入数据库
func (t *Trie) Commit(onleaf LeafCallback) (root common.Hash, err error) {
    if t.db == nil {
        panic("commit called on trie with nil database")
    }
    if t.root == nil {
        return emptyRoot, nil
    }
    
    rootHash := t.Hash()
    h := newCommitter()
    defer returnCommitterToPool(h)
    
    if _, dirty := t.root.cache(); !dirty {
        return rootHash, nil
    }
    
    var wg sync.WaitGroup
    if onleaf != nil {
        h.onleaf = onleaf
        h.leafCh = make(chan *leaf, leafChanSize)
        wg.Add(1)
        go func() {
            defer wg.Done()
            h.commitLoop(t.db)
        }()
    }
    
    var newRoot hashNode
    newRoot, err = h.Commit(t.root, t.db)
    
    if onleaf != nil {
        close(h.leafCh)
        wg.Wait()
    }
    
    if err != nil {
        return common.Hash{}, err
    }
    t.root = newRoot
    return rootHash, nil
}

3.2 Hash计算

Hash方法计算树的根哈希:

func (t *Trie) Hash() common.Hash {
    hash, cached, _ := t.hashRoot()
    t.root = cached
    return common.BytesToHash(hash.(hashNode))
}

hashRoot方法实现:

func (t *Trie) hashRoot() (node, node, error) {
    if t.root == nil {
        return hashNode(emptyRoot.Bytes()), nil, nil
    }
    
    h := newHasher(t.unhashed >= 100)
    defer returnHasherToPool(h)
    
    hashed, cached := h.hash(t.root, true)
    t.unhashed = 0
    return hashed, cached, nil
}

3.3 节点哈希处理

hash方法将节点折叠为哈希节点:

func (h *hasher) hash(n node, force bool) (hashed node, cached node) {
    if hash, _ := n.cache(); hash != nil {
        return hash, n
    }
    
    switch n := n.(type) {
    case *shortNode:
        collapsed, cached := h.hashShortNodeChildren(n)
        hashed := h.shortnodeToHash(collapsed, force)
        if hn, ok := hashed.(hashNode); ok {
            cached.flags.hash = hn
        } else {
            cached.flags.hash = nil
        }
        return hashed, cached
        
    case *fullNode:
        collapsed, cached := h.hashFullNodeChildren(n)
        hashed = h.fullnodeToHash(collapsed, force)
        if hn, ok := hashed.(hashNode); ok {
            cached.flags.hash = hn
        } else {
            cached.flags.hash = nil
        }
        return hashed, cached
        
    default:
        return n, n
    }
}

3.3.1 扩展节点处理

hashShortNodeChildren处理扩展节点:

func (h *hasher) hashShortNodeChildren(n *shortNode) (collapsed, cached *shortNode) {
    collapsed, cached = n.copy(), n.copy()
    collapsed.Key = hexToCompact(n.Key)
    
    switch n.Val.(type) {
    case *fullNode, *shortNode:
        collapsed.Val, cached.Val = h.hash(n.Val, false)
    }
    return collapsed, cached
}

shortnodeToHash创建哈希节点:

func (h *hasher) shortnodeToHash(n *shortNode, force bool) node {
    h.tmp.Reset()
    if err := rlp.Encode(&h.tmp, n); err != nil {
        panic("encode error: " + err.Error())
    }
    
    if len(h.tmp) < 32 && !force {
        return n
    }
    return h.hashData(h.tmp)
}

3.3.2 分支节点处理

hashFullNodeChildren处理分支节点:

func (h *hasher) hashFullNodeChildren(n *fullNode) (collapsed *fullNode, cached *fullNode) {
    cached = n.copy()
    collapsed = n.copy()
    
    if h.parallel {
        var wg sync.WaitGroup
        wg.Add(16)
        for i := 0; i < 16; i++ {
            go func(i int) {
                hasher := newHasher(false)
                if child := n.Children[i]; child != nil {
                    collapsed.Children[i], cached.Children[i] = hasher.hash(child, false)
                } else {
                    collapsed.Children[i] = nilValueNode
                }
                returnHasherToPool(hasher)
                wg.Done()
            }(i)
        }
        wg.Wait()
    } else {
        for i := 0; i < 16; i++ {
            if child := n.Children[i]; child != nil {
                collapsed.Children[i], cached.Children[i] = h.hash(child, false)
            } else {
                collapsed.Children[i] = nilValueNode
            }
        }
    }
    return collapsed, cached
}

fullnodeToHash创建哈希节点:

func (h *hasher) fullnodeToHash(n *fullNode, force bool) node {
    h.tmp.Reset()
    if err := n.EncodeRLP(&h.tmp); err != nil {
        panic("encode error: " + err.Error())
    }
    
    if len(h.tmp) < 32 && !force {
        return n
    }
    return h.hashData(h.tmp)
}

4. 树的证明

4.1 Prove方法

Prove方法获取指定Key的proof证明:

func (t *Trie) Prove(key []byte, fromLevel uint, proofDb ethdb.KeyValueWriter) error {
    key = keybytesToHex(key)
    var nodes []node
    tn := t.root
    
    for len(key) > 0 && tn != nil {
        switch n := tn.(type) {
        case *shortNode:
            if len(key) < len(n.Key) || !bytes.Equal(n.Key, key[:len(n.Key)]) {
                tn = nil
            } else {
                tn = n.Val
                key = key[len(n.Key):]
            }
            nodes = append(nodes, n)
            
        case *fullNode:
            tn = n.Children[key[0]]
            key = key[1:]
            nodes = append(nodes, n)
            
        case hashNode:
            var err error
            tn, err = t.resolveHash(n, nil)
            if err != nil {
                log.Error(fmt.Sprintf("Unhandled trie error: %v", err))
                return err
            }
            
        default:
            panic(fmt.Sprintf("%T: invalid node: %v", tn, tn))
        }
    }
    
    hasher := newHasher(false)
    defer returnHasherToPool(hasher)
    
    for i, n := range nodes {
        if fromLevel > 0 {
            fromLevel--
            continue
        }
        
        var hn node
        n, hn = hasher.proofHash(n)
        if hash, ok := hn.(hashNode); ok || i == 0 {
            enc, _ := rlp.EncodeToBytes(n)
            if !ok {
                hash = hasher.hashData(enc)
            }
            proofDb.Put(hash, enc)
        }
    }
    return nil
}

4.2 VerifyProof方法

验证Merkle证明:

func VerifyProof(rootHash common.Hash, key []byte, proofDb ethdb.KeyValueReader) (value []byte, err error) {
    key = keybytesToHex(key)
    wantHash := rootHash
    
    for i := 0; ; i++ {
        buf, _ := proofDb.Get(wantHash[:])
        if buf == nil {
            return nil, fmt.Errorf("proof node %d (hash %064x) missing", i, wantHash)
        }
        
        n, err := decodeNode(wantHash[:], buf)
        if err != nil {
            return nil, fmt.Errorf("bad proof node %d: %v", i, err)
        }
        
        keyrest, cld := get(n, key, true)
        switch cld := cld.(type) {
        case nil:
            return nil, nil
        case hashNode:
            key = keyrest
            copy(wantHash[:], cld)
        case valueNode:
            return cld, nil
        }
    }
}

4.3 VerifyRangeProof方法

验证范围证明:

func VerifyRangeProof(rootHash common.Hash, firstKey []byte, lastKey []byte, 
    keys [][]byte, values [][]byte, proof ethdb.KeyValueReader) (ethdb.KeyValueStore, *Trie, *KeyValueNotary, bool, error) {
    
    // 验证输入数据一致性
    if len(keys) != len(values) {
        return nil, nil, nil, false, fmt.Errorf("inconsistent proof data, keys: %d, values: %d", len(keys), len(values))
    }
    
    // 确保键是单调递增的
    for i := 0; i < len(keys)-1; i++ {
        if bytes.Compare(keys[i], keys[i+1]) >= 0 {
            return nil, nil, nil, false, errors.New("range is not monotonically increasing")
        }
    }
    
    // 创建证明跟踪器
    notary := NewKeyValueNotary(proof)
    
    // 处理特殊情况
    if proof == nil {
        // 处理无证明的特殊情况
        // ...
    }
    
    if len(keys) == 0 {
        // 处理零元素证明
        // ...
    }
    
    if len(keys) == 1 && bytes.Equal(firstKey, lastKey) {
        // 处理单元素证明
        // ...
    }
    
    // 验证边缘键有效性
    if bytes.Compare(firstKey, lastKey) >= 0 {
        return nil, nil, nil, false, errors.New("invalid edge keys")
    }
    
    if len(firstKey) != len(lastKey) {
        return nil, nil, nil, false, errors.New("inconsistent edge keys")
    }
    
    // 将边缘证明转换为路径
    root, _, err := proofToPath(rootHash, nil, firstKey, notary, true)
    if err != nil {
        return nil, nil, nil, false, err
    }
    
    root, _, err = proofToPath(rootHash, root, lastKey, notary, true)
    if err != nil {
        return nil, nil, nil, false, err
    }
    
    // 移除内部引用
    empty, err := unsetInternal(root, firstKey, lastKey)
    if err != nil {
        return nil, nil, nil, false, err
    }
    
    // 重建Trie
    diskdb := memorydb.New()
    triedb := NewDatabase(diskdb)
    tr := &Trie{root: root, db: triedb}
    
    if empty {
        tr.root = nil
    }
    
    for index, key := range keys {
        tr.TryUpdate(key, values[index])
    }
    
    if tr.Hash() != rootHash {
        return nil, nil, nil, false, fmt.Errorf("invalid proof, want hash %x, got %x", rootHash, tr.Hash())
    }
    
    // 提交更改
    if _, err := tr.Commit(nil); err != nil {
        return nil, nil, nil, false, err
    }
    
    if err := triedb.Commit(rootHash, false, nil); err != nil {
        return nil, nil, nil, false, err
    }
    
    return diskdb, tr, notary, hasRightElement(root, keys[len(keys)-1]), nil
}

5. 安全考虑

5.1 哈希碰撞(Hash Collision)

攻击者可能通过构造不同的数据集产生相同的哈希值,从而绕过验证。在MPT中需要注意:

  1. 哈希计算顺序:交换非叶子节点的子树顺序可能产生相同的Merkle根
  2. 防御措施
    • 使用足够强的哈希函数(如Keccak-256)
    • 在验证时检查完整路径而不仅仅是哈希值

5.2 其他安全考虑

  1. 空节点处理:确保空节点有明确的表示方式
  2. 序列化验证:验证RLP编码的正确性
  3. 并行处理安全:确保并发哈希计算时的线程安全

6. 关键常量

var (
    // 空树的已知根哈希
    emptyRoot = common.HexToHash("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421")
    
    // 空状态条目的已知哈希
    emptyState = crypto.Keccak256Hash(nil)
)

7. 总结

MPT是以太坊存储层的核心数据结构,理解其构建和验证过程对于:

  • 开发以太坊客户端
  • 实现轻客户端验证
  • 设计状态通道和侧链
  • 进行安全审计

都非常重要。本文详细介绍了MPT的构建过程、证明生成和验证机制,以及相关的安全考虑,为开发者深入理解以太坊底层提供了全面的参考。

以太坊MPT构建详解 1. MPT概述 MPT (Merkle Patricia Tree) 是以太坊中用于存储账户状态和交易信息的核心数据结构。它结合了Merkle Tree和Patricia Tree的优点,具有以下特性: 高效存储:通过路径压缩减少存储空间 快速验证:通过Merkle证明可以快速验证数据完整性 确定性:相同的数据集总是生成相同的树结构 2. MPT核心组件 2.1 节点类型 MPT包含四种节点类型: 空节点 :表示空树 叶子节点(shortNode) :存储键值对,键是16进制编码 分支节点(fullNode) :有17个元素的数组,前16个对应16进制字符,第17个存储值 哈希节点(hashNode) :指向其他节点的哈希引用 2.2 辅助结构 2.2.1 hasher结构 2.2.2 committer结构 3. MPT构建过程 3.1 树的提交(Commit) 树的提交通过 Commit 函数实现,主要流程: 检查Trie的DB和Root是否存在 计算Root的Hash值 检查是否需要提交变更 将节点折叠为哈希节点并插入数据库 3.2 Hash计算 Hash 方法计算树的根哈希: hashRoot 方法实现: 3.3 节点哈希处理 hash 方法将节点折叠为哈希节点: 3.3.1 扩展节点处理 hashShortNodeChildren 处理扩展节点: shortnodeToHash 创建哈希节点: 3.3.2 分支节点处理 hashFullNodeChildren 处理分支节点: fullnodeToHash 创建哈希节点: 4. 树的证明 4.1 Prove方法 Prove 方法获取指定Key的proof证明: 4.2 VerifyProof方法 验证Merkle证明: 4.3 VerifyRangeProof方法 验证范围证明: 5. 安全考虑 5.1 哈希碰撞(Hash Collision) 攻击者可能通过构造不同的数据集产生相同的哈希值,从而绕过验证。在MPT中需要注意: 哈希计算顺序 :交换非叶子节点的子树顺序可能产生相同的Merkle根 防御措施 : 使用足够强的哈希函数(如Keccak-256) 在验证时检查完整路径而不仅仅是哈希值 5.2 其他安全考虑 空节点处理 :确保空节点有明确的表示方式 序列化验证 :验证RLP编码的正确性 并行处理安全 :确保并发哈希计算时的线程安全 6. 关键常量 7. 总结 MPT是以太坊存储层的核心数据结构,理解其构建和验证过程对于: 开发以太坊客户端 实现轻客户端验证 设计状态通道和侧链 进行安全审计 都非常重要。本文详细介绍了MPT的构建过程、证明生成和验证机制,以及相关的安全考虑,为开发者深入理解以太坊底层提供了全面的参考。