mirror of
https://github.com/scroll-tech/scroll.git
synced 2026-01-11 23:18:07 -05:00
Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5d761ad812 | ||
|
|
4042bea6db | ||
|
|
de7c38a903 |
@@ -31,6 +31,8 @@ type Cmd struct {
|
||||
|
||||
checkFuncs cmap.ConcurrentMap //map[string]checkFunc
|
||||
|
||||
// open log flag.
|
||||
openLog bool
|
||||
// error channel
|
||||
ErrChan chan error
|
||||
}
|
||||
@@ -64,7 +66,7 @@ func (c *Cmd) runCmd() {
|
||||
|
||||
// RunCmd parallel running when parallel is true.
|
||||
func (c *Cmd) RunCmd(parallel bool) {
|
||||
fmt.Println("cmd: ", c.args)
|
||||
fmt.Println("cmd:", c.args)
|
||||
if parallel {
|
||||
go c.runCmd()
|
||||
} else {
|
||||
@@ -72,12 +74,17 @@ func (c *Cmd) RunCmd(parallel bool) {
|
||||
}
|
||||
}
|
||||
|
||||
// OpenLog switches this command's log echoing on or off by storing open in
// c.openLog. While the flag is set, Write prints every chunk of output it
// receives instead of only the chunks that contain "error" or "warning".
|
||||
func (c *Cmd) OpenLog(open bool) {
|
||||
c.openLog = open
|
||||
}
|
||||
|
||||
func (c *Cmd) Write(data []byte) (int, error) {
|
||||
out := string(data)
|
||||
if verbose {
|
||||
fmt.Printf("%s: %v", c.name, out)
|
||||
if verbose || c.openLog {
|
||||
fmt.Printf("%s:\n\t%v", c.name, out)
|
||||
} else if strings.Contains(out, "error") || strings.Contains(out, "warning") {
|
||||
fmt.Printf("%s: %v", c.name, out)
|
||||
fmt.Printf("%s:\n\t%v", c.name, out)
|
||||
}
|
||||
go c.checkFuncs.IterCb(func(_ string, value interface{}) {
|
||||
check := value.(checkFunc)
|
||||
|
||||
@@ -162,6 +162,7 @@ type SessionInfo struct {
|
||||
ID string `json:"id"`
|
||||
Rollers map[string]*RollerStatus `json:"rollers"`
|
||||
StartTimestamp int64 `json:"start_timestamp"`
|
||||
Attempts uint8 `json:"attempts,omitempty"`
|
||||
}
|
||||
|
||||
// ProvingStatus block_batch proving_status (unassigned, assigned, proved, verified, submitted)
|
||||
|
||||
@@ -5,7 +5,7 @@ import (
|
||||
"runtime/debug"
|
||||
)
|
||||
|
||||
var tag = "v3.0.4"
|
||||
var tag = "v3.0.6"
|
||||
|
||||
var commit = func() string {
|
||||
if info, ok := debug.ReadBuildInfo(); ok {
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
"roller_manager_config": {
|
||||
"compression_level": 9,
|
||||
"rollers_per_session": 1,
|
||||
"session_attempts": 2,
|
||||
"collection_time": 180,
|
||||
"token_time_to_live": 60,
|
||||
"verifier": {
|
||||
|
||||
@@ -11,7 +11,8 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
defaultNumberOfVerifierWorkers = 10
|
||||
defaultNumberOfVerifierWorkers = 10
|
||||
defaultNumberOfSessionRetryAttempts = 2
|
||||
)
|
||||
|
||||
// RollerManagerConfig loads sequencer configuration items.
|
||||
@@ -21,6 +22,9 @@ type RollerManagerConfig struct {
|
||||
OrderSession string `json:"order_session,omitempty"`
|
||||
// The amount of rollers to pick per proof generation session.
|
||||
RollersPerSession uint8 `json:"rollers_per_session"`
|
||||
// Number of attempts that a session can be retried if previous attempts failed.
|
||||
// Currently we only consider proving timeout as failure here.
|
||||
SessionAttempts uint8 `json:"session_attempts,omitempty"`
|
||||
// Zk verifier config.
|
||||
Verifier *VerifierConfig `json:"verifier,omitempty"`
|
||||
// Proof collection time (in minutes).
|
||||
@@ -74,6 +78,9 @@ func NewConfig(file string) (*Config, error) {
|
||||
if cfg.RollerManagerConfig.MaxVerifierWorkers == 0 {
|
||||
cfg.RollerManagerConfig.MaxVerifierWorkers = defaultNumberOfVerifierWorkers
|
||||
}
|
||||
if cfg.RollerManagerConfig.SessionAttempts == 0 {
|
||||
cfg.RollerManagerConfig.SessionAttempts = defaultNumberOfSessionRetryAttempts
|
||||
}
|
||||
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
@@ -176,7 +176,7 @@ func (m *Manager) Loop() {
|
||||
}
|
||||
}
|
||||
// Select roller and send message
|
||||
for len(tasks) > 0 && m.StartProofGenerationSession(tasks[0]) {
|
||||
for len(tasks) > 0 && m.StartProofGenerationSession(tasks[0], nil) {
|
||||
tasks = tasks[1:]
|
||||
}
|
||||
case <-m.ctx.Done():
|
||||
@@ -338,20 +338,22 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
|
||||
|
||||
// CollectProofs collects proofs corresponding to a proof generation session.
|
||||
func (m *Manager) CollectProofs(sess *session) {
|
||||
//Cleanup roller sessions before return.
|
||||
defer func() {
|
||||
// TODO: remove the clean-up, rollers report healthy status.
|
||||
m.mu.Lock()
|
||||
for pk := range sess.info.Rollers {
|
||||
m.freeTaskIDForRoller(pk, sess.info.ID)
|
||||
}
|
||||
delete(m.sessions, sess.info.ID)
|
||||
m.mu.Unlock()
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
//Execute after timeout, set in config.json. Consider all rollers failed.
|
||||
case <-time.After(time.Duration(m.cfg.CollectionTime) * time.Minute):
|
||||
// Check if session can be replayed
|
||||
if sess.info.Attempts < m.cfg.SessionAttempts {
|
||||
if m.StartProofGenerationSession(nil, sess) {
|
||||
m.mu.Lock()
|
||||
for pk := range sess.info.Rollers {
|
||||
m.freeTaskIDForRoller(pk, sess.info.ID)
|
||||
}
|
||||
m.mu.Unlock()
|
||||
log.Info("Retrying session", "session id:", sess.info.ID)
|
||||
return
|
||||
}
|
||||
}
|
||||
// record failed session.
|
||||
errMsg := "proof generation session ended without receiving any valid proofs"
|
||||
m.addFailedSession(sess, errMsg)
|
||||
@@ -363,6 +365,12 @@ func (m *Manager) CollectProofs(sess *session) {
|
||||
if err := m.orm.UpdateProvingStatus(sess.info.ID, types.ProvingTaskFailed); err != nil {
|
||||
log.Error("fail to reset task_status as Unassigned", "id", sess.info.ID, "err", err)
|
||||
}
|
||||
m.mu.Lock()
|
||||
for pk := range sess.info.Rollers {
|
||||
m.freeTaskIDForRoller(pk, sess.info.ID)
|
||||
}
|
||||
delete(m.sessions, sess.info.ID)
|
||||
m.mu.Unlock()
|
||||
return
|
||||
|
||||
//Execute after one of the roller finishes sending proof, return early if all rollers had sent results.
|
||||
@@ -386,6 +394,11 @@ func (m *Manager) CollectProofs(sess *session) {
|
||||
randIndex := mathrand.Intn(len(validRollers))
|
||||
_ = validRollers[randIndex]
|
||||
// TODO: reward winner
|
||||
|
||||
for pk := range sess.info.Rollers {
|
||||
m.freeTaskIDForRoller(pk, sess.info.ID)
|
||||
}
|
||||
delete(m.sessions, sess.info.ID)
|
||||
m.mu.Unlock()
|
||||
return
|
||||
}
|
||||
@@ -439,27 +452,39 @@ func (m *Manager) APIs() []rpc.API {
|
||||
}
|
||||
|
||||
// StartProofGenerationSession starts a proof generation session
|
||||
func (m *Manager) StartProofGenerationSession(task *types.BlockBatch) (success bool) {
|
||||
func (m *Manager) StartProofGenerationSession(task *types.BlockBatch, prevSession *session) (success bool) {
|
||||
var taskId string
|
||||
if task != nil {
|
||||
taskId = task.Hash
|
||||
} else {
|
||||
taskId = prevSession.info.ID
|
||||
}
|
||||
if m.GetNumberOfIdleRollers() == 0 {
|
||||
log.Warn("no idle roller when starting proof generation session", "id", task.Hash)
|
||||
log.Warn("no idle roller when starting proof generation session", "id", taskId)
|
||||
return false
|
||||
}
|
||||
|
||||
log.Info("start proof generation session", "id", task.Hash)
|
||||
log.Info("start proof generation session", "id", taskId)
|
||||
defer func() {
|
||||
if !success {
|
||||
if err := m.orm.UpdateProvingStatus(task.Hash, types.ProvingTaskUnassigned); err != nil {
|
||||
log.Error("fail to reset task_status as Unassigned", "id", task.Hash, "err", err)
|
||||
if task != nil {
|
||||
if err := m.orm.UpdateProvingStatus(taskId, types.ProvingTaskUnassigned); err != nil {
|
||||
log.Error("fail to reset task_status as Unassigned", "id", taskId, "err", err)
|
||||
}
|
||||
} else {
|
||||
if err := m.orm.UpdateProvingStatus(taskId, types.ProvingTaskFailed); err != nil {
|
||||
log.Error("fail to reset task_status as Failed", "id", taskId, "err", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Get block traces.
|
||||
blockInfos, err := m.orm.GetL2BlockInfos(map[string]interface{}{"batch_hash": task.Hash})
|
||||
blockInfos, err := m.orm.GetL2BlockInfos(map[string]interface{}{"batch_hash": taskId})
|
||||
if err != nil {
|
||||
log.Error(
|
||||
"could not GetBlockInfos",
|
||||
"batch_hash", task.Hash,
|
||||
"batch_hash", taskId,
|
||||
"error", err,
|
||||
)
|
||||
return false
|
||||
@@ -486,35 +511,39 @@ func (m *Manager) StartProofGenerationSession(task *types.BlockBatch) (success b
|
||||
log.Info("selectRoller returns nil")
|
||||
break
|
||||
}
|
||||
log.Info("roller is picked", "session id", task.Hash, "name", roller.Name, "public key", roller.PublicKey)
|
||||
log.Info("roller is picked", "session id", taskId, "name", roller.Name, "public key", roller.PublicKey)
|
||||
// send trace to roller
|
||||
if !roller.sendTask(task.Hash, traces) {
|
||||
log.Error("send task failed", "roller name", roller.Name, "public key", roller.PublicKey, "id", task.Hash)
|
||||
if !roller.sendTask(taskId, traces) {
|
||||
log.Error("send task failed", "roller name", roller.Name, "public key", roller.PublicKey, "id", taskId)
|
||||
continue
|
||||
}
|
||||
rollers[roller.PublicKey] = &types.RollerStatus{PublicKey: roller.PublicKey, Name: roller.Name, Status: types.RollerAssigned}
|
||||
}
|
||||
// No roller assigned.
|
||||
if len(rollers) == 0 {
|
||||
log.Error("no roller assigned", "id", task.Hash, "number of idle rollers", m.GetNumberOfIdleRollers())
|
||||
log.Error("no roller assigned", "id", taskId, "number of idle rollers", m.GetNumberOfIdleRollers())
|
||||
return false
|
||||
}
|
||||
|
||||
// Update session proving status as assigned.
|
||||
if err = m.orm.UpdateProvingStatus(task.Hash, types.ProvingTaskAssigned); err != nil {
|
||||
log.Error("failed to update task status", "id", task.Hash, "err", err)
|
||||
if err = m.orm.UpdateProvingStatus(taskId, types.ProvingTaskAssigned); err != nil {
|
||||
log.Error("failed to update task status", "id", taskId, "err", err)
|
||||
return false
|
||||
}
|
||||
|
||||
// Create a proof generation session.
|
||||
sess := &session{
|
||||
info: &types.SessionInfo{
|
||||
ID: task.Hash,
|
||||
ID: taskId,
|
||||
Rollers: rollers,
|
||||
StartTimestamp: time.Now().Unix(),
|
||||
Attempts: 1,
|
||||
},
|
||||
finishChan: make(chan rollerProofStatus, proofAndPkBufferSize),
|
||||
}
|
||||
if prevSession != nil {
|
||||
sess.info.Attempts += prevSession.info.Attempts
|
||||
}
|
||||
|
||||
// Store session info.
|
||||
if err = m.orm.SetSessionInfo(sess.info); err != nil {
|
||||
@@ -531,7 +560,7 @@ func (m *Manager) StartProofGenerationSession(task *types.BlockBatch) (success b
|
||||
}
|
||||
|
||||
m.mu.Lock()
|
||||
m.sessions[task.Hash] = sess
|
||||
m.sessions[taskId] = sess
|
||||
m.mu.Unlock()
|
||||
go m.CollectProofs(sess)
|
||||
|
||||
|
||||
@@ -87,6 +87,7 @@ func TestApis(t *testing.T) {
|
||||
t.Run("TestSeveralConnections", testSeveralConnections)
|
||||
t.Run("TestValidProof", testValidProof)
|
||||
t.Run("TestInvalidProof", testInvalidProof)
|
||||
t.Run("TestTimedoutProof", testTimedoutProof)
|
||||
t.Run("TestIdleRollerSelection", testIdleRollerSelection)
|
||||
// TODO: Restart roller alone when received task, can add this test case in integration-test.
|
||||
//t.Run("TestRollerReconnect", testRollerReconnect)
|
||||
@@ -356,6 +357,86 @@ func testInvalidProof(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func testTimedoutProof(t *testing.T) {
|
||||
// Create db handler and reset db.
|
||||
l2db, err := database.NewOrmFactory(cfg.DBConfig)
|
||||
assert.NoError(t, err)
|
||||
assert.NoError(t, migrate.ResetDB(l2db.GetDB().DB))
|
||||
defer l2db.Close()
|
||||
|
||||
// Setup coordinator and ws server.
|
||||
wsURL := "ws://" + randomURL()
|
||||
rollerManager, handler := setupCoordinator(t, cfg.DBConfig, 1, wsURL)
|
||||
defer func() {
|
||||
handler.Shutdown(context.Background())
|
||||
rollerManager.Stop()
|
||||
}()
|
||||
|
||||
// create first mock roller, that will not send any proof.
|
||||
roller1 := newMockRoller(t, "roller_test"+strconv.Itoa(0), wsURL)
|
||||
defer func() {
|
||||
// close connection
|
||||
roller1.close()
|
||||
}()
|
||||
assert.Equal(t, 1, rollerManager.GetNumberOfIdleRollers())
|
||||
|
||||
var hashes = make([]string, 1)
|
||||
dbTx, err := l2db.Beginx()
|
||||
assert.NoError(t, err)
|
||||
for i := range hashes {
|
||||
assert.NoError(t, l2db.NewBatchInDBTx(dbTx, batchData))
|
||||
hashes[i] = batchData.Hash().Hex()
|
||||
|
||||
}
|
||||
assert.NoError(t, dbTx.Commit())
|
||||
|
||||
// verify proof status, it should be assigned, because roller didn't send any proof
|
||||
var (
|
||||
tick = time.Tick(500 * time.Millisecond)
|
||||
tickStop = time.Tick(10 * time.Second)
|
||||
)
|
||||
for len(hashes) > 0 {
|
||||
select {
|
||||
case <-tick:
|
||||
status, err := l2db.GetProvingStatusByHash(hashes[0])
|
||||
assert.NoError(t, err)
|
||||
if status == types.ProvingTaskAssigned {
|
||||
hashes = hashes[1:]
|
||||
}
|
||||
case <-tickStop:
|
||||
t.Error("failed to check proof status")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// create second mock roller, that will send valid proof.
|
||||
roller2 := newMockRoller(t, "roller_test"+strconv.Itoa(1), wsURL)
|
||||
roller2.waitTaskAndSendProof(t, time.Second, false, true)
|
||||
defer func() {
|
||||
// close connection
|
||||
roller2.close()
|
||||
}()
|
||||
assert.Equal(t, 1, rollerManager.GetNumberOfIdleRollers())
|
||||
|
||||
// wait manager to finish first CollectProofs
|
||||
<-time.After(60 * time.Second)
|
||||
|
||||
// verify proof status, it should be verified now, because second roller sent valid proof
|
||||
for len(hashes) > 0 {
|
||||
select {
|
||||
case <-tick:
|
||||
status, err := l2db.GetProvingStatusByHash(hashes[0])
|
||||
assert.NoError(t, err)
|
||||
if status == types.ProvingTaskVerified {
|
||||
hashes = hashes[1:]
|
||||
}
|
||||
case <-tickStop:
|
||||
t.Error("failed to check proof status")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func testIdleRollerSelection(t *testing.T) {
|
||||
// Create db handler and reset db.
|
||||
l2db, err := database.NewOrmFactory(cfg.DBConfig)
|
||||
@@ -505,6 +586,7 @@ func setupCoordinator(t *testing.T, dbCfg *database.DBConfig, rollersPerSession
|
||||
CollectionTime: 1,
|
||||
TokenTimeToLive: 5,
|
||||
MaxVerifierWorkers: 10,
|
||||
SessionAttempts: 2,
|
||||
}, db, nil)
|
||||
assert.NoError(t, err)
|
||||
assert.NoError(t, rollerManager.Start())
|
||||
|
||||
@@ -78,6 +78,7 @@ func free(t *testing.T) {
|
||||
}
|
||||
|
||||
type appAPI interface {
|
||||
OpenLog(open bool)
|
||||
WaitResult(t *testing.T, timeout time.Duration, keyword string) bool
|
||||
RunApp(waitResult func() bool)
|
||||
WaitExit()
|
||||
@@ -86,33 +87,44 @@ type appAPI interface {
|
||||
|
||||
func runMsgRelayerApp(t *testing.T, args ...string) appAPI {
|
||||
args = append(args, "--log.debug", "--config", bridgeFile)
|
||||
return cmd.NewCmd("message-relayer-test", args...)
|
||||
app := cmd.NewCmd("message-relayer-test", args...)
|
||||
app.OpenLog(true)
|
||||
return app
|
||||
}
|
||||
|
||||
func runGasOracleApp(t *testing.T, args ...string) appAPI {
|
||||
args = append(args, "--log.debug", "--config", bridgeFile)
|
||||
return cmd.NewCmd("gas-oracle-test", args...)
|
||||
app := cmd.NewCmd("gas-oracle-test", args...)
|
||||
app.OpenLog(true)
|
||||
return app
|
||||
}
|
||||
|
||||
func runRollupRelayerApp(t *testing.T, args ...string) appAPI {
|
||||
args = append(args, "--log.debug", "--config", bridgeFile)
|
||||
return cmd.NewCmd("rollup-relayer-test", args...)
|
||||
app := cmd.NewCmd("rollup-relayer-test", args...)
|
||||
app.OpenLog(true)
|
||||
return app
|
||||
}
|
||||
|
||||
func runEventWatcherApp(t *testing.T, args ...string) appAPI {
|
||||
args = append(args, "--log.debug", "--config", bridgeFile)
|
||||
return cmd.NewCmd("event-watcher-test", args...)
|
||||
app := cmd.NewCmd("event-watcher-test", args...)
|
||||
app.OpenLog(true)
|
||||
return app
|
||||
}
|
||||
|
||||
func runCoordinatorApp(t *testing.T, args ...string) appAPI {
|
||||
args = append(args, "--log.debug", "--config", coordinatorFile, "--ws", "--ws.port", strconv.Itoa(int(wsPort)))
|
||||
// start process
|
||||
return cmd.NewCmd("coordinator-test", args...)
|
||||
app := cmd.NewCmd("coordinator-test", args...)
|
||||
app.OpenLog(true)
|
||||
return app
|
||||
}
|
||||
|
||||
func runDBCliApp(t *testing.T, option, keyword string) {
|
||||
args := []string{option, "--config", dbFile}
|
||||
app := cmd.NewCmd("db_cli-test", args...)
|
||||
app.OpenLog(true)
|
||||
defer app.WaitExit()
|
||||
|
||||
// Wait expect result.
|
||||
@@ -122,7 +134,9 @@ func runDBCliApp(t *testing.T, option, keyword string) {
|
||||
|
||||
func runRollerApp(t *testing.T, args ...string) appAPI {
|
||||
args = append(args, "--log.debug", "--config", rollerFile)
|
||||
return cmd.NewCmd("roller-test", args...)
|
||||
app := cmd.NewCmd("roller-test", args...)
|
||||
app.OpenLog(true)
|
||||
return app
|
||||
}
|
||||
|
||||
func runSender(t *testing.T, endpoint string) *sender.Sender {
|
||||
|
||||
Reference in New Issue
Block a user