Skip to content

Commit

Permalink
feat: add DRY_RUN config option (#97)
Browse files Browse the repository at this point in the history
* feat: add DRY_RUN config option

Add a `DRY_RUN` configuration option that controls whether the service
executes operations or simply logs the received events.

This is a temporary feature needed to help test the recently added logic
to monitor contract events using http connections without causing any
side-effects.

* review: add "scheduler" and "nopScheduler" types

Instead of using the `dryRun` flag to control whether or not operations
are added to the (standard) scheduler, we now select the type of the
scheduler pass to the timelock worker service:

* if dryRun is false, use the standard scheduler
* if dryRun is true, use the new "nop" scheduler, which only logs the
  calls but does not do anything

In practice the "standard scheduler" is a new type + interface as well,
since the existing implementation defined a the schedule as a simple
data type which was associated with the timelock worker via implicit
composition (though all the schedule related methods were defined on the
timelock worker type).
  • Loading branch information
gustavogama-cll authored Nov 26, 2024
1 parent 1b171e4 commit 4fe06c5
Show file tree
Hide file tree
Showing 14 changed files with 472 additions and 220 deletions.
9 changes: 8 additions & 1 deletion cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func startCommand() *cobra.Command {

nodeURL, privateKey, timelockAddress, callProxyAddress string
fromBlock, pollPeriod, eventListenerPollPeriod int64
dryRun bool
)

// Initialize timelock-worker configuration.
Expand All @@ -41,6 +42,7 @@ func startCommand() *cobra.Command {
startCmd.Flags().Int64Var(&fromBlock, "from-block", timelockConf.FromBlock, "Start watching from this block")
startCmd.Flags().Int64Var(&pollPeriod, "poll-period", timelockConf.PollPeriod, "Poll period in seconds")
startCmd.Flags().Int64Var(&eventListenerPollPeriod, "event-listener-poll-period", timelockConf.EventListenerPollPeriod, "Event Listener poll period in seconds")
startCmd.Flags().BoolVar(&dryRun, "dry-run", timelockConf.DryRun, "Enable \"dry run\" mode -- monitor events but don't trigger any calls")

return &startCmd
}
Expand Down Expand Up @@ -87,8 +89,13 @@ func startTimelock(cmd *cobra.Command) {
logs.Fatal().Msgf("value of poll-period not set: %s", err.Error())
}

dryRun, err := cmd.Flags().GetBool("dry-run")
if err != nil {
logs.Fatal().Msgf("value of dry-run not set: %s", err.Error())
}

tWorker, err := timelock.NewTimelockWorker(nodeURL, timelockAddress, callProxyAddress, privateKey,
big.NewInt(fromBlock), pollPeriod, eventListenerPollPeriod, logs)
big.NewInt(fromBlock), pollPeriod, eventListenerPollPeriod, dryRun, logs)
if err != nil {
logs.Fatal().Msgf("error creating the timelock-worker: %s", err.Error())
}
Expand Down
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ go 1.22
require (
github.com/docker/go-connections v0.5.0
github.com/ethereum/go-ethereum v1.13.15
github.com/google/go-cmp v0.6.0
github.com/prometheus/client_golang v1.19.1
github.com/rs/zerolog v1.33.0
github.com/samber/lo v1.47.0
github.com/smartcontractkit/ccip-owner-contracts v0.0.0-20240917103524-56f1a8d2cd4b
github.com/smartcontractkit/chain-selectors v1.0.17
github.com/spf13/cobra v1.8.1
Expand Down Expand Up @@ -95,9 +97,9 @@ require (
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/text v0.15.0 // indirect
golang.org/x/text v0.16.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
10 changes: 6 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,8 @@ github.com/sagikazarmark/locafero v0.4.0 h1:HApY1R9zGo4DBgr7dqsTH/JJxLTTsOt7u6ke
github.com/sagikazarmark/locafero v0.4.0/go.mod h1:Pe1W6UlPYUk/+wc/6KFhbORCfqzgYEpgQ3O5fPuL3H4=
github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE=
github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ=
github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc=
github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU=
github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/gopsutil/v3 v3.23.12 h1:z90NtUkp3bMtmICZKpC4+WaknU1eXtp5vtbQ11DgpE4=
Expand Down Expand Up @@ -389,8 +391,8 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down Expand Up @@ -430,8 +432,8 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
Expand Down
8 changes: 8 additions & 0 deletions pkg/cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package cli
import (
"fmt"
"os"
"slices"
"strconv"
"strings"

"github.com/spf13/viper"
)
Expand All @@ -17,6 +19,7 @@ type Config struct {
FromBlock int64 `mapstructure:"FROM_BLOCK"`
PollPeriod int64 `mapstructure:"POLL_PERIOD"`
EventListenerPollPeriod int64 `mapstructure:"EVENT_LISTENER_POLL_PERIOD"`
DryRun bool `mapstructure:"DRY_RUN"`
}

// NewTimelockCLI return a new Timelock instance configured.
Expand Down Expand Up @@ -80,5 +83,10 @@ func NewTimelockCLI() (*Config, error) {
c.EventListenerPollPeriod = int64(pp)
}

if os.Getenv("DRY_RUN") != "" {
trueValues := []string{"true", "yes", "on", "enabled", "1"}
c.DryRun = slices.Contains(trueValues, strings.ToLower(os.Getenv("DRY_RUN")))
}

return &c, nil
}
12 changes: 9 additions & 3 deletions pkg/cli/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func Test_NewTimelockCLI(t *testing.T) {
name: "load from file",
setup: func(t *testing.T) {
unsetenvs(t, "NODE_URL", "TIMELOCK_ADDRESS", "CALL_PROXY_ADDRESS", "PRIVATE_KEY", "FROM_BLOCK",
"POLL_PERIOD", "EVENT_LISTENER_POLL_PERIOD")
"POLL_PERIOD", "EVENT_LISTENER_POLL_PERIOD", "DRY_RUN")

err := os.WriteFile(configFileName, []byte(string(
"NODE_URL=wss://goerli/test\n"+
Expand All @@ -30,7 +30,8 @@ func Test_NewTimelockCLI(t *testing.T) {
"PRIVATE_KEY=9876543210\n"+
"FROM_BLOCK=1\n"+
"POLL_PERIOD=2\n"+
"EVENT_LISTENER_POLL_PERIOD=3\n",
"EVENT_LISTENER_POLL_PERIOD=3\n"+
"DRY_RUN=true\n",
)), os.FileMode(0644))
require.NoError(t, err)

Expand All @@ -44,6 +45,7 @@ func Test_NewTimelockCLI(t *testing.T) {
FromBlock: 1,
PollPeriod: 2,
EventListenerPollPeriod: 3,
DryRun: true,
},
},
{
Expand All @@ -56,7 +58,8 @@ func Test_NewTimelockCLI(t *testing.T) {
"PRIVATE_KEY=9876543210\n"+
"FROM_BLOCK=1\n"+
"POLL_PERIOD=2\n"+
"EVENT_LISTENER_POLL_PERIOD=3\n",
"EVENT_LISTENER_POLL_PERIOD=3\n"+
"DRY_RUN=true\n",
)), os.FileMode(0644))
require.NoError(t, err)

Expand All @@ -67,6 +70,7 @@ func Test_NewTimelockCLI(t *testing.T) {
t.Setenv("FROM_BLOCK", "4")
t.Setenv("POLL_PERIOD", "5")
t.Setenv("EVENT_LISTENER_POLL_PERIOD", "6")
t.Setenv("DRY_RUN", "false")

t.Cleanup(func() { os.Remove(configFileName) })
},
Expand Down Expand Up @@ -95,6 +99,7 @@ func Test_NewTimelockCLI(t *testing.T) {
t.Setenv("FROM_BLOCK", "4")
t.Setenv("POLL_PERIOD", "5")
t.Setenv("EVENT_LISTENER_POLL_PERIOD", "6")
t.Setenv("DRY_RUN", "yes")

t.Cleanup(func() { os.Remove(configFileName) })
},
Expand All @@ -106,6 +111,7 @@ func Test_NewTimelockCLI(t *testing.T) {
FromBlock: 4,
PollPeriod: 5,
EventListenerPollPeriod: 6,
DryRun: true,
},
},
{
Expand Down
1 change: 1 addition & 0 deletions pkg/timelock/const_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ var (
testFromBlock = big.NewInt(0)
testPollPeriod = 5
testEventListenerPollPeriod = 0
testDryRun = false
testLogger = logger.Logger("info", "human")
)
8 changes: 4 additions & 4 deletions pkg/timelock/operations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

func Test_isOperation(t *testing.T) {
testWorker := newTestTimelockWorker(t, testNodeURL, testTimelockAddress, testCallProxyAddress, testPrivateKey,
testFromBlock, int64(testPollPeriod), int64(testEventListenerPollPeriod), testLogger)
testFromBlock, int64(testPollPeriod), int64(testEventListenerPollPeriod), testDryRun, testLogger)

var ctx context.Context

Expand Down Expand Up @@ -56,7 +56,7 @@ func Test_isOperation(t *testing.T) {

func Test_isReady(t *testing.T) {
testWorker := newTestTimelockWorker(t, testNodeURL, testTimelockAddress, testCallProxyAddress, testPrivateKey,
testFromBlock, int64(testPollPeriod), int64(testEventListenerPollPeriod), testLogger)
testFromBlock, int64(testPollPeriod), int64(testEventListenerPollPeriod), testDryRun, testLogger)

var ctx context.Context

Expand Down Expand Up @@ -100,7 +100,7 @@ func Test_isReady(t *testing.T) {

func Test_isDone(t *testing.T) {
testWorker := newTestTimelockWorker(t, testNodeURL, testTimelockAddress, testCallProxyAddress, testPrivateKey,
testFromBlock, int64(testPollPeriod), int64(testEventListenerPollPeriod), testLogger)
testFromBlock, int64(testPollPeriod), int64(testEventListenerPollPeriod), testDryRun, testLogger)

var ctx context.Context

Expand Down Expand Up @@ -144,7 +144,7 @@ func Test_isDone(t *testing.T) {

func Test_isPending(t *testing.T) {
testWorker := newTestTimelockWorker(t, testNodeURL, testTimelockAddress, testCallProxyAddress, testPrivateKey,
testFromBlock, int64(testPollPeriod), int64(testEventListenerPollPeriod), testLogger)
testFromBlock, int64(testPollPeriod), int64(testEventListenerPollPeriod), testDryRun, testLogger)

var ctx context.Context

Expand Down
91 changes: 68 additions & 23 deletions pkg/timelock/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,39 @@ import (

type operationKey [32]byte

type Scheduler interface {
runScheduler(ctx context.Context) <-chan struct{}
addToScheduler(op *contract.TimelockCallScheduled)
delFromScheduler(op operationKey)
dumpOperationStore(now func() time.Time)
}

type executeFn func(context.Context, []*contract.TimelockCallScheduled)

// Scheduler represents a scheduler with an in memory store.
// Whenever accesing the map the mutex should be Locked, to prevent
// any race condition.
type scheduler struct {
mu sync.Mutex
ticker *time.Ticker
add chan *contract.TimelockCallScheduled
del chan operationKey
store map[operationKey][]*contract.TimelockCallScheduled
busy bool
mu sync.Mutex
ticker *time.Ticker
add chan *contract.TimelockCallScheduled
del chan operationKey
store map[operationKey][]*contract.TimelockCallScheduled
busy bool
logger *zerolog.Logger
executeFn executeFn
}

// newScheduler returns a new initialized scheduler.
func newScheduler(tick time.Duration) *scheduler {
func newScheduler(tick time.Duration, logger *zerolog.Logger, executeFn executeFn) *scheduler {
s := &scheduler{
ticker: time.NewTicker(tick),
add: make(chan *contract.TimelockCallScheduled),
del: make(chan operationKey),
store: make(map[operationKey][]*contract.TimelockCallScheduled),
busy: false,
ticker: time.NewTicker(tick),
add: make(chan *contract.TimelockCallScheduled),
del: make(chan operationKey),
store: make(map[operationKey][]*contract.TimelockCallScheduled),
busy: false,
logger: logger,
executeFn: executeFn,
}

return s
Expand All @@ -50,7 +63,7 @@ func newScheduler(tick time.Duration) *scheduler {
// call them this way so no process is allowd to add/delete from
// the store, which could cause race conditions like adding/deleting
// while the operation is being executed.
func (tw *Worker) runScheduler(ctx context.Context) <-chan struct{} {
func (tw *scheduler) runScheduler(ctx context.Context) <-chan struct{} {
done := make(chan struct{})

go func() {
Expand All @@ -67,7 +80,7 @@ func (tw *Worker) runScheduler(ctx context.Context) <-chan struct{} {
tw.logger.Debug().Msgf("new scheduler tick: operations in store")
tw.setSchedulerBusy()
for _, op := range tw.store {
tw.execute(ctx, op)
tw.executeFn(ctx, op)
}
tw.setSchedulerFree()
} else {
Expand Down Expand Up @@ -102,7 +115,7 @@ func (tw *Worker) runScheduler(ctx context.Context) <-chan struct{} {
}

// updateSchedulerDelay updates the internal ticker delay, so it can be reconfigured while running.
func (tw *Worker) updateSchedulerDelay(t time.Duration) {
func (tw *scheduler) updateSchedulerDelay(t time.Duration) {
if t <= 0 {
tw.logger.Debug().Msgf("internal min delay not changed, invalid duration: %v", t.String())
return
Expand All @@ -113,40 +126,38 @@ func (tw *Worker) updateSchedulerDelay(t time.Duration) {
}

// addToScheduler adds a new CallSchedule operation safely to the store.
func (tw *Worker) addToScheduler(op *contract.TimelockCallScheduled) {
func (tw *scheduler) addToScheduler(op *contract.TimelockCallScheduled) {
tw.logger.Debug().Msgf("scheduling operation: %x", op.Id)
tw.add <- op
tw.logger.Debug().Msgf("operations in scheduler: %v", len(tw.store))
}

// delFromScheduler deletes an operation safely from the store.
func (tw *Worker) delFromScheduler(op operationKey) {
func (tw *scheduler) delFromScheduler(op operationKey) {
tw.logger.Debug().Msgf("de-scheduling operation: %v", op)
tw.del <- op
tw.logger.Debug().Msgf("operations in scheduler: %v", len(tw.store))
}

func (tw *Worker) setSchedulerBusy() {
func (tw *scheduler) setSchedulerBusy() {
tw.logger.Debug().Msgf("setting scheduler busy")
tw.mu.Lock()
tw.busy = true
tw.mu.Unlock()
}

func (tw *Worker) setSchedulerFree() {
func (tw *scheduler) setSchedulerFree() {
tw.logger.Debug().Msgf("setting scheduler free")
tw.mu.Lock()
tw.busy = false
tw.mu.Unlock()
}

func (tw *Worker) isSchedulerBusy() bool {
func (tw *scheduler) isSchedulerBusy() bool {
return tw.busy
}

// dumpOperationStore dumps to the logger and to the log file the current scheduled unexecuted operations.
// maps in go don't guarantee order, so that's why we have to find the earliest block.
func (tw *Worker) dumpOperationStore(now func() time.Time) {
func (tw *scheduler) dumpOperationStore(now func() time.Time) {
if len(tw.store) <= 0 {
tw.logger.Info().Msgf("no operations to dump")
return
Expand Down Expand Up @@ -249,3 +260,37 @@ func toEarliestRecord(op *contract.TimelockCallScheduled) string {
func toSubsequentRecord(op *contract.TimelockCallScheduled) string {
return fmt.Sprintf("CallSchedule pending ID: %x\tBlock Number: %v\n", op.Id, op.Raw.BlockNumber)
}

// ----- nop scheduler -----
// nopScheduler implements the Scheduler interface but doesn't not effectively trigger any operations.
type nopScheduler struct {
logger *zerolog.Logger
}

func newNopScheduler(logger *zerolog.Logger) *nopScheduler {
return &nopScheduler{logger: logger}
}

func (s *nopScheduler) runScheduler(ctx context.Context) <-chan struct{} {
s.logger.Info().Msg("nop.runScheduler")
ch := make(chan struct{})

go func() {
<-ctx.Done()
close(ch)
}()

return ch
}

func (s *nopScheduler) addToScheduler(op *contract.TimelockCallScheduled) {
s.logger.Info().Any("op", op).Msg("nop.addToScheduler")
}

func (s *nopScheduler) delFromScheduler(key operationKey) {
s.logger.Info().Any("key", key).Msg("nop.delFromScheduler")
}

func (s *nopScheduler) dumpOperationStore(now func() time.Time) {
s.logger.Info().Msg("nop.dumpOperationStore")
}
Loading

0 comments on commit 4fe06c5

Please sign in to comment.