以太坊P2P网络深入剖析(下)
字数 652 2025-08-22 12:23:24

以太坊P2P网络深入解析

文章前言

本文是"以太坊P2P网络深入解析"的续篇,重点分析以太坊P2P网络的源码实现和关键机制。

源码分析

服务停用

Stop()函数用于停止服务器和所有活动的对等连接,它会阻塞直到所有活动连接都关闭:

// Stop shuts the server down. While holding the lock it clears the running
// flag, closes the listener (if one was opened) and closes the quit channel.
// After the lock has been released it waits on loopWG. Calling Stop on a
// server that is not running returns immediately.
func (srv *Server) Stop() {
    wasRunning := func() bool {
        srv.lock.Lock()
        defer srv.lock.Unlock()

        if !srv.running {
            return false
        }
        srv.running = false
        if l := srv.listener; l != nil {
            l.Close()
        }
        close(srv.quit)
        return true
    }()

    // Wait outside of the lock, exactly like the early-unlock form.
    if wasRunning {
        srv.loopWG.Wait()
    }
}

节点启动

Start()函数用于启动一个P2P节点:

// Start launches the P2P server: it applies logger/clock defaults, validates
// the static configuration, creates the internal channels, sets up the local
// node, the TCP listener (only when ListenAddr is non-empty), discovery and
// the dial scheduler, and finally spawns the main run loop.
//
// It returns an error if the server is already running, if PrivateKey is nil,
// or if any of the setup steps fails.
//
// The running flag is only set after the static configuration has been
// validated. Setting it earlier would leave running == true with srv.quit
// still nil when PrivateKey is missing, so a following Stop would panic on
// close(srv.quit) and every further Start would report "server already
// running".
//
// NOTE(review): a failure in one of the later setup steps still leaves the
// server marked as running; that behavior is unchanged here.
func (srv *Server) Start() (err error) {
    srv.lock.Lock()
    defer srv.lock.Unlock()
    if srv.running {
        return errors.New("server already running")
    }
    srv.log = srv.Config.Logger
    if srv.log == nil {
        srv.log = log.Root()
    }
    if srv.clock == nil {
        srv.clock = mclock.System{}
    }
    if srv.NoDial && srv.ListenAddr == "" {
        srv.log.Warn("P2P server will be useless, neither dialing nor listening")
    }

    // Static field checks. These run before the server is marked as running
    // so that a rejected configuration leaves the server in a stopped state.
    if srv.PrivateKey == nil {
        return errors.New("Server.PrivateKey must be set to a non-nil key")
    }
    srv.running = true
    if srv.newTransport == nil {
        srv.newTransport = newRLPX
    }
    if srv.listenFunc == nil {
        srv.listenFunc = net.Listen
    }

    // Initialize the channels.
    srv.quit = make(chan struct{})
    srv.delpeer = make(chan peerDrop)
    srv.checkpointPostHandshake = make(chan *conn)
    srv.checkpointAddPeer = make(chan *conn)
    srv.addtrusted = make(chan *enode.Node)
    srv.removetrusted = make(chan *enode.Node)
    srv.peerOp = make(chan peerOpFunc)
    srv.peerOpDone = make(chan struct{})

    // Set up the local node.
    if err := srv.setupLocalNode(); err != nil {
        return err
    }

    // Set up the TCP listener, if a listen address was configured.
    if srv.ListenAddr != "" {
        if err := srv.setupListening(); err != nil {
            return err
        }
    }

    // Set up node discovery.
    if err := srv.setupDiscovery(); err != nil {
        return err
    }

    // Start the dial scheduler.
    srv.setupDialScheduler()

    // Start the main loop; it is tracked by loopWG.
    srv.loopWG.Add(1)
    go srv.run()

    return nil
}

本地节点设置

setupLocalNode()函数用于设置本地节点:

