以太坊P2P网络深入剖析(下)
字数 652 2025-08-22 12:23:24
以太坊P2P网络深入解析
文章前言
本文是"以太坊P2P网络深入解析"的续篇,重点分析以太坊P2P网络的源码实现和关键机制。
源码分析
服务停用
Stop()函数用于停止服务器和所有活动的对等连接,它会阻塞直到所有活动连接都关闭:
// Stop shuts the server down. If the server is running it clears the running
// flag, closes the listener (when one exists) and closes srv.quit, then blocks
// until every goroutine tracked by srv.loopWG has finished. Calling Stop on a
// server that is not running does nothing.
func (srv *Server) Stop() {
	// The state change happens under srv.lock, but the lock is released before
	// waiting so that code which needs the lock while shutting down (for
	// example setupConn) is not blocked.
	wasRunning := func() bool {
		srv.lock.Lock()
		defer srv.lock.Unlock()

		if !srv.running {
			return false
		}
		srv.running = false
		if srv.listener != nil {
			srv.listener.Close()
		}
		close(srv.quit)
		return true
	}()

	if wasRunning {
		srv.loopWG.Wait()
	}
}
节点启动
Start()函数用于启动一个P2P节点:
// Start launches the P2P server: it fills in defaults, validates the static
// configuration, creates the internal channels, sets up the local node, the
// TCP listener (when ListenAddr is set), discovery and the dial scheduler, and
// finally starts the main run loop in its own goroutine.
//
// It returns an error if the server is already running, if no private key is
// configured, or if one of the setup steps fails. When the private key is
// missing the server is left in the stopped state, so a later Stop is a no-op
// and Start may be retried after the key has been set.
func (srv *Server) Start() (err error) {
	srv.lock.Lock()
	defer srv.lock.Unlock()
	if srv.running {
		return errors.New("server already running")
	}

	// Resolve the logger and clock defaults.
	srv.log = srv.Config.Logger
	if srv.log == nil {
		srv.log = log.Root()
	}
	if srv.clock == nil {
		srv.clock = mclock.System{}
	}
	if srv.NoDial && srv.ListenAddr == "" {
		srv.log.Warn("P2P server will be useless, neither dialing nor listening")
	}

	// Static field checks. These run before the server is marked as running:
	// srv.quit has not been (re)created yet at this point, and Stop on a
	// running server calls close(srv.quit), which would panic.
	if srv.PrivateKey == nil {
		return errors.New("Server.PrivateKey must be set to a non-nil key")
	}
	srv.running = true

	if srv.newTransport == nil {
		srv.newTransport = newRLPX
	}
	if srv.listenFunc == nil {
		srv.listenFunc = net.Listen
	}

	// Initialize the channels used by the run loop.
	srv.quit = make(chan struct{})
	srv.delpeer = make(chan peerDrop)
	srv.checkpointPostHandshake = make(chan *conn)
	srv.checkpointAddPeer = make(chan *conn)
	srv.addtrusted = make(chan *enode.Node)
	srv.removetrusted = make(chan *enode.Node)
	srv.peerOp = make(chan peerOpFunc)
	srv.peerOpDone = make(chan struct{})

	// Errors returned from here on leave srv.running set; Stop then closes the
	// listener (if any) and srv.quit.
	if err := srv.setupLocalNode(); err != nil {
		return err
	}
	if srv.ListenAddr != "" {
		if err := srv.setupListening(); err != nil {
			return err
		}
	}
	if err := srv.setupDiscovery(); err != nil {
		return err
	}
	srv.setupDialScheduler()

	// Start the main loop.
	srv.loopWG.Add(1)
	go srv.run()
	return nil
}
本地节点设置
setupLocalNode()函数用于设置本地节点:
// setupLocalNode prepares the devp2p handshake message and the local node
// record. It opens the node database, creates the local node with a 127.0.0.1
// fallback IP, copies the protocol attributes into the record and, depending
// on the configured NAT, sets the node's static IP.
//
// It returns the error from opening the node database, if any.
func (srv *Server) setupLocalNode() error {
	// Build the handshake. The ID is the serialized public key without its
	// first byte.
	pub := crypto.FromECDSAPub(&srv.PrivateKey.PublicKey)
	hs := &protoHandshake{
		Version: baseProtocolVersion,
		Name:    srv.Name,
		ID:      pub[1:],
	}
	srv.ourHandshake = hs
	for _, proto := range srv.Protocols {
		hs.Caps = append(hs.Caps, proto.cap())
	}
	sort.Sort(capsByNameAndVersion(hs.Caps))

	// Create the local node on top of the node database.
	nodeDB, err := enode.OpenDB(srv.Config.NodeDatabase)
	if err != nil {
		return err
	}
	srv.nodedb = nodeDB
	srv.localnode = enode.NewLocalNode(nodeDB, srv.PrivateKey)
	srv.localnode.SetFallbackIP(net.IP{127, 0, 0, 1})

	// Publish the attributes declared by each protocol.
	for _, proto := range srv.Protocols {
		for _, attr := range proto.Attributes {
			srv.localnode.Set(attr)
		}
	}

	// NAT handling.
	if srv.NAT == nil {
		// No NAT interface configured, nothing to do.
		return nil
	}
	if _, isExtIP := srv.NAT.(nat.ExtIP); isExtIP {
		// ExtIP is queried inline and its IP is applied immediately.
		ip, _ := srv.NAT.ExternalIP()
		srv.localnode.SetStaticIP(ip)
		return nil
	}
	// Any other NAT implementation is queried in the background; the IP is
	// applied only if the query succeeds.
	srv.loopWG.Add(1)
	go func() {
		defer srv.loopWG.Done()
		if ip, err := srv.NAT.ExternalIP(); err == nil {
			srv.localnode.SetStaticIP(ip)
		}
	}()
	return nil
}
监听设置
setupListening()函数用于设置监听:
// setupListening opens the TCP listener for srv.ListenAddr, records the
// listener's address and port, requests a NAT port mapping when applicable,
// and starts the accept loop. It returns the listen error, if any.
func (srv *Server) setupListening() error {
	ln, err := srv.listenFunc("tcp", srv.ListenAddr)
	if err != nil {
		return err
	}
	srv.listener = ln
	// Replace the configured address with the one the listener reports.
	srv.ListenAddr = ln.Addr().String()

	// Update the local node record and map the TCP listening port.
	if tcpAddr, isTCP := ln.Addr().(*net.TCPAddr); isTCP {
		srv.localnode.Set(enr.TCP(tcpAddr.Port))
		if srv.NAT != nil && !tcpAddr.IP.IsLoopback() {
			port := tcpAddr.Port
			srv.loopWG.Add(1)
			go func() {
				defer srv.loopWG.Done()
				nat.Map(srv.NAT, srv.quit, "tcp", port, port, "ethereum p2p")
			}()
		}
	}

	// Start the accept loop.
	srv.loopWG.Add(1)
	go srv.listenLoop()
	return nil
}
listenLoop()函数是监听的主要循环:
// listenLoop accepts inbound connections on srv.listener and hands each
// accepted connection to SetupConn in its own goroutine. The number of
// connections that are in setup at the same time is bounded by a pool of
// slots. The loop returns when Accept fails with a non-temporary error.
func (srv *Server) listenLoop() {
srv.log.Debug("TCP listener up", "addr", srv.listener.Addr())
// Limit the number of connections being set up concurrently:
// defaultMaxPendingPeers unless srv.MaxPendingPeers is positive.
tokens := defaultMaxPendingPeers
if srv.MaxPendingPeers > 0 {
tokens = srv.MaxPendingPeers
}
// slots starts full; one slot is taken per accepted connection.
slots := make(chan struct{}, tokens)
for i := 0; i < tokens; i++ {
slots <- struct{}{}
}
// Deferred calls run in reverse order: on return the loop first takes back
// every slot (which waits for setup goroutines still holding one) and only
// then signals loopWG.
defer srv.loopWG.Done()
defer func() {
for i := 0; i < cap(slots); i++ {
<-slots
}
}()
for {
// Wait for a free slot before accepting.
<-slots
var (
fd net.Conn
err error
lastLog time.Time
)
for {
fd, err = srv.listener.Accept()
if netutil.IsTemporaryError(err) {
// Temporary errors are logged at most once per second and
// retried after a 200ms pause.
if time.Since(lastLog) > 1*time.Second {
srv.log.Debug("Temporary read error", "err", err)
lastLog = time.Now()
}
time.Sleep(time.Millisecond * 200)
continue
} else if err != nil {
// Any other error ends the loop; the slot taken above is
// returned first so the deferred drain can complete.
srv.log.Debug("Read error", "err", err)
slots <- struct{}{}
return
}
break
}
// Check the inbound connection; rejected connections are closed and
// their slot is returned.
remoteIP := netutil.AddrIP(fd.RemoteAddr())
if err := srv.checkInboundConn(remoteIP); err != nil {
srv.log.Debug("Rejected inbound connection", "addr", fd.RemoteAddr(), "err", err)
fd.Close()
slots <- struct{}{}
continue
}
// Wrap the connection in a metered connection when a remote IP is known.
// addr stays nil if the remote address is not a *net.TCPAddr.
if remoteIP != nil {
var addr *net.TCPAddr
if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok {
addr = tcp
}
fd = newMeteredConn(fd, true, addr)
srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr())
}
// Run the connection setup in the background and return the slot once
// SetupConn has returned.
go func() {
srv.SetupConn(fd, inboundConn, nil)
slots <- struct{}{}
}()
}
}
发现机制设置
setupDiscovery()函数用于设置发现机制:
// setupDiscovery creates the discovery mixer, registers the protocol-specific
// dial candidate sources and, unless discovery is disabled, opens the UDP
// socket and starts discovery v4 and/or v5 on it.
//
// It returns an error if the UDP address cannot be resolved, the UDP socket
// cannot be opened, or one of the discovery listeners fails to start.
func (srv *Server) setupDiscovery() error {
srv.discmix = enode.NewFairMix(discmixTimeout)
// Add protocol-specific discovery sources, at most one per protocol name.
added := make(map[string]bool)
for _, proto := range srv.Protocols {
if proto.DialCandidates != nil && !added[proto.Name] {
srv.discmix.AddSource(proto.DialCandidates)
added[proto.Name] = true
}
}
// Don't listen on UDP if both discovery v4 and v5 are disabled.
if srv.NoDiscovery && !srv.DiscoveryV5 {
return nil
}
// The UDP socket is opened on the same address string as the TCP listener.
addr, err := net.ResolveUDPAddr("udp", srv.ListenAddr)
if err != nil {
return err
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
return err
}
realaddr := conn.LocalAddr().(*net.UDPAddr)
srv.log.Debug("UDP listener up", "addr", realaddr)
// Request a NAT mapping for the UDP port unless bound to a loopback address.
if srv.NAT != nil {
if !realaddr.IP.IsLoopback() {
srv.loopWG.Add(1)
go func() {
nat.Map(srv.NAT, srv.quit, "udp", realaddr.Port, realaddr.Port, "ethereum discovery")
srv.loopWG.Done()
}()
}
}
srv.localnode.SetFallbackUDP(realaddr.Port)
// Discovery V4
var unhandled chan discover.ReadPacket
var sconn *sharedUDPConn
if !srv.NoDiscovery {
// When v5 is enabled as well, both versions use the same socket: a
// buffered channel (capacity 100) is passed to v4 as cfg.Unhandled and
// wrapped together with conn in a sharedUDPConn for v5.
// NOTE(review): presumably v4 forwards packets it cannot handle on this
// channel — confirm against the discover package.
if srv.DiscoveryV5 {
unhandled = make(chan discover.ReadPacket, 100)
sconn = &sharedUDPConn{conn, unhandled}
}
cfg := discover.Config{
PrivateKey: srv.PrivateKey,
NetRestrict: srv.NetRestrict,
Bootnodes: srv.BootstrapNodes,
Unhandled: unhandled,
Log: srv.log,
}
ntab, err := discover.ListenV4(conn, srv.localnode, cfg)
if err != nil {
return err
}
srv.ntab = ntab
srv.discmix.AddSource(ntab.RandomNodes())
}
// Discovery V5
if srv.DiscoveryV5 {
cfg := discover.Config{
PrivateKey: srv.PrivateKey,
NetRestrict: srv.NetRestrict,
Bootnodes: srv.BootstrapNodesV5,
Log: srv.log,
}
var err error
// Listen on the shared connection when v4 is running, otherwise on the
// raw UDP socket.
if sconn != nil {
srv.DiscV5, err = discover.ListenV5(sconn, srv.localnode, cfg)
} else {
srv.DiscV5, err = discover.ListenV5(conn, srv.localnode, cfg)
}
if err != nil {
return err
}
}
return nil
}
拨号调度器设置
setupDialScheduler()函数用于设置拨号调度器:
// setupDialScheduler builds the dial scheduler configuration from the server
// settings, creates the scheduler with srv.discmix as its node source and
// srv.SetupConn as its connection setup function, and registers every
// configured static node with it.
func (srv *Server) setupDialScheduler() {
	// The scheduler logs through srv.log, the logger Start resolved from
	// srv.Config.Logger (falling back to the root logger), rather than through
	// the raw config field, which may be nil.
	config := dialConfig{
		self:           srv.localnode.ID(),
		maxDialPeers:   srv.maxDialedConns(),
		maxActiveDials: srv.MaxPendingPeers,
		log:            srv.log,
		netRestrict:    srv.NetRestrict,
		dialer:         srv.Dialer,
		clock:          srv.clock,
	}
	// srv.ntab is only set when discovery v4 is running; it is then used as the
	// resolver.
	if srv.ntab != nil {
		config.resolver = srv.ntab
	}
	if config.dialer == nil {
		config.dialer = tcpDialer{&net.Dialer{Timeout: defaultDialTimeout}}
	}
	srv.dialsched = newDialScheduler(config, srv.discmix, srv.SetupConn)
	for _, n := range srv.StaticNodes {
		srv.dialsched.addStatic(n)
	}
}
newDialScheduler()函数创建新的拨号调度器:
// newDialScheduler creates a dial scheduler from config (with defaults
// applied) and starts its two goroutines: readNodes and loop, both of which
// receive the iterator it. setupFunc is stored as the scheduler's connection
// setup function.
func newDialScheduler(config dialConfig, it enode.Iterator, setupFunc dialSetupFunc) *dialScheduler {
	ctx, cancel := context.WithCancel(context.Background())

	d := &dialScheduler{
		dialConfig:  config.withDefaults(),
		setupFunc:   setupFunc,
		dialing:     make(map[enode.ID]*dialTask),
		static:      make(map[enode.ID]*dialTask),
		peers:       make(map[enode.ID]connFlag),
		doneCh:      make(chan *dialTask),
		nodesIn:     make(chan *enode.Node),
		addStaticCh: make(chan *enode.Node),
		remStaticCh: make(chan *enode.Node),
		addPeerCh:   make(chan *conn),
		remPeerCh:   make(chan *conn),
	}
	d.ctx, d.cancel = ctx, cancel
	d.lastStatsLog = d.clock.Now()

	// Two goroutines are tracked by d.wg: the node reader and the main loop.
	d.wg.Add(2)
	go d.readNodes(it)
	go d.loop(it)
	return d
}
loop()是拨号调度器的主循环:
// loop is the main loop of the dial scheduler. On every iteration it launches
// static dials while slots are free, then waits for exactly one event: a new
// dial candidate, a finished dial, a peer being added or removed, a static
// node being added or removed, dial history expiry, or cancellation of d.ctx.
// On cancellation it closes the iterator, waits for all dials still in
// progress and signals d.wg.
func (d *dialScheduler) loop(it enode.Iterator) {
var (
nodesCh chan *enode.Node
historyExp = make(chan struct{}, 1)
)
loop:
for {
// Launch new dials if slots are available. Static dials are started
// first; dynamic candidates are only read while slots remain. A nil
// nodesCh disables its select case, because receiving from a nil
// channel blocks forever.
slots := d.freeDialSlots()
slots -= d.startStaticDials(slots)
if slots > 0 {
nodesCh = d.nodesIn
} else {
nodesCh = nil
}
d.rearmHistoryTimer(historyExp)
d.logStats()
select {
case node := <-nodesCh:
// New dial candidate: dial it unless checkDial rejects it.
if err := d.checkDial(node); err != nil {
d.log.Trace("Discarding dial candidate", "id", node.ID(), "ip", node.IP(), "reason", err)
} else {
d.startDial(newDialTask(node, dynDialedConn))
}
case task := <-d.doneCh:
// A dial finished: drop it from the in-progress set and refresh the
// static pool entry for that node.
id := task.dest.ID()
delete(d.dialing, id)
d.updateStaticPool(id)
d.doneSinceLastLog++
case c := <-d.addPeerCh:
// A peer was added. Only connections we dialed count towards dialPeers.
if c.is(dynDialedConn) || c.is(staticDialedConn) {
d.dialPeers++
}
id := c.node.ID()
d.peers[id] = c.flags
// Remove from the static pool because the node is now connected.
task := d.static[id]
if task != nil && task.staticPoolIndex >= 0 {
d.removeFromStaticPool(task.staticPoolIndex)
}
case c := <-d.remPeerCh:
// A peer was removed: undo the bookkeeping done when it was added.
if c.is(dynDialedConn) || c.is(staticDialedConn) {
d.dialPeers--
}
delete(d.peers, c.node.ID())
d.updateStaticPool(c.node.ID())
case node := <-d.addStaticCh:
// Add a static node; a node that is already registered is ignored.
id := node.ID()
_, exists := d.static[id]
d.log.Trace("Adding static node", "id", id, "ip", node.IP(), "added", !exists)
if exists {
continue loop
}
task := newDialTask(node, staticDialedConn)
d.static[id] = task
// The task only enters the static pool if checkDial accepts the node.
if d.checkDial(node) == nil {
d.addToStaticPool(task)
}
case node := <-d.remStaticCh:
// Remove a static node and, if present, its static pool entry.
id := node.ID()
task := d.static[id]
d.log.Trace("Removing static node", "id", id, "ok", task != nil)
if task != nil {
delete(d.static, id)
if task.staticPoolIndex >= 0 {
d.removeFromStaticPool(task.staticPoolIndex)
}
}
case <-historyExp:
d.expireHistory()
case <-d.ctx.Done():
// The scheduler context was cancelled: close the iterator and leave the loop.
it.Close()
break loop
}
}
d.stopHistoryTimer(historyExp)
// Wait for the dials still in progress: one receive from doneCh per entry
// in d.dialing.
for range d.dialing {
<-d.doneCh
}
d.wg.Done()
}
startDial()函数在单独的goroutine中运行拨号任务:
// startDial records the task's destination in the dial history, marks the task
// as in progress and runs it in a new goroutine. When the task has finished it
// is sent on d.doneCh.
func (d *dialScheduler) startDial(task *dialTask) {
	dest := task.dest
	d.log.Trace("Starting p2p dial", "id", dest.ID(), "ip", dest.IP(), "flag", task.flags)

	// The history entry is keyed by the raw node ID bytes and expires
	// dialHistoryExpiration after now.
	expiry := d.clock.Now().Add(dialHistoryExpiration)
	d.history.add(string(dest.ID().Bytes()), expiry)
	d.dialing[dest.ID()] = task

	go func() {
		task.run(d)
		d.doneCh <- task
	}()
}
run()函数执行拨号任务:
// run executes the dial task. If the destination needs resolving and resolving
// fails, nothing is dialed. Otherwise the destination is dialed once; for
// static nodes a failure of type *dialError triggers one more resolve and, if
// that succeeds, one more dial attempt whose result is ignored.
func (t *dialTask) run(d *dialScheduler) {
	if t.needResolve() && !t.resolve(d) {
		return
	}

	err := t.dial(d, t.dest)
	if err == nil {
		return
	}

	// Only dial-level failures of static nodes are retried after a re-resolve.
	_, dialFailed := err.(*dialError)
	isStatic := t.flags&staticDialedConn != 0
	if dialFailed && isStatic && t.resolve(d) {
		t.dial(d, t.dest)
	}
}
dial()方法执行实际的连接尝试:
// dial connects to dest and, on success, wraps the connection in a metered
// connection and passes it to the scheduler's setup function together with the
// task's flags.
//
// A failed connection attempt is logged and returned wrapped in *dialError;
// otherwise the result of the setup function is returned. The dest parameter
// is used for the whole attempt (dialing, logging, metering and setup), so the
// node that is dialed is always the node handed to the setup function.
func (t *dialTask) dial(d *dialScheduler, dest *enode.Node) error {
	fd, err := d.dialer.Dial(d.ctx, dest)
	if err != nil {
		d.log.Trace("Dial error", "id", dest.ID(), "addr", nodeAddr(dest), "conn", t.flags, "err", cleanupDialErr(err))
		return &dialError{err}
	}
	remote := &net.TCPAddr{IP: dest.IP(), Port: dest.TCP()}
	return d.setupFunc(newMeteredConn(fd, false, remote), t.flags, dest)
}
服务监听
SetupConn()函数执行握手协议并尝试将连接创建为peer对象:
// SetupConn wraps fd in a conn, attaches a transport to it and runs the
// connection setup (setupConn). dialDest is nil for connections that were not
// dialed by us; for dialed connections its public key is passed to the
// transport. If setup fails the connection is closed with the error, which is
// also returned.
func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *enode.Node) error {
	c := &conn{
		fd:    fd,
		flags: flags,
		cont:  make(chan error),
	}
	if dialDest != nil {
		c.transport = srv.newTransport(fd, dialDest.Pubkey())
	} else {
		c.transport = srv.newTransport(fd, nil)
	}

	if err := srv.setupConn(c, flags, dialDest); err != nil {
		c.close(err)
		return err
	}
	return nil
}
setupConn()函数执行握手协议:
// setupConn runs the handshakes on c and passes it through the two server
// checkpoints. The steps are, in order: the RLPx encryption handshake, the
// post-handshake checkpoint, the protocol (capability) handshake, an identity
// check of the handshake ID against the node ID, and the add-peer checkpoint.
//
// It returns errServerStopped if the server is not running, an error if the
// dial destination carries no secp256k1 key, DiscUnexpectedIdentity if the
// identity check fails, or the error of the failing handshake or checkpoint.
func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) error {
	// Prevent leftover pending connections from entering the handshake.
	srv.lock.Lock()
	stopped := !srv.running
	srv.lock.Unlock()
	if stopped {
		return errServerStopped
	}

	// When dialing, make sure the destination has a secp256k1 public key.
	if dialDest != nil {
		key := new(ecdsa.PublicKey)
		if loadErr := dialDest.Load((*enode.Secp256k1)(key)); loadErr != nil {
			err := errors.New("dial destination doesn't have a secp256k1 public key")
			srv.log.Trace("Setting up connection failed", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
			return err
		}
	}

	// Run the RLPx handshake.
	remotePubkey, err := c.doEncHandshake(srv.PrivateKey)
	if err != nil {
		srv.log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
		return err
	}
	// Dialed connections keep the node we dialed; other connections get a
	// node built from the remote key and the connection.
	if dialDest == nil {
		c.node = nodeFromConn(remotePubkey, c.fd)
	} else {
		c.node = dialDest
	}

	clog := srv.log.New("id", c.node.ID(), "addr", c.fd.RemoteAddr(), "conn", c.flags)
	if err := srv.checkpoint(c, srv.checkpointPostHandshake); err != nil {
		clog.Trace("Rejected peer", "err", err)
		return err
	}

	// Run the capability negotiation handshake.
	phs, err := c.doProtoHandshake(srv.ourHandshake)
	if err != nil {
		clog.Trace("Failed p2p handshake", "err", err)
		return err
	}
	// The Keccak256 hash of the announced ID must equal the node ID.
	nodeID := c.node.ID()
	if !bytes.Equal(crypto.Keccak256(phs.ID), nodeID[:]) {
		clog.Trace("Wrong devp2p handshake identity", "phsid", hex.EncodeToString(phs.ID))
		return DiscUnexpectedIdentity
	}
	c.caps, c.name = phs.Caps, phs.Name

	if err := srv.checkpoint(c, srv.checkpointAddPeer); err != nil {
		clog.Trace("Rejected peer", "err", err)
		return err
	}
	return nil
}
密钥握手
doEncHandshake()函数实现密钥握手:
// doEncHandshake sets a deadline of handshakeTimeout from now on the
// connection and then runs the encryption handshake with the private key prv.
// It returns whatever the connection's Handshake returns.
func (t *rlpxTransport) doEncHandshake(prv *ecdsa.PrivateKey) (*ecdsa.PublicKey, error) {
	deadline := time.Now().Add(handshakeTimeout)
	t.conn.SetDeadline(deadline)
	return t.conn.Handshake(prv)
}
Handshake()函数根据本端是连接的发起方(已设置dialDest,执行主动握手)还是接收方(执行被动握手)来执行对应的握手逻辑:
// Handshake performs the encryption handshake with the private key prv. When
// c.dialDest is set the initiator side of the handshake is run against that
// key, otherwise the receiver side is run. On success the connection is
// initialized with the negotiated secrets and the remote public key is
// returned; on failure the handshake error is returned.
func (c *Conn) Handshake(prv *ecdsa.PrivateKey) (*ecdsa.PublicKey, error) {
	var (
		secrets Secrets
		err     error
	)
	switch {
	case c.dialDest == nil:
		// Receiver side.
		secrets, err = receiverEncHandshake(c.conn, prv)
	default:
		// Initiator side.
		secrets, err = initiatorEncHandshake(c.conn, prv, c.dialDest)
	}
	if err != nil {
		return nil, err
	}

	c.InitWithSecrets(secrets)
	return secrets.remote, nil
}
主动握手过程:
// initiatorEncHandshake runs the initiator side of the encryption handshake on
// conn: it creates and seals the auth message, writes it, reads and processes
// the auth response, and derives the secrets from the two packets exchanged.
// prv is the local private key and remote the public key of the other side.
// The first error encountered is returned together with zero-valued Secrets.
func initiatorEncHandshake(conn io.ReadWriter, prv *ecdsa.PrivateKey, remote *ecdsa.PublicKey) (s Secrets, err error) {
	hs := &encHandshake{initiator: true, remote: ecies.ImportECDSAPublic(remote)}

	// Build, seal and send the auth message.
	msg, err := hs.makeAuthMsg(prv)
	if err != nil {
		return s, err
	}
	authPacket, err := sealEIP8(msg, hs)
	if err != nil {
		return s, err
	}
	if _, err = conn.Write(authPacket); err != nil {
		return s, err
	}

	// Read and process the response.
	resp := new(authRespV4)
	respPacket, err := readHandshakeMsg(resp, encAuthRespLen, prv, conn)
	if err != nil {
		return s, err
	}
	if err = hs.handleAuthResp(resp); err != nil {
		return s, err
	}
	return hs.secrets(authPacket, respPacket)
}
被动握手过程:
// receiverEncHandshake runs the receiver side of the encryption handshake on
// conn: it reads and processes the auth message, creates the auth response,
// seals it in the plain format if the auth message arrived in plain format and
// in the EIP-8 format otherwise, writes it, and derives the secrets from the
// two packets exchanged. prv is the local private key. The first error
// encountered is returned together with zero-valued Secrets.
func receiverEncHandshake(conn io.ReadWriter, prv *ecdsa.PrivateKey) (s Secrets, err error) {
	// Read and process the auth message.
	auth := new(authMsgV4)
	authPacket, err := readHandshakeMsg(auth, encAuthMsgLen, prv, conn)
	if err != nil {
		return s, err
	}
	hs := new(encHandshake)
	if err = hs.handleAuthMsg(auth, prv); err != nil {
		return s, err
	}

	// Build the response and seal it in the format matching the request.
	resp, err := hs.makeAuthResp()
	if err != nil {
		return s, err
	}
	var respPacket []byte
	if !auth.gotPlain {
		respPacket, err = sealEIP8(resp, hs)
	} else {
		respPacket, err = resp.sealPlain(hs)
	}
	if err != nil {
		return s, err
	}

	if _, err = conn.Write(respPacket); err != nil {
		return s, err
	}
	return hs.secrets(authPacket, respPacket)
}
协议握手
doProtoHandshake()完成协议握手操作:
// doProtoHandshake exchanges protocol handshake messages with the remote side.
// Our handshake is sent from a separate goroutine while the remote handshake
// is read, so the two directions do not wait on each other. After a successful
// exchange, snappy is switched on for the connection if the remote version is
// at least snappyProtocolVersion.
//
// It returns the remote handshake, the read error, or the send error prefixed
// with "write error: ".
func (t *rlpxTransport) doProtoHandshake(our *protoHandshake) (their *protoHandshake, err error) {
	// Buffered so the sender goroutine can always deliver its result.
	sendErr := make(chan error, 1)
	go func() {
		sendErr <- Send(t, handshakeMsg, our)
	}()

	their, err = readProtocolHandshake(t)
	if err != nil {
		// Wait for the sender to finish before returning.
		<-sendErr
		return nil, err
	}
	if werr := <-sendErr; werr != nil {
		return nil, fmt.Errorf("write error: %v", werr)
	}

	t.conn.SetSnappy(their.Version >= snappyProtocolVersion)
	return their, nil
}
循环监听
run()方法是P2P服务器的主要逻辑循环:
func (srv *Server) run() {
srv.log.Info("Started P2P networking", "self", srv.localnode.Node().URLv4())
defer srv.loopWG.Done()
defer srv.nodedb.Close()
defer srv.discmix.Close()
defer srv.dialsched.stop()
var (
peers = make(map[enode.ID]*Peer)
inboundCount = 0
trusted = make(map[enode.ID]bool, len(srv.TrustedNodes))
)
// 将可信节点放入映射以加速检查
for _, n := range srv.TrustedNodes {
trusted[n.ID()] = true
}
running:
for {
select {
case <-srv.quit: