Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
Initial commit
arcward authored Feb 26, 2024
1 parent 2a53399 commit 8c0dcf7
Showing 10 changed files with 3,593 additions and 0 deletions.
105 changes: 105 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# crong

**Cron**g: Lightweight, straightforward cron expression parser, ticker
and task scheduler for your Golang projects.

## Usage

Create a schedule from a cron expression, calculate future/past schedules:

```go
package main

import (
"fmt"
"log"
"time"
"github.com/arcward/crong"
)

func main() {
schedule, err := crong.New("0 0 * * *", time.UTC)
if err != nil {
log.Fatal(err)
}

// Get the next (or most recent) scheduled time relative to the given time
fmt.Println("Next scheduled time:", schedule.Next(time.Now()))
fmt.Println("Previous scheduled time:", schedule.Prev(time.Now()))

// Check if the given time satisfies the schedule
if schedule.Matches(time.Now()) {
fmt.Println("It's time!")
}
}
```

Create a ticker that sends a tick on a channel whenever the cron
schedule fires, similar to `time.Ticker`:

```go
package main

import (
"context"
"log"
"time"
"github.com/arcward/crong"
)

func main() {
schedule, err := crong.New("@hourly", time.UTC)
if err != nil {
log.Fatal(err)
}

ticker := crong.NewTicker(context.Background(), schedule, 1 * time.Minute)
defer ticker.Stop()

select {
case t := <-ticker.C:
log.Printf("%s: Tick!", t)
}
}
```

Schedule a function to run whenever the schedule fires:

