Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(parachain): availability distribution skeleton #4591

Open
wants to merge 3 commits into
base: feat/parachain
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dot/network/request_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ type RequestMaker interface {
Do(to peer.ID, req Message, res ResponseMessage) error
}

type RequestHandler func(who peer.ID, payload []byte) (ResponseMessage, error)

type RequestResponseProtocol struct {
ctx context.Context
host *host
Expand Down
7 changes: 7 additions & 0 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,13 @@ func (s *Service) SendMessage(to peer.ID, msg NotificationsMessage) error {
return errors.New("message not supported by any notifications protocol")
}

func (s *Service) RegisterRequestHandler(subprotocol protocol.ID, handler RequestHandler) {
// This method needs to exist on Service because some parachain subsystems include it in their Network interface.
// We could implement it by merging https://github.com/ChainSafe/gossamer/pull/4588 but by the time this is actually
// used, the network layer will probably have undergone some significant changes.
// See also https://github.com/ChainSafe/gossamer/issues/4453#issuecomment-2704259268
}

func (s *Service) GetRequestResponseProtocol(subprotocol string, requestTimeout time.Duration,
maxResponseSize uint64) RequestMaker {

Expand Down
130 changes: 130 additions & 0 deletions dot/parachain/availability-distribution/availability_distribution.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright 2025 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package availabilitydistribution

import (
"context"
"errors"
"fmt"

"github.com/ChainSafe/gossamer/dot/network"
parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types"
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/internal/log"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/runtime"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
)

var logger = log.NewFromGlobal(log.AddContext("pkg", "parachain-availability-distribution"))

type AvailabilityDistribution struct {
subSystemToOverseer chan<- any
net Network
blockState BlockState
}

var _ parachaintypes.Subsystem = (*AvailabilityDistribution)(nil)

type Network interface {
RegisterRequestHandler(subprotocolID protocol.ID, handler network.RequestHandler)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might be missing something, but why can the subsystem register a handler? I mean, in what specific case the subsystem will register a request handler for a specific network protocol id?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The availability distribution subsystem is responsible for fetching erasure chunks and PoVs as well as responding to those requests from other nodes.

So on startup the subsystem will register handlers for req_chunk/1 (and/or req_chunk/2) and req_pov/1.

}

type BlockState interface {
GetHeader(hash common.Hash) (*types.Header, error)
GetRuntime(hash common.Hash) (instance runtime.Instance, err error)
}

// NewAvailabilityDistribution creates a new AvailabilityDistribution subsystem
func NewAvailabilityDistribution(
overseerChan chan<- any,
net Network,
blockState BlockState,
) *AvailabilityDistribution {
return &AvailabilityDistribution{
subSystemToOverseer: overseerChan,
net: net,
blockState: blockState,
}
}

// Run starts the AvailabilityDistribution subsystem
func (ad *AvailabilityDistribution) Run(ctx context.Context, overseerToSubSystem <-chan any) {
for {
select {
case msg := <-overseerToSubSystem:
err := ad.processMessage(msg)
if err != nil {
logger.Errorf("processing message: %s", err.Error())
}
case <-ctx.Done():
if err := ctx.Err(); err != nil && !errors.Is(err, context.Canceled) {
logger.Errorf("ctx error: %s\n", err)
}
return
}
}
}

func (ad *AvailabilityDistribution) Stop() {
logger.Tracef("Stopping %s subsystem", ad.Name())
}

// Name returns the name of the subsystem
func (ad *AvailabilityDistribution) Name() parachaintypes.SubSystemName {
return parachaintypes.AvailabilityDistribution
}

// processMessage processes messages sent to the AvailabilityDistribution subsystem
func (ad *AvailabilityDistribution) processMessage(msg any) error {
switch msg := msg.(type) {
case parachaintypes.ActiveLeavesUpdateSignal:
err := ad.ProcessActiveLeavesUpdateSignal(msg)
if err != nil {
return fmt.Errorf("processing active leaves update signal: %w", err)
}
case parachaintypes.BlockFinalizedSignal:
return ad.ProcessBlockFinalizedSignal(msg)
case parachaintypes.AvailabilityDistributionMessageFetchPoV:
return ad.processAvailabilityDistributionMessageFetchPoV(msg)
default:
return fmt.Errorf("%w: %T", parachaintypes.ErrUnknownOverseerMessage, msg)
}
return nil
}

// ProcessActiveLeavesUpdateSignal processes active leaves update signal
func (ad *AvailabilityDistribution) ProcessActiveLeavesUpdateSignal(
signal parachaintypes.ActiveLeavesUpdateSignal,
) error {
return nil // TODO: implement
}

// ProcessBlockFinalizedSignal processes block finalized signal
func (ad *AvailabilityDistribution) ProcessBlockFinalizedSignal(msg parachaintypes.BlockFinalizedSignal) error {
return nil // nothing to do
}

func (ad *AvailabilityDistribution) processAvailabilityDistributionMessageFetchPoV(
msg parachaintypes.AvailabilityDistributionMessageFetchPoV,
) error {
return nil // TODO: implement
}

//nolint:unused
func (ad *AvailabilityDistribution) handleChunkFetchingRequest(
who peer.ID,
payload []byte,
) (network.ResponseMessage, error) {
return nil, nil // TODO: implement
}

//nolint:unused
func (ad *AvailabilityDistribution) handlePoVFetchingRequest(
who peer.ID,
payload []byte,
) (network.ResponseMessage, error) {
return nil, nil // TODO: implement
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright 2025 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package availabilitydistribution

import (
"testing"

"github.com/stretchr/testify/assert"
"go.uber.org/mock/gomock"
)

func TestNewAvailabilityDistribution(t *testing.T) {
ctrl := gomock.NewController(t)
netMock := NewMockNetwork(ctrl)
blockStateMock := NewMockBlockState(ctrl)

overseerCh := make(chan any)

ad := NewAvailabilityDistribution(overseerCh, netMock, blockStateMock)

assert.NotNil(t, ad)
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// Copyright 2025 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package availabilitydistribution

//go:generate mockgen -destination=mocks_network_test.go -package=$GOPACKAGE . Network
//go:generate mockgen -destination=mocks_blockstate_test.go -package=$GOPACKAGE . BlockState
54 changes: 54 additions & 0 deletions dot/parachain/availability-distribution/mocks_network_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions dot/parachain/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"fmt"
"time"

availabilitydistribution "github.com/ChainSafe/gossamer/dot/parachain/availability-distribution"

bitfieldsigning "github.com/ChainSafe/gossamer/dot/parachain/bitfield-signing"

"github.com/ChainSafe/gossamer/dot/network"
Expand Down Expand Up @@ -95,6 +97,14 @@ func NewService(net Network, forkID string, st *state.Service, ks keystore.Keyst
bitfieldSigningsSubsystem := bitfieldsigning.NewBitfieldSigning(overseer.SubsystemsToOverseer, ks, st.Block)
overseer.RegisterSubsystem(bitfieldSigningsSubsystem)

// register availability distribution subsystem
availabilityDistributionSubsystem := availabilitydistribution.NewAvailabilityDistribution(
overseer.GetSubsystemToOverseerChannel(),
net,
st.Block,
)
overseer.RegisterSubsystem(availabilityDistributionSubsystem)

parachainService := &Service{
Network: net,
overseer: overseer,
Expand Down Expand Up @@ -173,6 +183,7 @@ type Network interface {
batchHandler network.NotificationsMessageBatchHandler,
maxSize uint64,
) error
RegisterRequestHandler(subprotocolID protocol.ID, handler network.RequestHandler)
GetRequestResponseProtocol(subprotocol string, requestTimeout time.Duration,
maxResponseSize uint64) network.RequestMaker
ReportPeer(change peerset.ReputationChange, p peer.ID)
Expand Down
23 changes: 12 additions & 11 deletions dot/parachain/types/subsystems.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,18 @@ import (
type SubSystemName string

const (
CandidateBacking SubSystemName = "CandidateBacking"
CollationProtocol SubSystemName = "CollationProtocol"
AvailabilityStore SubSystemName = "AvailabilityStore"
NetworkBridgeSender SubSystemName = "NetworkBridgeSender"
NetworkBridgeReceiver SubSystemName = "NetworkBridgeReceiver"
ChainAPI SubSystemName = "ChainAPI"
CandidateValidation SubSystemName = "CandidateValidation"
Provisioner SubSystemName = "Provisioner"
StatementDistribution SubSystemName = "StatementDistribution"
ProspectiveParachains SubSystemName = "ProspectiveParachains"
BitfieldSigning SubSystemName = "BitfieldSigning"
CandidateBacking SubSystemName = "CandidateBacking"
CollationProtocol SubSystemName = "CollationProtocol"
AvailabilityStore SubSystemName = "AvailabilityStore"
AvailabilityDistribution SubSystemName = "AvailabilityDistribution"
NetworkBridgeSender SubSystemName = "NetworkBridgeSender"
NetworkBridgeReceiver SubSystemName = "NetworkBridgeReceiver"
ChainAPI SubSystemName = "ChainAPI"
CandidateValidation SubSystemName = "CandidateValidation"
Provisioner SubSystemName = "Provisioner"
StatementDistribution SubSystemName = "StatementDistribution"
ProspectiveParachains SubSystemName = "ProspectiveParachains"
BitfieldSigning SubSystemName = "BitfieldSigning"
)

var SubsystemRequestTimeout = 5 * time.Second
Expand Down
Loading
Loading