diff --git a/bridge/l2/relayer.go b/bridge/l2/relayer.go index 92ae3eba6..9a4ba593d 100644 --- a/bridge/l2/relayer.go +++ b/bridge/l2/relayer.go @@ -101,8 +101,7 @@ func (r *Layer2Relayer) GetMsgConfirmCh() <-chan *sender.Confirmation { const processMsgLimit = 100 // ProcessSavedEvents relays saved un-processed cross-domain transactions to desired blockchain -func (r *Layer2Relayer) ProcessSavedEvents(wg *sync.WaitGroup) { - defer wg.Done() +func (r *Layer2Relayer) ProcessSavedEvents() { batch, err := r.db.GetLatestFinalizedBatch() if err != nil { log.Error("GetLatestFinalizedBatch failed", "err", err) @@ -198,8 +197,7 @@ func (r *Layer2Relayer) processSavedEvent(msg *orm.L2Message, index uint64) erro } // ProcessPendingBatches submit batch data to layer 1 rollup contract -func (r *Layer2Relayer) ProcessPendingBatches(wg *sync.WaitGroup) { - defer wg.Done() +func (r *Layer2Relayer) ProcessPendingBatches() { // batches are sorted by batch index in increasing order batchesInDB, err := r.db.GetPendingBatches(1) if err != nil { @@ -292,9 +290,7 @@ func (r *Layer2Relayer) ProcessPendingBatches(wg *sync.WaitGroup) { } // ProcessCommittedBatches submit proof to layer 1 rollup contract -func (r *Layer2Relayer) ProcessCommittedBatches(wg *sync.WaitGroup) { - defer wg.Done() - +func (r *Layer2Relayer) ProcessCommittedBatches() { // set skipped batches in a single db operation if count, err := r.db.UpdateSkippedBatches(); err != nil { log.Error("UpdateSkippedBatches failed", "err", err) @@ -407,28 +403,42 @@ func (r *Layer2Relayer) ProcessCommittedBatches(wg *sync.WaitGroup) { // Start the relayer process func (r *Layer2Relayer) Start() { - go func() { - // trigger by timer + loop := func(ctx context.Context, f func()) { ticker := time.NewTicker(time.Second) defer ticker.Stop() for { select { - case <-ticker.C: - var wg = sync.WaitGroup{} - wg.Add(3) - go r.ProcessSavedEvents(&wg) - go r.ProcessPendingBatches(&wg) - go r.ProcessCommittedBatches(&wg) - wg.Wait() - case confirmation := <-r.messageCh: - r.HandleConfirmation(confirmation) - case confirmation := <-r.rollupCh: - r.HandleConfirmation(confirmation) - case <-r.stopCh: + case <-ctx.Done(): return + case <-ticker.C: + f() } } + } + + go func() { + ctx, cancel := context.WithCancel(r.ctx) + + go loop(ctx, r.ProcessSavedEvents) + go loop(ctx, r.ProcessPendingBatches) + go loop(ctx, r.ProcessCommittedBatches) + + go func(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case confirmation := <-r.messageCh: + r.HandleConfirmation(confirmation) + case confirmation := <-r.rollupCh: + r.HandleConfirmation(confirmation) + } + } + }(ctx) + + <-r.stopCh + cancel() }() } diff --git a/bridge/l2/relayer_test.go b/bridge/l2/relayer_test.go index 6ca6d6a13..8e0726db8 100644 --- a/bridge/l2/relayer_test.go +++ b/bridge/l2/relayer_test.go @@ -5,7 +5,6 @@ import ( "encoding/json" "math/big" "os" - "sync" "testing" "time" @@ -95,10 +94,7 @@ func testL2RelayerProcessSaveEvents(t *testing.T) { err = db.UpdateRollupStatus(context.Background(), batchID, orm.RollupFinalized) assert.NoError(t, err) - var wg = sync.WaitGroup{} - wg.Add(1) - relayer.ProcessSavedEvents(&wg) - wg.Wait() + relayer.ProcessSavedEvents() msg, err := db.GetL2MessageByNonce(templateL2Message[0].Nonce) assert.NoError(t, err) @@ -154,10 +150,7 @@ func testL2RelayerProcessPendingBatches(t *testing.T) { // err = db.UpdateRollupStatus(context.Background(), batchID, orm.RollupPending) // assert.NoError(t, err) - var wg = sync.WaitGroup{} - wg.Add(1) - relayer.ProcessPendingBatches(&wg) - wg.Wait() + relayer.ProcessPendingBatches() // Check if Rollup Result is changed successfully status, err := db.GetRollupStatus(batchID) @@ -194,10 +187,7 @@ func testL2RelayerProcessCommittedBatches(t *testing.T) { err = db.UpdateProvingStatus(batchID, orm.ProvingTaskVerified) assert.NoError(t, err) - var wg = sync.WaitGroup{} - wg.Add(1) - relayer.ProcessCommittedBatches(&wg) - wg.Wait() + relayer.ProcessCommittedBatches() status, err := db.GetRollupStatus(batchID) assert.NoError(t, err) @@ -254,10 +244,7 @@ func testL2RelayerSkipBatches(t *testing.T) { createBatch(orm.RollupCommitted, orm.ProvingTaskVerified), } - var wg = sync.WaitGroup{} - wg.Add(1) - relayer.ProcessCommittedBatches(&wg) - wg.Wait() + relayer.ProcessCommittedBatches() for _, id := range skipped { status, err := db.GetRollupStatus(id) diff --git a/bridge/tests/l2_message_relay_test.go b/bridge/tests/l2_message_relay_test.go index d5387fdb2..9dea2e6fb 100644 --- a/bridge/tests/l2_message_relay_test.go +++ b/bridge/tests/l2_message_relay_test.go @@ -6,7 +6,6 @@ import ( "scroll-tech/database" "scroll-tech/database/migrate" "scroll-tech/database/orm" - "sync" "testing" "scroll-tech/bridge/l1" @@ -25,9 +24,6 @@ func testRelayL2MessageSucceed(t *testing.T) { assert.NoError(t, migrate.ResetDB(db.GetDB().DB)) defer db.Close() - var wg sync.WaitGroup - wg.Add(3) - prepareContracts(t) // Create L2Relayer @@ -111,7 +107,7 @@ func testRelayL2MessageSucceed(t *testing.T) { assert.NoError(t, err) // process pending batch and check status - l2Relayer.ProcessPendingBatches(&wg) + l2Relayer.ProcessPendingBatches() status, err := db.GetRollupStatus(batchID) assert.NoError(t, err) assert.Equal(t, orm.RollupCommitting, status) @@ -132,7 +128,7 @@ func testRelayL2MessageSucceed(t *testing.T) { assert.Equal(t, orm.RollupCommitted, status) // process committed batch and check status - l2Relayer.ProcessCommittedBatches(&wg) + l2Relayer.ProcessCommittedBatches() status, err = db.GetRollupStatus(batchID) assert.NoError(t, err) assert.Equal(t, orm.RollupFinalizing, status) @@ -153,7 +149,7 @@ func testRelayL2MessageSucceed(t *testing.T) { assert.Equal(t, orm.RollupFinalized, status) // process l2 messages - l2Relayer.ProcessSavedEvents(&wg) + l2Relayer.ProcessSavedEvents() msg, err = db.GetL2MessageByNonce(nonce.Uint64()) assert.NoError(t, err) assert.Equal(t, msg.Status, orm.MsgSubmitted) diff --git a/bridge/tests/rollup_test.go b/bridge/tests/rollup_test.go index 3a697e9b6..22d8e2c7b 100644 --- a/bridge/tests/rollup_test.go +++ b/bridge/tests/rollup_test.go @@ -6,7 +6,6 @@ import ( "scroll-tech/database" "scroll-tech/database/migrate" "scroll-tech/database/orm" - "sync" "testing" "scroll-tech/bridge/l1" @@ -79,11 +78,8 @@ func testCommitBatchAndFinalizeBatch(t *testing.T) { err = dbTx.Commit() assert.NoError(t, err) - var wg = sync.WaitGroup{} - wg.Add(1) // process pending batch and check status - l2Relayer.ProcessPendingBatches(&wg) - wg.Wait() + l2Relayer.ProcessPendingBatches() status, err := db.GetRollupStatus(batchID) assert.NoError(t, err) @@ -112,10 +108,8 @@ func testCommitBatchAndFinalizeBatch(t *testing.T) { err = db.UpdateProvingStatus(batchID, orm.ProvingTaskVerified) assert.NoError(t, err) - wg.Add(1) // process committed batch and check status - l2Relayer.ProcessCommittedBatches(&wg) - wg.Wait() + l2Relayer.ProcessCommittedBatches() status, err = db.GetRollupStatus(batchID) assert.NoError(t, err) diff --git a/common/version/version.go b/common/version/version.go index fe0bcd542..8cce05031 100644 --- a/common/version/version.go +++ b/common/version/version.go @@ -5,7 +5,7 @@ import ( "runtime/debug" ) -var tag = "prealpha-v11.19" +var tag = "prealpha-v11.20" var commit = func() string { if info, ok := debug.ReadBuildInfo(); ok {