Skip to content

Commit

Permalink
Add filter to cancel and clear function of entry
Browse files Browse the repository at this point in the history
  • Loading branch information
dongxuny committed Dec 23, 2022
1 parent 229a199 commit 44cd30f
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 10 deletions.
4 changes: 2 additions & 2 deletions database.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ type Database interface {

GetJob(id string) (Job, error)

CancelJobsOverdue(days int) error
CancelJobsOverdue(days int, filter *JobFilter) error

CleanJobs(days int) error
CleanJobs(days int, filter *JobFilter) error
}

type UnmarshalerFunc func([]byte, *JobMeta) (Job, error)
Expand Down
72 changes: 68 additions & 4 deletions database/mysql/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,11 +241,43 @@ func (e *Database) GetJob(id string) (rkasync.Job, error) {
return e.UnmarshalJob([]byte(wrap.JobRaw), wrap.JobMeta)
}

func (e *Database) CancelJobsOverdue(days int) error {
func (e *Database) CancelJobsOverdue(days int, filter *rkasync.JobFilter) error {
clauses := make([]clause.Expression, 0)

if filter != nil {
for i := range filter.TypeList {
clauses = append(clauses, clause.Eq{
Column: "type",
Value: filter.TypeList[i],
})
}

for i := range filter.UserList {
clauses = append(clauses, clause.Eq{
Column: "user",
Value: filter.UserList[i],
})
}

for i := range filter.ClassList {
clauses = append(clauses, clause.Eq{
Column: "class",
Value: filter.ClassList[i],
})
}

for i := range filter.CategoryList {
clauses = append(clauses, clause.Eq{
Column: "category",
Value: filter.CategoryList[i],
})
}
}

err := e.db.Transaction(func(tx *gorm.DB) error {
due := time.Now().AddDate(0, 0, -days)

resDB := tx.Model(&Wrapper{}).Where("state = ? AND updated_at < ?",
resDB := tx.Model(&Wrapper{}).Clauses(clauses...).Where("state = ? AND updated_at < ?",
rkasync.JobStateRunning, due).Update("state", rkasync.JobStateCanceled)
if resDB.Error != nil {
return resDB.Error
Expand All @@ -257,15 +289,47 @@ func (e *Database) CancelJobsOverdue(days int) error {
return err
}

func (e *Database) CleanJobs(days int) error {
func (e *Database) CleanJobs(days int, filter *rkasync.JobFilter) error {
clauses := make([]clause.Expression, 0)

if filter != nil {
for i := range filter.TypeList {
clauses = append(clauses, clause.Eq{
Column: "type",
Value: filter.TypeList[i],
})
}

for i := range filter.UserList {
clauses = append(clauses, clause.Eq{
Column: "user",
Value: filter.UserList[i],
})
}

for i := range filter.ClassList {
clauses = append(clauses, clause.Eq{
Column: "class",
Value: filter.ClassList[i],
})
}

for i := range filter.CategoryList {
clauses = append(clauses, clause.Eq{
Column: "category",
Value: filter.CategoryList[i],
})
}
}

err := e.db.Transaction(func(tx *gorm.DB) error {
due := time.Now().AddDate(0, 0, -days)

states := []string{
rkasync.JobStateFailed, rkasync.JobStateSuccess, rkasync.JobStateCanceled,
}

resDB := tx.Where("state IN ? AND updated_at < ?", states, due).Delete(&Wrapper{})
resDB := tx.Clauses(clauses...).Where("state IN ? AND updated_at < ?", states, due).Delete(&Wrapper{})
if resDB.Error != nil {
return resDB.Error
}
Expand Down
8 changes: 4 additions & 4 deletions entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,12 @@ func (e *Entry) GetJob(id string) (Job, error) {
return e.db.GetJob(id)
}

func (e *Entry) CancelJobsOverdue(days int) error {
return e.db.CancelJobsOverdue(days)
func (e *Entry) CancelJobsOverdue(days int, filter *JobFilter) error {
return e.db.CancelJobsOverdue(days, filter)
}

func (e *Entry) CleanJobs(days int) error {
return e.db.CleanJobs(days)
func (e *Entry) CleanJobs(days int, filter *JobFilter) error {
return e.db.CleanJobs(days, filter)
}

func (e *Entry) RegisterJob(job Job) {
Expand Down

0 comments on commit 44cd30f

Please sign in to comment.