```go
package main

import (
"context"
"log"
"time"
"github.com/arcward/crong"
)

func main() {
schedule, err := crong.New("* * * * *", time.UTC)
if err != nil {
log.Fatal(err)
}

// MaxConcurrent=0 only allows the job to run sequentially, while
// increasing TickerReceiveTimeout can accommodate potentially long-running
// jobs, where you may not want the next tick to be dropped immediately.
// MaxConsecutiveFailures=10 will stop executing the given function if it
// returns a non-nil error ten times in a row.
opts := &crong.ScheduledJobOptions{
MaxConcurrent: 0,
TickerReceiveTimeout: 30 * time.Second,
MaxConsecutiveFailures: 10,
}
scheduledJob := crong.ScheduleFunc(
context.Background(),
schedule,
opts,
func(t time.Time) error {
log.Printf("Scheduled run for %s started at %s", t, time.Now())
return nil
},
)
defer scheduledJob.Stop(context.Background())
}
```
34 changes: 34 additions & 0 deletions doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
Package crong is a library for parsing cron expressions, calculating
the next run times of the expressions, and validating the expressions.
# Syntax
Supports standard cron syntax (see https://en.wikipedia.org/wiki/Cron),
as well as less standard expressions. For example, `5/10 4,5 * *` means
"every 10 minutes starting at the 5th minute of the hour, for hours 4 and 5."
Days of the week are indexed 0-6, with 0 being Sunday, and can be
referenced by name (SUN, MON, TUE, WED, THU, FRI, SAT) or by number.
Months are indexed 1-12, and can be referenced by
name (JAN, FEB, MAR, APR, MAY, JUN, JUL, AUG, SEP, OCT, NOV, DEC) or by number.
Cron macros supported:
@yearly (or @annually) - Run once a year, midnight, Jan. 1
@monthly - Run once a month, midnight, first of month
@weekly - Run once a week, midnight between Saturday and Sunday
@daily (or @midnight) - Run once a day, midnight
@hourly - Run once an hour, beginning of hour
Other characters supported:
- - any value
, - value list separator
- - range of values
/ - step values
? - no specific value (month, day of month, day of week only)
L - last day of month (when used, must be used alone)
*/
package crong
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
module github.com/arcward/crong


go 1.22.0
44 changes: 44 additions & 0 deletions helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package crong

import (
"strings"
"testing"
)

// assertEqual is a helper function to compare two values
func assertEqual[V comparable](t testing.TB, val V, expected V) {
t.Helper()
if val != expected {
t.Errorf("expected %v, got %v", expected, val)
}
}

func slicesEqual(t testing.TB, val []int, expect []int) bool {
t.Helper()
if len(val) != len(expect) {
return false
}
for _, v := range expect {
if !hasValue(t, val, v) {
return false
}
}
return true
}

func hasValue(t testing.TB, a []int, expect int) bool {
t.Helper()
for _, v := range a {
if v == expect {
return true
}
}
return false
}

func requireErr(t testing.TB, err error, msg ...string) {
t.Helper()
if err == nil {
t.Fatalf("expected error (%s)", strings.Join(msg, "- \n"))
}
}
310 changes: 310 additions & 0 deletions job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,310 @@
package crong

import (
"context"
"errors"
"log/slog"
"sync"
"sync/atomic"
"time"
)

type ScheduleState int64

const (
ScheduleStarted ScheduleState = iota + 1
ScheduleSuspended
ScheduleStopped
)

type ScheduledJobOptions struct {
// MaxConcurrent is the maximum number of concurrent job executions.
// If 0, there is no limit
MaxConcurrent int

// TickerReceiveTimeout is the maximum time the job's ticker will
// wait for the job to receive a tick on the Ticker.C channel
TickerReceiveTimeout time.Duration

// MaxFailures is the maximum number of times the job can fail
// before it is stopped. 0=no limit
MaxFailures int

// MaxConsecutiveFailures is the maximum number of consecutive
// times the job can fail before it is stopped. 0=no limit
MaxConsecutiveFailures int
}

// ScheduledJob is a function that runs on Ticker ticks
// for a Schedule
type ScheduledJob struct {
schedule *Schedule
ticker *Ticker
f func(t time.Time) error
runtimes []*JobRuntime
mu sync.RWMutex
stopCh chan struct{}

// Failures is the number of times the job has failed
Failures atomic.Int64

// ConsecutiveFailures is the number of times the job has failed in a row
ConsecutiveFailures atomic.Int64

// Runs is the number of times the job has run
Runs atomic.Int64

// Running is the number of times the job is currently running
Running atomic.Int64

state atomic.Int64
previouslyStarted atomic.Bool
startMu sync.Mutex
options ScheduledJobOptions
}

func NewScheduledJob(
schedule *Schedule,
opts *ScheduledJobOptions,
f func(t time.Time) error,
) *ScheduledJob {
return &ScheduledJob{
schedule: schedule,
ticker: NewTicker(
context.Background(),
schedule,
opts.TickerReceiveTimeout,
),
f: f,
runtimes: make([]*JobRuntime, 0),
stopCh: make(chan struct{}, 1),
options: *opts,
}
}

func ScheduleFunc(
ctx context.Context,
schedule *Schedule,
opts *ScheduledJobOptions,
f func(t time.Time) error,
) *ScheduledJob {

s := &ScheduledJob{
schedule: schedule,
ticker: NewTicker(ctx, schedule, opts.TickerReceiveTimeout),
f: f,
runtimes: make([]*JobRuntime, 0),
stopCh: make(chan struct{}, 1),
state: atomic.Int64{},
previouslyStarted: atomic.Bool{},
options: *opts,
}
s.state.Store(int64(ScheduleStarted))
s.previouslyStarted.Store(true)

go func() {
_ = s.start(ctx)
}()
return s
}

func (s *ScheduledJob) Start(ctx context.Context) error {
if ScheduleState(s.state.Load()) == ScheduleStopped {
return errors.New("cannot start a job that has been stopped")
}

if s.previouslyStarted.Load() {
return errors.New("job has already been started")
}

return s.start(ctx)
}

// Stop stops job execution. After Stop is called, the job cannot be
// restarted.
func (s *ScheduledJob) Stop(ctx context.Context) bool {
select {
case <-ctx.Done():
// return ctx.Err()
case s.stopCh <- struct{}{}:
//
}
old := s.state.Swap(int64(ScheduleStopped))
if old == int64(ScheduleStopped) {
return false
}
return true
}

// Suspend pauses job execution until Resume is called
func (s *ScheduledJob) Suspend() bool {
return s.state.CompareAndSwap(
int64(ScheduleStarted),
int64(ScheduleSuspended),
)
}

// Resume resumes job execution after a call to Suspend
func (s *ScheduledJob) Resume() bool {
return s.state.CompareAndSwap(
int64(ScheduleSuspended),
int64(ScheduleStarted),
)
}

// Runtimes returns a slice of the job's runtimes
func (s *ScheduledJob) Runtimes() []*JobRuntime {
s.mu.RLock()
defer s.mu.RUnlock()
return s.runtimes[:]
}

func (s *ScheduledJob) State() ScheduleState {
return ScheduleState(s.state.Load())
}

// Start starts the job. If the job has already been started,
// it returns an error. If the job has been stopped, it returns an error.
func (s *ScheduledJob) start(ctx context.Context) error {
s.mu.Lock()
ctx, cancel := context.WithCancel(ctx)
defer cancel()

s.state.Store(int64(ScheduleStarted))

defer s.ticker.Stop()
s.previouslyStarted.Store(true)
s.mu.Unlock()
wg := sync.WaitGroup{}

// Waits for a stop signal, then cancels the context
wg.Add(1)
go func() {
defer s.state.Store(int64(ScheduleStopped))
defer wg.Done()
select {
case <-ctx.Done():
return
case <-s.stopCh:
cancel()
return
}
}()

var jobCh chan time.Time

if s.options.MaxConcurrent > 0 {
jobCh = make(chan time.Time)
defer close(jobCh)
for i := 0; i < s.options.MaxConcurrent; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case rt := <-jobCh:
s.execute(rt)
}
}
}()
}
}

