Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pkg): scheduler package #3319

Merged
merged 12 commits into from
Dec 20, 2024
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

* [3170](https://github.com/zeta-chain/node/pull/3170) - revamp TSS package in zetaclient
* [3291](https://github.com/zeta-chain/node/pull/3291) - revamp zetaclient initialization (+ graceful shutdown)
* [3319](https://github.com/zeta-chain/node/pull/3319) - implement scheduler for zetaclient

### Fixes

Expand Down
117 changes: 117 additions & 0 deletions pkg/scheduler/chan.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package scheduler

import (
"context"
"fmt"
"sync/atomic"

cometbft "github.com/cometbft/cometbft/types"
"github.com/rs/zerolog"
)

// blockTicker represents custom ticker implementation
// that ticks on new Zeta block events.
type blockTicker struct {
task Task

// block channel that will be used to receive new blocks
blockChan <-chan cometbft.EventDataNewBlock

// stopChan is used to stop the ticker
stopChan chan struct{}

// doneChan is used to signal that the ticker has stopped (i.e. "blocking stop")
doneChan chan struct{}

// atomic flag. `1` for RUNNING, `0` for STOPPED
status int32
swift1337 marked this conversation as resolved.
Show resolved Hide resolved

logger zerolog.Logger
}

type blockCtxKey struct{}

func newBlockTicker(task Task, blockChan <-chan cometbft.EventDataNewBlock, logger zerolog.Logger) *blockTicker {
return &blockTicker{
task: task,
blockChan: blockChan,
stopChan: make(chan struct{}),
doneChan: nil,
logger: logger,
}
}

func withBlockEvent(ctx context.Context, event cometbft.EventDataNewBlock) context.Context {
return context.WithValue(ctx, blockCtxKey{}, event)
}

// BlockFromContext returns cometbft.EventDataNewBlock from the context or false.
func BlockFromContext(ctx context.Context) (cometbft.EventDataNewBlock, bool) {
blockEvent, ok := ctx.Value(blockCtxKey{}).(cometbft.EventDataNewBlock)
return blockEvent, ok
}

func (t *blockTicker) Start(ctx context.Context) error {
if !t.setRunning(true) {
return fmt.Errorf("ticker already started")
}

Check warning on line 57 in pkg/scheduler/chan.go

View check run for this annotation

Codecov / codecov/patch

pkg/scheduler/chan.go#L56-L57

Added lines #L56 - L57 were not covered by tests

t.doneChan = make(chan struct{})
defer func() {
close(t.doneChan)

// closes stopChan if it's not closed yet
if t.setRunning(false) {
close(t.stopChan)
}
}()

for {
select {
case block, ok := <-t.blockChan:
// channel closed
if !ok {
t.logger.Warn().Msg("Block channel closed")
return nil
}

ctx := withBlockEvent(ctx, block)

if err := t.task(ctx); err != nil {
t.logger.Warn().Err(err).Msg("Task error")
}
case <-ctx.Done():
t.logger.Warn().Err(ctx.Err()).Msg("Content error")
return nil

Check warning on line 85 in pkg/scheduler/chan.go

View check run for this annotation

Codecov / codecov/patch

pkg/scheduler/chan.go#L81-L85

Added lines #L81 - L85 were not covered by tests
case <-t.stopChan:
// caller invoked t.stop()
return nil
}
}
}

func (t *blockTicker) Stop() {
// noop
if !t.getRunning() {
return
}

// notify async loop to stop
close(t.stopChan)

// wait for the loop to stop
<-t.doneChan
t.setRunning(false)
}

func (t *blockTicker) getRunning() bool {
return atomic.LoadInt32(&t.status) == 1
}

func (t *blockTicker) setRunning(running bool) (changed bool) {
if running {
return atomic.CompareAndSwapInt32(&t.status, 0, 1)
}

return atomic.CompareAndSwapInt32(&t.status, 1, 0)
}
46 changes: 46 additions & 0 deletions pkg/scheduler/opts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package scheduler

import (
"time"

cometbft "github.com/cometbft/cometbft/types"
)

// Opt Definition option
type Opt func(*Definition)

// Name sets task name.
func Name(name string) Opt {
return func(d *Definition) { d.name = name }
}

// GroupName sets task group. Otherwise, defaults to DefaultGroup.
func GroupName(group Group) Opt {
return func(d *Definition) { d.group = group }
}

// LogFields augments definition logger with some fields.
func LogFields(fields map[string]any) Opt {
return func(d *Definition) { d.logFields = fields }
}

// Interval sets initial task interval.
func Interval(interval time.Duration) Opt {
return func(d *Definition) { d.interval = interval }
}

// Skipper sets task skipper function
func Skipper(skipper func() bool) Opt {
return func(d *Definition) { d.skipper = skipper }
}

// IntervalUpdater sets interval updater function.
func IntervalUpdater(intervalUpdater func() time.Duration) Opt {
return func(d *Definition) { d.intervalUpdater = intervalUpdater }
}

// BlockTicker makes Definition to listen for new zeta blocks instead of using interval ticker.
// IntervalUpdater is ignored.
func BlockTicker(blocks <-chan cometbft.EventDataNewBlock) Opt {
return func(d *Definition) { d.blockChan = blocks }
}
swift1337 marked this conversation as resolved.
Show resolved Hide resolved
Loading
Loading