Skip to content

Commit

Permalink
Merge pull request #151 from bloxapp/slotQueue-multi-listeners
Browse files Browse the repository at this point in the history
slotQueue support multi validators listeners
  • Loading branch information
nivBlox authored Jul 1, 2021
2 parents c242073 + 1b5a6a8 commit 3b82aa5
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 31 deletions.
79 changes: 57 additions & 22 deletions slotqueue/slotqueue.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package slotqueue

import (
"encoding/hex"
"fmt"
"github.com/bloxapp/ssv/pubsub"
"time"

"github.com/bloxapp/eth2-key-manager/core"
Expand All @@ -13,54 +15,87 @@ import (
// Queue represents the behavior of the slot queue
type Queue interface {
// Next returns the next slot with its duties at its time
Next(pubKey []byte) (uint64, *ethpb.DutiesResponse_Duty, bool, error)
RegisterToNext(pubKey []byte) (pubsub.SubjectChannel, error)

// Schedule schedules execution of the given slot and puts it into the queue
Schedule(pubKey []byte, slot uint64, duty *ethpb.DutiesResponse_Duty) error

// listenToTicker and notify for all the observers when ticker triggers
listenToTicker()
}

// queue implements Queue
type queue struct {
data *cache.Cache
ticker *slotutil.SlotTicker
data *cache.Cache
ticker *slotutil.SlotTicker
tickerSubjects map[string]pubsub.Subject
}

// SlotEvent represents the notify event fire for each pubkey subject with the proper duty
type SlotEvent struct {
Slot uint64
Duty *ethpb.DutiesResponse_Duty
Ok bool
}

// New is the constructor of queue
func New(network core.Network) Queue {
genesisTime := time.Unix(int64(network.MinGenesisTime()), 0)
slotTicker := slotutil.GetSlotTicker(genesisTime, uint64(network.SlotDurationSec().Seconds()))
return &queue{
data: cache.New(time.Minute*30, time.Minute*31),
ticker: slotTicker,
queue := &queue{
data: cache.New(time.Minute*30, time.Minute*31),
ticker: slotTicker,
tickerSubjects: make(map[string]pubsub.Subject),
}
go queue.listenToTicker()
return queue
}

// Next returns the next slot with its duties at its time
func (q *queue) Next(pubKey []byte) (uint64, *ethpb.DutiesResponse_Duty, bool, error) {
func (q *queue) listenToTicker() {
for currentSlot := range q.ticker.C() {
key := q.getKey(pubKey, currentSlot)
dataRaw, ok := q.data.Get(key)
if !ok {
continue
}
for pubkey, pub := range q.tickerSubjects {
key := q.getKey(pubkey, currentSlot)
dataRaw, ok := q.data.Get(key)
if !ok {
continue
}

duty, ok := dataRaw.(*ethpb.DutiesResponse_Duty)
if !ok {
continue
}
duty, ok := dataRaw.(*ethpb.DutiesResponse_Duty)
if !ok {
continue
}

return currentSlot, duty, true, nil
pub.Notify(SlotEvent{
Slot: currentSlot,
Duty: duty,
Ok: true,
})
}
}
for _, pub := range q.tickerSubjects {
pub.Notify(SlotEvent{
Slot: 0,
Duty: nil,
Ok: false,
})
}
}

return 0, nil, false, nil
// RegisterToNext check if subject exist if not create new one. Register to the subject and return the subject channel
func (q *queue) RegisterToNext(pubKey []byte) (pubsub.SubjectChannel, error) {
if pub, ok := q.tickerSubjects[hex.EncodeToString(pubKey)]; ok {
return pub.Register(hex.EncodeToString(pubKey))
}
q.tickerSubjects[hex.EncodeToString(pubKey)] = pubsub.NewSubject()
return q.tickerSubjects[hex.EncodeToString(pubKey)].Register(hex.EncodeToString(pubKey))
}

// Schedule schedules execution of the given slot and puts it into the queue
func (q *queue) Schedule(pubKey []byte, slot uint64, duty *ethpb.DutiesResponse_Duty) error {
q.data.SetDefault(q.getKey(pubKey, slot), duty)
q.data.SetDefault(q.getKey(hex.EncodeToString(pubKey), slot), duty)
return nil
}

func (q *queue) getKey(pubKey []byte, slot uint64) string {
return fmt.Sprintf("%d_%#v", slot, pubKey)
func (q *queue) getKey(pubKey string, slot uint64) string {
return fmt.Sprintf("%d_%s", slot, pubKey)
}
1 change: 1 addition & 0 deletions validator/duty_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ func (v *Validator) ExecuteDuty(ctx context.Context, slot uint64, duty *ethpb.Du
zap.Uint64("committee_index", duty.GetCommitteeIndex()),
zap.Uint64("slot", slot))

logger.Debug("executing duty...")
roles, err := v.beacon.RolesAt(ctx, slot, duty, v.Share.PublicKey, v.Share.ShareKey)
if err != nil {
logger.Error("failed to get roles for duty", zap.Error(err))
Expand Down
23 changes: 14 additions & 9 deletions validator/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,18 +93,23 @@ func (v *Validator) Start() error {
func (v *Validator) startSlotQueueListener() {
v.logger.Info("start listening slot queue")

for {
slot, duty, ok, err := v.slotQueue.Next(v.Share.PublicKey.Serialize())
if err != nil {
v.logger.Error("failed to get next slot data", zap.Error(err))
continue
}
ch, err := v.slotQueue.RegisterToNext(v.Share.PublicKey.Serialize())
if err != nil{
v.logger.Error("failed to register validator to slot queue", zap.Error(err))
return
}

if !ok {
v.logger.Debug("no duties for slot scheduled")
for e := range ch {
if event, ok := e.(slotqueue.SlotEvent); ok {
if !event.Ok {
v.logger.Debug("no duties for slot scheduled")
continue
}
go v.ExecuteDuty(v.ctx, event.Slot, event.Duty)
}else {
v.logger.Error("slot queue event is not ok")
continue
}
go v.ExecuteDuty(v.ctx, slot, duty)
}
}

Expand Down

0 comments on commit 3b82aa5

Please sign in to comment.