// Waits for ticks on the Ticker.C channel, then
// executes the job
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case rt := <-s.ticker.C:
switch {
case ScheduleState(s.state.Load()) == ScheduleSuspended:
slog.Info("execution suspended, skipping tick", "tick", rt)
case jobCh == nil:
wg.Add(1)
go func() {
defer wg.Done()
s.execute(rt)
}()
default:
jobCh <- rt
}
}

}
}()
wg.Wait()
// s.state.Store(int64(ScheduleStopped))
return nil
}

func (s *ScheduledJob) execute(rt time.Time) {
s.Runs.Add(1)

s.Running.Add(1)
defer s.Running.Add(-1)

s.mu.Lock()
defer s.mu.Unlock()

runtime := &JobRuntime{Start: rt}
slog.Info("running")

runtime.Error = s.f(rt)
if runtime.Error == nil {
s.ConsecutiveFailures.Store(0)
} else {
failures := s.Failures.Add(1)
consecutiveFailures := s.ConsecutiveFailures.Add(1)

if s.options.MaxFailures > 0 && failures >= int64(s.options.MaxFailures) {
slog.Warn(
"max failures reached, stopping job",
"max_failures", s.options.MaxFailures,
"failures", failures,
)
select {
case s.stopCh <- struct{}{}:
default:
}
} else if s.options.MaxConsecutiveFailures > 0 &&
consecutiveFailures >= int64(s.options.MaxConsecutiveFailures) {
slog.Warn(
"max consecutive failures reached, stopping job",
"max_consecutive_failures", s.options.MaxConsecutiveFailures,
"consecutive_failures", consecutiveFailures,
)
select {
case s.stopCh <- struct{}{}:
default:
}
}
}

runtime.End = time.Now()
slog.Info(
"job finished",
"Start",
runtime.Start,
"end",
runtime.End,
)
s.runtimes = append(s.runtimes, runtime)
}

// JobRuntime is a record of a job's runtime and any error
type JobRuntime struct {
// Start is the time the job started
Start time.Time

// End is the time the job ended
End time.Time

// Error is any error that occurred during the job
Error error
}
423 changes: 423 additions & 0 deletions job_test.go

Large diffs are not rendered by default.

1,183 changes: 1,183 additions & 0 deletions schedule.go

Large diffs are not rendered by default.

