Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(coordinator): optimize coordinator get task's index #962

Merged
merged 4 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion common/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"runtime/debug"
)

var tag = "v4.3.15"
var tag = "v4.3.16"

var commit = func() string {
if info, ok := debug.ReadBuildInfo(); ok {
Expand Down
29 changes: 21 additions & 8 deletions coordinator/internal/logic/provertask/batch_prover_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,30 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
maxTotalAttempts := bp.cfg.ProverManager.SessionAttempts
var batchTask *orm.Batch
for i := 0; i < 5; i++ {
unassignedBatch, getUnassignedErr := bp.batchOrm.GetUnassignedBatch(ctx, maxActiveAttempts, maxTotalAttempts)
if getUnassignedErr != nil {
log.Error("failed to get unassigned batch proving tasks", "height", getTaskParameter.ProverHeight, "err", getUnassignedErr)
var getTaskError error
var tmpBatchTask *orm.Batch
tmpBatchTask, getTaskError = bp.batchOrm.GetUnassignedBatch(ctx, maxActiveAttempts, maxTotalAttempts)
if getTaskError != nil {
log.Error("failed to get unassigned batch proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError)
return nil, ErrCoordinatorInternalFailure
}
if unassignedBatch == nil {
log.Debug("get empty unassigned batch", "height", getTaskParameter.ProverHeight)

// Why here need get again? In order to support a task can assign to multiple prover, need also assign `ProvingTaskAssigned`
// batch to prover. But use `proving_status in (1, 2)` will not use the postgres index. So need split the sql.
if tmpBatchTask == nil {
tmpBatchTask, getTaskError = bp.batchOrm.GetAssignedBatch(ctx, maxActiveAttempts, maxTotalAttempts)
if getTaskError != nil {
log.Error("failed to get assigned batch proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError)
return nil, ErrCoordinatorInternalFailure
}
}

if tmpBatchTask == nil {
log.Debug("get empty batch", "height", getTaskParameter.ProverHeight)
return nil, nil
}

rowsAffected, updateAttemptsErr := bp.batchOrm.UpdateBatchAttempts(ctx, unassignedBatch.Index, unassignedBatch.ActiveAttempts, unassignedBatch.TotalAttempts)
rowsAffected, updateAttemptsErr := bp.batchOrm.UpdateBatchAttempts(ctx, tmpBatchTask.Index, tmpBatchTask.ActiveAttempts, tmpBatchTask.TotalAttempts)
if updateAttemptsErr != nil {
log.Error("failed to update batch attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr)
return nil, ErrCoordinatorInternalFailure
Expand All @@ -85,12 +98,12 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
continue
}

batchTask = unassignedBatch
batchTask = tmpBatchTask
break
}

if batchTask == nil {
log.Debug("get empty unassigned batch after retry 100 times", "height", getTaskParameter.ProverHeight)
log.Debug("get empty unassigned batch after retry 5 times", "height", getTaskParameter.ProverHeight)
return nil, nil
}

Expand Down
28 changes: 20 additions & 8 deletions coordinator/internal/logic/provertask/chunk_prover_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,30 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
maxTotalAttempts := cp.cfg.ProverManager.SessionAttempts
var chunkTask *orm.Chunk
for i := 0; i < 5; i++ {
unassignedChunk, getUnsignedChunkErr := cp.chunkOrm.GetUnassignedChunk(ctx, getTaskParameter.ProverHeight, maxActiveAttempts, maxTotalAttempts)
if getUnsignedChunkErr != nil {
log.Error("failed to get unassigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getUnsignedChunkErr)
var getTaskError error
var tmpChunkTask *orm.Chunk
tmpChunkTask, getTaskError = cp.chunkOrm.GetUnassignedChunk(ctx, getTaskParameter.ProverHeight, maxActiveAttempts, maxTotalAttempts)
if getTaskError != nil {
log.Error("failed to get unassigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError)
return nil, ErrCoordinatorInternalFailure
}

if unassignedChunk == nil {
log.Debug("get empty unassigned chunk", "height", getTaskParameter.ProverHeight)
// Why here need get again? In order to support a task can assign to multiple prover, need also assign `ProvingTaskAssigned`
// chunk to prover. But use `proving_status in (1, 2)` will not use the postgres index. So need split the sql.
if tmpChunkTask == nil {
tmpChunkTask, getTaskError = cp.chunkOrm.GetAssignedChunk(ctx, getTaskParameter.ProverHeight, maxActiveAttempts, maxTotalAttempts)
if getTaskError != nil {
log.Error("failed to get assigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError)
return nil, ErrCoordinatorInternalFailure
}
}

if tmpChunkTask == nil {
log.Debug("get empty chunk", "height", getTaskParameter.ProverHeight)
return nil, nil
}

rowsAffected, updateAttemptsErr := cp.chunkOrm.UpdateChunkAttempts(ctx, unassignedChunk.Index, unassignedChunk.ActiveAttempts, unassignedChunk.TotalAttempts)
rowsAffected, updateAttemptsErr := cp.chunkOrm.UpdateChunkAttempts(ctx, tmpChunkTask.Index, tmpChunkTask.ActiveAttempts, tmpChunkTask.TotalAttempts)
if updateAttemptsErr != nil {
log.Error("failed to update chunk attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr)
return nil, ErrCoordinatorInternalFailure
Expand All @@ -89,12 +101,12 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
continue
}

chunkTask = unassignedChunk
chunkTask = tmpChunkTask
break
}

if chunkTask == nil {
log.Debug("get empty unassigned chunk after retry 100 times", "height", getTaskParameter.ProverHeight)
log.Debug("get empty unassigned chunk after retry 5 times", "height", getTaskParameter.ProverHeight)
return nil, nil
}

Expand Down
23 changes: 22 additions & 1 deletion coordinator/internal/orm/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (*Batch) TableName() string {
// The returned batch are sorted in ascending order by their index.
func (o *Batch) GetUnassignedBatch(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (*Batch, error) {
db := o.db.WithContext(ctx)
db = db.Where("proving_status in (?)", []int{int(types.ProvingTaskUnassigned), int(types.ProvingTaskAssigned)})
db = db.Where("proving_status = ?", int(types.ProvingTaskUnassigned))
db = db.Where("total_attempts < ?", maxTotalAttempts)
db = db.Where("active_attempts < ?", maxActiveAttempts)
db = db.Where("chunk_proofs_status = ?", int(types.ChunkProofsStatusReady))
Expand All @@ -92,6 +92,27 @@ func (o *Batch) GetUnassignedBatch(ctx context.Context, maxActiveAttempts, maxTo
return &batch, nil
}

// GetAssignedBatch retrieves assigned batch based on the specified limit.
// The returned batch are sorted in ascending order by their index.
func (o *Batch) GetAssignedBatch(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (*Batch, error) {
db := o.db.WithContext(ctx)
db = db.Where("proving_status = ?", int(types.ProvingTaskAssigned))
db = db.Where("total_attempts < ?", maxTotalAttempts)
db = db.Where("active_attempts < ?", maxActiveAttempts)
db = db.Where("chunk_proofs_status = ?", int(types.ChunkProofsStatusReady))

var batch Batch
err := db.First(&batch).Error
if err != nil && errors.Is(err, gorm.ErrRecordNotFound) {
return nil, nil
}

if err != nil {
return nil, fmt.Errorf("Batch.GetAssignedBatches error: %w", err)
}
return &batch, nil
}

// GetUnassignedAndChunksUnreadyBatches get the batches which is unassigned and chunks is not ready
func (o *Batch) GetUnassignedAndChunksUnreadyBatches(ctx context.Context, offset, limit int) ([]*Batch, error) {
if offset < 0 || limit < 0 {
Expand Down
24 changes: 23 additions & 1 deletion coordinator/internal/orm/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (*Chunk) TableName() string {
func (o *Chunk) GetUnassignedChunk(ctx context.Context, height int, maxActiveAttempts, maxTotalAttempts uint8) (*Chunk, error) {
db := o.db.WithContext(ctx)
db = db.Model(&Chunk{})
db = db.Where("proving_status in (?)", []int{int(types.ProvingTaskUnassigned), int(types.ProvingTaskAssigned)})
db = db.Where("proving_status = ?", int(types.ProvingTaskUnassigned))
db = db.Where("total_attempts < ?", maxTotalAttempts)
db = db.Where("active_attempts < ?", maxActiveAttempts)
db = db.Where("end_block_number <= ?", height)
Expand All @@ -88,6 +88,28 @@ func (o *Chunk) GetUnassignedChunk(ctx context.Context, height int, maxActiveAtt
return &chunk, nil
}

// GetAssignedChunk retrieves assigned chunk based on the specified limit.
// The returned chunks are sorted in ascending order by their index.
func (o *Chunk) GetAssignedChunk(ctx context.Context, height int, maxActiveAttempts, maxTotalAttempts uint8) (*Chunk, error) {
db := o.db.WithContext(ctx)
db = db.Model(&Chunk{})
db = db.Where("proving_status = ?", int(types.ProvingTaskAssigned))
db = db.Where("total_attempts < ?", maxTotalAttempts)
db = db.Where("active_attempts < ?", maxActiveAttempts)
db = db.Where("end_block_number <= ?", height)

var chunk Chunk
err := db.First(&chunk).Error
if err != nil && errors.Is(err, gorm.ErrRecordNotFound) {
return nil, nil
}

if err != nil {
return nil, fmt.Errorf("Chunk.GetAssignedChunks error: %w", err)
}
return &chunk, nil
}

// GetChunksByBatchHash retrieves the chunks associated with a specific batch hash.
// The returned chunks are sorted in ascending order by their associated chunk index.
func (o *Chunk) GetChunksByBatchHash(ctx context.Context, batchHash string) ([]*Chunk, error) {
Expand Down