Skip to content

Commit

Permalink
Improved logging
Browse files Browse the repository at this point in the history
Added a global `*slog.Logger` to use for logging, instead of directly
calling `slog.Info()` to make it easier to manage logging. The
default logger now logs to `io.Discard` by default.
  • Loading branch information
arcward committed Aug 23, 2024
1 parent 66b4a19 commit 5aff025
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 52 deletions.
73 changes: 53 additions & 20 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@ type ScheduledJobOptions struct {
MaxConsecutiveFailures int
}

func (s ScheduledJobOptions) LogValue() slog.Value {
return slog.GroupValue(
slog.Int("max_concurrent", s.MaxConcurrent),
slog.Int("max_failures", s.MaxFailures),
slog.Int("max_consecutive_failures", s.MaxConsecutiveFailures),
slog.Duration("ticker_receive_timeout", s.TickerReceiveTimeout),
)
}

// ScheduledJob is a function that runs on Ticker ticks
// for a Schedule
type ScheduledJob struct {
Expand Down Expand Up @@ -65,10 +74,10 @@ type ScheduledJob struct {

func NewScheduledJob(
schedule *Schedule,
opts *ScheduledJobOptions,
opts ScheduledJobOptions,
f func(t time.Time) error,
) *ScheduledJob {
return &ScheduledJob{
job := &ScheduledJob{
schedule: schedule,
ticker: NewTicker(
context.Background(),
Expand All @@ -78,14 +87,38 @@ func NewScheduledJob(
f: f,
runtimes: make([]*JobRuntime, 0),
stopCh: make(chan struct{}, 1),
options: *opts,
options: opts,
}

return job
}

func (s ScheduledJob) LogValue() slog.Value {
return slog.GroupValue(
slog.String("schedule", s.schedule.String()),
slog.Group(
"options", slog.Int("max_concurrent", s.options.MaxConcurrent),
slog.Int("max_failures", s.options.MaxFailures),
slog.Int(
"max_consecutive_failures",
s.options.MaxConsecutiveFailures,
),
slog.Duration(
"ticker_receive_timeout",
s.options.TickerReceiveTimeout,
),
),
slog.Int64("failures", s.Failures.Load()),
slog.Int64("consecutive_failures", s.ConsecutiveFailures.Load()),
slog.Int64("runs", s.Runs.Load()),
slog.Int64("running", s.Running.Load()),
)
}

func ScheduleFunc(
ctx context.Context,
schedule *Schedule,
opts *ScheduledJobOptions,
opts ScheduledJobOptions,
f func(t time.Time) error,
) *ScheduledJob {

Expand All @@ -97,7 +130,7 @@ func ScheduleFunc(
stopCh: make(chan struct{}, 1),
state: atomic.Int64{},
previouslyStarted: atomic.Bool{},
options: *opts,
options: opts,
}
s.state.Store(int64(ScheduleStarted))
s.previouslyStarted.Store(true)
Expand Down Expand Up @@ -125,7 +158,6 @@ func (s *ScheduledJob) Start(ctx context.Context) error {
func (s *ScheduledJob) Stop(ctx context.Context) bool {
select {
case <-ctx.Done():
// return ctx.Err()
case s.stopCh <- struct{}{}:
//
}
Expand Down Expand Up @@ -224,7 +256,11 @@ func (s *ScheduledJob) start(ctx context.Context) error {
case rt := <-s.ticker.C:
switch {
case ScheduleState(s.state.Load()) == ScheduleSuspended:
slog.Info("execution suspended, skipping tick", "tick", rt)
Logger.Debug(
"execution suspended, skipping tick",
"scheduled_job", s,
"tick", rt,
)
case jobCh == nil:
wg.Add(1)
go func() {
Expand All @@ -239,7 +275,6 @@ func (s *ScheduledJob) start(ctx context.Context) error {
}
}()
wg.Wait()
// s.state.Store(int64(ScheduleStopped))
return nil
}

Expand All @@ -253,7 +288,8 @@ func (s *ScheduledJob) execute(rt time.Time) {
defer s.mu.Unlock()

runtime := &JobRuntime{Start: rt}
slog.Info("running")

Logger.Info("running scheduled job", "scheduled_job", s)

runtime.Error = s.f(rt)
if runtime.Error == nil {
Expand All @@ -263,21 +299,19 @@ func (s *ScheduledJob) execute(rt time.Time) {
consecutiveFailures := s.ConsecutiveFailures.Add(1)

if s.options.MaxFailures > 0 && failures >= int64(s.options.MaxFailures) {
slog.Warn(
Logger.Warn(
"max failures reached, stopping job",
"max_failures", s.options.MaxFailures,
"failures", failures,
"scheduled_job", s,
)
select {
case s.stopCh <- struct{}{}:
default:
}
} else if s.options.MaxConsecutiveFailures > 0 &&
consecutiveFailures >= int64(s.options.MaxConsecutiveFailures) {
slog.Warn(
Logger.Warn(
"max consecutive failures reached, stopping job",
"max_consecutive_failures", s.options.MaxConsecutiveFailures,
"consecutive_failures", consecutiveFailures,
"scheduled_job", s,
)
select {
case s.stopCh <- struct{}{}:
Expand All @@ -287,12 +321,11 @@ func (s *ScheduledJob) execute(rt time.Time) {
}

runtime.End = time.Now()
slog.Info(
Logger.Info(
"job finished",
"Start",
runtime.Start,
"end",
runtime.End,
"start", runtime.Start,
"end", runtime.End,
"scheduled_job", s,
)
s.runtimes = append(s.runtimes, runtime)
}
Expand Down
14 changes: 7 additions & 7 deletions job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestScheduledJob(t *testing.T) {
sf := ScheduleFunc(
ctx,
s,
&ScheduledJobOptions{
ScheduledJobOptions{
MaxConcurrent: 10,
TickerReceiveTimeout: 5 * time.Second,
},
Expand Down Expand Up @@ -119,7 +119,7 @@ func TestScheduledContext(t *testing.T) {
ranCh := make(chan struct{}, 1)
sj := NewScheduledJob(
s,
&ScheduledJobOptions{
ScheduledJobOptions{
MaxConcurrent: 1,
TickerReceiveTimeout: 5 * time.Second,
},
Expand Down Expand Up @@ -169,7 +169,7 @@ func TestJobFailure(t *testing.T) {
sj := ScheduleFunc(
ctx,
s,
&ScheduledJobOptions{
ScheduledJobOptions{
MaxConcurrent: 0,
TickerReceiveTimeout: 5 * time.Second,
},
Expand Down Expand Up @@ -214,7 +214,7 @@ func TestPreviouslyStarted(t *testing.T) {
sj := ScheduleFunc(
ctx,
s,
&ScheduledJobOptions{
ScheduledJobOptions{
MaxConcurrent: 0,
TickerReceiveTimeout: 5 * time.Second,
}, func(dt time.Time) error {
Expand Down Expand Up @@ -245,7 +245,7 @@ func TestAlreadyStopped(t *testing.T) {
sj := ScheduleFunc(
ctx,
s,
&ScheduledJobOptions{
ScheduledJobOptions{
MaxConcurrent: 0,
TickerReceiveTimeout: 5 * time.Second,
},
Expand Down Expand Up @@ -279,7 +279,7 @@ func TestJobMaxFailures(t *testing.T) {
}
sj := NewScheduledJob(
s,
&ScheduledJobOptions{
ScheduledJobOptions{
MaxConcurrent: 3,
TickerReceiveTimeout: 5 * time.Second,
MaxFailures: 3,
Expand Down Expand Up @@ -323,7 +323,7 @@ func TestJobConsecutiveFailures(t *testing.T) {
doneCh := make(chan struct{}, 10)
sj := NewScheduledJob(
s,
&ScheduledJobOptions{
ScheduledJobOptions{
MaxConcurrent: 3,
TickerReceiveTimeout: 5 * time.Second,
MaxConsecutiveFailures: 3,
Expand Down
67 changes: 42 additions & 25 deletions ticker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package crong

import (
"context"
"io"
"log/slog"
"sync"
"sync/atomic"
"time"
)

var Logger = slog.New(slog.NewTextHandler(io.Discard, nil))

// Ticker is a cron ticker that sends the current time
// on the Ticker.C channel when the schedule is triggered
type Ticker struct {
Expand All @@ -26,8 +29,6 @@ type Ticker struct {
ticksSent atomic.Int64
ticksDropped atomic.Int64
mu sync.Mutex

// cronTicker *time.Ticker
}

// NewTicker creates a new Ticker from a cron expression,
Expand Down Expand Up @@ -60,7 +61,7 @@ func NewTicker(
for {
select {
case <-t.stop:
slog.Info("ticker stopped, canceling")
Logger.Debug("ticker stopped, canceling", "ticker", t)
cancel()
return
case <-ctx.Done():
Expand All @@ -75,9 +76,9 @@ func NewTicker(
t.tickOnSchedule(ctx)
}()

slog.Info("waiting for initial tick")
Logger.Debug("waiting for initial tick", "ticker", t)
init := <-t.tickCh
slog.Info("initial tick", "time", init)
Logger.Debug("initial tick", "time", init, "ticker", t)
wg.Add(1)
go func() {
wg.Done()
Expand Down Expand Up @@ -105,11 +106,20 @@ func (t *Ticker) tickOnSchedule(ctx context.Context) {
t.tickCh <- time.Now().In(t.schedule.loc)
nextTime := t.schedule.nextNoTruncate(time.Now().In(loc).Truncate(time.Minute))
sleepDone := make(chan struct{}, 1)
slog.Info("starting tick on schedule", "next_time", nextTime)
Logger.Debug(
"starting tick on schedule",
"next_time", nextTime,
"ticker", t,
)
for ctx.Err() == nil {
now := time.Now().In(t.schedule.loc)
if timesEqualToMinute(now, nextTime) {
slog.Info("saw tick", "next_time", nextTime, "now", now)
Logger.Debug(
"saw tick",
"next_time", nextTime,
"now", now,
"ticker", t,
)
t.tick(ctx)
nextTime = t.schedule.nextNoTruncate(
time.Now().In(loc).Truncate(time.Minute),
Expand All @@ -120,16 +130,13 @@ func (t *Ticker) tickOnSchedule(ctx context.Context) {
untilNextMinute := nextMinute.Sub(time.Now())
sleepDuration := untilNextMinute + (1 * time.Second)

slog.Info(
Logger.Info(
"sleeping",
"duration",
sleepDuration,
"next_time",
nextTime,
"now",
now,
"until_next_minute",
untilNextMinute,
"duration", sleepDuration,
"next_time", nextTime,
"now", now,
"until_next_minute", untilNextMinute,
"ticker", t,
)
go func() {
time.Sleep(sleepDuration)
Expand All @@ -151,23 +158,21 @@ func (t *Ticker) run(ctx context.Context) {
for {
select {
case <-ctx.Done():
slog.Info("ticker stopped, breaking")
Logger.Debug("ticker stopped, breaking", "ticker", t)
return
case currentTick := <-t.tickCh:
slog.Info(
Logger.Debug(
"schedule triggered",
"current_tick",
currentTick,
"schedule",
t.schedule,
"current_tick", currentTick,
"ticker", t,
)
tctx, tcancel := context.WithTimeout(ctx, t.sendTimeout)
select {
case t.C <- currentTick:
t.ticksSent.Add(1)
slog.Info("sent tick")
Logger.Debug("sent tick", "ticker", t)
case <-tctx.Done():
slog.Warn("dropped tick")
Logger.Debug("dropped tick", "ticker", t)
t.ticksDropped.Add(1)
}
tcancel()
Expand All @@ -182,7 +187,7 @@ func (t *Ticker) tick(ctx context.Context) bool {
case <-ctx.Done():
return false
case t.tickCh <- nt:
slog.Info("sent tick", "tick", nt)
Logger.Info("sent tick", "tick", nt, "ticker", t)
t.ticksSeen.Add(1)

t.mu.Lock()
Expand All @@ -195,6 +200,18 @@ func (t *Ticker) tick(ctx context.Context) bool {
}
}

func (t Ticker) LogValue() slog.Value {
return slog.GroupValue(
slog.String("schedule", t.schedule.String()),
slog.Group(
"ticks",
"seen", t.ticksSeen.Load(),
"sent", t.ticksSent.Load(),
"dropped", t.ticksDropped.Load(),
),
)
}

func timesEqualToMinute(t1, t2 time.Time) bool {
return t1.Truncate(time.Minute).Equal(t2.Truncate(time.Minute))
}

0 comments on commit 5aff025

Please sign in to comment.