Skip to content

Commit

Permalink
feat(prospective-parachains) implement ReputationAggregator
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielDDHM committed Dec 10, 2024
1 parent 5c4869d commit ef57de4
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 38 deletions.
101 changes: 76 additions & 25 deletions dot/parachain/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,75 +36,126 @@ type Ancestors struct {
numberOfAncestors uint32
}

// NetworkBridgeTxMessage represents the message sent to the network subsystem.
type NetworkBridgeTxMessage struct {
ReportPeerMessageBatch map[peer.ID]int32
}

// UnifiedReputationChangeType represents the type of reputation change.
type UnifiedReputationChangeType string

const (
CostMinor UnifiedReputationChangeType = "CostMinor"
CostMajor UnifiedReputationChangeType = "CostMajor"
CostMinorRepeated UnifiedReputationChangeType = "CostMinorRepeated"
CostMajorRepeated UnifiedReputationChangeType = "CostMajorRepeated"
Malicious UnifiedReputationChangeType = "Malicious"
BenefitMinorFirst UnifiedReputationChangeType = "BenefitMinorFirst"
BenefitMinor UnifiedReputationChangeType = "BenefitMinor"
BenefitMajorFirst UnifiedReputationChangeType = "BenefitMajorFirst"
BenefitMajor UnifiedReputationChangeType = "BenefitMajor"
)

// UnifiedReputationChange represents a reputation change for a peer.
type UnifiedReputationChange struct {
Change int32
Type UnifiedReputationChangeType
Reason string
}

