Skip to content

Commit

Permalink
connect it better
Browse files Browse the repository at this point in the history
  • Loading branch information
darioush committed Jan 29, 2025
1 parent 924f947 commit ba25762
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 28 deletions.
80 changes: 70 additions & 10 deletions plugin/evm/statesync/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ import (
"fmt"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/network/p2p"
"github.com/ava-labs/avalanchego/snow/validators"
"github.com/ava-labs/avalanchego/utils/set"
"github.com/ava-labs/avalanchego/version"
"github.com/ava-labs/coreth/core"
"github.com/ava-labs/coreth/eth/protocols/snap"
"github.com/ava-labs/libevm/log"
ethp2p "github.com/ava-labs/libevm/p2p"
"github.com/ava-labs/libevm/p2p/enode"
)

var (
Expand All @@ -23,30 +26,31 @@ var (

type Connector struct {
sync *snap.Syncer
sender common.AppSender
sender *p2p.Client
}

func NewConnector(sync *snap.Syncer, sender common.AppSender) *Connector {
func NewConnector(sync *snap.Syncer, sender *p2p.Client) *Connector {
return &Connector{sync: sync, sender: sender}
}

func (c *Connector) Connected(ctx context.Context, nodeID ids.NodeID, version *version.Application) error {
return c.sync.Register(NewOutboundPeer(nodeID, c.sender))
return c.sync.Register(NewOutboundPeer(nodeID, c.sync, c.sender))
}

func (c *Connector) Disconnected(ctx context.Context, nodeID ids.NodeID) error {
return c.sync.Unregister(nodeID.String())
}

type outbound struct {
peerID ids.NodeID
sender common.AppSender
requestID uint32
peerID ids.NodeID
sync *snap.Syncer
sender *p2p.Client
}

func NewOutboundPeer(nodeID ids.NodeID, sender common.AppSender) *snap.Peer {
func NewOutboundPeer(nodeID ids.NodeID, sync *snap.Syncer, sender *p2p.Client) *snap.Peer {
return snap.NewFakePeer(protocolVersion, nodeID.String(), &outbound{
peerID: nodeID,
sync: sync,
sender: sender,
})
}
Expand All @@ -60,10 +64,66 @@ func (o *outbound) WriteMsg(msg ethp2p.Msg) error {
nodeIDs := set.NewSet[ids.NodeID](1)
nodeIDs.Add(o.peerID)

o.requestID++
return o.sender.SendAppRequest(context.Background(), nodeIDs, o.requestID, bytes)
return o.sender.AppRequest(context.Background(), nodeIDs, bytes, o.handleResponse)
}

// ReadMsg implements the ethp2p.MsgReadWriter interface.
// It is not expected to be called in the used code path.
func (o *outbound) ReadMsg() (ethp2p.Msg, error) { panic("not expected to be called") }

func (o *outbound) handleResponse(
ctx context.Context,
nodeID ids.NodeID,
responseBytes []byte,
err error,
) {
if err != nil {
log.Warn("got error response from peer", "peer", nodeID, "err", err)
return
}

p := snap.NewFakePeer(protocolVersion, nodeID.String(), &rw{responseBytes: responseBytes})
if err := snap.HandleMessage(o, p); err != nil {
log.Warn("failed to handle response", "peer", nodeID, "err", err)
}
}

func (o *outbound) Chain() *core.BlockChain { panic("not expected to be called") }
func (o *outbound) RunPeer(*snap.Peer, snap.Handler) error { panic("not expected to be called") }
func (o *outbound) PeerInfo(id enode.ID) interface{} { panic("not expected to be called") }

func (o *outbound) Handle(peer *snap.Peer, packet snap.Packet) error {
d := &Downloader{SnapSyncer: o.sync}
return d.DeliverSnapPacket(peer, packet)
}

// Downloader is copied from eth/downloader/downloader.go
type Downloader struct {
SnapSyncer *snap.Syncer
}

// DeliverSnapPacket is invoked from a peer's message handler when it transmits a
// data packet for the local node to consume.
func (d *Downloader) DeliverSnapPacket(peer *snap.Peer, packet snap.Packet) error {
switch packet := packet.(type) {
case *snap.AccountRangePacket:
hashes, accounts, err := packet.Unpack()
if err != nil {
return err
}
return d.SnapSyncer.OnAccounts(peer, packet.ID, hashes, accounts, packet.Proof)

case *snap.StorageRangesPacket:
hashset, slotset := packet.Unpack()
return d.SnapSyncer.OnStorage(peer, packet.ID, hashset, slotset, packet.Proof)

case *snap.ByteCodesPacket:
return d.SnapSyncer.OnByteCodes(peer, packet.ID, packet.Codes)

case *snap.TrieNodesPacket:
return d.SnapSyncer.OnTrieNodes(peer, packet.ID, packet.Nodes)

default:
return fmt.Errorf("unexpected snap packet type: %T", packet)
}
}
31 changes: 16 additions & 15 deletions plugin/evm/statesync/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ var (
)

const (
HandlerID = 128 // ID for the state sync handler, leaving earlier IDs reserved
ProtocolID = 128 // ID for the state sync handler, leaving earlier IDs reserved
ErrCodeSnapHandlerFailed = 1

protocolVersion = 0
Expand All @@ -53,7 +53,7 @@ func (h *Handler) AppRequest(
log.Debug("statesync AppRequest called", "nodeID", nodeID, "requestBytes", len(requestBytes))
rw := &rw{requestBytes: requestBytes}
p := snap.NewFakePeer(protocolVersion, nodeID.String(), rw)
err := snap.Handle(h, p)
err := snap.HandleMessage(h, p)
log.Debug("statesync AppRequest handled", "nodeID", nodeID, "err", err)
if err != nil {
return nil, &common.AppError{
Expand Down Expand Up @@ -93,19 +93,7 @@ type rw struct {
// ReadMsg implements ethp2p.MsgReadWriter.
// It is expected to be called exactly once, immediately after the request is received.
func (rw *rw) ReadMsg() (ethp2p.Msg, error) {
// parse first uint64 as code
if len(rw.requestBytes) < wrappers.LongLen {
return ethp2p.Msg{}, fmt.Errorf("request too short: %d", len(rw.requestBytes))
}
code := binary.BigEndian.Uint64(rw.requestBytes)
rw.requestBytes = rw.requestBytes[wrappers.LongLen:]

return ethp2p.Msg{
Code: code,
Size: uint32(len(rw.requestBytes)),
Payload: bytes.NewReader(rw.requestBytes),
ReceivedAt: time.Now(),
}, nil
return fromBytes(rw.requestBytes)
}

// WriteMsg implements ethp2p.MsgReadWriter.
Expand All @@ -116,6 +104,19 @@ func (rw *rw) WriteMsg(msg ethp2p.Msg) error {
return err
}

func fromBytes(msgBytes []byte) (ethp2p.Msg, error) {
if len(msgBytes) < wrappers.LongLen {
return ethp2p.Msg{}, fmt.Errorf("bytes too short: %d", len(msgBytes))
}
code := binary.BigEndian.Uint64(msgBytes)
return ethp2p.Msg{
Code: code,
Size: uint32(len(msgBytes) - wrappers.LongLen),
Payload: bytes.NewReader(msgBytes[wrappers.LongLen:]),
ReceivedAt: time.Now(),
}, nil
}

func toBytes(msg ethp2p.Msg) ([]byte, error) {
bytes := make([]byte, msg.Size+wrappers.LongLen)
binary.BigEndian.PutUint64(bytes, msg.Code)
Expand Down
5 changes: 3 additions & 2 deletions plugin/evm/syncervm_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,12 +163,13 @@ func (client *stateSyncerClient) stateSync(ctx context.Context) error {
if client.useUpstream {
log.Warn("Using upstream state syncer (untested)")
syncer := snap.NewSyncer(client.chaindb, rawdb.HashScheme)
p2pClient := client.network.NewClient(ethstatesync.ProtocolID)
if len(client.stateSyncNodes) > 0 {
for _, nodeID := range client.stateSyncNodes {
syncer.Register(ethstatesync.NewOutboundPeer(nodeID, client.appSender))
syncer.Register(ethstatesync.NewOutboundPeer(nodeID, syncer, p2pClient))
}
} else {
client.network.AddConnector(ethstatesync.NewConnector(syncer, client.appSender))
client.network.AddConnector(ethstatesync.NewConnector(syncer, p2pClient))
}
if err := syncer.Sync(client.syncSummary.BlockRoot, convertReadOnlyToBidirectional(ctx.Done())); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion plugin/evm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ func (vm *VM) Initialize(
vm.Network.AddHandler(p2p.SignatureRequestHandlerID, warpHandler)

//////
vm.Network.AddHandler(statesync.HandlerID, statesync.NewHandler(vm.blockChain))
vm.Network.AddHandler(statesync.ProtocolID, statesync.NewHandler(vm.blockChain))

vm.setAppRequestHandlers()

Expand Down

0 comments on commit ba25762

Please sign in to comment.