Compare commits


3 Commits

Author | SHA1 | Message | Date
Péter Garamvölgyi | 5d761ad812 | Make sure attempts can be deserialized from db on startup (#410) | 2023-04-05 19:00:54 +02:00
Nazarii Denha | 4042bea6db | retry proving timeout batch (#313) | 2023-04-05 16:42:06 +02:00
  Co-authored-by: Péter Garamvölgyi <peter@scroll.io>
  Co-authored-by: colin <102356659+colinlyguo@users.noreply.github.com>
maskpp | de7c38a903 | feat(test): let integration-test log verbosity be configurable (#409) | 2023-04-04 16:20:12 +08:00
8 changed files with 179 additions and 38 deletions

View File

@@ -31,6 +31,8 @@ type Cmd struct {
checkFuncs cmap.ConcurrentMap //map[string]checkFunc
// open log flag.
openLog bool
// error channel
ErrChan chan error
}
@@ -64,7 +66,7 @@ func (c *Cmd) runCmd() {
// RunCmd parallel running when parallel is true.
func (c *Cmd) RunCmd(parallel bool) {
fmt.Println("cmd: ", c.args)
fmt.Println("cmd:", c.args)
if parallel {
go c.runCmd()
} else {
@@ -72,12 +74,17 @@ func (c *Cmd) RunCmd(parallel bool) {
}
}
// OpenLog open cmd log by this api.
func (c *Cmd) OpenLog(open bool) {
c.openLog = open
}
func (c *Cmd) Write(data []byte) (int, error) {
out := string(data)
if verbose {
fmt.Printf("%s: %v", c.name, out)
if verbose || c.openLog {
fmt.Printf("%s:\n\t%v", c.name, out)
} else if strings.Contains(out, "error") || strings.Contains(out, "warning") {
fmt.Printf("%s: %v", c.name, out)
fmt.Printf("%s:\n\t%v", c.name, out)
}
go c.checkFuncs.IterCb(func(_ string, value interface{}) {
check := value.(checkFunc)
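
For context on the Cmd changes above: the new openLog flag lets an individual test command opt in to echoing its full output even when the global verbose flag is off; otherwise only lines containing "error" or "warning" get through. Below is a minimal, self-contained sketch of that gating logic, using a simplified stand-in type rather than the repository's actual Cmd (the verbose global is assumed from the surrounding file):

```go
package main

import (
	"fmt"
	"strings"
)

// verbose stands in for the package-level verbosity flag assumed from the
// surrounding file in the diff.
var verbose bool

// minimalCmd is a simplified stand-in for the Cmd type above; it only
// reproduces the output-gating logic of Write.
type minimalCmd struct {
	name    string
	openLog bool // per-command flag, set via OpenLog(true)
}

// Write echoes command output when either the global verbose flag or the
// per-command openLog flag is set; otherwise only lines that mention
// "error" or "warning" are printed.
func (c *minimalCmd) Write(data []byte) (int, error) {
	out := string(data)
	if verbose || c.openLog {
		fmt.Printf("%s:\n\t%v", c.name, out)
	} else if strings.Contains(out, "error") || strings.Contains(out, "warning") {
		fmt.Printf("%s:\n\t%v", c.name, out)
	}
	return len(data), nil
}

func main() {
	quiet := &minimalCmd{name: "roller-test"}
	chatty := &minimalCmd{name: "coordinator-test", openLog: true}

	quiet.Write([]byte("starting up\n"))       // suppressed
	quiet.Write([]byte("warning: low disk\n")) // printed: contains "warning"
	chatty.Write([]byte("starting up\n"))      // printed: openLog is enabled
}
```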

View File

@@ -162,6 +162,7 @@ type SessionInfo struct {
ID string `json:"id"`
Rollers map[string]*RollerStatus `json:"rollers"`
StartTimestamp int64 `json:"start_timestamp"`
Attempts uint8 `json:"attempts,omitempty"`
}
// ProvingStatus block_batch proving_status (unassigned, assigned, proved, verified, submitted)
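
The added Attempts field is tagged omitempty, so session rows persisted before this change (with no attempts key in their JSON) should still decode cleanly when the coordinator restores sessions on startup, which is presumably what commit #410 above addresses. A small illustrative round-trip with a trimmed-down struct (Rollers omitted for brevity):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// sessionInfo mirrors the shape of SessionInfo above, with Rollers left out
// for brevity.
type sessionInfo struct {
	ID             string `json:"id"`
	StartTimestamp int64  `json:"start_timestamp"`
	Attempts       uint8  `json:"attempts,omitempty"`
}

func main() {
	// A record written before the attempts field existed still decodes;
	// Attempts simply keeps its zero value.
	legacy := []byte(`{"id":"0xabc","start_timestamp":1680700000}`)
	var s sessionInfo
	if err := json.Unmarshal(legacy, &s); err != nil {
		panic(err)
	}
	fmt.Println(s.Attempts) // 0

	// New records round-trip the attempt counter.
	s.Attempts = 2
	out, _ := json.Marshal(s)
	fmt.Println(string(out)) // {"id":"0xabc","start_timestamp":1680700000,"attempts":2}
}
```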

View File

@@ -5,7 +5,7 @@ import (
"runtime/debug"
)
var tag = "v3.0.4"
var tag = "v3.0.6"
var commit = func() string {
if info, ok := debug.ReadBuildInfo(); ok {

View File

@@ -2,6 +2,7 @@
"roller_manager_config": {
"compression_level": 9,
"rollers_per_session": 1,
"session_attempts": 2,
"collection_time": 180,
"token_time_to_live": 60,
"verifier": {

View File

@@ -11,7 +11,8 @@ import (
)
const (
defaultNumberOfVerifierWorkers = 10
defaultNumberOfVerifierWorkers = 10
defaultNumberOfSessionRetryAttempts = 2
)
// RollerManagerConfig loads sequencer configuration items.
@@ -21,6 +22,9 @@ type RollerManagerConfig struct {
OrderSession string `json:"order_session,omitempty"`
// The amount of rollers to pick per proof generation session.
RollersPerSession uint8 `json:"rollers_per_session"`
// Number of attempts that a session can be retried if previous attempts failed.
// Currently we only consider proving timeout as failure here.
SessionAttempts uint8 `json:"session_attempts,omitempty"`
// Zk verifier config.
Verifier *VerifierConfig `json:"verifier,omitempty"`
// Proof collection time (in minutes).
@@ -74,6 +78,9 @@ func NewConfig(file string) (*Config, error) {
if cfg.RollerManagerConfig.MaxVerifierWorkers == 0 {
cfg.RollerManagerConfig.MaxVerifierWorkers = defaultNumberOfVerifierWorkers
}
if cfg.RollerManagerConfig.SessionAttempts == 0 {
cfg.RollerManagerConfig.SessionAttempts = defaultNumberOfSessionRetryAttempts
}
return cfg, nil
}
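
A rough sketch of how the new session_attempts key flows from config.json into the struct, using the same zero-value defaulting pattern NewConfig applies above. The struct here is a trimmed stand-in with only the relevant fields, not the repository's full RollerManagerConfig:

```go
package main

import (
	"encoding/json"
	"fmt"
)

const defaultSessionAttempts = 2 // mirrors defaultNumberOfSessionRetryAttempts above

// rollerManagerConfig is a trimmed-down stand-in for RollerManagerConfig,
// keeping only the fields relevant to session retries.
type rollerManagerConfig struct {
	RollersPerSession uint8 `json:"rollers_per_session"`
	SessionAttempts   uint8 `json:"session_attempts,omitempty"`
	CollectionTime    int   `json:"collection_time"`
}

// loadConfig decodes the JSON and applies the same zero-value defaulting
// pattern NewConfig uses for SessionAttempts.
func loadConfig(raw []byte) (*rollerManagerConfig, error) {
	cfg := &rollerManagerConfig{}
	if err := json.Unmarshal(raw, cfg); err != nil {
		return nil, err
	}
	if cfg.SessionAttempts == 0 {
		cfg.SessionAttempts = defaultSessionAttempts
	}
	return cfg, nil
}

func main() {
	// Key omitted: the default of 2 kicks in.
	cfg, err := loadConfig([]byte(`{"rollers_per_session":1,"collection_time":180}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(cfg.SessionAttempts) // 2

	// Key present: the configured value wins.
	cfg, _ = loadConfig([]byte(`{"rollers_per_session":1,"session_attempts":3,"collection_time":180}`))
	fmt.Println(cfg.SessionAttempts) // 3
}
```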

View File

@@ -176,7 +176,7 @@ func (m *Manager) Loop() {
}
}
// Select roller and send message
for len(tasks) > 0 && m.StartProofGenerationSession(tasks[0]) {
for len(tasks) > 0 && m.StartProofGenerationSession(tasks[0], nil) {
tasks = tasks[1:]
}
case <-m.ctx.Done():
@@ -338,20 +338,22 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
// CollectProofs collects proofs corresponding to a proof generation session.
func (m *Manager) CollectProofs(sess *session) {
//Cleanup roller sessions before return.
defer func() {
// TODO: remove the clean-up, rollers report healthy status.
m.mu.Lock()
for pk := range sess.info.Rollers {
m.freeTaskIDForRoller(pk, sess.info.ID)
}
delete(m.sessions, sess.info.ID)
m.mu.Unlock()
}()
for {
select {
//Execute after timeout, set in config.json. Consider all rollers failed.
case <-time.After(time.Duration(m.cfg.CollectionTime) * time.Minute):
// Check if session can be replayed
if sess.info.Attempts < m.cfg.SessionAttempts {
if m.StartProofGenerationSession(nil, sess) {
m.mu.Lock()
for pk := range sess.info.Rollers {
m.freeTaskIDForRoller(pk, sess.info.ID)
}
m.mu.Unlock()
log.Info("Retrying session", "session id:", sess.info.ID)
return
}
}
// record failed session.
errMsg := "proof generation session ended without receiving any valid proofs"
m.addFailedSession(sess, errMsg)
@@ -363,6 +365,12 @@ func (m *Manager) CollectProofs(sess *session) {
if err := m.orm.UpdateProvingStatus(sess.info.ID, types.ProvingTaskFailed); err != nil {
log.Error("fail to reset task_status as Unassigned", "id", sess.info.ID, "err", err)
}
m.mu.Lock()
for pk := range sess.info.Rollers {
m.freeTaskIDForRoller(pk, sess.info.ID)
}
delete(m.sessions, sess.info.ID)
m.mu.Unlock()
return
//Execute after one of the roller finishes sending proof, return early if all rollers had sent results.
@@ -386,6 +394,11 @@ func (m *Manager) CollectProofs(sess *session) {
randIndex := mathrand.Intn(len(validRollers))
_ = validRollers[randIndex]
// TODO: reward winner
for pk := range sess.info.Rollers {
m.freeTaskIDForRoller(pk, sess.info.ID)
}
delete(m.sessions, sess.info.ID)
m.mu.Unlock()
return
}
@@ -439,27 +452,39 @@ func (m *Manager) APIs() []rpc.API {
}
// StartProofGenerationSession starts a proof generation session
func (m *Manager) StartProofGenerationSession(task *types.BlockBatch) (success bool) {
func (m *Manager) StartProofGenerationSession(task *types.BlockBatch, prevSession *session) (success bool) {
var taskId string
if task != nil {
taskId = task.Hash
} else {
taskId = prevSession.info.ID
}
if m.GetNumberOfIdleRollers() == 0 {
log.Warn("no idle roller when starting proof generation session", "id", task.Hash)
log.Warn("no idle roller when starting proof generation session", "id", taskId)
return false
}
log.Info("start proof generation session", "id", task.Hash)
log.Info("start proof generation session", "id", taskId)
defer func() {
if !success {
if err := m.orm.UpdateProvingStatus(task.Hash, types.ProvingTaskUnassigned); err != nil {
log.Error("fail to reset task_status as Unassigned", "id", task.Hash, "err", err)
if task != nil {
if err := m.orm.UpdateProvingStatus(taskId, types.ProvingTaskUnassigned); err != nil {
log.Error("fail to reset task_status as Unassigned", "id", taskId, "err", err)
}
} else {
if err := m.orm.UpdateProvingStatus(taskId, types.ProvingTaskFailed); err != nil {
log.Error("fail to reset task_status as Failed", "id", taskId, "err", err)
}
}
}
}()
// Get block traces.
blockInfos, err := m.orm.GetL2BlockInfos(map[string]interface{}{"batch_hash": task.Hash})
blockInfos, err := m.orm.GetL2BlockInfos(map[string]interface{}{"batch_hash": taskId})
if err != nil {
log.Error(
"could not GetBlockInfos",
"batch_hash", task.Hash,
"batch_hash", taskId,
"error", err,
)
return false
@@ -486,35 +511,39 @@ func (m *Manager) StartProofGenerationSession(task *types.BlockBatch) (success b
log.Info("selectRoller returns nil")
break
}
log.Info("roller is picked", "session id", task.Hash, "name", roller.Name, "public key", roller.PublicKey)
log.Info("roller is picked", "session id", taskId, "name", roller.Name, "public key", roller.PublicKey)
// send trace to roller
if !roller.sendTask(task.Hash, traces) {
log.Error("send task failed", "roller name", roller.Name, "public key", roller.PublicKey, "id", task.Hash)
if !roller.sendTask(taskId, traces) {
log.Error("send task failed", "roller name", roller.Name, "public key", roller.PublicKey, "id", taskId)
continue
}
rollers[roller.PublicKey] = &types.RollerStatus{PublicKey: roller.PublicKey, Name: roller.Name, Status: types.RollerAssigned}
}
// No roller assigned.
if len(rollers) == 0 {
log.Error("no roller assigned", "id", task.Hash, "number of idle rollers", m.GetNumberOfIdleRollers())
log.Error("no roller assigned", "id", taskId, "number of idle rollers", m.GetNumberOfIdleRollers())
return false
}
// Update session proving status as assigned.
if err = m.orm.UpdateProvingStatus(task.Hash, types.ProvingTaskAssigned); err != nil {
log.Error("failed to update task status", "id", task.Hash, "err", err)
if err = m.orm.UpdateProvingStatus(taskId, types.ProvingTaskAssigned); err != nil {
log.Error("failed to update task status", "id", taskId, "err", err)
return false
}
// Create a proof generation session.
sess := &session{
info: &types.SessionInfo{
ID: task.Hash,
ID: taskId,
Rollers: rollers,
StartTimestamp: time.Now().Unix(),
Attempts: 1,
},
finishChan: make(chan rollerProofStatus, proofAndPkBufferSize),
}
if prevSession != nil {
sess.info.Attempts += prevSession.info.Attempts
}
// Store session info.
if err = m.orm.SetSessionInfo(sess.info); err != nil {
@@ -531,7 +560,7 @@ func (m *Manager) StartProofGenerationSession(task *types.BlockBatch) (success b
}
m.mu.Lock()
m.sessions[task.Hash] = sess
m.sessions[taskId] = sess
m.mu.Unlock()
go m.CollectProofs(sess)
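
Taken together, the CollectProofs and StartProofGenerationSession changes above implement the retry: when the collection window times out, the session is replayed as long as its accumulated attempt count is still below session_attempts, and each replay starts with Attempts = 1 plus the previous session's count; once the budget is exhausted the batch is marked failed. The following is a simplified model of just that decision, not the coordinator's actual code:

```go
package main

import "fmt"

// retrySession is a simplified model of the timeout branch in CollectProofs:
// after the collection window expires, a session is replayed only while its
// accumulated attempt count is still below the configured SessionAttempts.
type retrySession struct {
	id       string
	attempts uint8
}

// onCollectionTimeout returns the session's next state after a proving timeout.
// It mirrors, in simplified form, the check `sess.info.Attempts < cfg.SessionAttempts`
// and the attempt accumulation performed when StartProofGenerationSession is
// called with a previous session.
func onCollectionTimeout(sess retrySession, sessionAttempts uint8) (next retrySession, retried bool) {
	if sess.attempts < sessionAttempts {
		// Replay: the new session starts with Attempts = 1 and then adds the
		// previous session's count, so the total grows across retries.
		return retrySession{id: sess.id, attempts: 1 + sess.attempts}, true
	}
	// Out of attempts: the batch is recorded as a failed session.
	return sess, false
}

func main() {
	cfgAttempts := uint8(2) // default session_attempts

	s := retrySession{id: "0xbatch", attempts: 1} // first run
	s, retried := onCollectionTimeout(s, cfgAttempts)
	fmt.Println(retried, s.attempts) // true 2  -> one retry is scheduled

	_, retried = onCollectionTimeout(s, cfgAttempts)
	fmt.Println(retried) // false -> batch is marked ProvingTaskFailed
}
```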

View File

@@ -87,6 +87,7 @@ func TestApis(t *testing.T) {
t.Run("TestSeveralConnections", testSeveralConnections)
t.Run("TestValidProof", testValidProof)
t.Run("TestInvalidProof", testInvalidProof)
t.Run("TestTimedoutProof", testTimedoutProof)
t.Run("TestIdleRollerSelection", testIdleRollerSelection)
// TODO: Restart roller alone when received task, can add this test case in integration-test.
//t.Run("TestRollerReconnect", testRollerReconnect)
@@ -356,6 +357,86 @@ func testInvalidProof(t *testing.T) {
}
}
func testTimedoutProof(t *testing.T) {
// Create db handler and reset db.
l2db, err := database.NewOrmFactory(cfg.DBConfig)
assert.NoError(t, err)
assert.NoError(t, migrate.ResetDB(l2db.GetDB().DB))
defer l2db.Close()
// Setup coordinator and ws server.
wsURL := "ws://" + randomURL()
rollerManager, handler := setupCoordinator(t, cfg.DBConfig, 1, wsURL)
defer func() {
handler.Shutdown(context.Background())
rollerManager.Stop()
}()
// create first mock roller, that will not send any proof.
roller1 := newMockRoller(t, "roller_test"+strconv.Itoa(0), wsURL)
defer func() {
// close connection
roller1.close()
}()
assert.Equal(t, 1, rollerManager.GetNumberOfIdleRollers())
var hashes = make([]string, 1)
dbTx, err := l2db.Beginx()
assert.NoError(t, err)
for i := range hashes {
assert.NoError(t, l2db.NewBatchInDBTx(dbTx, batchData))
hashes[i] = batchData.Hash().Hex()
}
assert.NoError(t, dbTx.Commit())
// verify proof status, it should be assigned, because roller didn't send any proof
var (
tick = time.Tick(500 * time.Millisecond)
tickStop = time.Tick(10 * time.Second)
)
for len(hashes) > 0 {
select {
case <-tick:
status, err := l2db.GetProvingStatusByHash(hashes[0])
assert.NoError(t, err)
if status == types.ProvingTaskAssigned {
hashes = hashes[1:]
}
case <-tickStop:
t.Error("failed to check proof status")
return
}
}
// create second mock roller, that will send valid proof.
roller2 := newMockRoller(t, "roller_test"+strconv.Itoa(1), wsURL)
roller2.waitTaskAndSendProof(t, time.Second, false, true)
defer func() {
// close connection
roller2.close()
}()
assert.Equal(t, 1, rollerManager.GetNumberOfIdleRollers())
// wait manager to finish first CollectProofs
<-time.After(60 * time.Second)
// verify proof status, it should be verified now, because second roller sent valid proof
for len(hashes) > 0 {
select {
case <-tick:
status, err := l2db.GetProvingStatusByHash(hashes[0])
assert.NoError(t, err)
if status == types.ProvingTaskVerified {
hashes = hashes[1:]
}
case <-tickStop:
t.Error("failed to check proof status")
return
}
}
}
func testIdleRollerSelection(t *testing.T) {
// Create db handler and reset db.
l2db, err := database.NewOrmFactory(cfg.DBConfig)
@@ -505,6 +586,7 @@ func setupCoordinator(t *testing.T, dbCfg *database.DBConfig, rollersPerSession
CollectionTime: 1,
TokenTimeToLive: 5,
MaxVerifierWorkers: 10,
SessionAttempts: 2,
}, db, nil)
assert.NoError(t, err)
assert.NoError(t, rollerManager.Start())
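
The new testTimedoutProof case polls the proving status with the common tick-plus-deadline pattern (time.Tick for the interval, a second channel for the overall timeout). A generic, self-contained variant of that pattern, shown here with time.NewTicker and purely for illustration:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// pollUntil re-checks cond on every tick until it holds or the overall
// deadline passes, mirroring the tick/tickStop select loop in the test above
// (a ticker is used here instead of time.Tick so it can be stopped).
func pollUntil(interval, timeout time.Duration, cond func() (bool, error)) error {
	tick := time.NewTicker(interval)
	defer tick.Stop()
	deadline := time.After(timeout)
	for {
		select {
		case <-tick.C:
			ok, err := cond()
			if err != nil {
				return err
			}
			if ok {
				return nil
			}
		case <-deadline:
			return errors.New("condition not met before timeout")
		}
	}
}

func main() {
	start := time.Now()
	// Toy condition standing in for "proving status became verified".
	err := pollUntil(200*time.Millisecond, 5*time.Second, func() (bool, error) {
		return time.Since(start) > 1200*time.Millisecond, nil
	})
	fmt.Println(err) // <nil>
}
```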

View File

@@ -78,6 +78,7 @@ func free(t *testing.T) {
}
type appAPI interface {
OpenLog(open bool)
WaitResult(t *testing.T, timeout time.Duration, keyword string) bool
RunApp(waitResult func() bool)
WaitExit()
@@ -86,33 +87,44 @@ type appAPI interface {
func runMsgRelayerApp(t *testing.T, args ...string) appAPI {
args = append(args, "--log.debug", "--config", bridgeFile)
return cmd.NewCmd("message-relayer-test", args...)
app := cmd.NewCmd("message-relayer-test", args...)
app.OpenLog(true)
return app
}
func runGasOracleApp(t *testing.T, args ...string) appAPI {
args = append(args, "--log.debug", "--config", bridgeFile)
return cmd.NewCmd("gas-oracle-test", args...)
app := cmd.NewCmd("gas-oracle-test", args...)
app.OpenLog(true)
return app
}
func runRollupRelayerApp(t *testing.T, args ...string) appAPI {
args = append(args, "--log.debug", "--config", bridgeFile)
return cmd.NewCmd("rollup-relayer-test", args...)
app := cmd.NewCmd("rollup-relayer-test", args...)
app.OpenLog(true)
return app
}
func runEventWatcherApp(t *testing.T, args ...string) appAPI {
args = append(args, "--log.debug", "--config", bridgeFile)
return cmd.NewCmd("event-watcher-test", args...)
app := cmd.NewCmd("event-watcher-test", args...)
app.OpenLog(true)
return app
}
func runCoordinatorApp(t *testing.T, args ...string) appAPI {
args = append(args, "--log.debug", "--config", coordinatorFile, "--ws", "--ws.port", strconv.Itoa(int(wsPort)))
// start process
return cmd.NewCmd("coordinator-test", args...)
app := cmd.NewCmd("coordinator-test", args...)
app.OpenLog(true)
return app
}
func runDBCliApp(t *testing.T, option, keyword string) {
args := []string{option, "--config", dbFile}
app := cmd.NewCmd("db_cli-test", args...)
app.OpenLog(true)
defer app.WaitExit()
// Wait expect result.
@@ -122,7 +134,9 @@ func runDBCliApp(t *testing.T, option, keyword string) {
func runRollerApp(t *testing.T, args ...string) appAPI {
args = append(args, "--log.debug", "--config", rollerFile)
return cmd.NewCmd("roller-test", args...)
app := cmd.NewCmd("roller-test", args...)
app.OpenLog(true)
return app
}
func runSender(t *testing.T, endpoint string) *sender.Sender {