diff --git a/coordinator/internal/controller/cron/collect_proof.go b/coordinator/internal/controller/cron/collect_proof.go index 3833c65c5..729f782a9 100644 --- a/coordinator/internal/controller/cron/collect_proof.go +++ b/coordinator/internal/controller/cron/collect_proof.go @@ -158,8 +158,7 @@ func (c *Collector) check(assignedProverTasks []orm.ProverTask, timeout promethe "prover public key", assignedProverTask.ProverPublicKey, "prover name", assignedProverTask.ProverName, "task type", assignedProverTask.TaskType) err := c.db.Transaction(func(tx *gorm.DB) error { // update prover task proving status as ProverProofInvalid - if err := c.proverTaskOrm.UpdateProverTaskProvingStatus(c.ctx, message.ProofType(assignedProverTask.TaskType), - assignedProverTask.TaskID, assignedProverTask.ProverPublicKey, types.ProverProofInvalid, tx); err != nil { + if err := c.proverTaskOrm.UpdateProverTaskProvingStatusByUUID(c.ctx, assignedProverTask.UUID.String(), types.ProverProofInvalid, tx); err != nil { log.Error("update prover task proving status failure", "hash", assignedProverTask.TaskID, "pubKey", assignedProverTask.ProverPublicKey, "err", err) return err } diff --git a/coordinator/internal/logic/submitproof/proof_receiver.go b/coordinator/internal/logic/submitproof/proof_receiver.go index ad068c7e5..c80efe825 100644 --- a/coordinator/internal/logic/submitproof/proof_receiver.go +++ b/coordinator/internal/logic/submitproof/proof_receiver.go @@ -172,7 +172,7 @@ func (m *ProofReceiverLogic) HandleZkProof(ctx *gin.Context, proofMsg *message.P if verifyErr != nil || !success { m.verifierFailureTotal.WithLabelValues(pv).Inc() - m.proofRecover(ctx, proofMsg.ID, pk, proofMsg) + m.proofRecover(ctx, proverTask, proofMsg) log.Info("proof verified by coordinator failed", "proof id", proofMsg.ID, "prover name", proverTask.ProverName, "prover pk", pk, "prove type", proofMsg.Type, "proof time", proofTimeSec, "error", verifyErr) @@ -188,9 +188,9 @@ func (m *ProofReceiverLogic) HandleZkProof(ctx *gin.Context, proofMsg *message.P log.Info("proof verified and valid", "proof id", proofMsg.ID, "prover name", proverTask.ProverName, "prover pk", pk, "prove type", proofMsg.Type, "proof time", proofTimeSec) - if err := m.closeProofTask(ctx, proofMsg.ID, pk, proofMsg, proofTimeSec); err != nil { + if err := m.closeProofTask(ctx, proverTask, proofMsg, proofTimeSec); err != nil { m.proofSubmitFailure.Inc() - m.proofRecover(ctx, proofMsg.ID, pk, proofMsg) + m.proofRecover(ctx, proverTask, proofMsg) return err } @@ -248,7 +248,7 @@ func (m *ProofReceiverLogic) validator(ctx context.Context, proverTask *orm.Prov // Verify if the proving task has already been assigned to another prover. // Upon receiving an error message, it's possible the proving status has been reset by another prover // and the task has been reassigned. In this case, the coordinator should avoid resetting the proving status. - m.processProverErr(ctx, proofMsg.ID, pk, proofMsg.Type) + m.processProverErr(ctx, proverTask) m.validateFailureProverTaskStatusNotOk.Inc() @@ -283,28 +283,28 @@ func (m *ProofReceiverLogic) validator(ctx context.Context, proverTask *orm.Prov return nil } -func (m *ProofReceiverLogic) proofRecover(ctx context.Context, hash string, pubKey string, proofMsg *message.ProofMsg) { - log.Info("proof recover update proof status", "hash", hash, "proverPublicKey", pubKey, +func (m *ProofReceiverLogic) proofRecover(ctx context.Context, proverTask *orm.ProverTask, proofMsg *message.ProofMsg) { + log.Info("proof recover update proof status", "hash", proverTask.TaskID, "proverPublicKey", proverTask.ProverPublicKey, "taskType", proofMsg.Type.String(), "status", types.ProvingTaskUnassigned.String()) - if err := m.updateProofStatus(ctx, hash, pubKey, proofMsg, types.ProvingTaskUnassigned, 0); err != nil { - log.Error("failed to updated proof status ProvingTaskUnassigned", "hash", hash, "pubKey", pubKey, "error", err) + if err := m.updateProofStatus(ctx, proverTask, proofMsg, types.ProvingTaskUnassigned, 0); err != nil { + log.Error("failed to updated proof status ProvingTaskUnassigned", "hash", proverTask.TaskID, "pubKey", proverTask.ProverPublicKey, "error", err) } } -func (m *ProofReceiverLogic) closeProofTask(ctx context.Context, hash string, pubKey string, proofMsg *message.ProofMsg, proofTimeSec uint64) error { - log.Info("proof close task update proof status", "hash", hash, "proverPublicKey", pubKey, +func (m *ProofReceiverLogic) closeProofTask(ctx context.Context, proverTask *orm.ProverTask, proofMsg *message.ProofMsg, proofTimeSec uint64) error { + log.Info("proof close task update proof status", "hash", proverTask.TaskID, "proverPublicKey", proverTask.ProverPublicKey, "taskType", proofMsg.Type.String(), "status", types.ProvingTaskVerified.String()) - if err := m.updateProofStatus(ctx, hash, pubKey, proofMsg, types.ProvingTaskVerified, proofTimeSec); err != nil { - log.Error("failed to updated proof status ProvingTaskVerified", "hash", hash, "proverPublicKey", pubKey, "error", err) + if err := m.updateProofStatus(ctx, proverTask, proofMsg, types.ProvingTaskVerified, proofTimeSec); err != nil { + log.Error("failed to updated proof status ProvingTaskVerified", "hash", proverTask.TaskID, "proverPublicKey", proverTask.ProverPublicKey, "error", err) return err } return nil } // UpdateProofStatus update the chunk/batch task and session info status -func (m *ProofReceiverLogic) updateProofStatus(ctx context.Context, hash string, proverPublicKey string, proofMsg *message.ProofMsg, status types.ProvingStatus, proofTimeSec uint64) error { +func (m *ProofReceiverLogic) updateProofStatus(ctx context.Context, proverTask *orm.ProverTask, proofMsg *message.ProofMsg, status types.ProvingStatus, proofTimeSec uint64) error { var proverTaskStatus types.ProverProveStatus switch status { case types.ProvingTaskFailed, types.ProvingTaskUnassigned: @@ -314,13 +314,13 @@ func (m *ProofReceiverLogic) updateProofStatus(ctx context.Context, hash string, } err := m.db.Transaction(func(tx *gorm.DB) error { - if updateErr := m.proverTaskOrm.UpdateProverTaskProvingStatus(ctx, proofMsg.Type, hash, proverPublicKey, proverTaskStatus, tx); updateErr != nil { + if updateErr := m.proverTaskOrm.UpdateProverTaskProvingStatusByUUID(ctx, proverTask.UUID.String(), proverTaskStatus, tx); updateErr != nil { return updateErr } // if the block batch has proof verified, so the failed status not update block batch proving status - if m.checkIsTaskSuccess(ctx, hash, proofMsg.Type) { - log.Info("update proof status skip because this chunk / batch has been verified", "hash", hash, "public key", proverPublicKey) + if m.checkIsTaskSuccess(ctx, proverTask.TaskID, proofMsg.Type) { + log.Info("update proof status skip because this chunk / batch has been verified", "hash", proverTask.TaskID, "public key", proverTask.ProverPublicKey) return nil } @@ -333,20 +333,20 @@ func (m *ProofReceiverLogic) updateProofStatus(ctx context.Context, hash string, storeProofErr = m.batchOrm.UpdateProofByHash(ctx, proofMsg.ID, proofMsg.BatchProof, proofTimeSec, tx) } if storeProofErr != nil { - log.Error("failed to store chunk/batch proof into db", "hash", hash, "public key", proverPublicKey, "error", storeProofErr) + log.Error("failed to store chunk/batch proof into db", "hash", proverTask.TaskID, "public key", proverTask.ProverPublicKey, "error", storeProofErr) return storeProofErr } } switch proofMsg.Type { case message.ProofTypeChunk: - if err := m.chunkOrm.UpdateProvingStatus(ctx, hash, status, tx); err != nil { - log.Error("failed to update chunk proving_status as failed", "msg.ID", hash, "error", err) + if err := m.chunkOrm.UpdateProvingStatus(ctx, proverTask.TaskID, status, tx); err != nil { + log.Error("failed to update chunk proving_status as failed", "hash", proverTask.TaskID, "error", err) return err } case message.ProofTypeBatch: - if err := m.batchOrm.UpdateProvingStatus(ctx, hash, status, tx); err != nil { - log.Error("failed to update batch proving_status as failed", "msg.ID", hash, "error", err) + if err := m.batchOrm.UpdateProvingStatus(ctx, proverTask.TaskID, status, tx); err != nil { + log.Error("failed to update batch proving_status as failed", "hash", proverTask.TaskID, "error", err) return err } } @@ -358,7 +358,7 @@ func (m *ProofReceiverLogic) updateProofStatus(ctx context.Context, hash string, } if status == types.ProvingTaskVerified && proofMsg.Type == message.ProofTypeChunk { - if checkReadyErr := m.checkAreAllChunkProofsReady(ctx, hash); checkReadyErr != nil { + if checkReadyErr := m.checkAreAllChunkProofsReady(ctx, proverTask.TaskID); checkReadyErr != nil { log.Error("failed to check are all chunk proofs ready", "error", checkReadyErr) return checkReadyErr } @@ -387,14 +387,14 @@ func (m *ProofReceiverLogic) checkIsTaskSuccess(ctx context.Context, hash string return provingStatus == types.ProvingTaskVerified } -func (m *ProofReceiverLogic) processProverErr(ctx context.Context, taskID, pk string, taskType message.ProofType) { - if updateErr := m.proverTaskOrm.UpdateProverTaskProvingStatus(ctx, taskType, taskID, pk, types.ProverProofInvalid); updateErr != nil { - log.Error("update prover task proving status failure", "taskID", taskID, "proverPublicKey", pk, "taskType", taskType, "error", updateErr) +func (m *ProofReceiverLogic) processProverErr(ctx context.Context, proverTask *orm.ProverTask) { + if updateErr := m.proverTaskOrm.UpdateProverTaskProvingStatusByUUID(ctx, proverTask.UUID.String(), types.ProverProofInvalid); updateErr != nil { + log.Error("update prover task proving status failure", "taskID", proverTask.TaskID, "proverPublicKey", proverTask.ProverPublicKey, "taskType", message.ProofType(proverTask.TaskType).String(), "error", updateErr) } - proverTasks, err := m.proverTaskOrm.GetValidOrAssignedTaskOfOtherProvers(ctx, taskType, taskID, pk) + proverTasks, err := m.proverTaskOrm.GetValidOrAssignedTaskOfOtherProvers(ctx, message.ProofType(proverTask.TaskType), proverTask.TaskID, proverTask.ProverPublicKey) if err != nil { - log.Warn("checkIsAssignedToOtherProver failure", "taskID", taskID, "proverPublicKey", pk, "taskType", taskType, "error", err) + log.Warn("checkIsAssignedToOtherProver failure", "taskID", proverTask.TaskID, "proverPublicKey", proverTask.ProverPublicKey, "taskType", message.ProofType(proverTask.TaskType).String(), "error", err) return } @@ -402,14 +402,14 @@ func (m *ProofReceiverLogic) processProverErr(ctx context.Context, taskID, pk st return } - switch taskType { + switch message.ProofType(proverTask.TaskType) { case message.ProofTypeChunk: - if err := m.chunkOrm.UpdateProvingStatusFromProverError(ctx, taskID, types.ProvingTaskUnassigned); err != nil { - log.Error("failed to update chunk proving_status as failed", taskID, "proverPublicKey", pk, "taskType", taskType, "error", err) + if err := m.chunkOrm.UpdateProvingStatusFromProverError(ctx, proverTask.TaskID, types.ProvingTaskUnassigned); err != nil { + log.Error("failed to update chunk proving_status as failed", proverTask.TaskID, "proverPublicKey", proverTask.ProverPublicKey, "taskType", message.ProofType(proverTask.TaskType).String(), "error", err) } case message.ProofTypeBatch: - if err := m.batchOrm.UpdateProvingStatusFromProverError(ctx, taskID, types.ProvingTaskUnassigned); err != nil { - log.Error("failed to update batch proving_status as failed", taskID, "proverPublicKey", pk, "taskType", taskType, "error", err) + if err := m.batchOrm.UpdateProvingStatusFromProverError(ctx, proverTask.TaskID, types.ProvingTaskUnassigned); err != nil { + log.Error("failed to update batch proving_status as failed", proverTask.TaskID, "proverPublicKey", proverTask.ProverPublicKey, "taskType", message.ProofType(proverTask.TaskType).String(), "error", err) } } } diff --git a/coordinator/internal/orm/chunk.go b/coordinator/internal/orm/chunk.go index 55c9a39fd..64468c02b 100644 --- a/coordinator/internal/orm/chunk.go +++ b/coordinator/internal/orm/chunk.go @@ -307,14 +307,6 @@ func (o *Chunk) UpdateProvingStatusFromProverError(ctx context.Context, hash str updateFields := make(map[string]interface{}) updateFields["proving_status"] = int(status) - switch status { - case types.ProvingTaskAssigned: - updateFields["prover_assigned_at"] = time.Now() - case types.ProvingTaskUnassigned: - updateFields["prover_assigned_at"] = nil - case types.ProvingTaskVerified: - updateFields["proved_at"] = time.Now() - } db := o.db.WithContext(ctx) db = db.Model(&Chunk{}) db = db.Where("hash", hash).Where("proving_status", types.ProvingTaskAssigned) diff --git a/coordinator/internal/orm/prover_task.go b/coordinator/internal/orm/prover_task.go index 59f85c407..e4e2c3c70 100644 --- a/coordinator/internal/orm/prover_task.go +++ b/coordinator/internal/orm/prover_task.go @@ -252,7 +252,7 @@ func (o *ProverTask) UpdateProverTaskProvingStatus(ctx context.Context, proofTyp } db = db.WithContext(ctx) db = db.Model(&ProverTask{}) - db = db.Where("task_type = ? AND task_id = ? AND prover_public_key = ?", int(proofType), taskID, pk) + db = db.Where("task_type = ? AND task_id = ? AND prover_public_key = ? AND proving_status = ?", int(proofType), taskID, pk, types.ProverAssigned) if err := db.Update("proving_status", status).Error; err != nil { return fmt.Errorf("ProverTask.UpdateProverTaskProvingStatus error: %w, proof type: %v, taskID: %v, prover public key: %v, status: %v", err, proofType.String(), taskID, pk, status.String()) @@ -260,6 +260,22 @@ func (o *ProverTask) UpdateProverTaskProvingStatus(ctx context.Context, proofTyp return nil } +// UpdateProverTaskProvingStatusByUUID updates the proving_status of a specific ProverTask record. +func (o *ProverTask) UpdateProverTaskProvingStatusByUUID(ctx context.Context, uuid string, status types.ProverProveStatus, dbTX ...*gorm.DB) error { + db := o.db + if len(dbTX) > 0 && dbTX[0] != nil { + db = dbTX[0] + } + db = db.WithContext(ctx) + db = db.Model(&ProverTask{}) + db = db.Where("uuid = ?", uuid) + + if err := db.Update("proving_status", status).Error; err != nil { + return fmt.Errorf("ProverTask.UpdateProverTaskProvingStatus error: %w, uuid:%s, status: %v", err, uuid, status.String()) + } + return nil +} + // UpdateAllProverTaskProvingStatusOfTaskID updates all the proving_status of a specific task id. func (o *ProverTask) UpdateAllProverTaskProvingStatusOfTaskID(ctx context.Context, proofType message.ProofType, taskID string, status types.ProverProveStatus, dbTX ...*gorm.DB) error { db := o.db diff --git a/database/migrate/migrations/00009_add_uuid_to_prover_task.sql b/database/migrate/migrations/00009_add_uuid_to_prover_task.sql index 7bfe96ab0..4f53ac567 100644 --- a/database/migrate/migrations/00009_add_uuid_to_prover_task.sql +++ b/database/migrate/migrations/00009_add_uuid_to_prover_task.sql @@ -3,5 +3,4 @@ ALTER TABLE prover_task ADD COLUMN uuid uuid DEFAULT gen_random_uuid() NOT NULL UNIQUE; create index if not exists idx_uuid on prover_task (uuid) where deleted_at IS NULL; - --- +goose StatementEnd \ No newline at end of file +-- +goose StatementEnd