package coordinator

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"math/big"
	mathrand "math/rand"
	"sync"
	"sync/atomic"
	"time"

	"github.com/scroll-tech/go-ethereum/core/types"
	"github.com/scroll-tech/go-ethereum/log"
	"github.com/scroll-tech/go-ethereum/rpc"

	"scroll-tech/common/message"
	"scroll-tech/database/orm"

	"scroll-tech/coordinator/config"
	"scroll-tech/coordinator/verifier"
)

const (
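	// proofAndPkBufferSize is the buffer size of each session's finishChan; it
	// bounds how many roller completion notifications can queue before the
	// collecting goroutine drains them.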
	proofAndPkBufferSize = 10
)

// session contains all the information on an ongoing proof generation session.
type session struct {
	// session id
	id uint64
	// A list of all participating rollers and whether they finished proof generation for this session.
	// The map key is a hexadecimal encoding of the roller public key, as byte slices
	// cannot be compared directly.
	rollers map[string]bool
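	// A map from roller public key to roller name.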
	roller_names map[string]string
	// session start time
	startTime time.Time
	// finishChan is used to pass the public keys of the rollers that finished the proving process.
	finishChan chan string
}

// Manager is responsible for maintaining connections with active rollers,
// sending the challenges, and receiving proofs. It also regulates the reward
// distribution. All read and write logic and connection handling happens through
// a modular websocket server, contained within the Manager. Incoming messages are
// then passed to the Manager where the actual handling logic resides.
type Manager struct {
	// The manager context.
	ctx context.Context

	// The roller manager configuration.
	cfg *config.RollerManagerConfig

	// The indicator whether the backend is running or not.
	running int32
	// The websocket server which holds the connections with active rollers.
	server *server

	// A mutex guarding the sessions map below.
	mu sync.RWMutex
	// A map containing all active proof generation sessions.
	sessions map[uint64]session
	// A map containing information about sessions that failed proving or verification.
	failedSessionInfos map[uint64]SessionInfo

	// A direct connection to the Halo2 verifier, used to verify
	// incoming proofs.
	verifier *verifier.Verifier

	// db interface
	orm orm.BlockResultOrm
}

// New returns a new instance of Manager. The instance is not fully prepared,
// and still needs to be finalized and run by calling `manager.Start`.
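//
// A minimal usage sketch (ctx, cfg, and db are assumed to be constructed
// elsewhere; this is illustrative wiring, not a prescribed API):
//
//	m, err := New(ctx, cfg, db)
//	if err != nil {
//		return err
//	}
//	if err := m.Start(); err != nil {
//		return err
//	}
//	defer m.Stop()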
func New(ctx context.Context, cfg *config.RollerManagerConfig, orm orm.BlockResultOrm) (*Manager, error) {
	var v *verifier.Verifier
	if cfg.VerifierEndpoint != "" {
		var err error
		v, err = verifier.NewVerifier(cfg.VerifierEndpoint)
		if err != nil {
			return nil, err
		}
	}

	log.Info("Roller manager created successfully.")

	return &Manager{
		ctx:                ctx,
		cfg:                cfg,
		server:             newServer(cfg.Endpoint),
		sessions:           make(map[uint64]session),
		failedSessionInfos: make(map[uint64]SessionInfo),
		verifier:           v,
		orm:                orm,
	}, nil
}

// Start the Manager module.
func (m *Manager) Start() error {
	if m.isRunning() {
		return nil
	}

	// m.orm may be nil in scroll tests
	if m.orm != nil {
		// Clean up tasks that were assigned but never submitted.
		blocks, err := m.orm.GetBlockResults(map[string]interface{}{"status": orm.BlockAssigned})
		if err == nil {
			for _, block := range blocks {
				if err := m.orm.UpdateBlockStatus(block.BlockTrace.Number.ToInt().Uint64(), orm.BlockUnassigned); err != nil {
					log.Error("fail to reset block_status as Unassigned")
				}
			}
		} else {
			log.Error("fail to fetch assigned blocks")
		}
	}

	if err := m.server.start(); err != nil {
		return err
	}
	atomic.StoreInt32(&m.running, 1)

	go m.Loop()
	return nil
}

// Stop the Manager module, for a graceful shutdown.
func (m *Manager) Stop() {
	if !m.isRunning() {
		return
	}

	// Stop accepting connections.
	if err := m.server.stop(); err != nil {
		log.Error("Server shutdown failed", "error", err)
		return
	}

	atomic.StoreInt32(&m.running, 0)
}

// isRunning returns an indicator whether manager is running or not.
func (m *Manager) isRunning() bool {
	return atomic.LoadInt32(&m.running) == 1
}

// Loop keeps the manager running.
func (m *Manager) Loop() {
	var (
		tick   = time.NewTicker(time.Second * 3)
		traces []*types.BlockResult
	)
	defer tick.Stop()

	for {
		select {
		case <-tick.C:
			if len(traces) == 0 && m.orm != nil {
				var err error
				numIdleRollers := m.GetNumberOfIdleRollers()
				// TODO: add cache
				if traces, err = m.orm.GetBlockResults(
					map[string]interface{}{"status": orm.BlockUnassigned},
					fmt.Sprintf(
						"ORDER BY number %s LIMIT %d;",
						m.cfg.OrderSession,
						numIdleRollers,
					),
				); err != nil {
					log.Error("failed to get blockResult", "error", err)
					continue
				}
			}
			// Select roller and send message
			for len(traces) > 0 && m.StartProofGenerationSession(traces[0]) {
				traces = traces[1:]
			}
		case msg := <-m.server.msgChan:
			if err := m.HandleMessage(msg.pk, msg.message); err != nil {
				log.Error(
					"could not handle message",
					"error", err,
				)
			}
		case <-m.ctx.Done():
			if m.ctx.Err() != nil {
				log.Error(
					"manager context canceled with error",
					"error", m.ctx.Err(),
				)
			}
			return
		}
	}
}

// HandleMessage handles a message from a roller.
func (m *Manager) HandleMessage(pk string, payload []byte) error {
	// Recover the message.
	msg := &message.Msg{}
	if err := json.Unmarshal(payload, msg); err != nil {
		return err
	}

	switch msg.Type {
	case message.Error:
		// Just log it for now.
		log.Error("error message received from roller", "message", msg)
		// TODO: handle in m.failedSessionInfos
		return nil
	case message.Register:
		// We shouldn't get this message, as the sequencer should handle registering at the start
		// of the connection.
		return errors.New("attempted handshake at the wrong time")
	case message.BlockTrace:
		// We shouldn't get this message, as the sequencer should always be the one to send it.
		return errors.New("received illegal message")
	case message.Proof:
		return m.HandleZkProof(pk, msg.Payload)
	default:
		return fmt.Errorf("unrecognized message type %v", msg.Type)
	}
}

// HandleZkProof handles a ZkProof submitted from a roller.
// For now, only proving/verification errors lead to the status being set to skipped;
// db/unmarshal errors do not, because they are errors on the business logic side.
func (m *Manager) HandleZkProof(pk string, payload []byte) error {
	var dbErr error

	msg := &message.ProofMsg{}
	if err := json.Unmarshal(payload, msg); err != nil {
		return err
	}

	// Assess if the proof generation session for the given ID is still active.
	// We hold the read lock until the end of the function so that there is no
	// potential race for channel deletion.
	m.mu.RLock()
	defer m.mu.RUnlock()
	s, ok := m.sessions[msg.ID]
	if !ok {
		return fmt.Errorf("proof generation session for id %v does not exist", msg.ID)
	}
	proofTimeSec := uint64(time.Since(s.startTime).Seconds())

	// Ensure this roller is eligible to participate in the session.
	if _, ok = s.rollers[pk]; !ok {
		return fmt.Errorf("roller %s is not eligible to partake in proof session %v", pk, msg.ID)
	}
	log.Info("Received zk proof", "proof id", msg.ID)

	defer func() {
		// Notify the session that the roller has finished the proving process.
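		// (finishChan is buffered with proofAndPkBufferSize, so this send only blocks if that buffer is full.)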
		s.finishChan <- pk
		// TODO: maybe we should use a db tx for the whole process?
		// Roll back the current proof's status.
		if dbErr != nil {
			if err := m.orm.UpdateBlockStatus(msg.ID, orm.BlockUnassigned); err != nil {
				log.Error("fail to reset block_status as Unassigned", "msg.ID", msg.ID)
			}
		}
	}()

	if msg.Status != message.StatusOk {
		log.Error("Roller failed to generate proof", "msg.ID", msg.ID, "error", msg.Error)
		if dbErr = m.orm.UpdateBlockStatus(msg.ID, orm.BlockFailed); dbErr != nil {
			log.Error("failed to update blockResult status", "status", orm.BlockFailed, "error", dbErr)
		}
		// Record the failed session.
		m.addFailedSession(&s, msg.Error)
		return nil
	}

	// Store the proof content.
	if dbErr = m.orm.UpdateProofByNumber(m.ctx, msg.ID, msg.Proof.Proof, msg.Proof.FinalPair, proofTimeSec); dbErr != nil {
		log.Error("failed to store proof into db", "error", dbErr)
		return dbErr
	}
	if dbErr = m.orm.UpdateBlockStatus(msg.ID, orm.BlockProved); dbErr != nil {
		log.Error("failed to update blockResult status", "status", orm.BlockProved, "error", dbErr)
		return dbErr
	}

	var success bool
	if m.verifier != nil {
		blockResults, err := m.orm.GetBlockResults(map[string]interface{}{"number": msg.ID})
		if len(blockResults) == 0 {
			if err != nil {
				log.Error("failed to get blockResults", "error", err)
			}
			return err
		}

		success, err = m.verifier.VerifyProof(blockResults[0], msg.Proof)
		if err != nil {
			// Record the failed session.
			m.addFailedSession(&s, err.Error())
			// TODO: this is only a temp workaround for testnet, we should return err in real cases
			success = false
			log.Error("Failed to verify zk proof", "proof id", msg.ID, "error", err)
			// TODO: Roller needs to be slashed if proof is invalid.
		} else {
			log.Info("Verify zk proof successfully", "verification result", success, "proof id", msg.ID)
		}
	} else {
		success = true
		log.Info("Verifier disabled, VerifyProof skipped")
		log.Info("Verify zk proof successfully", "verification result", success, "proof id", msg.ID)
	}

	var status orm.BlockStatus
	if success {
		status = orm.BlockVerified
	} else {
		// Set status as skipped if verification fails.
		// Note that this is only a workaround for testnet here.
		// TODO: In real cases we should reset to orm.BlockUnassigned
		// so as to re-distribute the task in the future.
		status = orm.BlockFailed
	}
	if dbErr = m.orm.UpdateBlockStatus(msg.ID, status); dbErr != nil {
		log.Error("failed to update blockResult status", "status", status, "error", dbErr)
	}

	return dbErr
}

// CollectProofs collects proofs corresponding to a proof generation session.
func (m *Manager) CollectProofs(id uint64, s session) {
	timer := time.NewTimer(time.Duration(m.cfg.CollectionTime) * time.Minute)

	for {
		select {
		case <-timer.C:
			m.mu.Lock()

			// Ensure proper clean-up of resources.
			defer func() {
				delete(m.sessions, id)
				m.mu.Unlock()
			}()

			// Pick a random winner.
			// First, round up the keys that actually sent in a proof.
			var participatingRollers []string
			for pk, finished := range s.rollers {
				if finished {
					participatingRollers = append(participatingRollers, pk)
				}
			}
			// Ensure we got at least one proof before selecting a winner.
			if len(participatingRollers) == 0 {
				// Record the failed session.
				errMsg := "proof generation session ended without receiving any proofs"
				m.addFailedSession(&s, errMsg)
				log.Warn(errMsg, "session id", id)
				// Set status as skipped.
				// Note that this is only a workaround for testnet here.
				// TODO: In real cases we should reset to orm.BlockUnassigned
				// so as to re-distribute the task in the future.
				if err := m.orm.UpdateBlockStatus(id, orm.BlockFailed); err != nil {
					log.Error("fail to set block_status as Failed", "id", id)
				}
				return
			}

			// Now, select a random index for this slice.
			randIndex := mathrand.Intn(len(participatingRollers))
			_ = participatingRollers[randIndex]
			// TODO: reward winner
			return
		case pk := <-s.finishChan:
			m.mu.Lock()
			s.rollers[pk] = true
			m.mu.Unlock()
		}
	}
}

// GetRollerChan returns the channel in which newly created wrapped roller connections are sent.
func (m *Manager) GetRollerChan() chan *Roller {
	return m.server.rollerChan
}

// APIs collects the API services exposed by the roller manager.
func (m *Manager) APIs() []rpc.API {
	return []rpc.API{
		{
			Namespace: "roller",
			Public:    true,
			Service:   RollerDebugAPI(m),
		},
	}
}

// StartProofGenerationSession starts a proof generation session.
func (m *Manager) StartProofGenerationSession(trace *types.BlockResult) bool {
	roller := m.SelectRoller()
	if roller == nil || roller.isClosed() {
		return false
	}

	id := (*big.Int)(trace.BlockTrace.Number).Uint64()
	log.Info("start proof generation session", "id", id)

	var dbErr error
	defer func() {
		if dbErr != nil {
			if err := m.orm.UpdateBlockStatus(id, orm.BlockUnassigned); err != nil {
				log.Error("fail to reset block_status as Unassigned", "id", id)
			}
		}
	}()

	pk := roller.AuthMsg.Identity.PublicKey
	log.Info("roller is picked", "name", roller.AuthMsg.Identity.Name, "public_key", pk)

	msg, err := createBlockTracesMsg(trace)
	if err != nil {
		log.Error(
			"could not create block traces message",
			"error", err,
		)
		return false
	}
	if err := roller.sendMessage(msg); err != nil {
		log.Error(
			"could not send traces message to roller",
			"error", err,
		)
		return false
	}

	s := session{
		id: id,
		rollers: map[string]bool{
			pk: false,
		},
		roller_names: map[string]string{
			pk: roller.AuthMsg.Identity.Name,
		},
		startTime:  time.Now(),
		finishChan: make(chan string, proofAndPkBufferSize),
	}

	// Create a proof generation session.
	m.mu.Lock()
	m.sessions[id] = s
	m.mu.Unlock()

	dbErr = m.orm.UpdateBlockStatus(id, orm.BlockAssigned)
	go m.CollectProofs(id, s)

	return true
}

// SelectRoller randomly picks one idle roller.
func (m *Manager) SelectRoller() *Roller {
	allRollers := m.server.conns.getAll()
	for len(allRollers) > 0 {
		idx := mathrand.Intn(len(allRollers))
		conn := allRollers[idx]
		pk := conn.AuthMsg.Identity.PublicKey
		if conn.isClosed() {
			log.Debug("roller is closed", "public_key", pk)
			// Delete closed connection.
			m.server.conns.delete(conn)
			// Delete the offline roller.
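			// (swap the first element into this slot and drop the head, removing the entry from the local candidate slice)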
			allRollers[idx], allRollers = allRollers[0], allRollers[1:]
			continue
		}
		// Ensure the roller is not currently working on another session.
		if !m.IsRollerIdle(pk) {
			log.Debug("roller is busy", "public_key", pk)
			// Delete the busy roller.
			allRollers[idx], allRollers = allRollers[0], allRollers[1:]
			continue
		}
		return conn
	}
	return nil
}

// IsRollerIdle determines whether this roller is idle.
func (m *Manager) IsRollerIdle(hexPk string) bool {
	m.mu.RLock()
	defer m.mu.RUnlock()
	// We need to iterate over all sessions because finished sessions are not deleted until the
	// timeout. So a roller that already finished in a lingering session is still treated as idle.
	for _, sess := range m.sessions {
		for pk, finished := range sess.rollers {
			if pk == hexPk && !finished {
				return false
			}
		}
	}

	return true
}

// GetNumberOfIdleRollers returns the number of idle rollers in the maintained connection list.
func (m *Manager) GetNumberOfIdleRollers() int {
	cnt := 0
	// m.server.conns doesn't have any lock
	for _, roller := range m.server.conns.getAll() {
		if m.IsRollerIdle(roller.AuthMsg.Identity.PublicKey) {
			cnt++
		}
	}
	return cnt
}
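
// createBlockTracesMsg wraps the given BlockResult into a message.BlockTraces
// payload and returns it as a message.Msg of type message.BlockTrace.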
func createBlockTracesMsg(traces *types.BlockResult) (message.Msg, error) {
	idAndTraces := message.BlockTraces{
		ID:     traces.BlockTrace.Number.ToInt().Uint64(),
		Traces: traces,
	}

	payload, err := json.Marshal(idAndTraces)
	if err != nil {
		return message.Msg{}, err
	}

	return message.Msg{
		Type:    message.BlockTrace,
		Payload: payload,
	}, nil
}
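
// addFailedSession records the given session in failedSessionInfos, together with
// the failure reason, so that the failure can be inspected later.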
func (m *Manager) addFailedSession(s *session, errMsg string) {
	m.failedSessionInfos[s.id] = *newSessionInfo(s, orm.BlockFailed, errMsg, true)
}