Skip to content

Commit

Permalink
Refactor entry and dB
Browse files Browse the repository at this point in the history
  • Loading branch information
dongxuny committed Oct 26, 2023
1 parent c516406 commit 5f3839f
Show file tree
Hide file tree
Showing 6 changed files with 272 additions and 52 deletions.
4 changes: 3 additions & 1 deletion database.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ type Database interface {

PickJobToWork() (*Job, error)

UpdateJob(job *Job) error
UpdateJobState(job *Job) error

UpdateJobPayloadAndStep(job *Job) error

ListJobs(filter *JobFilter) ([]*Job, error)

Expand Down
113 changes: 102 additions & 11 deletions database/mysql/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/rookie-ninja/rk-async"
rkmysql "github.com/rookie-ninja/rk-db/mysql"
"github.com/rs/xid"
"go.uber.org/zap"
"gorm.io/gorm"
"gorm.io/gorm/clause"
"sync"
Expand All @@ -15,7 +16,7 @@ func init() {
rkasync.RegisterDatabaseRegFunc("MySQL", RegisterDatabase)
}

func RegisterDatabase(m map[string]string) rkasync.Database {
func RegisterDatabase(m map[string]string, logger *zap.Logger) rkasync.Database {
entry := rkmysql.GetMySqlEntry(m["entryName"])
if entry == nil {
return nil
Expand All @@ -30,15 +31,21 @@ func RegisterDatabase(m map[string]string) rkasync.Database {
db.AutoMigrate(&rkasync.Job{})
}

if logger == nil {
logger = zap.NewNop()
}

return &Database{
db: db,
logger: logger,
processorM: map[string]rkasync.Processor{},
lock: &sync.Mutex{},
}
}

type Database struct {
db *gorm.DB
logger *zap.Logger
lock sync.Locker
processorM map[string]rkasync.Processor
}
Expand Down Expand Up @@ -76,7 +83,7 @@ func (e *Database) PickJobToWork() (*rkasync.Job, error) {
err := e.db.Transaction(func(tx *gorm.DB) error {
res := &rkasync.Job{}
// get job with state created
resDB := tx.Where("state = ?", rkasync.JobStateCreated).Limit(1).Find(res)
resDB := tx.Where("state = ?", rkasync.JobStateCreated).Order("created_at ASC").Limit(1).Find(res)

if resDB.Error != nil {
return resDB.Error
Expand Down Expand Up @@ -105,18 +112,102 @@ func (e *Database) PickJobToWork() (*rkasync.Job, error) {
return job, err
}

func (e *Database) UpdateJob(job *rkasync.Job) error {
err := e.db.Transaction(func(tx *gorm.DB) error {
resDB := tx.Updates(job)
if resDB.Error != nil {
return resDB.Error
func (e *Database) UpdateJobPayloadAndStep(job *rkasync.Job) error {
var err error
// retry
for retry := 0; retry < 3; retry++ {
err = e.db.Transaction(func(tx *gorm.DB) error {
if job == nil {
return nil
}

jobDB := &rkasync.Job{}
resDB := tx.Where("id = ?", job.Id).Find(jobDB)
if resDB.Error != nil {
return resDB.Error
}
if resDB.RowsAffected < 1 {
return fmt.Errorf("job not found, id:%s", job.Id)
}

if jobDB.State == "canceled" || jobDB.State == "success" || jobDB.State == "failed" {
e.logger.Warn(fmt.Sprintf("job at final state:%s, skip updating payloads and steps", jobDB.State))
return nil
}

resDB = tx.Model(&rkasync.Job{}).Where("id = ?", job.Id).UpdateColumns(
rkasync.Job{
UpdatedAt: time.Now(),
Payload: job.Payload,
Steps: job.Steps,
})

if resDB.Error != nil {
return resDB.Error
}
if resDB.RowsAffected < 1 {
return fmt.Errorf("failed to update job payloads and steps, no rows updated, id:%s", job.Id)
}

return nil
})

if err == nil {
return nil
}
if resDB.RowsAffected < 1 {
return fmt.Errorf("failed to update job state, no rows updated, id:%s, state:%s", job.Id, job.State)
}

e.logger.Warn("failed to update job payloads and steps", zap.Error(err))

return err
}

func (e *Database) UpdateJobState(job *rkasync.Job) error {
var err error
// retry
for retry := 0; retry < 3; retry++ {
err = e.db.Transaction(func(tx *gorm.DB) error {
if job == nil {
return nil
}

jobDB := &rkasync.Job{}
resDB := tx.Where("id = ?", job.Id).Find(jobDB)
if resDB.Error != nil {
return resDB.Error
}
if resDB.RowsAffected < 1 {
return fmt.Errorf("job not found, id:%s", job.Id)
}

if jobDB.State == "canceled" || jobDB.State == "success" || jobDB.State == "failed" {
e.logger.Warn(fmt.Sprintf("job at final state:%s, skip updating state %s", jobDB.State, job.State))
return nil
}

resDB = tx.Model(&rkasync.Job{}).Where("id = ?", job.Id).UpdateColumns(
rkasync.Job{
UpdatedAt: time.Now(),
State: job.State,
})

if resDB.Error != nil {
return resDB.Error
}
if resDB.RowsAffected < 1 {
return fmt.Errorf("failed to update job state, no rows updated, id:%s, state:%s", job.Id, job.State)
}

return nil
})

if err == nil {
return nil
}
}

e.logger.Warn("failed to update job state", zap.Error(err))

return nil
})
return err
}

Expand Down
113 changes: 102 additions & 11 deletions database/postgres/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/rookie-ninja/rk-async"
"github.com/rookie-ninja/rk-db/postgres"
"github.com/rs/xid"
"go.uber.org/zap"
"gorm.io/gorm"
"gorm.io/gorm/clause"
"sync"
Expand All @@ -15,7 +16,7 @@ func init() {
rkasync.RegisterDatabaseRegFunc("PostgreSQL", RegisterDatabase)
}

func RegisterDatabase(m map[string]string) rkasync.Database {
func RegisterDatabase(m map[string]string, logger *zap.Logger) rkasync.Database {
entry := rkpostgres.GetPostgresEntry(m["entryName"])
if entry == nil {
return nil
Expand All @@ -30,15 +31,21 @@ func RegisterDatabase(m map[string]string) rkasync.Database {
db.AutoMigrate(&rkasync.Job{})
}

if logger == nil {
logger = zap.NewNop()
}

return &Database{
db: db,
logger: logger,
lock: &sync.Mutex{},
processorM: map[string]rkasync.Processor{},
}
}

type Database struct {
db *gorm.DB
logger *zap.Logger
lock sync.Locker
processorM map[string]rkasync.Processor
}
Expand Down Expand Up @@ -76,7 +83,7 @@ func (e *Database) PickJobToWork() (*rkasync.Job, error) {
err := e.db.Transaction(func(tx *gorm.DB) error {
res := &rkasync.Job{}
// get job with state created
resDB := tx.Where("state = ?", rkasync.JobStateCreated).Limit(1).Find(res)
resDB := tx.Where("state = ?", rkasync.JobStateCreated).Order("created_at ASC").Limit(1).Find(res)

if resDB.Error != nil {
return resDB.Error
Expand Down Expand Up @@ -105,18 +112,102 @@ func (e *Database) PickJobToWork() (*rkasync.Job, error) {
return job, err
}

func (e *Database) UpdateJob(job *rkasync.Job) error {
err := e.db.Transaction(func(tx *gorm.DB) error {
resDB := tx.Updates(job)
if resDB.Error != nil {
return resDB.Error
func (e *Database) UpdateJobPayloadAndStep(job *rkasync.Job) error {
var err error
// retry
for retry := 0; retry < 3; retry++ {
err = e.db.Transaction(func(tx *gorm.DB) error {
if job == nil {
return nil
}

jobDB := &rkasync.Job{}
resDB := tx.Where("id = ?", job.Id).Find(jobDB)
if resDB.Error != nil {
return resDB.Error
}
if resDB.RowsAffected < 1 {
return fmt.Errorf("job not found, id:%s", job.Id)
}

if jobDB.State == "canceled" || jobDB.State == "success" || jobDB.State == "failed" {
e.logger.Warn(fmt.Sprintf("job at final state:%s, skip updating payloads and steps", jobDB.State))
return nil
}

resDB = tx.Model(&rkasync.Job{}).Where("id = ?", job.Id).UpdateColumns(
rkasync.Job{
UpdatedAt: time.Now(),
Payload: job.Payload,
Steps: job.Steps,
})

if resDB.Error != nil {
return resDB.Error
}
if resDB.RowsAffected < 1 {
return fmt.Errorf("failed to update job payloads and steps, no rows updated, id:%s", job.Id)
}

return nil
})

if err == nil {
return nil
}
if resDB.RowsAffected < 1 {
return fmt.Errorf("failed to update job state, no rows updated, id:%s, state:%s", job.Id, job.State)
}

e.logger.Warn("failed to update job payloads and steps", zap.Error(err))

return err
}

func (e *Database) UpdateJobState(job *rkasync.Job) error {
var err error
// retry
for retry := 0; retry < 3; retry++ {
err = e.db.Transaction(func(tx *gorm.DB) error {
if job == nil {
return nil
}

jobDB := &rkasync.Job{}
resDB := tx.Where("id = ?", job.Id).Find(jobDB)
if resDB.Error != nil {
return resDB.Error
}
if resDB.RowsAffected < 1 {
return fmt.Errorf("job not found, id:%s", job.Id)
}

if jobDB.State == "canceled" || jobDB.State == "success" || jobDB.State == "failed" {
e.logger.Warn(fmt.Sprintf("job at final state:%s, skip updating state %s", jobDB.State, job.State))
return nil
}

resDB = tx.Model(&rkasync.Job{}).Where("id = ?", job.Id).UpdateColumns(
rkasync.Job{
UpdatedAt: time.Now(),
State: job.State,
})

if resDB.Error != nil {
return resDB.Error
}
if resDB.RowsAffected < 1 {
return fmt.Errorf("failed to update job state, no rows updated, id:%s, state:%s", job.Id, job.State)
}

return nil
})

if err == nil {
return nil
}
}

e.logger.Warn("failed to update job state", zap.Error(err))

return nil
})
return err
}

Expand Down
Loading

0 comments on commit 5f3839f

Please sign in to comment.