// setupLocalNode builds the devp2p handshake message advertised by this
// server, opens the node database, creates the local node record and applies
// the NAT configuration to it. It returns the error from enode.OpenDB, if any.
func (srv *Server) setupLocalNode() error {
    // Assemble the devp2p handshake: the ID is the public key without its
    // first byte, and the capabilities are collected from all protocols.
    pub := crypto.FromECDSAPub(&srv.PrivateKey.PublicKey)
    hs := &protoHandshake{
        Version: baseProtocolVersion,
        Name:    srv.Name,
        ID:      pub[1:],
    }
    for _, proto := range srv.Protocols {
        hs.Caps = append(hs.Caps, proto.cap())
    }
    sort.Sort(capsByNameAndVersion(hs.Caps))
    srv.ourHandshake = hs

    // Open the node database and create the local node on top of it.
    nodedb, err := enode.OpenDB(srv.Config.NodeDatabase)
    if err != nil {
        return err
    }
    srv.nodedb = nodedb
    srv.localnode = enode.NewLocalNode(nodedb, srv.PrivateKey)
    srv.localnode.SetFallbackIP(net.IP{127, 0, 0, 1})

    // Copy the protocol-specific attributes into the local node.
    for _, proto := range srv.Protocols {
        for _, attr := range proto.Attributes {
            srv.localnode.Set(attr)
        }
    }

    // Apply the NAT configuration.
    switch srv.NAT.(type) {
    case nil:
        // No NAT interface configured: nothing to do.
    case nat.ExtIP:
        // Fixed external IP: query it inline and set it right away.
        extIP, _ := srv.NAT.ExternalIP()
        srv.localnode.SetStaticIP(extIP)
    default:
        // Any other NAT interface: query the external IP in a goroutine
        // tracked by loopWG and set it only when the query succeeds.
        srv.loopWG.Add(1)
        go func() {
            defer srv.loopWG.Done()
            extIP, err := srv.NAT.ExternalIP()
            if err != nil {
                return
            }
            srv.localnode.SetStaticIP(extIP)
        }()
    }

    return nil
}

监听设置

setupListening()函数用于设置监听:

// setupListening opens the TCP listener on srv.ListenAddr, records the
// address that was actually bound, publishes the TCP port in the local node
// record, optionally starts a NAT port mapping and launches listenLoop.
// It returns the error from the listen call, if any.
func (srv *Server) setupListening() error {
    // Open the listener and remember the address it is bound to.
    ln, err := srv.listenFunc("tcp", srv.ListenAddr)
    if err != nil {
        return err
    }
    srv.listener = ln
    srv.ListenAddr = ln.Addr().String()

    // Update the local node record and map the TCP listening port.
    tcpAddr, isTCP := ln.Addr().(*net.TCPAddr)
    if isTCP {
        srv.localnode.Set(enr.TCP(tcpAddr.Port))

        // A mapping is only requested for non-loopback addresses and only
        // when a NAT interface is configured.
        wantMapping := srv.NAT != nil && !tcpAddr.IP.IsLoopback()
        if wantMapping {
            srv.loopWG.Add(1)
            go func() {
                nat.Map(srv.NAT, srv.quit, "tcp", tcpAddr.Port, tcpAddr.Port, "ethereum p2p")
                srv.loopWG.Done()
            }()
        }
    }

    // Start the accept loop; it is tracked by loopWG.
    srv.loopWG.Add(1)
    go srv.listenLoop()

    return nil
}

listenLoop()函数是监听的主要循环:

