mirror of
https://github.com/scroll-tech/scroll.git
synced 2026-01-11 06:58:20 -05:00
feat(coordinator): support rollers-per-session (#215)
Co-authored-by: colinlyguo <colinlyguo@gmail.com> Co-authored-by: maskpp <maskpp266@gmail.com> Co-authored-by: HAOYUatHZ <37070449+HAOYUatHZ@users.noreply.github.com>
This commit is contained in:
@@ -198,7 +198,12 @@ func (m *Manager) restorePrevSessions() {
|
||||
|
||||
log.Info("Coordinator restart reload sessions", "session start time", time.Unix(sess.info.StartTimestamp, 0))
|
||||
for _, roller := range sess.info.Rollers {
|
||||
log.Info("restore roller info for session", "session id", sess.info.ID, "roller name", roller.Name, "public key", roller.PublicKey, "proof status", roller.Status)
|
||||
log.Info(
|
||||
"restore roller info for session",
|
||||
"session id", sess.info.ID,
|
||||
"roller name", roller.Name,
|
||||
"public key", roller.PublicKey,
|
||||
"proof status", roller.Status)
|
||||
}
|
||||
|
||||
go m.CollectProofs(sess)
|
||||
@@ -274,11 +279,6 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
|
||||
"roller pk", roller.PublicKey,
|
||||
"error", msg.Error,
|
||||
)
|
||||
if dbErr = m.orm.UpdateProvingStatus(msg.ID, orm.ProvingTaskFailed); dbErr != nil {
|
||||
log.Error("failed to update task status as failed", "error", dbErr)
|
||||
}
|
||||
// record the failed session.
|
||||
m.addFailedSession(sess, msg.Error)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -303,8 +303,6 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
|
||||
|
||||
success, err = m.verifier.VerifyProof(msg.Proof)
|
||||
if err != nil {
|
||||
// record failed session.
|
||||
m.addFailedSession(sess, err.Error())
|
||||
// TODO: this is only a temp workaround for testnet, we should return err in real cases
|
||||
success = false
|
||||
log.Error("Failed to verify zk proof", "proof id", msg.ID, "error", err)
|
||||
@@ -313,21 +311,17 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
|
||||
log.Info("Verify zk proof successfully", "verification result", success, "proof id", msg.ID)
|
||||
}
|
||||
|
||||
var status orm.ProvingStatus
|
||||
if success {
|
||||
status = orm.ProvingTaskVerified
|
||||
} else {
|
||||
// Set status as skipped if verification fails.
|
||||
// Note that this is only a workaround for testnet here.
|
||||
// TODO: In real cases we should reset to orm.ProvingTaskUnassigned
|
||||
// so as to re-distribute the task in the future
|
||||
status = orm.ProvingTaskFailed
|
||||
if dbErr = m.orm.UpdateProvingStatus(msg.ID, orm.ProvingTaskVerified); dbErr != nil {
|
||||
log.Error(
|
||||
"failed to update proving_status",
|
||||
"msg.ID", msg.ID,
|
||||
"status", orm.ProvingTaskVerified,
|
||||
"error", dbErr)
|
||||
}
|
||||
return dbErr
|
||||
}
|
||||
if dbErr = m.orm.UpdateProvingStatus(msg.ID, status); dbErr != nil {
|
||||
log.Error("failed to update proving_status", "msg.ID", msg.ID, "status", status, "error", dbErr)
|
||||
}
|
||||
|
||||
return dbErr
|
||||
return nil
|
||||
}
|
||||
|
||||
// CollectProofs collects proofs corresponding to a proof generation session.
|
||||
@@ -398,12 +392,12 @@ func (m *Manager) APIs() []rpc.API {
|
||||
|
||||
// StartProofGenerationSession starts a proof generation session
|
||||
func (m *Manager) StartProofGenerationSession(task *orm.BlockBatch) (success bool) {
|
||||
roller := m.selectRoller()
|
||||
if roller == nil {
|
||||
if m.GetNumberOfIdleRollers() == 0 {
|
||||
log.Warn("no idle roller when starting proof generation session", "id", task.ID)
|
||||
return false
|
||||
}
|
||||
log.Info("start proof generation session", "id", task.ID)
|
||||
|
||||
log.Info("start proof generation session", "id", task.ID)
|
||||
defer func() {
|
||||
if !success {
|
||||
if err := m.orm.UpdateProvingStatus(task.ID, orm.ProvingTaskUnassigned); err != nil {
|
||||
@@ -411,11 +405,8 @@ func (m *Manager) StartProofGenerationSession(task *orm.BlockBatch) (success boo
|
||||
}
|
||||
}
|
||||
}()
|
||||
if err := m.orm.UpdateProvingStatus(task.ID, orm.ProvingTaskAssigned); err != nil {
|
||||
log.Error("failed to update task status", "id", task.ID, "err", err)
|
||||
return false
|
||||
}
|
||||
|
||||
// Get block traces.
|
||||
blockInfos, err := m.orm.GetBlockInfos(map[string]interface{}{"batch_id": task.ID})
|
||||
if err != nil {
|
||||
log.Error(
|
||||
@@ -425,7 +416,6 @@ func (m *Manager) StartProofGenerationSession(task *orm.BlockBatch) (success boo
|
||||
)
|
||||
return false
|
||||
}
|
||||
|
||||
traces := make([]*types.BlockTrace, len(blockInfos))
|
||||
for i, blockInfo := range blockInfos {
|
||||
traces[i], err = m.Client.GetBlockTraceByHash(m.ctx, common.HexToHash(blockInfo.Hash))
|
||||
@@ -440,41 +430,61 @@ func (m *Manager) StartProofGenerationSession(task *orm.BlockBatch) (success boo
|
||||
}
|
||||
}
|
||||
|
||||
log.Info("roller is picked", "session id", task.ID, "name", roller.Name, "public key", roller.PublicKey)
|
||||
|
||||
// send trace to roller
|
||||
if !roller.sendTask(task.ID, traces) {
|
||||
log.Error("send task failed", "roller name", roller.Name, "public key", roller.PublicKey, "id", task.ID)
|
||||
// Dispatch task to rollers.
|
||||
rollers := make(map[string]*orm.RollerStatus)
|
||||
for i := 0; i < int(m.cfg.RollersPerSession); i++ {
|
||||
roller := m.selectRoller()
|
||||
if roller == nil {
|
||||
break
|
||||
}
|
||||
log.Info("roller is picked", "session id", task.ID, "name", roller.Name, "public key", roller.PublicKey)
|
||||
// send trace to roller
|
||||
if !roller.sendTask(task.ID, traces) {
|
||||
log.Error("send task failed", "roller name", roller.Name, "public key", roller.PublicKey, "id", task.ID)
|
||||
continue
|
||||
}
|
||||
rollers[roller.PublicKey] = &orm.RollerStatus{PublicKey: roller.PublicKey, Name: roller.Name, Status: orm.RollerAssigned}
|
||||
}
|
||||
// No roller assigned.
|
||||
if len(rollers) == 0 {
|
||||
log.Error("no roller assigned", "id", task.ID, "number of idle rollers", m.GetNumberOfIdleRollers())
|
||||
return false
|
||||
}
|
||||
|
||||
// Update session proving status as assigned.
|
||||
if err := m.orm.UpdateProvingStatus(task.ID, orm.ProvingTaskAssigned); err != nil {
|
||||
log.Error("failed to update task status", "id", task.ID, "err", err)
|
||||
return false
|
||||
}
|
||||
|
||||
pk := roller.PublicKey
|
||||
// Create a proof generation session.
|
||||
s := &session{
|
||||
sess := &session{
|
||||
info: &orm.SessionInfo{
|
||||
ID: task.ID,
|
||||
Rollers: map[string]*orm.RollerStatus{
|
||||
pk: {
|
||||
PublicKey: pk,
|
||||
Name: roller.Name,
|
||||
Status: orm.RollerAssigned,
|
||||
},
|
||||
},
|
||||
ID: task.ID,
|
||||
Rollers: rollers,
|
||||
StartTimestamp: time.Now().Unix(),
|
||||
},
|
||||
finishChan: make(chan rollerProofStatus, proofAndPkBufferSize),
|
||||
}
|
||||
|
||||
// Store session info.
|
||||
if err = m.orm.SetSessionInfo(s.info); err != nil {
|
||||
log.Error("db set session info fail", "roller name", roller.Name, "public key", pk, "error", err)
|
||||
if err = m.orm.SetSessionInfo(sess.info); err != nil {
|
||||
log.Error("db set session info fail", "error", err)
|
||||
for _, roller := range sess.info.Rollers {
|
||||
log.Error(
|
||||
"restore roller info for session",
|
||||
"session id", sess.info.ID,
|
||||
"roller name", roller.Name,
|
||||
"public key", roller.PublicKey,
|
||||
"proof status", roller.Status)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
m.mu.Lock()
|
||||
m.sessions[task.ID] = s
|
||||
m.sessions[task.ID] = sess
|
||||
m.mu.Unlock()
|
||||
go m.CollectProofs(s)
|
||||
go m.CollectProofs(sess)
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user