From 05ee9051af9e92307d2c01932ced619608353fc2 Mon Sep 17 00:00:00 2001 From: Mihail Stoykov Date: Thu, 14 Dec 2023 12:43:56 +0200 Subject: [PATCH] Update to xk6-timers v0.2.2 - fixing ordering of timers --- go.mod | 3 +- go.sum | 5 +- .../grafana/xk6-timers/timers/timers.go | 331 +++++++++++++----- vendor/modules.txt | 6 +- 4 files changed, 262 insertions(+), 83 deletions(-) diff --git a/go.mod b/go.mod index e23f41411a6..043a80c3829 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/grafana/xk6-browser v1.2.1 github.com/grafana/xk6-output-prometheus-remote v0.3.1 github.com/grafana/xk6-redis v0.2.0 - github.com/grafana/xk6-timers v0.1.2 + github.com/grafana/xk6-timers v0.2.2 github.com/grafana/xk6-webcrypto v0.1.0 github.com/grafana/xk6-websockets v0.2.1 github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 @@ -93,4 +93,5 @@ require ( golang.org/x/text v0.14.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20231002182017-d307bd883b97 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20231002182017-d307bd883b97 // indirect + gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect ) diff --git a/go.sum b/go.sum index 352c179cab6..a32e6bcab17 100644 --- a/go.sum +++ b/go.sum @@ -99,8 +99,8 @@ github.com/grafana/xk6-output-prometheus-remote v0.3.1 h1:X23rQzlJD8dXWB31DkxR4u github.com/grafana/xk6-output-prometheus-remote v0.3.1/go.mod h1:0JLAm4ONsNUlNoxJXAwOCfA6GtDwTPs557OplAvE+3o= github.com/grafana/xk6-redis v0.2.0 h1:iXmAKVlAxafZ/h8ptuXTFhGu63IFsyDI8QjUgWm66BU= github.com/grafana/xk6-redis v0.2.0/go.mod h1:B3PA9PAPJa2/WUfNJCdQwZrbb6D4e6UHIk8dssQbj7w= -github.com/grafana/xk6-timers v0.1.2 h1:YVM6hPDgvy4SkdZQpd+/r9M0kDi1g+QdbSxW5ClfwDk= -github.com/grafana/xk6-timers v0.1.2/go.mod h1:XHmDIXAKe30NJMXrxKIKMFXx98etsCl0jBYktjsSURc= +github.com/grafana/xk6-timers v0.2.2 h1:5Bhf/RYZDI27uTaUfmE56iyB8U7gAu1VEfrFq8ZU42k= +github.com/grafana/xk6-timers v0.2.2/go.mod h1:QmMBB7G5MWoQQPU+tA/KTlryHQXqr/OoGx1RK6KOkn4= github.com/grafana/xk6-webcrypto v0.1.0 h1:StrQZkUi4vo3bAMmBUHvIQ8P+zNKCH3AwN22TZdDwHs= github.com/grafana/xk6-webcrypto v0.1.0/go.mod h1:JKxlKj03+zI6Bf/PUuXxrx4lJraBZx9UOrX4mtqB5+E= github.com/grafana/xk6-websockets v0.2.1 h1:99tuI5g9UPTCpGbiEo/9E7VFKQIOvTLq231qoMVef5c= @@ -340,6 +340,7 @@ gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/guregu/null.v3 v3.3.0 h1:8j3ggqq+NgKt/O7mbFVUFKUMWN+l1AmT5jQmJ6nPh2c= gopkg.in/guregu/null.v3 v3.3.0/go.mod h1:E4tX2Qe3h7QdL+uZ3a0vqvYwKQsRSQKM5V4YltdgH9Y= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= diff --git a/vendor/github.com/grafana/xk6-timers/timers/timers.go b/vendor/github.com/grafana/xk6-timers/timers/timers.go index d6b594ccac3..971024a9537 100644 --- a/vendor/github.com/grafana/xk6-timers/timers/timers.go +++ b/vendor/github.com/grafana/xk6-timers/timers/timers.go @@ -1,13 +1,12 @@ -// Package timers is implementing setInterval setTimeout and co. Not to be used mostly for testing purposes +// Package timers is implementing setInterval setTimeout and co. package timers import ( - "sync" - "sync/atomic" "time" - "github.com/dop251/goja" "github.com/mstoykov/k6-taskqueue-lib/taskqueue" + + "github.com/dop251/goja" "go.k6.io/k6/js/modules" ) @@ -19,9 +18,18 @@ type RootModule struct{} type Timers struct { vu modules.VU - timerStopCounter uint32 - timerStopsLock sync.Mutex - timerStops map[uint32]chan struct{} + timerIDCounter uint64 + + timers map[uint64]time.Time + // Maybe in the future if this moves to core it will be expanded to have multiple queues + queue *timerQueue + + // this used predominantly to get around very unlikely race conditions as we are adding stuff to the event loop + // from outside of it on multitple timers. And it is easier to just use this then redo half the work it does + // to make that safe + taskQueue *taskqueue.TaskQueue + // used to synchronize around context closing + taskQueueCh chan struct{} } var ( @@ -29,6 +37,11 @@ var ( _ modules.Instance = &Timers{} ) +const ( + setTimeoutName = "setTimeout" + setIntervalName = "setInterval" +) + // New returns a pointer to a new RootModule instance. func New() *RootModule { return &RootModule{} @@ -38,8 +51,9 @@ func New() *RootModule { // a new instance for each VU. func (*RootModule) NewModuleInstance(vu modules.VU) modules.Instance { return &Timers{ - vu: vu, - timerStops: make(map[uint32]chan struct{}), + vu: vu, + timers: make(map[uint64]time.Time), + queue: new(timerQueue), } } @@ -55,100 +69,261 @@ func (e *Timers) Exports() modules.Exports { } } -func noop() error { return nil } +func (e *Timers) nextID() uint64 { + e.timerIDCounter++ + return e.timerIDCounter +} -func (e *Timers) getTimerStopCh() (uint32, chan struct{}) { - id := atomic.AddUint32(&e.timerStopCounter, 1) - ch := make(chan struct{}) - e.timerStopsLock.Lock() - e.timerStops[id] = ch - e.timerStopsLock.Unlock() - return id, ch +func (e *Timers) call(callback goja.Callable, args []goja.Value) error { + // TODO: investigate, not sure GlobalObject() is always the correct value for `this`? + _, err := callback(e.vu.Runtime().GlobalObject(), args...) + return err +} + +func (e *Timers) setTimeout(callback goja.Callable, delay float64, args ...goja.Value) uint64 { + id := e.nextID() + e.timerInitialization(callback, delay, args, false, id) + return id } -func (e *Timers) stopTimerCh(id uint32) bool { //nolint:unparam - e.timerStopsLock.Lock() - defer e.timerStopsLock.Unlock() - ch, ok := e.timerStops[id] - if !ok { - return false +func (e *Timers) clearTimeout(id uint64) { + _, exists := e.timers[id] + if !exists { + return } - delete(e.timerStops, id) - close(ch) - return true + delete(e.timers, id) + + e.queue.remove(id) + e.freeEventLoopIfPossible() } -func (e *Timers) call(callback goja.Callable, args []goja.Value) error { - // TODO: investigate, not sure GlobalObject() is always the correct value for `this`? - _, err := callback(e.vu.Runtime().GlobalObject(), args...) +func (e *Timers) freeEventLoopIfPossible() { + if e.queue.length() == 0 && e.taskQueue != nil { + e.closeTaskQueue() + } +} + +func (e *Timers) setInterval(callback goja.Callable, delay float64, args ...goja.Value) uint64 { + id := e.nextID() + e.timerInitialization(callback, delay, args, true, id) + return id +} + +func (e *Timers) clearInterval(id uint64) { + e.clearTimeout(id) +} + +// https://html.spec.whatwg.org/multipage/timers-and-user-prompts.html#timer-initialisation-steps +// NOTE: previousId from the specification is always send and it is basically id +func (e *Timers) timerInitialization( + callback goja.Callable, timeout float64, args []goja.Value, repeat bool, id uint64, +) { + // skip all the nesting stuff as we do not care about them + if timeout < 0 { + timeout = 0 + } + + task := func() error { + // Specification 8.1: If id does not exist in global's map of active timers, then abort these steps. + if _, exist := e.timers[id]; !exist { + return nil + } + + err := e.call(callback, args) + + if _, exist := e.timers[id]; !exist { // 8.4 + return err + } + + if repeat { + e.timerInitialization(callback, timeout, args, repeat, id) + } else { + delete(e.timers, id) + } + + return err + } + + name := setTimeoutName + if repeat { + name = setIntervalName + } + + e.runAfterTimeout(&timer{ + id: id, + task: task, + nextTrigger: time.Now().Add(time.Duration(timeout * float64(time.Millisecond))), + name: name, + }) +} + +// https://html.spec.whatwg.org/multipage/timers-and-user-prompts.html#run-steps-after-a-timeout +// Notes: this just takes timers as makes the implementation way easier and we do not currently need +// most of the functionality provided +func (e *Timers) runAfterTimeout(t *timer) { + e.timers[t.id] = t.nextTrigger + + // as we have only one orderingId we have one queue + index := e.queue.add(t) + + if index != 0 { + return // not a timer at the very beginning + } + + e.setupTaskTimeout() +} + +func (e *Timers) runFirstTask() error { + t := e.queue.pop() + if t == nil { + return nil // everything was cleared + } + + err := t.task() + + if e.queue.length() > 0 { + e.setupTaskTimeout() + } else { + e.freeEventLoopIfPossible() + } + return err } -func (e *Timers) setTimeout(callback goja.Callable, delay float64, args ...goja.Value) uint32 { - runOnLoop := e.vu.RegisterCallback() - id, stopCh := e.getTimerStopCh() +func (e *Timers) setupTaskTimeout() { + e.queue.stopTimer() + delay := -time.Since(e.timers[e.queue.first().id]) + if e.taskQueue == nil { + e.taskQueue = taskqueue.New(e.vu.RegisterCallback) + e.setupTaskQueueCloserOnIterationEnd() + } + q := e.taskQueue + e.queue.head = time.AfterFunc(delay, func() { + q.Queue(e.runFirstTask) + }) +} - if delay < 0 { - delay = 0 +func (e *Timers) closeTaskQueue() { + // this only runs on the event loop + if e.taskQueueCh == nil { + return } + ch := e.taskQueueCh + // so that we do not execute it twice + e.taskQueueCh = nil + + // wait for this to happen so we don't need to hit the event loop again + // instead this just closes the queue + ch <- struct{}{} + <-ch +} +func (e *Timers) setupTaskQueueCloserOnIterationEnd() { + ctx := e.vu.Context() + q := e.taskQueue + ch := make(chan struct{}) + e.taskQueueCh = ch go func() { - timer := time.NewTimer(time.Duration(delay * float64(time.Millisecond))) - defer func() { - timer.Stop() - e.stopTimerCh(id) - }() + select { // wait for one of the two + case <-ctx.Done(): + // lets report timers won't be executed and clean the fields for the next execution + // we need to do this on the event loop as we don't want to have a race + q.Queue(func() error { + logger := e.vu.State().Logger + for _, timer := range e.queue.queue { + logger.Warnf("%s %d was stopped because the VU iteration was interrupted", timer.name, timer.id) + } - select { - case <-timer.C: - runOnLoop(func() error { - return e.call(callback, args) + // TODO: use `clear` when we only support go 1.21 and above + e.timers = make(map[uint64]time.Time) + e.queue = new(timerQueue) + e.taskQueue = nil + return nil }) - case <-stopCh: - runOnLoop(noop) - case <-e.vu.Context().Done(): - e.vu.State().Logger.Warnf("setTimeout %d was stopped because the VU iteration was interrupted", id) - runOnLoop(noop) + case <-ch: + e.taskQueue = nil + close(ch) } + e.queue.stopTimer() + q.Close() }() +} - return id +// this is just a small struct to keep the internals of a timer +type timer struct { + id uint64 + nextTrigger time.Time + task func() error + name string +} + +// this is just a list of timers that should be ordered once after the other +// this mostly just has methods to work on the slice +type timerQueue struct { + queue []*timer + head *time.Timer } -func (e *Timers) clearTimeout(id uint32) { - e.stopTimerCh(id) +func (tq *timerQueue) add(t *timer) int { + var i int + // don't use range as we want to index to go over one if it needs to go to the end + for ; i < len(tq.queue); i++ { + if tq.queue[i].nextTrigger.After(t.nextTrigger) { + break + } + } + + tq.queue = append(tq.queue, nil) + copy(tq.queue[i+1:], tq.queue[i:]) + tq.queue[i] = t + return i } -func (e *Timers) setInterval(callback goja.Callable, delay float64, args ...goja.Value) uint32 { - tq := taskqueue.New(e.vu.RegisterCallback) - id, stopCh := e.getTimerStopCh() +func (tq *timerQueue) stopTimer() { + if tq.head != nil && tq.head.Stop() { // we have a timer and we stopped it before it was over. + select { + case <-tq.head.C: + default: + } + } +} - go func() { - ticker := time.NewTicker(time.Duration(delay * float64(time.Millisecond))) - defer func() { - e.stopTimerCh(id) - ticker.Stop() - }() - - for { - defer tq.Close() - select { - case <-ticker.C: - tq.Queue(func() error { - return e.call(callback, args) - }) - case <-stopCh: - return - case <-e.vu.Context().Done(): - e.vu.State().Logger.Warnf("setInterval %d was stopped because the VU iteration was interrupted", id) - return - } +func (tq *timerQueue) remove(id uint64) { + i := tq.findIndex(id) + if i == -1 { + return + } + + tq.queue = append(tq.queue[:i], tq.queue[i+1:]...) +} + +func (tq *timerQueue) findIndex(id uint64) int { + for i, timer := range tq.queue { + if id == timer.id { + return i } - }() + } + return -1 +} - return id +func (tq *timerQueue) pop() *timer { + length := len(tq.queue) + if length == 0 { + return nil + } + t := tq.queue[0] + copy(tq.queue, tq.queue[1:]) + tq.queue = tq.queue[:length-1] + return t } -func (e *Timers) clearInterval(id uint32) { - e.stopTimerCh(id) +func (tq *timerQueue) length() int { + return len(tq.queue) +} + +func (tq *timerQueue) first() *timer { + if tq.length() == 0 { + return nil + } + return tq.queue[0] } diff --git a/vendor/modules.txt b/vendor/modules.txt index 4efeeed3e6a..317e27d0fa7 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -169,8 +169,8 @@ github.com/grafana/xk6-output-prometheus-remote/pkg/stale # github.com/grafana/xk6-redis v0.2.0 ## explicit; go 1.19 github.com/grafana/xk6-redis/redis -# github.com/grafana/xk6-timers v0.1.2 -## explicit; go 1.17 +# github.com/grafana/xk6-timers v0.2.2 +## explicit; go 1.20 github.com/grafana/xk6-timers/timers # github.com/grafana/xk6-webcrypto v0.1.0 ## explicit; go 1.19 @@ -534,6 +534,8 @@ google.golang.org/protobuf/types/pluginpb # gopkg.in/guregu/null.v3 v3.3.0 ## explicit gopkg.in/guregu/null.v3 +# gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 +## explicit # gopkg.in/yaml.v3 v3.0.1 ## explicit gopkg.in/yaml.v3