package coordinator

import (
	"context"
	"errors"
	"fmt"
	mathrand "math/rand"
	"sync"
	"sync/atomic"
	"time"

	cmap "github.com/orcaman/concurrent-map"
	"github.com/patrickmn/go-cache"
	"github.com/scroll-tech/go-ethereum/common"
	geth_types "github.com/scroll-tech/go-ethereum/core/types"
	"github.com/scroll-tech/go-ethereum/ethclient"
	"github.com/scroll-tech/go-ethereum/log"
	geth_metrics "github.com/scroll-tech/go-ethereum/metrics"
	"github.com/scroll-tech/go-ethereum/rpc"

	"scroll-tech/common/metrics"
	"scroll-tech/common/types"
	"scroll-tech/common/types/message"
	"scroll-tech/common/utils/workerpool"
	"scroll-tech/database"

	"scroll-tech/coordinator/config"
	"scroll-tech/coordinator/verifier"
)

var (
	// proofs
	//
	// Counter bumped for each proof that has been stored and is about to be verified.
	coordinatorProofsReceivedTotalCounter = geth_metrics.NewRegisteredCounter("coordinator/proofs/received/total", metrics.ScrollRegistry)
	// Proving time of proofs that passed coordinator-side verification.
	coordinatorProofsVerifiedSuccessTimeTimer = geth_metrics.NewRegisteredTimer("coordinator/proofs/verified/success/time", metrics.ScrollRegistry)
	// Proving time of proofs that failed coordinator-side verification.
	coordinatorProofsVerifiedFailedTimeTimer = geth_metrics.NewRegisteredTimer("coordinator/proofs/verified/failed/time", metrics.ScrollRegistry)
	// Proving time of submissions whose status was not message.StatusOk.
	coordinatorProofsGeneratedFailedTimeTimer = geth_metrics.NewRegisteredTimer("coordinator/proofs/generated/failed/time", metrics.ScrollRegistry)

	// sessions
	//
	// Sessions in which every roller finished and at least one proof was valid.
	coordinatorSessionsSuccessTotalCounter = geth_metrics.NewRegisteredCounter("coordinator/sessions/success/total", metrics.ScrollRegistry)
	// Sessions that hit the collection timeout and were not retried.
	coordinatorSessionsTimeoutTotalCounter = geth_metrics.NewRegisteredCounter("coordinator/sessions/timeout/total", metrics.ScrollRegistry)
	// Sessions in which every roller ended up with an invalid proof.
	coordinatorSessionsFailedTotalCounter = geth_metrics.NewRegisteredCounter("coordinator/sessions/failed/total", metrics.ScrollRegistry)
	// Number of running CollectProofs goroutines. Despite the "Gauge" suffix
	// this is registered as a counter; it is incremented on entry to
	// CollectProofs and decremented on exit.
	coordinatorSessionsActiveNumberGauge = geth_metrics.NewRegisteredCounter("coordinator/sessions/active/number", metrics.ScrollRegistry)
)

const (
	// proofAndPkBufferSize is the capacity of every session's finishChan.
	proofAndPkBufferSize = 10
)

// rollerProofStatus is the notification handleZkProof sends to a session's
// CollectProofs goroutine once a roller's submission has been processed.
type rollerProofStatus struct {
	// id is the task (session) ID the submission belongs to.
	id string
	// typ is the prove type carried by the submission.
	typ message.ProveType
	// pk is the public key of the submitting roller.
	pk string
	// status is the resulting proof status for that roller.
	status types.RollerProveStatus
}

// Contains all the information on an ongoing proof generation session.
type session struct {
	info *types.SessionInfo
	// finish channel is used to pass the public key of the rollers who finished proving process.
	finishChan chan rollerProofStatus
}

