feat: update

This commit is contained in:
georgehao
2023-08-22 20:27:28 +08:00
parent 510a519069
commit e523d61d1a
5 changed files with 51 additions and 45 deletions

View File

@@ -158,8 +158,7 @@ func (c *Collector) check(assignedProverTasks []orm.ProverTask, timeout promethe
"prover public key", assignedProverTask.ProverPublicKey, "prover name", assignedProverTask.ProverName, "task type", assignedProverTask.TaskType)
err := c.db.Transaction(func(tx *gorm.DB) error {
// update prover task proving status as ProverProofInvalid
if err := c.proverTaskOrm.UpdateProverTaskProvingStatus(c.ctx, message.ProofType(assignedProverTask.TaskType),
assignedProverTask.TaskID, assignedProverTask.ProverPublicKey, types.ProverProofInvalid, tx); err != nil {
if err := c.proverTaskOrm.UpdateProverTaskProvingStatusByUUID(c.ctx, assignedProverTask.UUID.String(), types.ProverProofInvalid, tx); err != nil {
log.Error("update prover task proving status failure", "hash", assignedProverTask.TaskID, "pubKey", assignedProverTask.ProverPublicKey, "err", err)
return err
}

View File

@@ -172,7 +172,7 @@ func (m *ProofReceiverLogic) HandleZkProof(ctx *gin.Context, proofMsg *message.P
if verifyErr != nil || !success {
m.verifierFailureTotal.WithLabelValues(pv).Inc()
m.proofRecover(ctx, proofMsg.ID, pk, proofMsg)
m.proofRecover(ctx, proverTask, proofMsg)
log.Info("proof verified by coordinator failed", "proof id", proofMsg.ID, "prover name", proverTask.ProverName,
"prover pk", pk, "prove type", proofMsg.Type, "proof time", proofTimeSec, "error", verifyErr)
@@ -188,9 +188,9 @@ func (m *ProofReceiverLogic) HandleZkProof(ctx *gin.Context, proofMsg *message.P
log.Info("proof verified and valid", "proof id", proofMsg.ID, "prover name", proverTask.ProverName,
"prover pk", pk, "prove type", proofMsg.Type, "proof time", proofTimeSec)
if err := m.closeProofTask(ctx, proofMsg.ID, pk, proofMsg, proofTimeSec); err != nil {
if err := m.closeProofTask(ctx, proverTask, proofMsg, proofTimeSec); err != nil {
m.proofSubmitFailure.Inc()
m.proofRecover(ctx, proofMsg.ID, pk, proofMsg)
m.proofRecover(ctx, proverTask, proofMsg)
return err
}
@@ -248,7 +248,7 @@ func (m *ProofReceiverLogic) validator(ctx context.Context, proverTask *orm.Prov
// Verify if the proving task has already been assigned to another prover.
// Upon receiving an error message, it's possible the proving status has been reset by another prover
// and the task has been reassigned. In this case, the coordinator should avoid resetting the proving status.
m.processProverErr(ctx, proofMsg.ID, pk, proofMsg.Type)
m.processProverErr(ctx, proverTask)
m.validateFailureProverTaskStatusNotOk.Inc()
@@ -283,28 +283,28 @@ func (m *ProofReceiverLogic) validator(ctx context.Context, proverTask *orm.Prov
return nil
}
func (m *ProofReceiverLogic) proofRecover(ctx context.Context, hash string, pubKey string, proofMsg *message.ProofMsg) {
log.Info("proof recover update proof status", "hash", hash, "proverPublicKey", pubKey,
func (m *ProofReceiverLogic) proofRecover(ctx context.Context, proverTask *orm.ProverTask, proofMsg *message.ProofMsg) {
log.Info("proof recover update proof status", "hash", proverTask.TaskID, "proverPublicKey", proverTask.ProverPublicKey,
"taskType", proofMsg.Type.String(), "status", types.ProvingTaskUnassigned.String())
if err := m.updateProofStatus(ctx, hash, pubKey, proofMsg, types.ProvingTaskUnassigned, 0); err != nil {
log.Error("failed to updated proof status ProvingTaskUnassigned", "hash", hash, "pubKey", pubKey, "error", err)
if err := m.updateProofStatus(ctx, proverTask, proofMsg, types.ProvingTaskUnassigned, 0); err != nil {
log.Error("failed to updated proof status ProvingTaskUnassigned", "hash", proverTask.TaskID, "pubKey", proverTask.ProverPublicKey, "error", err)
}
}
func (m *ProofReceiverLogic) closeProofTask(ctx context.Context, hash string, pubKey string, proofMsg *message.ProofMsg, proofTimeSec uint64) error {
log.Info("proof close task update proof status", "hash", hash, "proverPublicKey", pubKey,
func (m *ProofReceiverLogic) closeProofTask(ctx context.Context, proverTask *orm.ProverTask, proofMsg *message.ProofMsg, proofTimeSec uint64) error {
log.Info("proof close task update proof status", "hash", proverTask.TaskID, "proverPublicKey", proverTask.ProverPublicKey,
"taskType", proofMsg.Type.String(), "status", types.ProvingTaskVerified.String())
if err := m.updateProofStatus(ctx, hash, pubKey, proofMsg, types.ProvingTaskVerified, proofTimeSec); err != nil {
log.Error("failed to updated proof status ProvingTaskVerified", "hash", hash, "proverPublicKey", pubKey, "error", err)
if err := m.updateProofStatus(ctx, proverTask, proofMsg, types.ProvingTaskVerified, proofTimeSec); err != nil {
log.Error("failed to updated proof status ProvingTaskVerified", "hash", proverTask.TaskID, "proverPublicKey", proverTask.ProverPublicKey, "error", err)
return err
}
return nil
}
// UpdateProofStatus update the chunk/batch task and session info status
func (m *ProofReceiverLogic) updateProofStatus(ctx context.Context, hash string, proverPublicKey string, proofMsg *message.ProofMsg, status types.ProvingStatus, proofTimeSec uint64) error {
func (m *ProofReceiverLogic) updateProofStatus(ctx context.Context, proverTask *orm.ProverTask, proofMsg *message.ProofMsg, status types.ProvingStatus, proofTimeSec uint64) error {
var proverTaskStatus types.ProverProveStatus
switch status {
case types.ProvingTaskFailed, types.ProvingTaskUnassigned:
@@ -314,13 +314,13 @@ func (m *ProofReceiverLogic) updateProofStatus(ctx context.Context, hash string,
}
err := m.db.Transaction(func(tx *gorm.DB) error {
if updateErr := m.proverTaskOrm.UpdateProverTaskProvingStatus(ctx, proofMsg.Type, hash, proverPublicKey, proverTaskStatus, tx); updateErr != nil {
if updateErr := m.proverTaskOrm.UpdateProverTaskProvingStatusByUUID(ctx, proverTask.UUID.String(), proverTaskStatus, tx); updateErr != nil {
return updateErr
}
// if the block batch has proof verified, so the failed status not update block batch proving status
if m.checkIsTaskSuccess(ctx, hash, proofMsg.Type) {
log.Info("update proof status skip because this chunk / batch has been verified", "hash", hash, "public key", proverPublicKey)
if m.checkIsTaskSuccess(ctx, proverTask.TaskID, proofMsg.Type) {
log.Info("update proof status skip because this chunk / batch has been verified", "hash", proverTask.TaskID, "public key", proverTask.ProverPublicKey)
return nil
}
@@ -333,20 +333,20 @@ func (m *ProofReceiverLogic) updateProofStatus(ctx context.Context, hash string,
storeProofErr = m.batchOrm.UpdateProofByHash(ctx, proofMsg.ID, proofMsg.BatchProof, proofTimeSec, tx)
}
if storeProofErr != nil {
log.Error("failed to store chunk/batch proof into db", "hash", hash, "public key", proverPublicKey, "error", storeProofErr)
log.Error("failed to store chunk/batch proof into db", "hash", proverTask.TaskID, "public key", proverTask.ProverPublicKey, "error", storeProofErr)
return storeProofErr
}
}
switch proofMsg.Type {
case message.ProofTypeChunk:
if err := m.chunkOrm.UpdateProvingStatus(ctx, hash, status, tx); err != nil {
log.Error("failed to update chunk proving_status as failed", "msg.ID", hash, "error", err)
if err := m.chunkOrm.UpdateProvingStatus(ctx, proverTask.TaskID, status, tx); err != nil {
log.Error("failed to update chunk proving_status as failed", "hash", proverTask.TaskID, "error", err)
return err
}
case message.ProofTypeBatch:
if err := m.batchOrm.UpdateProvingStatus(ctx, hash, status, tx); err != nil {
log.Error("failed to update batch proving_status as failed", "msg.ID", hash, "error", err)
if err := m.batchOrm.UpdateProvingStatus(ctx, proverTask.TaskID, status, tx); err != nil {
log.Error("failed to update batch proving_status as failed", "hash", proverTask.TaskID, "error", err)
return err
}
}
@@ -358,7 +358,7 @@ func (m *ProofReceiverLogic) updateProofStatus(ctx context.Context, hash string,
}
if status == types.ProvingTaskVerified && proofMsg.Type == message.ProofTypeChunk {
if checkReadyErr := m.checkAreAllChunkProofsReady(ctx, hash); checkReadyErr != nil {
if checkReadyErr := m.checkAreAllChunkProofsReady(ctx, proverTask.TaskID); checkReadyErr != nil {
log.Error("failed to check are all chunk proofs ready", "error", checkReadyErr)
return checkReadyErr
}
@@ -387,14 +387,14 @@ func (m *ProofReceiverLogic) checkIsTaskSuccess(ctx context.Context, hash string
return provingStatus == types.ProvingTaskVerified
}
func (m *ProofReceiverLogic) processProverErr(ctx context.Context, taskID, pk string, taskType message.ProofType) {
if updateErr := m.proverTaskOrm.UpdateProverTaskProvingStatus(ctx, taskType, taskID, pk, types.ProverProofInvalid); updateErr != nil {
log.Error("update prover task proving status failure", "taskID", taskID, "proverPublicKey", pk, "taskType", taskType, "error", updateErr)
func (m *ProofReceiverLogic) processProverErr(ctx context.Context, proverTask *orm.ProverTask) {
if updateErr := m.proverTaskOrm.UpdateProverTaskProvingStatusByUUID(ctx, proverTask.UUID.String(), types.ProverProofInvalid); updateErr != nil {
log.Error("update prover task proving status failure", "taskID", proverTask.TaskID, "proverPublicKey", proverTask.ProverPublicKey, "taskType", message.ProofType(proverTask.TaskType).String(), "error", updateErr)
}
proverTasks, err := m.proverTaskOrm.GetValidOrAssignedTaskOfOtherProvers(ctx, taskType, taskID, pk)
proverTasks, err := m.proverTaskOrm.GetValidOrAssignedTaskOfOtherProvers(ctx, message.ProofType(proverTask.TaskType), proverTask.TaskID, proverTask.ProverPublicKey)
if err != nil {
log.Warn("checkIsAssignedToOtherProver failure", "taskID", taskID, "proverPublicKey", pk, "taskType", taskType, "error", err)
log.Warn("checkIsAssignedToOtherProver failure", "taskID", proverTask.TaskID, "proverPublicKey", proverTask.ProverPublicKey, "taskType", message.ProofType(proverTask.TaskType).String(), "error", err)
return
}
@@ -402,14 +402,14 @@ func (m *ProofReceiverLogic) processProverErr(ctx context.Context, taskID, pk st
return
}
switch taskType {
switch message.ProofType(proverTask.TaskType) {
case message.ProofTypeChunk:
if err := m.chunkOrm.UpdateProvingStatusFromProverError(ctx, taskID, types.ProvingTaskUnassigned); err != nil {
log.Error("failed to update chunk proving_status as failed", taskID, "proverPublicKey", pk, "taskType", taskType, "error", err)
if err := m.chunkOrm.UpdateProvingStatusFromProverError(ctx, proverTask.TaskID, types.ProvingTaskUnassigned); err != nil {
log.Error("failed to update chunk proving_status as failed", proverTask.TaskID, "proverPublicKey", proverTask.ProverPublicKey, "taskType", message.ProofType(proverTask.TaskType).String(), "error", err)
}
case message.ProofTypeBatch:
if err := m.batchOrm.UpdateProvingStatusFromProverError(ctx, taskID, types.ProvingTaskUnassigned); err != nil {
log.Error("failed to update batch proving_status as failed", taskID, "proverPublicKey", pk, "taskType", taskType, "error", err)
if err := m.batchOrm.UpdateProvingStatusFromProverError(ctx, proverTask.TaskID, types.ProvingTaskUnassigned); err != nil {
log.Error("failed to update batch proving_status as failed", proverTask.TaskID, "proverPublicKey", proverTask.ProverPublicKey, "taskType", message.ProofType(proverTask.TaskType).String(), "error", err)
}
}
}

View File

@@ -307,14 +307,6 @@ func (o *Chunk) UpdateProvingStatusFromProverError(ctx context.Context, hash str
updateFields := make(map[string]interface{})
updateFields["proving_status"] = int(status)
switch status {
case types.ProvingTaskAssigned:
updateFields["prover_assigned_at"] = time.Now()
case types.ProvingTaskUnassigned:
updateFields["prover_assigned_at"] = nil
case types.ProvingTaskVerified:
updateFields["proved_at"] = time.Now()
}
db := o.db.WithContext(ctx)
db = db.Model(&Chunk{})
db = db.Where("hash", hash).Where("proving_status", types.ProvingTaskAssigned)

View File

@@ -252,7 +252,7 @@ func (o *ProverTask) UpdateProverTaskProvingStatus(ctx context.Context, proofTyp
}
db = db.WithContext(ctx)
db = db.Model(&ProverTask{})
db = db.Where("task_type = ? AND task_id = ? AND prover_public_key = ?", int(proofType), taskID, pk)
db = db.Where("task_type = ? AND task_id = ? AND prover_public_key = ? AND proving_status = ?", int(proofType), taskID, pk, types.ProverAssigned)
if err := db.Update("proving_status", status).Error; err != nil {
return fmt.Errorf("ProverTask.UpdateProverTaskProvingStatus error: %w, proof type: %v, taskID: %v, prover public key: %v, status: %v", err, proofType.String(), taskID, pk, status.String())
@@ -260,6 +260,22 @@ func (o *ProverTask) UpdateProverTaskProvingStatus(ctx context.Context, proofTyp
return nil
}
// UpdateProverTaskProvingStatusByUUID updates the proving_status of a specific ProverTask record.
func (o *ProverTask) UpdateProverTaskProvingStatusByUUID(ctx context.Context, uuid string, status types.ProverProveStatus, dbTX ...*gorm.DB) error {
db := o.db
if len(dbTX) > 0 && dbTX[0] != nil {
db = dbTX[0]
}
db = db.WithContext(ctx)
db = db.Model(&ProverTask{})
db = db.Where("uuid = ?", uuid)
if err := db.Update("proving_status", status).Error; err != nil {
return fmt.Errorf("ProverTask.UpdateProverTaskProvingStatus error: %w, uuid:%s, status: %v", err, uuid, status.String())
}
return nil
}
// UpdateAllProverTaskProvingStatusOfTaskID updates all the proving_status of a specific task id.
func (o *ProverTask) UpdateAllProverTaskProvingStatusOfTaskID(ctx context.Context, proofType message.ProofType, taskID string, status types.ProverProveStatus, dbTX ...*gorm.DB) error {
db := o.db

View File

@@ -3,5 +3,4 @@
ALTER TABLE prover_task ADD COLUMN uuid uuid DEFAULT gen_random_uuid() NOT NULL UNIQUE;
create index if not exists idx_uuid on prover_task (uuid) where deleted_at IS NULL;
-- +goose StatementEnd
-- +goose StatementEnd