Skip to content

Commit

Permalink
Merge branch 'feat/parachain' into eclesio/fragment-chain-impl
Browse files Browse the repository at this point in the history
  • Loading branch information
EclesioMeloJunior authored Dec 12, 2024
2 parents b65841c + 3f81dd2 commit e484624
Show file tree
Hide file tree
Showing 2 changed files with 268 additions and 0 deletions.
119 changes: 119 additions & 0 deletions dot/parachain/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package util
import (
"errors"
"fmt"
"math"
"sync"
"time"

"github.com/ChainSafe/gossamer/dot/parachain/chainapi"
Expand All @@ -14,8 +16,10 @@ import (
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/crypto/sr25519"
"github.com/ChainSafe/gossamer/lib/keystore"
"github.com/ChainSafe/gossamer/lib/primitives"
"github.com/ChainSafe/gossamer/lib/runtime"
wazero_runtime "github.com/ChainSafe/gossamer/lib/runtime/wazero"
"github.com/libp2p/go-libp2p/core/peer"
)

type HashHeader struct {
Expand All @@ -34,6 +38,121 @@ type Ancestors struct {
numberOfAncestors uint32
}

// NetworkBridgeTxMessage represents the message sent to the network subsystem.
type NetworkBridgeTxMessage struct {
ReportPeerMessageBatch map[peer.ID]int32
}

// UnifiedReputationChangeType represents the type of reputation change.
type UnifiedReputationChangeType string

const (
CostMinor UnifiedReputationChangeType = "CostMinor"
CostMajor UnifiedReputationChangeType = "CostMajor"
CostMinorRepeated UnifiedReputationChangeType = "CostMinorRepeated"
CostMajorRepeated UnifiedReputationChangeType = "CostMajorRepeated"
Malicious UnifiedReputationChangeType = "Malicious"
BenefitMinorFirst UnifiedReputationChangeType = "BenefitMinorFirst"
BenefitMinor UnifiedReputationChangeType = "BenefitMinor"
BenefitMajorFirst UnifiedReputationChangeType = "BenefitMajorFirst"
BenefitMajor UnifiedReputationChangeType = "BenefitMajor"
)

// UnifiedReputationChange represents a reputation change for a peer.
type UnifiedReputationChange struct {
Type UnifiedReputationChangeType
Reason string
}

// CostOrBenefit returns the cost or benefit of the reputation change.
func (u UnifiedReputationChange) CostOrBenefit() int32 {
switch u.Type {
case CostMinor:
return -100_000
case CostMajor:
return -300_000
case CostMinorRepeated:
return -200_000
case CostMajorRepeated:
return -600_000
case Malicious:
return math.MinInt32
case BenefitMajorFirst:
return 300_000
case BenefitMajor:
return 200_000
case BenefitMinorFirst:
return 15_000
case BenefitMinor:
return 10_000
default:
return 0
}
}

// ReputationAggregator collects and sends reputation changes in batches.
type ReputationAggregator struct {
sendImmediatelyIf func(rep UnifiedReputationChange) bool
byPeer map[peer.ID]int32
mu sync.Mutex
}

// NewReputationAggregator creates a new ReputationAggregator.
func NewReputationAggregator(sendImmediatelyIf func(rep UnifiedReputationChange) bool) *ReputationAggregator {
return &ReputationAggregator{
sendImmediatelyIf: sendImmediatelyIf,
byPeer: make(map[peer.ID]int32),
}
}

// Send sends the accumulated reputation changes in a batch and clears the state.
func (r *ReputationAggregator) Send(overseerCh chan<- NetworkBridgeTxMessage) {
r.mu.Lock()
defer r.mu.Unlock()

if len(r.byPeer) == 0 {
return
}

message := NetworkBridgeTxMessage{
ReportPeerMessageBatch: r.byPeer,
}
overseerCh <- message

r.byPeer = make(map[peer.ID]int32)
}

// Modify processes a reputation change, sending it immediately if necessary or accumulating it.
func (r *ReputationAggregator) Modify(
overseerCh chan<- NetworkBridgeTxMessage,
peerID peer.ID,
rep UnifiedReputationChange,
) {
r.mu.Lock()
defer r.mu.Unlock()

if r.sendImmediatelyIf(rep) {
r.singleSend(overseerCh, peerID, rep)
return
}

r.byPeer[peerID] = primitives.SaturatingAdd(r.byPeer[peerID], rep.CostOrBenefit())
}

// singleSend sends a single reputation change directly.
func (r *ReputationAggregator) singleSend(
overseerCh chan<- NetworkBridgeTxMessage,
peerID peer.ID,
rep UnifiedReputationChange,
) {
message := NetworkBridgeTxMessage{
ReportPeerMessageBatch: map[peer.ID]int32{
peerID: rep.CostOrBenefit(),
},
}
overseerCh <- message
}

// SigningKeyAndIndex finds the first key we can sign with from the given set of validators,
// if any, and returns it along with the validator index.
func SigningKeyAndIndex(
Expand Down
149 changes: 149 additions & 0 deletions dot/parachain/util/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package util

import (
"testing"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/assert"
)

func TestReputationAggregator_SendImmediately(t *testing.T) {

overseerCh := make(chan NetworkBridgeTxMessage, 1)

// Create a new aggregator with immediate send logic for Malicious type
aggregator := NewReputationAggregator(func(rep UnifiedReputationChange) bool {
return rep.Type == Malicious
})

// Mock peer and reputation change
peerID := peer.ID("peer1")
repChange := UnifiedReputationChange{
Type: Malicious,
Reason: "Detected malicious behaviour",
}

// Modify the aggregator
aggregator.Modify(overseerCh, peerID, repChange)

// Verify the message is sent immediately
select {
case msg := <-overseerCh:
assert.Len(t, msg.ReportPeerMessageBatch, 1)
assert.Equal(t, repChange.CostOrBenefit(), msg.ReportPeerMessageBatch[peerID])
default:
t.Error("Expected immediate message, but none was sent")
}
}

func TestReputationAggregator_BatchSend(t *testing.T) {

overseerCh := make(chan NetworkBridgeTxMessage, 1)

// Create a new aggregator with no immediate send logic
aggregator := NewReputationAggregator(func(rep UnifiedReputationChange) bool {
return false // Always accumulate
})

// Add multiple reputation changes
peerID1 := peer.ID("peer1")
peerID2 := peer.ID("peer2")
aggregator.Modify(overseerCh, peerID1, UnifiedReputationChange{Type: BenefitMinor, Reason: "Good behaviour"})
aggregator.Modify(overseerCh, peerID2, UnifiedReputationChange{Type: BenefitMajor, Reason: "Excellent behaviour"})

// Verify no messages were sent yet
select {
case <-overseerCh:
t.Error("Expected no message to be sent, but one was sent")
default:
}

// Call Send to flush changes
aggregator.Send(overseerCh)

// Verify the batch message
select {
case msg := <-overseerCh:
assert.Len(t, msg.ReportPeerMessageBatch, 2)
assert.Equal(t, int32(10_000), msg.ReportPeerMessageBatch[peerID1]) // BenefitMinor
assert.Equal(t, int32(200_000), msg.ReportPeerMessageBatch[peerID2]) // BenefitMajor
default:
t.Error("Expected batch message, but none was sent")
}
}

func TestReputationAggregator_ClearAfterSend(t *testing.T) {

overseerCh := make(chan NetworkBridgeTxMessage, 1)

// Create a new aggregator
aggregator := NewReputationAggregator(func(rep UnifiedReputationChange) bool {
return false // Always accumulate
})

// Add a reputation change
peerID := peer.ID("peer1")
aggregator.Modify(overseerCh, peerID, UnifiedReputationChange{Type: BenefitMinor, Reason: "Positive contribution"})

// Call Send to flush changes
aggregator.Send(overseerCh)

// Verify the batch message
select {
case <-overseerCh:
// Expected message sent
default:
t.Error("Expected batch message, but none was sent")
}

// Verify the internal state is cleared
assert.Empty(t, aggregator.byPeer)
}

func TestReputationAggregator_ConflictResolution(t *testing.T) {

overseerCh := make(chan NetworkBridgeTxMessage, 1)

// Create a new aggregator
aggregator := NewReputationAggregator(func(rep UnifiedReputationChange) bool {
return false // Always accumulate
})

// Add multiple reputation changes for the same peer
peerID := peer.ID("peer1")
aggregator.Modify(overseerCh, peerID, UnifiedReputationChange{Type: BenefitMajor, Reason: "Helpful behaviour"})
aggregator.Modify(overseerCh, peerID, UnifiedReputationChange{Type: CostMinor, Reason: "Minor issue"})

// Call Send to flush changes
aggregator.Send(overseerCh)

// Verify the accumulated result
select {
case msg := <-overseerCh:
assert.Len(t, msg.ReportPeerMessageBatch, 1)
assert.Equal(t, int32(100_000), msg.ReportPeerMessageBatch[peerID]) // 200_000 + (-100_000) = 100_000
default:
t.Error("Expected batch message, but none was sent")
}
}

func TestReputationAggregator_NoActionWithoutChanges(t *testing.T) {

overseerCh := make(chan NetworkBridgeTxMessage, 1)

// Create a new aggregator
aggregator := NewReputationAggregator(func(rep UnifiedReputationChange) bool {
return false
})

// Call Send without any changes
aggregator.Send(overseerCh)

// Verify no messages were sent
select {
case <-overseerCh:
t.Error("Expected no message, but one was sent")
default:
// Expected behaviour
}
}

0 comments on commit e484624

Please sign in to comment.