mirror of
https://github.com/scroll-tech/scroll.git
synced 2026-04-23 03:00:50 -04:00
Compare commits
4 Commits
fix/validi
...
fix/attemp
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
11c76800a2 | ||
|
|
03fce0db33 | ||
|
|
970a4760fe | ||
|
|
f8cec5d4dd |
@@ -5,7 +5,7 @@ import (
|
||||
"runtime/debug"
|
||||
)
|
||||
|
||||
var tag = "v4.7.12"
|
||||
var tag = "v4.7.13"
|
||||
|
||||
var commit = func() string {
|
||||
if info, ok := debug.ReadBuildInfo(); ok {
|
||||
|
||||
@@ -59,7 +59,7 @@ func NewBatchProverTask(cfg *config.Config, chainCfg *params.ChainConfig, db *go
|
||||
}
|
||||
|
||||
// Assign load and assign batch tasks
|
||||
func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinatorType.GetTaskParameter) (*coordinatorType.GetTaskSchema, error) {
|
||||
func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinatorType.GetTaskParameter) (_ *coordinatorType.GetTaskSchema, retErr error) {
|
||||
taskCtx, err := bp.checkParameter(ctx)
|
||||
if err != nil || taskCtx == nil {
|
||||
return nil, fmt.Errorf("check prover task parameter failed, error:%w", err)
|
||||
@@ -163,11 +163,17 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
|
||||
log.Error("failed to update batch attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr)
|
||||
return nil, ErrCoordinatorInternalFailure
|
||||
}
|
||||
|
||||
if rowsAffected == 0 {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
|
||||
defer func(batchTask *orm.Batch) {
|
||||
if retErr != nil {
|
||||
bp.recoverActiveAttempts(ctx, batchTask)
|
||||
log.Debug("recover active attempts", "batch task_id", batchTask.Hash)
|
||||
}
|
||||
}(tmpBatchTask)
|
||||
}
|
||||
|
||||
batchTask = tmpBatchTask
|
||||
@@ -199,7 +205,6 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
|
||||
|
||||
taskMsg, err := bp.formatProverTask(ctx.Copy(), proverTask, batchTask, hardForkName)
|
||||
if err != nil {
|
||||
bp.recoverActiveAttempts(ctx, batchTask)
|
||||
log.Error("format prover task failure", "task_id", batchTask.Hash, "err", err)
|
||||
return nil, ErrCoordinatorInternalFailure
|
||||
}
|
||||
@@ -208,7 +213,6 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
|
||||
|
||||
taskMsg, metadata, err = bp.applyUniversal(taskMsg)
|
||||
if err != nil {
|
||||
bp.recoverActiveAttempts(ctx, batchTask)
|
||||
log.Error("Generate universal prover task failure", "task_id", batchTask.Hash, "type", "batch", "err", err)
|
||||
return nil, ErrCoordinatorInternalFailure
|
||||
}
|
||||
@@ -226,7 +230,6 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
|
||||
// Store session info.
|
||||
if taskCtx.hasAssignedTask == nil {
|
||||
if err = bp.proverTaskOrm.InsertProverTask(ctx.Copy(), proverTask); err != nil {
|
||||
bp.recoverActiveAttempts(ctx, batchTask)
|
||||
log.Error("insert batch prover task info fail", "task_id", batchTask.Hash, "publicKey", taskCtx.PublicKey, "err", err)
|
||||
return nil, ErrCoordinatorInternalFailure
|
||||
}
|
||||
|
||||
@@ -57,7 +57,7 @@ func NewBundleProverTask(cfg *config.Config, chainCfg *params.ChainConfig, db *g
|
||||
}
|
||||
|
||||
// Assign load and assign batch tasks
|
||||
func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinatorType.GetTaskParameter) (*coordinatorType.GetTaskSchema, error) {
|
||||
func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinatorType.GetTaskParameter) (_ *coordinatorType.GetTaskSchema, retErr error) {
|
||||
taskCtx, err := bp.checkParameter(ctx)
|
||||
if err != nil || taskCtx == nil {
|
||||
return nil, fmt.Errorf("check prover task parameter failed, error:%w", err)
|
||||
@@ -161,11 +161,16 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat
|
||||
log.Error("failed to update bundle attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr)
|
||||
return nil, ErrCoordinatorInternalFailure
|
||||
}
|
||||
|
||||
if rowsAffected == 0 {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
defer func(bundleTask *orm.Bundle) {
|
||||
if retErr != nil {
|
||||
bp.recoverActiveAttempts(ctx, bundleTask)
|
||||
log.Debug("recover active attempts", "bundle task_id", bundleTask.Hash)
|
||||
}
|
||||
}(tmpBundleTask)
|
||||
}
|
||||
bundleTask = tmpBundleTask
|
||||
break
|
||||
@@ -196,7 +201,6 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat
|
||||
|
||||
taskMsg, err := bp.formatProverTask(ctx.Copy(), proverTask, hardForkName)
|
||||
if err != nil {
|
||||
bp.recoverActiveAttempts(ctx, bundleTask)
|
||||
log.Error("format bundle prover task failure", "task_id", bundleTask.Hash, "err", err)
|
||||
return nil, ErrCoordinatorInternalFailure
|
||||
}
|
||||
@@ -204,7 +208,6 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat
|
||||
var metadata []byte
|
||||
taskMsg, metadata, err = bp.applyUniversal(taskMsg)
|
||||
if err != nil {
|
||||
bp.recoverActiveAttempts(ctx, bundleTask)
|
||||
log.Error("Generate universal prover task failure", "task_id", bundleTask.Hash, "type", "bundle", "err", err)
|
||||
return nil, ErrCoordinatorInternalFailure
|
||||
}
|
||||
@@ -224,7 +227,6 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat
|
||||
// Store session info.
|
||||
if taskCtx.hasAssignedTask == nil {
|
||||
if err = bp.proverTaskOrm.InsertProverTask(ctx.Copy(), proverTask); err != nil {
|
||||
bp.recoverActiveAttempts(ctx, bundleTask)
|
||||
log.Error("insert bundle prover task info fail", "task_id", bundleTask.Hash, "publicKey", taskCtx.PublicKey, "err", err)
|
||||
return nil, ErrCoordinatorInternalFailure
|
||||
}
|
||||
|
||||
@@ -24,6 +24,13 @@ import (
|
||||
cutils "scroll-tech/coordinator/internal/utils"
|
||||
)
|
||||
|
||||
// Implement global throttle on debug_executionWitness calls.
|
||||
// This API slows down when there are multiple concurrent calls.
|
||||
var (
|
||||
applyUniversalMaxParallelism = 2
|
||||
witnessSemaphore = make(chan struct{}, applyUniversalMaxParallelism)
|
||||
)
|
||||
|
||||
// ChunkProverTask the chunk prover task
|
||||
type ChunkProverTask struct {
|
||||
BaseProverTask
|
||||
@@ -55,7 +62,7 @@ func NewChunkProverTask(cfg *config.Config, chainCfg *params.ChainConfig, db *go
|
||||
}
|
||||
|
||||
// Assign the chunk proof which need to prove
|
||||
func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinatorType.GetTaskParameter) (*coordinatorType.GetTaskSchema, error) {
|
||||
func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinatorType.GetTaskParameter) (_ *coordinatorType.GetTaskSchema, retErr error) {
|
||||
taskCtx, err := cp.checkParameter(ctx)
|
||||
if err != nil || taskCtx == nil {
|
||||
return nil, fmt.Errorf("check prover task parameter failed, error:%w", err)
|
||||
@@ -159,11 +166,17 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
|
||||
log.Error("failed to update chunk attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr)
|
||||
return nil, ErrCoordinatorInternalFailure
|
||||
}
|
||||
|
||||
if rowsAffected == 0 {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
defer func(chunkTask *orm.Chunk) {
|
||||
if retErr != nil {
|
||||
cp.recoverActiveAttempts(ctx, chunkTask)
|
||||
log.Debug("recover active attempts", "chunk task_id", chunkTask.Hash)
|
||||
}
|
||||
}(tmpChunkTask)
|
||||
|
||||
}
|
||||
chunkTask = tmpChunkTask
|
||||
break
|
||||
@@ -194,16 +207,24 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
|
||||
|
||||
taskMsg, err := cp.formatProverTask(ctx.Copy(), proverTask, chunkTask, hardForkName)
|
||||
if err != nil {
|
||||
cp.recoverActiveAttempts(ctx, chunkTask)
|
||||
log.Error("format prover task failure", "task_id", chunkTask.Hash, "err", err)
|
||||
return nil, ErrCoordinatorInternalFailure
|
||||
}
|
||||
|
||||
if getTaskParameter.Universal {
|
||||
var metadata []byte
|
||||
|
||||
select {
|
||||
case witnessSemaphore <- struct{}{}:
|
||||
// Released when Assign returns (defer).
|
||||
defer func() { <-witnessSemaphore }()
|
||||
case <-ctx.Done():
|
||||
log.Warn("context canceled waiting for witness semaphore", "task_id", chunkTask.Hash, "err", ctx.Err())
|
||||
return nil, fmt.Errorf("context canceled: %s", ctx.Err())
|
||||
}
|
||||
|
||||
taskMsg, metadata, err = cp.applyUniversal(taskMsg)
|
||||
if err != nil {
|
||||
cp.recoverActiveAttempts(ctx, chunkTask)
|
||||
log.Error("Generate universal prover task failure", "task_id", chunkTask.Hash, "type", "chunk", "err", err)
|
||||
return nil, ErrCoordinatorInternalFailure
|
||||
}
|
||||
@@ -212,7 +233,6 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
|
||||
|
||||
if taskCtx.hasAssignedTask == nil {
|
||||
if err = cp.proverTaskOrm.InsertProverTask(ctx.Copy(), proverTask); err != nil {
|
||||
cp.recoverActiveAttempts(ctx, chunkTask)
|
||||
log.Error("insert chunk prover task fail", "task_id", chunkTask.Hash, "publicKey", taskCtx.PublicKey, "err", err)
|
||||
return nil, ErrCoordinatorInternalFailure
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user