Skip to content

Commit

Permalink
perf(coordinator): use optimistic lock during batch/chunk assignment (#…
Browse files Browse the repository at this point in the history
…958)

Co-authored-by: georgehao <[email protected]>
Co-authored-by: colin <[email protected]>
  • Loading branch information
3 people authored Sep 21, 2023
1 parent 1f2fe74 commit 20c5e98
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 99 deletions.
2 changes: 1 addition & 1 deletion common/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"runtime/debug"
)

var tag = "v4.3.12"
var tag = "v4.3.13"

var commit = func() string {
if info, ok := debug.ReadBuildInfo(); ok {
Expand Down
29 changes: 25 additions & 4 deletions coordinator/internal/logic/provertask/batch_prover_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,34 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato

maxActiveAttempts := bp.cfg.ProverManager.ProversPerSession
maxTotalAttempts := bp.cfg.ProverManager.SessionAttempts
batchTask, err := bp.batchOrm.UpdateBatchAttemptsReturning(ctx, maxActiveAttempts, maxTotalAttempts)
if err != nil {
log.Error("failed to get unassigned batch proving tasks", "err", err)
return nil, ErrCoordinatorInternalFailure
var batchTask *orm.Batch
for i := 0; i < 100; i++ {
unassignedBatch, getUnassignedErr := bp.batchOrm.GetUnassignedBatch(ctx, maxActiveAttempts, maxTotalAttempts)
if getUnassignedErr != nil {
log.Error("failed to get unassigned batch proving tasks", "height", getTaskParameter.ProverHeight, "err", getUnassignedErr)
return nil, ErrCoordinatorInternalFailure
}
if unassignedBatch == nil {
log.Debug("get empty unassigned batch", "height", getTaskParameter.ProverHeight)
return nil, nil
}

rowsAffected, updateAttemptsErr := bp.batchOrm.UpdateBatchAttempts(ctx, unassignedBatch.Index, unassignedBatch.ActiveAttempts, unassignedBatch.TotalAttempts)
if updateAttemptsErr != nil {
log.Error("failed to update batch attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr)
return nil, ErrCoordinatorInternalFailure
}

if rowsAffected == 0 {
continue
}

batchTask = unassignedBatch
break
}

if batchTask == nil {
log.Debug("get empty unassigned batch after retry 100 times", "height", getTaskParameter.ProverHeight)
return nil, nil
}

Expand Down
30 changes: 26 additions & 4 deletions coordinator/internal/logic/provertask/chunk_prover_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,35 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato

maxActiveAttempts := cp.cfg.ProverManager.ProversPerSession
maxTotalAttempts := cp.cfg.ProverManager.SessionAttempts
chunkTask, err := cp.chunkOrm.UpdateChunkAttemptsReturning(ctx, getTaskParameter.ProverHeight, maxActiveAttempts, maxTotalAttempts)
if err != nil {
log.Error("failed to get unassigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", err)
return nil, ErrCoordinatorInternalFailure
var chunkTask *orm.Chunk
for i := 0; i < 100; i++ {
unassignedChunk, getUnsignedChunkErr := cp.chunkOrm.GetUnassignedChunk(ctx, getTaskParameter.ProverHeight, maxActiveAttempts, maxTotalAttempts)
if getUnsignedChunkErr != nil {
log.Error("failed to get unassigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getUnsignedChunkErr)
return nil, ErrCoordinatorInternalFailure
}

if unassignedChunk == nil {
log.Debug("get empty unassigned chunk", "height", getTaskParameter.ProverHeight)
return nil, nil
}

rowsAffected, updateAttemptsErr := cp.chunkOrm.UpdateChunkAttempts(ctx, unassignedChunk.Index, unassignedChunk.ActiveAttempts, unassignedChunk.TotalAttempts)
if updateAttemptsErr != nil {
log.Error("failed to update chunk attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr)
return nil, ErrCoordinatorInternalFailure
}

if rowsAffected == 0 {
continue
}

chunkTask = unassignedChunk
break
}

if chunkTask == nil {
log.Debug("get empty unassigned chunk after retry 100 times", "height", getTaskParameter.ProverHeight)
return nil, nil
}

Expand Down
60 changes: 23 additions & 37 deletions coordinator/internal/orm/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/log"
"gorm.io/gorm"
"gorm.io/gorm/clause"

"scroll-tech/common/types"
"scroll-tech/common/types/message"
Expand Down Expand Up @@ -72,26 +71,26 @@ func (*Batch) TableName() string {
return "batch"
}

// GetUnassignedBatches retrieves unassigned batches based on the specified limit.
// The returned batches are sorted in ascending order by their index.
func (o *Batch) GetUnassignedBatches(ctx context.Context, limit int) ([]*Batch, error) {
if limit < 0 {
return nil, errors.New("limit must not be smaller than zero")
}
if limit == 0 {
return nil, nil
}

// GetUnassignedBatch retrieves unassigned batch based on the specified limit.
// The returned batch are sorted in ascending order by their index.
func (o *Batch) GetUnassignedBatch(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (*Batch, error) {
db := o.db.WithContext(ctx)
db = db.Where("proving_status = ? AND chunk_proofs_status = ?", types.ProvingTaskUnassigned, types.ChunkProofsStatusReady)
db = db.Where("proving_status not in (?)", []int{int(types.ProvingTaskVerified), int(types.ProvingTaskFailed)})
db = db.Where("total_attempts < ?", maxTotalAttempts)
db = db.Where("active_attempts < ?", maxActiveAttempts)
db = db.Where("chunk_proofs_status = ?", int(types.ChunkProofsStatusReady))
db = db.Order("index ASC")
db = db.Limit(limit)

var batches []*Batch
if err := db.Find(&batches).Error; err != nil {
var batch Batch
err := db.First(&batch).Error
if err != nil && errors.Is(err, gorm.ErrRecordNotFound) {
return nil, nil
}

if err != nil {
return nil, fmt.Errorf("Batch.GetUnassignedBatches error: %w", err)
}
return batches, nil
return &batch, nil
}

// GetUnassignedAndChunksUnreadyBatches get the batches which is unassigned and chunks is not ready
Expand Down Expand Up @@ -303,36 +302,23 @@ func (o *Batch) UpdateProofAndProvingStatusByHash(ctx context.Context, hash stri
return nil
}

// UpdateBatchAttemptsReturning atomically increments the attempts count for the earliest available batch that meets the conditions.
func (o *Batch) UpdateBatchAttemptsReturning(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (*Batch, error) {
// UpdateBatchAttempts atomically increments the attempts count for the earliest available batch that meets the conditions.
func (o *Batch) UpdateBatchAttempts(ctx context.Context, index uint64, curActiveAttempts, curTotalAttempts int16) (int64, error) {
db := o.db.WithContext(ctx)

subQueryDB := db.Model(&Batch{}).Select("index")
subQueryDB = subQueryDB.Clauses(clause.Locking{Strength: "UPDATE"})
subQueryDB = subQueryDB.Where("proving_status not in (?)", []int{int(types.ProvingTaskVerified), int(types.ProvingTaskFailed)})
subQueryDB = subQueryDB.Where("total_attempts < ?", maxTotalAttempts)
subQueryDB = subQueryDB.Where("active_attempts < ?", maxActiveAttempts)
subQueryDB = subQueryDB.Where("chunk_proofs_status = ?", int(types.ChunkProofsStatusReady))
subQueryDB = subQueryDB.Order("index ASC")
subQueryDB = subQueryDB.Limit(1)

var updatedBatch Batch
db = db.Model(&updatedBatch).Clauses(clause.Returning{})
db = db.Where("index = (?)", subQueryDB)
db = db.Model(&Batch{})
db = db.Where("index = ?", index)
db = db.Where("active_attempts = ?", curActiveAttempts)
db = db.Where("total_attempts = ?", curTotalAttempts)
result := db.Updates(map[string]interface{}{
"proving_status": types.ProvingTaskAssigned,
"total_attempts": gorm.Expr("total_attempts + 1"),
"active_attempts": gorm.Expr("active_attempts + 1"),
})

if result.Error != nil {
return nil, fmt.Errorf("failed to select and update batch, max active attempts: %v, max total attempts: %v, err: %w",
maxActiveAttempts, maxTotalAttempts, result.Error)
}
if result.RowsAffected == 0 {
return nil, nil
return 0, fmt.Errorf("failed to update batch, err:%w", result.Error)
}
return &updatedBatch, nil
return result.RowsAffected, nil
}

// DecreaseActiveAttemptsByHash decrements the active_attempts of a batch given its hash.
Expand Down
75 changes: 22 additions & 53 deletions coordinator/internal/orm/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/scroll-tech/go-ethereum/log"
"gorm.io/gorm"
"gorm.io/gorm/clause"

"scroll-tech/common/types"
"scroll-tech/common/types/message"
Expand Down Expand Up @@ -67,27 +66,27 @@ func (*Chunk) TableName() string {
return "chunk"
}

// GetUnassignedChunks retrieves unassigned chunks based on the specified limit.
// GetUnassignedChunk retrieves unassigned chunk based on the specified limit.
// The returned chunks are sorted in ascending order by their index.
func (o *Chunk) GetUnassignedChunks(ctx context.Context, limit int) ([]*Chunk, error) {
if limit < 0 {
return nil, errors.New("limit must not be smaller than zero")
}
if limit == 0 {
return nil, nil
}

func (o *Chunk) GetUnassignedChunk(ctx context.Context, height int, maxActiveAttempts, maxTotalAttempts uint8) (*Chunk, error) {
db := o.db.WithContext(ctx)
db = db.Model(&Chunk{})
db = db.Where("proving_status = ?", types.ProvingTaskUnassigned)
db = db.Where("proving_status not in (?)", []int{int(types.ProvingTaskVerified), int(types.ProvingTaskFailed)})
db = db.Where("total_attempts < ?", maxTotalAttempts)
db = db.Where("active_attempts < ?", maxActiveAttempts)
db = db.Where("end_block_number <= ?", height)
db = db.Order("index ASC")
db = db.Limit(limit)

var chunks []*Chunk
if err := db.Find(&chunks).Error; err != nil {
var chunk Chunk
err := db.First(&chunk).Error
if err != nil && errors.Is(err, gorm.ErrRecordNotFound) {
return nil, nil
}

if err != nil {
return nil, fmt.Errorf("Chunk.GetUnassignedChunks error: %w", err)
}
return chunks, nil
return &chunk, nil
}

// GetChunksByBatchHash retrieves the chunks associated with a specific batch hash.
Expand Down Expand Up @@ -158,19 +157,6 @@ func (o *Chunk) GetProvingStatusByHash(ctx context.Context, hash string) (types.
return types.ProvingStatus(chunk.ProvingStatus), nil
}

// GetAssignedChunks retrieves all chunks whose proving_status is either types.ProvingTaskAssigned.
func (o *Chunk) GetAssignedChunks(ctx context.Context) ([]*Chunk, error) {
db := o.db.WithContext(ctx)
db = db.Model(&Chunk{})
db = db.Where("proving_status = ?", int(types.ProvingTaskAssigned))

var chunks []*Chunk
if err := db.Find(&chunks).Error; err != nil {
return nil, fmt.Errorf("Chunk.GetAssignedChunks error: %w", err)
}
return chunks, nil
}

// CheckIfBatchChunkProofsAreReady checks if all proofs for all chunks of a given batchHash are collected.
func (o *Chunk) CheckIfBatchChunkProofsAreReady(ctx context.Context, batchHash string) (bool, error) {
db := o.db.WithContext(ctx)
Expand Down Expand Up @@ -350,40 +336,23 @@ func (o *Chunk) UpdateBatchHashInRange(ctx context.Context, startIndex uint64, e
return nil
}

// UpdateChunkAttemptsReturning atomically increments the attempts count for the earliest available chunk that meets the conditions.
func (o *Chunk) UpdateChunkAttemptsReturning(ctx context.Context, height int, maxActiveAttempts, maxTotalAttempts uint8) (*Chunk, error) {
if height <= 0 {
return nil, errors.New("Chunk.UpdateChunkAttemptsReturning error: height must be larger than zero")
}

// UpdateChunkAttempts atomically increments the attempts count for the earliest available chunk that meets the conditions.
func (o *Chunk) UpdateChunkAttempts(ctx context.Context, index uint64, curActiveAttempts, curTotalAttempts int16) (int64, error) {
db := o.db.WithContext(ctx)

subQueryDB := db.Model(&Chunk{}).Select("index")
subQueryDB = subQueryDB.Clauses(clause.Locking{Strength: "UPDATE"})
subQueryDB = subQueryDB.Where("proving_status not in (?)", []int{int(types.ProvingTaskVerified), int(types.ProvingTaskFailed)})
subQueryDB = subQueryDB.Where("total_attempts < ?", maxTotalAttempts)
subQueryDB = subQueryDB.Where("active_attempts < ?", maxActiveAttempts)
subQueryDB = subQueryDB.Where("end_block_number <= ?", height)
subQueryDB = subQueryDB.Order("index ASC")
subQueryDB = subQueryDB.Limit(1)

var updatedChunk Chunk
db = db.Model(&updatedChunk).Clauses(clause.Returning{})
db = db.Where("index = (?)", subQueryDB)
db = db.Model(&Chunk{})
db = db.Where("index = ?", index)
db = db.Where("active_attempts = ?", curActiveAttempts)
db = db.Where("total_attempts = ?", curTotalAttempts)
result := db.Updates(map[string]interface{}{
"proving_status": types.ProvingTaskAssigned,
"total_attempts": gorm.Expr("total_attempts + 1"),
"active_attempts": gorm.Expr("active_attempts + 1"),
})

if result.Error != nil {
return nil, fmt.Errorf("failed to select and update batch, max active attempts: %v, max total attempts: %v, err: %w",
maxActiveAttempts, maxTotalAttempts, result.Error)
}
if result.RowsAffected == 0 {
return nil, nil
return 0, fmt.Errorf("failed to update chunk, err:%w", result.Error)
}
return &updatedChunk, nil
return result.RowsAffected, nil
}

// DecreaseActiveAttemptsByHash decrements the active_attempts of a chunk given its hash.
Expand Down

0 comments on commit 20c5e98

Please sign in to comment.