// Manager is responsible for maintaining connections with active rollers,
// sending the challenges, and receiving proofs. It also regulates the reward
// distribution. All read and write logic and connection handling happens through
// a modular websocket server, contained within the Manager. Incoming messages are
// then passed to the Manager where the actual handling logic resides.
type Manager struct {
	// The manager context. Loop returns once it is done.
	ctx context.Context

	// The roller manager configuration.
	cfg *config.RollerManagerConfig

	// The indicator whether the backend is running or not.
	// Holds 1 while running and 0 otherwise; accessed atomically.
	running int32

	// A mutex guarding the session maps below (sessions and failedSessionInfos).
	mu sync.RWMutex
	// A map containing all active proof generation sessions, keyed by session ID.
	sessions map[string]*session
	// Concurrent map of rollers; it is only initialised in this file and is
	// populated/queried elsewhere in the package.
	// NOTE(review): presumably keyed by roller public key — confirm.
	rollerPool cmap.ConcurrentMap

	// A map containing proof failed or verify failed proof, keyed by session ID.
	failedSessionInfos map[string]*SessionInfo

	// A direct connection to the Halo2 verifier, used to verify
	// incoming proofs.
	verifier *verifier.Verifier

	// db interface
	orm database.OrmFactory

	// l2geth client, used to fetch block traces for basic proving tasks.
	*ethclient.Client

	// Token cache; VerifyToken looks tokens up by roller public key.
	tokenCache *cache.Cache
	// A mutex guarding registration
	registerMu sync.RWMutex

	// Verifier worker pool on which proof verification tasks are executed.
	verifierWorkerPool *workerpool.WorkerPool
}

