Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add: GetJob, DeleteJob, ListJobsByPrefix #79

Closed
wants to merge 6 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
fix repeated entries with newMutex. Add ListJobs, DeleteJob, GetJob
Signed-off-by: Cassandra Coyle <[email protected]>
cicoyle committed Jan 11, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit b6f840c7a75d8aba14a2a0d4a08f4aef432173fa
49 changes: 48 additions & 1 deletion cron.go
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@ package etcdcron
import (
"context"
"fmt"
"google.golang.org/protobuf/types/known/anypb"
"log"
"regexp"
"runtime/debug"
@@ -42,6 +43,12 @@ type Job struct {
Rhythm string
// Routine method
Func func(context.Context) error

Repeats int32
DueTime string
TTL string
Data *anypb.Any
Metadata map[string]string
}

func (j Job) Run(ctx context.Context) error {
@@ -164,6 +171,16 @@ func New(opts ...CronOpt) (*Cron, error) {
return cron, nil
}

// GetJob retrieves a job by name.
func (c *Cron) GetJob(jobName string) *Job {
for _, entry := range c.entries {
if entry.Job.Name == jobName {
return &entry.Job
}
}
return nil
}

// AddFunc adds a Job to the Cron to be run on the given schedule.
func (c *Cron) AddJob(job Job) error {
schedule, err := Parse(job.Rhythm)
@@ -174,6 +191,25 @@ func (c *Cron) AddJob(job Job) error {
return nil
}

// DeleteJob deletes a job by name.
func (c *Cron) DeleteJob(jobName string) error {
var updatedEntries []*Entry
found := false
for _, entry := range c.entries {
if entry.Job.Name == jobName {
found = true
continue
}
// Keep the entries that don't match the specified jobName
updatedEntries = append(updatedEntries, entry)
}
if !found {
return fmt.Errorf("job not found: %s", jobName)
}
c.entries = updatedEntries
return nil
}

// Schedule adds a Job to the Cron to be run on the given schedule.
func (c *Cron) Schedule(schedule Schedule, job Job) {
entry := &Entry{
@@ -188,6 +224,17 @@ func (c *Cron) Schedule(schedule Schedule, job Job) {
c.add <- entry
}

func (c *Cron) ListJobsByAppID(appID string) []*Job {
var appJobs []*Job
for _, entry := range c.entries {
if strings.HasPrefix(entry.Job.Name, fmt.Sprintf("%s_", appID)) {
// Job belongs to the specified app_id
appJobs = append(appJobs, &entry.Job)
}
}
return appJobs
}

// Entries returns a snapshot of the cron entries.
func (c *Cron) Entries() []*Entry {
if c.running {
@@ -253,7 +300,7 @@ func (c *Cron) run(ctx context.Context) {
ctx = c.funcCtx(ctx, e.Job)
}

m, err := c.etcdclient.NewMutex(fmt.Sprintf("etcd_cron/%s/%d", e.Job.canonicalName(), effective.Unix()))
m, err := c.etcdclient.NewMutex(fmt.Sprintf("etcd_cron/%s", e.Job.canonicalName()))
if err != nil {
go c.etcdErrorsHandler(ctx, e.Job, errors.Wrapf(err, "fail to create etcd mutex for job '%v'", e.Job.Name))
return