Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pkg): scheduler package #3319

Merged
merged 12 commits into from
Dec 20, 2024
117 changes: 117 additions & 0 deletions pkg/scheduler/chan.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package scheduler

import (
"context"
"fmt"
"sync/atomic"

cometbft "github.com/cometbft/cometbft/types"
"github.com/rs/zerolog"
)

// blockTicker represents custom ticker implementation
// that ticks on new Zeta block events.
type blockTicker struct {
task Task

// block channel that will be used to receive new blocks
blockChan <-chan cometbft.EventDataNewBlock

// stopChan is used to stop the ticker
stopChan chan struct{}

// doneChan is used to signal that the ticker has stopped (i.e. "blocking stop")
doneChan chan struct{}

// atomic flag. `1` for RUNNING, `0` for STOPPED
status int32
swift1337 marked this conversation as resolved.
Show resolved Hide resolved

logger zerolog.Logger
}

type blockCtxKey struct{}

func newBlockTicker(task Task, blockChan <-chan cometbft.EventDataNewBlock, logger zerolog.Logger) *blockTicker {
return &blockTicker{
task: task,
blockChan: blockChan,
stopChan: make(chan struct{}),
doneChan: nil,
logger: logger,
}
}

func withBlockEvent(ctx context.Context, event cometbft.EventDataNewBlock) context.Context {
return context.WithValue(ctx, blockCtxKey{}, event)
}

// BlockFromContext returns cometbft.EventDataNewBlock from the context or false.
func BlockFromContext(ctx context.Context) (cometbft.EventDataNewBlock, bool) {
blockEvent, ok := ctx.Value(blockCtxKey{}).(cometbft.EventDataNewBlock)
return blockEvent, ok
}

func (t *blockTicker) Start(ctx context.Context) error {
if !t.setRunning(true) {
return fmt.Errorf("ticker already started")
}

t.doneChan = make(chan struct{})
defer func() {
close(t.doneChan)

// closes stopChan if it's not closed yet
if t.setRunning(false) {
close(t.stopChan)
}
}()

for {
select {
case block, ok := <-t.blockChan:
// channel closed
if !ok {
t.logger.Warn().Msg("Block channel closed")
return nil
}

ctx := withBlockEvent(ctx, block)

if err := t.task(ctx); err != nil {
t.logger.Warn().Err(err).Msg("Task error")
}
case <-ctx.Done():
t.logger.Warn().Err(ctx.Err()).Msg("Content error")
return nil
case <-t.stopChan:
// caller invoked t.stop()
return nil
}
}
}

func (t *blockTicker) Stop() {
// noop
if !t.getRunning() {
return
}

// notify async loop to stop
close(t.stopChan)

// wait for the loop to stop
<-t.doneChan
t.setRunning(false)
}

func (t *blockTicker) getRunning() bool {
return atomic.LoadInt32(&t.status) == 1
}

func (t *blockTicker) setRunning(running bool) (changed bool) {
if running {
return atomic.CompareAndSwapInt32(&t.status, 0, 1)
}

return atomic.CompareAndSwapInt32(&t.status, 1, 0)
}
46 changes: 46 additions & 0 deletions pkg/scheduler/opts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package scheduler

import (
"time"

cometbft "github.com/cometbft/cometbft/types"
)

// Opt Definition option
type Opt func(*Definition)

// Name sets task name.
func Name(name string) Opt {
return func(d *Definition) { d.name = name }
}

// GroupName sets task group. Otherwise, defaults to DefaultGroup.
func GroupName(group Group) Opt {
return func(d *Definition) { d.group = group }
}

// LogFields augments definition logger with some fields.
func LogFields(fields map[string]any) Opt {
return func(d *Definition) { d.logFields = fields }
}

// Interval sets initial task interval.
func Interval(interval time.Duration) Opt {
return func(d *Definition) { d.interval = interval }
}

// Skipper sets task skipper function
func Skipper(skipper func() bool) Opt {
return func(d *Definition) { d.skipper = skipper }
}

// IntervalUpdater sets interval updater function.
func IntervalUpdater(intervalUpdater func() time.Duration) Opt {
return func(d *Definition) { d.intervalUpdater = intervalUpdater }
}

// BlockTicker makes Definition to listen for new zeta blocks instead of using interval ticker.
// IntervalUpdater is ignored.
func BlockTicker(blocks <-chan cometbft.EventDataNewBlock) Opt {
return func(d *Definition) { d.blockChan = blocks }
}
swift1337 marked this conversation as resolved.
Show resolved Hide resolved
234 changes: 234 additions & 0 deletions pkg/scheduler/scheduler.go
swift1337 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
// Package scheduler provides a background task scheduler that allows for the registration,
// execution, and management of periodic tasks. Tasks can be grouped, named, and configured
// with various options such as custom intervals, log fields, and skip conditions.
//
// The scheduler supports dynamic interval updates and can gracefully stop tasks either
// individually or by group.
package scheduler

import (
"context"
"sync"
"time"

cometbft "github.com/cometbft/cometbft/types"
"github.com/google/uuid"
"github.com/rs/zerolog"

"github.com/zeta-chain/node/pkg/bg"
"github.com/zeta-chain/node/pkg/ticker"
)

// Scheduler represents background task scheduler.
type Scheduler struct {
definitions map[uuid.UUID]*Definition
mu sync.RWMutex
logger zerolog.Logger
}