// listenLoop accepts inbound TCP connections on srv.listener and hands each
// accepted connection to SetupConn in its own goroutine. The number of
// connections that are being set up at the same time is bounded by a token
// channel. The loop returns when Accept reports a non-temporary error.
func (srv *Server) listenLoop() {
    srv.log.Debug("TCP listener up", "addr", srv.listener.Addr())

    // The slots channel limits how many new connections are accepted. It is
    // pre-filled with one token per allowed pending connection.
    tokens := defaultMaxPendingPeers
    if srv.MaxPendingPeers > 0 {
        tokens = srv.MaxPendingPeers
    }
    slots := make(chan struct{}, tokens)
    for i := 0; i < tokens; i++ {
        slots <- struct{}{}
    }

    // On exit, take back every token before signalling loopWG. This blocks
    // until each SetupConn goroutine started below has returned its token.
    defer srv.loopWG.Done()
    defer func() {
        for i := 0; i < cap(slots); i++ {
            <-slots
        }
    }()

    for {
        // Wait for a free slot before accepting.
        <-slots

        var (
            fd      net.Conn
            err     error
            lastLog time.Time
        )

        for {
            fd, err = srv.listener.Accept()
            if netutil.IsTemporaryError(err) {
                // Temporary errors are retried after a short pause; the log
                // message is emitted at most once per second.
                if time.Since(lastLog) > 1*time.Second {
                    srv.log.Debug("Temporary read error", "err", err)
                    lastLog = time.Now()
                }
                time.Sleep(time.Millisecond * 200)
                continue
            } else if err != nil {
                // Any other error ends the loop. The token taken above is
                // returned first so the deferred drain can complete.
                srv.log.Debug("Read error", "err", err)
                slots <- struct{}{}
                return
            }
            break
        }

        // Check the inbound connection; rejected connections are closed and
        // their token is returned immediately.
        remoteIP := netutil.AddrIP(fd.RemoteAddr())
        if err := srv.checkInboundConn(remoteIP); err != nil {
            srv.log.Debug("Rejected inbound connection", "addr", fd.RemoteAddr(), "err", err)
            fd.Close()
            slots <- struct{}{}
            continue
        }

        // Wrap the connection in a metered connection when a remote IP is
        // available. addr stays nil if the remote address is not a TCP address.
        if remoteIP != nil {
            var addr *net.TCPAddr
            if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok {
                addr = tcp
            }
            fd = newMeteredConn(fd, true, addr)
            srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr())
        }

        // Run the connection setup concurrently and return the token once
        // SetupConn is done.
        go func() {
            srv.SetupConn(fd, inboundConn, nil)
            slots <- struct{}{}
        }()
    }
}

发现机制设置

setupDiscovery()函数用于设置发现机制:

// setupDiscovery creates the discovery mixer, registers the per-protocol dial
// candidate sources and, unless discovery is fully disabled, opens the UDP
// socket and starts discovery v4 and/or v5 on it. It returns the first error
// encountered while resolving/opening the UDP endpoint or starting a
// discovery listener.
func (srv *Server) setupDiscovery() error {
    srv.discmix = enode.NewFairMix(discmixTimeout)

    // Add protocol-specific discovery sources. Only the first protocol seen
    // for a given name contributes its DialCandidates.
    added := make(map[string]bool)
    for _, proto := range srv.Protocols {
        if proto.DialCandidates != nil && !added[proto.Name] {
            srv.discmix.AddSource(proto.DialCandidates)
            added[proto.Name] = true
        }
    }

    // Don't listen on the UDP endpoint if both v4 and v5 discovery are off.
    if srv.NoDiscovery && !srv.DiscoveryV5 {
        return nil
    }

    addr, err := net.ResolveUDPAddr("udp", srv.ListenAddr)
    if err != nil {
        return err
    }

    conn, err := net.ListenUDP("udp", addr)
    if err != nil {
        return err
    }

    realaddr := conn.LocalAddr().(*net.UDPAddr)
    srv.log.Debug("UDP listener up", "addr", realaddr)

    // Request a NAT mapping for the UDP port when a NAT interface is set and
    // the socket is not bound to a loopback address. The goroutine is tracked
    // by loopWG.
    if srv.NAT != nil {
        if !realaddr.IP.IsLoopback() {
            srv.loopWG.Add(1)
            go func() {
                nat.Map(srv.NAT, srv.quit, "udp", realaddr.Port, realaddr.Port, "ethereum discovery")
                srv.loopWG.Done()
            }()
        }
    }

    srv.localnode.SetFallbackUDP(realaddr.Port)

    // Discovery V4
    var unhandled chan discover.ReadPacket
    var sconn *sharedUDPConn
    if !srv.NoDiscovery {
        // When v5 is enabled as well, v4 is given an Unhandled channel and
        // the same channel backs the shared connection passed to v5 below.
        if srv.DiscoveryV5 {
            unhandled = make(chan discover.ReadPacket, 100)
            sconn = &sharedUDPConn{conn, unhandled}
        }

        cfg := discover.Config{
            PrivateKey:  srv.PrivateKey,
            NetRestrict: srv.NetRestrict,
            Bootnodes:   srv.BootstrapNodes,
            Unhandled:   unhandled,
            Log:         srv.log,
        }

        ntab, err := discover.ListenV4(conn, srv.localnode, cfg)
        if err != nil {
            return err
        }
        srv.ntab = ntab
        srv.discmix.AddSource(ntab.RandomNodes())
    }

    // Discovery V5
    if srv.DiscoveryV5 {
        cfg := discover.Config{
            PrivateKey:  srv.PrivateKey,
            NetRestrict: srv.NetRestrict,
            Bootnodes:   srv.BootstrapNodesV5,
            Log:         srv.log,
        }

        // Use the shared connection if v4 is running on the same socket,
        // otherwise hand the raw UDP connection to v5.
        var err error
        if sconn != nil {
            srv.DiscV5, err = discover.ListenV5(sconn, srv.localnode, cfg)
        } else {
            srv.DiscV5, err = discover.ListenV5(conn, srv.localnode, cfg)
        }
        if err != nil {
            return err
        }
    }

    return nil
}