1,156 changes: 1,156 additions & 0 deletions schedule_test.go

Large diffs are not rendered by default.

200 changes: 200 additions & 0 deletions ticker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
package crong

import (
"context"
"log/slog"
"sync"
"sync/atomic"
"time"
)

// Ticker is a cron ticker that sends the current time
// on the Ticker.C channel when the schedule is triggered
type Ticker struct {
schedule *Schedule
C chan time.Time
tickCh chan time.Time
stop chan struct{}
// sendTimeout is the maximum time to wait for a receiver
// to send a tick on the Ticker.C channel
sendTimeout time.Duration

firstTick time.Time
lastTick time.Time

ticksSeen atomic.Int64
ticksSent atomic.Int64
ticksDropped atomic.Int64
mu sync.Mutex

// cronTicker *time.Ticker
}

// NewTicker creates a new Ticker from a cron expression,
// sending the current time on Ticker.C when the schedule
// is triggered.
// It works similarly to [time.Ticker](https://golang.org/pkg/time/#Ticker),
// but is granular only to the minute. sendTimeout is the maximum time to wait
// for a receiver to send a tick on the Ticker.C channel (this differs from
// [time.Ticker], allowing some wiggle room for slow receivers).
// If the provided context is canceled, the ticker will stop automatically.
func NewTicker(
ctx context.Context,
schedule *Schedule,
sendTimeout time.Duration,
) *Ticker {
t := &Ticker{
schedule: schedule,
C: make(chan time.Time),
stop: make(chan struct{}, 1),
tickCh: make(chan time.Time),
mu: sync.Mutex{},
sendTimeout: sendTimeout,
}

ctx, cancel := context.WithCancel(ctx)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
wg.Done()
for {
select {
case <-t.stop:
slog.Info("ticker stopped, canceling")
cancel()
return
case <-ctx.Done():
t.Stop()
}
}
}()

wg.Add(1)
go func() {
wg.Done()
t.tickOnSchedule(ctx)
}()

slog.Info("waiting for initial tick")
init := <-t.tickCh
slog.Info("initial tick", "time", init)
wg.Add(1)
go func() {
wg.Done()
t.run(ctx)
}()
wg.Wait()

return t
}

func (t *Ticker) Stop() {
select {
case t.stop <- struct{}{}:
//
default:
//
}
}

// tickOnSchedule sends a tick when the current time matches
// the next scheduled time. The time is checked every minute.
// This is used instead of a [time.Ticker] to avoid drift.
func (t *Ticker) tickOnSchedule(ctx context.Context) {
loc := t.schedule.loc
t.tickCh <- time.Now().In(t.schedule.loc)
nextTime := t.schedule.nextNoTruncate(time.Now().In(loc).Truncate(time.Minute))
sleepDone := make(chan struct{}, 1)
slog.Info("starting tick on schedule", "next_time", nextTime)
for ctx.Err() == nil {
now := time.Now().In(t.schedule.loc)
if timesEqualToMinute(now, nextTime) {
slog.Info("saw tick", "next_time", nextTime, "now", now)
t.tick(ctx)
nextTime = t.schedule.nextNoTruncate(
time.Now().In(loc).Truncate(time.Minute),
)
}

nextMinute := time.Now().Add(time.Minute).Truncate(time.Minute)
untilNextMinute := nextMinute.Sub(time.Now())
sleepDuration := untilNextMinute + (1 * time.Second)

slog.Info(
"sleeping",
"duration",
sleepDuration,
"next_time",
nextTime,
"now",
now,
"until_next_minute",
untilNextMinute,
)
go func() {
time.Sleep(sleepDuration)
sleepDone <- struct{}{}
}()
select {
case <-ctx.Done():
return
case <-sleepDone:
//
}
}
}

