Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] feat/parachain: disputes coordinator #3347

Closed
wants to merge 42 commits into from
Closed
Changes from 1 commit
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
1f1367e
add queue
kanishkatn Jun 13, 2023
8847198
use rwmutex
kanishkatn Jul 5, 2023
3e99781
add tests
kanishkatn Aug 21, 2023
6ea2c83
add backend
kanishkatn Jun 19, 2023
94e6d14
resolve rebase issues
kanishkatn Aug 2, 2023
90b3cf0
cleanup
kanishkatn Aug 18, 2023
f1811e1
cleanup
kanishkatn Aug 23, 2023
115463a
add imports
kanishkatn Jun 21, 2023
466a526
resolve rebase issues
kanishkatn Aug 2, 2023
c4820ea
fix rebase issues
kanishkatn Sep 18, 2023
c253c9d
add queue
kanishkatn Jun 13, 2023
5bf6f08
add tests
kanishkatn Aug 21, 2023
f0e3d0a
add backend
kanishkatn Jun 19, 2023
ddcee28
resolve rebase issues
kanishkatn Aug 2, 2023
018af5f
cleanup
kanishkatn Aug 18, 2023
9b5ec4a
cleanup
kanishkatn Aug 23, 2023
b36b7e4
add lru cache
kanishkatn Jul 31, 2023
04b61b1
add candidates
kanishkatn Jul 31, 2023
d07ad8d
add scrapper
kanishkatn Aug 1, 2023
8e4b832
use lrucache from lib
kanishkatn Aug 4, 2023
dca5889
init scraper
kanishkatn Aug 10, 2023
0ccd628
replace google btree
kanishkatn Aug 11, 2023
64e8cb1
resolve rebase issues
kanishkatn Aug 23, 2023
42db003
handle todos
kanishkatn Aug 28, 2023
78a3a63
add tests
kanishkatn Aug 29, 2023
ec48cc9
cleanup
kanishkatn Sep 14, 2023
f1b2e72
add queue
kanishkatn Jun 13, 2023
f3f5794
add backend
kanishkatn Jun 19, 2023
203333b
resolve rebase issues
kanishkatn Aug 2, 2023
ce83314
add lru cache
kanishkatn Jul 31, 2023
d316c18
use lrucache from lib
kanishkatn Aug 4, 2023
b558f8d
add spam slots
kanishkatn Jun 21, 2023
968d248
make thread safe, add tests
kanishkatn Jun 27, 2023
8e7aeb5
add benchmark
kanishkatn Jun 28, 2023
af73bb3
review suggestions
kanishkatn Jul 3, 2023
ae41f35
lint
kanishkatn Jul 5, 2023
137ede1
lint, review suggestions
kanishkatn Jul 5, 2023
9e7b234
add disputes coordinator
kanishkatn Jun 20, 2023
b4c444f
resolve rebase issues
kanishkatn Sep 19, 2023
b97f834
process onchain votes
kanishkatn Sep 28, 2023
84845cc
handle incoming
kanishkatn Sep 28, 2023
c327608
finish processors
kanishkatn Oct 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
make thread safe, add tests
kanishkatn committed Sep 19, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit 968d248ec41878c212939f02782a65a115fe683e
125 changes: 99 additions & 26 deletions dot/parachain/spam_slots.go
Original file line number Diff line number Diff line change
@@ -4,11 +4,9 @@ import (
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/parachain"
"github.com/emirpasic/gods/sets/treeset"
"sync"
)

// SpamCount is the number of spam votes for a particular validator and session
type SpamCount uint32

// MaxSpamVotes is the maximum number of spam votes a validator can have for a particular session
const MaxSpamVotes = 50

