From 20c5e9855b0e8f09cf5a9786c5eefdac703ab3dd Mon Sep 17 00:00:00 2001 From: georgehao Date: Thu, 21 Sep 2023 17:03:42 +0800 Subject: [PATCH] perf(coordinator): use optimistic lock during batch/chunk assignment (#958) Co-authored-by: georgehao Co-authored-by: colin <102356659+colinlyguo@users.noreply.github.com> --- common/version/version.go | 2 +- .../logic/provertask/batch_prover_task.go | 29 ++++++- .../logic/provertask/chunk_prover_task.go | 30 +++++++- coordinator/internal/orm/batch.go | 60 ++++++--------- coordinator/internal/orm/chunk.go | 75 ++++++------------- 5 files changed, 97 insertions(+), 99 deletions(-) diff --git a/common/version/version.go b/common/version/version.go index ae848a7c14..fc53e48352 100644 --- a/common/version/version.go +++ b/common/version/version.go @@ -5,7 +5,7 @@ import ( "runtime/debug" ) -var tag = "v4.3.12" +var tag = "v4.3.13" var commit = func() string { if info, ok := debug.ReadBuildInfo(); ok { diff --git a/coordinator/internal/logic/provertask/batch_prover_task.go b/coordinator/internal/logic/provertask/batch_prover_task.go index a2843efd3d..335d33c75c 100644 --- a/coordinator/internal/logic/provertask/batch_prover_task.go +++ b/coordinator/internal/logic/provertask/batch_prover_task.go @@ -61,13 +61,34 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato maxActiveAttempts := bp.cfg.ProverManager.ProversPerSession maxTotalAttempts := bp.cfg.ProverManager.SessionAttempts - batchTask, err := bp.batchOrm.UpdateBatchAttemptsReturning(ctx, maxActiveAttempts, maxTotalAttempts) - if err != nil { - log.Error("failed to get unassigned batch proving tasks", "err", err) - return nil, ErrCoordinatorInternalFailure + var batchTask *orm.Batch + for i := 0; i < 100; i++ { + unassignedBatch, getUnassignedErr := bp.batchOrm.GetUnassignedBatch(ctx, maxActiveAttempts, maxTotalAttempts) + if getUnassignedErr != nil { + log.Error("failed to get unassigned batch proving tasks", "height", 
getTaskParameter.ProverHeight, "err", getUnassignedErr) + return nil, ErrCoordinatorInternalFailure + } + if unassignedBatch == nil { + log.Debug("get empty unassigned batch", "height", getTaskParameter.ProverHeight) + return nil, nil + } + + rowsAffected, updateAttemptsErr := bp.batchOrm.UpdateBatchAttempts(ctx, unassignedBatch.Index, unassignedBatch.ActiveAttempts, unassignedBatch.TotalAttempts) + if updateAttemptsErr != nil { + log.Error("failed to update batch attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr) + return nil, ErrCoordinatorInternalFailure + } + + if rowsAffected == 0 { + continue + } + + batchTask = unassignedBatch + break } if batchTask == nil { + log.Debug("get empty unassigned batch after retry 100 times", "height", getTaskParameter.ProverHeight) return nil, nil } diff --git a/coordinator/internal/logic/provertask/chunk_prover_task.go b/coordinator/internal/logic/provertask/chunk_prover_task.go index a6f7f43b50..fba5dbc9bb 100644 --- a/coordinator/internal/logic/provertask/chunk_prover_task.go +++ b/coordinator/internal/logic/provertask/chunk_prover_task.go @@ -64,13 +64,35 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato maxActiveAttempts := cp.cfg.ProverManager.ProversPerSession maxTotalAttempts := cp.cfg.ProverManager.SessionAttempts - chunkTask, err := cp.chunkOrm.UpdateChunkAttemptsReturning(ctx, getTaskParameter.ProverHeight, maxActiveAttempts, maxTotalAttempts) - if err != nil { - log.Error("failed to get unassigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", err) - return nil, ErrCoordinatorInternalFailure + var chunkTask *orm.Chunk + for i := 0; i < 100; i++ { + unassignedChunk, getUnsignedChunkErr := cp.chunkOrm.GetUnassignedChunk(ctx, getTaskParameter.ProverHeight, maxActiveAttempts, maxTotalAttempts) + if getUnsignedChunkErr != nil { + log.Error("failed to get unassigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", 
getUnsignedChunkErr) + return nil, ErrCoordinatorInternalFailure + } + + if unassignedChunk == nil { + log.Debug("get empty unassigned chunk", "height", getTaskParameter.ProverHeight) + return nil, nil + } + + rowsAffected, updateAttemptsErr := cp.chunkOrm.UpdateChunkAttempts(ctx, unassignedChunk.Index, unassignedChunk.ActiveAttempts, unassignedChunk.TotalAttempts) + if updateAttemptsErr != nil { + log.Error("failed to update chunk attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr) + return nil, ErrCoordinatorInternalFailure + } + + if rowsAffected == 0 { + continue + } + + chunkTask = unassignedChunk + break } if chunkTask == nil { + log.Debug("get empty unassigned chunk after retry 100 times", "height", getTaskParameter.ProverHeight) return nil, nil } diff --git a/coordinator/internal/orm/batch.go b/coordinator/internal/orm/batch.go index ecbcf3330d..de71cb853a 100644 --- a/coordinator/internal/orm/batch.go +++ b/coordinator/internal/orm/batch.go @@ -10,7 +10,6 @@ import ( "github.com/scroll-tech/go-ethereum/common" "github.com/scroll-tech/go-ethereum/log" "gorm.io/gorm" - "gorm.io/gorm/clause" "scroll-tech/common/types" "scroll-tech/common/types/message" @@ -72,26 +71,26 @@ func (*Batch) TableName() string { return "batch" } -// GetUnassignedBatches retrieves unassigned batches based on the specified limit. -// The returned batches are sorted in ascending order by their index. -func (o *Batch) GetUnassignedBatches(ctx context.Context, limit int) ([]*Batch, error) { - if limit < 0 { - return nil, errors.New("limit must not be smaller than zero") - } - if limit == 0 { - return nil, nil - } - +// GetUnassignedBatch retrieves the lowest-index batch that is neither verified nor failed, is below both attempt limits, and has its chunk proofs ready. +// It returns nil with a nil error when no such batch exists. +func (o *Batch) GetUnassignedBatch(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (*Batch, error) { db := o.db.WithContext(ctx) - db = db.Where("proving_status = ? 
AND chunk_proofs_status = ?", types.ProvingTaskUnassigned, types.ChunkProofsStatusReady) + db = db.Where("proving_status not in (?)", []int{int(types.ProvingTaskVerified), int(types.ProvingTaskFailed)}) + db = db.Where("total_attempts < ?", maxTotalAttempts) + db = db.Where("active_attempts < ?", maxActiveAttempts) + db = db.Where("chunk_proofs_status = ?", int(types.ChunkProofsStatusReady)) db = db.Order("index ASC") - db = db.Limit(limit) - var batches []*Batch - if err := db.Find(&batches).Error; err != nil { + var batch Batch + err := db.First(&batch).Error + if err != nil && errors.Is(err, gorm.ErrRecordNotFound) { + return nil, nil + } + + if err != nil { return nil, fmt.Errorf("Batch.GetUnassignedBatches error: %w", err) } - return batches, nil + return &batch, nil } // GetUnassignedAndChunksUnreadyBatches get the batches which is unassigned and chunks is not ready @@ -303,22 +302,13 @@ func (o *Batch) UpdateProofAndProvingStatusByHash(ctx context.Context, hash stri return nil } -// UpdateBatchAttemptsReturning atomically increments the attempts count for the earliest available batch that meets the conditions. -func (o *Batch) UpdateBatchAttemptsReturning(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (*Batch, error) { +// UpdateBatchAttempts marks the batch at the given index as assigned and increments its attempts count, but only if its active_attempts and total_attempts still equal the supplied values (optimistic lock). It returns the number of rows affected; 0 means the supplied values no longer matched. 
+func (o *Batch) UpdateBatchAttempts(ctx context.Context, index uint64, curActiveAttempts, curTotalAttempts int16) (int64, error) { db := o.db.WithContext(ctx) - - subQueryDB := db.Model(&Batch{}).Select("index") - subQueryDB = subQueryDB.Clauses(clause.Locking{Strength: "UPDATE"}) - subQueryDB = subQueryDB.Where("proving_status not in (?)", []int{int(types.ProvingTaskVerified), int(types.ProvingTaskFailed)}) - subQueryDB = subQueryDB.Where("total_attempts < ?", maxTotalAttempts) - subQueryDB = subQueryDB.Where("active_attempts < ?", maxActiveAttempts) - subQueryDB = subQueryDB.Where("chunk_proofs_status = ?", int(types.ChunkProofsStatusReady)) - subQueryDB = subQueryDB.Order("index ASC") - subQueryDB = subQueryDB.Limit(1) - - var updatedBatch Batch - db = db.Model(&updatedBatch).Clauses(clause.Returning{}) - db = db.Where("index = (?)", subQueryDB) + db = db.Model(&Batch{}) + db = db.Where("index = ?", index) + db = db.Where("active_attempts = ?", curActiveAttempts) + db = db.Where("total_attempts = ?", curTotalAttempts) result := db.Updates(map[string]interface{}{ "proving_status": types.ProvingTaskAssigned, "total_attempts": gorm.Expr("total_attempts + 1"), @@ -326,13 +316,9 @@ func (o *Batch) UpdateBatchAttemptsReturning(ctx context.Context, maxActiveAttem }) if result.Error != nil { - return nil, fmt.Errorf("failed to select and update batch, max active attempts: %v, max total attempts: %v, err: %w", - maxActiveAttempts, maxTotalAttempts, result.Error) - } - if result.RowsAffected == 0 { - return nil, nil + return 0, fmt.Errorf("failed to update batch, err:%w", result.Error) } - return &updatedBatch, nil + return result.RowsAffected, nil } // DecreaseActiveAttemptsByHash decrements the active_attempts of a batch given its hash. 
diff --git a/coordinator/internal/orm/chunk.go b/coordinator/internal/orm/chunk.go index 8193b8f8b0..2fe914babd 100644 --- a/coordinator/internal/orm/chunk.go +++ b/coordinator/internal/orm/chunk.go @@ -9,7 +9,6 @@ import ( "github.com/scroll-tech/go-ethereum/log" "gorm.io/gorm" - "gorm.io/gorm/clause" "scroll-tech/common/types" "scroll-tech/common/types/message" @@ -67,27 +66,27 @@ func (*Chunk) TableName() string { return "chunk" } -// GetUnassignedChunks retrieves unassigned chunks based on the specified limit. +// GetUnassignedChunk retrieves a single chunk that is neither verified nor failed, is below both attempt limits, and ends at or before the given height; it returns nil with a nil error when none matches. // The returned chunks are sorted in ascending order by their index. -func (o *Chunk) GetUnassignedChunks(ctx context.Context, limit int) ([]*Chunk, error) { - if limit < 0 { - return nil, errors.New("limit must not be smaller than zero") - } - if limit == 0 { - return nil, nil - } - +func (o *Chunk) GetUnassignedChunk(ctx context.Context, height int, maxActiveAttempts, maxTotalAttempts uint8) (*Chunk, error) { db := o.db.WithContext(ctx) db = db.Model(&Chunk{}) - db = db.Where("proving_status = ?", types.ProvingTaskUnassigned) + db = db.Where("proving_status not in (?)", []int{int(types.ProvingTaskVerified), int(types.ProvingTaskFailed)}) + db = db.Where("total_attempts < ?", maxTotalAttempts) + db = db.Where("active_attempts < ?", maxActiveAttempts) + db = db.Where("end_block_number <= ?", height) db = db.Order("index ASC") - db = db.Limit(limit) - var chunks []*Chunk - if err := db.Find(&chunks).Error; err != nil { + var chunk Chunk + err := db.First(&chunk).Error + if err != nil && errors.Is(err, gorm.ErrRecordNotFound) { + return nil, nil + } + + if err != nil { return nil, fmt.Errorf("Chunk.GetUnassignedChunks error: %w", err) } - return chunks, nil + return &chunk, nil } // GetChunksByBatchHash retrieves the chunks associated with a specific batch hash. @@ -158,19 +157,6 @@ func (o *Chunk) GetProvingStatusByHash(ctx context.Context, hash string) (types. 
return types.ProvingStatus(chunk.ProvingStatus), nil } -// GetAssignedChunks retrieves all chunks whose proving_status is either types.ProvingTaskAssigned. -func (o *Chunk) GetAssignedChunks(ctx context.Context) ([]*Chunk, error) { - db := o.db.WithContext(ctx) - db = db.Model(&Chunk{}) - db = db.Where("proving_status = ?", int(types.ProvingTaskAssigned)) - - var chunks []*Chunk - if err := db.Find(&chunks).Error; err != nil { - return nil, fmt.Errorf("Chunk.GetAssignedChunks error: %w", err) - } - return chunks, nil -} - // CheckIfBatchChunkProofsAreReady checks if all proofs for all chunks of a given batchHash are collected. func (o *Chunk) CheckIfBatchChunkProofsAreReady(ctx context.Context, batchHash string) (bool, error) { db := o.db.WithContext(ctx) @@ -350,26 +336,13 @@ func (o *Chunk) UpdateBatchHashInRange(ctx context.Context, startIndex uint64, e return nil } -// UpdateChunkAttemptsReturning atomically increments the attempts count for the earliest available chunk that meets the conditions. -func (o *Chunk) UpdateChunkAttemptsReturning(ctx context.Context, height int, maxActiveAttempts, maxTotalAttempts uint8) (*Chunk, error) { - if height <= 0 { - return nil, errors.New("Chunk.UpdateChunkAttemptsReturning error: height must be larger than zero") - } - +// UpdateChunkAttempts marks the chunk at the given index as assigned and increments its attempts count, but only if its active_attempts and total_attempts still equal the supplied values (optimistic lock). It returns the number of rows affected; 0 means the supplied values no longer matched. 
+func (o *Chunk) UpdateChunkAttempts(ctx context.Context, index uint64, curActiveAttempts, curTotalAttempts int16) (int64, error) { db := o.db.WithContext(ctx) - - subQueryDB := db.Model(&Chunk{}).Select("index") - subQueryDB = subQueryDB.Clauses(clause.Locking{Strength: "UPDATE"}) - subQueryDB = subQueryDB.Where("proving_status not in (?)", []int{int(types.ProvingTaskVerified), int(types.ProvingTaskFailed)}) - subQueryDB = subQueryDB.Where("total_attempts < ?", maxTotalAttempts) - subQueryDB = subQueryDB.Where("active_attempts < ?", maxActiveAttempts) - subQueryDB = subQueryDB.Where("end_block_number <= ?", height) - subQueryDB = subQueryDB.Order("index ASC") - subQueryDB = subQueryDB.Limit(1) - - var updatedChunk Chunk - db = db.Model(&updatedChunk).Clauses(clause.Returning{}) - db = db.Where("index = (?)", subQueryDB) + db = db.Model(&Chunk{}) + db = db.Where("index = ?", index) + db = db.Where("active_attempts = ?", curActiveAttempts) + db = db.Where("total_attempts = ?", curTotalAttempts) result := db.Updates(map[string]interface{}{ "proving_status": types.ProvingTaskAssigned, "total_attempts": gorm.Expr("total_attempts + 1"), @@ -377,13 +350,9 @@ func (o *Chunk) UpdateChunkAttemptsReturning(ctx context.Context, height int, ma }) if result.Error != nil { - return nil, fmt.Errorf("failed to select and update batch, max active attempts: %v, max total attempts: %v, err: %w", - maxActiveAttempts, maxTotalAttempts, result.Error) - } - if result.RowsAffected == 0 { - return nil, nil + return 0, fmt.Errorf("failed to update chunk, err:%w", result.Error) } - return &updatedChunk, nil + return result.RowsAffected, nil } // DecreaseActiveAttemptsByHash decrements the active_attempts of a chunk given its hash.