diff --git a/bridge/relayer/l1_relayer_test.go b/bridge/relayer/l1_relayer_test.go index 1e7f8133f..8bfb92183 100644 --- a/bridge/relayer/l1_relayer_test.go +++ b/bridge/relayer/l1_relayer_test.go @@ -95,6 +95,6 @@ func testL1CheckSubmittedMessages(t *testing.T) { assert.Equal(t, templateL1Message[0].QueueIndex, maxIndex) // check tx is on chain. - _, err = l1Cli.TransactionReceipt(context.Background(), common.HexToHash(txMsgs[0].TxHash.String)) + _, err = l2Cli.TransactionReceipt(context.Background(), common.HexToHash(txMsgs[0].TxHash.String)) assert.NoError(t, err) } diff --git a/bridge/relayer/l2_relayer.go b/bridge/relayer/l2_relayer.go index 4397e4dc8..0a0244809 100644 --- a/bridge/relayer/l2_relayer.go +++ b/bridge/relayer/l2_relayer.go @@ -18,12 +18,10 @@ import ( "golang.org/x/sync/errgroup" "modernc.org/mathutil" - "scroll-tech/database" - "scroll-tech/database/orm" - "scroll-tech/common/metrics" "scroll-tech/common/types" cutils "scroll-tech/common/utils" + "scroll-tech/database" bridge_abi "scroll-tech/bridge/abi" "scroll-tech/bridge/config" @@ -194,6 +192,7 @@ func (r *Layer2Relayer) CheckSubmittedMessages() error { log.Error("failed to load or send l2 submitted tx", "msg.hash", msg.ID, "err", err) return err } + r.processingMessage.Store(msg.ID, msg.ID) log.Info("successfully check l2 submitted tx", "resend", isResend, "tx.Hash", tx.Hash().String()) } } @@ -366,6 +365,23 @@ func (r *Layer2Relayer) ProcessGasPriceOracle() { } } +func (r *Layer2Relayer) CheckRollupCommittingMessages() error { + var ( + batchIndex uint64 + batchLimit uint64 = 10 + ) + for { + maxIndex, batchHashes, err := r.checkRollupBatches(types.RollupCommitting, batchIndex, batchLimit) + if err != nil { + return err + } + batchIndex = maxIndex + for _, hash := range batchHashes { + r.processingBatchesCommitment.Store(hash+"-finalize", hash) + } + } +} + // SendCommitTx sends commitBatches tx to L1. func (r *Layer2Relayer) SendCommitTx(batchData []*types.BatchData) error { if len(batchData) == 0 { @@ -424,53 +440,66 @@ func (r *Layer2Relayer) SendCommitTx(batchData []*types.BatchData) error { return nil } +func (r *Layer2Relayer) checkRollupBatches(rollupStatus types.RollupStatus, batchIndex, batchLimit uint64) (uint64, []string, error) { + maxIndex, batches, err := r.db.GetBlockBatchTxMessages( + map[string]interface{}{"rollup_status": rollupStatus}, + fmt.Sprintf("AND index > %d", batchIndex), + fmt.Sprintf("ORDER BY index ASC LIMIT %d", batchLimit), + ) + if err != nil { + log.Error("failed to get Rollup batches", "rollup status", rollupStatus, "batch index", batchIndex, "err", err) + return 0, nil, err + } + if len(batches) == 0 { + return 0, nil, nil + } + + batchHashes := make([]string, len(batches)) + for i, msg := range batches { + // TODO: Is it necessary repair tx message? + if !msg.TxHash.Valid { + log.Warn("Rollup tx message is empty", "rollup status", rollupStatus, "tx id", msg.ID) + continue + } + cutils.TryTimes(-1, func() bool { + return !r.rollupSender.IsFull() + }) + batchHashes[i] = msg.ID + + isResend, tx, err := r.rollupSender.LoadOrResendTx( + msg.GetTxHash(), + msg.GetSender(), + msg.GetNonce(), + msg.ID, + msg.GetTarget(), + msg.GetValue(), + msg.Data, + 0, + ) + if err != nil { + log.Error("failed to load or send rollup tx", "batch hash", msg.ID, "rollup status", rollupStatus, "err", err) + return 0, nil, err + } + r.processingFinalization.Store(msg.ID+"-finalize", msg.ID) + log.Info("successfully check rollup batch tx", "resend", isResend, "tx.Hash", tx.Hash().String()) + } + return maxIndex, batchHashes, nil +} + // CheckRollupBatches rollupStatus: types.RollupCommitting, types.RollupFinalizing func (r *Layer2Relayer) CheckRollupBatches(rollupStatus types.RollupStatus) error { var ( batchIndex uint64 - batchLimit = 10 - db = orm.ScrollTxOrm(r.db) + batchLimit uint64 = 10 ) for { - index, batches, err := db.GetBlockBatchTxMessages( - map[string]interface{}{"rollup_status": rollupStatus}, - fmt.Sprintf("AND index > %d", batchIndex), - fmt.Sprintf("ORDER BY index ASC LIMIT %d", batchLimit), - ) + maxIndex, batchHashes, err := r.checkRollupBatches(types.RollupFinalizing, batchIndex, batchLimit) if err != nil { - log.Error("failed to get Rollup batches", "rollup status", rollupStatus, "batch index", batchIndex, "err", err) return err } - if len(batches) == 0 { - return nil - } - batchIndex = index - - for _, msg := range batches { - // TODO: Is it necessary repair tx message? - if !msg.TxHash.Valid { - log.Warn("Rollup tx message is empty", "rollup status", rollupStatus, "tx id", msg.ID) - continue - } - cutils.TryTimes(-1, func() bool { - return !r.rollupSender.IsFull() - }) - - isResend, tx, err := r.rollupSender.LoadOrResendTx( - msg.GetTxHash(), - msg.GetSender(), - msg.GetNonce(), - msg.ID, - msg.GetTarget(), - msg.GetValue(), - msg.Data, - 0, - ) - if err != nil { - log.Error("failed to load or send rollup tx", "batch hash", msg.ID, "rollup status", rollupStatus, "err", err) - return err - } - log.Info("successfully check rollup batch tx", "resend", isResend, "tx.Hash", tx.Hash().String()) + batchIndex = maxIndex + for _, hash := range batchHashes { + r.processingFinalization.Store(hash+"-finalize", hash) } } } diff --git a/bridge/relayer/l2_relayer_test.go b/bridge/relayer/l2_relayer_test.go index b3417b841..f8e760698 100644 --- a/bridge/relayer/l2_relayer_test.go +++ b/bridge/relayer/l2_relayer_test.go @@ -26,6 +26,7 @@ var ( templateL2Message = []*types.L2Message{ { Nonce: 1, + MsgHash: "msg_hash1", Height: 1, Sender: "0x596a746661dbed76a84556111c2872249b070e15", Value: "100", @@ -248,7 +249,80 @@ func testL2CheckSubmittedMessages(t *testing.T) { assert.Equal(t, templateL2Message[0].Nonce, maxNonce) // check tx is on chain. - _, err = l2Cli.TransactionReceipt(context.Background(), common.HexToHash(txMsgs[0].TxHash.String)) + _, err = l1Cli.TransactionReceipt(context.Background(), common.HexToHash(txMsgs[0].TxHash.String)) + assert.NoError(t, err) +} + +func testL2CheckRollupBatches(t *testing.T) { + // Create db handler and reset db. + db, err := database.NewOrmFactory(cfg.DBConfig) + assert.NoError(t, err) + assert.NoError(t, migrate.ResetDB(db.GetDB().DB)) + defer db.Close() + + err = db.SaveL2Messages(context.Background(), templateL2Message) + assert.NoError(t, err) + + traces := []*types.WrappedBlock{ + { + Header: &geth_types.Header{ + Number: big.NewInt(int64(templateL2Message[0].Height)), + }, + Transactions: nil, + WithdrawTrieRoot: common.Hash{}, + }, + { + Header: &geth_types.Header{ + Number: big.NewInt(int64(templateL2Message[0].Height + 1)), + }, + Transactions: nil, + WithdrawTrieRoot: common.Hash{}, + }, + } + assert.NoError(t, db.InsertWrappedBlocks(traces)) + + batchData1 := types.NewBatchData(&types.BlockBatch{ + Index: 0, + Hash: common.Hash{}.String(), + StateRoot: common.Hash{}.String(), + }, []*types.WrappedBlock{wrappedBlock1}, nil) + dbTx, err := db.Beginx() + assert.NoError(t, err) + assert.NoError(t, db.NewBatchInDBTx(dbTx, batchData1)) + batchHash := batchData1.Hash().Hex() + assert.NoError(t, db.SetBatchHashForL2BlocksInDBTx(dbTx, []uint64{1}, batchHash)) + assert.NoError(t, dbTx.Commit()) + + err = db.UpdateRollupStatus(context.Background(), batchHash, types.RollupCommitting) + assert.NoError(t, err) + + auth, err := bind.NewKeyedTransactorWithChainID(cfg.L2Config.RelayerConfig.MessageSenderPrivateKeys[0], l2ChainID) + assert.NoError(t, err) + + signedTx, err := mockTx(auth) + assert.NoError(t, err) + err = db.SaveTx(batchHash, auth.From.String(), types.RollUpCommitTx, signedTx) + assert.NoError(t, err) + + l2Cfg := cfg.L2Config + l2Cfg.RelayerConfig.SenderConfig.Confirmations = 0 + relayer, err := relayer.NewLayer2Relayer(context.Background(), l2Cli, db, l2Cfg.RelayerConfig) + assert.NoError(t, err) + assert.NoError(t, relayer.CheckRollupBatches(types.RollupCommitting)) + relayer.WaitL2RollupSender() + + // check tx is confirmed. + maxIndex, batches, err := db.GetBlockBatchTxMessages( + map[string]interface{}{"rollup_status": types.RollupCommitting}, + fmt.Sprintf("AND index > %d", 0), + fmt.Sprintf("ORDER BY index ASC LIMIT %d", 10), + ) + assert.NoError(t, err) + assert.Equal(t, 1, len(batches)) + assert.Equal(t, batchData1.Batch.BatchIndex+1, maxIndex) + + // check tx is on chain. + _, err = l2Cli.TransactionReceipt(context.Background(), common.HexToHash(batches[0].TxHash.String)) assert.NoError(t, err) } diff --git a/bridge/relayer/relayer_test.go b/bridge/relayer/relayer_test.go index e7686463b..53abb09af 100644 --- a/bridge/relayer/relayer_test.go +++ b/bridge/relayer/relayer_test.go @@ -124,6 +124,7 @@ func TestFunctions(t *testing.T) { t.Run("TestL2RelayerProcessCommittedBatches", testL2RelayerProcessCommittedBatches) t.Run("TestL2RelayerSkipBatches", testL2RelayerSkipBatches) t.Run("testL2CheckSubmittedMessages", testL2CheckSubmittedMessages) + t.Run("testL2CheckRollupBatches", testL2CheckRollupBatches) } func mockTx(auth *bind.TransactOpts) (*etypes.Transaction, error) { diff --git a/database/orm/transaction.go b/database/orm/transaction.go index 868896fb9..87b7929b9 100644 --- a/database/orm/transaction.go +++ b/database/orm/transaction.go @@ -145,7 +145,7 @@ func (t *scrollTxOrm) GetBlockBatchTxMessages(fields map[string]interface{}, arg query = query + fmt.Sprintf(" AND %s = :%s", key, key) } query = strings.Join(append([]string{query}, args...), " ") - query = fmt.Sprintf("select bt.index as index, bt.hash as id, tx.tx_hash, tx.sender, tx.nonce, tx.target, tx.value, tx.data from transaction as tx right join (%s) as bt on tx.id = bt.hash;", query) + query = fmt.Sprintf("select bt.index as index, bt.hash as id, tx.tx_hash, tx.sender, tx.nonce, tx.target, tx.value, tx.data from scroll_transaction as tx right join (%s) as bt on tx.id = bt.hash;", query) db := t.db rows, err := db.NamedQuery(db.Rebind(query), fields)