// New returns a new instance of Manager. The instance is not fully prepared
// yet and still needs to be finalized and run by calling `manager.Start`.
func New(ctx context.Context, cfg *config.RollerManagerConfig, orm database.OrmFactory, client *ethclient.Client) (*Manager, error) { v, err := verifier.NewVerifier(cfg.Verifier) if err != nil { return nil, err } log.Info("Start coordinator successfully.") return &Manager{ ctx: ctx, cfg: cfg, rollerPool: cmap.New(), sessions: make(map[string]*session), failedSessionInfos: make(map[string]*SessionInfo), verifier: v, orm: orm, Client: client, tokenCache: cache.New(time.Duration(cfg.TokenTimeToLive)*time.Second, 1*time.Hour), verifierWorkerPool: workerpool.NewWorkerPool(cfg.MaxVerifierWorkers), }, nil } // Start the Manager module. func (m *Manager) Start() error { if m.isRunning() { return nil } m.verifierWorkerPool.Run() m.restorePrevSessions() atomic.StoreInt32(&m.running, 1) go m.Loop() return nil } // Stop the Manager module, for a graceful shutdown. func (m *Manager) Stop() { if !m.isRunning() { return } m.verifierWorkerPool.Stop() atomic.StoreInt32(&m.running, 0) } // isRunning returns an indicator whether manager is running or not. func (m *Manager) isRunning() bool { return atomic.LoadInt32(&m.running) == 1 } // Loop keeps the manager running. 
func (m *Manager) Loop() { var ( tick = time.NewTicker(time.Second * 2) tasks []*types.BlockBatch aggTasks []*types.AggTask ) defer tick.Stop() for { select { case <-tick.C: // load and send aggregator tasks if len(aggTasks) == 0 && m.orm != nil { var err error aggTasks, err = m.orm.GetUnassignedAggTasks() if err != nil { log.Error("failed to get unassigned aggregator proving tasks", "error", err) continue } } // Select aggregator type roller and send message for len(aggTasks) > 0 && m.StartAggProofGenerationSession(aggTasks[0], nil) { aggTasks = aggTasks[1:] } // load and send basic tasks if len(tasks) == 0 && m.orm != nil { var err error // TODO: add cache if tasks, err = m.orm.GetBlockBatches( map[string]interface{}{"proving_status": types.ProvingTaskUnassigned}, fmt.Sprintf( "ORDER BY index %s LIMIT %d;", m.cfg.OrderSession, m.GetNumberOfIdleRollers(message.BasicProve), ), ); err != nil { log.Error("failed to get unassigned basic proving tasks", "error", err) continue } } // Select basic type roller and send message for len(tasks) > 0 && m.StartBasicProofGenerationSession(tasks[0], nil) { tasks = tasks[1:] } case <-m.ctx.Done(): if m.ctx.Err() != nil { log.Error( "manager context canceled with error", "error", m.ctx.Err(), ) } return } } } func (m *Manager) restorePrevSessions() { // m.orm may be nil in scroll tests if m.orm == nil { return } m.mu.Lock() defer m.mu.Unlock() var hashes []string // load assigned aggregator tasks from db aggTasks, err := m.orm.GetAssignedAggTasks() if err != nil { log.Error("failed to load assigned aggregator tasks from db", "error", err) return } for _, aggTask := range aggTasks { hashes = append(hashes, aggTask.ID) } // load assigned basic tasks from db batchHashes, err := m.orm.GetAssignedBatchHashes() if err != nil { log.Error("failed to get assigned batch batchHashes from db", "error", err) return } hashes = append(hashes, batchHashes...) 
prevSessions, err := m.orm.GetSessionInfosByHashes(hashes) if err != nil { log.Error("failed to recover roller session info from db", "error", err) return } for _, v := range prevSessions { sess := &session{ info: v, finishChan: make(chan rollerProofStatus, proofAndPkBufferSize), } m.sessions[sess.info.ID] = sess log.Info("Coordinator restart reload sessions", "session start time", time.Unix(sess.info.StartTimestamp, 0)) for _, roller := range sess.info.Rollers { log.Info( "restore roller info for session", "session id", sess.info.ID, "roller name", roller.Name, "prove type", sess.info.ProveType, "public key", roller.PublicKey, "proof status", roller.Status) } go m.CollectProofs(sess) } } // HandleZkProof handle a ZkProof submitted from a roller. // For now only proving/verifying error will lead to setting status as skipped. // db/unmarshal errors will not because they are errors on the business logic side. func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error { var dbErr error var success bool // Assess if the proof generation session for the given ID is still active. // We hold the read lock until the end of the function so that there is no // potential race for channel deletion. m.mu.RLock() defer m.mu.RUnlock() sess, ok := m.sessions[msg.ID] if !ok { return fmt.Errorf("proof generation session for id %v does not existID", msg.ID) } proofTime := time.Since(time.Unix(sess.info.StartTimestamp, 0)) proofTimeSec := uint64(proofTime.Seconds()) // Ensure this roller is eligible to participate in the session. roller, ok := sess.info.Rollers[pk] if !ok { return fmt.Errorf("roller %s %s (%s) is not eligible to partake in proof session %v", roller.Name, sess.info.ProveType, roller.PublicKey, msg.ID) } if roller.Status == types.RollerProofValid { // In order to prevent DoS attacks, it is forbidden to repeatedly submit valid proofs. 
// TODO: Defend invalid proof resubmissions by one of the following two methods: // (i) slash the roller for each submission of invalid proof // (ii) set the maximum failure retry times log.Warn( "roller has already submitted valid proof in proof session", "roller name", roller.Name, "roller pk", roller.PublicKey, "prove type", sess.info.ProveType, "proof id", msg.ID, ) return nil } log.Info( "handling zk proof", "proof id", msg.ID, "roller name", roller.Name, "roller pk", roller.PublicKey, "prove type", sess.info.ProveType, "proof time", proofTimeSec, ) defer func() { // TODO: maybe we should use db tx for the whole process? // Roll back current proof's status. if dbErr != nil { if msg.Type == message.BasicProve { if err := m.orm.UpdateProvingStatus(msg.ID, types.ProvingTaskUnassigned); err != nil { log.Error("fail to reset basic task status as Unassigned", "msg.ID", msg.ID) } } if msg.Type == message.AggregatorProve { if err := m.orm.UpdateAggTaskStatus(msg.ID, types.ProvingTaskUnassigned); err != nil { log.Error("fail to reset aggregator task status as Unassigned", "msg.ID", msg.ID) } } } // set proof status status := types.RollerProofInvalid if success && dbErr == nil { status = types.RollerProofValid } // notify the session that the roller finishes the proving process sess.finishChan <- rollerProofStatus{msg.ID, msg.Type, pk, status} }() if msg.Status != message.StatusOk { coordinatorProofsGeneratedFailedTimeTimer.Update(proofTime) m.updateMetricRollerProofsGeneratedFailedTimeTimer(roller.PublicKey, proofTime) log.Info( "proof generated by roller failed", "proof id", msg.ID, "roller name", roller.Name, "roller pk", roller.PublicKey, "prove type", msg.Type, "proof time", proofTimeSec, "error", msg.Error, ) return nil } // store proof content if msg.Type == message.BasicProve { if dbErr = m.orm.UpdateProofByHash(m.ctx, msg.ID, msg.Proof.Proof, msg.Proof.FinalPair, proofTimeSec); dbErr != nil { log.Error("failed to store basic proof into db", "error", dbErr) 
return dbErr } if dbErr = m.orm.UpdateProvingStatus(msg.ID, types.ProvingTaskProved); dbErr != nil { log.Error("failed to update basic task status as proved", "error", dbErr) return dbErr } } if msg.Type == message.AggregatorProve { if dbErr = m.orm.UpdateProofForAggTask(msg.ID, msg.Proof); dbErr != nil { log.Error("failed to store aggregator proof into db", "error", dbErr) return dbErr } } coordinatorProofsReceivedTotalCounter.Inc(1) var verifyErr error // TODO: wrap both basic verifier and aggregator verifier success, verifyErr = m.verifyProof(msg.Proof) if verifyErr != nil { // TODO: this is only a temp workaround for testnet, we should return err in real cases success = false log.Error("Failed to verify zk proof", "proof id", msg.ID, "roller name", roller.Name, "roller pk", roller.PublicKey, "prove type", msg.Type, "proof time", proofTimeSec, "error", verifyErr) // TODO: Roller needs to be slashed if proof is invalid. } if success { if msg.Type == message.AggregatorProve { if dbErr = m.orm.UpdateAggTaskStatus(msg.ID, types.ProvingTaskVerified); dbErr != nil { log.Error( "failed to update aggregator proving_status", "msg.ID", msg.ID, "status", types.ProvingTaskVerified, "error", dbErr) return dbErr } } if msg.Type == message.BasicProve { if dbErr = m.orm.UpdateProvingStatus(msg.ID, types.ProvingTaskVerified); dbErr != nil { log.Error( "failed to update basic proving_status", "msg.ID", msg.ID, "status", types.ProvingTaskVerified, "error", dbErr) return dbErr } } coordinatorProofsVerifiedSuccessTimeTimer.Update(proofTime) m.updateMetricRollerProofsVerifiedSuccessTimeTimer(roller.PublicKey, proofTime) log.Info("proof verified by coordinator success", "proof id", msg.ID, "roller name", roller.Name, "roller pk", roller.PublicKey, "prove type", msg.Type, "proof time", proofTimeSec) } else { coordinatorProofsVerifiedFailedTimeTimer.Update(proofTime) m.updateMetricRollerProofsVerifiedFailedTimeTimer(roller.PublicKey, proofTime) log.Info("proof verified by coordinator 
failed", "proof id", msg.ID, "roller name", roller.Name, "roller pk", roller.PublicKey, "prove type", msg.Type, "proof time", proofTimeSec, "error", verifyErr) } return nil } // CollectProofs collects proofs corresponding to a proof generation session. func (m *Manager) CollectProofs(sess *session) { coordinatorSessionsActiveNumberGauge.Inc(1) defer coordinatorSessionsActiveNumberGauge.Dec(1) for { select { //Execute after timeout, set in config.json. Consider all rollers failed. case <-time.After(time.Duration(m.cfg.CollectionTime) * time.Minute): // Check if session can be replayed if sess.info.Attempts < m.cfg.SessionAttempts { var success bool if sess.info.ProveType == message.AggregatorProve { success = m.StartAggProofGenerationSession(nil, sess) } else if sess.info.ProveType == message.BasicProve { success = m.StartBasicProofGenerationSession(nil, sess) } if success { m.mu.Lock() for pk := range sess.info.Rollers { m.freeTaskIDForRoller(pk, sess.info.ID) } m.mu.Unlock() log.Info("Retrying session", "session id:", sess.info.ID) return } } // record failed session. errMsg := "proof generation session ended without receiving any valid proofs" m.addFailedSession(sess, errMsg) log.Warn(errMsg, "session id", sess.info.ID) // Set status as skipped. // Note that this is only a workaround for testnet here. 
// TODO: In real cases we should reset to orm.ProvingTaskUnassigned // so as to re-distribute the task in the future if sess.info.ProveType == message.BasicProve { if err := m.orm.UpdateProvingStatus(sess.info.ID, types.ProvingTaskFailed); err != nil { log.Error("fail to reset basic task_status as Unassigned", "id", sess.info.ID, "err", err) } } if sess.info.ProveType == message.AggregatorProve { if err := m.orm.UpdateAggTaskStatus(sess.info.ID, types.ProvingTaskFailed); err != nil { log.Error("fail to reset aggregator task_status as Unassigned", "id", sess.info.ID, "err", err) } } m.mu.Lock() for pk := range sess.info.Rollers { m.freeTaskIDForRoller(pk, sess.info.ID) } delete(m.sessions, sess.info.ID) m.mu.Unlock() coordinatorSessionsTimeoutTotalCounter.Inc(1) return //Execute after one of the roller finishes sending proof, return early if all rollers had sent results. case ret := <-sess.finishChan: m.mu.Lock() sess.info.Rollers[ret.pk].Status = ret.status if sess.isSessionFailed() { if ret.typ == message.BasicProve { if err := m.orm.UpdateProvingStatus(ret.id, types.ProvingTaskFailed); err != nil { log.Error("failed to update basic proving_status as failed", "msg.ID", ret.id, "error", err) } } if ret.typ == message.AggregatorProve { if err := m.orm.UpdateAggTaskStatus(ret.id, types.ProvingTaskFailed); err != nil { log.Error("failed to update aggregator proving_status as failed", "msg.ID", ret.id, "error", err) } } coordinatorSessionsFailedTotalCounter.Inc(1) } if err := m.orm.SetSessionInfo(sess.info); err != nil { log.Error("db set session info fail", "pk", ret.pk, "error", err) } //Check if all rollers have finished their tasks, and rollers with valid results are indexed by public key. finished, validRollers := sess.isRollersFinished() //When all rollers have finished submitting their tasks, select a winner within rollers with valid proof, and return, terminate the for loop. if finished && len(validRollers) > 0 { //Select a random index for this slice. 
randIndex := mathrand.Intn(len(validRollers)) _ = validRollers[randIndex] // TODO: reward winner for pk := range sess.info.Rollers { m.freeTaskIDForRoller(pk, sess.info.ID) } delete(m.sessions, sess.info.ID) m.mu.Unlock() coordinatorSessionsSuccessTotalCounter.Inc(1) return } m.mu.Unlock() } } } // isRollersFinished checks if all rollers have finished submitting proofs, check their validity, and record rollers who produce valid proof. // When rollersLeft reaches 0, it means all rollers have finished their tasks. // validRollers also records the public keys of rollers who have finished their tasks correctly as index. func (s *session) isRollersFinished() (bool, []string) { var validRollers []string for pk, roller := range s.info.Rollers { if roller.Status == types.RollerProofValid { validRollers = append(validRollers, pk) continue } if roller.Status == types.RollerProofInvalid { continue } // Some rollers are still proving. return false, nil } return true, validRollers } func (s *session) isSessionFailed() bool { for _, roller := range s.info.Rollers { if roller.Status != types.RollerProofInvalid { return false } } return true } // APIs collect API services. 
func (m *Manager) APIs() []rpc.API { return []rpc.API{ { Namespace: "roller", Service: RollerAPI(m), Public: true, }, { Namespace: "debug", Public: true, Service: RollerDebugAPI(m), }, } } // StartBasicProofGenerationSession starts a basic proof generation session func (m *Manager) StartBasicProofGenerationSession(task *types.BlockBatch, prevSession *session) (success bool) { var taskId string if task != nil { taskId = task.Hash } else { taskId = prevSession.info.ID } if m.GetNumberOfIdleRollers(message.BasicProve) == 0 { log.Warn("no idle basic roller when starting proof generation session", "id", taskId) return false } log.Info("start basic proof generation session", "id", taskId) defer func() { if !success { if task != nil { if err := m.orm.UpdateProvingStatus(taskId, types.ProvingTaskUnassigned); err != nil { log.Error("fail to reset task_status as Unassigned", "id", taskId, "err", err) } } else { if err := m.orm.UpdateProvingStatus(taskId, types.ProvingTaskFailed); err != nil { log.Error("fail to reset task_status as Failed", "id", taskId, "err", err) } } } }() // Get block traces. blockInfos, err := m.orm.GetL2BlockInfos(map[string]interface{}{"batch_hash": taskId}) if err != nil { log.Error( "could not GetBlockInfos", "batch_hash", taskId, "error", err, ) return false } traces := make([]*geth_types.BlockTrace, len(blockInfos)) for i, blockInfo := range blockInfos { traces[i], err = m.Client.GetBlockTraceByHash(m.ctx, common.HexToHash(blockInfo.Hash)) if err != nil { log.Error( "could not GetBlockTraceByNumber", "block number", blockInfo.Number, "block hash", blockInfo.Hash, "error", err, ) return false } } // Dispatch task to basic rollers. 
rollers := make(map[string]*types.RollerStatus) for i := 0; i < int(m.cfg.RollersPerSession); i++ { roller := m.selectRoller(message.BasicProve) if roller == nil { log.Info("selectRoller returns nil") break } log.Info("roller is picked", "session id", taskId, "name", roller.Name, "public key", roller.PublicKey) // send trace to roller if !roller.sendTask(&message.TaskMsg{ID: taskId, Type: message.BasicProve, Traces: traces}) { log.Error("send task failed", "roller name", roller.Name, "public key", roller.PublicKey, "id", taskId) continue } m.updateMetricRollerProofsLastAssignedTimestampGauge(roller.PublicKey) rollers[roller.PublicKey] = &types.RollerStatus{PublicKey: roller.PublicKey, Name: roller.Name, Status: types.RollerAssigned} } // No roller assigned. if len(rollers) == 0 { log.Error("no roller assigned", "id", taskId, "number of idle basic rollers", m.GetNumberOfIdleRollers(message.BasicProve)) return false } // Update session proving status as assigned. if err = m.orm.UpdateProvingStatus(taskId, types.ProvingTaskAssigned); err != nil { log.Error("failed to update task status", "id", taskId, "err", err) return false } // Create a proof generation session. sess := &session{ info: &types.SessionInfo{ ID: taskId, Rollers: rollers, ProveType: message.BasicProve, StartTimestamp: time.Now().Unix(), Attempts: 1, }, finishChan: make(chan rollerProofStatus, proofAndPkBufferSize), } if prevSession != nil { sess.info.Attempts += prevSession.info.Attempts } for _, roller := range sess.info.Rollers { log.Info( "assigned proof to roller", "session id", sess.info.ID, "session type", sess.info.ProveType, "roller name", roller.Name, "roller pk", roller.PublicKey, "proof status", roller.Status) } // Store session info. 
if err = m.orm.SetSessionInfo(sess.info); err != nil { log.Error("db set session info fail", "session id", sess.info.ID, "error", err) return false } m.mu.Lock() m.sessions[taskId] = sess m.mu.Unlock() go m.CollectProofs(sess) return true } // StartAggProofGenerationSession starts an aggregator proof generation. func (m *Manager) StartAggProofGenerationSession(task *types.AggTask, prevSession *session) (success bool) { var taskId string if task != nil { taskId = task.ID } else { taskId = prevSession.info.ID } if m.GetNumberOfIdleRollers(message.AggregatorProve) == 0 { log.Warn("no idle common roller when starting proof generation session", "id", taskId) return false } log.Info("start aggregator proof generation session", "id", taskId) defer func() { if !success { if task != nil { if err := m.orm.UpdateAggTaskStatus(taskId, types.ProvingTaskUnassigned); err != nil { log.Error("fail to reset task_status as Unassigned", "id", taskId, "err", err) } else if err := m.orm.UpdateAggTaskStatus(taskId, types.ProvingTaskFailed); err != nil { log.Error("fail to reset task_status as Failed", "id", taskId, "err", err) } } } }() // get agg task from db subProofs, err := m.orm.GetSubProofsByAggTaskID(taskId) if err != nil { log.Error("failed to get sub proofs for aggregator task", "id", taskId, "error", err) return false } // Dispatch task to basic rollers. 
rollers := make(map[string]*types.RollerStatus) for i := 0; i < int(m.cfg.RollersPerSession); i++ { roller := m.selectRoller(message.AggregatorProve) if roller == nil { log.Info("selectRoller returns nil") break } log.Info("roller is picked", "session id", taskId, "name", roller.Name, "type", roller.Type, "public key", roller.PublicKey) // send trace to roller if !roller.sendTask(&message.TaskMsg{ ID: taskId, Type: message.AggregatorProve, SubProofs: subProofs, }) { log.Error("send task failed", "roller name", roller.Name, "public key", roller.PublicKey, "id", taskId) continue } m.updateMetricRollerProofsLastAssignedTimestampGauge(roller.PublicKey) rollers[roller.PublicKey] = &types.RollerStatus{PublicKey: roller.PublicKey, Name: roller.Name, Status: types.RollerAssigned} } // No roller assigned. if len(rollers) == 0 { log.Error("no roller assigned", "id", taskId, "number of idle aggregator rollers", m.GetNumberOfIdleRollers(message.AggregatorProve)) return false } // Update session proving status as assigned. if err = m.orm.UpdateAggTaskStatus(taskId, types.ProvingTaskAssigned); err != nil { log.Error("failed to update task status", "id", taskId, "err", err) return false } // Create a proof generation session. sess := &session{ info: &types.SessionInfo{ ID: taskId, Rollers: rollers, ProveType: message.AggregatorProve, StartTimestamp: time.Now().Unix(), Attempts: 1, }, finishChan: make(chan rollerProofStatus, proofAndPkBufferSize), } if prevSession != nil { sess.info.Attempts += prevSession.info.Attempts } for _, roller := range sess.info.Rollers { log.Info( "assigned proof to roller", "session id", sess.info.ID, "session type", sess.info.ProveType, "roller name", roller.Name, "roller pk", roller.PublicKey, "proof status", roller.Status) } // Store session info. 
if err = m.orm.SetSessionInfo(sess.info); err != nil { log.Error("db set session info fail", "session id", sess.info.ID, "error", err) return false } m.mu.Lock() m.sessions[taskId] = sess m.mu.Unlock() go m.CollectProofs(sess) return true } func (m *Manager) addFailedSession(sess *session, errMsg string) { m.mu.Lock() defer m.mu.Unlock() m.failedSessionInfos[sess.info.ID] = newSessionInfo(sess, types.ProvingTaskFailed, errMsg, true) } // VerifyToken verifies pukey for token and expiration time func (m *Manager) VerifyToken(authMsg *message.AuthMsg) (bool, error) { pubkey, _ := authMsg.PublicKey() // GetValue returns nil if value is expired if token, ok := m.tokenCache.Get(pubkey); !ok || token != authMsg.Identity.Token { return false, fmt.Errorf("failed to find corresponding token. roller name: %s. roller pk: %s.", authMsg.Identity.Name, pubkey) } return true, nil } func (m *Manager) addVerifyTask(proof *message.AggProof) chan verifyResult { c := make(chan verifyResult, 1) m.verifierWorkerPool.AddTask(func() { result, err := m.verifier.VerifyProof(proof) c <- verifyResult{result, err} }) return c } func (m *Manager) verifyProof(proof *message.AggProof) (bool, error) { if !m.isRunning() { return false, errors.New("coordinator has stopped before verification") } verifyResultChan := m.addVerifyTask(proof) verifyResult := <-verifyResultChan return verifyResult.result, verifyResult.err } type verifyResult struct { result bool err error }