diff --git a/coordinator/internal/controller/api/roller.go b/coordinator/internal/controller/api/roller.go index 87ea65857..3d39ebd29 100644 --- a/coordinator/internal/controller/api/roller.go +++ b/coordinator/internal/controller/api/roller.go @@ -41,7 +41,10 @@ func (r *RollerController) RequestToken(authMsg *message.AuthMsg) (string, error } return "", errors.New("signature verification failed") } - pubkey, _ := authMsg.PublicKey() + pubkey, err := authMsg.PublicKey() + if err != nil { + return "", fmt.Errorf("RequestToken auth msg public key error:%w", err) + } if token, ok := r.tokenCache.Get(pubkey); ok { return token.(string), nil } @@ -53,9 +56,12 @@ func (r *RollerController) RequestToken(authMsg *message.AuthMsg) (string, error return token, nil } -// VerifyToken verifies pukey for token and expiration time +// VerifyToken verifies pubkey for token and expiration time func (r *RollerController) verifyToken(authMsg *message.AuthMsg) (bool, error) { - pubkey, _ := authMsg.PublicKey() + pubkey, err := authMsg.PublicKey() + if err != nil { + return false, fmt.Errorf("verify token auth msg public key error:%w", err) + } // GetValue returns nil if value is expired if token, ok := r.tokenCache.Get(pubkey); !ok || token != authMsg.Identity.Token { return false, fmt.Errorf("failed to find corresponding token. roller name: %s roller pk: %s", authMsg.Identity.Name, pubkey) @@ -76,7 +82,10 @@ func (r *RollerController) Register(ctx context.Context, authMsg *message.AuthMs if ok, err := r.verifyToken(authMsg); !ok { return nil, err } - pubkey, _ := authMsg.PublicKey() + pubkey, err := authMsg.PublicKey() + if err != nil { + return nil, fmt.Errorf("register auth msg public key error:%w", err) + } // roller successfully registered, remove token associated with this roller r.tokenCache.Delete(pubkey) diff --git a/coordinator/internal/logic/collector/batch_proof_collector.go b/coordinator/internal/logic/collector/batch_proof_collector.go index 8e544eee5..03f662803 100644 --- a/coordinator/internal/logic/collector/batch_proof_collector.go +++ b/coordinator/internal/logic/collector/batch_proof_collector.go @@ -23,7 +23,7 @@ type BatchProofCollector struct { // NewBatchProofCollector new a batch collector func NewBatchProofCollector(cfg *config.Config, db *gorm.DB) *BatchProofCollector { - ac := &BatchProofCollector{ + bp := &BatchProofCollector{ BaseCollector: BaseCollector{ db: db, cfg: cfg, @@ -32,7 +32,7 @@ func NewBatchProofCollector(cfg *config.Config, db *gorm.DB) *BatchProofCollecto proverTaskOrm: orm.NewProverTask(db), }, } - return ac + return bp } // Name return the batch proof collector name @@ -52,7 +52,7 @@ func (ac *BatchProofCollector) Collect(ctx context.Context) error { } if len(batchTasks) != 1 { - return fmt.Errorf("get unassigned batch proving task len not 1") + return fmt.Errorf("get unassigned batch proving task len not 1, batch tasks:%v", batchTasks) } batchTask := batchTasks[0] diff --git a/coordinator/internal/logic/collector/chunk_proof_collector.go b/coordinator/internal/logic/collector/chunk_proof_collector.go index f2317f485..8eb97d636 100644 --- a/coordinator/internal/logic/collector/chunk_proof_collector.go +++ b/coordinator/internal/logic/collector/chunk_proof_collector.go @@ -54,7 +54,7 @@ func (cp *ChunkProofCollector) Collect(ctx context.Context) error { } if len(chunkTasks) != 1 { - return fmt.Errorf("get unassigned chunk proving task len not 1") + return fmt.Errorf("get unassigned chunk proving task len not 1, chunk tasks:%v", chunkTasks) } chunkTask := chunkTasks[0] diff --git a/coordinator/internal/logic/collector/collector.go b/coordinator/internal/logic/collector/collector.go index 73ebcfeee..b433d9296 100644 --- a/coordinator/internal/logic/collector/collector.go +++ b/coordinator/internal/logic/collector/collector.go @@ -33,12 +33,6 @@ type Collector interface { Collect(ctx context.Context) error } -// HashTaskPublicKey hash public key pair -type HashTaskPublicKey struct { - Attempt int - PubKey string -} - // BaseCollector a base collector which contain series functions type BaseCollector struct { cfg *config.Config @@ -74,10 +68,6 @@ func (b *BaseCollector) checkAttemptsExceeded(hash string, taskType message.Proo } transErr := b.db.Transaction(func(tx *gorm.DB) error { - // Set status as skipped. - // Note that this is only a workaround for testnet here. - // TODO: In real cases we should reset to orm.ProvingTaskUnassigned - // so as to re-distribute the task in the future switch message.ProofType(proverTasks[0].TaskType) { case message.ProofTypeChunk: if err := b.chunkOrm.UpdateProvingStatus(b.ctx, hash, types.ProvingTaskFailed, tx); err != nil { diff --git a/coordinator/internal/logic/proof/proof_receiver.go b/coordinator/internal/logic/proof/proof_receiver.go index dc333f03b..456f8d9e3 100644 --- a/coordinator/internal/logic/proof/proof_receiver.go +++ b/coordinator/internal/logic/proof/proof_receiver.go @@ -122,7 +122,6 @@ func (m *ZKProofReceiver) HandleZkProof(ctx context.Context, proofMsg *message.P coordinatorProofsReceivedTotalCounter.Inc(1) - // TODO: wrap both basic verifier and aggregator verifier success, verifyErr := m.verifier.VerifyProof(proofMsg.Proof) if verifyErr != nil || !success { if verifyErr != nil { @@ -224,7 +223,7 @@ func (m *ZKProofReceiver) closeProofTask(ctx context.Context, hash string, pubKe return nil } -// UpdateProofStatus update the block batch/agg task and session info status +// UpdateProofStatus update the chunk/batch task and session info status func (m *ZKProofReceiver) updateProofStatus(ctx context.Context, hash string, proverPublicKey string, proofMsgType message.ProofType, status types.ProvingStatus) error { // if the prover task failure type is SessionInfoFailureTimeout, // just skip update the status because the proof result come so slow. @@ -253,12 +252,12 @@ func (m *ZKProofReceiver) updateProofStatus(ctx context.Context, hash string, pr switch proofMsgType { case message.ProofTypeChunk: if err := m.chunkOrm.UpdateProvingStatus(ctx, hash, status, tx); err != nil { - log.Error("failed to update basic proving_status as failed", "msg.ID", hash, "error", err) + log.Error("failed to update chunk proving_status as failed", "msg.ID", hash, "error", err) return err } case message.ProofTypeBatch: if err := m.batchOrm.UpdateProvingStatus(ctx, hash, status, tx); err != nil { - log.Error("failed to update aggregator proving_status as failed", "msg.ID", hash, "error", err) + log.Error("failed to update batch proving_status as failed", "msg.ID", hash, "error", err) return err } } diff --git a/coordinator/internal/logic/proof/task_worker.go b/coordinator/internal/logic/proof/task_worker.go index 143039243..7b4eab047 100644 --- a/coordinator/internal/logic/proof/task_worker.go +++ b/coordinator/internal/logic/proof/task_worker.go @@ -2,6 +2,7 @@ package proof import ( "context" + "fmt" "github.com/scroll-tech/go-ethereum/log" gethMetrics "github.com/scroll-tech/go-ethereum/metrics" @@ -30,13 +31,17 @@ func (t *TaskWorker) AllocTaskWorker(ctx context.Context, authMsg *message.AuthM return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported } - pubKey, _ := authMsg.PublicKey() + pubKey, err := authMsg.PublicKey() + if err != nil { + return &rpc.Subscription{}, fmt.Errorf("AllocTaskWorker auth msg public key error:%w", err) + } + identity := authMsg.Identity // create or get the roller message channel taskCh, err := rollermanager.Manager.Register(pubKey, identity) if err != nil { - return nil, err + return &rpc.Subscription{}, err } rpcSub := notifier.CreateSubscription() diff --git a/coordinator/internal/logic/rollermanager/roller_manager.go b/coordinator/internal/logic/rollermanager/roller_manager.go index e840f9f40..2c5926218 100644 --- a/coordinator/internal/logic/rollermanager/roller_manager.go +++ b/coordinator/internal/logic/rollermanager/roller_manager.go @@ -22,7 +22,7 @@ var ( Manager *rollerManager ) -// RollerNode the interface for controller how to use roller. +// RollerNode is the interface that controls the rollers type rollerNode struct { // Roller name Name string