// scroll/coordinator/manager.go

package coordinator
import (
"context"
"errors"
"fmt"
mathrand "math/rand"
"sync"
"sync/atomic"
"time"
cmap "github.com/orcaman/concurrent-map"
"github.com/patrickmn/go-cache"
"github.com/scroll-tech/go-ethereum/common"
geth_types "github.com/scroll-tech/go-ethereum/core/types"
"github.com/scroll-tech/go-ethereum/ethclient"
"github.com/scroll-tech/go-ethereum/log"
geth_metrics "github.com/scroll-tech/go-ethereum/metrics"
"github.com/scroll-tech/go-ethereum/rpc"
"scroll-tech/common/metrics"
"scroll-tech/common/types"
"scroll-tech/common/types/message"
"scroll-tech/common/utils/workerpool"
"scroll-tech/database"
"scroll-tech/coordinator/config"
"scroll-tech/coordinator/verifier"
)
var (
// proofs
coordinatorProofsReceivedTotalCounter = geth_metrics.NewRegisteredCounter("coordinator/proofs/received/total", metrics.ScrollRegistry)
coordinatorProofsVerifiedSuccessTimeTimer = geth_metrics.NewRegisteredTimer("coordinator/proofs/verified/success/time", metrics.ScrollRegistry)
coordinatorProofsVerifiedFailedTimeTimer = geth_metrics.NewRegisteredTimer("coordinator/proofs/verified/failed/time", metrics.ScrollRegistry)
coordinatorProofsGeneratedFailedTimeTimer = geth_metrics.NewRegisteredTimer("coordinator/proofs/generated/failed/time", metrics.ScrollRegistry)
// sessions
coordinatorSessionsSuccessTotalCounter = geth_metrics.NewRegisteredCounter("coordinator/sessions/success/total", metrics.ScrollRegistry)
coordinatorSessionsTimeoutTotalCounter = geth_metrics.NewRegisteredCounter("coordinator/sessions/timeout/total", metrics.ScrollRegistry)
coordinatorSessionsFailedTotalCounter = geth_metrics.NewRegisteredCounter("coordinator/sessions/failed/total", metrics.ScrollRegistry)
coordinatorSessionsActiveNumberGauge = geth_metrics.NewRegisteredCounter("coordinator/sessions/active/number", metrics.ScrollRegistry)
)
const (
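// proofAndPkBufferSize is the buffer size of each session's finishChan.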
proofAndPkBufferSize = 10
)
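// rollerProofStatus is sent on a session's finishChan when a roller finishes
// (successfully or not) its proving task.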
type rollerProofStatus struct {
id string
typ message.ProveType
pk string
status types.RollerProveStatus
}
// session contains all the information of an ongoing proof generation session.
type session struct {
info *types.SessionInfo
// finishChan is used to pass the public keys of rollers who have finished the proving process.
finishChan chan rollerProofStatus
}
// Manager is responsible for maintaining connections with active rollers,
// sending the challenges, and receiving proofs. It also regulates the reward
// distribution. All read and write logic and connection handling happens through
// a modular websocket server, contained within the Manager. Incoming messages are
// then passed to the Manager where the actual handling logic resides.
type Manager struct {
// The manager context.
ctx context.Context
// The roller manager configuration.
cfg *config.RollerManagerConfig
// The indicator whether the backend is running or not.
running int32
// A mutex guarding the session maps below.
mu sync.RWMutex
// A map containing all active proof generation sessions.
sessions map[string]*session
// The pool of registered rollers.
rollerPool cmap.ConcurrentMap
// A map containing sessions that failed proving or verification.
failedSessionInfos map[string]*SessionInfo
// A direct connection to the Halo2 verifier, used to verify
// incoming proofs.
verifier *verifier.Verifier
// db interface
orm database.OrmFactory
// l2geth client
*ethclient.Client
// Token cache
tokenCache *cache.Cache
// A mutex guarding registration
registerMu sync.RWMutex
// Verifier worker pool
verifierWorkerPool *workerpool.WorkerPool
}
// New returns a new instance of Manager. The instance is not fully prepared,
// and still needs to be finalized and run by calling `manager.Start`.
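//
// A minimal usage sketch (configuration loading and error handling elided;
// the variable names below are illustrative only):
//
//	m, err := coordinator.New(ctx, cfg, ormFactory, l2Client)
//	if err != nil {
//		// handle error
//	}
//	if err = m.Start(); err != nil {
//		// handle error
//	}
//	defer m.Stop()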
func New(ctx context.Context, cfg *config.RollerManagerConfig, orm database.OrmFactory, client *ethclient.Client) (*Manager, error) {
v, err := verifier.NewVerifier(cfg.Verifier)
if err != nil {
return nil, err
}
log.Info("Start coordinator successfully.")
return &Manager{
ctx: ctx,
cfg: cfg,
rollerPool: cmap.New(),
sessions: make(map[string]*session),
failedSessionInfos: make(map[string]*SessionInfo),
verifier: v,
orm: orm,
Client: client,
tokenCache: cache.New(time.Duration(cfg.TokenTimeToLive)*time.Second, 1*time.Hour),
verifierWorkerPool: workerpool.NewWorkerPool(cfg.MaxVerifierWorkers),
}, nil
}
// Start the Manager module.
func (m *Manager) Start() error {
if m.isRunning() {
return nil
}
m.verifierWorkerPool.Run()
m.restorePrevSessions()
atomic.StoreInt32(&m.running, 1)
go m.Loop()
return nil
}
// Stop the Manager module, for a graceful shutdown.
func (m *Manager) Stop() {
if !m.isRunning() {
return
}
m.verifierWorkerPool.Stop()
atomic.StoreInt32(&m.running, 0)
}
// isRunning returns whether the manager is currently running.
func (m *Manager) isRunning() bool {
return atomic.LoadInt32(&m.running) == 1
}
// Loop is the manager's main event loop: it periodically loads unassigned aggregator and basic tasks from the db and dispatches them to idle rollers.
func (m *Manager) Loop() {
var (
tick = time.NewTicker(time.Second * 2)
tasks []*types.BlockBatch
aggTasks []*types.AggTask
)
defer tick.Stop()
for {
select {
case <-tick.C:
// load and send aggregator tasks
if len(aggTasks) == 0 && m.orm != nil {
var err error
aggTasks, err = m.orm.GetUnassignedAggTasks()
if err != nil {
log.Error("failed to get unassigned aggregator proving tasks", "error", err)
continue
}
}
// Select aggregator type roller and send message
for len(aggTasks) > 0 && m.StartAggProofGenerationSession(aggTasks[0], nil) {
aggTasks = aggTasks[1:]
}
// load and send basic tasks
if len(tasks) == 0 && m.orm != nil {
var err error
// TODO: add cache
if tasks, err = m.orm.GetBlockBatches(
map[string]interface{}{"proving_status": types.ProvingTaskUnassigned},
fmt.Sprintf(
"ORDER BY index %s LIMIT %d;",
m.cfg.OrderSession,
m.GetNumberOfIdleRollers(message.BasicProve),
),
); err != nil {
log.Error("failed to get unassigned basic proving tasks", "error", err)
continue
}
}
// Select basic type roller and send message
for len(tasks) > 0 && m.StartBasicProofGenerationSession(tasks[0], nil) {
tasks = tasks[1:]
}
case <-m.ctx.Done():
if m.ctx.Err() != nil {
log.Error(
"manager context canceled with error",
"error", m.ctx.Err(),
)
}
return
}
}
}
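// restorePrevSessions reloads the assigned aggregator and basic tasks from the db
// and restarts proof collection for each recovered session.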
func (m *Manager) restorePrevSessions() {
// m.orm may be nil in scroll tests
if m.orm == nil {
return
}
m.mu.Lock()
defer m.mu.Unlock()
var hashes []string
// load assigned aggregator tasks from db
aggTasks, err := m.orm.GetAssignedAggTasks()
if err != nil {
log.Error("failed to load assigned aggregator tasks from db", "error", err)
return
}
for _, aggTask := range aggTasks {
hashes = append(hashes, aggTask.ID)
}
// load assigned basic tasks from db
batchHashes, err := m.orm.GetAssignedBatchHashes()
if err != nil {
log.Error("failed to get assigned batch batchHashes from db", "error", err)
return
}
hashes = append(hashes, batchHashes...)
prevSessions, err := m.orm.GetSessionInfosByHashes(hashes)
if err != nil {
log.Error("failed to recover roller session info from db", "error", err)
return
}
for _, v := range prevSessions {
sess := &session{
info: v,
finishChan: make(chan rollerProofStatus, proofAndPkBufferSize),
}
m.sessions[sess.info.ID] = sess
log.Info("Coordinator restart reload sessions", "session start time", time.Unix(sess.info.StartTimestamp, 0))
for _, roller := range sess.info.Rollers {
log.Info(
"restore roller info for session",
"session id", sess.info.ID,
"roller name", roller.Name,
"prove type", sess.info.ProveType,
"public key", roller.PublicKey,
"proof status", roller.Status)
}
go m.CollectProofs(sess)
}
}
// handleZkProof handles a ZkProof submitted by a roller.
// For now, only proving/verification errors lead to the status being set as skipped;
// db/unmarshal errors do not, because they are errors on the business-logic side.
func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
var dbErr error
var success bool
// Assess if the proof generation session for the given ID is still active.
// We hold the read lock until the end of the function so that there is no
// potential race for channel deletion.
m.mu.RLock()
defer m.mu.RUnlock()
sess, ok := m.sessions[msg.ID]
if !ok {
return fmt.Errorf("proof generation session for id %v does not existID", msg.ID)
}
proofTime := time.Since(time.Unix(sess.info.StartTimestamp, 0))
proofTimeSec := uint64(proofTime.Seconds())
// Ensure this roller is eligible to participate in the session.
roller, ok := sess.info.Rollers[pk]
if !ok {
return fmt.Errorf("roller %s %s (%s) is not eligible to partake in proof session %v", roller.Name, sess.info.ProveType, roller.PublicKey, msg.ID)
}
if roller.Status == types.RollerProofValid {
// In order to prevent DoS attacks, it is forbidden to repeatedly submit valid proofs.
// TODO: Defend invalid proof resubmissions by one of the following two methods:
// (i) slash the roller for each submission of invalid proof
// (ii) set the maximum failure retry times
log.Warn(
"roller has already submitted valid proof in proof session",
"roller name", roller.Name,
"roller pk", roller.PublicKey,
"prove type", sess.info.ProveType,
"proof id", msg.ID,
)
return nil
}
log.Info(
"handling zk proof",
"proof id", msg.ID,
"roller name", roller.Name,
"roller pk", roller.PublicKey,
"prove type", sess.info.ProveType,
"proof time", proofTimeSec,
)
defer func() {
// TODO: maybe we should use db tx for the whole process?
// Roll back current proof's status.
if dbErr != nil {
if msg.Type == message.BasicProve {
if err := m.orm.UpdateProvingStatus(msg.ID, types.ProvingTaskUnassigned); err != nil {
log.Error("fail to reset basic task status as Unassigned", "msg.ID", msg.ID)
}
}
if msg.Type == message.AggregatorProve {
if err := m.orm.UpdateAggTaskStatus(msg.ID, types.ProvingTaskUnassigned); err != nil {
log.Error("fail to reset aggregator task status as Unassigned", "msg.ID", msg.ID)
}
}
}
// set proof status
status := types.RollerProofInvalid
if success && dbErr == nil {
status = types.RollerProofValid
}
// notify the session that the roller finishes the proving process
sess.finishChan <- rollerProofStatus{msg.ID, msg.Type, pk, status}
}()
if msg.Status != message.StatusOk {
coordinatorProofsGeneratedFailedTimeTimer.Update(proofTime)
m.updateMetricRollerProofsGeneratedFailedTimeTimer(roller.PublicKey, proofTime)
log.Info(
"proof generated by roller failed",
"proof id", msg.ID,
"roller name", roller.Name,
"roller pk", roller.PublicKey,
"prove type", msg.Type,
"proof time", proofTimeSec,
"error", msg.Error,
)
return nil
}
// store proof content
if msg.Type == message.BasicProve {
if dbErr = m.orm.UpdateProofByHash(m.ctx, msg.ID, msg.Proof.Proof, msg.Proof.FinalPair, proofTimeSec); dbErr != nil {
log.Error("failed to store basic proof into db", "error", dbErr)
return dbErr
}
if dbErr = m.orm.UpdateProvingStatus(msg.ID, types.ProvingTaskProved); dbErr != nil {
log.Error("failed to update basic task status as proved", "error", dbErr)
return dbErr
}
}
if msg.Type == message.AggregatorProve {
if dbErr = m.orm.UpdateProofForAggTask(msg.ID, msg.Proof); dbErr != nil {
log.Error("failed to store aggregator proof into db", "error", dbErr)
return dbErr
}
}
coordinatorProofsReceivedTotalCounter.Inc(1)
var verifyErr error
// TODO: wrap both basic verifier and aggregator verifier
success, verifyErr = m.verifyProof(msg.Proof)
if verifyErr != nil {
// TODO: this is only a temp workaround for testnet, we should return err in real cases
success = false
log.Error("Failed to verify zk proof", "proof id", msg.ID, "roller name", roller.Name,
"roller pk", roller.PublicKey, "prove type", msg.Type, "proof time", proofTimeSec, "error", verifyErr)
// TODO: Roller needs to be slashed if proof is invalid.
}
if success {
if msg.Type == message.AggregatorProve {
if dbErr = m.orm.UpdateAggTaskStatus(msg.ID, types.ProvingTaskVerified); dbErr != nil {
log.Error(
"failed to update aggregator proving_status",
"msg.ID", msg.ID,
"status", types.ProvingTaskVerified,
"error", dbErr)
return dbErr
}
}
if msg.Type == message.BasicProve {
if dbErr = m.orm.UpdateProvingStatus(msg.ID, types.ProvingTaskVerified); dbErr != nil {
log.Error(
"failed to update basic proving_status",
"msg.ID", msg.ID,
"status", types.ProvingTaskVerified,
"error", dbErr)
return dbErr
}
}
coordinatorProofsVerifiedSuccessTimeTimer.Update(proofTime)
m.updateMetricRollerProofsVerifiedSuccessTimeTimer(roller.PublicKey, proofTime)
log.Info("proof verified by coordinator success", "proof id", msg.ID, "roller name", roller.Name,
"roller pk", roller.PublicKey, "prove type", msg.Type, "proof time", proofTimeSec)
} else {
coordinatorProofsVerifiedFailedTimeTimer.Update(proofTime)
m.updateMetricRollerProofsVerifiedFailedTimeTimer(roller.PublicKey, proofTime)
log.Info("proof verified by coordinator failed", "proof id", msg.ID, "roller name", roller.Name,
"roller pk", roller.PublicKey, "prove type", msg.Type, "proof time", proofTimeSec, "error", verifyErr)
}
return nil
}
// CollectProofs collects proofs corresponding to a proof generation session.
func (m *Manager) CollectProofs(sess *session) {
coordinatorSessionsActiveNumberGauge.Inc(1)
defer coordinatorSessionsActiveNumberGauge.Dec(1)
for {
select {
// Executed after the timeout (set in config.json) elapses; consider all rollers failed.
case <-time.After(time.Duration(m.cfg.CollectionTime) * time.Minute):
// Check if the session can be retried.
if sess.info.Attempts < m.cfg.SessionAttempts {
var success bool
if sess.info.ProveType == message.AggregatorProve {
success = m.StartAggProofGenerationSession(nil, sess)
} else if sess.info.ProveType == message.BasicProve {
success = m.StartBasicProofGenerationSession(nil, sess)
}
if success {
m.mu.Lock()
for pk := range sess.info.Rollers {
m.freeTaskIDForRoller(pk, sess.info.ID)
}
m.mu.Unlock()
log.Info("Retrying session", "session id:", sess.info.ID)
return
}
}
// record failed session.
errMsg := "proof generation session ended without receiving any valid proofs"
m.addFailedSession(sess, errMsg)
log.Warn(errMsg, "session id", sess.info.ID)
// Set status as skipped.
// Note that this is only a workaround for testnet here.
// TODO: In real cases we should reset to orm.ProvingTaskUnassigned
// so as to re-distribute the task in the future
if sess.info.ProveType == message.BasicProve {
if err := m.orm.UpdateProvingStatus(sess.info.ID, types.ProvingTaskFailed); err != nil {
log.Error("fail to reset basic task_status as Unassigned", "id", sess.info.ID, "err", err)
}
}
if sess.info.ProveType == message.AggregatorProve {
if err := m.orm.UpdateAggTaskStatus(sess.info.ID, types.ProvingTaskFailed); err != nil {
log.Error("fail to reset aggregator task_status as Unassigned", "id", sess.info.ID, "err", err)
}
}
m.mu.Lock()
for pk := range sess.info.Rollers {
m.freeTaskIDForRoller(pk, sess.info.ID)
}
delete(m.sessions, sess.info.ID)
m.mu.Unlock()
coordinatorSessionsTimeoutTotalCounter.Inc(1)
return
// Executed after one of the rollers finishes sending its proof; return early if all rollers have sent results.
case ret := <-sess.finishChan:
m.mu.Lock()
sess.info.Rollers[ret.pk].Status = ret.status
if sess.isSessionFailed() {
if ret.typ == message.BasicProve {
if err := m.orm.UpdateProvingStatus(ret.id, types.ProvingTaskFailed); err != nil {
log.Error("failed to update basic proving_status as failed", "msg.ID", ret.id, "error", err)
}
}
if ret.typ == message.AggregatorProve {
if err := m.orm.UpdateAggTaskStatus(ret.id, types.ProvingTaskFailed); err != nil {
log.Error("failed to update aggregator proving_status as failed", "msg.ID", ret.id, "error", err)
}
}
coordinatorSessionsFailedTotalCounter.Inc(1)
}
if err := m.orm.SetSessionInfo(sess.info); err != nil {
log.Error("db set session info fail", "pk", ret.pk, "error", err)
}
// Check if all rollers have finished their tasks; validRollers holds the public keys of rollers that returned valid proofs.
finished, validRollers := sess.isRollersFinished()
// Once all rollers have submitted their results, pick a winner among those with valid proofs, clean up the session, and return.
if finished && len(validRollers) > 0 {
// Select a random winner from the valid rollers.
randIndex := mathrand.Intn(len(validRollers))
_ = validRollers[randIndex]
// TODO: reward winner
for pk := range sess.info.Rollers {
m.freeTaskIDForRoller(pk, sess.info.ID)
}
delete(m.sessions, sess.info.ID)
m.mu.Unlock()
coordinatorSessionsSuccessTotalCounter.Inc(1)
return
}
m.mu.Unlock()
}
}
}
// isRollersFinished checks whether all rollers have finished submitting their proofs.
// It returns true only when no roller is still proving, together with the public keys
// of the rollers whose proofs were valid.
func (s *session) isRollersFinished() (bool, []string) {
var validRollers []string
for pk, roller := range s.info.Rollers {
if roller.Status == types.RollerProofValid {
validRollers = append(validRollers, pk)
continue
}
if roller.Status == types.RollerProofInvalid {
continue
}
// Some rollers are still proving.
return false, nil
}
return true, validRollers
}
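// isSessionFailed returns true only if every roller in the session has submitted an invalid proof.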
func (s *session) isSessionFailed() bool {
for _, roller := range s.info.Rollers {
if roller.Status != types.RollerProofInvalid {
return false
}
}
return true
}
// APIs collect API services.
func (m *Manager) APIs() []rpc.API {
return []rpc.API{
{
Namespace: "roller",
Service: RollerAPI(m),
Public: true,
},
{
Namespace: "debug",
Public: true,
Service: RollerDebugAPI(m),
},
}
}
// StartBasicProofGenerationSession starts a basic proof generation session
func (m *Manager) StartBasicProofGenerationSession(task *types.BlockBatch, prevSession *session) (success bool) {
var taskId string
if task != nil {
taskId = task.Hash
} else {
taskId = prevSession.info.ID
}
if m.GetNumberOfIdleRollers(message.BasicProve) == 0 {
log.Warn("no idle basic roller when starting proof generation session", "id", taskId)
return false
}
log.Info("start basic proof generation session", "id", taskId)
defer func() {
if !success {
if task != nil {
if err := m.orm.UpdateProvingStatus(taskId, types.ProvingTaskUnassigned); err != nil {
log.Error("fail to reset task_status as Unassigned", "id", taskId, "err", err)
}
} else {
if err := m.orm.UpdateProvingStatus(taskId, types.ProvingTaskFailed); err != nil {
log.Error("fail to reset task_status as Failed", "id", taskId, "err", err)
}
}
}
}()
// Get block traces.
blockInfos, err := m.orm.GetL2BlockInfos(map[string]interface{}{"batch_hash": taskId})
if err != nil {
log.Error(
"could not GetBlockInfos",
"batch_hash", taskId,
"error", err,
)
return false
}
traces := make([]*geth_types.BlockTrace, len(blockInfos))
for i, blockInfo := range blockInfos {
traces[i], err = m.Client.GetBlockTraceByHash(m.ctx, common.HexToHash(blockInfo.Hash))
if err != nil {
log.Error(
"could not GetBlockTraceByNumber",
"block number", blockInfo.Number,
"block hash", blockInfo.Hash,
"error", err,
)
return false
}
}
// Dispatch task to basic rollers.
rollers := make(map[string]*types.RollerStatus)
for i := 0; i < int(m.cfg.RollersPerSession); i++ {
roller := m.selectRoller(message.BasicProve)
if roller == nil {
log.Info("selectRoller returns nil")
break
}
log.Info("roller is picked", "session id", taskId, "name", roller.Name, "public key", roller.PublicKey)
// send trace to roller
if !roller.sendTask(&message.TaskMsg{ID: taskId, Type: message.BasicProve, Traces: traces}) {
log.Error("send task failed", "roller name", roller.Name, "public key", roller.PublicKey, "id", taskId)
continue
}
m.updateMetricRollerProofsLastAssignedTimestampGauge(roller.PublicKey)
rollers[roller.PublicKey] = &types.RollerStatus{PublicKey: roller.PublicKey, Name: roller.Name, Status: types.RollerAssigned}
}
// No roller assigned.
if len(rollers) == 0 {
log.Error("no roller assigned", "id", taskId, "number of idle basic rollers", m.GetNumberOfIdleRollers(message.BasicProve))
return false
}
// Update session proving status as assigned.
if err = m.orm.UpdateProvingStatus(taskId, types.ProvingTaskAssigned); err != nil {
log.Error("failed to update task status", "id", taskId, "err", err)
return false
}
// Create a proof generation session.
sess := &session{
info: &types.SessionInfo{
ID: taskId,
Rollers: rollers,
ProveType: message.BasicProve,
StartTimestamp: time.Now().Unix(),
Attempts: 1,
},
finishChan: make(chan rollerProofStatus, proofAndPkBufferSize),
}
if prevSession != nil {
sess.info.Attempts += prevSession.info.Attempts
}
for _, roller := range sess.info.Rollers {
log.Info(
"assigned proof to roller",
"session id", sess.info.ID,
"session type", sess.info.ProveType,
"roller name", roller.Name,
"roller pk", roller.PublicKey,
"proof status", roller.Status)
}
// Store session info.
if err = m.orm.SetSessionInfo(sess.info); err != nil {
log.Error("db set session info fail", "session id", sess.info.ID, "error", err)
return false
}
m.mu.Lock()
m.sessions[taskId] = sess
m.mu.Unlock()
go m.CollectProofs(sess)
return true
}
// StartAggProofGenerationSession starts an aggregator proof generation session.
func (m *Manager) StartAggProofGenerationSession(task *types.AggTask, prevSession *session) (success bool) {
var taskId string
if task != nil {
taskId = task.ID
} else {
taskId = prevSession.info.ID
}
if m.GetNumberOfIdleRollers(message.AggregatorProve) == 0 {
log.Warn("no idle common roller when starting proof generation session", "id", taskId)
return false
}
log.Info("start aggregator proof generation session", "id", taskId)
defer func() {
if !success {
if task != nil {
if err := m.orm.UpdateAggTaskStatus(taskId, types.ProvingTaskUnassigned); err != nil {
log.Error("fail to reset task_status as Unassigned", "id", taskId, "err", err)
}
} else {
if err := m.orm.UpdateAggTaskStatus(taskId, types.ProvingTaskFailed); err != nil {
log.Error("fail to reset task_status as Failed", "id", taskId, "err", err)
}
}
}
}()
// get the sub-proofs of the agg task from db
subProofs, err := m.orm.GetSubProofsByAggTaskID(taskId)
if err != nil {
log.Error("failed to get sub proofs for aggregator task", "id", taskId, "error", err)
return false
}
// Dispatch task to aggregator rollers.
rollers := make(map[string]*types.RollerStatus)
for i := 0; i < int(m.cfg.RollersPerSession); i++ {
roller := m.selectRoller(message.AggregatorProve)
if roller == nil {
log.Info("selectRoller returns nil")
break
}
log.Info("roller is picked", "session id", taskId, "name", roller.Name, "type", roller.Type, "public key", roller.PublicKey)
// send sub-proofs to roller
if !roller.sendTask(&message.TaskMsg{
ID: taskId,
Type: message.AggregatorProve,
SubProofs: subProofs,
}) {
log.Error("send task failed", "roller name", roller.Name, "public key", roller.PublicKey, "id", taskId)
continue
}
m.updateMetricRollerProofsLastAssignedTimestampGauge(roller.PublicKey)
rollers[roller.PublicKey] = &types.RollerStatus{PublicKey: roller.PublicKey, Name: roller.Name, Status: types.RollerAssigned}
}
// No roller assigned.
if len(rollers) == 0 {
log.Error("no roller assigned", "id", taskId, "number of idle aggregator rollers", m.GetNumberOfIdleRollers(message.AggregatorProve))
return false
}
// Update session proving status as assigned.
if err = m.orm.UpdateAggTaskStatus(taskId, types.ProvingTaskAssigned); err != nil {
log.Error("failed to update task status", "id", taskId, "err", err)
return false
}
// Create a proof generation session.
sess := &session{
info: &types.SessionInfo{
ID: taskId,
Rollers: rollers,
ProveType: message.AggregatorProve,
StartTimestamp: time.Now().Unix(),
Attempts: 1,
},
finishChan: make(chan rollerProofStatus, proofAndPkBufferSize),
}
if prevSession != nil {
sess.info.Attempts += prevSession.info.Attempts
}
for _, roller := range sess.info.Rollers {
log.Info(
"assigned proof to roller",
"session id", sess.info.ID,
"session type", sess.info.ProveType,
"roller name", roller.Name,
"roller pk", roller.PublicKey,
"proof status", roller.Status)
}
// Store session info.
if err = m.orm.SetSessionInfo(sess.info); err != nil {
log.Error("db set session info fail", "session id", sess.info.ID, "error", err)
return false
}
m.mu.Lock()
m.sessions[taskId] = sess
m.mu.Unlock()
go m.CollectProofs(sess)
return true
}
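// addFailedSession records a failed session in failedSessionInfos.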
func (m *Manager) addFailedSession(sess *session, errMsg string) {
m.mu.Lock()
defer m.mu.Unlock()
m.failedSessionInfos[sess.info.ID] = newSessionInfo(sess, types.ProvingTaskFailed, errMsg, true)
}
// VerifyToken verifies that the roller's public key has a matching, unexpired token in the cache.
func (m *Manager) VerifyToken(authMsg *message.AuthMsg) (bool, error) {
pubkey, _ := authMsg.PublicKey()
// Get returns !ok if the token is missing or has expired.
if token, ok := m.tokenCache.Get(pubkey); !ok || token != authMsg.Identity.Token {
return false, fmt.Errorf("failed to find corresponding token. roller name: %s. roller pk: %s.", authMsg.Identity.Name, pubkey)
}
return true, nil
}
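// addVerifyTask submits a proof to the verifier worker pool and returns a channel
// that will receive the verification result.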
func (m *Manager) addVerifyTask(proof *message.AggProof) chan verifyResult {
c := make(chan verifyResult, 1)
m.verifierWorkerPool.AddTask(func() {
result, err := m.verifier.VerifyProof(proof)
c <- verifyResult{result, err}
})
return c
}
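// verifyProof verifies a proof through the verifier worker pool, blocking until the result is available.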
func (m *Manager) verifyProof(proof *message.AggProof) (bool, error) {
if !m.isRunning() {
return false, errors.New("coordinator has stopped before verification")
}
verifyResultChan := m.addVerifyTask(proof)
verifyResult := <-verifyResultChan
return verifyResult.result, verifyResult.err
}
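// verifyResult wraps the outcome of a single proof verification.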
type verifyResult struct {
result bool
err error
}