diff --git a/common/version/version.go b/common/version/version.go index a917a5268..f9ddb6e8f 100644 --- a/common/version/version.go +++ b/common/version/version.go @@ -5,7 +5,7 @@ import ( "runtime/debug" ) -var tag = "v4.3.15" +var tag = "v4.3.16" var commit = func() string { if info, ok := debug.ReadBuildInfo(); ok { diff --git a/coordinator/internal/logic/provertask/batch_prover_task.go b/coordinator/internal/logic/provertask/batch_prover_task.go index 6567351cb..a3700cb38 100644 --- a/coordinator/internal/logic/provertask/batch_prover_task.go +++ b/coordinator/internal/logic/provertask/batch_prover_task.go @@ -64,17 +64,30 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato maxTotalAttempts := bp.cfg.ProverManager.SessionAttempts var batchTask *orm.Batch for i := 0; i < 5; i++ { - unassignedBatch, getUnassignedErr := bp.batchOrm.GetUnassignedBatch(ctx, maxActiveAttempts, maxTotalAttempts) - if getUnassignedErr != nil { - log.Error("failed to get unassigned batch proving tasks", "height", getTaskParameter.ProverHeight, "err", getUnassignedErr) + var getTaskError error + var tmpBatchTask *orm.Batch + tmpBatchTask, getTaskError = bp.batchOrm.GetUnassignedBatch(ctx, maxActiveAttempts, maxTotalAttempts) + if getTaskError != nil { + log.Error("failed to get unassigned batch proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) return nil, ErrCoordinatorInternalFailure } - if unassignedBatch == nil { - log.Debug("get empty unassigned batch", "height", getTaskParameter.ProverHeight) + + // Why query a second time? A task may be assigned to multiple provers, so `ProvingTaskAssigned` batches must also be handed out. + // A single `proving_status in (1, 2)` filter does not use the postgres index, so the lookup is split into two queries.
+ if tmpBatchTask == nil { + tmpBatchTask, getTaskError = bp.batchOrm.GetAssignedBatch(ctx, maxActiveAttempts, maxTotalAttempts) + if getTaskError != nil { + log.Error("failed to get assigned batch proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) + return nil, ErrCoordinatorInternalFailure + } + } + + if tmpBatchTask == nil { + log.Debug("get empty batch", "height", getTaskParameter.ProverHeight) return nil, nil } - rowsAffected, updateAttemptsErr := bp.batchOrm.UpdateBatchAttempts(ctx, unassignedBatch.Index, unassignedBatch.ActiveAttempts, unassignedBatch.TotalAttempts) + rowsAffected, updateAttemptsErr := bp.batchOrm.UpdateBatchAttempts(ctx, tmpBatchTask.Index, tmpBatchTask.ActiveAttempts, tmpBatchTask.TotalAttempts) if updateAttemptsErr != nil { log.Error("failed to update batch attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr) return nil, ErrCoordinatorInternalFailure @@ -85,12 +98,12 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato continue } - batchTask = unassignedBatch + batchTask = tmpBatchTask break } if batchTask == nil { - log.Debug("get empty unassigned batch after retry 100 times", "height", getTaskParameter.ProverHeight) + log.Debug("get empty unassigned batch after retry 5 times", "height", getTaskParameter.ProverHeight) return nil, nil } diff --git a/coordinator/internal/logic/provertask/chunk_prover_task.go b/coordinator/internal/logic/provertask/chunk_prover_task.go index 473d14c85..da77b6172 100644 --- a/coordinator/internal/logic/provertask/chunk_prover_task.go +++ b/coordinator/internal/logic/provertask/chunk_prover_task.go @@ -67,18 +67,30 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato maxTotalAttempts := cp.cfg.ProverManager.SessionAttempts var chunkTask *orm.Chunk for i := 0; i < 5; i++ { - unassignedChunk, getUnsignedChunkErr := cp.chunkOrm.GetUnassignedChunk(ctx, getTaskParameter.ProverHeight, maxActiveAttempts, 
maxTotalAttempts) - if getUnsignedChunkErr != nil { - log.Error("failed to get unassigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getUnsignedChunkErr) + var getTaskError error + var tmpChunkTask *orm.Chunk + tmpChunkTask, getTaskError = cp.chunkOrm.GetUnassignedChunk(ctx, getTaskParameter.ProverHeight, maxActiveAttempts, maxTotalAttempts) + if getTaskError != nil { + log.Error("failed to get unassigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) return nil, ErrCoordinatorInternalFailure } - if unassignedChunk == nil { - log.Debug("get empty unassigned chunk", "height", getTaskParameter.ProverHeight) + // Why query a second time? A task may be assigned to multiple provers, so `ProvingTaskAssigned` chunks must also be handed out. + // A single `proving_status in (1, 2)` filter does not use the postgres index, so the lookup is split into two queries. + if tmpChunkTask == nil { + tmpChunkTask, getTaskError = cp.chunkOrm.GetAssignedChunk(ctx, getTaskParameter.ProverHeight, maxActiveAttempts, maxTotalAttempts) + if getTaskError != nil { + log.Error("failed to get assigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) + return nil, ErrCoordinatorInternalFailure + } + } + + if tmpChunkTask == nil { + log.Debug("get empty chunk", "height", getTaskParameter.ProverHeight) return nil, nil } - rowsAffected, updateAttemptsErr := cp.chunkOrm.UpdateChunkAttempts(ctx, unassignedChunk.Index, unassignedChunk.ActiveAttempts, unassignedChunk.TotalAttempts) + rowsAffected, updateAttemptsErr := cp.chunkOrm.UpdateChunkAttempts(ctx, tmpChunkTask.Index, tmpChunkTask.ActiveAttempts, tmpChunkTask.TotalAttempts) if updateAttemptsErr != nil { log.Error("failed to update chunk attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr) return nil, ErrCoordinatorInternalFailure @@ -89,12 +101,12 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
continue } - chunkTask = unassignedChunk + chunkTask = tmpChunkTask break } if chunkTask == nil { - log.Debug("get empty unassigned chunk after retry 100 times", "height", getTaskParameter.ProverHeight) + log.Debug("get empty unassigned chunk after retry 5 times", "height", getTaskParameter.ProverHeight) return nil, nil } diff --git a/coordinator/internal/orm/batch.go b/coordinator/internal/orm/batch.go index 71fc1f213..fd31d66d8 100644 --- a/coordinator/internal/orm/batch.go +++ b/coordinator/internal/orm/batch.go @@ -75,7 +75,7 @@ func (*Batch) TableName() string { // The returned batch are sorted in ascending order by their index. func (o *Batch) GetUnassignedBatch(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (*Batch, error) { db := o.db.WithContext(ctx) - db = db.Where("proving_status in (?)", []int{int(types.ProvingTaskUnassigned), int(types.ProvingTaskAssigned)}) + db = db.Where("proving_status = ?", int(types.ProvingTaskUnassigned)) db = db.Where("total_attempts < ?", maxTotalAttempts) db = db.Where("active_attempts < ?", maxActiveAttempts) db = db.Where("chunk_proofs_status = ?", int(types.ChunkProofsStatusReady)) @@ -92,6 +92,27 @@ func (o *Batch) GetUnassignedBatch(ctx context.Context, maxActiveAttempts, maxTo return &batch, nil } +// GetAssignedBatch returns one batch in ProvingTaskAssigned status whose chunk proofs are ready and whose total and active attempts are below the given maximums. +// It returns (nil, nil) when no such batch exists. NOTE(review): row order is whatever gorm's First applies by default; confirm it matches index order.
+func (o *Batch) GetAssignedBatch(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (*Batch, error) { + db := o.db.WithContext(ctx) + db = db.Where("proving_status = ?", int(types.ProvingTaskAssigned)) + db = db.Where("total_attempts < ?", maxTotalAttempts) + db = db.Where("active_attempts < ?", maxActiveAttempts) + db = db.Where("chunk_proofs_status = ?", int(types.ChunkProofsStatusReady)) + + var batch Batch + err := db.First(&batch).Error + if errors.Is(err, gorm.ErrRecordNotFound) { + // An empty result is not a failure: (nil, nil) tells the caller there is nothing to assign. + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("Batch.GetAssignedBatch error: %w", err) + } + return &batch, nil +} + // GetUnassignedAndChunksUnreadyBatches get the batches which is unassigned and chunks is not ready func (o *Batch) GetUnassignedAndChunksUnreadyBatches(ctx context.Context, offset, limit int) ([]*Batch, error) { if offset < 0 || limit < 0 { diff --git a/coordinator/internal/orm/chunk.go b/coordinator/internal/orm/chunk.go index fa7ba6559..965157a3b 100644 --- a/coordinator/internal/orm/chunk.go +++ b/coordinator/internal/orm/chunk.go @@ -71,7 +71,7 @@ func (*Chunk) TableName() string { func (o *Chunk) GetUnassignedChunk(ctx context.Context, height int, maxActiveAttempts, maxTotalAttempts uint8) (*Chunk, error) { db := o.db.WithContext(ctx) db = db.Model(&Chunk{}) - db = db.Where("proving_status in (?)", []int{int(types.ProvingTaskUnassigned), int(types.ProvingTaskAssigned)}) + db = db.Where("proving_status = ?", int(types.ProvingTaskUnassigned)) db = db.Where("total_attempts < ?", maxTotalAttempts) db = db.Where("active_attempts < ?", maxActiveAttempts) db = db.Where("end_block_number <= ?", height) @@ -88,6 +88,28 @@ func (o *Chunk) GetUnassignedChunk(ctx context.Context, height int, maxActiveAtt return &chunk, nil } +// GetAssignedChunk returns one chunk in ProvingTaskAssigned status whose end block number is at most height and whose total and active attempts are below the given maximums. +// It returns (nil, nil) when no such chunk exists.
+func (o *Chunk) GetAssignedChunk(ctx context.Context, height int, maxActiveAttempts, maxTotalAttempts uint8) (*Chunk, error) { + db := o.db.WithContext(ctx) + db = db.Model(&Chunk{}) + db = db.Where("proving_status = ?", int(types.ProvingTaskAssigned)) + db = db.Where("total_attempts < ?", maxTotalAttempts) + db = db.Where("active_attempts < ?", maxActiveAttempts) + db = db.Where("end_block_number <= ?", height) + + var chunk Chunk + err := db.First(&chunk).Error + if errors.Is(err, gorm.ErrRecordNotFound) { + // An empty result is not a failure: (nil, nil) tells the caller there is nothing to assign. + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("Chunk.GetAssignedChunk error: %w", err) + } + return &chunk, nil +} + // GetChunksByBatchHash retrieves the chunks associated with a specific batch hash. // The returned chunks are sorted in ascending order by their associated chunk index. func (o *Chunk) GetChunksByBatchHash(ctx context.Context, batchHash string) ([]*Chunk, error) {