diff --git a/database.go b/database.go index e859778..4c1d00c 100644 --- a/database.go +++ b/database.go @@ -13,7 +13,9 @@ type Database interface { PickJobToWork() (*Job, error) - UpdateJob(job *Job) error + UpdateJobState(job *Job) error + + UpdateJobPayloadAndStep(job *Job) error ListJobs(filter *JobFilter) ([]*Job, error) diff --git a/database/mysql/db.go b/database/mysql/db.go index 91dfb03..773bb83 100644 --- a/database/mysql/db.go +++ b/database/mysql/db.go @@ -5,6 +5,7 @@ import ( "github.com/rookie-ninja/rk-async" rkmysql "github.com/rookie-ninja/rk-db/mysql" "github.com/rs/xid" + "go.uber.org/zap" "gorm.io/gorm" "gorm.io/gorm/clause" "sync" @@ -15,7 +16,7 @@ func init() { rkasync.RegisterDatabaseRegFunc("MySQL", RegisterDatabase) } -func RegisterDatabase(m map[string]string) rkasync.Database { +func RegisterDatabase(m map[string]string, logger *zap.Logger) rkasync.Database { entry := rkmysql.GetMySqlEntry(m["entryName"]) if entry == nil { return nil @@ -30,8 +31,13 @@ func RegisterDatabase(m map[string]string) rkasync.Database { db.AutoMigrate(&rkasync.Job{}) } + if logger == nil { + logger = zap.NewNop() + } + return &Database{ db: db, + logger: logger, processorM: map[string]rkasync.Processor{}, lock: &sync.Mutex{}, } @@ -39,6 +45,7 @@ func RegisterDatabase(m map[string]string) rkasync.Database { type Database struct { db *gorm.DB + logger *zap.Logger lock sync.Locker processorM map[string]rkasync.Processor } @@ -76,7 +83,7 @@ func (e *Database) PickJobToWork() (*rkasync.Job, error) { err := e.db.Transaction(func(tx *gorm.DB) error { res := &rkasync.Job{} // get job with state created - resDB := tx.Where("state = ?", rkasync.JobStateCreated).Limit(1).Find(res) + resDB := tx.Where("state = ?", rkasync.JobStateCreated).Order("created_at ASC").Limit(1).Find(res) if resDB.Error != nil { return resDB.Error @@ -105,18 +112,102 @@ func (e *Database) PickJobToWork() (*rkasync.Job, error) { return job, err } -func (e *Database) UpdateJob(job *rkasync.Job) error { - err := e.db.Transaction(func(tx *gorm.DB) error { - resDB := tx.Updates(job) - if resDB.Error != nil { - return resDB.Error +func (e *Database) UpdateJobPayloadAndStep(job *rkasync.Job) error { + var err error + // retry + for retry := 0; retry < 3; retry++ { + err = e.db.Transaction(func(tx *gorm.DB) error { + if job == nil { + return nil + } + + jobDB := &rkasync.Job{} + resDB := tx.Where("id = ?", job.Id).Find(jobDB) + if resDB.Error != nil { + return resDB.Error + } + if resDB.RowsAffected < 1 { + return fmt.Errorf("job not found, id:%s", job.Id) + } + + if jobDB.State == "canceled" || jobDB.State == "success" || jobDB.State == "failed" { + e.logger.Warn(fmt.Sprintf("job at final state:%s, skip updating payloads and steps", jobDB.State)) + return nil + } + + resDB = tx.Model(&rkasync.Job{}).Where("id = ?", job.Id).UpdateColumns( + rkasync.Job{ + UpdatedAt: time.Now(), + Payload: job.Payload, + Steps: job.Steps, + }) + + if resDB.Error != nil { + return resDB.Error + } + if resDB.RowsAffected < 1 { + return fmt.Errorf("failed to update job payloads and steps, no rows updated, id:%s", job.Id) + } + + return nil + }) + + if err == nil { + return nil } - if resDB.RowsAffected < 1 { - return fmt.Errorf("failed to update job state, no rows updated, id:%s, state:%s", job.Id, job.State) + } + + e.logger.Warn("failed to update job payloads and steps", zap.Error(err)) + + return err +} + +func (e *Database) UpdateJobState(job *rkasync.Job) error { + var err error + // retry + for retry := 0; retry < 3; retry++ { + err = e.db.Transaction(func(tx *gorm.DB) error { + if job == nil { + return nil + } + + jobDB := &rkasync.Job{} + resDB := tx.Where("id = ?", job.Id).Find(jobDB) + if resDB.Error != nil { + return resDB.Error + } + if resDB.RowsAffected < 1 { + return fmt.Errorf("job not found, id:%s", job.Id) + } + + if jobDB.State == "canceled" || jobDB.State == "success" || jobDB.State == "failed" { + e.logger.Warn(fmt.Sprintf("job at final state:%s, skip updating state %s", jobDB.State, job.State)) + return nil + } + + resDB = tx.Model(&rkasync.Job{}).Where("id = ?", job.Id).UpdateColumns( + rkasync.Job{ + UpdatedAt: time.Now(), + State: job.State, + }) + + if resDB.Error != nil { + return resDB.Error + } + if resDB.RowsAffected < 1 { + return fmt.Errorf("failed to update job state, no rows updated, id:%s, state:%s", job.Id, job.State) + } + + return nil + }) + + if err == nil { + return nil } + } + + e.logger.Warn("failed to update job state", zap.Error(err)) - return nil - }) return err } diff --git a/database/postgres/db.go b/database/postgres/db.go index 80783f7..41b1d11 100644 --- a/database/postgres/db.go +++ b/database/postgres/db.go @@ -5,6 +5,7 @@ import ( "github.com/rookie-ninja/rk-async" "github.com/rookie-ninja/rk-db/postgres" "github.com/rs/xid" + "go.uber.org/zap" "gorm.io/gorm" "gorm.io/gorm/clause" "sync" @@ -15,7 +16,7 @@ func init() { rkasync.RegisterDatabaseRegFunc("PostgreSQL", RegisterDatabase) } -func RegisterDatabase(m map[string]string) rkasync.Database { +func RegisterDatabase(m map[string]string, logger *zap.Logger) rkasync.Database { entry := rkpostgres.GetPostgresEntry(m["entryName"]) if entry == nil { return nil @@ -30,8 +31,13 @@ func RegisterDatabase(m map[string]string) rkasync.Database { db.AutoMigrate(&rkasync.Job{}) } + if logger == nil { + logger = zap.NewNop() + } + return &Database{ db: db, + logger: logger, lock: &sync.Mutex{}, processorM: map[string]rkasync.Processor{}, } @@ -39,6 +45,7 @@ func RegisterDatabase(m map[string]string) rkasync.Database { type Database struct { db *gorm.DB + logger *zap.Logger lock sync.Locker processorM map[string]rkasync.Processor } @@ -76,7 +83,7 @@ func (e *Database) PickJobToWork() (*rkasync.Job, error) { err := e.db.Transaction(func(tx *gorm.DB) error { res := &rkasync.Job{} // get job with state created - resDB := tx.Where("state = ?", rkasync.JobStateCreated).Limit(1).Find(res) + resDB := tx.Where("state = ?", rkasync.JobStateCreated).Order("created_at ASC").Limit(1).Find(res) if resDB.Error != nil { return resDB.Error @@ -105,18 +112,102 @@ func (e *Database) PickJobToWork() (*rkasync.Job, error) { return job, err } -func (e *Database) UpdateJob(job *rkasync.Job) error { - err := e.db.Transaction(func(tx *gorm.DB) error { - resDB := tx.Updates(job) - if resDB.Error != nil { - return resDB.Error +func (e *Database) UpdateJobPayloadAndStep(job *rkasync.Job) error { + var err error + // retry + for retry := 0; retry < 3; retry++ { + err = e.db.Transaction(func(tx *gorm.DB) error { + if job == nil { + return nil + } + + jobDB := &rkasync.Job{} + resDB := tx.Where("id = ?", job.Id).Find(jobDB) + if resDB.Error != nil { + return resDB.Error + } + if resDB.RowsAffected < 1 { + return fmt.Errorf("job not found, id:%s", job.Id) + } + + if jobDB.State == "canceled" || jobDB.State == "success" || jobDB.State == "failed" { + e.logger.Warn(fmt.Sprintf("job at final state:%s, skip updating payloads and steps", jobDB.State)) + return nil + } + + resDB = tx.Model(&rkasync.Job{}).Where("id = ?", job.Id).UpdateColumns( + rkasync.Job{ + UpdatedAt: time.Now(), + Payload: job.Payload, + Steps: job.Steps, + }) + + if resDB.Error != nil { + return resDB.Error + } + if resDB.RowsAffected < 1 { + return fmt.Errorf("failed to update job payloads and steps, no rows updated, id:%s", job.Id) + } + + return nil + }) + + if err == nil { + return nil } - if resDB.RowsAffected < 1 { - return fmt.Errorf("failed to update job state, no rows updated, id:%s, state:%s", job.Id, job.State) + } + + e.logger.Warn("failed to update job payloads and steps", zap.Error(err)) + + return err +} + +func (e *Database) UpdateJobState(job *rkasync.Job) error { + var err error + // retry + for retry := 0; retry < 3; retry++ { + err = e.db.Transaction(func(tx *gorm.DB) error { + if job == nil { + return nil + } + + jobDB := &rkasync.Job{} + resDB := tx.Where("id = ?", job.Id).Find(jobDB) + if resDB.Error != nil { + return resDB.Error + } + if resDB.RowsAffected < 1 { + return fmt.Errorf("job not found, id:%s", job.Id) + } + + if jobDB.State == "canceled" || jobDB.State == "success" || jobDB.State == "failed" { + e.logger.Warn(fmt.Sprintf("job at final state:%s, skip updating state %s", jobDB.State, job.State)) + return nil + } + + resDB = tx.Model(&rkasync.Job{}).Where("id = ?", job.Id).UpdateColumns( + rkasync.Job{ + UpdatedAt: time.Now(), + State: job.State, + }) + + if resDB.Error != nil { + return resDB.Error + } + if resDB.RowsAffected < 1 { + return fmt.Errorf("failed to update job state, no rows updated, id:%s, state:%s", job.Id, job.State) + } + + return nil + }) + + if err == nil { + return nil } + } + + e.logger.Warn("failed to update job state", zap.Error(err)) - return nil - }) return err } diff --git a/entry.go b/entry.go index d7d5fcd..3ded0f2 100644 --- a/entry.go +++ b/entry.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "github.com/rookie-ninja/rk-entry/v2/entry" + "go.uber.org/zap" "sync" ) @@ -13,7 +14,7 @@ func init() { } var ( - dbRegFuncM = map[string]func(map[string]string) Database{} + dbRegFuncM = map[string]func(map[string]string, *zap.Logger) Database{} ) func GetEntry() *Entry { @@ -29,7 +30,7 @@ func GetEntry() *Entry { return nil } -func RegisterDatabaseRegFunc(dbType string, f func(map[string]string) Database) { +func RegisterDatabaseRegFunc(dbType string, f func(map[string]string, *zap.Logger) Database) { dbRegFuncM[dbType] = f } @@ -86,13 +87,25 @@ type Entry struct { func (e *Entry) Bootstrap(ctx context.Context) { e.bootstrapOnce.Do(func() { + // logger + logger := rkentry.GlobalAppCtx.GetLoggerEntry(e.config.Async.Logger) + if logger == nil { + logger = rkentry.GlobalAppCtx.GetLoggerEntryDefault() + } + + // event + event := rkentry.GlobalAppCtx.GetEventEntry(e.config.Async.Event) + if event == nil { + event = rkentry.GlobalAppCtx.GetEventEntryDefault() + } + var db Database if e.config.Async.Database.MySql.Enabled { f := dbRegFuncM["MySQL"] db = f(map[string]string{ "entryName": e.config.Async.Database.MySql.EntryName, "database": e.config.Async.Database.MySql.Database, - }) + }, logger.Logger) } if e.config.Async.Database.Postgres.Enabled { @@ -100,7 +113,7 @@ func (e *Entry) Bootstrap(ctx context.Context) { db = f(map[string]string{ "entryName": e.config.Async.Database.Postgres.EntryName, "database": e.config.Async.Database.Postgres.Database, - }) + }, logger.Logger) } if db == nil { @@ -109,18 +122,6 @@ func (e *Entry) Bootstrap(ctx context.Context) { e.db = db - // logger - logger := rkentry.GlobalAppCtx.GetLoggerEntry(e.config.Async.Logger) - if logger == nil { - logger = rkentry.GlobalAppCtx.GetLoggerEntryDefault() - } - - // event - event := rkentry.GlobalAppCtx.GetEventEntry(e.config.Async.Event) - if event == nil { - event = rkentry.GlobalAppCtx.GetEventEntryDefault() - } - // worker if e.config.Async.Worker.Local.Enabled { e.worker = NewLocalWorker(db, logger, event) @@ -175,8 +176,32 @@ func (e *Entry) AddJob(job *Job) error { return e.db.AddJob(job) } -func (e *Entry) UpdateJob(job *Job) error { - return e.db.UpdateJob(job) +func (e *Entry) StartJob(job *Job) error { + job.State = JobStateRunning + return e.db.UpdateJobState(job) +} + +func (e *Entry) FinishJob(job *Job, success bool) error { + if success { + job.State = JobStateSuccess + } else { + job.State = JobStateFailed + } + + return e.db.UpdateJobState(job) +} + +func (e *Entry) CancelJob(job *Job) error { + job.State = JobStateCanceled + for i := range job.Steps.Data { + step := job.Steps.Data[i] + step.State = JobStateCanceled + } + return e.db.UpdateJobState(job) +} + +func (e *Entry) UpdateJobPayloadAndStep(job *Job) error { + return e.db.UpdateJobPayloadAndStep(job) } func (e *Entry) ListJobs(filter *JobFilter) ([]*Job, error) { diff --git a/job.go b/job.go index 106b98b..ba01957 100644 --- a/job.go +++ b/job.go @@ -66,6 +66,9 @@ func (s *Step) NewRecorder() *Recorder { } func (s *Step) Finish(state string) { + s.Lock.Lock() + defer s.Lock.Unlock() + s.State = state s.ElapsedSec = time.Now().Sub(s.StartedAt).Seconds() s.PersistFunc() @@ -87,8 +90,11 @@ func (r *Recorder) Title(s string) { output := fmt.Sprintf("👉🏻 %s", s) r.step.Lock.Lock() - r.step.Output[r.index] = output - r.step.Lock.Unlock() + defer r.step.Lock.Unlock() + + if r.step != nil && len(r.step.Output) > r.index { + r.step.Output[r.index] = output + } r.step.PersistFunc() } @@ -97,8 +103,11 @@ func (r *Recorder) Warn(s string) { output := fmt.Sprintf("⚠️️ [%s] %s", time.Duration(time.Now().Sub(r.startTime).Seconds())*time.Second, s) r.step.Lock.Lock() - r.step.Output[r.index] = output - r.step.Lock.Unlock() + defer r.step.Lock.Unlock() + + if r.step != nil && len(r.step.Output) > r.index { + r.step.Output[r.index] = output + } r.step.PersistFunc() } @@ -107,8 +116,11 @@ func (r *Recorder) Info(s string) { output := fmt.Sprintf("[%s] %s", time.Duration(time.Now().Sub(r.startTime).Seconds())*time.Second, s) r.step.Lock.Lock() - r.step.Output[r.index] = output - r.step.Lock.Unlock() + defer r.step.Lock.Unlock() + + if r.step != nil && len(r.step.Output) > r.index { + r.step.Output[r.index] = output + } r.step.PersistFunc() } diff --git a/worker.go b/worker.go index d8b422b..24cc98d 100644 --- a/worker.go +++ b/worker.go @@ -67,7 +67,6 @@ func (w *LocalWorker) Start() { w.processJob() waitChannel.Reset(time.Duration(1) * time.Second) default: - w.processJob() time.Sleep(time.Duration(1) * time.Second) } } @@ -128,13 +127,13 @@ func (w *LocalWorker) processJob() { if processor == nil { logger.Warn("processor is nil, aborting...") job.State = JobStateFailed - if err := w.db.UpdateJob(job); err != nil { + if err := w.db.UpdateJobState(job); err != nil { logger.Warn("failed to update job state", zap.Error(err)) } return } - err = processor.Process(ctx, job, w.Database().UpdateJob) + err = processor.Process(ctx, job, w.Database().UpdateJobPayloadAndStep) event.AddPair("jobType", job.Type) event.AddPair("jobId", job.Id) @@ -145,7 +144,7 @@ func (w *LocalWorker) processJob() { logger.Error("failed to process job", zap.Error(err)) job.State = JobStateFailed - if err := w.db.UpdateJob(job); err != nil { + if err := w.db.UpdateJobState(job); err != nil { logger.Error("failed to update job state", zap.String("state", JobStateFailed), zap.Error(err)) @@ -155,7 +154,7 @@ func (w *LocalWorker) processJob() { // update DB job.State = JobStateSuccess - if err := w.db.UpdateJob(job); err != nil { + if err := w.db.UpdateJobState(job); err != nil { logger.Error("failed to update job state", zap.String("state", JobStateSuccess)) } }