以太坊MPT构建(下)
字数 1175 2025-08-22 12:23:12
以太坊MPT构建详解
1. MPT概述
MPT (Merkle Patricia Tree) 是以太坊中用于存储账户状态和交易信息的核心数据结构。它结合了Merkle Tree和Patricia Tree的优点,具有以下特性:
- 高效存储:通过路径压缩减少存储空间
- 快速验证:通过Merkle证明可以快速验证数据完整性
- 确定性:相同的数据集总是生成相同的树结构
2. MPT核心组件
2.1 节点类型
MPT包含四种节点类型:
- 空节点:表示空树
- 叶子节点(shortNode):存储键值对,键是16进制编码
- 分支节点(fullNode):有17个元素的数组,前16个对应16进制字符,第17个存储值
- 哈希节点(hashNode):指向其他节点的哈希引用
2.2 辅助结构
2.2.1 hasher结构
type hasher struct {
sha crypto.Keccak256
tmp []byte
parallel bool // 是否并行处理
}
2.2.2 committer结构
type committer struct {
onleaf LeafCallback
leafCh chan *leaf
// ...其他字段
}
3. MPT构建过程
3.1 树的提交(Commit)
树的提交通过Commit函数实现,主要流程:
- 检查Trie的DB和Root是否存在
- 计算Root的Hash值
- 检查是否需要提交变更
- 将节点折叠为哈希节点并插入数据库
func (t *Trie) Commit(onleaf LeafCallback) (root common.Hash, err error) {
if t.db == nil {
panic("commit called on trie with nil database")
}
if t.root == nil {
return emptyRoot, nil
}
rootHash := t.Hash()
h := newCommitter()
defer returnCommitterToPool(h)
if _, dirty := t.root.cache(); !dirty {
return rootHash, nil
}
var wg sync.WaitGroup
if onleaf != nil {
h.onleaf = onleaf
h.leafCh = make(chan *leaf, leafChanSize)
wg.Add(1)
go func() {
defer wg.Done()
h.commitLoop(t.db)
}()
}
var newRoot hashNode
newRoot, err = h.Commit(t.root, t.db)
if onleaf != nil {
close(h.leafCh)
wg.Wait()
}
if err != nil {
return common.Hash{}, err
}
t.root = newRoot
return rootHash, nil
}
3.2 Hash计算
Hash方法计算树的根哈希:
func (t *Trie) Hash() common.Hash {
hash, cached, _ := t.hashRoot()
t.root = cached
return common.BytesToHash(hash.(hashNode))
}
hashRoot方法实现:
func (t *Trie) hashRoot() (node, node, error) {
if t.root == nil {
return hashNode(emptyRoot.Bytes()), nil, nil
}
h := newHasher(t.unhashed >= 100)
defer returnHasherToPool(h)
hashed, cached := h.hash(t.root, true)
t.unhashed = 0
return hashed, cached, nil
}
3.3 节点哈希处理
hash方法将节点折叠为哈希节点:
func (h *hasher) hash(n node, force bool) (hashed node, cached node) {
if hash, _ := n.cache(); hash != nil {
return hash, n
}
switch n := n.(type) {
case *shortNode:
collapsed, cached := h.hashShortNodeChildren(n)
hashed := h.shortnodeToHash(collapsed, force)
if hn, ok := hashed.(hashNode); ok {
cached.flags.hash = hn
} else {
cached.flags.hash = nil
}
return hashed, cached
case *fullNode:
collapsed, cached := h.hashFullNodeChildren(n)
hashed = h.fullnodeToHash(collapsed, force)
if hn, ok := hashed.(hashNode); ok {
cached.flags.hash = hn
} else {
cached.flags.hash = nil
}
return hashed, cached
default:
return n, n
}
}
3.3.1 扩展节点处理
hashShortNodeChildren处理扩展节点:
func (h *hasher) hashShortNodeChildren(n *shortNode) (collapsed, cached *shortNode) {
collapsed, cached = n.copy(), n.copy()
collapsed.Key = hexToCompact(n.Key)
switch n.Val.(type) {
case *fullNode, *shortNode:
collapsed.Val, cached.Val = h.hash(n.Val, false)
}
return collapsed, cached
}
shortnodeToHash创建哈希节点:
func (h *hasher) shortnodeToHash(n *shortNode, force bool) node {
h.tmp.Reset()
if err := rlp.Encode(&h.tmp, n); err != nil {
panic("encode error: " + err.Error())
}
if len(h.tmp) < 32 && !force {
return n
}
return h.hashData(h.tmp)
}
3.3.2 分支节点处理
hashFullNodeChildren处理分支节点:
func (h *hasher) hashFullNodeChildren(n *fullNode) (collapsed *fullNode, cached *fullNode) {
cached = n.copy()
collapsed = n.copy()
if h.parallel {
var wg sync.WaitGroup
wg.Add(16)
for i := 0; i < 16; i++ {
go func(i int) {
hasher := newHasher(false)
if child := n.Children[i]; child != nil {
collapsed.Children[i], cached.Children[i] = hasher.hash(child, false)
} else {
collapsed.Children[i] = nilValueNode
}
returnHasherToPool(hasher)
wg.Done()
}(i)
}
wg.Wait()
} else {
for i := 0; i < 16; i++ {
if child := n.Children[i]; child != nil {
collapsed.Children[i], cached.Children[i] = h.hash(child, false)
} else {
collapsed.Children[i] = nilValueNode
}
}
}
return collapsed, cached
}
fullnodeToHash创建哈希节点:
func (h *hasher) fullnodeToHash(n *fullNode, force bool) node {
h.tmp.Reset()
if err := n.EncodeRLP(&h.tmp); err != nil {
panic("encode error: " + err.Error())
}
if len(h.tmp) < 32 && !force {
return n
}
return h.hashData(h.tmp)
}
4. 树的证明
4.1 Prove方法
Prove方法获取指定Key的proof证明:
func (t *Trie) Prove(key []byte, fromLevel uint, proofDb ethdb.KeyValueWriter) error {
key = keybytesToHex(key)
var nodes []node
tn := t.root
for len(key) > 0 && tn != nil {
switch n := tn.(type) {
case *shortNode:
if len(key) < len(n.Key) || !bytes.Equal(n.Key, key[:len(n.Key)]) {
tn = nil
} else {
tn = n.Val
key = key[len(n.Key):]
}
nodes = append(nodes, n)
case *fullNode:
tn = n.Children[key[0]]
key = key[1:]
nodes = append(nodes, n)
case hashNode:
var err error
tn, err = t.resolveHash(n, nil)
if err != nil {
log.Error(fmt.Sprintf("Unhandled trie error: %v", err))
return err
}
default:
panic(fmt.Sprintf("%T: invalid node: %v", tn, tn))
}
}
hasher := newHasher(false)
defer returnHasherToPool(hasher)
for i, n := range nodes {
if fromLevel > 0 {
fromLevel--
continue
}
var hn node
n, hn = hasher.proofHash(n)
if hash, ok := hn.(hashNode); ok || i == 0 {
enc, _ := rlp.EncodeToBytes(n)
if !ok {
hash = hasher.hashData(enc)
}
proofDb.Put(hash, enc)
}
}
return nil
}
4.2 VerifyProof方法
验证Merkle证明:
func VerifyProof(rootHash common.Hash, key []byte, proofDb ethdb.KeyValueReader) (value []byte, err error) {
key = keybytesToHex(key)
wantHash := rootHash
for i := 0; ; i++ {
buf, _ := proofDb.Get(wantHash[:])
if buf == nil {
return nil, fmt.Errorf("proof node %d (hash %064x) missing", i, wantHash)
}
n, err := decodeNode(wantHash[:], buf)
if err != nil {
return nil, fmt.Errorf("bad proof node %d: %v", i, err)
}
keyrest, cld := get(n, key, true)
switch cld := cld.(type) {
case nil:
return nil, nil
case hashNode:
key = keyrest
copy(wantHash[:], cld)
case valueNode:
return cld, nil
}
}
}
4.3 VerifyRangeProof方法
验证范围证明:
func VerifyRangeProof(rootHash common.Hash, firstKey []byte, lastKey []byte,
keys [][]byte, values [][]byte, proof ethdb.KeyValueReader) (ethdb.KeyValueStore, *Trie, *KeyValueNotary, bool, error) {
// 验证输入数据一致性
if len(keys) != len(values) {
return nil, nil, nil, false, fmt.Errorf("inconsistent proof data, keys: %d, values: %d", len(keys), len(values))
}
// 确保键是单调递增的
for i := 0; i < len(keys)-1; i++ {
if bytes.Compare(keys[i], keys[i+1]) >= 0 {
return nil, nil, nil, false, errors.New("range is not monotonically increasing")
}
}
// 创建证明跟踪器
notary := NewKeyValueNotary(proof)
// 处理特殊情况
if proof == nil {
// 处理无证明的特殊情况
// ...
}
if len(keys) == 0 {
// 处理零元素证明
// ...
}
if len(keys) == 1 && bytes.Equal(firstKey, lastKey) {
// 处理单元素证明
// ...
}
// 验证边缘键有效性
if bytes.Compare(firstKey, lastKey) >= 0 {
return nil, nil, nil, false, errors.New("invalid edge keys")
}
if len(firstKey) != len(lastKey) {
return nil, nil, nil, false, errors.New("inconsistent edge keys")
}
// 将边缘证明转换为路径
root, _, err := proofToPath(rootHash, nil, firstKey, notary, true)
if err != nil {
return nil, nil, nil, false, err
}
root, _, err = proofToPath(rootHash, root, lastKey, notary, true)
if err != nil {
return nil, nil, nil, false, err
}
// 移除内部引用
empty, err := unsetInternal(root, firstKey, lastKey)
if err != nil {
return nil, nil, nil, false, err
}
// 重建Trie
diskdb := memorydb.New()
triedb := NewDatabase(diskdb)
tr := &Trie{root: root, db: triedb}
if empty {
tr.root = nil
}
for index, key := range keys {
tr.TryUpdate(key, values[index])
}
if tr.Hash() != rootHash {
return nil, nil, nil, false, fmt.Errorf("invalid proof, want hash %x, got %x", rootHash, tr.Hash())
}
// 提交更改
if _, err := tr.Commit(nil); err != nil {
return nil, nil, nil, false, err
}
if err := triedb.Commit(rootHash, false, nil); err != nil {
return nil, nil, nil, false, err
}
return diskdb, tr, notary, hasRightElement(root, keys[len(keys)-1]), nil
}
5. 安全考虑
5.1 哈希碰撞(Hash Collision)
攻击者可能通过构造不同的数据集产生相同的哈希值,从而绕过验证。在MPT中需要注意:
- 哈希计算顺序:交换非叶子节点的子树顺序可能产生相同的Merkle根
- 防御措施:
- 使用足够强的哈希函数(如Keccak-256)
- 在验证时检查完整路径而不仅仅是哈希值
5.2 其他安全考虑
- 空节点处理:确保空节点有明确的表示方式
- 序列化验证:验证RLP编码的正确性
- 并行处理安全:确保并发哈希计算时的线程安全
6. 关键常量
var (
// 空树的已知根哈希
emptyRoot = common.HexToHash("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421")
// 空状态条目的已知哈希
emptyState = crypto.Keccak256Hash(nil)
)
7. 总结
MPT是以太坊存储层的核心数据结构,理解其构建和验证过程对于:
- 开发以太坊客户端
- 实现轻客户端验证
- 设计状态通道和侧链
- 进行安全审计
都非常重要。本文详细介绍了MPT的构建过程、证明生成和验证机制,以及相关的安全考虑,为开发者深入理解以太坊底层提供了全面的参考。