// scroll/coordinator/manager.go

package coordinator

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"math/big"
	mathrand "math/rand"
	"sync"
	"sync/atomic"
	"time"

	"github.com/scroll-tech/go-ethereum/core/types"
	"github.com/scroll-tech/go-ethereum/log"
	"github.com/scroll-tech/go-ethereum/rpc"

	"scroll-tech/common/message"
	"scroll-tech/database/orm"

	"scroll-tech/coordinator/config"
	"scroll-tech/coordinator/verifier"
)

const (
	proofAndPkBufferSize = 10
)

// Contains all the information on an ongoing proof generation session.
type session struct {
	// session id
	id uint64
	// A list of all participating rollers and whether each one has finished proof generation
	// for this session. The map key is a hexadecimal encoding of the roller public key,
	// as byte slices are not comparable and cannot be used as map keys.
	rollers map[string]bool
	// Maps the same hexadecimal public key to the roller's name.
	roller_names map[string]string
	// session start time
	startTime time.Time
	// finishChan is used to pass the public keys of the rollers that have finished the proving process.
	finishChan chan string
}

// Manager is responsible for maintaining connections with active rollers,
// sending the challenges, and receiving proofs. It also regulates the reward
// distribution. All read and write logic and connection handling happens through
// a modular websocket server, contained within the Manager. Incoming messages are
// then passed to the Manager where the actual handling logic resides.
type Manager struct {
	// The manager context.
	ctx context.Context
	// The roller manager configuration.
	cfg *config.RollerManagerConfig
	// Indicates whether the backend is running or not.
	running int32
	// The websocket server which holds the connections with active rollers.
	server *server
	// A mutex guarding the sessions map below.
	mu sync.RWMutex
	// A map containing all active proof generation sessions.
	sessions map[uint64]session
	// A map containing information about sessions whose proving or verification failed.
	failedSessionInfos map[uint64]SessionInfo
	// A direct connection to the Halo2 verifier, used to verify
	// incoming proofs.
	verifier *verifier.Verifier
	// db interface
	orm orm.BlockResultOrm
}
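
// Locking note (derived from the code below): mu is held for writing when sessions are
// created (StartProofGenerationSession), updated, or cleaned up (CollectProofs), and for
// reading when sessions are looked up or inspected (HandleZkProof, IsRollerIdle).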

// New returns a new instance of Manager. The instance will not be fully prepared,
// and still needs to be finalized and run by calling `manager.Start`.
func New(ctx context.Context, cfg *config.RollerManagerConfig, orm orm.BlockResultOrm) (*Manager, error) {
	var v *verifier.Verifier
	if cfg.VerifierEndpoint != "" {
		var err error
		v, err = verifier.NewVerifier(cfg.VerifierEndpoint)
		if err != nil {
			return nil, err
		}
	}
	log.Info("Start rollerManager successfully.")
	return &Manager{
		ctx:                ctx,
		cfg:                cfg,
		server:             newServer(cfg.Endpoint),
		sessions:           make(map[uint64]session),
		failedSessionInfos: make(map[uint64]SessionInfo),
		verifier:           v,
		orm:                orm,
	}, nil
}

// Start the Manager module.
func (m *Manager) Start() error {
	if m.isRunning() {
		return nil
	}
	// m.orm may be nil in scroll tests
	if m.orm != nil {
		// Clean up tasks that were assigned but not submitted.
		blocks, err := m.orm.GetBlockResults(map[string]interface{}{"status": orm.BlockAssigned})
		if err == nil {
			for _, block := range blocks {
				if err := m.orm.UpdateBlockStatus(block.BlockTrace.Number.ToInt().Uint64(), orm.BlockUnassigned); err != nil {
					log.Error("fail to reset block_status as Unassigned")
				}
			}
		} else {
			log.Error("fail to fetch assigned blocks")
		}
	}
	if err := m.server.start(); err != nil {
		return err
	}
	atomic.StoreInt32(&m.running, 1)
	go m.Loop()
	return nil
}

// Stop the Manager module, for a graceful shutdown.
func (m *Manager) Stop() {
	if !m.isRunning() {
		return
	}
	// Stop accepting connections
	if err := m.server.stop(); err != nil {
		log.Error("Server shutdown failed", "error", err)
		return
	}
	atomic.StoreInt32(&m.running, 0)
}

// isRunning returns an indicator whether manager is running or not.
func (m *Manager) isRunning() bool {
	return atomic.LoadInt32(&m.running) == 1
}

// Loop keeps the manager running.
func (m *Manager) Loop() {
	var (
		tick   = time.NewTicker(time.Second * 3)
		traces []*types.BlockResult
	)
	defer tick.Stop()

	for {
		select {
		case <-tick.C:
			if len(traces) == 0 && m.orm != nil {
				var err error
				numIdleRollers := m.GetNumberOfIdleRollers()
				// TODO: add cache
				if traces, err = m.orm.GetBlockResults(
					map[string]interface{}{"status": orm.BlockUnassigned},
					fmt.Sprintf(
						"ORDER BY number %s LIMIT %d;",
						m.cfg.OrderSession,
						numIdleRollers,
					),
				); err != nil {
					log.Error("failed to get blockResult", "error", err)
					continue
				}
			}
			// Select roller and send message
			for len(traces) > 0 && m.StartProofGenerationSession(traces[0]) {
				traces = traces[1:]
			}
		case msg := <-m.server.msgChan:
			if err := m.HandleMessage(msg.pk, msg.message); err != nil {
				log.Error(
					"could not handle message",
					"error", err,
				)
			}
		case <-m.ctx.Done():
			if m.ctx.Err() != nil {
				log.Error(
					"manager context canceled with error",
					"error", m.ctx.Err(),
				)
			}
			return
		}
	}
}
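
// Note: on every tick, Loop fetches at most GetNumberOfIdleRollers() unassigned block
// results and hands them out one by one via StartProofGenerationSession, carrying any
// leftovers over to the next tick.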

// HandleMessage handles a message from a roller.
func (m *Manager) HandleMessage(pk string, payload []byte) error {
	// Recover message
	msg := &message.Msg{}
	if err := json.Unmarshal(payload, msg); err != nil {
		return err
	}

	switch msg.Type {
	case message.Error:
		// Just log it for now.
		log.Error("error message received from roller", "message", msg)
		// TODO: handle in m.failedSessionInfos
		return nil
	case message.Register:
		// We shouldn't get this message, as the sequencer should handle registering at the start
		// of the connection.
		return errors.New("attempted handshake at the wrong time")
	case message.BlockTrace:
		// We shouldn't get this message, as the sequencer should always be the one to send it.
		return errors.New("received illegal message")
	case message.Proof:
		return m.HandleZkProof(pk, msg.Payload)
	default:
		return fmt.Errorf("unrecognized message type %v", msg.Type)
	}
}

// HandleZkProof handles a ZkProof submitted by a roller.
// For now, only proving/verifying errors lead to setting the status as skipped;
// db/unmarshal errors do not, because they are errors on the business-logic side.
func (m *Manager) HandleZkProof(pk string, payload []byte) error {
	var dbErr error
	msg := &message.ProofMsg{}
	if err := json.Unmarshal(payload, msg); err != nil {
		return err
	}

	// Assess if the proof generation session for the given ID is still active.
	// We hold the read lock until the end of the function so that there is no
	// potential race for channel deletion.
	m.mu.RLock()
	defer m.mu.RUnlock()
	s, ok := m.sessions[msg.ID]
	if !ok {
		return fmt.Errorf("proof generation session for id %v does not exist", msg.ID)
	}
	proofTimeSec := uint64(time.Since(s.startTime).Seconds())

	// Ensure this roller is eligible to participate in the session.
	if _, ok = s.rollers[pk]; !ok {
		return fmt.Errorf("roller %s is not eligible to partake in proof session %v", pk, msg.ID)
	}
	log.Info("Received zk proof", "proof id", msg.ID)

	defer func() {
		// Notify the session that the roller has finished the proving process.
		s.finishChan <- pk
		// TODO: maybe we should use db tx for the whole process?
		// Roll back current proof's status.
		if dbErr != nil {
			if err := m.orm.UpdateBlockStatus(msg.ID, orm.BlockUnassigned); err != nil {
				log.Error("fail to reset block_status as Unassigned", "msg.ID", msg.ID)
			}
		}
	}()

	if msg.Status != message.StatusOk {
		log.Error("Roller failed to generate proof", "msg.ID", msg.ID, "error", msg.Error)
		if dbErr = m.orm.UpdateBlockStatus(msg.ID, orm.BlockFailed); dbErr != nil {
			log.Error("failed to update blockResult status", "status", orm.BlockFailed, "error", dbErr)
		}
		// Record the failed session.
		m.addFailedSession(&s, msg.Error)
		return nil
	}

	// Store proof content.
	if dbErr = m.orm.UpdateProofByNumber(m.ctx, msg.ID, msg.Proof.Proof, msg.Proof.FinalPair, proofTimeSec); dbErr != nil {
		log.Error("failed to store proof into db", "error", dbErr)
		return dbErr
	}
	if dbErr = m.orm.UpdateBlockStatus(msg.ID, orm.BlockProved); dbErr != nil {
		log.Error("failed to update blockResult status", "status", orm.BlockProved, "error", dbErr)
		return dbErr
	}

	var success bool
	if m.verifier != nil {
		blockResults, err := m.orm.GetBlockResults(map[string]interface{}{"number": msg.ID})
		if len(blockResults) == 0 {
			if err != nil {
				log.Error("failed to get blockResults", "error", err)
			}
			return err
		}
		success, err = m.verifier.VerifyProof(blockResults[0], msg.Proof)
		if err != nil {
			// Record the failed session.
			m.addFailedSession(&s, err.Error())
			// TODO: this is only a temp workaround for testnet, we should return err in real cases
			success = false
			log.Error("Failed to verify zk proof", "proof id", msg.ID, "error", err)
			// TODO: Roller needs to be slashed if proof is invalid.
		} else {
			log.Info("Verify zk proof successfully", "verification result", success, "proof id", msg.ID)
		}
	} else {
		success = true
		log.Info("Verifier disabled, VerifyProof skipped")
		log.Info("Verify zk proof successfully", "verification result", success, "proof id", msg.ID)
	}

	var status orm.BlockStatus
	if success {
		status = orm.BlockVerified
	} else {
		// Set status as skipped if verification fails.
		// Note that this is only a workaround for testnet here.
		// TODO: In real cases we should reset to orm.BlockUnassigned
		// so as to re-distribute the task in the future.
		status = orm.BlockFailed
	}
	if dbErr = m.orm.UpdateBlockStatus(msg.ID, status); dbErr != nil {
		log.Error("failed to update blockResult status", "status", status, "error", dbErr)
	}
	return dbErr
}
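
// Block status flow implemented in this file: a block goes from BlockUnassigned to
// BlockAssigned when a session is started, to BlockProved once its proof is stored, and
// finally to BlockVerified (verification succeeded) or BlockFailed (proving/verification
// failed, or the session timed out without proofs); DB errors roll the status back to
// BlockUnassigned so the block can be re-assigned.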

// CollectProofs collects proofs corresponding to a proof generation session.
func (m *Manager) CollectProofs(id uint64, s session) {
	timer := time.NewTimer(time.Duration(m.cfg.CollectionTime) * time.Minute)
	for {
		select {
		case <-timer.C:
			m.mu.Lock()
			// Ensure proper clean-up of resources.
			defer func() {
				delete(m.sessions, id)
				m.mu.Unlock()
			}()

			// Pick a random winner.
			// First, round up the keys that actually sent in a proof.
			var participatingRollers []string
			for pk, finished := range s.rollers {
				if finished {
					participatingRollers = append(participatingRollers, pk)
				}
			}
			// Ensure we got at least one proof before selecting a winner.
			if len(participatingRollers) == 0 {
				// record failed session.
				errMsg := "proof generation session ended without receiving any proofs"
				m.addFailedSession(&s, errMsg)
				log.Warn(errMsg, "session id", id)
				// Set status as skipped.
				// Note that this is only a workaround for testnet here.
				// TODO: In real cases we should reset to orm.BlockUnassigned
				// so as to re-distribute the task in the future
				if err := m.orm.UpdateBlockStatus(id, orm.BlockFailed); err != nil {
					log.Error("fail to reset block_status as Unassigned", "id", id)
				}
				return
			}

			// Now, select a random index for this slice.
			randIndex := mathrand.Intn(len(participatingRollers))
			_ = participatingRollers[randIndex]
			// TODO: reward winner
			return
		case pk := <-s.finishChan:
			m.mu.Lock()
			s.rollers[pk] = true
			m.mu.Unlock()
		}
	}
}
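
// Note: HandleZkProof pushes a roller's public key onto finishChan once its proof has
// been processed; CollectProofs drains that channel and marks the roller as finished
// under the write lock. When the collection window expires, the session is removed and a
// random winner is picked among the rollers that actually submitted proofs.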

// GetRollerChan returns the channel in which newly created wrapped roller connections are sent.
func (m *Manager) GetRollerChan() chan *Roller {
	return m.server.rollerChan
}

// APIs collects the API services.
func (m *Manager) APIs() []rpc.API {
	return []rpc.API{
		{
			Namespace: "roller",
			Public:    true,
			Service:   RollerDebugAPI(m),
		},
	}
}

// StartProofGenerationSession starts a proof generation session.
func (m *Manager) StartProofGenerationSession(trace *types.BlockResult) bool {
	roller := m.SelectRoller()
	if roller == nil || roller.isClosed() {
		return false
	}
	id := (*big.Int)(trace.BlockTrace.Number).Uint64()
	log.Info("start proof generation session", "id", id)

	var dbErr error
	defer func() {
		if dbErr != nil {
			if err := m.orm.UpdateBlockStatus(id, orm.BlockUnassigned); err != nil {
				log.Error("fail to reset block_status as Unassigned", "id", id)
			}
		}
	}()

	pk := roller.AuthMsg.Identity.PublicKey
	log.Info("roller is picked", "name", roller.AuthMsg.Identity.Name, "public_key", pk)
	msg, err := createBlockTracesMsg(trace)
	if err != nil {
		log.Error(
			"could not create block traces message",
			"error", err,
		)
		return false
	}
	if err := roller.sendMessage(msg); err != nil {
		log.Error(
			"could not send traces message to roller",
			"error", err,
		)
		return false
	}

	s := session{
		id: id,
		rollers: map[string]bool{
			pk: false,
		},
		roller_names: map[string]string{
			pk: roller.AuthMsg.Identity.Name,
		},
		startTime:  time.Now(),
		finishChan: make(chan string, proofAndPkBufferSize),
	}

	// Create a proof generation session.
	m.mu.Lock()
	m.sessions[id] = s
	m.mu.Unlock()

	dbErr = m.orm.UpdateBlockStatus(id, orm.BlockAssigned)
	go m.CollectProofs(id, s)
	return true
}
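
// Note: each session currently assigns exactly one roller to a block, although the
// rollers map and the buffered finishChan (proofAndPkBufferSize) are structured so that
// more than one roller per session could be tracked.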

// SelectRoller randomly selects one idle roller.
func (m *Manager) SelectRoller() *Roller {
	allRollers := m.server.conns.getAll()
	for len(allRollers) > 0 {
		idx := mathrand.Intn(len(allRollers))
		conn := allRollers[idx]
		pk := conn.AuthMsg.Identity.PublicKey
		if conn.isClosed() {
			log.Debug("roller is closed", "public_key", pk)
			// Delete closed connection.
			m.server.conns.delete(conn)
			// Delete the offline roller from the candidate list.
			allRollers[idx], allRollers = allRollers[0], allRollers[1:]
			continue
		}
		// Ensure the roller is not currently working on another session.
		if !m.IsRollerIdle(pk) {
			log.Debug("roller is busy", "public_key", pk)
			// Delete the busy roller from the candidate list.
			allRollers[idx], allRollers = allRollers[0], allRollers[1:]
			continue
		}
		return conn
	}
	return nil
}

// IsRollerIdle determines whether this roller is idle.
func (m *Manager) IsRollerIdle(hexPk string) bool {
	m.mu.RLock()
	defer m.mu.RUnlock()
	// We need to iterate over all sessions because finished sessions are not deleted until
	// the timeout. A roller can show up as finished in one session while still being busy
	// in another, so no single session is enough to decide.
	for _, sess := range m.sessions {
		for pk, finished := range sess.rollers {
			if pk == hexPk && !finished {
				return false
			}
		}
	}
	return true
}

// GetNumberOfIdleRollers returns the number of idle rollers in the maintained connection list.
func (m *Manager) GetNumberOfIdleRollers() int {
	cnt := 0
	// m.server.conns doesn't have any lock
	for _, roller := range m.server.conns.getAll() {
		if m.IsRollerIdle(roller.AuthMsg.Identity.PublicKey) {
			cnt++
		}
	}
	return cnt
}

// createBlockTracesMsg wraps a block result into a BlockTrace message for a roller.
func createBlockTracesMsg(traces *types.BlockResult) (message.Msg, error) {
	idAndTraces := message.BlockTraces{
		ID:     traces.BlockTrace.Number.ToInt().Uint64(),
		Traces: traces,
	}
	payload, err := json.Marshal(idAndTraces)
	if err != nil {
		return message.Msg{}, err
	}
	return message.Msg{
		Type:    message.BlockTrace,
		Payload: payload,
	}, nil
}

// addFailedSession records a failed session in failedSessionInfos.
func (m *Manager) addFailedSession(s *session, errMsg string) {
	m.failedSessionInfos[s.id] = *newSessionInfo(s, orm.BlockFailed, errMsg, true)
}