diff --git a/internal/client/network-gossip/bridge.go b/internal/client/network-gossip/bridge.go new file mode 100644 index 0000000000..a80a09ca29 --- /dev/null +++ b/internal/client/network-gossip/bridge.go @@ -0,0 +1,311 @@ +// Copyright 2025 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package gossip + +import ( + "fmt" + "sync" + "time" + + "github.com/ChainSafe/gossamer/internal/client/network" + "github.com/ChainSafe/gossamer/internal/client/network/service" + netSync "github.com/ChainSafe/gossamer/internal/client/network/sync" + peerid "github.com/ChainSafe/gossamer/internal/client/network/types/peer-id" + "github.com/ChainSafe/gossamer/internal/log" + "github.com/ChainSafe/gossamer/internal/primitives/runtime" +) + +var logger = log.NewFromGlobal(log.AddContext("pkg", "client/network-gossip")) + +// GossipEngine receives messages from the network via the Network and forwards them to upper layers via messageSinks. +// In the scenario where messages have been received from the network but a subscribed message sink is not ready to +// receiver, we delay 10 ms and will remove the channel from the sinks if the message is not consumed by the end of +// the delay. To model this process a gossip engine can be in two forwarding states: idle, and busy. +// +// GossipEngine utilises and implementation of [Network] and provides gossiping capabilities on +// top of it. +type GossipEngine[H runtime.Hash, N runtime.Number, Hasher runtime.Hasher[H]] struct { + stateMachine consensusGossip[H, Hasher] + network Network + sync Syncing[H, N] + periodicMaintenanceInterval <-chan time.Time + protocol network.ProtocolName + + // Incoming events from the syncing service. + syncEventStream chan netSync.SyncEvent + // Handle for polling notification-related events. + notificationService service.NotificationService + // Outgoing events to the consumer. + messageSinks map[H][]chan TopicNotification + // Buffered messages (see [`ForwardingState`]). + forwardingState forwardingState + + isTerminated bool + stopChan chan any + errChan chan error +} + +type forwardingState interface { + isForwardingState() +} + +// The gossip engine is currently not forwarding any messages and will poll the network for more messages to forward. +type forwardingStateIdle struct{} + +func (forwardingStateIdle) isForwardingState() {} + +// The gossip engine is in the progress of forwarding messages and thus will not poll the network for more messages +// until it has send all current messages into the subscribed message sinks. +type forwardingStateBusy[H runtime.Hash] []hashTopicNotification[H] + +func (forwardingStateBusy[H]) isForwardingState() {} + +func NewGossipEngine[H runtime.Hash, N runtime.Number, Hasher runtime.Hasher[H]]( + network Network, + sync Syncing[H, N], + notificationService service.NotificationService, + protocol network.ProtocolName, + validator Validator[H], +) GossipEngine[H, N, Hasher] { + ge := newGossipEngine[H, N, Hasher](network, sync, notificationService, protocol, validator) + go func() { + defer close(ge.errChan) + ge.errChan <- ge.poll() + }() + return ge +} + +func newGossipEngine[H runtime.Hash, N runtime.Number, Hasher runtime.Hasher[H]]( + network Network, + sync Syncing[H, N], + notificationService service.NotificationService, + protocol network.ProtocolName, + validator Validator[H], +) GossipEngine[H, N, Hasher] { + syncEventStream := sync.EventStream("network-gossip") + _ = syncEventStream + + ge := GossipEngine[H, N, Hasher]{ + stateMachine: newConsensusGossip[H, Hasher](validator, protocol), + network: network, + sync: sync, + notificationService: notificationService, + periodicMaintenanceInterval: time.NewTimer(periodicMaintenanceInterval).C, + protocol: protocol, + syncEventStream: syncEventStream, + messageSinks: make(map[H][]chan TopicNotification), + forwardingState: forwardingStateIdle{}, + isTerminated: false, + stopChan: make(chan any), + errChan: make(chan error, 1), + } + return ge +} + +func (ge *GossipEngine[H, N, Hasher]) Report(who peerid.PeerID, reputation network.ReputationChange) { + ge.network.ReportPeer(who, reputation) +} + +// RegisterGossipMessage registers a message without propagating it to any peers. The message becomes available to new +// peers or when the service is asked to gossip the message's topic. No validation is performed on the message, if the +// message is already expired it should be dropped on the next garbage collection. +func (ge *GossipEngine[H, N, Hasher]) RegisterGossipMessage(topic H, message []byte) { + ge.stateMachine.RegisterMessage(topic, message) +} + +// BroadcastTopic will broadcast all messages with given topic. +func (ge *GossipEngine[H, N, Hasher]) BroadcastTopic(topic H, force bool) { + ge.stateMachine.BroadcastTopic(ge.notificationService, topic, force) +} + +// MessagesFor retrieves data of valid, incoming messages for a topic (but might have expired). +func (ge *GossipEngine[H, N, Hasher]) MessagesFor(topic H) chan TopicNotification { + pastMessages := ge.stateMachine.MessagesFor(topic) + // The channel length is not critical for correctness. + size := 10 + if len(pastMessages) > size { + size = len(pastMessages) + } + ch := make(chan TopicNotification, size) + + for _, notification := range pastMessages { + select { + case ch <- notification: + default: + panic("receiver known to be live, and buffer size known to suffice") + } + } + + _, ok := ge.messageSinks[topic] + if !ok { + ge.messageSinks[topic] = make([]chan TopicNotification, 0) + } + ge.messageSinks[topic] = append(ge.messageSinks[topic], ch) + return ch +} + +// SendTopic will send all messages with given topic to a peer. +func (ge *GossipEngine[H, N, Hasher]) SendTopic(who peerid.PeerID, topic H, force bool) { + ge.stateMachine.SendTopic(ge.notificationService, who, topic, force) +} + +// GossipMessage will multicast a message to all peers. +func (ge *GossipEngine[H, N, Hasher]) GossipMessage(topic H, message []byte, force bool) { + ge.stateMachine.Multicast(ge.notificationService, topic, message, force) +} + +// SendMessage will send addressed message to the given peers. The message is not kept or multicast +// later on. +func (ge *GossipEngine[H, N, Hasher]) SendMessage(who []peerid.PeerID, data []byte) { + for _, who := range who { + ge.stateMachine.SendMessage(ge.notificationService, who, data) + } +} + +// Announce will notify everyone we're connected to that we have the given block. +// +// Note: this method isn't strictly related to gossiping and should eventually be moved +// somewhere else. +func (ge *GossipEngine[H, N, Hasher]) Announce(block H, associatedData []byte) { + ge.sync.AnnounceBlock(block, associatedData) +} + +func (ge *GossipEngine[H, N, Hasher]) poll() error { //nolint: gocyclo + var nextNotificationEvent <-chan service.NotificationEvent + // outer: + for { + switch forwardingState := ge.forwardingState.(type) { + case forwardingStateIdle: + if nextNotificationEvent == nil { + nextNotificationEvent = ge.notificationService.NextEvent() + } + syncEventStream := ge.syncEventStream + + select { + case event := <-nextNotificationEvent: + // if ok { + switch event := event.(type) { + case service.NotificationEventValidateInboundSubstream: + // only accept peers whose role can be determined + var result service.ValidationResult = service.ValidationResultReject + role := ge.network.PeerRole(event.Peer, event.Handshake) + if role != nil { + result = service.ValidationResultAccept + } + event.ResultChan <- result + close(event.ResultChan) + case service.NotificationEventNotificationStreamOpened: + role := ge.network.PeerRole(event.Peer, event.Handshake) + if role != nil { + ge.stateMachine.NewPeer(ge.notificationService, event.Peer, *role) + } else { + logger.Debugf("role for %s couldn't be determined", event.Peer) + } + case service.NotificationEventNotificationStreamClosed: + ge.stateMachine.PeerDisconnected(ge.notificationService, event.Peer) + case service.NotificationEventNotificationReceived: + toForward := ge.stateMachine.OnIncoming( + ge.network, ge.notificationService, event.Peer, [][]byte{event.Notification}) + ge.forwardingState = forwardingStateBusy[H](toForward) + default: + panic("unreachable") + } + nextNotificationEvent = nil + case syncEvent, ok := <-syncEventStream: + if !ok { + // The sync event stream closed + ge.isTerminated = true + return fmt.Errorf("syncEventStream was terminated unexpectedly") + } + if ok { + switch remote := syncEvent.(type) { + case netSync.SyncEventPeerConnected: + ge.network.AddSetReserved(peerid.PeerID(remote), ge.protocol) + case netSync.SyncEventPeerDisconnected: + ge.network.RemoveSetReserved(peerid.PeerID(remote), ge.protocol) + } + } + case <-ge.periodicMaintenanceInterval: + ge.periodicMaintenanceInterval = time.NewTimer(periodicMaintenanceInterval).C + ge.stateMachine.Tick(ge.notificationService) + + for topic, sinks := range ge.messageSinks { + retained := make([]chan TopicNotification, 0) + for _, sink := range sinks { + if sink != nil { + retained = append(retained, sink) + } + } + if len(retained) > 0 { + ge.messageSinks[topic] = retained + } else { + delete(ge.messageSinks, topic) + } + } + case <-ge.stopChan: + return nil + } + + case forwardingStateBusy[H]: + var ( + topic H + notification TopicNotification + ) + if len(forwardingState) > 0 { + htn := forwardingState[0] + topic = htn.Hash + notification = htn.TopicNotification + ge.forwardingState = forwardingState[1:] + } else { + ge.forwardingState = forwardingStateIdle{} + continue + } + + sinks, ok := ge.messageSinks[topic] + if !ok { + continue + } + + // Filter out all closed sinks. + retained := make([]chan TopicNotification, 0) + for _, sink := range sinks { + if sink != nil { + retained = append(retained, sink) + } + } + ge.messageSinks[topic] = retained + sinks = ge.messageSinks[topic] + + if len(sinks) == 0 { + delete(ge.messageSinks, topic) + continue + } + + logger.Tracef("Pushing consensus message to sinks for %s", topic) + + // Send the notification on each sink. + var wg sync.WaitGroup + for i, sink := range sinks { + wg.Add(1) + go func(sink chan TopicNotification) { + defer wg.Done() + timeout := time.NewTimer(10 * time.Millisecond) + defer timeout.Stop() + select { + case sink <- notification: + // TODO: retry logic? + case <-timeout.C: + // Receiver not responding. Will be removed in next iteration + close(sink) + ge.messageSinks[topic][i] = nil + } + }(sink) + } + wg.Wait() + + default: + panic("unreachable") + } + } +} diff --git a/internal/client/network-gossip/bridge_test.go b/internal/client/network-gossip/bridge_test.go new file mode 100644 index 0000000000..fd3a57156a --- /dev/null +++ b/internal/client/network-gossip/bridge_test.go @@ -0,0 +1,417 @@ +// Copyright 2025 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package gossip + +import ( + "math/rand" + "reflect" + "sync" + "testing" + "testing/quick" + "time" + + "github.com/ChainSafe/gossamer/internal/client/network" + "github.com/ChainSafe/gossamer/internal/client/network/config" + "github.com/ChainSafe/gossamer/internal/client/network/event" + "github.com/ChainSafe/gossamer/internal/client/network/role" + "github.com/ChainSafe/gossamer/internal/client/network/service" + netSync "github.com/ChainSafe/gossamer/internal/client/network/sync" + "github.com/ChainSafe/gossamer/internal/client/network/types/multiaddr" + peerid "github.com/ChainSafe/gossamer/internal/client/network/types/peer-id" + "github.com/ChainSafe/gossamer/internal/primitives/core/hash" + "github.com/ChainSafe/gossamer/internal/primitives/runtime" + "github.com/ChainSafe/gossamer/pkg/scale" + "github.com/stretchr/testify/require" +) + +type TestNetwork struct{} + +func (TestNetwork) SetAuthorizedPeers(peers map[peerid.PeerID]any) { panic("unimpl") } +func (TestNetwork) SetAuthorizedOnly(reservedOnly bool) { panic("unimpl") } +func (TestNetwork) AddKnownAddress(peerID peerid.PeerID, addr multiaddr.Multiaddr) { panic("unimpl") } +func (TestNetwork) ReportPeer(peerID peerid.PeerID, costBenefit network.ReputationChange) {} +func (TestNetwork) DisconnectPeer(who peerid.PeerID, protocol network.ProtocolName) { panic("unimpl") } +func (TestNetwork) AcceptUnreservedPeers() { panic("unimpl") } +func (TestNetwork) DenyUnreservedPeers() { panic("unimpl") } +func (TestNetwork) AddReservedPeer(peer config.MultiaddrPeerId) error { panic("unimpl") } +func (TestNetwork) RemoveReservedPeer(peerID peerid.PeerID) { panic("unimpl") } +func (TestNetwork) SetReservedPeers(protocol network.ProtocolName, peers map[multiaddr.Multiaddr]any) error { + panic("unimpl") +} +func (TestNetwork) AddPeersToReservedSet(protocol network.ProtocolName, peers map[multiaddr.Multiaddr]any) error { + panic("unimpl") +} +func (TestNetwork) RemovePeersFromReservedSet(protocol network.ProtocolName, peers []peerid.PeerID) { + panic("unimpl") +} +func (TestNetwork) AddToPeersSet(protocol network.ProtocolName, peers map[multiaddr.Multiaddr]any) error { + panic("unimpl") +} +func (TestNetwork) RemoveFromPeersSet(protocol network.ProtocolName, peers []peerid.PeerID) { + panic("unimpl") +} +func (TestNetwork) SyncNumConnected() uint { panic("unimpl") } +func (TestNetwork) PeerRole(peerID peerid.PeerID, handshake []byte) *role.ObservedRole { + var roles role.Roles + err := scale.Unmarshal(handshake, &roles) + if err != nil { + return nil + } + role := roles.ObservedRole() + return &role +} +func (TestNetwork) ReservedPeers() <-chan struct { + Peers []peerid.PeerID + Error error +} { + panic("unimpl") +} +func (TestNetwork) EventStream(name string) chan event.Event { panic("unimpl") } +func (TestNetwork) AnnounceBlock(hash hash.H256, data []byte) { panic("unimpl") } +func (TestNetwork) NewBestBlockImported(hash hash.H256, number uint64) { panic("unimpl") } +func (TestNetwork) AddSetReserved(who peerid.PeerID, protocol network.ProtocolName) { panic("unimpl") } +func (TestNetwork) RemoveSetReserved(who peerid.PeerID, protocol network.ProtocolName) { + panic("unimpl") +} + +var _ Network = TestNetwork{} + +type TestSync struct { + eventSenders []chan netSync.SyncEvent + sync.Mutex +} + +func (ts *TestSync) EventStream(name string) chan netSync.SyncEvent { + ts.Lock() + defer ts.Unlock() + ch := make(chan netSync.SyncEvent) + ts.eventSenders = append(ts.eventSenders, ch) + return ch +} + +func (*TestSync) AnnounceBlock(hash hash.H256, data []byte) { panic("unimpl") } +func (*TestSync) NewBestBlockImported(hash hash.H256, number uint64) { panic("unimpl") } + +type TestNotificationService struct { + ch chan service.NotificationEvent +} + +func (TestNotificationService) OpenSubstream(peer peerid.PeerID) <-chan error { panic("unimpl") } +func (TestNotificationService) CloseSubstream(peer peerid.PeerID) <-chan error { panic("unimpl") } +func (TestNotificationService) SendSyncNotification(peer peerid.PeerID, notification []byte) { + panic("unimpl") +} +func (TestNotificationService) SendAsyncNotification(peer peerid.PeerID, notification []byte) <-chan error { + panic("unimpl") +} +func (TestNotificationService) SetHandshake(handshake []byte) <-chan error { panic("unimpl") } +func (TestNotificationService) TrySetHandshake(handshake []byte) error { panic("unimpl") } +func (tns TestNotificationService) NextEvent() <-chan service.NotificationEvent { return tns.ch } +func (TestNotificationService) Protocol() network.ProtocolName { panic("unimpl") } +func (TestNotificationService) MessageSink(peer peerid.PeerID) service.MessageSink { panic("unimpl") } + +var _ service.NotificationService = TestNotificationService{} + +type ChannelLengthTopic struct { + Length uint + Topic hash.H256 +} + +func (ChannelLengthTopic) Generate(rand *rand.Rand, size int) reflect.Value { + possibleLength := rand.Intn(100) + possibleTopics := uint64(rand.Intn(10)) + topicHash := hash.NewH256FromLowUint64BigEndian(possibleTopics).Bytes() + _ = topicHash + clt := ChannelLengthTopic{ + Length: uint(possibleLength), + Topic: hash.NewH256FromLowUint64BigEndian(possibleTopics), + } + return reflect.ValueOf(clt) +} + +type Message struct { + Topic hash.H256 +} + +func (Message) Generate(rand *rand.Rand, size int) reflect.Value { + possibleTopics := uint64(rand.Intn(10)) + return reflect.ValueOf(Message{ + Topic: hash.NewH256FromLowUint64BigEndian(possibleTopics), + }) +} + +type TestValidator struct{} + +func (ao TestValidator) NewPeer(context ValidatorContext[hash.H256], who peerid.PeerID, role role.ObservedRole) { +} +func (ao TestValidator) PeerDisconnected(context ValidatorContext[hash.H256], who peerid.PeerID) { +} +func (ao TestValidator) Validate( + context ValidatorContext[hash.H256], sender peerid.PeerID, data []byte, +) ValidationResult { + return ValidationResultProcessAndKeep[hash.H256]{ + Hash: hash.H256(data[0:32]), + } +} +func (ao TestValidator) MessageExpired() func(topic hash.H256, message []byte) bool { + return func(topic hash.H256, message []byte) bool { + return false + } +} +func (ao TestValidator) MessageAllowed() func( + who peerid.PeerID, intent MessageIntent, topic hash.H256, data []byte, +) bool { + return func(who peerid.PeerID, intent MessageIntent, topic hash.H256, data []byte) bool { + return true + } +} + +func TestGossipEngine(t *testing.T) { + t.Run("keeps_multiple_subscribers_per_topic_updated_with_both_old_and_new_messages", func(t *testing.T) { + topic := hash.NewH256() + protocol := network.ProtocolName("/my_protocol") + remotePeer := peerid.NewRandomPeerID() + network := TestNetwork{} + sync := TestSync{} + ch := make(chan service.NotificationEvent, 3) + notificationService := TestNotificationService{ch: ch} + + gossipEngine := newGossipEngine[hash.H256, uint64, runtime.BlakeTwo256]( + network, + &sync, + notificationService, + protocol, + AllowAll{}, + ) + + // Register the remote peer. + ch <- service.NotificationEventNotificationStreamOpened{ + Peer: remotePeer, + Direction: service.DirctionInbound, + NegotiatedFallBack: nil, + Handshake: scale.MustMarshal(role.RolesFull), + } + + messages := [][]byte{{1}, {2}} + + // Send first event before subscribing. + ch <- service.NotificationEventNotificationReceived{ + Peer: remotePeer, + Notification: messages[0], + } + + subscribers := make([]chan TopicNotification, 0) + for i := 0; i < 2; i++ { + subscribers = append(subscribers, gossipEngine.MessagesFor(topic)) + } + + // Send second event after subscribing. + ch <- service.NotificationEventNotificationReceived{ + Peer: remotePeer, + Notification: messages[1], + } + + done := make(chan any) + go func() { + defer close(done) + gossipEngine.poll() + }() + + for _, message := range messages { + for _, subscriber := range subscribers { + topicNotification := <-subscriber + expected := TopicNotification{ + Message: message, + Sender: &remotePeer, + } + require.Equal(t, expected, topicNotification) + } + } + + close(gossipEngine.stopChan) + <-done + }) + + t.Run("forwarding_to_different_size_and_topic_channels", func(t *testing.T) { + var prop = func(t *testing.T, channels []ChannelLengthTopic, notifications [][]Message) { + protocol := network.ProtocolName("/my_protocol") + remotePeer := peerid.NewRandomPeerID() + network := TestNetwork{} + sync := TestSync{} + + // for NotificationStreamOpened + chanLength := 1 + // then all the messages + for _, notification := range notifications { + chanLength += len(notification) + } + ch := make(chan service.NotificationEvent, chanLength) + notificationService := TestNotificationService{ch: ch} + + numChannelsPerTopic := make(map[hash.H256]uint) + for _, channel := range channels { + _, ok := numChannelsPerTopic[channel.Topic] + if !ok { + numChannelsPerTopic[channel.Topic] = 0 + } + numChannelsPerTopic[channel.Topic]++ + } + + expectedTotalMsgsAllChan := uint(0) + expectedMsgsPerTopicAllChan := make(map[hash.H256]uint) + acc := make(map[hash.H256]uint) + for _, messages := range notifications { + for _, message := range messages { + _, ok := acc[message.Topic] + if !ok { + acc[message.Topic] = 0 + } + acc[message.Topic]++ + } + } + for topic, num := range acc { + numChannels := numChannelsPerTopic[topic] + expectedMsgsPerTopicAllChan[topic] = numChannels * num + expectedTotalMsgsAllChan += numChannels * num + } + + gossipEngine := newGossipEngine[hash.H256, uint64, runtime.BlakeTwo256]( + network, + &sync, + notificationService, + protocol, + TestValidator{}, + ) + + type topicChan struct { + Topic hash.H256 + Chan chan TopicNotification + } + // Create channels. + topicChans := make([]topicChan, 0) + for _, channel := range channels { + topicChans = append(topicChans, topicChan{ + Topic: channel.Topic, + Chan: make(chan TopicNotification, channel.Length), + }) + } + + // Insert channels into gossipEngine. + for _, topicChan := range topicChans { + _, ok := gossipEngine.messageSinks[topicChan.Topic] + if !ok { + gossipEngine.messageSinks[topicChan.Topic] = make([]chan TopicNotification, 0) + } + gossipEngine.messageSinks[topicChan.Topic] = append( + gossipEngine.messageSinks[topicChan.Topic], topicChan.Chan) + } + + // Register the remote peer. + ch <- service.NotificationEventNotificationStreamOpened{ + Peer: remotePeer, + Direction: service.DirctionInbound, + NegotiatedFallBack: nil, + Handshake: scale.MustMarshal(role.RolesFull), + } + + // Send messages into the network event stream. + for iNotification, messages := range notifications { + var msgs [][]byte + for iMessage, message := range messages { + // Embed the topic in the first 256 bytes of the message to be extracted by + // the TestValidator later on. + msg := message.Topic.Bytes() + + // Make sure the message is unique via iNotification and iMessage to + // ensure consensusGossip does not deduplicate it. + msg = append(msg, byte(iNotification)) + msg = append(msg, byte(iMessage)) + + msgs = append(msgs, msg) + } + + for _, msg := range msgs { + // Send first event before subscribing. + ch <- service.NotificationEventNotificationReceived{ + Peer: remotePeer, + Notification: msg, + } + } + } + + receivedMsgsPerTopicAllChan := make(map[hash.H256]uint) + + // Poll both gossip engine and each receiver and track the amount of received messages. + done := make(chan any) + go func() { + defer close(done) + gossipEngine.poll() + }() + + timer := time.NewTimer(1 * time.Second) + defer timer.Stop() + delay := timer.C + + msgCount := uint(0) + var expected <-chan time.Time + outer: + for { + for _, topicChan := range topicChans { + select { + case <-topicChan.Chan: + _, ok := receivedMsgsPerTopicAllChan[topicChan.Topic] + if !ok { + receivedMsgsPerTopicAllChan[topicChan.Topic] = 0 + } + receivedMsgsPerTopicAllChan[topicChan.Topic]++ + msgCount++ + default: + } + } + + if msgCount == expectedTotalMsgsAllChan { + // Set a 1ms timeout, just to ensure we're not receiving more msgs. + if expected == nil { + timeout := time.NewTimer(10 * time.Millisecond) + defer timeout.Stop() + expected = timeout.C + } + } + + select { + case <-delay: + break outer + case <-expected: + break outer + default: + } + } + close(gossipEngine.stopChan) + <-done + + // Compare amount of expected messages with amount of received messages. + for expectedTopic, expectedNum := range expectedMsgsPerTopicAllChan { + require.Equal(t, expectedNum, receivedMsgsPerTopicAllChan[expectedTopic]) + } + + for _, topicChan := range topicChans { + close(topicChan.Chan) + } + } + + prop(t, nil, [][]Message{{Message{Topic: hash.NewH256()}}}) + prop(t, + []ChannelLengthTopic{{Topic: hash.NewH256(), Length: 71}}, + [][]Message{{{Topic: hash.NewH256()}}}, + ) + + f := func(channels []ChannelLengthTopic, notifications [][]Message) bool { + prop(t, channels, notifications) + return true + } + if err := quick.Check(f, nil); err != nil { + t.Error(err) + } + }) +} diff --git a/internal/client/network-gossip/network_gossip.go b/internal/client/network-gossip/network_gossip.go new file mode 100644 index 0000000000..9a105100ba --- /dev/null +++ b/internal/client/network-gossip/network_gossip.go @@ -0,0 +1,25 @@ +// Copyright 2025 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package gossip + +import ( + "github.com/ChainSafe/gossamer/internal/client/network" + "github.com/ChainSafe/gossamer/internal/client/network/service" + "github.com/ChainSafe/gossamer/internal/client/network/sync" + peerid "github.com/ChainSafe/gossamer/internal/client/network/types/peer-id" +) + +// Network is the abstraction over a network. +type Network interface { + service.NetworkPeers + service.NetworkEventStream + AddSetReserved(who peerid.PeerID, protocol network.ProtocolName) + RemoveSetReserved(who peerid.PeerID, protocol network.ProtocolName) +} + +// Syncing is the abstraction over the syncing subsystem. +type Syncing[H, N any] interface { + sync.SyncEventStream + service.NetworkBlock[H, N] +} diff --git a/internal/client/network-gossip/state_machine.go b/internal/client/network-gossip/state_machine.go new file mode 100644 index 0000000000..08d2e935db --- /dev/null +++ b/internal/client/network-gossip/state_machine.go @@ -0,0 +1,417 @@ +// Copyright 2025 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package gossip + +import ( + "time" + + "github.com/ChainSafe/gossamer/internal/client/network" + "github.com/ChainSafe/gossamer/internal/client/network/role" + "github.com/ChainSafe/gossamer/internal/client/network/service" + peerid "github.com/ChainSafe/gossamer/internal/client/network/types/peer-id" + "github.com/ChainSafe/gossamer/internal/primitives/runtime" + "github.com/dolthub/maphash" + "github.com/elastic/go-freelru" +) + +// NOTE: The current value is adjusted based on largest production network deployment (Kusama) and the current main +// gossip user (GRANDPA). Currently there are ~800 validators on Kusama, as such,each GRANDPA round should generate +// ~1600 messages, and we currently keep track of the last 2completed rounds and the current live one. That makes it so +// that at any point we will be holding ~4800 live messages. +// +// Assuming that each known message is tracked with a 32 byte hash, then this cache should take about 256 KB of memory. +const knownMessageCacheSize uint32 = 8192 + +const rebroadcastInterval time.Duration = 750 * time.Millisecond + +const periodicMaintenanceInterval time.Duration = 1100 * time.Millisecond + +var ( + // Reputation change when a peer sends us a gossip message that we didn't know about. + gossipSuccess = network.NewReputationChange(1<<4, "Successful gossip") + // Reputation change when a peer sends us a gossip message that we already knew about. + duplicateGossip = network.NewReputationChange(-(1 << 2), "Duplicate gossip") +) + +type peerConsensus[H comparable] struct { + knownMessages map[H]any +} + +// Topic stream message with sender. +type TopicNotification struct { + // Message data. + Message []byte + // Sender if available. + Sender *peerid.PeerID +} + +type messageEntry[H runtime.Hash] struct { + messageHash H + topic H + message []byte + sender *peerid.PeerID +} + +// Local implementation of [ValidatorContext]. +type newtorkContext[H runtime.Hash, Hasher runtime.Hasher[H]] struct { + gossip *consensusGossip[H, Hasher] + notificationService service.NotificationService +} + +// Broadcast all messages with given topic to peers that do not have it yet. +func (nc newtorkContext[H, Hasher]) BroadcastTopic(topic H, force bool) { + nc.gossip.BroadcastTopic(nc.notificationService, topic, force) +} + +// Broadcast a message to all peers that have not received it previously. +func (nc newtorkContext[H, Hasher]) BroadcastMessage(topic H, message []byte, force bool) { + nc.gossip.Multicast(nc.notificationService, topic, message, force) +} + +// Send addressed message to a peer. +func (nc newtorkContext[H, Hasher]) SendMessage(who peerid.PeerID, message []byte) { + nc.notificationService.SendSyncNotification(who, message) +} + +// Send all messages with given topic to a peer. +func (nc newtorkContext[H, Hasher]) SendTopic(who peerid.PeerID, topic H, force bool) { + nc.gossip.SendTopic(nc.notificationService, who, topic, force) +} + +func propagate[H runtime.Hash]( + notificationService service.NotificationService, + _ network.ProtocolName, + messages []messageEntry[H], + intent MessageIntent, + peers map[peerid.PeerID]peerConsensus[H], + validator Validator[H], +) { + messageAllowed := validator.MessageAllowed() + + for id, peer := range peers { + for _, message := range messages { + switch intent { + case MessageIntentBroadcast: + if _, ok := peer.knownMessages[message.messageHash]; ok { + continue + } + intent = MessageIntentBroadcast + case MessageIntentPeriodicReboradcast: + if _, ok := peer.knownMessages[message.messageHash]; ok { + intent = MessageIntentPeriodicReboradcast + } else { + // peer doesn't know message, so the logic should treat it as an + // initial broadcast. + intent = MessageIntentBroadcast + } + default: + } + + if !messageAllowed(id, intent, message.topic, message.message) { + continue + } + + peer.knownMessages[message.messageHash] = nil + peers[id] = peer + + notificationService.SendSyncNotification(id, message.message) + } + } + +} + +// Consensus network protocol handler. Manages statements and candidate requests. +type consensusGossip[H runtime.Hash, Hasher runtime.Hasher[H]] struct { + peers map[peerid.PeerID]peerConsensus[H] + messages []messageEntry[H] + knownMessages freelru.LRU[H, any] + protocol network.ProtocolName + validator Validator[H] + nextBroadcast time.Time +} + +type hasher[K comparable] struct { + maphash.Hasher[K] +} + +func (h hasher[K]) Hash(key K) uint32 { + return uint32(h.Hasher.Hash(key)) +} + +// Create a new instance using the given validator. +func newConsensusGossip[H runtime.Hash, Hasher runtime.Hasher[H]]( + validator Validator[H], + protocol network.ProtocolName, +) consensusGossip[H, Hasher] { + h := hasher[H]{maphash.NewHasher[H]()} + knownMessages, err := freelru.New[H, any](knownMessageCacheSize, h.Hash) + if err != nil { + panic(err) + } + return consensusGossip[H, Hasher]{ + peers: make(map[peerid.PeerID]peerConsensus[H]), + messages: make([]messageEntry[H], 0), + knownMessages: *knownMessages, + protocol: protocol, + validator: validator, + nextBroadcast: time.Now().Add(rebroadcastInterval), + } +} + +// Handle new connected peer. +func (cg *consensusGossip[H, Hasher]) NewPeer( + notificationService service.NotificationService, + who peerid.PeerID, + role role.ObservedRole, +) { + cg.peers[who] = peerConsensus[H]{knownMessages: make(map[H]any)} + + validator := cg.validator + context := newtorkContext[H, Hasher]{gossip: cg, notificationService: notificationService} + validator.NewPeer(context, who, role) +} + +func (cg *consensusGossip[H, Hasher]) registerMessageHashed( + messageHash H, topic H, message []byte, sender *peerid.PeerID, +) { + cg.knownMessages.Add(messageHash, nil) + cg.messages = append(cg.messages, messageEntry[H]{ + messageHash: messageHash, + topic: topic, + message: message, + sender: sender, + }) + //TODO: registered meessages metrics +} + +// Registers a message without propagating it to any peers. The message becomes available to new peers or when the +// service is asked to gossip the message's topic. No validation is performed on the message, if the message is already +// expired it should be dropped on the next garbage collection. +func (cg *consensusGossip[H, Hasher]) RegisterMessage(topic H, message []byte) { + messageHash := (*new(Hasher)).Hash(message) + cg.registerMessageHashed(messageHash, topic, message, nil) +} + +// Call when a peer has been disconnected to stop tracking gossip status. +func (cg *consensusGossip[H, Hasher]) PeerDisconnected( + notificationService service.NotificationService, who peerid.PeerID, +) { + validator := cg.validator + context := newtorkContext[H, Hasher]{gossip: cg, notificationService: notificationService} + validator.PeerDisconnected(context, who) + delete(cg.peers, who) +} + +// Perform periodic maintenance +func (cg *consensusGossip[H, Hasher]) Tick(notificationService service.NotificationService) { + cg.CollectGarbage() + now := time.Now() + if now.After(cg.nextBroadcast) || now.Equal(cg.nextBroadcast) { + cg.rebroadcast(notificationService) + cg.nextBroadcast = time.Now().Add(rebroadcastInterval) + } +} + +// Rebroadcast all messages to all peers. +func (cg *consensusGossip[H, Hasher]) rebroadcast(notificationService service.NotificationService) { + propagate(notificationService, cg.protocol, cg.messages, MessageIntentPeriodicReboradcast, cg.peers, cg.validator) +} + +// Broadcast all messages with given topic. +func (cg *consensusGossip[H, Hasher]) BroadcastTopic( + notificationService service.NotificationService, topic H, force bool, +) { + var messages []messageEntry[H] + for _, entry := range cg.messages { + if entry.topic == topic { + messages = append(messages, entry) + } + } + + var intent MessageIntent = MessageIntentBroadcast + if force { + intent = MessageIntentForcedBroadcast + } + propagate(notificationService, cg.protocol, messages, intent, cg.peers, cg.validator) +} + +// Prune old or no longer relevant consensus messages. Provide a predicate for pruning, which returns false when the +// items with a given topic should be pruned. +func (cg *consensusGossip[H, Hasher]) CollectGarbage() { + knownMessages := cg.knownMessages + before := len(cg.messages) + + messageExpired := cg.validator.MessageExpired() + tempMessages := make([]messageEntry[H], 0) + for _, msg := range cg.messages { + if !messageExpired(msg.topic, msg.message) { + tempMessages = append(tempMessages, msg) + } + } + cg.messages = tempMessages + + expiredMessages := before - len(cg.messages) + _ = expiredMessages + // TODO: expired messages metric + + for id, peer := range cg.peers { + for h := range peer.knownMessages { + if _, ok := knownMessages.Get(h); !ok { + delete(peer.knownMessages, h) + } + } + cg.peers[id] = peer + } +} + +// Get valid messages received in the past for a topic (might have expired meanwhile). +func (cg *consensusGossip[H, Hasher]) MessagesFor(topic H) []TopicNotification { + notifications := make([]TopicNotification, 0) + for _, entry := range cg.messages { + if entry.topic == topic { + notifications = append(notifications, TopicNotification{Message: entry.message, Sender: entry.sender}) + } + } + return notifications +} + +type hashTopicNotification[H runtime.Hash] struct { + Hash H + TopicNotification +} + +// Register incoming messages and return the ones that are new and valid (according to a [Validator]) and should thus +// be forwarded to the upper layers. +func (cg *consensusGossip[H, Hasher]) OnIncoming( + network Network, + notificationService service.NotificationService, + who peerid.PeerID, + messages [][]byte, +) []hashTopicNotification[H] { + toForward := make([]hashTopicNotification[H], 0) + + for _, message := range messages { + messageHash := (*new(Hasher)).Hash(message) + + if _, ok := cg.knownMessages.Get(messageHash); ok { + // If the peer already send us the message once, let's report them. + peer, ok := cg.peers[who] + if ok { + _, ok := peer.knownMessages[messageHash] + if ok { + network.ReportPeer(who, duplicateGossip) + } + peer.knownMessages[messageHash] = nil + cg.peers[who] = peer + } + continue + } + + // validate the message + validator := cg.validator + context := newtorkContext[H, Hasher]{gossip: cg, notificationService: notificationService} + validation := validator.Validate(&context, who, message) + + var ( + topic H + keep bool + ) + switch validation := validation.(type) { + case ValidationResultProcessAndKeep[H]: + topic = validation.Hash + keep = true + case ValidationResultProcessAndDiscard[H]: + topic = validation.Hash + keep = false + case ValidationResultDiscard: + continue + default: + panic("unreachable") + } + + peer, ok := cg.peers[who] + if !ok { + continue + } + + network.ReportPeer(who, gossipSuccess) + peer.knownMessages[messageHash] = nil + toForward = append(toForward, hashTopicNotification[H]{ + Hash: topic, + TopicNotification: TopicNotification{Message: message, Sender: &who}, + }) + + if keep { + cg.registerMessageHashed(messageHash, topic, message, &who) + } + } + + return toForward +} + +// Send all messages with given topic to a peer. +func (cg *consensusGossip[H, Hasher]) SendTopic( + notificationService service.NotificationService, who peerid.PeerID, topic H, force bool, +) { + messageAllowed := cg.validator.MessageAllowed() + + if peer, ok := cg.peers[who]; ok { + for _, entry := range cg.messages { + var intent MessageIntent = MessageIntentBroadcast + if force { + intent = MessageIntentForcedBroadcast + } + + _, ok := peer.knownMessages[entry.messageHash] + if !force && ok { + continue + } + + if !messageAllowed(who, intent, entry.topic, entry.message) { + continue + } + + peer.knownMessages[entry.messageHash] = nil + cg.peers[who] = peer + + notificationService.SendSyncNotification(who, entry.message) + } + } +} + +// Multicast a message to all peers. +func (cg *consensusGossip[H, Hasher]) Multicast( + notificationService service.NotificationService, topic H, message []byte, force bool, +) { + messageHash := (*new(Hasher)).Hash(message) + cg.registerMessageHashed(messageHash, topic, message, nil) + var intent MessageIntent = MessageIntentBroadcast + if force { + intent = MessageIntentForcedBroadcast + } + + propagate( + notificationService, + cg.protocol, + []messageEntry[H]{{messageHash: messageHash, topic: topic, message: message}}, + intent, + cg.peers, + cg.validator, + ) +} + +// Send addressed message to a peer. The message is not kept or multicast later on. +func (cg *consensusGossip[H, Hasher]) SendMessage( + notificationService service.NotificationService, who peerid.PeerID, message []byte, +) { + peer, ok := cg.peers[who] + if !ok { + return + } + + messageHash := (*new(Hasher)).Hash(message) + peer.knownMessages[messageHash] = nil + cg.peers[who] = peer + notificationService.SendSyncNotification(who, message) +} diff --git a/internal/client/network-gossip/state_machine_test.go b/internal/client/network-gossip/state_machine_test.go new file mode 100644 index 0000000000..6dd27152d1 --- /dev/null +++ b/internal/client/network-gossip/state_machine_test.go @@ -0,0 +1,322 @@ +// Copyright 2025 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package gossip + +import ( + "sync" + "testing" + + "github.com/ChainSafe/gossamer/internal/client/network" + "github.com/ChainSafe/gossamer/internal/client/network/config" + "github.com/ChainSafe/gossamer/internal/client/network/event" + "github.com/ChainSafe/gossamer/internal/client/network/role" + "github.com/ChainSafe/gossamer/internal/client/network/service" + "github.com/ChainSafe/gossamer/internal/client/network/types/multiaddr" + peerid "github.com/ChainSafe/gossamer/internal/client/network/types/peer-id" + "github.com/ChainSafe/gossamer/internal/primitives/core/hash" + "github.com/ChainSafe/gossamer/internal/primitives/runtime" + "github.com/stretchr/testify/require" +) + +type AllowAll struct{} + +func (ao AllowAll) NewPeer(context ValidatorContext[hash.H256], who peerid.PeerID, role role.ObservedRole) { +} +func (ao AllowAll) PeerDisconnected(context ValidatorContext[hash.H256], who peerid.PeerID) { +} +func (ao AllowAll) Validate(context ValidatorContext[hash.H256], sender peerid.PeerID, data []byte) ValidationResult { + return ValidationResultProcessAndKeep[hash.H256]{ + Hash: hash.NewH256(), + } +} +func (ao AllowAll) MessageExpired() func(topic hash.H256, message []byte) bool { + return func(topic hash.H256, message []byte) bool { + return false + } +} +func (ao AllowAll) MessageAllowed() func(who peerid.PeerID, intent MessageIntent, topic hash.H256, data []byte) bool { + return func(who peerid.PeerID, intent MessageIntent, topic hash.H256, data []byte) bool { + return true + } +} + +type AllowOne struct{} + +func (ao AllowOne) NewPeer(context ValidatorContext[hash.H256], who peerid.PeerID, role role.ObservedRole) { +} +func (ao AllowOne) PeerDisconnected(context ValidatorContext[hash.H256], who peerid.PeerID) { +} +func (ao AllowOne) Validate(context ValidatorContext[hash.H256], sender peerid.PeerID, data []byte) ValidationResult { + if data[0] == 1 { + return ValidationResultProcessAndKeep[hash.H256]{ + Hash: hash.NewH256(), + } + } + return ValidationResultDiscard{} +} +func (ao AllowOne) MessageExpired() func(topic hash.H256, message []byte) bool { + return func(topic hash.H256, message []byte) bool { + return message[0] != 1 + } +} +func (ao AllowOne) MessageAllowed() func(who peerid.PeerID, intent MessageIntent, topic hash.H256, data []byte) bool { + return func(who peerid.PeerID, intent MessageIntent, topic hash.H256, data []byte) bool { + return true + } +} + +var ( + _ Validator[hash.H256] = AllowAll{} + _ Validator[hash.H256] = AllowOne{} +) + +type PeerIDReputationChange struct { + peerid.PeerID + network.ReputationChange +} + +type NoOpNetwork struct { + peerReports []PeerIDReputationChange + peerReportsMtx sync.Mutex +} + +func (*NoOpNetwork) SetAuthorizedPeers(peers map[peerid.PeerID]any) { + panic("unimpl") +} +func (*NoOpNetwork) SetAuthorizedOnly(reservedOnly bool) { + panic("unimpl") +} +func (*NoOpNetwork) AddKnownAddress(peerID peerid.PeerID, addr multiaddr.Multiaddr) { + panic("unimpl") +} +func (non *NoOpNetwork) ReportPeer(peerID peerid.PeerID, costBenefit network.ReputationChange) { + non.peerReportsMtx.Lock() + defer non.peerReportsMtx.Unlock() + non.peerReports = append(non.peerReports, PeerIDReputationChange{ + PeerID: peerID, + ReputationChange: costBenefit, + }) +} +func (*NoOpNetwork) DisconnectPeer(who peerid.PeerID, protocol network.ProtocolName) { + panic("unimpl") +} +func (*NoOpNetwork) AcceptUnreservedPeers() { + panic("unimpl") +} +func (*NoOpNetwork) DenyUnreservedPeers() { + panic("unimpl") +} +func (*NoOpNetwork) AddReservedPeer(peer config.MultiaddrPeerId) error { + panic("unimpl") +} +func (*NoOpNetwork) RemoveReservedPeer(peerID peerid.PeerID) { + panic("unimpl") +} +func (*NoOpNetwork) SetReservedPeers(protocol network.ProtocolName, peers map[multiaddr.Multiaddr]any) error { + panic("unimpl") +} +func (*NoOpNetwork) AddPeersToReservedSet(protocol network.ProtocolName, peers map[multiaddr.Multiaddr]any) error { + panic("unimpl") +} +func (*NoOpNetwork) RemovePeersFromReservedSet(protocol network.ProtocolName, peers []peerid.PeerID) { + panic("unimpl") +} +func (*NoOpNetwork) AddToPeersSet(protocol network.ProtocolName, peers map[multiaddr.Multiaddr]any) error { + panic("unimpl") +} +func (*NoOpNetwork) RemoveFromPeersSet(protocol network.ProtocolName, peers []peerid.PeerID) { + panic("unimpl") +} +func (*NoOpNetwork) SyncNumConnected() uint { + panic("unimpl") +} +func (*NoOpNetwork) EventStream(name string) chan event.Event { + panic("unimpl") +} +func (*NoOpNetwork) AnnounceBlock(hash hash.H256, data []byte) { + panic("unimpl") +} +func (*NoOpNetwork) NewBestBlockImported(hash hash.H256, number uint64) { + panic("unimpl") +} +func (*NoOpNetwork) AddSetReserved(who peerid.PeerID, protocol network.ProtocolName) { + panic("unimpl") +} +func (*NoOpNetwork) RemoveSetReserved(who peerid.PeerID, protocol network.ProtocolName) { + panic("unimpl") +} +func (*NoOpNetwork) PeerRole(peerID peerid.PeerID, handshake []byte) *role.ObservedRole { + panic("unimpl") +} +func (*NoOpNetwork) ReservedPeers() <-chan struct { + Peers []peerid.PeerID + Error error +} { + panic("unimpl") +} + +var ( + _ service.NetworkPeers = &NoOpNetwork{} + _ service.NetworkEventStream = &NoOpNetwork{} + _ service.NetworkBlock[hash.H256, uint64] = &NoOpNetwork{} +) + +type NoOpNotificationService struct{} + +func (NoOpNotificationService) OpenSubstream(peer peerid.PeerID) <-chan error { + panic("unimpl") +} +func (NoOpNotificationService) CloseSubstream(peer peerid.PeerID) <-chan error { + panic("unimpl") +} +func (NoOpNotificationService) SendSyncNotification(peer peerid.PeerID, notification []byte) { + panic("unimpl") +} +func (NoOpNotificationService) SendAsyncNotification(peer peerid.PeerID, notification []byte) <-chan error { + panic("unimpl") +} +func (NoOpNotificationService) SetHandshake(handshake []byte) <-chan error { + panic("unimpl") +} +func (NoOpNotificationService) TrySetHandshake(handshake []byte) error { + panic("unimpl") +} +func (NoOpNotificationService) NextEvent() <-chan service.NotificationEvent { + panic("unimpl") +} +func (NoOpNotificationService) Protocol() network.ProtocolName { + panic("unimpl") +} +func (NoOpNotificationService) MessageSink(peer peerid.PeerID) service.MessageSink { + panic("unimpl") +} + +var _ service.NotificationService = NoOpNotificationService{} + +func pushMessage( + consensus *consensusGossip[hash.H256, runtime.BlakeTwo256], topic hash.H256, h hash.H256, message []byte, +) { + consensus.knownMessages.Add(h, nil) + consensus.messages = append(consensus.messages, messageEntry[hash.H256]{ + messageHash: h, + topic: topic, + message: message, + sender: nil, + }) +} + +func TestConsensusGossip(t *testing.T) { + t.Run("collects_garbage", func(t *testing.T) { + prevHash := hash.NewRandomH256() + bestHash := hash.NewRandomH256() + consensus := newConsensusGossip[hash.H256, runtime.BlakeTwo256](AllowAll{}, "/foo") + m1Hash := hash.NewRandomH256() + m2Hash := hash.NewRandomH256() + m1 := []byte{1, 2, 3} + m2 := []byte{4, 5, 6} + + pushMessage(&consensus, prevHash, m1Hash, m1) + pushMessage(&consensus, bestHash, m2Hash, m2) + consensus.knownMessages.Add(m1Hash, nil) + consensus.knownMessages.Add(m2Hash, nil) + + consensus.CollectGarbage() + require.Equal(t, 2, len(consensus.messages)) + require.Equal(t, 2, consensus.knownMessages.Len()) + + consensus.validator = AllowOne{} + + // m2 is expired + consensus.CollectGarbage() + require.Equal(t, 1, len(consensus.messages)) + // known messages are only pruned based on size. + require.Equal(t, 2, consensus.knownMessages.Len()) + _, ok := consensus.knownMessages.Get(m2Hash) + require.True(t, ok) + }) + + t.Run("message_stream_include_those_sent_before_asking", func(t *testing.T) { + consensus := newConsensusGossip[hash.H256, runtime.BlakeTwo256](AllowAll{}, "/foo") + + // Register message. + message := []byte{4, 5, 6} + topic := runtime.BlakeTwo256{}.Hash([]byte{1, 2, 3}) + consensus.RegisterMessage(topic, message) + + require.Equal(t, TopicNotification{ + Message: message, + Sender: nil, + }, consensus.MessagesFor(topic)[0]) + }) + + t.Run("can_keep_multiple_messages_per_topic", func(t *testing.T) { + consensus := newConsensusGossip[hash.H256, runtime.BlakeTwo256](AllowAll{}, "/foo") + + topic := hash.NewRandomH256() + msgA := []byte{1, 2, 3} + msgB := []byte{4, 5, 6} + + consensus.RegisterMessage(topic, msgA) + consensus.RegisterMessage(topic, msgB) + + require.Equal(t, 2, len(consensus.messages)) + }) + + t.Run("peer_is_removed_on_disconnect", func(t *testing.T) { + consensus := newConsensusGossip[hash.H256, runtime.BlakeTwo256](AllowAll{}, "/foo") + + notifcationService := NoOpNotificationService{} + + peerID := peerid.NewRandomPeerID() + consensus.NewPeer(notifcationService, peerID, role.ObservedRoleFull) + _, ok := consensus.peers[peerID] + require.True(t, ok) + + consensus.PeerDisconnected(notifcationService, peerID) + _, ok = consensus.peers[peerID] + require.False(t, ok) + }) + + t.Run("on_incoming_ignores_discarded_messages", func(t *testing.T) { + notifcationService := NoOpNotificationService{} + consensus := newConsensusGossip[hash.H256, runtime.BlakeTwo256](AllowAll{}, "/foo") + toForward := consensus.OnIncoming(nil, notifcationService, peerid.NewRandomPeerID(), [][]byte{{1, 2, 3}}) + + require.Empty(t, toForward) + }) + + t.Run("on_incoming_ignores_unregistered_peer", func(t *testing.T) { + network := NoOpNetwork{} + notifcationService := NoOpNotificationService{} + remote := peerid.NewRandomPeerID() + + consensus := newConsensusGossip[hash.H256, runtime.BlakeTwo256](AllowAll{}, "/foo") + toForward := consensus.OnIncoming(&network, notifcationService, remote, [][]byte{{1, 2, 3}}) + + require.Empty(t, toForward) + }) + + // Two peers can send us the same gossip message. We should not report the second peer + // sending the gossip message as long as its the first time the peer send us this message. + t.Run("do_not_report_peer_for_first_time_duplicate_gossip_message", func(t *testing.T) { + consensus := newConsensusGossip[hash.H256, runtime.BlakeTwo256](AllowAll{}, "/foo") + + network := NoOpNetwork{} + notifcationService := NoOpNotificationService{} + + peerID := peerid.NewRandomPeerID() + consensus.NewPeer(notifcationService, peerID, role.ObservedRoleFull) + require.Contains(t, consensus.peers, peerID) + + peerID2 := peerid.NewRandomPeerID() + consensus.NewPeer(notifcationService, peerID2, role.ObservedRoleFull) + require.Contains(t, consensus.peers, peerID2) + + message := [][]byte{{1, 2, 3}} + consensus.OnIncoming(&network, notifcationService, peerID, message) + consensus.OnIncoming(&network, notifcationService, peerID2, message) + + require.Equal(t, []PeerIDReputationChange{{peerID, gossipSuccess}}, network.peerReports) + }) +} diff --git a/internal/client/network-gossip/validator.go b/internal/client/network-gossip/validator.go new file mode 100644 index 0000000000..757e2c8e85 --- /dev/null +++ b/internal/client/network-gossip/validator.go @@ -0,0 +1,72 @@ +// Copyright 2025 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package gossip + +import ( + "github.com/ChainSafe/gossamer/internal/client/network/role" + peerid "github.com/ChainSafe/gossamer/internal/client/network/types/peer-id" + "golang.org/x/exp/constraints" +) + +// Validator is the interface that validates consensus messages. +type Validator[H constraints.Ordered] interface { + // New peer is connected. + NewPeer(context ValidatorContext[H], who peerid.PeerID, role role.ObservedRole) + // New connection is dropped. + PeerDisconnected(context ValidatorContext[H], who peerid.PeerID) + // Validate consensus message. + Validate(context ValidatorContext[H], sender peerid.PeerID, data []byte) ValidationResult + // Produce a closure for validating messages on a given topic. + MessageExpired() func(topic H, message []byte) bool + // Produce a closure for filtering egress messages. + MessageAllowed() func(who peerid.PeerID, intent MessageIntent, topic H, data []byte) bool +} + +// ValidatorContext allows a [Validator] to respond to incoming messages by sending out further messages. +type ValidatorContext[H constraints.Ordered] interface { + // Broadcast all messages with given topic to peers that do not have it yet. + BroadcastTopic(topic H, force bool) + // Broadcast a message to all peers that have not received it previously. + BroadcastMessage(topic H, message []byte, force bool) + // Send addressed message to a peer. + SendMessage(who peerid.PeerID, message []byte) + // Send all messages with given topic to a peer. + SendTopic(who peerid.PeerID, topic H, force bool) +} + +// MessageIntent is the reason for sending out the message. +type MessageIntent uint + +const ( + // Requested broadcast. + MessageIntentBroadcast = iota + 1 + // Requested broadcast to all peers. + MessageIntentForcedBroadcast + // Periodic rebroadcast of all messages to all peers. + MessageIntentPeriodicReboradcast +) + +// ValidationResultProcessAndKeep means the message should be stored and propagated under given topic. +type ValidationResultProcessAndKeep[H constraints.Ordered] struct { + Hash H +} + +func (ValidationResultProcessAndKeep[H]) isValidationResult() {} + +// ValidationResultProcessAndDiscard means the message should be processed, but not propagated. +type ValidationResultProcessAndDiscard[H constraints.Ordered] struct { + Hash H +} + +func (ValidationResultProcessAndDiscard[H]) isValidationResult() {} + +// ValidationResultDiscard means the message should be ignored. +type ValidationResultDiscard struct{} + +func (ValidationResultDiscard) isValidationResult() {} + +// ValidationResultDiscard represents a message validation result. +type ValidationResult interface { + isValidationResult() +} diff --git a/internal/client/network/config/config.go b/internal/client/network/config/config.go new file mode 100644 index 0000000000..137c3ef373 --- /dev/null +++ b/internal/client/network/config/config.go @@ -0,0 +1,16 @@ +// Copyright 2025 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package config + +import ( + "github.com/ChainSafe/gossamer/internal/client/network/types/multiaddr" + peerid "github.com/ChainSafe/gossamer/internal/client/network/types/peer-id" +) + +// MultiaddrPeerId is the address of a node, including its identity. +// This struct represents a decoded version of a multiaddress that ends with "/p2p/". +type MultiaddrPeerId struct { + multiaddr.Multiaddr + peerid.PeerID +} diff --git a/internal/client/network/event/event.go b/internal/client/network/event/event.go new file mode 100644 index 0000000000..0e0dd0dea8 --- /dev/null +++ b/internal/client/network/event/event.go @@ -0,0 +1,109 @@ +// Copyright 2025 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package event + +import ( + "time" + + "github.com/ChainSafe/gossamer/internal/client/network" + "github.com/ChainSafe/gossamer/internal/client/network/role" + "github.com/ChainSafe/gossamer/internal/client/network/types/kad" + peerid "github.com/ChainSafe/gossamer/internal/client/network/types/peer-id" +) + +// DHTEvent is an event generated by DHT as a response to get/put value requests. +type DHTEvent interface { + isDHTEvent() +} + +// DHTEventValueFound mean the value was found. +type DHTEventValueFound kad.PeerRecord + +// DHTEventValueNotFound means the requested record has not been found in the DHT. +type DHTEventValueNotFound kad.Key + +// DHTEventValuePut means the record has been successfully inserted into the DHT. +type DHTEventValuePut kad.Key + +// DHTEventValuePutFailed means an error has occurred while putting a record into the DHT. +type DHTEventValuePutFailed kad.Key + +// DHTEventStartProvidingFailed means an error occurred while registering as a content provider on the DHT. +type DHTEventStartProvidingFailed kad.Key + +// DHTEventPutRecordRequest means the DHT received a put record request. +type DHTEventPutRecordRequest struct { + Key kad.Key + Value []byte + Publisher *peerid.PeerID + Expires time.Time +} + +// DHTEventProvidersFound contains the providers for [kad.Key] that were found. +type DHTEventProvidersFound struct { + Key kad.Key + Providers []peerid.PeerID +} + +// DHTEventProvidersNotFound means the providers for [kad.Key] were not found. +type DHTEventProvidersNotFound kad.Key + +func (DHTEventValueFound) isDHTEvent() {} +func (DHTEventValueNotFound) isDHTEvent() {} +func (DHTEventValuePut) isDHTEvent() {} +func (DHTEventValuePutFailed) isDHTEvent() {} +func (DHTEventStartProvidingFailed) isDHTEvent() {} +func (DHTEventPutRecordRequest) isDHTEvent() {} +func (DHTEventProvidersFound) isDHTEvent() {} + +// Event is an event generated by the networking layer. +type Event interface { + isEvent() +} + +// EventDHT is an event generated by a DHT. +type EventDHT struct { + DHTEvent +} + +// EventNotificationStreamOpened represents an opened a substream with the given node with the given notifications +// protocol. The protocol is always one of the notification protocols that have been registered. +type EventNotificationStreamOpened struct { + // Node we opened the substream with. + Remote peerid.PeerID + // The concerned protocol. Each protocol uses a different substream. + Protocol network.ProtocolName + // If the negotiation didn't use the main name of the protocol, then this field contains which name has actually + // been used. + NegotiatedFallback *network.ProtocolName + // Role of the remote. + Role role.ObservedRole + // Received handshake. + ReceivedHandshake []byte +} + +// Closed a substream with the given node. Always matches a corresponding previous +// [EventNotificationStreamOpened] message. +type EventNotificationStreamClosed struct { + // Node we closed the substream with. + Remote peerid.PeerID + // The concerned protocol. Each protocol uses a different substream. + Protocol network.ProtocolName +} + +// Received one or more messages from the given node using the given protocol. +type EventNotificationsReceived struct { + // Node we received the message from. + Remote peerid.PeerID + // Concerned protocol and associated message. + Messages []struct { + Protocol network.ProtocolName + Message []byte + } +} + +func (EventDHT) isEvent() {} +func (EventNotificationStreamOpened) isEvent() {} +func (EventNotificationStreamClosed) isEvent() {} +func (EventNotificationsReceived) isEvent() {} diff --git a/internal/client/network/network.go b/internal/client/network/network.go new file mode 100644 index 0000000000..0e77e2bee4 --- /dev/null +++ b/internal/client/network/network.go @@ -0,0 +1,20 @@ +// Copyright 2025 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package network + +// ProtocolName is the protocol name transmitted on the wire. +type ProtocolName string + +// ReputationChange is a description of a reputation adjustment for a node. +type ReputationChange struct { + // Reputation delta. + Value int32 + // Reason for reputation change. + Reason string +} + +// NewReputationChange constructs a [ReputationChange] with given delta and reason. +func NewReputationChange(value int32, reason string) ReputationChange { + return ReputationChange{Value: value, Reason: reason} +} diff --git a/internal/client/network/role/role.go b/internal/client/network/role/role.go new file mode 100644 index 0000000000..f4431482f4 --- /dev/null +++ b/internal/client/network/role/role.go @@ -0,0 +1,68 @@ +// Copyright 2025 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package role + +// ObservedRole is the role that the peer sent to us during the handshake, with the addition of what our local node +// knows about that peer. +// +// This type is different from [Role]. The [Role] type indicates what a node says about itself, while ObservedRole +// is a [Role] merged with the information known locally about that node. +type ObservedRole uint + +const ( + // Full node. + ObservedRoleFull ObservedRole = iota + // Light node. + ObservedRoleLight + // Third-party authority. + ObservedRoleAuthority +) + +// Role of the local node. +type Role uint + +const ( + // Regular full node. + RoleFull Role = iota + // Actual authority. + RoleAuthority +) + +// Roles are a bitmask of the roles that a node fulfils. +type Roles uint8 + +const ( + // No network. + RolesNone Roles = 0b00000000 + // Full node, does not participate in consensus. + RolesFull Roles = 0b00000001 + // Light client node. + RolesLight Roles = 0b00000010 + // Act as an authority + RolesAuthority Roles = 0b00000100 +) + +func (r Roles) intersects(other Roles) bool { + return !((r & other) == 0) +} + +// Does this role represents a client that holds full chain data locally? +func (r Roles) IsFull() bool { + return r.intersects(RolesFull | RolesAuthority) +} + +// Does this role represents a client that does not participates in the consensus? +func (r Roles) IsAuthority() bool { + return r == RolesAuthority +} + +func (r Roles) ObservedRole() ObservedRole { + if r.IsAuthority() { + return ObservedRoleAuthority + } else if r.IsFull() { + return ObservedRoleFull + } else { + return ObservedRoleLight + } +} diff --git a/internal/client/network/service/service.go b/internal/client/network/service/service.go new file mode 100644 index 0000000000..b495f57e19 --- /dev/null +++ b/internal/client/network/service/service.go @@ -0,0 +1,286 @@ +// Copyright 2025 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package service + +import ( + "github.com/ChainSafe/gossamer/internal/client/network" + "github.com/ChainSafe/gossamer/internal/client/network/config" + "github.com/ChainSafe/gossamer/internal/client/network/event" + "github.com/ChainSafe/gossamer/internal/client/network/role" + "github.com/ChainSafe/gossamer/internal/client/network/types/multiaddr" + peerid "github.com/ChainSafe/gossamer/internal/client/network/types/peer-id" +) + +// NetworkSyncForkRequest provides an ability to set a fork sync request for a particular block. +type NetworkSyncForkRequest[BlockHash, BlockNumber any] interface { + // Notifies the sync service to try and sync the given block from the given peers. + // + // If the given slice of peers is empty then the underlying implementation + // should make a best effort to fetch the block from any peers it is + // connected to. + SetSyncForkRequest(peers []peerid.PeerID, hash BlockHash, number BlockNumber) +} + +// NetworkPeers provides low-level API for manipulating network peers. +type NetworkPeers interface { + // Set authorized peers. + // + // Need a better solution to manage authorized peers, but now just use reserved peers for + // prototyping. + SetAuthorizedPeers(peers map[peerid.PeerID]any) + // Set authorized_only flag. + // + // Need a better solution to decide authorized_only, but now just use reservedOnly flag for prototyping. + SetAuthorizedOnly(reservedOnly bool) + // Adds an address known to a node. + AddKnownAddress(peerID peerid.PeerID, addr multiaddr.Multiaddr) + // Report a given peer as either beneficial (+) or costly (-) according to the given scalar. + ReportPeer(peerID peerid.PeerID, costBenefit network.ReputationChange) + // Disconnect from a node as soon as possible. + // + // This triggers the same effects as if the connection had closed itself spontaneously. + // + // See also ["NetworkPeers::remove_from_peers_set"], which has the same effect but also prevents the local node + // from re-establishing an outgoing substream to this peer until it is added again. + DisconnectPeer(who peerid.PeerID, protocol network.ProtocolName) + // Connect to unreserved peers and allow unreserved peers to connect for syncing purposes. + AcceptUnreservedPeers() + // Disconnect from unreserved peers and deny new unreserved peers to connect for syncing purposes. + DenyUnreservedPeers() + // Adds a PeerID and its MultiAddr as reserved for a sync protocol (default peer set). + // + // Returns an error if the given string is not a valid multiaddress or contains an invalid peer ID (which includes + // the local peer ID). + AddReservedPeer(peer config.MultiaddrPeerId) error + // Removes a [peerid.PeerID] from the list of reserved peers for a sync protocol (default peer set). + RemoveReservedPeer(peerID peerid.PeerID) + // Sets the reserved set of a protocol to the given set of peers. + // + // Each Multiaddr must end with a "/p2p/" component containing the peer id. It can also + // consist of only "/p2p/". + // + // The node will start establishing/accepting connections and substreams to/from peers in this set, if it doesn't + // have any substream open with them yet. + // + // Note however, if a call to this function results in less peers on the reserved set, they will not necessarily + // get disconnected (depending on available free slots in the peer set). If you want to also disconnect those + // removed peers, you will have to call RemoveFromPeersSet on those in addition to updating the reserved set. You + // can omit this step if the peer set is in reserved only mode. + // + // Returns an error if one of the given addresses is invalid or contains an invalid peer ID (which includes the + // local peer id). + SetReservedPeers(protocol network.ProtocolName, peers map[multiaddr.Multiaddr]any) error + // Add peers to a peer set. + // + // Each Multiaddr must end with a "/p2p/" component containing the peer id. It can also consist of only + // "/p2p/". + // + // Returns an error if one of the given addresses is invalid or contains an invalid peer id (which includes the + // local peer id). + AddPeersToReservedSet(protocol network.ProtocolName, peers map[multiaddr.Multiaddr]any) error + // Remove peers from a peer set. + RemovePeersFromReservedSet(protocol network.ProtocolName, peers []peerid.PeerID) + // Add a peer to a set of peers. + // + // If the set has slots available, it will try to open a substream with this peer. + // + // Each Multiaddr must end with a "/p2p/" component containing the peer id. It can also consist of only + // "/p2p/". + // + // Returns an error if one of the given addresses is invalid or contains an invalid peer id (which includes the + // local peer id). + AddToPeersSet(protocol network.ProtocolName, peers map[multiaddr.Multiaddr]any) error + // Remove peers from a peer set. + // + // If we currently have an open substream with this peer, it will soon be closed. + RemoveFromPeersSet(protocol network.ProtocolName, peers []peerid.PeerID) + // Returns the number of peers in the sync peer set we're connected to. + SyncNumConnected() uint + + // Attempt to get peer role. + // + // Right now the peer role is decoded from the received handshake for all protocols ("/block-announces/1" has other + // information as well). If the handshake cannot be decoded into a role, the role queried from peer storeand if the + // role is not stored there either, nil is returned and the peer should be discarded. + PeerRole(peerID peerid.PeerID, handshake []byte) *role.ObservedRole + + // Get the list of reserved peers. + // + // Returns an error if the network worker is no longer running. + ReservedPeers() <-chan struct { + Peers []peerid.PeerID + Error error + } +} + +// NetworkEventStream provides access to network-level event stream. +type NetworkEventStream interface { + // Returns a stream containing the events that happen on the network. If this method is called multiple times, the + // events are duplicated. The stream never ends (unless the network worker gets shut down). The name passed is + // used to identify the channel in the Prometheus metrics. + EventStream(name string) chan event.Event +} + +// NetworkBlock provides ability to announce blocks to the network. +type NetworkBlock[BlockHash any, BlockNumber any] interface { + // Make sure an important block is propagated to peers. + // + // In chain based consensus, we often need to make sure non-best forks are at least temporarily synced. This + // function forces such an announcement. + AnnounceBlock(hash BlockHash, data []byte) + + // Inform the network service about new best imported block. + NewBestBlockImported(hash BlockHash, number BlockNumber) +} + +// ValidationResult is the substream acceptance result. +type ValidationResult uint + +const ( + /// Accept inbound substream. + ValidationResultAccept = iota + 1 + /// Reject inbound substream. + ValidationResultReject +) + +// Direction is the substream direction. +type Direction uint + +const ( + // Substream opened by the remote node. + DirctionInbound = iota + 1 + // Substream opened by the local node. + DirectionOutbound +) + +// NotificationEvent are events received by the protocol from [NotificationService]. +type NotificationEvent interface { + isNotificationEvent() +} + +// NotificationEventValidateInboundSubstream is the validate inbound substream event. +type NotificationEventValidateInboundSubstream struct { + // Peer ID. + Peer peerid.PeerID + // Received handshake. + Handshake []byte + // Channel for sending validation result back to [NotificationService] + ResultChan chan ValidationResult +} + +func (NotificationEventValidateInboundSubstream) isNotificationEvent() {} + +// NotificationEventNotificationStreamOpened means a remote identified by peer id opened a substream and sent +// handshake. Validate handshake and report status (accept/reject) to [NotificationService]. +type NotificationEventNotificationStreamOpened struct { + // Peer ID. + Peer peerid.PeerID + // Is the substream inbound or outbound. + Direction Direction + // Received handshake. + Handshake []byte + // Negotiated fallback. + NegotiatedFallBack *network.ProtocolName +} + +func (NotificationEventNotificationStreamOpened) isNotificationEvent() {} + +// NotificationEventNotificationStreamClosed means a substream was closed. +type NotificationEventNotificationStreamClosed struct { + // Peer ID. + Peer peerid.PeerID +} + +func (NotificationEventNotificationStreamClosed) isNotificationEvent() {} + +// NotificationEventNotificationReceived means a notification was received from the substream. +type NotificationEventNotificationReceived struct { + // Peer ID. + Peer peerid.PeerID + // Received notification. + Notification []byte +} + +func (NotificationEventNotificationReceived) isNotificationEvent() {} + +// NotificationService is the notification service. It defines behaviours that both the protocol implementations and +// NotificationService can expect from each other. +// +// NotificationService can send two different kinds of information to protocol: +// - substream-related information +// - notification-related information +// +// When an unvalidated, inbound substream is received by NotificationService, it sends the inbound stream information +// (peer ID, handshake) to protocol for validation. Protocol must then verify that the handshake is valid (and in the +// future that it has a slot it can allocate for the peer) and then report back the [ValidationResult] which is either +// [ValidationResultAccept] or [ValidationResultReject]. +// +// After the validation result has been received by NotificationService, it prepares the substream for communication by +// initialising the necessary sinks and emits [NotificationEventNotificationStreamOpened] which informs the protocol +// that the remote peer is ready to receive notifications. +// +// Both local and remote peer can close the substream at any time. Local peer can do so by calling CloseSubstream which +// instructs NotificationService to close the substream. Remote closing the substream is indicated to the local peer by +// receiving [NotificationEventNotificationStreamClosed] event. +// +// In case the protocol must update its handshake while it's operating (such as updating the best block information), +// it can do so by calling SetHandshake which instructs NotificationService to update the handshake it stored during +// protocol initialization. +// +// All peer events are multiplexed on the same incoming event stream from NotificationService and thus each event +// carries a peer id so the protocol knows whose information to update when receiving an event. +type NotificationService interface { + // Instruct NotificationService to open a new substream for peer. + OpenSubstream(peer peerid.PeerID) <-chan error + + // Instruct NotificationService to close substream for "peer". + CloseSubstream(peer peerid.PeerID) <-chan error + + // Send synchronous notification to peer. + SendSyncNotification(peer peerid.PeerID, notification []byte) + + // Send asynchronous notification to peer, allowing sender to exercise backpressure. + // Returns an error if the peer doesn't exist. + SendAsyncNotification(peer peerid.PeerID, notification []byte) <-chan error + + // Set handshake for the notification protocol replacing the old handshake. + SetHandshake(handshake []byte) <-chan error + + // Non-blocking variant of SetHandshake that attempts to update the handshake and returns an error if the channel + // is blocked. + // + // Technically the function can return an error if the channel to NotificationService is closed but that doesn't + // happen under normal operation. + TrySetHandshake(handshake []byte) error + + // Get next event from the NotificationService event stream. + NextEvent() <-chan NotificationEvent + + // Get protocol name of the NotificationService. + Protocol() network.ProtocolName + + // Get message sink of the peer. + MessageSink(peer peerid.PeerID) MessageSink +} + +// MessageSink for peers. +// +// If protocol cannot use [NotificationService] to send notifications to peers and requires, e.g., notifications to be +// sent in another task, the protocol may acquire a MessageSink object for each peer by calling +// [NotificationService.MessageSink]. Calling this function returns an object which allows the protocol to send +// notifications to the remote peer. +// +// Use of this API is discouraged as it's not as performant as sending notifications through [NotificationService] due +// to synchronisation required to keep the underlying notification sink up to date with possible sink replacement +// events. +type MessageSink interface { + // Send synchronous notification to the peer associated with this MessageSink. + SendSyncNotification(notification []byte) + + // Send an asynchronous notification to to the peer associated with this MessageSink, + // allowing sender to exercise backpressure. + // + // Returns an error if the peer does not exist. + SendAsyncNotification(notification []byte) <-chan error +} diff --git a/internal/client/network/sync/sync.go b/internal/client/network/sync/sync.go new file mode 100644 index 0000000000..6a72fa6e3b --- /dev/null +++ b/internal/client/network/sync/sync.go @@ -0,0 +1,27 @@ +// Copyright 2025 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package sync + +import ( + peerid "github.com/ChainSafe/gossamer/internal/client/network/types/peer-id" +) + +// SyncEvent is a syncing related event that other protocols can subscribe to. +type SyncEvent interface { + isSyncEvent() +} + +// SyncEventPeerConnected is the peer that the syncing implementation is tracking connected. +type SyncEventPeerConnected peerid.PeerID + +// SyncEventPeerDisconnected is the peer that the syncing implementation was tracking disconnected. +type SyncEventPeerDisconnected peerid.PeerID + +func (SyncEventPeerConnected) isSyncEvent() {} +func (SyncEventPeerDisconnected) isSyncEvent() {} + +type SyncEventStream interface { + // Subscribe to syncing related events. + EventStream(name string) chan SyncEvent +} diff --git a/internal/client/network/types/kad/kad.go b/internal/client/network/types/kad/kad.go new file mode 100644 index 0000000000..32472002aa --- /dev/null +++ b/internal/client/network/types/kad/kad.go @@ -0,0 +1,32 @@ +// Copyright 2025 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package kad + +import ( + "time" + + peerid "github.com/ChainSafe/gossamer/internal/client/network/types/peer-id" +) + +// Key is the (opaque) key of a record. +type Key []byte + +// Record is a record stored in the DHT. +type Record struct { + // Key of the record. + Key Key + // Value of the record. + Value []byte + // The (original) publisher of the record. + Publisher *peerid.PeerID + // The expiration time as measured by a local, monotonic clock. + Expires *time.Time +} + +// PeerRecord is a record either received by the given peer or retrieved from the local record store. +type PeerRecord struct { + // The peer from whom the record was received. nil if the record was retrieved from local storage. + Peer *peerid.PeerID + Record Record +} diff --git a/internal/client/network/types/multiaddr/multiaddr.go b/internal/client/network/types/multiaddr/multiaddr.go new file mode 100644 index 0000000000..aa8b800114 --- /dev/null +++ b/internal/client/network/types/multiaddr/multiaddr.go @@ -0,0 +1,13 @@ +// Copyright 2025 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package multiaddr + +import ( + libp2p "github.com/libp2p/go-libp2p/core" +) + +// Multiaddr type used in Gossamer +type Multiaddr struct { + libp2p.Multiaddr +} diff --git a/internal/client/network/types/multihash/multihash.go b/internal/client/network/types/multihash/multihash.go new file mode 100644 index 0000000000..b1328dfa80 --- /dev/null +++ b/internal/client/network/types/multihash/multihash.go @@ -0,0 +1,55 @@ +// Copyright 2025 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package multihash + +import "github.com/multiformats/go-multihash" + +// Code is a multihash code +type Code uint64 + +const ( + Identity Code = multihash.IDENTITY + ShaTwo256 Code = multihash.SHA2_256 +) + +// Multihash is the default multihash implementation. Only hashes used by substrate are defined. +type Multihash struct { + multihash.Multihash +} + +func (m Multihash) Code() uint64 { + decoded, err := multihash.Decode([]byte(m.Multihash)) + if err != nil { + panic("unable to decode") + } + return decoded.Code +} + +func (m Multihash) Digest() []byte { + decoded, err := multihash.Decode([]byte(m.Multihash)) + if err != nil { + panic("unable to decode") + } + return decoded.Digest +} + +func Wrap(code uint64, inputDigest []byte) (Multihash, error) { + mh, err := multihash.Encode(inputDigest, code) + if err != nil { + return Multihash{}, err + } + return Multihash{ + Multihash: multihash.Multihash(mh), + }, nil +} + +func NewMultihashFromBytes(b []byte) (Multihash, error) { + _, err := multihash.Decode(b) + if err != nil { + return Multihash{}, err + } + return Multihash{ + Multihash: multihash.Multihash(b), + }, nil +} diff --git a/internal/client/network/types/peer-id/peer_id.go b/internal/client/network/types/peer-id/peer_id.go new file mode 100644 index 0000000000..90be08f4bc --- /dev/null +++ b/internal/client/network/types/peer-id/peer_id.go @@ -0,0 +1,94 @@ +// Copyright 2025 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package peerid + +import ( + "crypto/rand" + + "github.com/ChainSafe/gossamer/internal/client/network/types/multihash" + "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/peer" +) + +// PeerID is the identifier of a peer of the network. +// +// The data is a CIDv0 compatible multihash of the protobuf encoded public key of the peer +// as specified in [specs/peer-ids](https://github.com/libp2p/specs/blob/master/peer-ids/peer-ids.md). +type PeerID struct { + peer.ID +} + +// Ed25519 will convert PeerID into ed25519 public key bytes. +func (pid PeerID) Ed25519() *[32]byte { + pubKey, err := pid.ID.ExtractPublicKey() + if err != nil { + return nil + } + switch pubKey.(type) { + case *crypto.Ed25519PublicKey: + default: + panic("should always be of type *crypto.Ed25519PublicKey") + } + raw, err := pubKey.Raw() + if err != nil { + return nil + } + + if len(raw) != 32 { + return nil + } + + var ret [32]byte + copy(ret[:], raw) + + return &ret +} + +// NewPeerIDFromEd25519 will create new [PeerID] from ed25519 public key bytes. +func NewPeerIDFromEd25519(b [32]byte) *PeerID { + public, err := crypto.UnmarshalEd25519PublicKey(b[:]) + if err != nil { + return nil + } + id, err := peer.IDFromPublicKey(public) + if err != nil { + return nil + } + return &PeerID{id} +} + +// NewRandomPeerID will create new random [PeerID]. +func NewRandomPeerID() PeerID { + b := make([]byte, 32) + _, err := rand.Read(b) + if err != nil { + panic(err) + } + + mh, err := multihash.Wrap(0x00, b) + if err != nil { + panic("the digest size is never too large") + } + + id, err := peer.IDFromBytes(mh.Multihash) + if err != nil { + panic(err) + } + + return PeerID{ + ID: id, + } +} + +// NewPeerID creates a [PeerID] parsed from bytes. +func NewPeerID(data []byte) (PeerID, error) { + peerID, err := peer.IDFromBytes(data) + if err != nil { + return PeerID{}, err + } + + return PeerID{ + peerID, + }, nil +} diff --git a/internal/client/network/types/peer-id/peer_id_test.go b/internal/client/network/types/peer-id/peer_id_test.go new file mode 100644 index 0000000000..e2d6fa7d5f --- /dev/null +++ b/internal/client/network/types/peer-id/peer_id_test.go @@ -0,0 +1,34 @@ +// Copyright 2025 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package peerid + +import ( + "testing" + + "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/stretchr/testify/require" +) + +func TestNewRandomPeerID(t *testing.T) { + require.NotEmpty(t, NewRandomPeerID().ID) +} + +func TestFromED25519(t *testing.T) { + _, public, err := crypto.GenerateEd25519Key(nil) + require.NoError(t, err) + original, err := peer.IDFromPublicKey(public) + require.NoError(t, err) + t.Logf("%v", original) + + peerID, err := NewPeerID([]byte(original)) + require.NoError(t, err) + require.Equal(t, original, peerID.ID) + + key := peerID.Ed25519() + require.NotNil(t, key) + from := NewPeerIDFromEd25519(*key) + require.NotNil(t, from) + require.Equal(t, original, from.ID) +}