// run waits for ticks on the tick channel and sends
// them on the Ticker.C channel, then schedules the
// next tick
func (t *Ticker) run(ctx context.Context) {
for {
select {
case <-ctx.Done():
slog.Info("ticker stopped, breaking")
return
case currentTick := <-t.tickCh:
slog.Info(
"schedule triggered",
"current_tick",
currentTick,
"schedule",
t.schedule,
)
tctx, tcancel := context.WithTimeout(ctx, t.sendTimeout)
select {
case t.C <- currentTick:
t.ticksSent.Add(1)
slog.Info("sent tick")
case <-tctx.Done():
slog.Warn("dropped tick")
t.ticksDropped.Add(1)
}
tcancel()
}
}
}

// tick sends a tick on the tick channel
func (t *Ticker) tick(ctx context.Context) bool {
nt := time.Now().In(t.schedule.loc)
select {
case <-ctx.Done():
return false
case t.tickCh <- nt:
slog.Info("sent tick", "tick", nt)
t.ticksSeen.Add(1)

t.mu.Lock()
defer t.mu.Unlock()
t.lastTick = nt
if t.firstTick.IsZero() {
t.firstTick = nt
}
return true
}
}

func timesEqualToMinute(t1, t2 time.Time) bool {
return t1.Truncate(time.Minute).Equal(t2.Truncate(time.Minute))
}
134 changes: 134 additions & 0 deletions ticker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package crong

import (
"context"
"testing"
"time"
)

func TestTicker(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()

s, err := New("* * * * *", nil) // every minute
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
ticker := NewTicker(ctx, s, 5*time.Second)
if ticker == nil {
t.Fatalf("expected ticker")
}
defer ticker.Stop()
nextTick := s.Next(time.Now())

select {
case <-ctx.Done():
t.Fatalf("expected tick")
case tick := <-ticker.C:
tickMin := tick.Truncate(time.Minute)
nextMin := nextTick.Truncate(time.Minute)

if !tickMin.Equal(nextMin) {
t.Fatalf("expected tick to be %s, got %s", tickMin, nextMin)
}
}
}

func TestEarlyTicker(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

s, err := New("* * * * *", nil) // every minute
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
ticker := NewTicker(ctx, s, 5*time.Second)
if ticker == nil {
t.Fatalf("expected ticker")
}
defer ticker.Stop()

nextTick := s.Next(time.Now())
go func() {
ticker.tick(ctx)
}()

select {
case <-ctx.Done():
t.Fatalf("expected tick")
case tick := <-ticker.C:
tickSecs := tick.Unix()
nextSecs := nextTick.Unix()
if nextSecs <= tickSecs {
t.Fatalf("expected tick to be %d, got %d", tickSecs, nextSecs)
}
}
}

func TestTickerCanceled(t *testing.T) {
// verify that we no longer receive ticks after canceling
// the tick context
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

s, err := New("* * * * *", nil) // every minute
if err != nil {
t.Fatalf("unexpected error: %s", err)
}

ticker := NewTicker(ctx, s, 5*time.Second)
if ticker == nil {
t.Fatalf("expected ticker")
}
defer ticker.Stop()

tctx, tcancel := context.WithCancel(context.Background())
defer tcancel()

sawTick := make(chan time.Time, 1)
go func() {
select {
case <-tctx.Done():
return
case <-ticker.C:
sawTick <- time.Now()
}
}()

// cancel the context, which should prevent the tick from being emitted
cancel()
go func() {
time.Sleep(500 * time.Millisecond)
if sent := ticker.tick(ctx); sent {
t.Errorf("expected no tick to be sent")
}
}()

cctx, ccancel := context.WithTimeout(context.Background(), 6*time.Second)
defer ccancel()
select {
case <-sawTick:
t.Fatalf("shouldn't have received tick")
case <-cctx.Done():
tcancel()
return
}
}

func TestTickerSendTimeout(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

s, err := New("* * * * *", nil) // every minute
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
ticker := NewTicker(ctx, s, 3*time.Second)
if ticker == nil {
t.Fatalf("expected ticker")
}
defer ticker.Stop()
ticker.tick(ctx)
time.Sleep(5 * time.Second)
assertEqual(t, ticker.ticksDropped.Load(), int64(1))
}

0 comments on commit 8c0dcf7

Please sign in to comment.