diff --git a/database.go b/database.go index faf4f03..117b46e 100644 --- a/database.go +++ b/database.go @@ -1,56 +1,31 @@ package rkasync +import "gorm.io/gorm/clause" + type Database interface { Type() string - RegisterJob(job Job) + AddJob(job *Job) error + + RegisterProcessor(jobType string, processor Processor) - AddJob(job Job) error + GetProcessor(jobType string) Processor - PickJobToWork() (Job, error) + PickJobToWork() (*Job, error) - UpdateJobState(job Job, state string) error + UpdateJobState(job *Job, state string) error - ListJobs(filter *JobFilter) ([]Job, error) + ListJobs(filter *JobFilter) ([]*Job, error) - GetJob(id string) (Job, error) + GetJob(id string) (*Job, error) CancelJobsOverdue(days int, filter *JobFilter) error CleanJobs(days int, filter *JobFilter) error } -type UnmarshalerFunc func([]byte, *JobMeta) (Job, error) - -func NewJobFilter() *JobFilter { - return &JobFilter{ - TypeList: []string{}, - UserList: []string{}, - ClassList: []string{}, - CategoryList: []string{}, - } -} - type JobFilter struct { - TypeList []string - UserList []string - ClassList []string - CategoryList []string - Limit int -} - -func (f *JobFilter) AddType(in string) { - f.TypeList = append(f.TypeList, in) -} - -func (f *JobFilter) AddUser(in string) { - f.UserList = append(f.UserList, in) -} - -func (f *JobFilter) AddClass(in string) { - f.ClassList = append(f.ClassList, in) -} - -func (f *JobFilter) AddCategory(in string) { - f.CategoryList = append(f.CategoryList, in) + ClauseList []clause.Expression + Limit int + Order string } diff --git a/database/mysql/db.go b/database/mysql/db.go index f9c1e2f..f9bf3d3 100644 --- a/database/mysql/db.go +++ b/database/mysql/db.go @@ -27,75 +27,54 @@ func RegisterDatabase(m map[string]string) rkasync.Database { } if !db.DryRun { - db.AutoMigrate(&Wrapper{}) + db.AutoMigrate(&rkasync.Job{}) } return &Database{ - db: db, - unmarshalerM: map[string]rkasync.UnmarshalerFunc{}, - lock: &sync.Mutex{}, + db: db, + processorM: map[string]rkasync.Processor{}, + lock: &sync.Mutex{}, } } type Database struct { - db *gorm.DB - lock sync.Locker - unmarshalerM map[string]rkasync.UnmarshalerFunc + db *gorm.DB + lock sync.Locker + processorM map[string]rkasync.Processor } func (e *Database) Type() string { return "MySQL" } -func (e *Database) RegisterJob(job rkasync.Job) { - e.lock.Lock() - defer e.lock.Unlock() - - e.unmarshalerM[job.Meta().Type] = job.Unmarshal +func (e *Database) RegisterProcessor(jobType string, processor rkasync.Processor) { + e.processorM[jobType] = processor } -func (e *Database) UnmarshalJob(b []byte, meta *rkasync.JobMeta) (rkasync.Job, error) { - e.lock.Lock() - defer e.lock.Unlock() - - unmar, ok := e.unmarshalerM[meta.Type] - if !ok { - return nil, fmt.Errorf("unsupported job type %s, please register job first", meta.Type) - } - - return unmar(b, meta) +func (e *Database) GetProcessor(jobType string) rkasync.Processor { + return e.processorM[jobType] } -func (e *Database) AddJob(job rkasync.Job) error { - if job.Meta() == nil { - return fmt.Errorf("nil job meta") - } - - job.Meta().Id = xid.New().String() - job.Meta().State = rkasync.JobStateCreated - - wrapper := &Wrapper{ - JobMeta: job.Meta(), +func (e *Database) AddJob(job *rkasync.Job) error { + if job == nil { + return fmt.Errorf("nil job") } - b, err := job.Marshal() - if err != nil { - return err - } - wrapper.JobRaw = string(b) + job.Id = xid.New().String() + job.State = rkasync.JobStateCreated // sync to DB - resDB := e.db.Create(wrapper) + resDB := e.db.Create(job) return resDB.Error } -func (e *Database) PickJobToWork() (rkasync.Job, error) { - var res rkasync.Job +func (e *Database) PickJobToWork() (*rkasync.Job, error) { + var job *rkasync.Job err := e.db.Transaction(func(tx *gorm.DB) error { + res := &rkasync.Job{} // get job with state created - wrap := &Wrapper{} - resDB := tx.Where("state = ?", rkasync.JobStateCreated).Limit(1).Find(wrap) + resDB := tx.Where("state = ?", rkasync.JobStateCreated).Limit(1).Find(res) if resDB.Error != nil { return resDB.Error @@ -105,63 +84,39 @@ func (e *Database) PickJobToWork() (rkasync.Job, error) { return nil } - wrap.JobMeta.State = rkasync.JobStateRunning - wrap.JobMeta.UpdatedAt = time.Now() - - // update state for job structure - job, err := e.UnmarshalJob([]byte(wrap.JobRaw), wrap.JobMeta) - if err != nil { - return err - } - - // update in DB - b, err := job.Marshal() - if err != nil { - return err - } - wrap.JobRaw = string(b) + res.State = rkasync.JobStateRunning + res.UpdatedAt = time.Now() - resDB = tx.Updates(wrap) + resDB = tx.Updates(res) if resDB.Error != nil { return resDB.Error } if resDB.RowsAffected < 1 { - return fmt.Errorf("failed to update job state, id:%s, state:%s", wrap.Id, rkasync.JobStateRunning) + return fmt.Errorf("failed to update job state, id:%s, state:%s", job.Id, rkasync.JobStateRunning) } - res = job + job = res return nil }) - return res, err + return job, err } -func (e *Database) UpdateJobState(job rkasync.Job, state string) error { +func (e *Database) UpdateJobState(job *rkasync.Job, state string) error { err := e.db.Transaction(func(tx *gorm.DB) error { - if !rkasync.JobNewStateAllowed(job.Meta().State, state) { - return fmt.Errorf("job state mutation not allowed by policy, %s->%s", job.Meta().State, state) + if !rkasync.JobNewStateAllowed(job.State, state) { + return fmt.Errorf("job state mutation not allowed by policy, %s->%s", job.State, state) } - job.Meta().State = state + job.State = state - wrap := &Wrapper{ - JobMeta: job.Meta(), - } - - // update in DB - b, err := job.Marshal() - if err != nil { - return err - } - wrap.JobRaw = string(b) - - resDB := tx.Updates(wrap) + resDB := tx.Updates(job) if resDB.Error != nil { return resDB.Error } if resDB.RowsAffected < 1 { - return fmt.Errorf("failed to update job state, no rows updated, id:%s, state:%s", job.Meta().Id, state) + return fmt.Errorf("failed to update job state, no rows updated, id:%s, state:%s", job.Id, state) } return nil @@ -171,68 +126,40 @@ func (e *Database) UpdateJobState(job rkasync.Job, state string) error { // TODO: Paginator -func (e *Database) ListJobs(filter *rkasync.JobFilter) ([]rkasync.Job, error) { +func (e *Database) ListJobs(filter *rkasync.JobFilter) ([]*rkasync.Job, error) { clauses := make([]clause.Expression, 0) - limit := 10000 - if filter != nil { - for i := range filter.TypeList { - clauses = append(clauses, clause.Eq{ - Column: "type", - Value: filter.TypeList[i], - }) - } + limit := 100 + order := "updated_at desc" - for i := range filter.UserList { - clauses = append(clauses, clause.Eq{ - Column: "user", - Value: filter.UserList[i], - }) + if filter != nil { + if filter.Limit > 0 { + limit = filter.Limit } - for i := range filter.ClassList { - clauses = append(clauses, clause.Eq{ - Column: "class", - Value: filter.ClassList[i], - }) + if filter.ClauseList != nil { + clauses = append(clauses, filter.ClauseList...) } - for i := range filter.CategoryList { - clauses = append(clauses, clause.Eq{ - Column: "category", - Value: filter.CategoryList[i], - }) + if len(filter.Order) > 0 { + order = filter.Order } - limit = filter.Limit } - jobList := make([]*Wrapper, 0) + jobList := make([]*rkasync.Job, 0) - resDB := e.db.Clauses(clauses...).Distinct().Limit(limit).Order("updated_at desc").Find(&jobList) + resDB := e.db.Clauses(clauses...).Distinct().Limit(limit).Order(order).Find(&jobList) if resDB.Error != nil { return nil, resDB.Error } - res := make([]rkasync.Job, 0) - - for i := range jobList { - wrap := jobList[i] - - job, err := e.UnmarshalJob([]byte(wrap.JobRaw), wrap.JobMeta) - if err != nil { - continue - } - - res = append(res, job) - } - - return res, nil + return jobList, nil } -func (e *Database) GetJob(id string) (rkasync.Job, error) { - wrap := &Wrapper{} +func (e *Database) GetJob(id string) (*rkasync.Job, error) { + job := &rkasync.Job{} - resDB := e.db.Where("id = ?", id).Find(&wrap) + resDB := e.db.Where("id = ?", id).Find(job) if resDB.Error != nil { return nil, resDB.Error } @@ -240,46 +167,20 @@ func (e *Database) GetJob(id string) (rkasync.Job, error) { return nil, fmt.Errorf("job not found with id=%s", id) } - return e.UnmarshalJob([]byte(wrap.JobRaw), wrap.JobMeta) + return job, nil } func (e *Database) CancelJobsOverdue(days int, filter *rkasync.JobFilter) error { clauses := make([]clause.Expression, 0) - if filter != nil { - for i := range filter.TypeList { - clauses = append(clauses, clause.Eq{ - Column: "type", - Value: filter.TypeList[i], - }) - } - - for i := range filter.UserList { - clauses = append(clauses, clause.Eq{ - Column: "user", - Value: filter.UserList[i], - }) - } - - for i := range filter.ClassList { - clauses = append(clauses, clause.Eq{ - Column: "class", - Value: filter.ClassList[i], - }) - } - - for i := range filter.CategoryList { - clauses = append(clauses, clause.Eq{ - Column: "category", - Value: filter.CategoryList[i], - }) - } + if filter != nil && filter.ClauseList != nil { + clauses = append(clauses, filter.ClauseList...) } err := e.db.Transaction(func(tx *gorm.DB) error { due := time.Now().AddDate(0, 0, -days) - resDB := tx.Model(&Wrapper{}).Clauses(clauses...).Where("state = ? AND updated_at < ?", + resDB := tx.Model(&rkasync.Job{}).Clauses(clauses...).Where("state = ? AND updated_at < ?", rkasync.JobStateRunning, due).Update("state", rkasync.JobStateCanceled) if resDB.Error != nil { return resDB.Error @@ -294,34 +195,8 @@ func (e *Database) CancelJobsOverdue(days int, filter *rkasync.JobFilter) error func (e *Database) CleanJobs(days int, filter *rkasync.JobFilter) error { clauses := make([]clause.Expression, 0) - if filter != nil { - for i := range filter.TypeList { - clauses = append(clauses, clause.Eq{ - Column: "type", - Value: filter.TypeList[i], - }) - } - - for i := range filter.UserList { - clauses = append(clauses, clause.Eq{ - Column: "user", - Value: filter.UserList[i], - }) - } - - for i := range filter.ClassList { - clauses = append(clauses, clause.Eq{ - Column: "class", - Value: filter.ClassList[i], - }) - } - - for i := range filter.CategoryList { - clauses = append(clauses, clause.Eq{ - Column: "category", - Value: filter.CategoryList[i], - }) - } + if filter != nil && filter.ClauseList != nil { + clauses = append(clauses, filter.ClauseList...) } err := e.db.Transaction(func(tx *gorm.DB) error { @@ -331,7 +206,7 @@ func (e *Database) CleanJobs(days int, filter *rkasync.JobFilter) error { rkasync.JobStateFailed, rkasync.JobStateSuccess, rkasync.JobStateCanceled, } - resDB := tx.Clauses(clauses...).Where("state IN ? AND updated_at < ?", states, due).Delete(&Wrapper{}) + resDB := tx.Clauses(clauses...).Where("state IN ? AND updated_at < ?", states, due).Delete(&rkasync.Job{}) if resDB.Error != nil { return resDB.Error } @@ -341,12 +216,3 @@ func (e *Database) CleanJobs(days int, filter *rkasync.JobFilter) error { return err } - -type Wrapper struct { - *rkasync.JobMeta - JobRaw string `json:"-" yaml:"-" gorm:"longtext"` -} - -func (w *Wrapper) TableName() string { - return "async_job" -} diff --git a/database/mysql/go.mod b/database/mysql/go.mod index 1bd3b98..dbad1a7 100644 --- a/database/mysql/go.mod +++ b/database/mysql/go.mod @@ -6,7 +6,7 @@ replace github.com/rookie-ninja/rk-async => ../../ require ( github.com/rookie-ninja/rk-async v0.0.9 - github.com/rookie-ninja/rk-db/mysql v1.2.16 + github.com/rookie-ninja/rk-db/mysql v1.2.17 github.com/rs/xid v1.4.0 gorm.io/gorm v1.24.2 ) @@ -15,7 +15,7 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/fsnotify/fsnotify v1.5.4 // indirect - github.com/go-sql-driver/mysql v1.6.0 // indirect + github.com/go-sql-driver/mysql v1.7.0 // indirect github.com/golang-jwt/jwt/v4 v4.4.2 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/google/uuid v1.3.0 // indirect @@ -31,7 +31,7 @@ require ( github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.37.0 // indirect github.com/prometheus/procfs v0.8.0 // indirect - github.com/rookie-ninja/rk-entry/v2 v2.2.16 // indirect + github.com/rookie-ninja/rk-entry/v2 v2.2.17 // indirect github.com/rookie-ninja/rk-logger v1.2.13 // indirect github.com/rookie-ninja/rk-query v1.2.14 // indirect github.com/spf13/afero v1.8.2 // indirect @@ -50,5 +50,6 @@ require ( gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - gorm.io/driver/mysql v1.4.3 // indirect + gorm.io/datatypes v1.1.0 // indirect + gorm.io/driver/mysql v1.4.4 // indirect ) diff --git a/database/mysql/go.sum b/database/mysql/go.sum index c851e98..d7f89fd 100644 --- a/database/mysql/go.sum +++ b/database/mysql/go.sum @@ -84,12 +84,15 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= -github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= +github.com/go-sql-driver/mysql v1.7.0 h1:ueSltNNllEqE3qcWBTD0iQd3IpL/6U+mJxLkazJ7YPc= +github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/golang-jwt/jwt/v4 v4.4.2 h1:rcc4lwaZgFMCZ5jxF9ABolDcIHdBytAFgqFPbSJQAYs= github.com/golang-jwt/jwt/v4 v4.4.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= +github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 h1:au07oEsX2xN0ktxqI+Sida1w446QrXBRJ0nee3SNZlA= +github.com/golang-sql/sqlexp v0.1.0 h1:ZCD6MBpcuOVfGVqsEmY5/4FtYiKz6tSyUv9LPEDei6A= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -158,6 +161,14 @@ github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= +github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8= +github.com/jackc/pgconn v1.13.0 h1:3L1XMNV2Zvca/8BYhzcRFS70Lr0WlDg16Di6SFGAbys= +github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgproto3/v2 v2.3.1 h1:nwj7qwf0S+Q7ISFfBndqeLwSwxs+4DPsbRFjECT1Y4Y= +github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b h1:C8S2+VttkHFdOOCXJe+YGfa4vHYwlt4Zx+IVXQ97jYg= +github.com/jackc/pgtype v1.12.0 h1:Dlq8Qvcch7kiehm8wPGIW0W3KsCCHJnRacKW0UM8n5w= +github.com/jackc/pgx/v4 v4.17.2 h1:0Ut0rpeKwvIVbMQ1KbMBU4h6wxehBI535LK6Flheh8E= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/now v1.1.4/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= @@ -184,9 +195,11 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/magiconair/properties v1.8.6 h1:5ibWZ6iY0NctNGWo87LalDlEZ6R41TqbbDamhfG/Qzo= github.com/magiconair/properties v1.8.6/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60= +github.com/mattn/go-sqlite3 v1.14.15 h1:vfoHhTN1af61xCRSWzFIWzx2YskyMTwHLrExkBOjvxI= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 h1:I0XW9+e1XWDxdcEniV4rQAIOPUGDq67JSCiRCgGCZLI= github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= +github.com/microsoft/go-mssqldb v0.17.0 h1:Fto83dMZPnYv1Zwx5vHHxpNraeEaUlQ/hhHLgZiaenE= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -234,10 +247,10 @@ github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5 github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= -github.com/rookie-ninja/rk-db/mysql v1.2.16 h1:sJOMl1N7RQgOrQJgk7HRFvEdWt1HTro0rIYFI9zdFIM= -github.com/rookie-ninja/rk-db/mysql v1.2.16/go.mod h1:ZdUZAvL11vraIlZTneTwv58yYFZfj2Da5NDAb1Xm45Q= -github.com/rookie-ninja/rk-entry/v2 v2.2.16 h1:NorgaMxWbqdOGhdIv0bZeloWBJ5lyKQO6bJBlcuYsc8= -github.com/rookie-ninja/rk-entry/v2 v2.2.16/go.mod h1:P/Fd6Oyvvx0ITbEU2lzO3KkQ9miWVwd84aPaC0LMD0o= +github.com/rookie-ninja/rk-db/mysql v1.2.17 h1:twUhATB56+5gVBZgb2+U/62rtMMPOwGdM4ODzDoG7ZU= +github.com/rookie-ninja/rk-db/mysql v1.2.17/go.mod h1:8jki/JEtv4nBWhVuBTruxcO/qjKD+4IWCTzyE+JcVFI= +github.com/rookie-ninja/rk-entry/v2 v2.2.17 h1:cRy6xtjD7B8p5Z1Is0/vrnveDWWOP/3Ouclk4G/rU5Y= +github.com/rookie-ninja/rk-entry/v2 v2.2.17/go.mod h1:P/Fd6Oyvvx0ITbEU2lzO3KkQ9miWVwd84aPaC0LMD0o= github.com/rookie-ninja/rk-logger v1.2.13 h1:ERxeNZUmszlY4xehHcJRXECPtbjYIXzN8yRIyYyLGsg= github.com/rookie-ninja/rk-logger v1.2.13/go.mod h1:0ZiGn1KsHKOmCv+FHMH7k40DWYSJcj5yIR3EYcjlnLs= github.com/rookie-ninja/rk-query v1.2.14 h1:aYNyMXixpsEYRfEOz9Npt5QG3A6BQlo9vKjYc78x7bc= @@ -296,6 +309,7 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20221005025214-4161e89ecf1b h1:huxqepDufQpLLIRXiVkTvnxrzJlpwmIWAObmcCcUFr0= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -619,8 +633,13 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gorm.io/driver/mysql v1.4.3 h1:/JhWJhO2v17d8hjApTltKNADm7K7YI2ogkR7avJUL3k= -gorm.io/driver/mysql v1.4.3/go.mod h1:sSIebwZAVPiT+27jK9HIwvsqOGKx3YMPmrA3mBJR10c= +gorm.io/datatypes v1.1.0 h1:EVp1Z28N4ACpYFK1nHboEIJGIFfjY7vLeieDk8jSHJA= +gorm.io/datatypes v1.1.0/go.mod h1:SH2K9R+2RMjuX1CkCONrPwoe9JzVv2hkQvEu4bXGojE= +gorm.io/driver/mysql v1.4.4 h1:MX0K9Qvy0Na4o7qSC/YI7XxqUw5KDw01umqgID+svdQ= +gorm.io/driver/mysql v1.4.4/go.mod h1:BCg8cKI+R0j/rZRQxeKis/forqRwRSYOR8OM3Wo6hOM= +gorm.io/driver/postgres v1.4.5 h1:mTeXTTtHAgnS9PgmhN2YeUbazYpLhUI1doLnw42XUZc= +gorm.io/driver/sqlite v1.4.3 h1:HBBcZSDnWi5BW3B3rwvVTc510KGkBkexlOg0QrmLUuU= +gorm.io/driver/sqlserver v1.4.1 h1:t4r4r6Jam5E6ejqP7N82qAJIJAht27EGT41HyPfXRw0= gorm.io/gorm v1.23.8/go.mod h1:l2lP/RyAtc1ynaTjFksBde/O8v9oOGIApu2/xRitmZk= gorm.io/gorm v1.24.2 h1:9wR6CFD+G8nOusLdvkZelOEhpJVwwHzpQOUM+REd6U0= gorm.io/gorm v1.24.2/go.mod h1:DVrVomtaYTbqs7gB/x2uVvqnXzv0nqjB396B8cG4dBA= diff --git a/database/postgres/db.go b/database/postgres/db.go index b4f3bdc..9f0ad92 100644 --- a/database/postgres/db.go +++ b/database/postgres/db.go @@ -27,75 +27,54 @@ func RegisterDatabase(m map[string]string) rkasync.Database { } if !db.DryRun { - db.AutoMigrate(&Wrapper{}) + db.AutoMigrate(&rkasync.Job{}) } return &Database{ - db: db, - unmarshalerM: map[string]rkasync.UnmarshalerFunc{}, - lock: &sync.Mutex{}, + db: db, + lock: &sync.Mutex{}, + processorM: map[string]rkasync.Processor{}, } } type Database struct { - db *gorm.DB - lock sync.Locker - unmarshalerM map[string]rkasync.UnmarshalerFunc + db *gorm.DB + lock sync.Locker + processorM map[string]rkasync.Processor } -func (e *Database) Type() string { - return "PostgreSQL" +func (e *Database) RegisterProcessor(jobType string, processor rkasync.Processor) { + e.processorM[jobType] = processor } -func (e *Database) RegisterJob(job rkasync.Job) { - e.lock.Lock() - defer e.lock.Unlock() - - e.unmarshalerM[job.Meta().Type] = job.Unmarshal +func (e *Database) GetProcessor(jobType string) rkasync.Processor { + return e.processorM[jobType] } -func (e *Database) UnmarshalJob(b []byte, meta *rkasync.JobMeta) (rkasync.Job, error) { - e.lock.Lock() - defer e.lock.Unlock() - - unmar, ok := e.unmarshalerM[meta.Type] - if !ok { - return nil, fmt.Errorf("unsupported job type %s, please register job first", meta.Type) - } - - return unmar(b, meta) +func (e *Database) Type() string { + return "PostgreSQL" } -func (e *Database) AddJob(job rkasync.Job) error { - if job.Meta() == nil { - return fmt.Errorf("nil job meta") - } - - job.Meta().Id = xid.New().String() - job.Meta().State = rkasync.JobStateCreated - - wrapper := &Wrapper{ - JobMeta: job.Meta(), +func (e *Database) AddJob(job *rkasync.Job) error { + if job == nil { + return fmt.Errorf("nil job") } - b, err := job.Marshal() - if err != nil { - return err - } - wrapper.JobRaw = string(b) + job.Id = xid.New().String() + job.State = rkasync.JobStateCreated // sync to DB - resDB := e.db.Create(wrapper) + resDB := e.db.Create(job) return resDB.Error } -func (e *Database) PickJobToWork() (rkasync.Job, error) { - var res rkasync.Job +func (e *Database) PickJobToWork() (*rkasync.Job, error) { + var job *rkasync.Job err := e.db.Transaction(func(tx *gorm.DB) error { + res := &rkasync.Job{} // get job with state created - wrap := &Wrapper{} - resDB := tx.Where("state = ?", rkasync.JobStateCreated).Limit(1).Find(wrap) + resDB := tx.Where("state = ?", rkasync.JobStateCreated).Limit(1).Find(res) if resDB.Error != nil { return resDB.Error @@ -105,63 +84,39 @@ func (e *Database) PickJobToWork() (rkasync.Job, error) { return nil } - wrap.JobMeta.State = rkasync.JobStateRunning - wrap.JobMeta.UpdatedAt = time.Now() + res.State = rkasync.JobStateRunning + res.UpdatedAt = time.Now() - // update state for job structure - job, err := e.UnmarshalJob([]byte(wrap.JobRaw), wrap.JobMeta) - if err != nil { - return err - } - - // update in DB - b, err := job.Marshal() - if err != nil { - return err - } - wrap.JobRaw = string(b) - - resDB = tx.Updates(wrap) + resDB = tx.Updates(res) if resDB.Error != nil { return resDB.Error } if resDB.RowsAffected < 1 { - return fmt.Errorf("failed to update job state, id:%s, state:%s", wrap.Id, rkasync.JobStateRunning) + return fmt.Errorf("failed to update job state, id:%s, state:%s", job.Id, rkasync.JobStateRunning) } - res = job + job = res return nil }) - return res, err + return job, err } -func (e *Database) UpdateJobState(job rkasync.Job, state string) error { +func (e *Database) UpdateJobState(job *rkasync.Job, state string) error { err := e.db.Transaction(func(tx *gorm.DB) error { - if !rkasync.JobNewStateAllowed(job.Meta().State, state) { - return fmt.Errorf("job state mutation not allowed by policy, %s->%s", job.Meta().State, state) - } - - job.Meta().State = state - - wrap := &Wrapper{ - JobMeta: job.Meta(), + if !rkasync.JobNewStateAllowed(job.State, state) { + return fmt.Errorf("job state mutation not allowed by policy, %s->%s", job.State, state) } - // update in DB - b, err := job.Marshal() - if err != nil { - return err - } - wrap.JobRaw = string(b) + job.State = state - resDB := tx.Updates(wrap) + resDB := tx.Updates(job) if resDB.Error != nil { return resDB.Error } if resDB.RowsAffected < 1 { - return fmt.Errorf("failed to update job state, no rows updated, id:%s, state:%s", job.Meta().Id, state) + return fmt.Errorf("failed to update job state, no rows updated, id:%s, state:%s", job.Id, state) } return nil @@ -171,69 +126,40 @@ func (e *Database) UpdateJobState(job rkasync.Job, state string) error { // TODO: Paginator -func (e *Database) ListJobs(filter *rkasync.JobFilter) ([]rkasync.Job, error) { +func (e *Database) ListJobs(filter *rkasync.JobFilter) ([]*rkasync.Job, error) { clauses := make([]clause.Expression, 0) - limit := 10000 + limit := 100 + order := "updated_at desc" if filter != nil { - for i := range filter.TypeList { - clauses = append(clauses, clause.Eq{ - Column: "type", - Value: filter.TypeList[i], - }) - } - - for i := range filter.UserList { - clauses = append(clauses, clause.Eq{ - Column: "user", - Value: filter.UserList[i], - }) + if filter.Limit > 0 { + limit = filter.Limit } - for i := range filter.ClassList { - clauses = append(clauses, clause.Eq{ - Column: "class", - Value: filter.ClassList[i], - }) + if filter.ClauseList != nil { + clauses = append(clauses, filter.ClauseList...) } - for i := range filter.CategoryList { - clauses = append(clauses, clause.Eq{ - Column: "category", - Value: filter.CategoryList[i], - }) + if len(filter.Order) > 0 { + order = filter.Order } - limit = filter.Limit } - jobList := make([]*Wrapper, 0) + jobList := make([]*rkasync.Job, 0) - resDB := e.db.Clauses(clauses...).Distinct().Limit(limit).Order("updated_at desc").Find(&jobList) + resDB := e.db.Clauses(clauses...).Distinct().Limit(limit).Order(order).Find(&jobList) if resDB.Error != nil { return nil, resDB.Error } - res := make([]rkasync.Job, 0) - - for i := range jobList { - wrap := jobList[i] - - job, err := e.UnmarshalJob([]byte(wrap.JobRaw), wrap.JobMeta) - if err != nil { - continue - } - - res = append(res, job) - } - - return res, nil + return jobList, nil } -func (e *Database) GetJob(id string) (rkasync.Job, error) { - wrap := &Wrapper{} +func (e *Database) GetJob(id string) (*rkasync.Job, error) { + job := &rkasync.Job{} - resDB := e.db.Where("id = ?", id).Find(&wrap) + resDB := e.db.Where("id = ?", id).Find(job) if resDB.Error != nil { return nil, resDB.Error } @@ -241,46 +167,20 @@ func (e *Database) GetJob(id string) (rkasync.Job, error) { return nil, fmt.Errorf("job not found with id=%s", id) } - return e.UnmarshalJob([]byte(wrap.JobRaw), wrap.JobMeta) + return job, nil } func (e *Database) CancelJobsOverdue(days int, filter *rkasync.JobFilter) error { clauses := make([]clause.Expression, 0) - if filter != nil { - for i := range filter.TypeList { - clauses = append(clauses, clause.Eq{ - Column: "type", - Value: filter.TypeList[i], - }) - } - - for i := range filter.UserList { - clauses = append(clauses, clause.Eq{ - Column: "user", - Value: filter.UserList[i], - }) - } - - for i := range filter.ClassList { - clauses = append(clauses, clause.Eq{ - Column: "class", - Value: filter.ClassList[i], - }) - } - - for i := range filter.CategoryList { - clauses = append(clauses, clause.Eq{ - Column: "category", - Value: filter.CategoryList[i], - }) - } + if filter != nil && filter.ClauseList != nil { + clauses = append(clauses, filter.ClauseList...) } err := e.db.Transaction(func(tx *gorm.DB) error { due := time.Now().AddDate(0, 0, -days) - resDB := tx.Model(&Wrapper{}).Clauses(clauses...).Where("state = ? AND updated_at < ?", + resDB := tx.Model(&rkasync.Job{}).Clauses(clauses...).Where("state = ? AND updated_at < ?", rkasync.JobStateRunning, due).Update("state", rkasync.JobStateCanceled) if resDB.Error != nil { return resDB.Error @@ -295,34 +195,8 @@ func (e *Database) CancelJobsOverdue(days int, filter *rkasync.JobFilter) error func (e *Database) CleanJobs(days int, filter *rkasync.JobFilter) error { clauses := make([]clause.Expression, 0) - if filter != nil { - for i := range filter.TypeList { - clauses = append(clauses, clause.Eq{ - Column: "type", - Value: filter.TypeList[i], - }) - } - - for i := range filter.UserList { - clauses = append(clauses, clause.Eq{ - Column: "user", - Value: filter.UserList[i], - }) - } - - for i := range filter.ClassList { - clauses = append(clauses, clause.Eq{ - Column: "class", - Value: filter.ClassList[i], - }) - } - - for i := range filter.CategoryList { - clauses = append(clauses, clause.Eq{ - Column: "category", - Value: filter.CategoryList[i], - }) - } + if filter != nil && filter.ClauseList != nil { + clauses = append(clauses, filter.ClauseList...) } err := e.db.Transaction(func(tx *gorm.DB) error { @@ -332,7 +206,7 @@ func (e *Database) CleanJobs(days int, filter *rkasync.JobFilter) error { rkasync.JobStateFailed, rkasync.JobStateSuccess, rkasync.JobStateCanceled, } - resDB := tx.Clauses(clauses...).Where("state IN ? AND updated_at < ?", states, due).Delete(&Wrapper{}) + resDB := tx.Clauses(clauses...).Where("state IN ? AND updated_at < ?", states, due).Delete(&rkasync.Job{}) if resDB.Error != nil { return resDB.Error } @@ -342,12 +216,3 @@ func (e *Database) CleanJobs(days int, filter *rkasync.JobFilter) error { return err } - -type Wrapper struct { - *rkasync.JobMeta - JobRaw string `json:"-" yaml:"-" gorm:"longtext"` -} - -func (w *Wrapper) TableName() string { - return "async_job" -} diff --git a/database/postgres/go.mod b/database/postgres/go.mod index cdeb909..606ba72 100644 --- a/database/postgres/go.mod +++ b/database/postgres/go.mod @@ -6,7 +6,7 @@ replace github.com/rookie-ninja/rk-async => ../../ require ( github.com/rookie-ninja/rk-async v0.0.9 - github.com/rookie-ninja/rk-db/postgres v1.2.16 + github.com/rookie-ninja/rk-db/postgres v1.2.17 github.com/rs/xid v1.4.0 gorm.io/gorm v1.24.2 ) @@ -15,6 +15,7 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/fsnotify/fsnotify v1.5.4 // indirect + github.com/go-sql-driver/mysql v1.7.0 // indirect github.com/golang-jwt/jwt/v4 v4.4.2 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/google/uuid v1.3.0 // indirect @@ -38,7 +39,7 @@ require ( github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.37.0 // indirect github.com/prometheus/procfs v0.8.0 // indirect - github.com/rookie-ninja/rk-entry/v2 v2.2.16 // indirect + github.com/rookie-ninja/rk-entry/v2 v2.2.17 // indirect github.com/rookie-ninja/rk-logger v1.2.13 // indirect github.com/rookie-ninja/rk-query v1.2.14 // indirect github.com/spf13/afero v1.8.2 // indirect @@ -50,7 +51,7 @@ require ( go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.6.0 // indirect go.uber.org/zap v1.21.0 // indirect - golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect + golang.org/x/crypto v0.0.0-20221005025214-4161e89ecf1b // indirect golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8 // indirect golang.org/x/text v0.3.7 // indirect google.golang.org/protobuf v1.28.1 // indirect @@ -58,5 +59,7 @@ require ( gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect + gorm.io/datatypes v1.1.0 // indirect + gorm.io/driver/mysql v1.4.4 // indirect gorm.io/driver/postgres v1.4.5 // indirect ) diff --git a/database/postgres/go.sum b/database/postgres/go.sum index b0d9852..b3a8e19 100644 --- a/database/postgres/go.sum +++ b/database/postgres/go.sum @@ -91,12 +91,17 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= +github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= +github.com/go-sql-driver/mysql v1.7.0 h1:ueSltNNllEqE3qcWBTD0iQd3IpL/6U+mJxLkazJ7YPc= +github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gofrs/uuid v4.0.0+incompatible h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPhW6m+TnJw= github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/golang-jwt/jwt/v4 v4.4.2 h1:rcc4lwaZgFMCZ5jxF9ABolDcIHdBytAFgqFPbSJQAYs= github.com/golang-jwt/jwt/v4 v4.4.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= +github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 h1:au07oEsX2xN0ktxqI+Sida1w446QrXBRJ0nee3SNZlA= +github.com/golang-sql/sqlexp v0.1.0 h1:ZCD6MBpcuOVfGVqsEmY5/4FtYiKz6tSyUv9LPEDei6A= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -250,9 +255,11 @@ github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/mattn/go-sqlite3 v1.14.15 h1:vfoHhTN1af61xCRSWzFIWzx2YskyMTwHLrExkBOjvxI= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 h1:I0XW9+e1XWDxdcEniV4rQAIOPUGDq67JSCiRCgGCZLI= github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= +github.com/microsoft/go-mssqldb v0.17.0 h1:Fto83dMZPnYv1Zwx5vHHxpNraeEaUlQ/hhHLgZiaenE= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -300,10 +307,10 @@ github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5 github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= -github.com/rookie-ninja/rk-db/postgres v1.2.16 h1:tPonzzV+/gN+LSJgRZDiCYhMPqACGUceI2S+eyvROhk= -github.com/rookie-ninja/rk-db/postgres v1.2.16/go.mod h1:ZfM43/GYMgoWflxQZuLewFAHHP8E5TYY/tSy4uNSol0= -github.com/rookie-ninja/rk-entry/v2 v2.2.16 h1:NorgaMxWbqdOGhdIv0bZeloWBJ5lyKQO6bJBlcuYsc8= -github.com/rookie-ninja/rk-entry/v2 v2.2.16/go.mod h1:P/Fd6Oyvvx0ITbEU2lzO3KkQ9miWVwd84aPaC0LMD0o= +github.com/rookie-ninja/rk-db/postgres v1.2.17 h1:PqQepdp8z2vNwzycMYFsLratGI4L+zrq53MYrFNONHE= +github.com/rookie-ninja/rk-db/postgres v1.2.17/go.mod h1:DqRxBuMFIc/WgOG7y7bxAXn1k3ojovJkybTzGX+KiyE= +github.com/rookie-ninja/rk-entry/v2 v2.2.17 h1:cRy6xtjD7B8p5Z1Is0/vrnveDWWOP/3Ouclk4G/rU5Y= +github.com/rookie-ninja/rk-entry/v2 v2.2.17/go.mod h1:P/Fd6Oyvvx0ITbEU2lzO3KkQ9miWVwd84aPaC0LMD0o= github.com/rookie-ninja/rk-logger v1.2.13 h1:ERxeNZUmszlY4xehHcJRXECPtbjYIXzN8yRIyYyLGsg= github.com/rookie-ninja/rk-logger v1.2.13/go.mod h1:0ZiGn1KsHKOmCv+FHMH7k40DWYSJcj5yIR3EYcjlnLs= github.com/rookie-ninja/rk-query v1.2.14 h1:aYNyMXixpsEYRfEOz9Npt5QG3A6BQlo9vKjYc78x7bc= @@ -390,8 +397,9 @@ golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa h1:zuSxTR4o9y82ebqCUJYNGJbGPo6sKVl54f/TVDObg1c= golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.0.0-20221005025214-4161e89ecf1b h1:huxqepDufQpLLIRXiVkTvnxrzJlpwmIWAObmcCcUFr0= +golang.org/x/crypto v0.0.0-20221005025214-4161e89ecf1b/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -731,8 +739,15 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gorm.io/datatypes v1.1.0 h1:EVp1Z28N4ACpYFK1nHboEIJGIFfjY7vLeieDk8jSHJA= +gorm.io/datatypes v1.1.0/go.mod h1:SH2K9R+2RMjuX1CkCONrPwoe9JzVv2hkQvEu4bXGojE= +gorm.io/driver/mysql v1.4.4 h1:MX0K9Qvy0Na4o7qSC/YI7XxqUw5KDw01umqgID+svdQ= +gorm.io/driver/mysql v1.4.4/go.mod h1:BCg8cKI+R0j/rZRQxeKis/forqRwRSYOR8OM3Wo6hOM= gorm.io/driver/postgres v1.4.5 h1:mTeXTTtHAgnS9PgmhN2YeUbazYpLhUI1doLnw42XUZc= gorm.io/driver/postgres v1.4.5/go.mod h1:GKNQYSJ14qvWkvPwXljMGehpKrhlDNsqYRr5HnYGncg= +gorm.io/driver/sqlite v1.4.3 h1:HBBcZSDnWi5BW3B3rwvVTc510KGkBkexlOg0QrmLUuU= +gorm.io/driver/sqlserver v1.4.1 h1:t4r4r6Jam5E6ejqP7N82qAJIJAht27EGT41HyPfXRw0= +gorm.io/gorm v1.23.8/go.mod h1:l2lP/RyAtc1ynaTjFksBde/O8v9oOGIApu2/xRitmZk= gorm.io/gorm v1.24.1-0.20221019064659-5dd2bb482755/go.mod h1:DVrVomtaYTbqs7gB/x2uVvqnXzv0nqjB396B8cG4dBA= gorm.io/gorm v1.24.2 h1:9wR6CFD+G8nOusLdvkZelOEhpJVwwHzpQOUM+REd6U0= gorm.io/gorm v1.24.2/go.mod h1:DVrVomtaYTbqs7gB/x2uVvqnXzv0nqjB396B8cG4dBA= diff --git a/entry.go b/entry.go index 637a23d..c955bab 100644 --- a/entry.go +++ b/entry.go @@ -171,19 +171,19 @@ func (e *Entry) Database() Database { return e.db } -func (e *Entry) AddJob(job Job) error { +func (e *Entry) AddJob(job *Job) error { return e.db.AddJob(job) } -func (e *Entry) UpdateJobState(job Job, state string) error { +func (e *Entry) UpdateJobState(job *Job, state string) error { return e.db.UpdateJobState(job, state) } -func (e *Entry) ListJobs(filter *JobFilter) ([]Job, error) { +func (e *Entry) ListJobs(filter *JobFilter) ([]*Job, error) { return e.db.ListJobs(filter) } -func (e *Entry) GetJob(id string) (Job, error) { +func (e *Entry) GetJob(id string) (*Job, error) { return e.db.GetJob(id) } @@ -194,7 +194,3 @@ func (e *Entry) CancelJobsOverdue(days int, filter *JobFilter) error { func (e *Entry) CleanJobs(days int, filter *JobFilter) error { return e.db.CleanJobs(days, filter) } - -func (e *Entry) RegisterJob(job Job) { - e.db.RegisterJob(job) -} diff --git a/go.mod b/go.mod index 1a3d81c..7ceb886 100644 --- a/go.mod +++ b/go.mod @@ -3,19 +3,24 @@ module github.com/rookie-ninja/rk-async go 1.19 require ( - github.com/rookie-ninja/rk-entry/v2 v2.2.16 + github.com/rookie-ninja/rk-entry/v2 v2.2.17 github.com/rookie-ninja/rk-query v1.2.14 go.uber.org/zap v1.21.0 + gorm.io/datatypes v1.1.0 + gorm.io/gorm v1.24.2 ) require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/fsnotify/fsnotify v1.5.4 // indirect + github.com/go-sql-driver/mysql v1.7.0 // indirect github.com/golang-jwt/jwt/v4 v4.4.2 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/google/uuid v1.3.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect + github.com/jinzhu/inflection v1.0.0 // indirect + github.com/jinzhu/now v1.1.5 // indirect github.com/magiconair/properties v1.8.6 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect @@ -42,4 +47,5 @@ require ( gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect + gorm.io/driver/mysql v1.4.4 // indirect ) diff --git a/go.sum b/go.sum index f2eb21a..75106f6 100644 --- a/go.sum +++ b/go.sum @@ -84,10 +84,15 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= +github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= +github.com/go-sql-driver/mysql v1.7.0 h1:ueSltNNllEqE3qcWBTD0iQd3IpL/6U+mJxLkazJ7YPc= +github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/golang-jwt/jwt/v4 v4.4.2 h1:rcc4lwaZgFMCZ5jxF9ABolDcIHdBytAFgqFPbSJQAYs= github.com/golang-jwt/jwt/v4 v4.4.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= +github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 h1:au07oEsX2xN0ktxqI+Sida1w446QrXBRJ0nee3SNZlA= +github.com/golang-sql/sqlexp v0.1.0 h1:ZCD6MBpcuOVfGVqsEmY5/4FtYiKz6tSyUv9LPEDei6A= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -156,6 +161,19 @@ github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= +github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8= +github.com/jackc/pgconn v1.13.0 h1:3L1XMNV2Zvca/8BYhzcRFS70Lr0WlDg16Di6SFGAbys= +github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgproto3/v2 v2.3.1 h1:nwj7qwf0S+Q7ISFfBndqeLwSwxs+4DPsbRFjECT1Y4Y= +github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b h1:C8S2+VttkHFdOOCXJe+YGfa4vHYwlt4Zx+IVXQ97jYg= +github.com/jackc/pgtype v1.12.0 h1:Dlq8Qvcch7kiehm8wPGIW0W3KsCCHJnRacKW0UM8n5w= +github.com/jackc/pgx/v4 v4.17.2 h1:0Ut0rpeKwvIVbMQ1KbMBU4h6wxehBI535LK6Flheh8E= +github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= +github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= +github.com/jinzhu/now v1.1.4/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= +github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= +github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= @@ -177,9 +195,11 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/magiconair/properties v1.8.6 h1:5ibWZ6iY0NctNGWo87LalDlEZ6R41TqbbDamhfG/Qzo= github.com/magiconair/properties v1.8.6/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60= +github.com/mattn/go-sqlite3 v1.14.15 h1:vfoHhTN1af61xCRSWzFIWzx2YskyMTwHLrExkBOjvxI= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 h1:I0XW9+e1XWDxdcEniV4rQAIOPUGDq67JSCiRCgGCZLI= github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= +github.com/microsoft/go-mssqldb v0.17.0 h1:Fto83dMZPnYv1Zwx5vHHxpNraeEaUlQ/hhHLgZiaenE= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -227,8 +247,8 @@ github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5 github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= -github.com/rookie-ninja/rk-entry/v2 v2.2.16 h1:NorgaMxWbqdOGhdIv0bZeloWBJ5lyKQO6bJBlcuYsc8= -github.com/rookie-ninja/rk-entry/v2 v2.2.16/go.mod h1:P/Fd6Oyvvx0ITbEU2lzO3KkQ9miWVwd84aPaC0LMD0o= +github.com/rookie-ninja/rk-entry/v2 v2.2.17 h1:cRy6xtjD7B8p5Z1Is0/vrnveDWWOP/3Ouclk4G/rU5Y= +github.com/rookie-ninja/rk-entry/v2 v2.2.17/go.mod h1:P/Fd6Oyvvx0ITbEU2lzO3KkQ9miWVwd84aPaC0LMD0o= github.com/rookie-ninja/rk-logger v1.2.13 h1:ERxeNZUmszlY4xehHcJRXECPtbjYIXzN8yRIyYyLGsg= github.com/rookie-ninja/rk-logger v1.2.13/go.mod h1:0ZiGn1KsHKOmCv+FHMH7k40DWYSJcj5yIR3EYcjlnLs= github.com/rookie-ninja/rk-query v1.2.14 h1:aYNyMXixpsEYRfEOz9Npt5QG3A6BQlo9vKjYc78x7bc= @@ -289,6 +309,7 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20221005025214-4161e89ecf1b h1:huxqepDufQpLLIRXiVkTvnxrzJlpwmIWAObmcCcUFr0= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -612,6 +633,16 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gorm.io/datatypes v1.1.0 h1:EVp1Z28N4ACpYFK1nHboEIJGIFfjY7vLeieDk8jSHJA= +gorm.io/datatypes v1.1.0/go.mod h1:SH2K9R+2RMjuX1CkCONrPwoe9JzVv2hkQvEu4bXGojE= +gorm.io/driver/mysql v1.4.4 h1:MX0K9Qvy0Na4o7qSC/YI7XxqUw5KDw01umqgID+svdQ= +gorm.io/driver/mysql v1.4.4/go.mod h1:BCg8cKI+R0j/rZRQxeKis/forqRwRSYOR8OM3Wo6hOM= +gorm.io/driver/postgres v1.4.5 h1:mTeXTTtHAgnS9PgmhN2YeUbazYpLhUI1doLnw42XUZc= +gorm.io/driver/sqlite v1.4.3 h1:HBBcZSDnWi5BW3B3rwvVTc510KGkBkexlOg0QrmLUuU= +gorm.io/driver/sqlserver v1.4.1 h1:t4r4r6Jam5E6ejqP7N82qAJIJAht27EGT41HyPfXRw0= +gorm.io/gorm v1.23.8/go.mod h1:l2lP/RyAtc1ynaTjFksBde/O8v9oOGIApu2/xRitmZk= +gorm.io/gorm v1.24.2 h1:9wR6CFD+G8nOusLdvkZelOEhpJVwwHzpQOUM+REd6U0= +gorm.io/gorm v1.24.2/go.mod h1:DVrVomtaYTbqs7gB/x2uVvqnXzv0nqjB396B8cG4dBA= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/job.go b/job.go index 08f5359..517ecfc 100644 --- a/job.go +++ b/job.go @@ -2,6 +2,8 @@ package rkasync import ( "context" + "fmt" + "gorm.io/datatypes" "time" ) @@ -13,7 +15,7 @@ const ( JobStateFailed = "failed" ) -type JobMeta struct { +type Job struct { // do not edit Id string `json:"id" yaml:"id" gorm:"primaryKey"` State string `json:"state" yaml:"state" gorm:"index"` @@ -21,23 +23,49 @@ type JobMeta struct { UpdatedAt time.Time `yaml:"updatedAt" json:"updatedAt" attr:"-"` // edit - Type string `json:"type" yaml:"type" gorm:"index"` - User string `json:"user" yaml:"user" gorm:"index"` - Class string `json:"class" yaml:"class" gorm:"index"` - Category string `json:"category" yaml:"category" gorm:"index"` + Type string `json:"type" yaml:"type" gorm:"index"` + UserId string `json:"userId" yaml:"userId" gorm:"index"` - // error - Error string `json:"error" yaml:"error" gorm:"text"` + Filter string `json:"filter" yaml:"filter" gorm:"text"` + + Steps datatypes.JSONType[[]*Step] `json:"steps" yaml:"steps"` + Payload datatypes.JSONType[interface{}] `json:"payload" yaml:"payload"` } -type Job interface { - Process(context.Context) error +func (j *Job) TableName() string { + return "rk_async_job" +} - Meta() *JobMeta +type Step struct { + Index int `json:"index" yaml:"index"` + Name string `json:"id" yaml:"id"` + State string `json:"state" yaml:"state"` + StartedAt time.Time `yaml:"startedAt" json:"startedAt"` + UpdatedAt time.Time `yaml:"updatedAt" json:"updatedAt"` + ElapsedSec float64 `yaml:"elapsedSec" json:"elapsedSec"` + Output []string `yaml:"output" json:"output"` +} + +func (s *Step) SuccessOutput(output string, startTime time.Time) { + s.UpdatedAt = time.Now() + s.Output = append(s.Output, fmt.Sprintf("%s, elapsedSec:%.2f", output, s.UpdatedAt.Sub(startTime).Seconds())) +} + +func (s *Step) FailedOutput(output string, startTime time.Time) { + s.UpdatedAt = time.Now() + s.State = JobStateFailed + s.Output = append(s.Output, fmt.Sprintf("%s, elapsedSec:%.2f", output, s.UpdatedAt.Sub(startTime).Seconds())) +} + +func (s *Step) Finish() { + s.UpdatedAt = time.Now() + s.ElapsedSec = s.UpdatedAt.Sub(s.StartedAt).Seconds() +} - Marshal() ([]byte, error) +type UpdateJobFunc func(j *Job, state string) error - Unmarshal([]byte, *JobMeta) (Job, error) +type Processor interface { + Process(context.Context, *Job, UpdateJobFunc) error } func JobNewStateAllowed(oldState, newState string) bool { diff --git a/worker.go b/worker.go index 92600bc..f9f031d 100644 --- a/worker.go +++ b/worker.go @@ -96,40 +96,38 @@ func (w *LocalWorker) processJob() { defer event.Finish() event.SetResCode("OK") - logger := w.logger.With(zap.String("id", job.Meta().Id), - zap.String("type", job.Meta().Type), - zap.String("user", job.Meta().User), - zap.String("class", job.Meta().Class), - zap.String("category", job.Meta().Category)) + logger := w.logger.With(zap.String("id", job.Id), + zap.String("type", job.Type), + zap.String("userId", job.UserId)) // process job & record error ctx := context.WithValue(context.Background(), LoggerKey, logger) ctx = context.WithValue(ctx, EventKey, event) - err = job.Process(ctx) - event.AddPair("jobType", job.Meta().Type) - event.AddPair("jobId", job.Meta().Id) + processor := w.Database().GetProcessor(job.Type) + if processor == nil { + return + } + + err = processor.Process(ctx, job, w.Database().UpdateJobState) + event.AddPair("jobType", job.Type) + event.AddPair("jobId", job.Id) if err != nil { - job.Meta().Error = err.Error() event.AddErr(err) event.SetResCode("Fail") w.logger.Error("failed to process job", - zap.String("id", job.Meta().Id), - zap.String("type", job.Meta().Type), - zap.String("user", job.Meta().User), - zap.String("class", job.Meta().Class), - zap.String("category", job.Meta().Category), + zap.String("id", job.Id), + zap.String("type", job.Type), + zap.String("userId", job.UserId), zap.Error(err)) if err := w.db.UpdateJobState(job, JobStateFailed); err != nil { w.logger.Error("failed to update job state", - zap.String("id", job.Meta().Id), - zap.String("type", job.Meta().Type), - zap.String("user", job.Meta().User), - zap.String("class", job.Meta().Class), - zap.String("category", job.Meta().Category), + zap.String("id", job.Id), + zap.String("type", job.Type), + zap.String("userId", job.UserId), zap.String("state", JobStateFailed), zap.Error(err)) return @@ -140,11 +138,9 @@ func (w *LocalWorker) processJob() { // update DB if err := w.db.UpdateJobState(job, JobStateSuccess); err != nil { w.logger.Error("failed to update job state", - zap.String("id", job.Meta().Id), - zap.String("type", job.Meta().Type), - zap.String("user", job.Meta().User), - zap.String("class", job.Meta().Class), - zap.String("category", job.Meta().Category), + zap.String("id", job.Id), + zap.String("type", job.Type), + zap.String("userId", job.UserId), zap.String("state", JobStateSuccess)) } }