Skip to content

Commit

Permalink
feat(dot/network/messages): define StateRequest and StateResponse
Browse files Browse the repository at this point in the history
… proto messages (#4089)
  • Loading branch information
EclesioMeloJunior authored Aug 22, 2024
1 parent a6b42f5 commit ea5be08
Show file tree
Hide file tree
Showing 35 changed files with 1,308 additions and 882 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ jobs:
- name: Up to date Go proto generated
run: |
go install google.golang.org/protobuf/cmd/[email protected] && \
PROTOC_VERSION=21.10 && \
PROTOC_VERSION=24.4 && \
curl -sL --output protoc.zip "https://github.com/protocolbuffers/protobuf/releases/download/v${PROTOC_VERSION}/protoc-${PROTOC_VERSION}-linux-x86_64.zip" && \
sudo unzip protoc.zip -d /usr/local && \
rm protoc.zip && \
Expand Down
25 changes: 11 additions & 14 deletions dot/network/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,16 @@ import (
var (
ErrNoPeersConnected = errors.New("no peers connected")
ErrReceivedEmptyMessage = errors.New("received empty message")
ErrNilBlockInResponse = errors.New("nil block in response")

errCannotValidateHandshake = errors.New("failed to validate handshake")
errMessageTypeNotValid = errors.New("message type is not valid")
errInvalidHandshakeForPeer = errors.New("peer previously sent invalid handshake")
errHandshakeTimeout = errors.New("handshake timeout reached")
errBlockRequestFromNumberInvalid = errors.New("block request message From number is not valid")
errInvalidStartingBlockType = errors.New("invalid StartingBlock in messsage")
errInboundHanshakeExists = errors.New("an inbound handshake already exists for given peer")
errInvalidRole = errors.New("invalid role")
ErrFailedToReadEntireMessage = errors.New("failed to read entire message")
ErrNilStream = errors.New("nil stream")
ErrInvalidLEB128EncodedData = errors.New("invalid LEB128 encoded data")
ErrGreaterThanMaxSize = errors.New("greater than maximum size")
ErrStreamReset = errors.New("stream reset")
errCannotValidateHandshake = errors.New("failed to validate handshake")
errMessageTypeNotValid = errors.New("message type is not valid")
errInvalidHandshakeForPeer = errors.New("peer previously sent invalid handshake")
errHandshakeTimeout = errors.New("handshake timeout reached")
errInboundHanshakeExists = errors.New("an inbound handshake already exists for given peer")
errInvalidRole = errors.New("invalid role")
ErrFailedToReadEntireMessage = errors.New("failed to read entire message")
ErrNilStream = errors.New("nil stream")
ErrInvalidLEB128EncodedData = errors.New("invalid LEB128 encoded data")
ErrGreaterThanMaxSize = errors.New("greater than maximum size")
ErrStreamReset = errors.New("stream reset")
)
29 changes: 16 additions & 13 deletions dot/network/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"
"time"

"github.com/ChainSafe/gossamer/dot/network/messages"
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/internal/log"
"github.com/ChainSafe/gossamer/lib/common"
Expand All @@ -32,14 +33,14 @@ const (
)

type testStreamHandler struct {
messages map[peer.ID][]Message
messages map[peer.ID][]messages.P2PMessage
decoder messageDecoder
exit bool
}

func newTestStreamHandler(decoder messageDecoder) *testStreamHandler {
return &testStreamHandler{
messages: make(map[peer.ID][]Message),
messages: make(map[peer.ID][]messages.P2PMessage),
decoder: decoder,
}
}
Expand All @@ -55,7 +56,7 @@ func (s *testStreamHandler) handleStream(stream libp2pnetwork.Stream) {
s.readStream(stream, peer, s.decoder, s.handleMessage)
}

func (s *testStreamHandler) handleMessage(stream libp2pnetwork.Stream, msg Message) error {
func (s *testStreamHandler) handleMessage(stream libp2pnetwork.Stream, msg messages.P2PMessage) error {
msgs := s.messages[stream.Conn().RemotePeer()]
s.messages[stream.Conn().RemotePeer()] = append(msgs, msg)

Expand All @@ -65,7 +66,7 @@ func (s *testStreamHandler) handleMessage(stream libp2pnetwork.Stream, msg Messa
return s.writeToStream(stream, announceHandshake)
}

func (s *testStreamHandler) writeToStream(stream libp2pnetwork.Stream, msg Message) error {
func (s *testStreamHandler) writeToStream(stream libp2pnetwork.Stream, msg messages.P2PMessage) error {
encMsg, err := msg.Encode()
if err != nil {
return err
Expand Down Expand Up @@ -124,24 +125,26 @@ var starting, _ = variadic.NewUint32OrHash(uint32(1))

var one = uint32(1)

func newTestBlockRequestMessage(t *testing.T) *BlockRequestMessage {
func newTestBlockRequestMessage(t *testing.T) *messages.BlockRequestMessage {
t.Helper()

return &BlockRequestMessage{
RequestedData: RequestedDataHeader + RequestedDataBody + RequestedDataJustification,
return &messages.BlockRequestMessage{
RequestedData: messages.RequestedDataHeader +
messages.RequestedDataBody +
messages.RequestedDataJustification,
StartingBlock: *starting,
Direction: 1,
Max: &one,
}
}

func testBlockRequestMessageDecoder(in []byte, _ peer.ID, _ bool) (Message, error) {
msg := new(BlockRequestMessage)
func testBlockRequestMessageDecoder(in []byte, _ peer.ID, _ bool) (messages.P2PMessage, error) {
msg := new(messages.BlockRequestMessage)
err := msg.Decode(in)
return msg, err
}

func testBlockAnnounceMessageDecoder(in []byte, _ peer.ID, _ bool) (Message, error) {
func testBlockAnnounceMessageDecoder(in []byte, _ peer.ID, _ bool) (messages.P2PMessage, error) {
msg := BlockAnnounceMessage{
Number: 0,
Digest: types.NewDigest(),
Expand All @@ -150,7 +153,7 @@ func testBlockAnnounceMessageDecoder(in []byte, _ peer.ID, _ bool) (Message, err
return &msg, err
}

func testBlockAnnounceHandshakeDecoder(in []byte, _ peer.ID, _ bool) (Message, error) {
func testBlockAnnounceHandshakeDecoder(in []byte, _ peer.ID, _ bool) (messages.P2PMessage, error) {
msg := new(BlockAnnounceHandshake)
err := msg.Decode(in)
return msg, err
Expand Down Expand Up @@ -280,11 +283,11 @@ func createTestService(t *testing.T, cfg *Config) (srvc *Service) {
return srvc
}

func newTestBlockResponseMessage(t *testing.T) *BlockResponseMessage {
func newTestBlockResponseMessage(t *testing.T) *messages.BlockResponseMessage {
t.Helper()

const blockRequestSize = 128
msg := &BlockResponseMessage{
msg := &messages.BlockResponseMessage{
BlockData: make([]*types.BlockData, blockRequestSize),
}

Expand Down
5 changes: 3 additions & 2 deletions dot/network/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"sync"
"time"

"github.com/ChainSafe/gossamer/dot/network/messages"
"github.com/ChainSafe/gossamer/dot/peerset"
"github.com/ChainSafe/gossamer/internal/pubip"
"github.com/dgraph-io/ristretto"
Expand Down Expand Up @@ -330,7 +331,7 @@ func (h *host) bootstrap() {

// send creates a new outbound stream with the given peer and writes the message. It also returns
// the newly created stream.
func (h *host) send(p peer.ID, pid protocol.ID, msg Message) (network.Stream, error) {
func (h *host) send(p peer.ID, pid protocol.ID, msg messages.P2PMessage) (network.Stream, error) {
// open outbound stream with host protocol id
stream, err := h.p2pHost.NewStream(h.ctx, p, pid)
if err != nil {
Expand All @@ -354,7 +355,7 @@ func (h *host) send(p peer.ID, pid protocol.ID, msg Message) (network.Stream, er
return stream, nil
}

func (h *host) writeToStream(s network.Stream, msg Message) error {
func (h *host) writeToStream(s network.Stream, msg messages.P2PMessage) error {
encMsg, err := msg.Encode()
if err != nil {
return err
Expand Down
5 changes: 3 additions & 2 deletions dot/network/light.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package network
import (
"fmt"

"github.com/ChainSafe/gossamer/dot/network/messages"
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/pkg/scale"
Expand All @@ -19,7 +20,7 @@ func (s *Service) handleLightStream(stream libp2pnetwork.Stream) {
s.readStream(stream, s.decodeLightMessage, s.handleLightMsg, MaxBlockResponseSize)
}

func (s *Service) decodeLightMessage(in []byte, peer peer.ID, _ bool) (Message, error) {
func (s *Service) decodeLightMessage(in []byte, peer peer.ID, _ bool) (messages.P2PMessage, error) {
s.lightRequestMu.RLock()
defer s.lightRequestMu.RUnlock()

Expand All @@ -33,7 +34,7 @@ func (s *Service) decodeLightMessage(in []byte, peer peer.ID, _ bool) (Message,
return newLightRequestFromBytes(in)
}

func (s *Service) handleLightMsg(stream libp2pnetwork.Stream, msg Message) (err error) {
func (s *Service) handleLightMsg(stream libp2pnetwork.Stream, msg messages.P2PMessage) (err error) {
defer func() {
err := stream.Close()
if err != nil && err.Error() != ErrStreamReset.Error() {
Expand Down
Loading

0 comments on commit ea5be08

Please sign in to comment.