diff --git a/common/version/version.go b/common/version/version.go index 94e1d2d23..fa767bcd0 100644 --- a/common/version/version.go +++ b/common/version/version.go @@ -5,7 +5,7 @@ import ( "runtime/debug" ) -var tag = "v4.3.66" +var tag = "v4.3.67" var commit = func() string { if info, ok := debug.ReadBuildInfo(); ok { diff --git a/rollup/cmd/event_watcher/app/app.go b/rollup/cmd/event_watcher/app/app.go index e06792025..fd8ec6ef4 100644 --- a/rollup/cmd/event_watcher/app/app.go +++ b/rollup/cmd/event_watcher/app/app.go @@ -67,27 +67,15 @@ func action(ctx *cli.Context) error { log.Crit("failed to connect l1 geth", "config file", cfgFile, "error", err) } - l2client, err := ethclient.Dial(cfg.L2Config.Endpoint) - if err != nil { - log.Crit("failed to connect l2 geth", "config file", cfgFile, "error", err) - } - l1watcher := watcher.NewL1WatcherClient(ctx.Context, l1client, cfg.L1Config.StartHeight, cfg.L1Config.Confirmations, cfg.L1Config.L1MessageQueueAddress, cfg.L1Config.ScrollChainContractAddress, db, registry) - l2watcher := watcher.NewL2WatcherClient(ctx.Context, l2client, cfg.L2Config.Confirmations, cfg.L2Config.L2MessengerAddress, - cfg.L2Config.L2MessageQueueAddress, cfg.L2Config.WithdrawTrieRootSlot, db, registry) - go utils.Loop(subCtx, 10*time.Second, func() { if loopErr := l1watcher.FetchContractEvent(); loopErr != nil { log.Error("Failed to fetch bridge contract", "err", loopErr) } }) - // Start l2 watcher process - go utils.Loop(subCtx, 2*time.Second, l2watcher.FetchContractEvent) - // Finish start all l2 functions - log.Info("Start event-watcher successfully") // Catch CTRL-C to ensure a graceful shutdown. diff --git a/rollup/cmd/rollup_relayer/app/app.go b/rollup/cmd/rollup_relayer/app/app.go index 34aead829..b41461157 100644 --- a/rollup/cmd/rollup_relayer/app/app.go +++ b/rollup/cmd/rollup_relayer/app/app.go @@ -88,8 +88,7 @@ func action(ctx *cli.Context) error { log.Crit("failed to create batchProposer", "config file", cfgFile, "error", err) } - l2watcher := watcher.NewL2WatcherClient(subCtx, l2client, cfg.L2Config.Confirmations, cfg.L2Config.L2MessengerAddress, - cfg.L2Config.L2MessageQueueAddress, cfg.L2Config.WithdrawTrieRootSlot, db, registry) + l2watcher := watcher.NewL2WatcherClient(subCtx, l2client, cfg.L2Config.Confirmations, cfg.L2Config.L2MessageQueueAddress, cfg.L2Config.WithdrawTrieRootSlot, db, registry) // Watcher loop to fetch missing blocks go utils.LoopWithContext(subCtx, 2*time.Second, func(ctx context.Context) { diff --git a/rollup/conf/config.json b/rollup/conf/config.json index 39b804a70..feca4a84c 100644 --- a/rollup/conf/config.json +++ b/rollup/conf/config.json @@ -27,7 +27,6 @@ "l2_config": { "confirmations": "0x1", "endpoint": "https://rpc.scroll.io", - "l2_messenger_address": "0x0000000000000000000000000000000000000000", "l2_message_queue_address": "0x0000000000000000000000000000000000000000", "relayer_config": { "rollup_contract_address": "0x0000000000000000000000000000000000000000", diff --git a/rollup/internal/config/l2.go b/rollup/internal/config/l2.go index 8996b88ee..b78c2b162 100644 --- a/rollup/internal/config/l2.go +++ b/rollup/internal/config/l2.go @@ -12,8 +12,6 @@ type L2Config struct { Confirmations rpc.BlockNumber `json:"confirmations"` // l2geth node url. Endpoint string `json:"endpoint"` - // The messenger contract address deployed on layer 2 chain. - L2MessengerAddress common.Address `json:"l2_messenger_address"` // The L2MessageQueue contract address deployed on layer 2 chain. L2MessageQueueAddress common.Address `json:"l2_message_queue_address"` // The WithdrawTrieRootSlot in L2MessageQueue contract. diff --git a/rollup/internal/controller/watcher/common.go b/rollup/internal/controller/watcher/common.go index 8739b2e11..702fd5024 100644 --- a/rollup/internal/controller/watcher/common.go +++ b/rollup/internal/controller/watcher/common.go @@ -1,11 +1,3 @@ package watcher -import "github.com/scroll-tech/go-ethereum/common" - const contractEventsBlocksFetchLimit = int64(10) - -type relayedMessage struct { - msgHash common.Hash - txHash common.Hash - isSuccessful bool -} diff --git a/rollup/internal/controller/watcher/l2_watcher.go b/rollup/internal/controller/watcher/l2_watcher.go index 347633e3d..3a6a832aa 100644 --- a/rollup/internal/controller/watcher/l2_watcher.go +++ b/rollup/internal/controller/watcher/l2_watcher.go @@ -6,7 +6,6 @@ import ( "math/big" "github.com/prometheus/client_golang/prometheus" - geth "github.com/scroll-tech/go-ethereum" "github.com/scroll-tech/go-ethereum/accounts/abi" "github.com/scroll-tech/go-ethereum/common" "github.com/scroll-tech/go-ethereum/common/hexutil" @@ -21,7 +20,6 @@ import ( bridgeAbi "scroll-tech/rollup/abi" "scroll-tech/rollup/internal/orm" - "scroll-tech/rollup/internal/utils" ) // L2WatcherClient provide APIs which support others to subscribe to various event from l2geth @@ -31,65 +29,33 @@ type L2WatcherClient struct { *ethclient.Client - l2BlockOrm *orm.L2Block - l1MessageOrm *orm.L1Message + l2BlockOrm *orm.L2Block confirmations rpc.BlockNumber - messengerAddress common.Address - messengerABI *abi.ABI - messageQueueAddress common.Address messageQueueABI *abi.ABI withdrawTrieRootSlot common.Hash - // The height of the block that the watcher has retrieved event logs - processedMsgHeight uint64 - - stopped uint64 - metrics *l2WatcherMetrics } // NewL2WatcherClient take a l2geth instance to generate a l2watcherclient instance -func NewL2WatcherClient(ctx context.Context, client *ethclient.Client, confirmations rpc.BlockNumber, messengerAddress, messageQueueAddress common.Address, withdrawTrieRootSlot common.Hash, db *gorm.DB, reg prometheus.Registerer) *L2WatcherClient { - l1MessageOrm := orm.NewL1Message(db) - var savedHeight uint64 - l1msg, err := l1MessageOrm.GetLayer1LatestMessageWithLayer2Hash() - if err != nil || l1msg == nil { - log.Warn("fetch height from db failed", "err", err) - savedHeight = 0 - } else { - receipt, err := client.TransactionReceipt(ctx, common.HexToHash(l1msg.Layer2Hash)) - if err != nil || receipt == nil { - log.Warn("get tx from l2 failed", "err", err) - savedHeight = 0 - } else { - savedHeight = receipt.BlockNumber.Uint64() - } - } - - w := L2WatcherClient{ +func NewL2WatcherClient(ctx context.Context, client *ethclient.Client, confirmations rpc.BlockNumber, messageQueueAddress common.Address, withdrawTrieRootSlot common.Hash, db *gorm.DB, reg prometheus.Registerer) *L2WatcherClient { + return &L2WatcherClient{ ctx: ctx, Client: client, - l2BlockOrm: orm.NewL2Block(db), - l1MessageOrm: orm.NewL1Message(db), - processedMsgHeight: savedHeight, - confirmations: confirmations, + l2BlockOrm: orm.NewL2Block(db), - messengerAddress: messengerAddress, - messengerABI: bridgeAbi.L2ScrollMessengerABI, + confirmations: confirmations, messageQueueAddress: messageQueueAddress, messageQueueABI: bridgeAbi.L2MessageQueueABI, withdrawTrieRootSlot: withdrawTrieRootSlot, - stopped: 0, metrics: initL2WatcherMetrics(reg), } - - return &w } const blockTracesFetchLimit = uint64(10) @@ -190,124 +156,3 @@ func (w *L2WatcherClient) getAndStoreBlockTraces(ctx context.Context, from, to u return nil } - -// FetchContractEvent pull latest event logs from given contract address and save in DB -func (w *L2WatcherClient) FetchContractEvent() { - defer func() { - log.Info("l2 watcher fetchContractEvent", "w.processedMsgHeight", w.processedMsgHeight) - }() - - w.metrics.fetchContractEventTotal.Inc() - blockHeight, err := utils.GetLatestConfirmedBlockNumber(w.ctx, w.Client, w.confirmations) - if err != nil { - log.Error("failed to get block number", "err", err) - return - } - - fromBlock := int64(w.processedMsgHeight) + 1 - toBlock := int64(blockHeight) - - for from := fromBlock; from <= toBlock; from += contractEventsBlocksFetchLimit { - to := from + contractEventsBlocksFetchLimit - 1 - - if to > toBlock { - to = toBlock - } - - // warning: uint int conversion... - query := geth.FilterQuery{ - FromBlock: big.NewInt(from), // inclusive - ToBlock: big.NewInt(to), // inclusive - Addresses: []common.Address{ - w.messengerAddress, - w.messageQueueAddress, - }, - Topics: make([][]common.Hash, 1), - } - query.Topics[0] = make([]common.Hash, 4) - query.Topics[0][0] = bridgeAbi.L2SentMessageEventSignature - query.Topics[0][1] = bridgeAbi.L2RelayedMessageEventSignature - query.Topics[0][2] = bridgeAbi.L2FailedRelayedMessageEventSignature - query.Topics[0][3] = bridgeAbi.L2AppendMessageEventSignature - - logs, err := w.FilterLogs(w.ctx, query) - if err != nil { - log.Error("failed to get event logs", "err", err) - return - } - if len(logs) == 0 { - w.processedMsgHeight = uint64(to) - w.metrics.fetchContractEventHeight.Set(float64(to)) - continue - } - log.Info("received new L2 messages", "fromBlock", from, "toBlock", to, "cnt", len(logs)) - - relayedMessageEvents, err := w.parseBridgeEventLogs(logs) - if err != nil { - log.Error("failed to parse emitted event log", "err", err) - return - } - - relayedMessageCount := int64(len(relayedMessageEvents)) - w.metrics.rollupL2MsgsRelayedEventsTotal.Add(float64(relayedMessageCount)) - log.Info("L2 events types", "RelayedMessageCount", relayedMessageCount) - - // Update relayed message first to make sure we don't forget to update submited message. - // Since, we always start sync from the latest unprocessed message. - for _, msg := range relayedMessageEvents { - var msgStatus types.MsgStatus - if msg.isSuccessful { - msgStatus = types.MsgConfirmed - } else { - msgStatus = types.MsgFailed - } - if err = w.l1MessageOrm.UpdateLayer1StatusAndLayer2Hash(w.ctx, msg.msgHash.String(), msgStatus, msg.txHash.String()); err != nil { - log.Error("Failed to update layer1 status and layer2 hash", "err", err) - return - } - } - - w.processedMsgHeight = uint64(to) - w.metrics.fetchContractEventHeight.Set(float64(to)) - } -} - -func (w *L2WatcherClient) parseBridgeEventLogs(logs []gethTypes.Log) ([]relayedMessage, error) { - // Need use contract abi to parse event Log - // Can only be tested after we have our contracts set up - - var relayedMessages []relayedMessage - for _, vLog := range logs { - switch vLog.Topics[0] { - case bridgeAbi.L2RelayedMessageEventSignature: - event := bridgeAbi.L2RelayedMessageEvent{} - err := utils.UnpackLog(w.messengerABI, &event, "RelayedMessage", vLog) - if err != nil { - log.Warn("Failed to unpack layer2 RelayedMessage event", "err", err) - return relayedMessages, err - } - - relayedMessages = append(relayedMessages, relayedMessage{ - msgHash: event.MessageHash, - txHash: vLog.TxHash, - isSuccessful: true, - }) - case bridgeAbi.L2FailedRelayedMessageEventSignature: - event := bridgeAbi.L2FailedRelayedMessageEvent{} - err := utils.UnpackLog(w.messengerABI, &event, "FailedRelayedMessage", vLog) - if err != nil { - log.Warn("Failed to unpack layer2 FailedRelayedMessage event", "err", err) - return relayedMessages, err - } - - relayedMessages = append(relayedMessages, relayedMessage{ - msgHash: event.MessageHash, - txHash: vLog.TxHash, - isSuccessful: false, - }) - log.Error("Unknown event", "topic", vLog.Topics[0], "txHash", vLog.TxHash) - } - } - - return relayedMessages, nil -} diff --git a/rollup/internal/controller/watcher/l2_watcher_test.go b/rollup/internal/controller/watcher/l2_watcher_test.go index ecdbb4465..29f523efd 100644 --- a/rollup/internal/controller/watcher/l2_watcher_test.go +++ b/rollup/internal/controller/watcher/l2_watcher_test.go @@ -3,72 +3,31 @@ package watcher import ( "context" "crypto/ecdsa" - "errors" "math/big" - "strconv" "testing" - "time" "gorm.io/gorm" - "github.com/agiledragon/gomonkey/v2" - "github.com/scroll-tech/go-ethereum/accounts/abi" "github.com/scroll-tech/go-ethereum/accounts/abi/bind" "github.com/scroll-tech/go-ethereum/common" - gethTypes "github.com/scroll-tech/go-ethereum/core/types" "github.com/scroll-tech/go-ethereum/ethclient" "github.com/scroll-tech/go-ethereum/rpc" - "github.com/smartystreets/goconvey/convey" "github.com/stretchr/testify/assert" "scroll-tech/common/database" - "scroll-tech/common/types" cutils "scroll-tech/common/utils" - bridgeAbi "scroll-tech/rollup/abi" - "scroll-tech/rollup/internal/controller/sender" "scroll-tech/rollup/internal/orm" - "scroll-tech/rollup/internal/utils" "scroll-tech/rollup/mock_bridge" ) func setupL2Watcher(t *testing.T) (*L2WatcherClient, *gorm.DB) { db := setupDB(t) l2cfg := cfg.L2Config - watcher := NewL2WatcherClient(context.Background(), l2Cli, l2cfg.Confirmations, l2cfg.L2MessengerAddress, - l2cfg.L2MessageQueueAddress, l2cfg.WithdrawTrieRootSlot, db, nil) + watcher := NewL2WatcherClient(context.Background(), l2Cli, l2cfg.Confirmations, l2cfg.L2MessageQueueAddress, l2cfg.WithdrawTrieRootSlot, db, nil) return watcher, db } -func testCreateNewWatcherAndStop(t *testing.T) { - wc, db := setupL2Watcher(t) - subCtx, cancel := context.WithCancel(context.Background()) - defer func() { - cancel() - defer database.CloseDB(db) - }() - - loopToFetchEvent(subCtx, wc) - - l1cfg := cfg.L1Config - l1cfg.RelayerConfig.SenderConfig.Confirmations = rpc.LatestBlockNumber - newSender, err := sender.NewSender(context.Background(), l1cfg.RelayerConfig.SenderConfig, l1cfg.RelayerConfig.GasOracleSenderPrivateKey, "test", "test", types.SenderTypeUnknown, db, nil) - assert.NoError(t, err) - - // Create several transactions and commit to block - numTransactions := 3 - toAddress := common.HexToAddress("0x4592d8f8d7b001e72cb26a73e4fa1806a51ac79d") - for i := 0; i < numTransactions; i++ { - _, err = newSender.SendTransaction(strconv.Itoa(1000+i), &toAddress, big.NewInt(1000000000), nil, 0) - assert.NoError(t, err) - <-newSender.ConfirmChan() - } - - blockNum, err := l2Cli.BlockNumber(context.Background()) - assert.NoError(t, err) - assert.GreaterOrEqual(t, blockNum, uint64(numTransactions)) -} - func testFetchRunningMissingBlocks(t *testing.T) { _, db := setupL2Watcher(t) defer database.CloseDB(db) @@ -97,7 +56,7 @@ func testFetchRunningMissingBlocks(t *testing.T) { func prepareWatcherClient(l2Cli *ethclient.Client, db *gorm.DB, contractAddr common.Address) *L2WatcherClient { confirmations := rpc.LatestBlockNumber - return NewL2WatcherClient(context.Background(), l2Cli, confirmations, contractAddr, contractAddr, common.Hash{}, db, nil) + return NewL2WatcherClient(context.Background(), l2Cli, confirmations, contractAddr, common.Hash{}, db, nil) } func prepareAuth(t *testing.T, l2Cli *ethclient.Client, privateKey *ecdsa.PrivateKey) *bind.TransactOpts { @@ -110,87 +69,3 @@ func prepareAuth(t *testing.T, l2Cli *ethclient.Client, privateKey *ecdsa.Privat auth.GasLimit = 500000 return auth } - -func loopToFetchEvent(subCtx context.Context, watcher *L2WatcherClient) { - go cutils.Loop(subCtx, 2*time.Second, watcher.FetchContractEvent) -} - -func testParseBridgeEventLogsL2RelayedMessageEventSignature(t *testing.T) { - watcher, db := setupL2Watcher(t) - defer database.CloseDB(db) - - logs := []gethTypes.Log{ - { - Topics: []common.Hash{bridgeAbi.L2RelayedMessageEventSignature}, - BlockNumber: 100, - TxHash: common.HexToHash("0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347"), - }, - } - - convey.Convey("unpack RelayedMessage log failure", t, func() { - targetErr := errors.New("UnpackLog RelayedMessage failure") - patchGuard := gomonkey.ApplyFunc(utils.UnpackLog, func(c *abi.ABI, out interface{}, event string, log gethTypes.Log) error { - return targetErr - }) - defer patchGuard.Reset() - - relayedMessages, err := watcher.parseBridgeEventLogs(logs) - assert.EqualError(t, err, targetErr.Error()) - assert.Empty(t, relayedMessages) - }) - - convey.Convey("L2RelayedMessageEventSignature success", t, func() { - msgHash := common.HexToHash("0xad3228b676f7d3cd4284a5443f17f1962b36e491b30a40b2405849e597ba5fb5") - patchGuard := gomonkey.ApplyFunc(utils.UnpackLog, func(c *abi.ABI, out interface{}, event string, log gethTypes.Log) error { - tmpOut := out.(*bridgeAbi.L2RelayedMessageEvent) - tmpOut.MessageHash = msgHash - return nil - }) - defer patchGuard.Reset() - - relayedMessages, err := watcher.parseBridgeEventLogs(logs) - assert.NoError(t, err) - assert.Len(t, relayedMessages, 1) - assert.Equal(t, relayedMessages[0].msgHash, msgHash) - }) -} - -func testParseBridgeEventLogsL2FailedRelayedMessageEventSignature(t *testing.T) { - watcher, db := setupL2Watcher(t) - defer database.CloseDB(db) - - logs := []gethTypes.Log{ - { - Topics: []common.Hash{bridgeAbi.L2FailedRelayedMessageEventSignature}, - BlockNumber: 100, - TxHash: common.HexToHash("0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347"), - }, - } - - convey.Convey("unpack FailedRelayedMessage log failure", t, func() { - targetErr := errors.New("UnpackLog FailedRelayedMessage failure") - patchGuard := gomonkey.ApplyFunc(utils.UnpackLog, func(c *abi.ABI, out interface{}, event string, log gethTypes.Log) error { - return targetErr - }) - defer patchGuard.Reset() - - relayedMessages, err := watcher.parseBridgeEventLogs(logs) - assert.EqualError(t, err, targetErr.Error()) - assert.Empty(t, relayedMessages) - }) - - convey.Convey("L2FailedRelayedMessageEventSignature success", t, func() { - msgHash := common.HexToHash("0xad3228b676f7d3cd4284a5443f17f1962b36e491b30a40b2405849e597ba5fb5") - patchGuard := gomonkey.ApplyFunc(utils.UnpackLog, func(c *abi.ABI, out interface{}, event string, log gethTypes.Log) error { - tmpOut := out.(*bridgeAbi.L2FailedRelayedMessageEvent) - tmpOut.MessageHash = msgHash - return nil - }) - defer patchGuard.Reset() - - relayedMessages, err := watcher.parseBridgeEventLogs(logs) - assert.NoError(t, err) - assert.Len(t, relayedMessages, 1) - assert.Equal(t, relayedMessages[0].msgHash, msgHash) - }) -} diff --git a/rollup/internal/controller/watcher/watcher_test.go b/rollup/internal/controller/watcher/watcher_test.go index c032a8079..68af7c3fb 100644 --- a/rollup/internal/controller/watcher/watcher_test.go +++ b/rollup/internal/controller/watcher/watcher_test.go @@ -110,10 +110,7 @@ func TestFunction(t *testing.T) { t.Run("TestParseBridgeEventLogsL1FinalizeBatchEventSignature", testParseBridgeEventLogsL1FinalizeBatchEventSignature) // Run l2 watcher test cases. - t.Run("TestCreateNewWatcherAndStop", testCreateNewWatcherAndStop) t.Run("TestFetchRunningMissingBlocks", testFetchRunningMissingBlocks) - t.Run("TestParseBridgeEventLogsL2RelayedMessageEventSignature", testParseBridgeEventLogsL2RelayedMessageEventSignature) - t.Run("TestParseBridgeEventLogsL2FailedRelayedMessageEventSignature", testParseBridgeEventLogsL2FailedRelayedMessageEventSignature) // Run chunk proposer test cases. t.Run("TestChunkProposerLimits", testChunkProposerLimits)