// Task represents scheduler's task
swift1337 marked this conversation as resolved.
Show resolved Hide resolved
type Task func(ctx context.Context) error

// Group represents Definition group.
// Definitions can be grouped for easier management.
type Group string

// DefaultGroup is the default group for definitions.
const DefaultGroup = Group("default")

// Definition represents a configuration of a Task
type Definition struct {
// ref to the Scheduler is required
scheduler *Scheduler

// naming stuff
id uuid.UUID
group Group
name string

// arbitrary function that will be invoked by the scheduler
task Task

// represents interval ticker and its options
ticker *ticker.Ticker
interval time.Duration
intervalUpdater func() time.Duration
skipper func() bool

// zeta block ticker (also supports skipper)
blockChan <-chan cometbft.EventDataNewBlock
blockChanTicker *blockTicker

// logging
logFields map[string]any
logger zerolog.Logger
}

// New Scheduler instance.
func New(logger zerolog.Logger) *Scheduler {
return &Scheduler{
definitions: make(map[uuid.UUID]*Definition),
logger: logger.With().Str("module", "scheduler").Logger(),
}
}

// Register registers and starts new task in the background
swift1337 marked this conversation as resolved.
Show resolved Hide resolved
func (s *Scheduler) Register(ctx context.Context, task Task, opts ...Opt) *Definition {
id := uuid.New()
def := &Definition{
scheduler: s,
id: id,
group: DefaultGroup,
name: id.String(),
task: task,
interval: time.Second,
}
for _, opt := range opts {
opt(def)
}

def.logger = newDefinitionLogger(def, s.logger)

def.startTicker(ctx)

s.mu.Lock()
s.definitions[id] = def
s.mu.Unlock()

return def
}

// Stop stops all tasks.
func (s *Scheduler) Stop() {
s.StopGroup("")
}

// StopGroup stops all tasks in the group.
func (s *Scheduler) StopGroup(group Group) {
var selectedDefs []*Definition

s.mu.RLock()

// Filter desired definitions
for _, def := range s.definitions {
// "" is for wildcard i.e. all groups
if group == "" || def.group == group {
selectedDefs = append(selectedDefs, def)
}
}

s.mu.RUnlock()

if len(selectedDefs) == 0 {
return
}

// Stop all selected tasks concurrently
var wg sync.WaitGroup
wg.Add(len(selectedDefs))

for _, def := range selectedDefs {
go func(def *Definition) {
defer wg.Done()
def.Stop()
}(def)
}

wg.Wait()
}

// Stop stops the task and offloads it from the scheduler.
func (d *Definition) Stop() {
start := time.Now()

// delete definition from scheduler
defer func() {
d.scheduler.mu.Lock()
delete(d.scheduler.definitions, d.id)
d.scheduler.mu.Unlock()

timeTakenMS := time.Since(start).Milliseconds()
d.logger.Info().Int64("time_taken_ms", timeTakenMS).Msg("Stopped scheduler task")
}()

d.logger.Info().Msg("Stopping scheduler task")

if d.isIntervalTicker() {
d.ticker.StopBlocking()
return
}

d.blockChanTicker.Stop()
}

func (d *Definition) isIntervalTicker() bool {
return d.blockChan == nil
}

func (d *Definition) startTicker(ctx context.Context) {
d.logger.Info().Msg("Starting scheduler task")

if d.isIntervalTicker() {
d.ticker = ticker.New(d.interval, d.invokeByInterval, ticker.WithLogger(d.logger, d.name))
bg.Work(ctx, d.ticker.Start, bg.WithLogger(d.logger))

return
}

d.blockChanTicker = newBlockTicker(d.invoke, d.blockChan, d.logger)

bg.Work(ctx, d.blockChanTicker.Start, bg.WithLogger(d.logger))
}

// invokeByInterval a ticker.Task wrapper of invoke.
func (d *Definition) invokeByInterval(ctx context.Context, t *ticker.Ticker) error {
if err := d.invoke(ctx); err != nil {
d.logger.Error().Err(err).Msg("task failed")
}

if d.intervalUpdater != nil {
// noop if interval is not changed
t.SetInterval(d.intervalUpdater())
}

return nil
}

// invoke executes a given Task with logging & telemetry.
func (d *Definition) invoke(ctx context.Context) error {
// skip tick
if d.skipper != nil && d.skipper() {
return nil
}

d.logger.Debug().Msg("Invoking task")

err := d.task(ctx)

// todo metrics (TBD)
swift1337 marked this conversation as resolved.
Show resolved Hide resolved
// - duration (time taken)
// - outcome (skip, err, ok)
// - bump invocation counter
// - "last invoked at" timestamp (?)
// - chain_id
// - metrics cardinality: "task_group (?)" "task_name", "status", "chain_id"

return err
}

func newDefinitionLogger(def *Definition, logger zerolog.Logger) zerolog.Logger {
logOpts := logger.With().
Str("task.name", def.name).
Str("task.group", string(def.group))

if len(def.logFields) > 0 {
logOpts = logOpts.Fields(def.logFields)
}

taskType := "interval_ticker"
if def.blockChanTicker != nil {
taskType = "block_ticker"
}

return logOpts.Str("task.type", taskType).Logger()
}
Loading
Loading