@@ -28,61 +26,136 @@ type SpamSlots interface {

// spamSlots is an implementation of SpamSlots
type spamSlots struct {
Slots map[[2]interface{}]SpamCount
Unconfirmed map[[2]interface{}]*treeset.Set
slots map[[2]interface{}]uint32
slotsLock sync.RWMutex

unconfirmed map[[2]interface{}]*treeset.Set
unconfirmedLock sync.RWMutex

maxSpamVotes uint32
}

// byValidatorIndex is a comparator for ValidatorIndex
func byValidatorIndex(a, b interface{}) int {
return int(a.(parachain.ValidatorIndex) - b.(parachain.ValidatorIndex))
}

func (s *spamSlots) getSpamCount(session parachain.SessionIndex, validator parachain.ValidatorIndex) (uint32, bool) {
s.slotsLock.RLock()
defer s.slotsLock.RUnlock()
spamCount, ok := s.slots[[2]interface{}{session, validator}]
return spamCount, ok
}

func (s *spamSlots) getValidators(session parachain.SessionIndex, candidate common.Hash) (*treeset.Set, bool) {
s.unconfirmedLock.RLock()
defer s.unconfirmedLock.RUnlock()
validators, ok := s.unconfirmed[[2]interface{}{session, candidate}]
return validators, ok
}

func (s *spamSlots) clearValidators(session parachain.SessionIndex, candidate common.Hash) {
s.unconfirmedLock.Lock()
defer s.unconfirmedLock.Unlock()
delete(s.unconfirmed, [2]interface{}{session, candidate})
}

func (s spamSlots) AddUnconfirmed(session parachain.SessionIndex, candidate common.Hash, validator parachain.ValidatorIndex) bool {
if s.Slots[[2]interface{}{session, validator}] >= MaxSpamVotes {
func (s *spamSlots) clearSlots(session parachain.SessionIndex, validator parachain.ValidatorIndex) {
s.slotsLock.Lock()
defer s.slotsLock.Unlock()
delete(s.slots, [2]interface{}{session, validator})
}

func (s *spamSlots) incrementSpamCount(session parachain.SessionIndex, validator parachain.ValidatorIndex) {
s.slotsLock.Lock()
defer s.slotsLock.Unlock()
s.slots[[2]interface{}{session, validator}]++
}

func (s *spamSlots) AddUnconfirmed(session parachain.SessionIndex, candidate common.Hash, validator parachain.ValidatorIndex) bool {
if spamCount, _ := s.getSpamCount(session, validator); spamCount >= s.maxSpamVotes {
return false
}

validators := s.Unconfirmed[[2]interface{}{session, candidate}]
s.unconfirmedLock.Lock()
defer s.unconfirmedLock.Unlock()

validators, ok := s.unconfirmed[[2]interface{}{session, candidate}]
if !ok || validators == nil {
validators = treeset.NewWith(byValidatorIndex)
}

if !validators.Contains(validator) {
validators.Add(validator)
s.Slots[[2]interface{}{session, validator}]++
s.incrementSpamCount(session, validator)
}

s.unconfirmed[[2]interface{}{session, candidate}] = validators

return true
}

func (s spamSlots) Clear(session parachain.SessionIndex, candidate common.Hash) {
if validators, ok := s.Unconfirmed[[2]interface{}{session, candidate}]; ok {
for validator := range validators.Values() {
s.Slots[[2]interface{}{session, validator}]--
if s.Slots[[2]interface{}{session, validator}] <= 0 {
delete(s.Slots, [2]interface{}{session, validator})
func (s *spamSlots) Clear(session parachain.SessionIndex, candidate common.Hash) {
if validators, ok := s.getValidators(session, candidate); ok {
validatorSet := validators.Values()
s.clearValidators(session, candidate)

for _, validator := range validatorSet {
spamCount, ok := s.getSpamCount(session, validator.(parachain.ValidatorIndex))
if ok {
if spamCount == 1 {
s.clearSlots(session, validator.(parachain.ValidatorIndex))
continue
}

s.slotsLock.Lock()
s.slots[[2]interface{}{session, validator.(parachain.ValidatorIndex)}] = spamCount - 1
s.slotsLock.Unlock()
}
}
}
}

func (s spamSlots) PruneOld(oldestIndex parachain.SessionIndex) {
for k := range s.Unconfirmed {
func (s *spamSlots) PruneOld(oldestIndex parachain.SessionIndex) {
s.unconfirmedLock.Lock()
unconfirmedToDelete := make([][2]interface{}, 0)
for k := range s.unconfirmed {
if k[0].(parachain.SessionIndex) < oldestIndex {
delete(s.Unconfirmed, k)
unconfirmedToDelete = append(unconfirmedToDelete, k)
}
}

for k := range s.Slots {
for _, k := range unconfirmedToDelete {
delete(s.unconfirmed, k)
}
s.unconfirmedLock.Unlock()

s.slotsLock.Lock()
slotsToDelete := make([][2]interface{}, 0)
for k := range s.slots {
if k[0].(parachain.SessionIndex) < oldestIndex {
delete(s.Slots, k)
slotsToDelete = append(slotsToDelete, k)
}
}

for _, k := range slotsToDelete {
delete(s.slots, k)
}
s.slotsLock.Unlock()
}

// NewSpamSlots returns a new SpamSlots instance
func NewSpamSlots() SpamSlots {
func NewSpamSlots(maxSpamVotes uint32) SpamSlots {
return &spamSlots{
Slots: make(map[[2]interface{}]SpamCount),
Unconfirmed: make(map[[2]interface{}]*treeset.Set),
slots: make(map[[2]interface{}]uint32),
unconfirmed: make(map[[2]interface{}]*treeset.Set),
maxSpamVotes: maxSpamVotes,
}
}

// NewSpamSlotsFromState returns a new SpamSlots instance from the given state
func NewSpamSlotsFromState(unconfirmedDisputes map[[2]interface{}]*treeset.Set) SpamSlots {
slots := make(map[[2]interface{}]SpamCount)
slots := make(map[[2]interface{}]uint32)

for k, v := range unconfirmedDisputes {
for validator := range v.Values() {
@@ -95,7 +168,7 @@ func NewSpamSlotsFromState(unconfirmedDisputes map[[2]interface{}]*treeset.Set)
}

return &spamSlots{
Slots: slots,
Unconfirmed: unconfirmedDisputes,
slots: slots,
unconfirmed: unconfirmedDisputes,
}
}
122 changes: 122 additions & 0 deletions dot/parachain/spam_slots_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package parachain

import (
"github.com/ChainSafe/gossamer/lib/common"
"github.com/stretchr/testify/require"
"sync"
"testing"
)

func TestSpamSlots_AddUnconfirmed(t *testing.T) {
t.Parallel()
// with 3 slots, we can add 3 votes for the same validator
ss := NewSpamSlots(3)

// add 3 votes for validator 1
require.True(t, ss.AddUnconfirmed(1, common.Hash{1}, 1))
require.True(t, ss.AddUnconfirmed(1, common.Hash{2}, 1))
require.True(t, ss.AddUnconfirmed(1, common.Hash{3}, 1))

// for the 4th vote, the slot must be full
require.False(t, ss.AddUnconfirmed(1, common.Hash{}, 1))
}

func TestSpamSlots_Clear(t *testing.T) {
t.Parallel()

ss := NewSpamSlots(3)

// add 3 votes for validator 1
require.True(t, ss.AddUnconfirmed(1, common.Hash{1}, 1))
require.True(t, ss.AddUnconfirmed(1, common.Hash{2}, 1))
require.True(t, ss.AddUnconfirmed(1, common.Hash{3}, 1))

// the slot must be full
require.False(t, ss.AddUnconfirmed(1, common.Hash{0}, 1))

// clear the votes for session 1 and candidate 1
ss.Clear(1, common.Hash{1})

// now we can add another vote
require.True(t, ss.AddUnconfirmed(1, common.Hash{1}, 1))
}

func TestSpamSlots_PruneOld(t *testing.T) {
t.Parallel()

ss := NewSpamSlots(3)

// add 3 votes for validator 1 session 1
require.True(t, ss.AddUnconfirmed(1, common.Hash{1}, 1))
require.True(t, ss.AddUnconfirmed(1, common.Hash{2}, 1))
require.True(t, ss.AddUnconfirmed(1, common.Hash{3}, 1))

// add 3 votes for validator 1 session 2
require.True(t, ss.AddUnconfirmed(2, common.Hash{1}, 1))
require.True(t, ss.AddUnconfirmed(2, common.Hash{2}, 1))
require.True(t, ss.AddUnconfirmed(2, common.Hash{3}, 1))

// add 3 votes for validator 1 session 3
require.True(t, ss.AddUnconfirmed(3, common.Hash{1}, 1))
require.True(t, ss.AddUnconfirmed(3, common.Hash{2}, 1))
require.True(t, ss.AddUnconfirmed(3, common.Hash{3}, 1))

// the validator shouldn't be able to vote for session 1, 2 and 3 anymore
require.False(t, ss.AddUnconfirmed(1, common.Hash{0}, 1))
require.False(t, ss.AddUnconfirmed(2, common.Hash{0}, 1))
require.False(t, ss.AddUnconfirmed(3, common.Hash{0}, 1))

// prune old sessions
ss.PruneOld(3)

// the validator should be able to vote for session 1 and 2 but not 3
require.True(t, ss.AddUnconfirmed(1, common.Hash{0}, 1))
require.True(t, ss.AddUnconfirmed(2, common.Hash{0}, 1))
require.False(t, ss.AddUnconfirmed(3, common.Hash{0}, 1))
}

func TestSpamSlots_Concurrency(t *testing.T) {
t.Parallel()

const maxSpamVotes = 5000
const numAdd = 100000
const numClear = 100000
const numPrune = 100000

spam := NewSpamSlots(maxSpamVotes)
var wg sync.WaitGroup

// Concurrent add operations
for i := 0; i < numAdd; i++ {
wg.Add(1)
go func() {
defer wg.Done()
_ = spam.AddUnconfirmed(1, common.Hash{1}, 1)
}()
}

// Concurrent clear operations
for i := 0; i < numClear; i++ {
wg.Add(1)
go func() {
defer wg.Done()
spam.Clear(1, common.Hash{1})
}()
}

// Concurrent prune operations
for i := 0; i < numPrune; i++ {
wg.Add(1)
go func() {
defer wg.Done()
spam.PruneOld(1)
}()
}

// Wait for all goroutines to complete
wg.Wait()
}

//TODO: add test for NewSpamSlotsFromState

//TODO: add benchmarks