diff --git a/common/version/version.go b/common/version/version.go index 0f3880486..f8ec0cb2c 100644 --- a/common/version/version.go +++ b/common/version/version.go @@ -6,7 +6,7 @@ import ( "strings" ) -var tag = "v4.1.57" +var tag = "v4.1.58" var commit = func() string { if info, ok := debug.ReadBuildInfo(); ok { diff --git a/coordinator/cmd/app/mock_app.go b/coordinator/cmd/app/mock_app.go index c74cd83dd..47f060e58 100644 --- a/coordinator/cmd/app/mock_app.go +++ b/coordinator/cmd/app/mock_app.go @@ -81,11 +81,12 @@ func (c *CoordinatorApp) MockConfig(store bool) error { } // Reset prover manager config for manager test cases. cfg.ProverManager = &coordinatorConfig.ProverManager{ - ProversPerSession: 1, - Verifier: &coordinatorConfig.VerifierConfig{MockMode: true}, - CollectionTimeSec: 60, - SessionAttempts: 10, - MaxVerifierWorkers: 4, + ProversPerSession: 1, + Verifier: &coordinatorConfig.VerifierConfig{MockMode: true}, + BatchCollectionTimeSec: 60, + ChunkCollectionTimeSec: 60, + SessionAttempts: 10, + MaxVerifierWorkers: 4, } cfg.DB.DSN = base.DBImg.Endpoint() cfg.L2.ChainID = 111 diff --git a/coordinator/conf/config.json b/coordinator/conf/config.json index 4393b78a1..6cbeaca0d 100644 --- a/coordinator/conf/config.json +++ b/coordinator/conf/config.json @@ -2,7 +2,8 @@ "prover_manager": { "provers_per_session": 1, "session_attempts": 5, - "collection_time_sec": 180, + "batch_collection_time_sec": 180, + "chunk_collection_time_sec": 180, "verifier": { "mock_mode": true, "params_path": "", diff --git a/coordinator/internal/config/config.go b/coordinator/internal/config/config.go index 4a7fab30b..0848b62a2 100644 --- a/coordinator/internal/config/config.go +++ b/coordinator/internal/config/config.go @@ -17,8 +17,10 @@ type ProverManager struct { SessionAttempts uint8 `json:"session_attempts"` // Zk verifier config. Verifier *VerifierConfig `json:"verifier"` - // Proof collection time (in seconds). 
- CollectionTimeSec int `json:"collection_time_sec"` + // BatchCollectionTimeSec is the batch proof collection time (in seconds). + BatchCollectionTimeSec int `json:"batch_collection_time_sec"` + // ChunkCollectionTimeSec is the chunk proof collection time (in seconds). + ChunkCollectionTimeSec int `json:"chunk_collection_time_sec"` // Max number of workers in verifier worker pool MaxVerifierWorkers int `json:"max_verifier_workers"` } diff --git a/coordinator/internal/config/config_test.go b/coordinator/internal/config/config_test.go index 6ff3e54c0..482f8ec11 100644 --- a/coordinator/internal/config/config_test.go +++ b/coordinator/internal/config/config_test.go @@ -15,7 +15,8 @@ func TestConfig(t *testing.T) { "prover_manager": { "provers_per_session": 1, "session_attempts": 5, - "collection_time_sec": 180, + "batch_collection_time_sec": 180, + "chunk_collection_time_sec": 180, "verifier": { "mock_mode": true, "params_path": "", diff --git a/coordinator/internal/controller/cron/collect_proof.go b/coordinator/internal/controller/cron/collect_proof.go index 60d758dd2..171cf85be 100644 --- a/coordinator/internal/controller/cron/collect_proof.go +++ b/coordinator/internal/controller/cron/collect_proof.go @@ -29,8 +29,10 @@ type Collector struct { chunkOrm *orm.Chunk batchOrm *orm.Batch - timeoutCheckerRunTotal prometheus.Counter - proverTaskTimeoutTotal prometheus.Counter + timeoutBatchCheckerRunTotal prometheus.Counter + batchProverTaskTimeoutTotal prometheus.Counter + timeoutChunkCheckerRunTotal prometheus.Counter + chunkProverTaskTimeoutTotal prometheus.Counter } // NewCollector create a collector to cron collect the data to send to prover @@ -44,17 +46,26 @@ func NewCollector(ctx context.Context, db *gorm.DB, cfg *config.Config, reg prom chunkOrm: orm.NewChunk(db), batchOrm: orm.NewBatch(db), - timeoutCheckerRunTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "coordinator_timeout_checker_run_total", - Help: "Total number of timeout checker run.", 
timeoutBatchCheckerRunTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "coordinator_batch_timeout_checker_run_total", + Help: "Total number of batch timeout checker run.", }), - proverTaskTimeoutTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "coordinator_prover_task_timeout_total", - Help: "Total number of timeout prover task.", + batchProverTaskTimeoutTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "coordinator_batch_prover_task_timeout_total", + Help: "Total number of batch timeout prover task.", + }), + timeoutChunkCheckerRunTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "coordinator_chunk_timeout_checker_run_total", + Help: "Total number of chunk timeout checker run.", + }), + chunkProverTaskTimeoutTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "coordinator_chunk_prover_task_timeout_total", + Help: "Total number of chunk timeout prover task.", }), } - go c.timeoutProofTask() + go c.timeoutBatchProofTask() + go c.timeoutChunkProofTask() log.Info("Start coordinator successfully.") @@ -69,10 +80,10 @@ func (c *Collector) Stop() { // timeoutTask cron check the send task is timeout. if timeout reached, restore the // chunk/batch task to unassigned. then the batch/chunk collector can retry it. 
-func (c *Collector) timeoutProofTask() { +func (c *Collector) timeoutBatchProofTask() { defer func() { if err := recover(); err != nil { - nerr := fmt.Errorf("timeout proof task panic error:%v", err) + nerr := fmt.Errorf("timeout batch proof task panic error:%v", err) log.Warn(nerr.Error()) } }() @@ -81,52 +92,14 @@ func (c *Collector) timeoutProofTask() { for { select { case <-ticker.C: - c.timeoutCheckerRunTotal.Inc() - timeout := time.Duration(c.cfg.ProverManager.CollectionTimeSec) * time.Second - assignedProverTasks, err := c.proverTaskOrm.GetTimeoutAssignedProverTasks(c.ctx, 10, timeout) + c.timeoutBatchCheckerRunTotal.Inc() + timeout := time.Duration(c.cfg.ProverManager.BatchCollectionTimeSec) * time.Second + assignedProverTasks, err := c.proverTaskOrm.GetTimeoutAssignedProverTasks(c.ctx, 10, message.ProofTypeBatch, timeout) if err != nil { log.Error("get unassigned session info failure", "error", err) break } - - // here not update the block batch proving status failed, because the collector loop will check - // the attempt times. if reach the times, the collector will set the block batch proving status. 
- for _, assignedProverTask := range assignedProverTasks { - c.proverTaskTimeoutTotal.Inc() - log.Warn("proof task have reach the timeout", "task id", assignedProverTask.TaskID, - "prover public key", assignedProverTask.ProverPublicKey, "prover name", assignedProverTask.ProverName, "task type", assignedProverTask.TaskType) - err = c.db.Transaction(func(tx *gorm.DB) error { - // update prover task proving status as ProverProofInvalid - if err = c.proverTaskOrm.UpdateProverTaskProvingStatus(c.ctx, message.ProofType(assignedProverTask.TaskType), - assignedProverTask.TaskID, assignedProverTask.ProverPublicKey, types.ProverProofInvalid, tx); err != nil { - log.Error("update prover task proving status failure", "hash", assignedProverTask.TaskID, "pubKey", assignedProverTask.ProverPublicKey, "err", err) - return err - } - - // update prover task failure type - if err = c.proverTaskOrm.UpdateProverTaskFailureType(c.ctx, message.ProofType(assignedProverTask.TaskType), - assignedProverTask.TaskID, assignedProverTask.ProverPublicKey, types.ProverTaskFailureTypeTimeout, tx); err != nil { - log.Error("update prover task failure type failure", "hash", assignedProverTask.TaskID, "pubKey", assignedProverTask.ProverPublicKey, "err", err) - return err - } - - // update the task to unassigned, let collector restart it - if message.ProofType(assignedProverTask.TaskType) == message.ProofTypeChunk { - if err = c.chunkOrm.UpdateProvingStatus(c.ctx, assignedProverTask.TaskID, types.ProvingTaskUnassigned, tx); err != nil { - log.Error("update chunk proving status to unassigned to restart it failure", "hash", assignedProverTask.TaskID, "err", err) - } - } - if message.ProofType(assignedProverTask.TaskType) == message.ProofTypeBatch { - if err = c.batchOrm.UpdateProvingStatus(c.ctx, assignedProverTask.TaskID, types.ProvingTaskUnassigned, tx); err != nil { - log.Error("update batch proving status to unassigned to restart it failure", "hash", assignedProverTask.TaskID, "err", err) - } - } - 
return nil - }) - if err != nil { - log.Error("check task proof is timeout failure", "error", err) - } - } + c.check(assignedProverTasks, c.batchProverTaskTimeoutTotal) case <-c.ctx.Done(): if c.ctx.Err() != nil { log.Error("manager context canceled with error", "error", c.ctx.Err()) @@ -138,3 +111,77 @@ func (c *Collector) timeoutProofTask() { } } } + +func (c *Collector) timeoutChunkProofTask() { + defer func() { + if err := recover(); err != nil { + nerr := fmt.Errorf("timeout proof chunk task panic error:%v", err) + log.Warn(nerr.Error()) + } + }() + + ticker := time.NewTicker(time.Second * 2) + for { + select { + case <-ticker.C: + c.timeoutChunkCheckerRunTotal.Inc() + timeout := time.Duration(c.cfg.ProverManager.ChunkCollectionTimeSec) * time.Second + assignedProverTasks, err := c.proverTaskOrm.GetTimeoutAssignedProverTasks(c.ctx, 10, message.ProofTypeChunk, timeout) + if err != nil { + log.Error("get unassigned session info failure", "error", err) + break + } + c.check(assignedProverTasks, c.chunkProverTaskTimeoutTotal) + + case <-c.ctx.Done(): + if c.ctx.Err() != nil { + log.Error("manager context canceled with error", "error", c.ctx.Err()) + } + return + case <-c.stopTimeoutChan: + log.Info("the coordinator run loop exit") + return + } + } +} + +func (c *Collector) check(assignedProverTasks []orm.ProverTask, timeout prometheus.Counter) { + // Do not mark the block batch proving status as failed here, because the collector loop checks + // the number of attempts; once the limit is reached, the collector sets the block batch proving status. 
+ for _, assignedProverTask := range assignedProverTasks { + timeout.Inc() + log.Warn("proof task have reach the timeout", "task id", assignedProverTask.TaskID, + "prover public key", assignedProverTask.ProverPublicKey, "prover name", assignedProverTask.ProverName, "task type", assignedProverTask.TaskType) + err := c.db.Transaction(func(tx *gorm.DB) error { + // update prover task proving status as ProverProofInvalid + if err := c.proverTaskOrm.UpdateProverTaskProvingStatus(c.ctx, message.ProofType(assignedProverTask.TaskType), + assignedProverTask.TaskID, assignedProverTask.ProverPublicKey, types.ProverProofInvalid, tx); err != nil { + log.Error("update prover task proving status failure", "hash", assignedProverTask.TaskID, "pubKey", assignedProverTask.ProverPublicKey, "err", err) + return err + } + + // update prover task failure type + if err := c.proverTaskOrm.UpdateProverTaskFailureType(c.ctx, message.ProofType(assignedProverTask.TaskType), + assignedProverTask.TaskID, assignedProverTask.ProverPublicKey, types.ProverTaskFailureTypeTimeout, tx); err != nil { + log.Error("update prover task failure type failure", "hash", assignedProverTask.TaskID, "pubKey", assignedProverTask.ProverPublicKey, "err", err) + return err + } + + // update the task to unassigned, let collector restart it + if message.ProofType(assignedProverTask.TaskType) == message.ProofTypeChunk { + if err := c.chunkOrm.UpdateProvingStatus(c.ctx, assignedProverTask.TaskID, types.ProvingTaskUnassigned, tx); err != nil { + log.Error("update chunk proving status to unassigned to restart it failure", "hash", assignedProverTask.TaskID, "err", err) + } + } + if message.ProofType(assignedProverTask.TaskType) == message.ProofTypeBatch { + if err := c.batchOrm.UpdateProvingStatus(c.ctx, assignedProverTask.TaskID, types.ProvingTaskUnassigned, tx); err != nil { + log.Error("update batch proving status to unassigned to restart it failure", "hash", assignedProverTask.TaskID, "err", err) + } + } + return nil + 
}) + if err != nil { + log.Error("check task proof is timeout failure", "error", err) + } + } +} diff --git a/coordinator/internal/orm/prover_task.go b/coordinator/internal/orm/prover_task.go index db6eccf34..431963559 100644 --- a/coordinator/internal/orm/prover_task.go +++ b/coordinator/internal/orm/prover_task.go @@ -144,10 +144,11 @@ func (o *ProverTask) GetProvingStatusByTaskID(ctx context.Context, taskID string } // GetTimeoutAssignedProverTasks get the timeout and assigned proving_status prover task -func (o *ProverTask) GetTimeoutAssignedProverTasks(ctx context.Context, limit int, timeout time.Duration) ([]ProverTask, error) { +func (o *ProverTask) GetTimeoutAssignedProverTasks(ctx context.Context, limit int, taskType message.ProofType, timeout time.Duration) ([]ProverTask, error) { db := o.db.WithContext(ctx) db = db.Model(&ProverTask{}) db = db.Where("proving_status", int(types.ProverAssigned)) + db = db.Where("task_type", int(taskType)) db = db.Where("assigned_at < ?", utils.NowUTC().Add(-timeout)) db = db.Limit(limit) diff --git a/coordinator/test/api_test.go b/coordinator/test/api_test.go index a9e6bb1f9..c4a3ae771 100644 --- a/coordinator/test/api_test.go +++ b/coordinator/test/api_test.go @@ -76,11 +76,12 @@ func setupCoordinator(t *testing.T, proversPerSession uint8, coordinatorURL stri ChainID: 111, }, ProverManager: &config.ProverManager{ - ProversPerSession: proversPerSession, - Verifier: &config.VerifierConfig{MockMode: true}, - CollectionTimeSec: 10, - MaxVerifierWorkers: 10, - SessionAttempts: 5, + ProversPerSession: proversPerSession, + Verifier: &config.VerifierConfig{MockMode: true}, + BatchCollectionTimeSec: 10, + ChunkCollectionTimeSec: 10, + MaxVerifierWorkers: 10, + SessionAttempts: 5, }, Auth: &config.Auth{ ChallengeExpireDurationSec: tokenTimeout, @@ -438,7 +439,7 @@ func testTimeoutProof(t *testing.T) { assert.Equal(t, batchProofStatus, types.ProvingTaskAssigned) // wait coordinator to reset the prover task proving status - 
time.Sleep(time.Duration(conf.ProverManager.CollectionTimeSec*2) * time.Second) + time.Sleep(time.Duration(conf.ProverManager.BatchCollectionTimeSec*2) * time.Second) // create second mock prover, that will send valid proof. chunkProver2 := newMockProver(t, "prover_test"+strconv.Itoa(2), coordinatorURL, message.ProofTypeChunk)