拨号调度器设置

setupDialScheduler()函数用于设置拨号调度器:

// setupDialScheduler assembles the dial configuration from the server
// settings, creates the dial scheduler and registers all static nodes with it.
func (srv *Server) setupDialScheduler() {
    cfg := dialConfig{
        self:           srv.localnode.ID(),
        maxDialPeers:   srv.maxDialedConns(),
        maxActiveDials: srv.MaxPendingPeers,
        log:            srv.Logger,
        netRestrict:    srv.NetRestrict,
        dialer:         srv.Dialer,
        clock:          srv.clock,
    }

    // Fall back to a plain TCP dialer when no custom dialer was configured.
    if cfg.dialer == nil {
        cfg.dialer = tcpDialer{&net.Dialer{Timeout: defaultDialTimeout}}
    }

    // The resolver is only assigned when the v4 table exists, so that the
    // resolver field is left untouched otherwise.
    if srv.ntab != nil {
        cfg.resolver = srv.ntab
    }

    srv.dialsched = newDialScheduler(cfg, srv.discmix, srv.SetupConn)

    for _, static := range srv.StaticNodes {
        srv.dialsched.addStatic(static)
    }
}

newDialScheduler()函数创建新的拨号调度器:

// newDialScheduler creates a dial scheduler from config (with defaults
// applied), allocates its bookkeeping maps and channels, and starts the two
// background goroutines readNodes and loop, both of which receive it.
// setupFunc is stored on the scheduler for use by dial tasks.
func newDialScheduler(config dialConfig, it enode.Iterator, setupFunc dialSetupFunc) *dialScheduler {
    sched := &dialScheduler{
        dialConfig:  config.withDefaults(),
        setupFunc:   setupFunc,
        dialing:     make(map[enode.ID]*dialTask),
        static:      make(map[enode.ID]*dialTask),
        peers:       make(map[enode.ID]connFlag),
        doneCh:      make(chan *dialTask),
        nodesIn:     make(chan *enode.Node),
        addStaticCh: make(chan *enode.Node),
        remStaticCh: make(chan *enode.Node),
        addPeerCh:   make(chan *conn),
        remPeerCh:   make(chan *conn),
    }
    sched.lastStatsLog = sched.clock.Now()

    ctx, cancel := context.WithCancel(context.Background())
    sched.ctx = ctx
    sched.cancel = cancel

    // Two goroutines are accounted for in the scheduler's wait group.
    sched.wg.Add(2)
    go sched.readNodes(it)
    go sched.loop(it)

    return sched
}

loop()是拨号器的主要循环:

// loop is the main loop of the dial scheduler. On every iteration it starts
// static dials into the free dial slots, decides whether dynamic candidates
// may be received, and then handles exactly one event: a new candidate, a
// finished dial, a peer being added or removed, a static node being added or
// removed, dial-history expiry, or cancellation of d.ctx. After cancellation
// it waits for all in-flight dial tasks before signalling d.wg.
func (d *dialScheduler) loop(it enode.Iterator) {
    var (
        nodesCh    chan *enode.Node
        historyExp = make(chan struct{}, 1)
    )

loop:
    for {
        // Launch new dials if slots are available. Static dials are served
        // first; the remaining slots are for dynamic candidates.
        slots := d.freeDialSlots()
        slots -= d.startStaticDials(slots)

        // Only receive from nodesIn while free slots remain. A nil channel
        // disables that select case.
        if slots > 0 {
            nodesCh = d.nodesIn
        } else {
            nodesCh = nil
        }

        d.rearmHistoryTimer(historyExp)
        d.logStats()

        select {
        case node := <-nodesCh:
            // A dynamic dial candidate: dial it unless checkDial rejects it.
            if err := d.checkDial(node); err != nil {
                d.log.Trace("Discarding dial candidate", "id", node.ID(), "ip", node.IP(), "reason", err)
            } else {
                d.startDial(newDialTask(node, dynDialedConn))
            }

        case task := <-d.doneCh:
            // A dial task finished: drop it from the dialing set.
            id := task.dest.ID()
            delete(d.dialing, id)
            d.updateStaticPool(id)
            d.doneSinceLastLog++

        case c := <-d.addPeerCh:
            // A peer was added. Only dialed connections count as dialPeers.
            if c.is(dynDialedConn) || c.is(staticDialedConn) {
                d.dialPeers++
            }
            id := c.node.ID()
            d.peers[id] = c.flags

            // Remove from the static pool because the node is now connected.
            task := d.static[id]
            if task != nil && task.staticPoolIndex >= 0 {
                d.removeFromStaticPool(task.staticPoolIndex)
            }

        case c := <-d.remPeerCh:
            // A peer was removed.
            if c.is(dynDialedConn) || c.is(staticDialedConn) {
                d.dialPeers--
            }
            delete(d.peers, c.node.ID())
            d.updateStaticPool(c.node.ID())

        case node := <-d.addStaticCh:
            // A static node is being added; nodes that are already
            // registered are ignored.
            id := node.ID()
            _, exists := d.static[id]
            d.log.Trace("Adding static node", "id", id, "ip", node.IP(), "added", !exists)
            if exists {
                continue loop
            }
            task := newDialTask(node, staticDialedConn)
            d.static[id] = task
            if d.checkDial(node) == nil {
                d.addToStaticPool(task)
            }

        case node := <-d.remStaticCh:
            // A static node is being removed, including from the pool if it
            // is currently in it.
            id := node.ID()
            task := d.static[id]
            d.log.Trace("Removing static node", "id", id, "ok", task != nil)
            if task != nil {
                delete(d.static, id)
                if task.staticPoolIndex >= 0 {
                    d.removeFromStaticPool(task.staticPoolIndex)
                }
            }

        case <-historyExp:
            d.expireHistory()

        case <-d.ctx.Done():
            // Shutdown: close the iterator and leave the loop.
            it.Close()
            break loop
        }
    }

    // Receive one completion per entry of d.dialing so that no dial
    // goroutine stays blocked on its send to doneCh.
    d.stopHistoryTimer(historyExp)
    for range d.dialing {
        <-d.doneCh
    }
    d.wg.Done()
}

startDial()函数在单独的goroutine中运行拨号任务:

// startDial records the task's destination in the dial history and in the
// dialing set, then runs the task in a new goroutine. When the task returns,
// it is sent on d.doneCh.
func (d *dialScheduler) startDial(task *dialTask) {
    dest := task.dest
    d.log.Trace("Starting p2p dial", "id", dest.ID(), "ip", dest.IP(), "flag", task.flags)

    // The history entry is keyed by the raw node ID bytes.
    expiry := d.clock.Now().Add(dialHistoryExpiration)
    d.history.add(string(dest.ID().Bytes()), expiry)
    d.dialing[dest.ID()] = task

    go func() {
        task.run(d)
        d.doneCh <- task
    }()
}