// NetworkBridgeTxMessage represents the message sent to the network subsystem.
type NetworkBridgeTxMessage struct {
ReportPeerMessageBatch map[peer.ID]int32
// CostOrBenefit returns the cost or benefit of the reputation change.
func (u UnifiedReputationChange) CostOrBenefit() int32 {
switch u.Type {
case CostMinor:
return -100_000
case CostMajor:
return -300_000
case CostMinorRepeated:
return -200_000
case CostMajorRepeated:
return -600_000
case Malicious:
return -1 << 31 // Equivalent to i32::MIN
case BenefitMajorFirst:
return 300_000
case BenefitMajor:
return 200_000
case BenefitMinorFirst:
return 15_000
case BenefitMinor:
return 10_000
default:
return 0
}
}

// ReputationAggregator aggregates reputation changes for peers.
// ReputationAggregator collects and sends reputation changes in batches.
type ReputationAggregator struct {
sendImmediatelyIf func(rep UnifiedReputationChange) bool
byPeer map[peer.ID]int32
mu sync.Mutex // Mutex for thread safety
mu sync.Mutex
}

// NewReputationAggregator creates a new ReputationAggregator.
func NewReputationAggregator(sendImmediatelyIf func(rep UnifiedReputationChange) bool) *ReputationAggregator {
return &ReputationAggregator{
byPeer: make(map[peer.ID]int32),
sendImmediatelyIf: sendImmediatelyIf,
byPeer: make(map[peer.ID]int32),
}
}

// Send sends the aggregated reputation changes in a batch and clears the state.
// Send sends the accumulated reputation changes in a batch and clears the state.
func (r *ReputationAggregator) Send(overseerCh chan<- NetworkBridgeTxMessage) {
r.mu.Lock()
defer r.mu.Unlock()

// If there's nothing to send, exit
if len(r.byPeer) == 0 {
return
}

// Create the batch message
message := NetworkBridgeTxMessage{
ReportPeerMessageBatch: r.byPeer,
}

// Send the message
overseerCh <- message

// Clear the map after sending
r.byPeer = make(map[peer.ID]int32)
}

// Modify adds a reputation change to the internal state or sends it immediately if needed.
// Modify processes a reputation change, sending it immediately if necessary or accumulating it.
func (r *ReputationAggregator) Modify(overseerCh chan<- NetworkBridgeTxMessage, peerID peer.ID, rep UnifiedReputationChange) {
r.mu.Lock()
defer r.mu.Unlock()

// Check if the change should be sent immediately
if r.sendImmediatelyIf(rep) {
// Send immediately without adding to the aggregator
overseerCh <- NetworkBridgeTxMessage{
ReportPeerMessageBatch: map[peer.ID]int32{
peerID: rep.Change,
},
}
r.singleSend(overseerCh, peerID, rep)
return
}

// Otherwise, accumulate the reputation change
if r.byPeer == nil {
r.byPeer = make(map[peer.ID]int32)
r.add(peerID, rep)
fmt.Printf("Accumulated reputation change for peer %s: %+v\n", peerID, rep)
}

// singleSend sends a single reputation change directly.
func (r *ReputationAggregator) singleSend(overseerCh chan<- NetworkBridgeTxMessage, peerID peer.ID, rep UnifiedReputationChange) {
message := NetworkBridgeTxMessage{
ReportPeerMessageBatch: map[peer.ID]int32{
peerID: rep.CostOrBenefit(),
},
}
overseerCh <- message
fmt.Printf("Sent immediate reputation change for peer %s: %+v\n", peerID, rep)
}

// add accumulates a reputation change for a peer.
func (r *ReputationAggregator) add(peerID peer.ID, rep UnifiedReputationChange) {
if _, exists := r.byPeer[peerID]; !exists {
r.byPeer[peerID] = 0
}
r.byPeer[peerID] += rep.Change
r.byPeer[peerID] += rep.CostOrBenefit()
}

// AddReputation updates a batch with a reputation change for a peer.
func AddReputation(batch map[peer.ID]int32, peerID peer.ID, rep UnifiedReputationChange) {
batch[peerID] += rep.CostOrBenefit()
}

// SigningKeyAndIndex finds the first key we can sign with from the given set of validators,
Expand Down
26 changes: 13 additions & 13 deletions dot/parachain/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ func TestReputationAggregator_SendImmediately(t *testing.T) {
// Mock channel
overseerCh := make(chan NetworkBridgeTxMessage, 1)

// Create a new aggregator with immediate send logic for changes < 0
// Create a new aggregator with immediate send logic for Malicious type
aggregator := NewReputationAggregator(func(rep UnifiedReputationChange) bool {
return rep.Change < 0
return rep.Type == Malicious
})

// Mock peer and reputation change
peerID := peer.ID("peer1")
repChange := UnifiedReputationChange{
Change: -10,
Reason: "Malicious behavior",
Type: Malicious,
Reason: "Detected malicious behavior",
}

// Modify the aggregator
Expand All @@ -30,7 +30,7 @@ func TestReputationAggregator_SendImmediately(t *testing.T) {
select {
case msg := <-overseerCh:
assert.Len(t, msg.ReportPeerMessageBatch, 1)
assert.Equal(t, int32(-10), msg.ReportPeerMessageBatch[peerID])
assert.Equal(t, repChange.CostOrBenefit(), msg.ReportPeerMessageBatch[peerID])
default:
t.Error("Expected immediate message, but none was sent")
}
Expand All @@ -48,8 +48,8 @@ func TestReputationAggregator_BatchSend(t *testing.T) {
// Add multiple reputation changes
peerID1 := peer.ID("peer1")
peerID2 := peer.ID("peer2")
aggregator.Modify(overseerCh, peerID1, UnifiedReputationChange{Change: 5, Reason: "Good behavior"})
aggregator.Modify(overseerCh, peerID2, UnifiedReputationChange{Change: 10, Reason: "Great behavior"})
aggregator.Modify(overseerCh, peerID1, UnifiedReputationChange{Type: BenefitMinor, Reason: "Good behavior"})
aggregator.Modify(overseerCh, peerID2, UnifiedReputationChange{Type: BenefitMajor, Reason: "Excellent behavior"})

// Verify no messages were sent yet
select {
Expand All @@ -65,8 +65,8 @@ func TestReputationAggregator_BatchSend(t *testing.T) {
select {
case msg := <-overseerCh:
assert.Len(t, msg.ReportPeerMessageBatch, 2)
assert.Equal(t, int32(5), msg.ReportPeerMessageBatch[peerID1])
assert.Equal(t, int32(10), msg.ReportPeerMessageBatch[peerID2])
assert.Equal(t, int32(10_000), msg.ReportPeerMessageBatch[peerID1]) // BenefitMinor
assert.Equal(t, int32(200_000), msg.ReportPeerMessageBatch[peerID2]) // BenefitMajor
default:
t.Error("Expected batch message, but none was sent")
}
Expand All @@ -83,7 +83,7 @@ func TestReputationAggregator_ClearAfterSend(t *testing.T) {

// Add a reputation change
peerID := peer.ID("peer1")
aggregator.Modify(overseerCh, peerID, UnifiedReputationChange{Change: 10, Reason: "Good behavior"})
aggregator.Modify(overseerCh, peerID, UnifiedReputationChange{Type: BenefitMinor, Reason: "Positive contribution"})

// Call Send to flush changes
aggregator.Send(overseerCh)
Expand Down Expand Up @@ -111,8 +111,8 @@ func TestReputationAggregator_ConflictResolution(t *testing.T) {

// Add multiple reputation changes for the same peer
peerID := peer.ID("peer1")
aggregator.Modify(overseerCh, peerID, UnifiedReputationChange{Change: 10, Reason: "Good behavior"})
aggregator.Modify(overseerCh, peerID, UnifiedReputationChange{Change: -5, Reason: "Minor issue"})
aggregator.Modify(overseerCh, peerID, UnifiedReputationChange{Type: BenefitMajor, Reason: "Helpful behavior"})
aggregator.Modify(overseerCh, peerID, UnifiedReputationChange{Type: CostMinor, Reason: "Minor issue"})

// Call Send to flush changes
aggregator.Send(overseerCh)
Expand All @@ -121,7 +121,7 @@ func TestReputationAggregator_ConflictResolution(t *testing.T) {
select {
case msg := <-overseerCh:
assert.Len(t, msg.ReportPeerMessageBatch, 1)
assert.Equal(t, int32(5), msg.ReportPeerMessageBatch[peerID]) // 10 + (-5) = 5
assert.Equal(t, int32(100_000), msg.ReportPeerMessageBatch[peerID]) // 200_000 + (-100_000) = 100_000
default:
t.Error("Expected batch message, but none was sent")
}
Expand Down

0 comments on commit ef57de4

Please sign in to comment.