Fix - running out of nonces under load (#224)
* Account for continued nonce consumption in target. Sort by expiration

* remove redundant lock

* lock for entire GetNonces operation

* Swap map to slice for nonce cache in CosignerNonceCache

* Add tests, fix prune all

* Fix test

* Add Delete test

* lint

* simplify flaky test

* add comments
agouin authored Nov 22, 2023
1 parent ad63753 commit 976815f
Showing 3 changed files with 158 additions and 55 deletions.
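Before the diff, a quick orientation: the heart of the fix is a new demand target that also budgets for nonces consumed while a `LoadN` replenishment is still in flight. A minimal sketch of the arithmetic, with hypothetical throughput numbers (the 600 nonces/min rate is illustrative, not from the commit):

```go
package main

import "fmt"

func main() {
	noncesPerMinute := 600.0 // hypothetical observed consumption rate
	intervalSeconds := 3.0   // defaultGetNoncesInterval from the diff below
	targetTrim := 10         // padding constant introduced by this commit

	// 120% of one interval's expected consumption, plus two seconds' worth
	// (noncesPerMinute/30) to cover consumption during LoadN, plus padding.
	target := int((noncesPerMinute/60)*intervalSeconds*1.2) +
		int(noncesPerMinute/30) + targetTrim

	fmt.Println(target) // (10*3*1.2 = 36) + 20 + 10 = 66
}
```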
118 changes: 65 additions & 53 deletions signer/cosigner_nonce_cache.go
@@ -13,8 +13,8 @@ import (
const (
defaultGetNoncesInterval = 3 * time.Second
defaultGetNoncesTimeout = 4 * time.Second
cachePreSize = 10000
nonceCacheExpiration = 10 * time.Second // half of the local cosigner cache expiration
defaultNonceExpiration = 10 * time.Second // half of the local cosigner cache expiration
targetTrim = 10
)

type CosignerNonceCache struct {
@@ -29,10 +29,13 @@ type CosignerNonceCache struct {

getNoncesInterval time.Duration
getNoncesTimeout time.Duration
nonceExpiration time.Duration

threshold uint8

cache NonceCache

pruner NonceCachePruner
}

type lastCount struct {
@@ -58,15 +61,13 @@ func (lc *lastCount) Get() int {
return lc.count
}

type NonceCache struct {
cache map[uuid.UUID]*CachedNonce
mu sync.RWMutex
type NonceCachePruner interface {
PruneNonces() int
}

func NewNonceCache() NonceCache {
return NonceCache{
cache: make(map[uuid.UUID]*CachedNonce, cachePreSize),
}
type NonceCache struct {
cache []*CachedNonce
mu sync.RWMutex
}

func (nc *NonceCache) Size() int {
@@ -75,17 +76,14 @@ func (nc *NonceCache) Size() int {
return len(nc.cache)
}

func (nc *NonceCache) Get(uuid uuid.UUID) (*CachedNonce, bool) {
nc.mu.RLock()
defer nc.mu.RUnlock()
cn, ok := nc.cache[uuid]
return cn, ok
}

func (nc *NonceCache) Set(uuid uuid.UUID, cn *CachedNonce) {
func (nc *NonceCache) Add(cn *CachedNonce) {
nc.mu.Lock()
defer nc.mu.Unlock()
nc.cache[uuid] = cn
nc.cache = append(nc.cache, cn)
}

func (nc *NonceCache) Delete(index int) {
nc.cache = append(nc.cache[:index], nc.cache[index+1:]...)
}
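Swapping the map for a slice makes deletion an O(n) splice. Note that `Delete` itself takes no lock: after this commit its only caller, `GetNonces`, holds the write lock for the entire operation. A standalone sketch of the same idiom (the function name is illustrative):

```go
// deleteAt splices out the element at index i, shifting the tail left.
// Synchronization is the caller's job, mirroring how NonceCache.Delete
// relies on GetNonces already holding the mutex.
func deleteAt(s []*CachedNonce, i int) []*CachedNonce {
	return append(s[:i], s[i+1:]...)
}
```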

type CosignerNoncesRel struct {
@@ -115,17 +113,26 @@ func NewCosignerNonceCache(
leader Leader,
getNoncesInterval time.Duration,
getNoncesTimeout time.Duration,
nonceExpiration time.Duration,
threshold uint8,
pruner NonceCachePruner,
) *CosignerNonceCache {
return &CosignerNonceCache{
cnc := &CosignerNonceCache{
logger: logger,
cache: NewNonceCache(),
cosigners: cosigners,
leader: leader,
getNoncesInterval: getNoncesInterval,
getNoncesTimeout: getNoncesTimeout,
nonceExpiration: nonceExpiration,
threshold: threshold,
pruner: pruner,
}
// the only time pruner is expected to be non-nil is during tests, otherwise we use the cache logic.
if pruner == nil {
cnc.pruner = cnc
}

return cnc
}
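The constructor now takes `nonceExpiration` and a `pruner`, falling back to the cache itself when the pruner is nil. A hedged wiring sketch (the `logger`, `cosigners`, `leader`, and `threshold` values are assumed to exist in the caller):

```go
// Production wiring: pruner == nil, so the cache prunes itself;
// tests instead inject a NonceCachePruner such as mockPruner below.
cnc := NewCosignerNonceCache(
	logger, cosigners, leader,
	defaultGetNoncesInterval, // how often reconcile runs
	defaultGetNoncesTimeout,  // timeout for each nonce fetch round trip
	defaultNonceExpiration,   // 10s, half the local cosigner cache expiration
	threshold,                // uint8 signer threshold
	nil,                      // nil => cnc.pruner = cnc
)
```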

func (cnc *CosignerNonceCache) getUuids(n int) []uuid.UUID {
@@ -136,18 +143,23 @@ func (cnc *CosignerNonceCache) getUuids(n int) []uuid.UUID {
return uuids
}

func (cnc *CosignerNonceCache) target() int {
return int((cnc.noncesPerMinute/60)*cnc.getNoncesInterval.Seconds()*1.2) + int(cnc.noncesPerMinute/30) + targetTrim
}

func (cnc *CosignerNonceCache) reconcile(ctx context.Context) {
// prune expired nonces
cnc.pruneNonces()
pruned := cnc.pruner.PruneNonces()

if !cnc.leader.IsLeader() {
return
}
remainingNonces := cnc.cache.Size()
timeSinceLastReconcile := time.Since(cnc.lastReconcileTime)

lastReconcileNonces := cnc.lastReconcileNonces.Get()
// calculate nonces per minute
noncesPerMin := float64(cnc.lastReconcileNonces.Get()-remainingNonces) / timeSinceLastReconcile.Minutes()
noncesPerMin := float64(lastReconcileNonces-remainingNonces-pruned) / timeSinceLastReconcile.Minutes()
if noncesPerMin < 0 {
noncesPerMin = 0
}
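Subtracting `pruned` keeps expired-and-pruned nonces from being counted as consumption, so the measured rate reflects real demand. A worked example with made-up numbers:

```go
// Last reconcile left 600 nonces; 400 remain. Of the 200 missing, 50 were
// pruned as expired rather than consumed, and 15s (0.25 min) have elapsed.
lastReconcileNonces, remainingNonces, pruned := 600, 400, 50
minutes := 0.25

noncesPerMin := float64(lastReconcileNonces-remainingNonces-pruned) / minutes
// (600-400-50)/0.25 = 600 nonces/min of true consumption; without the
// pruned term this would read 800/min and skew the demand target.
_ = noncesPerMin
```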
@@ -167,15 +179,16 @@ func (cnc *CosignerNonceCache) reconcile(ctx context.Context) {

// calculate how many nonces we need to load to keep up with demand
// load 120% the number of nonces we need to keep up with demand,
// plus a couple seconds worth of nonces to account for nonce consumption during LoadN
// plus 10 for padding

target := int((cnc.noncesPerMinute/60)*cnc.getNoncesInterval.Seconds()*1.2) + 10
additional := target - remainingNonces
t := cnc.target()
additional := t - remainingNonces
if additional <= 0 {
// we're ahead of demand, don't load any more
cnc.logger.Debug(
"Cosigner nonce cache ahead of demand",
"target", target,
"target", t,
"remaining", remainingNonces,
"noncesPerMin", cnc.noncesPerMinute,
)
Expand All @@ -184,7 +197,7 @@ func (cnc *CosignerNonceCache) reconcile(ctx context.Context) {

cnc.logger.Debug(
"Loading additional nonces to meet demand",
"target", target,
"target", t,
"remaining", remainingNonces,
"additional", additional,
"noncesPerMin", cnc.noncesPerMinute,
@@ -202,7 +215,7 @@ func (cnc *CosignerNonceCache) LoadN(ctx context.Context, n int) {
var wg sync.WaitGroup
wg.Add(len(cnc.cosigners))

expiration := time.Now().Add(nonceCacheExpiration)
expiration := time.Now().Add(cnc.nonceExpiration)

for i, p := range cnc.cosigners {
i := i
@@ -251,7 +264,7 @@ func (cnc *CosignerNonceCache) LoadN(ctx context.Context, n int) {
})
}
if num >= cnc.threshold {
cnc.cache.Set(u, &nonce)
cnc.cache.Add(&nonce)
added++
}
}
@@ -274,10 +287,10 @@ func (cnc *CosignerNonceCache) Start(ctx context.Context) {
}

func (cnc *CosignerNonceCache) GetNonces(fastestPeers []Cosigner) (*CosignerUUIDNonces, error) {
cnc.cache.mu.RLock()
defer cnc.cache.mu.RUnlock()
cnc.cache.mu.Lock()
defer cnc.cache.mu.Unlock()
CheckNoncesLoop:
for u, cn := range cnc.cache.cache {
for i, cn := range cnc.cache.cache {
var nonces CosignerNonces
for _, p := range fastestPeers {
found := false
@@ -294,13 +307,12 @@ CheckNoncesLoop:
}
}

cnc.cache.mu.RUnlock()
cnc.clearNonce(u)
cnc.cache.mu.RLock()
// remove this set of nonces from the cache
cnc.cache.Delete(i)

// all peers found
return &CosignerUUIDNonces{
UUID: u,
UUID: cn.UUID,
Nonces: nonces,
}, nil
}
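`GetNonces` now holds the write lock for the whole select-and-delete (per the commit message, "lock for entire GetNonces operation"), replacing the old pattern of dropping the read lock to call `clearNonce` and re-acquiring it mid-iteration, which left a window for concurrent mutation. A compressed, self-contained illustration of the single-critical-section shape (not the commit's code):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var mu sync.Mutex
	cache := []string{"nonce-a", "nonce-b"}

	// take finds and removes an entry under one critical section, so two
	// goroutines can never both claim the same nonce set.
	take := func() (string, bool) {
		mu.Lock()
		defer mu.Unlock()
		if len(cache) == 0 {
			return "", false
		}
		n := cache[0]
		cache = append(cache[:0], cache[1:]...) // splice out index 0
		return n, true
	}

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			n, ok := take()
			fmt.Println(n, ok) // one goroutine sees "" false, never a double-take
		}()
	}
	wg.Wait()
}
```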
@@ -316,26 +328,32 @@ CheckNoncesLoop:
return nil, fmt.Errorf("no nonces found involving cosigners %+v", cosignerInts)
}

func (cnc *CosignerNonceCache) pruneNonces() {
func (cnc *CosignerNonceCache) PruneNonces() int {
cnc.cache.mu.Lock()
defer cnc.cache.mu.Unlock()
for u, cn := range cnc.cache.cache {
if time.Now().After(cn.Expiration) {
delete(cnc.cache.cache, u)
nonExpiredIndex := len(cnc.cache.cache) - 1
for i := len(cnc.cache.cache) - 1; i >= 0; i-- {
if time.Now().Before(cnc.cache.cache[i].Expiration) {
nonExpiredIndex = i
break
}
if i == 0 {
deleteCount := len(cnc.cache.cache)
cnc.cache.cache = nil
return deleteCount
}
}
}

func (cnc *CosignerNonceCache) clearNonce(uuid uuid.UUID) {
cnc.cache.mu.Lock()
defer cnc.cache.mu.Unlock()
delete(cnc.cache.cache, uuid)
deleteCount := len(cnc.cache.cache) - nonExpiredIndex - 1
if nonExpiredIndex != len(cnc.cache.cache)-1 {
cnc.cache.cache = cnc.cache.cache[:nonExpiredIndex+1]
}
return deleteCount
}

func (cnc *CosignerNonceCache) ClearNonces(cosigner Cosigner) {
cnc.cache.mu.Lock()
defer cnc.cache.mu.Unlock()
for u, cn := range cnc.cache.cache {
for i, cn := range cnc.cache.cache {
deleteID := -1
for i, n := range cn.Nonces {
if n.Cosigner.GetID() == cosigner.GetID() {
Expand All @@ -347,16 +365,10 @@ func (cnc *CosignerNonceCache) ClearNonces(cosigner Cosigner) {
if deleteID >= 0 {
if len(cn.Nonces)-1 < int(cnc.threshold) {
// If cosigners on this nonce drops below threshold, delete it as it's no longer usable
delete(cnc.cache.cache, u)
cnc.cache.Delete(i)
} else {
cn.Nonces = append(cn.Nonces[:deleteID], cn.Nonces[deleteID+1:]...)
}
}
}
}

func (cnc *CosignerNonceCache) ClearAllNonces() {
cnc.cache.mu.Lock()
defer cnc.cache.mu.Unlock()
cnc.cache.cache = make(map[uuid.UUID]*CachedNonce, cachePreSize)
}
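The new `PruneNonces` above scans from the back of the slice: it drops the contiguous run of already-expired entries at the tail, and clears everything if the scan reaches index 0 with nothing unexpired (the "fix prune all" bullet in the commit message). A standalone sketch of the same scan over bare expiration times (illustrative, not the commit's code):

```go
package main

import (
	"fmt"
	"time"
)

// pruneTail mirrors the shape of PruneNonces: walk backward, keep everything
// up to the last non-expired entry, return how many entries were dropped.
func pruneTail(exp []time.Time, now time.Time) ([]time.Time, int) {
	nonExpired := len(exp) - 1
	for i := len(exp) - 1; i >= 0; i-- {
		if now.Before(exp[i]) {
			nonExpired = i
			break
		}
		if i == 0 {
			return nil, len(exp) // every entry expired: prune all
		}
	}
	pruned := len(exp) - nonExpired - 1
	return exp[:nonExpired+1], pruned
}

func main() {
	now := time.Now()
	exp := []time.Time{
		now.Add(10 * time.Second), // still valid
		now.Add(-2 * time.Second), // expired
		now.Add(-1 * time.Second), // expired
	}
	kept, pruned := pruneTail(exp, now)
	fmt.Println(len(kept), pruned) // 1 2
}
```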
93 changes: 91 additions & 2 deletions signer/cosigner_nonce_cache_test.go
@@ -3,29 +3,69 @@ package signer
import (
"context"
"os"
"sync"
"testing"
"time"

cometlog "github.com/cometbft/cometbft/libs/log"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
)

func TestNonceCache(_ *testing.T) {
nc := NonceCache{}
for i := 0; i < 10; i++ {
nc.Add(&CachedNonce{UUID: uuid.New(), Expiration: time.Now().Add(1 * time.Second)})
}

nc.Delete(nc.Size() - 1)
nc.Delete(0)
}

type mockPruner struct {
cnc *CosignerNonceCache
count int
pruned int
mu sync.Mutex
}

func (mp *mockPruner) PruneNonces() int {
pruned := mp.cnc.PruneNonces()
mp.mu.Lock()
defer mp.mu.Unlock()
mp.count++
mp.pruned += pruned
return pruned
}

func (mp *mockPruner) Result() (int, int) {
mp.mu.Lock()
defer mp.mu.Unlock()
return mp.count, mp.pruned
}

func TestNonceCacheDemand(t *testing.T) {
lcs, _ := getTestLocalCosigners(t, 2, 3)
cosigners := make([]Cosigner, len(lcs))
for i, lc := range lcs {
cosigners[i] = lc
}

mp := &mockPruner{}

nonceCache := NewCosignerNonceCache(
cometlog.NewTMLogger(cometlog.NewSyncWriter(os.Stdout)),
cosigners,
&MockLeader{id: 1, leader: &ThresholdValidator{myCosigner: lcs[0]}},
500*time.Millisecond,
100*time.Millisecond,
defaultNonceExpiration,
2,
mp,
)

mp.cnc = nonceCache

ctx, cancel := context.WithCancel(context.Background())

nonceCache.LoadN(ctx, 500)
@@ -45,6 +85,55 @@ func TestNonceCacheDemand(t *testing.T) {

cancel()

target := int(nonceCache.noncesPerMinute*.01) + 10
require.LessOrEqual(t, size, target)
require.LessOrEqual(t, size, nonceCache.target())

require.Greater(t, mp.count, 0)
require.Equal(t, 0, mp.pruned)
}

func TestNonceCacheExpiration(t *testing.T) {
lcs, _ := getTestLocalCosigners(t, 2, 3)
cosigners := make([]Cosigner, len(lcs))
for i, lc := range lcs {
cosigners[i] = lc
}

mp := &mockPruner{}

nonceCache := NewCosignerNonceCache(
cometlog.NewTMLogger(cometlog.NewSyncWriter(os.Stdout)),
cosigners,
&MockLeader{id: 1, leader: &ThresholdValidator{myCosigner: lcs[0]}},
250*time.Millisecond,
10*time.Millisecond,
500*time.Millisecond,
2,
mp,
)

mp.cnc = nonceCache

ctx, cancel := context.WithCancel(context.Background())

const loadN = 500

nonceCache.LoadN(ctx, loadN)

go nonceCache.Start(ctx)

time.Sleep(1 * time.Second)

count, pruned := mp.Result()

// we should have pruned at least three times after
// waiting for a second with a reconcile interval of 250ms
require.GreaterOrEqual(t, count, 3)

// we should have pruned at least the number of nonces we loaded and knew would expire
require.GreaterOrEqual(t, pruned, loadN)

cancel()

// the cache should have at most the trim padding since no nonces are being consumed.
require.LessOrEqual(t, nonceCache.cache.Size(), targetTrim)
}
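For context on the assertions above, the timing expectations stated as code (a sketch; real counts vary with scheduler jitter):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	reconcileEvery := 250 * time.Millisecond // getNoncesInterval in the test
	window := 1 * time.Second                // sleep before the assertions

	// Up to window/reconcileEvery reconcile (and prune) passes fit in the
	// window; the test asserts count >= 3 to tolerate jitter.
	fmt.Println(int(window / reconcileEvery)) // 4

	// All 500 preloaded nonces share an expiration ~500ms in and none are
	// consumed, so the cumulative pruned total must reach loadN (500).
}
```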
