fix possible status inconsistent in attempt number

This commit is contained in:
Ho
2026-02-27 18:45:51 +09:00
parent f8cec5d4dd
commit 970a4760fe
3 changed files with 23 additions and 15 deletions

View File

@@ -59,7 +59,7 @@ func NewBatchProverTask(cfg *config.Config, chainCfg *params.ChainConfig, db *go
}
// Assign load and assign batch tasks
func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinatorType.GetTaskParameter) (*coordinatorType.GetTaskSchema, error) {
func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinatorType.GetTaskParameter) (_ *coordinatorType.GetTaskSchema, retErr error) {
taskCtx, err := bp.checkParameter(ctx)
if err != nil || taskCtx == nil {
return nil, fmt.Errorf("check prover task parameter failed, error:%w", err)
@@ -163,6 +163,12 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
log.Error("failed to update batch attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr)
return nil, ErrCoordinatorInternalFailure
}
defer func(batchTask *orm.Batch) {
if retErr != nil {
bp.recoverActiveAttempts(ctx, batchTask)
log.Debug("recover active attempts", "batch task_id", batchTask.Hash)
}
}(tmpBatchTask)
if rowsAffected == 0 {
time.Sleep(100 * time.Millisecond)
@@ -199,7 +205,6 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
taskMsg, err := bp.formatProverTask(ctx.Copy(), proverTask, batchTask, hardForkName)
if err != nil {
bp.recoverActiveAttempts(ctx, batchTask)
log.Error("format prover task failure", "task_id", batchTask.Hash, "err", err)
return nil, ErrCoordinatorInternalFailure
}
@@ -208,7 +213,6 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
taskMsg, metadata, err = bp.applyUniversal(taskMsg)
if err != nil {
bp.recoverActiveAttempts(ctx, batchTask)
log.Error("Generate universal prover task failure", "task_id", batchTask.Hash, "type", "batch", "err", err)
return nil, ErrCoordinatorInternalFailure
}
@@ -226,7 +230,6 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
// Store session info.
if taskCtx.hasAssignedTask == nil {
if err = bp.proverTaskOrm.InsertProverTask(ctx.Copy(), proverTask); err != nil {
bp.recoverActiveAttempts(ctx, batchTask)
log.Error("insert batch prover task info fail", "task_id", batchTask.Hash, "publicKey", taskCtx.PublicKey, "err", err)
return nil, ErrCoordinatorInternalFailure
}

View File

@@ -57,7 +57,7 @@ func NewBundleProverTask(cfg *config.Config, chainCfg *params.ChainConfig, db *g
}
// Assign load and assign batch tasks
func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinatorType.GetTaskParameter) (*coordinatorType.GetTaskSchema, error) {
func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinatorType.GetTaskParameter) (_ *coordinatorType.GetTaskSchema, retErr error) {
taskCtx, err := bp.checkParameter(ctx)
if err != nil || taskCtx == nil {
return nil, fmt.Errorf("check prover task parameter failed, error:%w", err)
@@ -161,6 +161,12 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat
log.Error("failed to update bundle attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr)
return nil, ErrCoordinatorInternalFailure
}
defer func(bundleTask *orm.Bundle) {
if retErr != nil {
bp.recoverActiveAttempts(ctx, bundleTask)
log.Debug("recover active attempts", "bundle task_id", bundleTask.Hash)
}
}(tmpBundleTask)
if rowsAffected == 0 {
time.Sleep(100 * time.Millisecond)
@@ -196,7 +202,6 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat
taskMsg, err := bp.formatProverTask(ctx.Copy(), proverTask, hardForkName)
if err != nil {
bp.recoverActiveAttempts(ctx, bundleTask)
log.Error("format bundle prover task failure", "task_id", bundleTask.Hash, "err", err)
return nil, ErrCoordinatorInternalFailure
}
@@ -204,7 +209,6 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat
var metadata []byte
taskMsg, metadata, err = bp.applyUniversal(taskMsg)
if err != nil {
bp.recoverActiveAttempts(ctx, bundleTask)
log.Error("Generate universal prover task failure", "task_id", bundleTask.Hash, "type", "bundle", "err", err)
return nil, ErrCoordinatorInternalFailure
}
@@ -224,7 +228,6 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat
// Store session info.
if taskCtx.hasAssignedTask == nil {
if err = bp.proverTaskOrm.InsertProverTask(ctx.Copy(), proverTask); err != nil {
bp.recoverActiveAttempts(ctx, bundleTask)
log.Error("insert bundle prover task info fail", "task_id", bundleTask.Hash, "publicKey", taskCtx.PublicKey, "err", err)
return nil, ErrCoordinatorInternalFailure
}

View File

@@ -62,7 +62,7 @@ func NewChunkProverTask(cfg *config.Config, chainCfg *params.ChainConfig, db *go
}
// Assign the chunk proof which need to prove
func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinatorType.GetTaskParameter) (*coordinatorType.GetTaskSchema, error) {
func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinatorType.GetTaskParameter) (_ *coordinatorType.GetTaskSchema, retErr error) {
taskCtx, err := cp.checkParameter(ctx)
if err != nil || taskCtx == nil {
return nil, fmt.Errorf("check prover task parameter failed, error:%w", err)
@@ -166,6 +166,12 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
log.Error("failed to update chunk attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr)
return nil, ErrCoordinatorInternalFailure
}
defer func(chunkTask *orm.Chunk) {
if retErr != nil {
cp.recoverActiveAttempts(ctx, chunkTask)
log.Debug("recover active attempts", "chunk task_id", chunkTask.Hash)
}
}(tmpChunkTask)
if rowsAffected == 0 {
time.Sleep(100 * time.Millisecond)
@@ -201,7 +207,6 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
taskMsg, err := cp.formatProverTask(ctx.Copy(), proverTask, chunkTask, hardForkName)
if err != nil {
cp.recoverActiveAttempts(ctx, chunkTask)
log.Error("format prover task failure", "task_id", chunkTask.Hash, "err", err)
return nil, ErrCoordinatorInternalFailure
}
@@ -214,14 +219,12 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
// Released when Assign returns (defer).
defer func() { <-witnessSemaphore }()
case <-ctx.Done():
log.Warn("context canceled waiting for witness semaphore", "task_id", chunkTask.Hash, "err", ctx.Err())
cp.recoverActiveAttempts(ctx, chunkTask)
return nil, ctx.Err()
log.Warn("context cancelled waiting for witness semaphore", "task_id", chunkTask.Hash, "err", ctx.Err())
return nil, fmt.Errorf("context cancelled: %s", ctx.Err())
}
taskMsg, metadata, err = cp.applyUniversal(taskMsg)
if err != nil {
cp.recoverActiveAttempts(ctx, chunkTask)
log.Error("Generate universal prover task failure", "task_id", chunkTask.Hash, "type", "chunk", "err", err)
return nil, ErrCoordinatorInternalFailure
}
@@ -230,7 +233,6 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
if taskCtx.hasAssignedTask == nil {
if err = cp.proverTaskOrm.InsertProverTask(ctx.Copy(), proverTask); err != nil {
cp.recoverActiveAttempts(ctx, chunkTask)
log.Error("insert chunk prover task fail", "task_id", chunkTask.Hash, "publicKey", taskCtx.PublicKey, "err", err)
return nil, ErrCoordinatorInternalFailure
}