Skip to content

Commit

Permalink
Merge pull request #8 from schmorrison/from-context
Browse files Browse the repository at this point in the history
WithContext
  • Loading branch information
adhocore authored Oct 13, 2021
2 parents a813b55 + 4e71aa2 commit f6cc01e
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 11 deletions.
47 changes: 36 additions & 11 deletions pkg/tasker/tasker.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,18 @@ type Task struct {

// Tasker is the task manager.
type Tasker struct {
Log *log.Logger
loc *time.Location
gron *gronx.Gronx
wg sync.WaitGroup
until time.Time
exprs map[string][]string
tasks map[string]TaskFunc
abort bool
timeout bool
verbose bool
Log *log.Logger
loc *time.Location
gron *gronx.Gronx
wg sync.WaitGroup
until time.Time
exprs map[string][]string
tasks map[string]TaskFunc
abort bool
timeout bool
verbose bool
ctx context.Context
ctxCancel context.CancelFunc
}

type result struct {
Expand Down Expand Up @@ -89,6 +91,21 @@ func New(opt Option) *Tasker {
return &Tasker{Log: logger, loc: loc, gron: &gron, exprs: exprs, tasks: tasks, verbose: opt.Verbose}
}

// WithContext adds a parent context to the Tasker struct
// and begins the abort when Done is received
func (t *Tasker) WithContext(ctx context.Context) *Tasker {
t.ctx, t.ctxCancel = context.WithCancel(ctx)
return t
}

func (t *Tasker) ctxDone() {
<-t.ctx.Done()
if t.verbose {
t.Log.Printf("[tasker] received signal on context.Done, aborting")
}
t.abort = true
}

// Taskify creates TaskFunc out of plain command wrt given options.
func (t *Tasker) Taskify(cmd string, opt Option) TaskFunc {
sh := Shell(opt.Shell)
Expand Down Expand Up @@ -237,6 +254,10 @@ func (t *Tasker) doSetup() {
t.Log.Printf("[tasker] final tick on or before %s", t.until.Format(dateFormat))
}

if t.ctx != nil {
go t.ctxDone()
}

sig := make(chan os.Signal)
signal.Notify(sig, os.Interrupt, syscall.SIGTERM)

Expand Down Expand Up @@ -290,7 +311,11 @@ func (t *Tasker) runTasks(tasks map[string]TaskFunc) {
}
}

ctx := context.TODO()
ctx := context.Background()
if t.ctx != nil {
ctx = t.ctx
}

for ref, task := range tasks {
t.wg.Add(1)
rc := make(chan result)
Expand Down
45 changes: 45 additions & 0 deletions pkg/tasker/tasker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,48 @@ func TestTaskify(t *testing.T) {
}
})
}

func TestWithContext(t *testing.T) {
t.Run("WithContext", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
taskr := New(Option{Verbose: true, Out: "../../test/tasker-ctx.out"}).WithContext(ctx)

called := 0
taskr.Task("@always", func(ctx context.Context) (int, error) {
taskr.Log.Println("task [@always][#1] waiting 3s")
called++
ct := 0
M:
for {
time.Sleep(300 * time.Millisecond)
select {
case <-ctx.Done():
taskr.Log.Printf("task [@always][#1] received Done signal after %d ms\n", ct*300)
break M
default:
ct++
}
}
return 0, nil
})

startCh := make(chan bool)

go func() {
<-startCh
time.Sleep(4 * time.Second)
cancel()
}()

startCh <- true
taskr.Until(5 * time.Second).Run()

if called != 2 {
t.Errorf("task should run 2 times, ran %d times", called)
}

buf, _ := ioutil.ReadFile("../../test/tasker-ctx.out")
buffer := string(buf)
fmt.Println(buffer)
})
}

0 comments on commit f6cc01e

Please sign in to comment.