run()函数执行拨号任务:

// run executes the dial task. If the destination needs resolving and that
// fails, nothing is dialed. A failed dial is retried once after another
// resolve, but only for static-dial tasks whose failure is a *dialError.
func (t *dialTask) run(d *dialScheduler) {
    if t.needResolve() {
        if !t.resolve(d) {
            return
        }
    }

    err := t.dial(d, t.dest)
    if err == nil {
        return
    }

    // For static nodes, resolve one more time if dialing failed.
    _, isDialErr := err.(*dialError)
    isStatic := t.flags&staticDialedConn != 0
    if !isDialErr || !isStatic {
        return
    }
    if t.resolve(d) {
        t.dial(d, t.dest)
    }
}

dial()方法执行实际的连接尝试:

// dial performs the actual connection attempt. A failure of the dialer is
// logged and returned wrapped in a *dialError. On success the connection is
// wrapped in a metered connection and passed to the scheduler's setup
// function, whose result is returned.
func (t *dialTask) dial(d *dialScheduler, dest *enode.Node) error {
    rawConn, dialErr := d.dialer.Dial(d.ctx, t.dest)
    if dialErr != nil {
        d.log.Trace("Dial error", "id", t.dest.ID(), "addr", nodeAddr(t.dest), "conn", t.flags, "err", cleanupDialErr(dialErr))
        return &dialError{dialErr}
    }

    remote := &net.TCPAddr{IP: dest.IP(), Port: dest.TCP()}
    metered := newMeteredConn(rawConn, false, remote)
    return d.setupFunc(metered, t.flags, dest)
}

服务监听

SetupConn()函数执行握手协议并尝试将连接创建为peer对象:

// SetupConn wraps fd in a conn, attaches a transport to it and runs the
// connection setup. For dialed connections (dialDest != nil) the transport is
// created with the destination's public key, otherwise with nil. If the setup
// fails, the connection is closed with that error and the error is returned.
func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *enode.Node) error {
    c := &conn{fd: fd, flags: flags, cont: make(chan error)}

    if dialDest != nil {
        c.transport = srv.newTransport(fd, dialDest.Pubkey())
    } else {
        c.transport = srv.newTransport(fd, nil)
    }

    if err := srv.setupConn(c, flags, dialDest); err != nil {
        c.close(err)
        return err
    }
    return nil
}

setupConn()函数执行握手协议:

// setupConn runs the handshakes on c: the RLPx encryption handshake, the
// post-handshake checkpoint, the devp2p protocol handshake, an identity check
// and finally the add-peer checkpoint. It returns the first error encountered,
// or nil once the connection has passed the add-peer checkpoint.
func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) error {
    // Prevent leftover pending conns from entering the handshake once the
    // server is no longer running.
    srv.lock.Lock()
    running := srv.running
    srv.lock.Unlock()
    if !running {
        return errServerStopped
    }

    // If dialing, figure out the remote public key. A destination without a
    // loadable secp256k1 key is rejected before any handshake is attempted.
    var dialPubkey *ecdsa.PublicKey
    if dialDest != nil {
        dialPubkey = new(ecdsa.PublicKey)
        if err := dialDest.Load((*enode.Secp256k1)(dialPubkey)); err != nil {
            err = errors.New("dial destination doesn't have a secp256k1 public key")
            srv.log.Trace("Setting up connection failed", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
            return err
        }
    }

    // Run the RLPx handshake.
    remotePubkey, err := c.doEncHandshake(srv.PrivateKey)
    if err != nil {
        srv.log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
        return err
    }

    // For dialed connections the node is the dial destination; for inbound
    // connections it is derived from the remote key and the connection.
    if dialDest != nil {
        c.node = dialDest
    } else {
        c.node = nodeFromConn(remotePubkey, c.fd)
    }

    clog := srv.log.New("id", c.node.ID(), "addr", c.fd.RemoteAddr(), "conn", c.flags)
    err = srv.checkpoint(c, srv.checkpointPostHandshake)
    if err != nil {
        clog.Trace("Rejected peer", "err", err)
        return err
    }

    // Run the capability negotiation handshake.
    phs, err := c.doProtoHandshake(srv.ourHandshake)
    if err != nil {
        clog.Trace("Failed p2p handshake", "err", err)
        return err
    }

    // The Keccak256 hash of the ID announced in the protocol handshake must
    // equal the node ID established above.
    if id := c.node.ID(); !bytes.Equal(crypto.Keccak256(phs.ID), id[:]) {
        clog.Trace("Wrong devp2p handshake identity", "phsid", hex.EncodeToString(phs.ID))
        return DiscUnexpectedIdentity
    }

    c.caps, c.name = phs.Caps, phs.Name
    err = srv.checkpoint(c, srv.checkpointAddPeer)
    if err != nil {
        clog.Trace("Rejected peer", "err", err)
        return err
    }

    return nil
}

密钥握手

doEncHandshake()函数实现密钥握手:

// doEncHandshake sets a deadline of handshakeTimeout from now on the
// connection and then runs the encryption handshake with the given private
// key, returning the handshake's result unchanged.
func (t *rlpxTransport) doEncHandshake(prv *ecdsa.PrivateKey) (*ecdsa.PublicKey, error) {
    deadline := time.Now().Add(handshakeTimeout)
    t.conn.SetDeadline(deadline)

    remoteKey, err := t.conn.Handshake(prv)
    return remoteKey, err
}

Handshake()函数根据是主动握手还是被动握手来执行对应的握手逻辑:

// Handshake runs the encryption handshake in the role implied by c.dialDest:
// as initiator when a dial destination is set, as receiver otherwise. On
// success the connection is initialized with the negotiated secrets and the
// remote key from those secrets is returned.
func (c *Conn) Handshake(prv *ecdsa.PrivateKey) (*ecdsa.PublicKey, error) {
    var (
        secrets Secrets
        hsErr   error
    )

    switch {
    case c.dialDest != nil:
        // Active side: we dialed the remote.
        secrets, hsErr = initiatorEncHandshake(c.conn, prv, c.dialDest)
    default:
        // Passive side: the remote dialed us.
        secrets, hsErr = receiverEncHandshake(c.conn, prv)
    }
    if hsErr != nil {
        return nil, hsErr
    }

    c.InitWithSecrets(secrets)
    return secrets.remote, nil
}

主动握手过程:

// initiatorEncHandshake runs the initiator side of the encryption handshake
// over conn: it builds and seals the auth message, writes it, reads and
// handles the auth response, and derives the secrets from both packets.
// On any failure the zero Secrets value is returned together with the error.
func initiatorEncHandshake(conn io.ReadWriter, prv *ecdsa.PrivateKey, remote *ecdsa.PublicKey) (s Secrets, err error) {
    hs := &encHandshake{initiator: true, remote: ecies.ImportECDSAPublic(remote)}

    // Build, seal and send the auth message.
    auth, err := hs.makeAuthMsg(prv)
    if err != nil {
        return s, err
    }
    authPkt, err := sealEIP8(auth, hs)
    if err != nil {
        return s, err
    }
    if _, err = conn.Write(authPkt); err != nil {
        return s, err
    }

    // Read and process the auth response.
    resp := new(authRespV4)
    respPkt, err := readHandshakeMsg(resp, encAuthRespLen, prv, conn)
    if err != nil {
        return s, err
    }
    if err = hs.handleAuthResp(resp); err != nil {
        return s, err
    }

    return hs.secrets(authPkt, respPkt)
}

被动握手过程:

// receiverEncHandshake runs the receiver side of the encryption handshake
// over conn: it reads and handles the auth message, builds the auth response,
// seals it in the same format the auth message arrived in (plain or EIP-8),
// writes it, and derives the secrets from both packets. On any failure the
// zero Secrets value is returned together with the error.
func receiverEncHandshake(conn io.ReadWriter, prv *ecdsa.PrivateKey) (s Secrets, err error) {
    // Read and process the auth message.
    auth := new(authMsgV4)
    authPkt, err := readHandshakeMsg(auth, encAuthMsgLen, prv, conn)
    if err != nil {
        return s, err
    }
    hs := &encHandshake{}
    if err = hs.handleAuthMsg(auth, prv); err != nil {
        return s, err
    }

    // Build the response and seal it to match the format of the request.
    resp, err := hs.makeAuthResp()
    if err != nil {
        return s, err
    }
    var respPkt []byte
    if !auth.gotPlain {
        respPkt, err = sealEIP8(resp, hs)
    } else {
        respPkt, err = resp.sealPlain(hs)
    }
    if err != nil {
        return s, err
    }

    if _, err = conn.Write(respPkt); err != nil {
        return s, err
    }

    return hs.secrets(authPkt, respPkt)
}

协议握手

doProtoHandshake()完成协议握手操作:

// doProtoHandshake exchanges protocol handshakes with the remote side. Our
// handshake is written in a separate goroutine while the remote handshake is
// read. A read error takes precedence over a write error; in both cases the
// write result is awaited before returning. On success Snappy is enabled on
// the connection if the remote version is at least snappyProtocolVersion.
func (t *rlpxTransport) doProtoHandshake(our *protoHandshake) (their *protoHandshake, err error) {
    // Buffered so the writer goroutine never blocks on delivering its result.
    writeDone := make(chan error, 1)
    go func() {
        writeDone <- Send(t, handshakeMsg, our)
    }()

    their, err = readProtocolHandshake(t)
    if err != nil {
        // Make sure the write has terminated too before reporting the error.
        <-writeDone
        return nil, err
    }

    if werr := <-writeDone; werr != nil {
        return nil, fmt.Errorf("write error: %v", werr)
    }

    useSnappy := their.Version >= snappyProtocolVersion
    t.conn.SetSnappy(useSnappy)
    return their, nil
}

循环监听

run()方法是P2P服务器的主要逻辑循环:

func (srv *Server) run() {
    srv.log.Info("Started P2P networking", "self", srv.localnode.Node().URLv4())
    defer srv.loopWG.Done()
    defer srv.nodedb.Close()
    defer srv.discmix.Close()
    defer srv.dialsched.stop()
    
    var (
        peers        = make(map[enode.ID]*Peer)
        inboundCount = 0
        trusted      = make(map[enode.ID]bool, len(srv.TrustedNodes))
    )
    
    // 将可信节点放入映射以加速检查
    for _, n := range srv.TrustedNodes {
        trusted[n.ID()] = true
    }
    
running:
    for {
        select {
        case <-srv.quit:
           
以太坊P2P网络深入解析 文章前言 本文是"以太坊P2P网络深入解析"的续篇,重点分析以太坊P2P网络的源码实现和关键机制。 源码分析 服务停用 Stop() 函数用于停止服务器和所有活动的对等连接,它会阻塞直到所有活动连接都关闭: 节点启动 Start() 函数用于启动一个P2P节点: 本地节点设置 setupLocalNode() 函数用于设置本地节点: 监听设置 setupListening() 函数用于设置监听: listenLoop() 函数是监听的主要循环: 发现机制设置 setupDiscovery() 函数用于设置发现机制: 拨号调度器设置 setupDialScheduler() 函数用于设置拨号调度器: newDialScheduler() 函数创建新的拨号调度器: loop() 是拨号器的主要循环: startDial() 函数在单独的goroutine中运行拨号任务: run() 函数执行拨号任务: dial() 方法执行实际的连接尝试: 服务监听 SetupConn() 函数执行握手协议并尝试将连接创建为peer对象: setupConn() 函数执行握手协议: 密钥握手 doEncHandshake() 函数实现密钥握手: Handshake() 函数根据是主动握手还是被动握手来执行对应的握手逻辑: 主动握手过程: 被动握手过程: 协议握手 doProtoHandshake() 完成协议握手操作: 循环监听 run() 方法是P2P服务器的主要逻辑循环: