From ba25762cfb99fbeaaf3cf89a652ab240ca912ef9 Mon Sep 17 00:00:00 2001 From: Darioush Jalali Date: Tue, 28 Jan 2025 19:19:04 -0800 Subject: [PATCH] connect it better --- plugin/evm/statesync/connector.go | 80 +++++++++++++++++++++++++++---- plugin/evm/statesync/handler.go | 31 ++++++------ plugin/evm/syncervm_client.go | 5 +- plugin/evm/vm.go | 2 +- 4 files changed, 90 insertions(+), 28 deletions(-) diff --git a/plugin/evm/statesync/connector.go b/plugin/evm/statesync/connector.go index 5e87c2a3ff..ce8f643fc2 100644 --- a/plugin/evm/statesync/connector.go +++ b/plugin/evm/statesync/connector.go @@ -8,12 +8,15 @@ import ( "fmt" "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/avalanchego/snow/engine/common" + "github.com/ava-labs/avalanchego/network/p2p" "github.com/ava-labs/avalanchego/snow/validators" "github.com/ava-labs/avalanchego/utils/set" "github.com/ava-labs/avalanchego/version" + "github.com/ava-labs/coreth/core" "github.com/ava-labs/coreth/eth/protocols/snap" + "github.com/ava-labs/libevm/log" ethp2p "github.com/ava-labs/libevm/p2p" + "github.com/ava-labs/libevm/p2p/enode" ) var ( @@ -23,15 +26,15 @@ var ( type Connector struct { sync *snap.Syncer - sender common.AppSender + sender *p2p.Client } -func NewConnector(sync *snap.Syncer, sender common.AppSender) *Connector { +func NewConnector(sync *snap.Syncer, sender *p2p.Client) *Connector { return &Connector{sync: sync, sender: sender} } func (c *Connector) Connected(ctx context.Context, nodeID ids.NodeID, version *version.Application) error { - return c.sync.Register(NewOutboundPeer(nodeID, c.sender)) + return c.sync.Register(NewOutboundPeer(nodeID, c.sync, c.sender)) } func (c *Connector) Disconnected(ctx context.Context, nodeID ids.NodeID) error { @@ -39,14 +42,15 @@ func (c *Connector) Disconnected(ctx context.Context, nodeID ids.NodeID) error { } type outbound struct { - peerID ids.NodeID - sender common.AppSender - requestID uint32 + peerID ids.NodeID + sync *snap.Syncer + sender *p2p.Client } -func NewOutboundPeer(nodeID ids.NodeID, sender common.AppSender) *snap.Peer { +func NewOutboundPeer(nodeID ids.NodeID, sync *snap.Syncer, sender *p2p.Client) *snap.Peer { return snap.NewFakePeer(protocolVersion, nodeID.String(), &outbound{ peerID: nodeID, + sync: sync, sender: sender, }) } @@ -60,10 +64,66 @@ func (o *outbound) WriteMsg(msg ethp2p.Msg) error { nodeIDs := set.NewSet[ids.NodeID](1) nodeIDs.Add(o.peerID) - o.requestID++ - return o.sender.SendAppRequest(context.Background(), nodeIDs, o.requestID, bytes) + return o.sender.AppRequest(context.Background(), nodeIDs, bytes, o.handleResponse) } // ReadMsg implements the ethp2p.MsgReadWriter interface. // It is not expected to be called in the used code path. func (o *outbound) ReadMsg() (ethp2p.Msg, error) { panic("not expected to be called") } + +func (o *outbound) handleResponse( + ctx context.Context, + nodeID ids.NodeID, + responseBytes []byte, + err error, +) { + if err != nil { + log.Warn("got error response from peer", "peer", nodeID, "err", err) + return + } + + p := snap.NewFakePeer(protocolVersion, nodeID.String(), &rw{responseBytes: responseBytes}) + if err := snap.HandleMessage(o, p); err != nil { + log.Warn("failed to handle response", "peer", nodeID, "err", err) + } +} + +func (o *outbound) Chain() *core.BlockChain { panic("not expected to be called") } +func (o *outbound) RunPeer(*snap.Peer, snap.Handler) error { panic("not expected to be called") } +func (o *outbound) PeerInfo(id enode.ID) interface{} { panic("not expected to be called") } + +func (o *outbound) Handle(peer *snap.Peer, packet snap.Packet) error { + d := &Downloader{SnapSyncer: o.sync} + return d.DeliverSnapPacket(peer, packet) +} + +// Downloader is copied from eth/downloader/downloader.go +type Downloader struct { + SnapSyncer *snap.Syncer +} + +// DeliverSnapPacket is invoked from a peer's message handler when it transmits a +// data packet for the local node to consume. +func (d *Downloader) DeliverSnapPacket(peer *snap.Peer, packet snap.Packet) error { + switch packet := packet.(type) { + case *snap.AccountRangePacket: + hashes, accounts, err := packet.Unpack() + if err != nil { + return err + } + return d.SnapSyncer.OnAccounts(peer, packet.ID, hashes, accounts, packet.Proof) + + case *snap.StorageRangesPacket: + hashset, slotset := packet.Unpack() + return d.SnapSyncer.OnStorage(peer, packet.ID, hashset, slotset, packet.Proof) + + case *snap.ByteCodesPacket: + return d.SnapSyncer.OnByteCodes(peer, packet.ID, packet.Codes) + + case *snap.TrieNodesPacket: + return d.SnapSyncer.OnTrieNodes(peer, packet.ID, packet.Nodes) + + default: + return fmt.Errorf("unexpected snap packet type: %T", packet) + } +} diff --git a/plugin/evm/statesync/handler.go b/plugin/evm/statesync/handler.go index 9b7b51f469..60f02f54fb 100644 --- a/plugin/evm/statesync/handler.go +++ b/plugin/evm/statesync/handler.go @@ -30,7 +30,7 @@ var ( ) const ( - HandlerID = 128 // ID for the state sync handler, leaving earlier IDs reserved + ProtocolID = 128 // ID for the state sync handler, leaving earlier IDs reserved ErrCodeSnapHandlerFailed = 1 protocolVersion = 0 @@ -53,7 +53,7 @@ func (h *Handler) AppRequest( log.Debug("statesync AppRequest called", "nodeID", nodeID, "requestBytes", len(requestBytes)) rw := &rw{requestBytes: requestBytes} p := snap.NewFakePeer(protocolVersion, nodeID.String(), rw) - err := snap.Handle(h, p) + err := snap.HandleMessage(h, p) log.Debug("statesync AppRequest handled", "nodeID", nodeID, "err", err) if err != nil { return nil, &common.AppError{ @@ -93,19 +93,7 @@ type rw struct { // ReadMsg implements ethp2p.MsgReadWriter. // It is expected to be called exactly once, immediately after the request is received. func (rw *rw) ReadMsg() (ethp2p.Msg, error) { - // parse first uint64 as code - if len(rw.requestBytes) < wrappers.LongLen { - return ethp2p.Msg{}, fmt.Errorf("request too short: %d", len(rw.requestBytes)) - } - code := binary.BigEndian.Uint64(rw.requestBytes) - rw.requestBytes = rw.requestBytes[wrappers.LongLen:] - - return ethp2p.Msg{ - Code: code, - Size: uint32(len(rw.requestBytes)), - Payload: bytes.NewReader(rw.requestBytes), - ReceivedAt: time.Now(), - }, nil + return fromBytes(rw.requestBytes) } // WriteMsg implements ethp2p.MsgReadWriter. @@ -116,6 +104,19 @@ func (rw *rw) WriteMsg(msg ethp2p.Msg) error { return err } +func fromBytes(msgBytes []byte) (ethp2p.Msg, error) { + if len(msgBytes) < wrappers.LongLen { + return ethp2p.Msg{}, fmt.Errorf("bytes too short: %d", len(msgBytes)) + } + code := binary.BigEndian.Uint64(msgBytes) + return ethp2p.Msg{ + Code: code, + Size: uint32(len(msgBytes) - wrappers.LongLen), + Payload: bytes.NewReader(msgBytes[wrappers.LongLen:]), + ReceivedAt: time.Now(), + }, nil +} + func toBytes(msg ethp2p.Msg) ([]byte, error) { bytes := make([]byte, msg.Size+wrappers.LongLen) binary.BigEndian.PutUint64(bytes, msg.Code) diff --git a/plugin/evm/syncervm_client.go b/plugin/evm/syncervm_client.go index cda8828263..8a3438108e 100644 --- a/plugin/evm/syncervm_client.go +++ b/plugin/evm/syncervm_client.go @@ -163,12 +163,13 @@ func (client *stateSyncerClient) stateSync(ctx context.Context) error { if client.useUpstream { log.Warn("Using upstream state syncer (untested)") syncer := snap.NewSyncer(client.chaindb, rawdb.HashScheme) + p2pClient := client.network.NewClient(ethstatesync.ProtocolID) if len(client.stateSyncNodes) > 0 { for _, nodeID := range client.stateSyncNodes { - syncer.Register(ethstatesync.NewOutboundPeer(nodeID, client.appSender)) + syncer.Register(ethstatesync.NewOutboundPeer(nodeID, syncer, p2pClient)) } } else { - client.network.AddConnector(ethstatesync.NewConnector(syncer, client.appSender)) + client.network.AddConnector(ethstatesync.NewConnector(syncer, p2pClient)) } if err := syncer.Sync(client.syncSummary.BlockRoot, convertReadOnlyToBidirectional(ctx.Done())); err != nil { return err diff --git a/plugin/evm/vm.go b/plugin/evm/vm.go index 08a7ce91e0..cadafd80a1 100644 --- a/plugin/evm/vm.go +++ b/plugin/evm/vm.go @@ -623,7 +623,7 @@ func (vm *VM) Initialize( vm.Network.AddHandler(p2p.SignatureRequestHandlerID, warpHandler) ////// - vm.Network.AddHandler(statesync.HandlerID, statesync.NewHandler(vm.blockChain)) + vm.Network.AddHandler(statesync.ProtocolID, statesync.NewHandler(vm.blockChain)) vm.setAppRequestHandlers()