From 3f81dd22f091482a5ca00de296fd99d305c6cb7a Mon Sep 17 00:00:00 2001 From: Daniel Date: Wed, 11 Dec 2024 11:28:06 -0300 Subject: [PATCH] feat(dot/parachains): implement ReputationAggregator (#4401) --- dot/parachain/util/util.go | 119 +++++++++++++++++++++++++ dot/parachain/util/util_test.go | 149 ++++++++++++++++++++++++++++++++ 2 files changed, 268 insertions(+) create mode 100644 dot/parachain/util/util_test.go diff --git a/dot/parachain/util/util.go b/dot/parachain/util/util.go index dfcc3d4a9d..82afe41a3f 100644 --- a/dot/parachain/util/util.go +++ b/dot/parachain/util/util.go @@ -6,6 +6,8 @@ package util import ( "errors" "fmt" + "math" + "sync" "time" "github.com/ChainSafe/gossamer/dot/parachain/chainapi" @@ -14,8 +16,10 @@ import ( "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/lib/crypto/sr25519" "github.com/ChainSafe/gossamer/lib/keystore" + "github.com/ChainSafe/gossamer/lib/primitives" "github.com/ChainSafe/gossamer/lib/runtime" wazero_runtime "github.com/ChainSafe/gossamer/lib/runtime/wazero" + "github.com/libp2p/go-libp2p/core/peer" ) type HashHeader struct { @@ -34,6 +38,121 @@ type Ancestors struct { numberOfAncestors uint32 } +// NetworkBridgeTxMessage represents the message sent to the network subsystem. +type NetworkBridgeTxMessage struct { + ReportPeerMessageBatch map[peer.ID]int32 +} + +// UnifiedReputationChangeType represents the type of reputation change. +type UnifiedReputationChangeType string + +const ( + CostMinor UnifiedReputationChangeType = "CostMinor" + CostMajor UnifiedReputationChangeType = "CostMajor" + CostMinorRepeated UnifiedReputationChangeType = "CostMinorRepeated" + CostMajorRepeated UnifiedReputationChangeType = "CostMajorRepeated" + Malicious UnifiedReputationChangeType = "Malicious" + BenefitMinorFirst UnifiedReputationChangeType = "BenefitMinorFirst" + BenefitMinor UnifiedReputationChangeType = "BenefitMinor" + BenefitMajorFirst UnifiedReputationChangeType = "BenefitMajorFirst" + BenefitMajor UnifiedReputationChangeType = "BenefitMajor" +) + +// UnifiedReputationChange represents a reputation change for a peer. +type UnifiedReputationChange struct { + Type UnifiedReputationChangeType + Reason string +} + +// CostOrBenefit returns the cost or benefit of the reputation change. +func (u UnifiedReputationChange) CostOrBenefit() int32 { + switch u.Type { + case CostMinor: + return -100_000 + case CostMajor: + return -300_000 + case CostMinorRepeated: + return -200_000 + case CostMajorRepeated: + return -600_000 + case Malicious: + return math.MinInt32 + case BenefitMajorFirst: + return 300_000 + case BenefitMajor: + return 200_000 + case BenefitMinorFirst: + return 15_000 + case BenefitMinor: + return 10_000 + default: + return 0 + } +} + +// ReputationAggregator collects and sends reputation changes in batches. +type ReputationAggregator struct { + sendImmediatelyIf func(rep UnifiedReputationChange) bool + byPeer map[peer.ID]int32 + mu sync.Mutex +} + +// NewReputationAggregator creates a new ReputationAggregator. +func NewReputationAggregator(sendImmediatelyIf func(rep UnifiedReputationChange) bool) *ReputationAggregator { + return &ReputationAggregator{ + sendImmediatelyIf: sendImmediatelyIf, + byPeer: make(map[peer.ID]int32), + } +} + +// Send sends the accumulated reputation changes in a batch and clears the state. +func (r *ReputationAggregator) Send(overseerCh chan<- NetworkBridgeTxMessage) { + r.mu.Lock() + defer r.mu.Unlock() + + if len(r.byPeer) == 0 { + return + } + + message := NetworkBridgeTxMessage{ + ReportPeerMessageBatch: r.byPeer, + } + overseerCh <- message + + r.byPeer = make(map[peer.ID]int32) +} + +// Modify processes a reputation change, sending it immediately if necessary or accumulating it. +func (r *ReputationAggregator) Modify( + overseerCh chan<- NetworkBridgeTxMessage, + peerID peer.ID, + rep UnifiedReputationChange, +) { + r.mu.Lock() + defer r.mu.Unlock() + + if r.sendImmediatelyIf(rep) { + r.singleSend(overseerCh, peerID, rep) + return + } + + r.byPeer[peerID] = primitives.SaturatingAdd(r.byPeer[peerID], rep.CostOrBenefit()) +} + +// singleSend sends a single reputation change directly. +func (r *ReputationAggregator) singleSend( + overseerCh chan<- NetworkBridgeTxMessage, + peerID peer.ID, + rep UnifiedReputationChange, +) { + message := NetworkBridgeTxMessage{ + ReportPeerMessageBatch: map[peer.ID]int32{ + peerID: rep.CostOrBenefit(), + }, + } + overseerCh <- message +} + // SigningKeyAndIndex finds the first key we can sign with from the given set of validators, // if any, and returns it along with the validator index. func SigningKeyAndIndex( diff --git a/dot/parachain/util/util_test.go b/dot/parachain/util/util_test.go new file mode 100644 index 0000000000..29ef158a78 --- /dev/null +++ b/dot/parachain/util/util_test.go @@ -0,0 +1,149 @@ +package util + +import ( + "testing" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/stretchr/testify/assert" +) + +func TestReputationAggregator_SendImmediately(t *testing.T) { + + overseerCh := make(chan NetworkBridgeTxMessage, 1) + + // Create a new aggregator with immediate send logic for Malicious type + aggregator := NewReputationAggregator(func(rep UnifiedReputationChange) bool { + return rep.Type == Malicious + }) + + // Mock peer and reputation change + peerID := peer.ID("peer1") + repChange := UnifiedReputationChange{ + Type: Malicious, + Reason: "Detected malicious behaviour", + } + + // Modify the aggregator + aggregator.Modify(overseerCh, peerID, repChange) + + // Verify the message is sent immediately + select { + case msg := <-overseerCh: + assert.Len(t, msg.ReportPeerMessageBatch, 1) + assert.Equal(t, repChange.CostOrBenefit(), msg.ReportPeerMessageBatch[peerID]) + default: + t.Error("Expected immediate message, but none was sent") + } +} + +func TestReputationAggregator_BatchSend(t *testing.T) { + + overseerCh := make(chan NetworkBridgeTxMessage, 1) + + // Create a new aggregator with no immediate send logic + aggregator := NewReputationAggregator(func(rep UnifiedReputationChange) bool { + return false // Always accumulate + }) + + // Add multiple reputation changes + peerID1 := peer.ID("peer1") + peerID2 := peer.ID("peer2") + aggregator.Modify(overseerCh, peerID1, UnifiedReputationChange{Type: BenefitMinor, Reason: "Good behaviour"}) + aggregator.Modify(overseerCh, peerID2, UnifiedReputationChange{Type: BenefitMajor, Reason: "Excellent behaviour"}) + + // Verify no messages were sent yet + select { + case <-overseerCh: + t.Error("Expected no message to be sent, but one was sent") + default: + } + + // Call Send to flush changes + aggregator.Send(overseerCh) + + // Verify the batch message + select { + case msg := <-overseerCh: + assert.Len(t, msg.ReportPeerMessageBatch, 2) + assert.Equal(t, int32(10_000), msg.ReportPeerMessageBatch[peerID1]) // BenefitMinor + assert.Equal(t, int32(200_000), msg.ReportPeerMessageBatch[peerID2]) // BenefitMajor + default: + t.Error("Expected batch message, but none was sent") + } +} + +func TestReputationAggregator_ClearAfterSend(t *testing.T) { + + overseerCh := make(chan NetworkBridgeTxMessage, 1) + + // Create a new aggregator + aggregator := NewReputationAggregator(func(rep UnifiedReputationChange) bool { + return false // Always accumulate + }) + + // Add a reputation change + peerID := peer.ID("peer1") + aggregator.Modify(overseerCh, peerID, UnifiedReputationChange{Type: BenefitMinor, Reason: "Positive contribution"}) + + // Call Send to flush changes + aggregator.Send(overseerCh) + + // Verify the batch message + select { + case <-overseerCh: + // Expected message sent + default: + t.Error("Expected batch message, but none was sent") + } + + // Verify the internal state is cleared + assert.Empty(t, aggregator.byPeer) +} + +func TestReputationAggregator_ConflictResolution(t *testing.T) { + + overseerCh := make(chan NetworkBridgeTxMessage, 1) + + // Create a new aggregator + aggregator := NewReputationAggregator(func(rep UnifiedReputationChange) bool { + return false // Always accumulate + }) + + // Add multiple reputation changes for the same peer + peerID := peer.ID("peer1") + aggregator.Modify(overseerCh, peerID, UnifiedReputationChange{Type: BenefitMajor, Reason: "Helpful behaviour"}) + aggregator.Modify(overseerCh, peerID, UnifiedReputationChange{Type: CostMinor, Reason: "Minor issue"}) + + // Call Send to flush changes + aggregator.Send(overseerCh) + + // Verify the accumulated result + select { + case msg := <-overseerCh: + assert.Len(t, msg.ReportPeerMessageBatch, 1) + assert.Equal(t, int32(100_000), msg.ReportPeerMessageBatch[peerID]) // 200_000 + (-100_000) = 100_000 + default: + t.Error("Expected batch message, but none was sent") + } +} + +func TestReputationAggregator_NoActionWithoutChanges(t *testing.T) { + + overseerCh := make(chan NetworkBridgeTxMessage, 1) + + // Create a new aggregator + aggregator := NewReputationAggregator(func(rep UnifiedReputationChange) bool { + return false + }) + + // Call Send without any changes + aggregator.Send(overseerCh) + + // Verify no messages were sent + select { + case <-overseerCh: + t.Error("Expected no message, but one was sent") + default: + // Expected behaviour + } +}