diff --git a/Jenkinsfile b/Jenkinsfile index da0a508d4..b322f33ff 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -28,7 +28,7 @@ pipeline { } stage('Check Bridge Compilation') { steps { - sh 'make -C bridge bridge' + sh 'make -C bridge bridge_bins' } } stage('Check Coordinator Compilation') { diff --git a/bridge/Makefile b/bridge/Makefile index d7301acb3..ba47c656d 100644 --- a/bridge/Makefile +++ b/bridge/Makefile @@ -8,8 +8,23 @@ mock_abi: go run github.com/scroll-tech/go-ethereum/cmd/abigen --sol mock_bridge/MockBridgeL1.sol --pkg mock_bridge --out mock_bridge/MockBridgeL1.go go run github.com/scroll-tech/go-ethereum/cmd/abigen --sol mock_bridge/MockBridgeL2.sol --pkg mock_bridge --out mock_bridge/MockBridgeL2.go -bridge: ## Builds the Bridge instance. - go build -o $(PWD)/build/bin/bridge ./cmd +bridge_bins: ## Builds the Bridge bins. + go build -o $(PWD)/build/bin/event_watcher ./cmd/event_watcher/ + go build -o $(PWD)/build/bin/gas_oracle ./cmd/gas_oracle/ + go build -o $(PWD)/build/bin/message_relayer ./cmd/msg_relayer/ + go build -o $(PWD)/build/bin/rollup_relayer ./cmd/rollup_relayer/ + +event_watcher: ## Builds the event_watcher bin + go build -o $(PWD)/build/bin/event_watcher ./cmd/event_watcher/ + +gas_oracle: ## Builds the gas_oracle bin + go build -o $(PWD)/build/bin/gas_oracle ./cmd/gas_oracle/ + +message_relayer: ## Builds the message_relayer bin + go build -o $(PWD)/build/bin/message_relayer ./cmd/msg_relayer/ + +rollup_relayer: ## Builds the rollup_relayer bin + go build -o $(PWD)/build/bin/rollup_relayer ./cmd/rollup_relayer/ test: go test -v -race -coverprofile=coverage.txt -covermode=atomic -p 1 $(PWD)/... @@ -20,8 +35,14 @@ lint: ## Lint the files - used for CI clean: ## Empty out the bin folder @rm -rf build/bin -docker: - DOCKER_BUILDKIT=1 docker build -t scrolltech/${IMAGE_NAME}:${IMAGE_VERSION} ${REPO_ROOT_DIR}/ -f ${REPO_ROOT_DIR}/build/dockerfiles/bridge.Dockerfile - docker_push: - docker push scrolltech/${IMAGE_NAME}:${IMAGE_VERSION} + docker docker push scrolltech/gas-oracle:${IMAGE_VERSION} + docker docker push scrolltech/event-watcher:${IMAGE_VERSION} + docker docker push scrolltech/rollup-relayer:${IMAGE_VERSION} + docker docker push scrolltech/msg-relayer:${IMAGE_VERSION} + +docker: + DOCKER_BUILDKIT=1 docker build -t scrolltech/gas-oracle:${IMAGE_VERSION} ${REPO_ROOT_DIR}/ -f ${REPO_ROOT_DIR}/build/dockerfiles/gas_oracle.Dockerfile + DOCKER_BUILDKIT=1 docker build -t scrolltech/event-watcher:${IMAGE_VERSION} ${REPO_ROOT_DIR}/ -f ${REPO_ROOT_DIR}/build/dockerfiles/event_watcher.Dockerfile + DOCKER_BUILDKIT=1 docker build -t scrolltech/rollup-relayer:${IMAGE_VERSION} ${REPO_ROOT_DIR}/ -f ${REPO_ROOT_DIR}/build/dockerfiles/rollup_relayer.Dockerfile + DOCKER_BUILDKIT=1 docker build -t scrolltech/msg-relayer:${IMAGE_VERSION} ${REPO_ROOT_DIR}/ -f ${REPO_ROOT_DIR}/build/dockerfiles/msg_relayer.Dockerfile diff --git a/bridge/cmd/app/app.go b/bridge/cmd/app/app.go deleted file mode 100644 index 5f192e79e..000000000 --- a/bridge/cmd/app/app.go +++ /dev/null @@ -1,130 +0,0 @@ -package app - -import ( - "context" - "fmt" - "os" - "os/signal" - - "github.com/scroll-tech/go-ethereum/log" - "github.com/urfave/cli/v2" - - "scroll-tech/database" - - "scroll-tech/common/metrics" - "scroll-tech/common/utils" - "scroll-tech/common/version" - - "scroll-tech/bridge/config" - "scroll-tech/bridge/l1" - "scroll-tech/bridge/l2" -) - -var ( - app *cli.App -) - -func init() { - // Set up Bridge app info. - app = cli.NewApp() - - app.Action = action - app.Name = "bridge" - app.Usage = "The Scroll Bridge" - app.Version = version.Version - app.Flags = append(app.Flags, utils.CommonFlags...) - app.Flags = append(app.Flags, apiFlags...) - - app.Before = func(ctx *cli.Context) error { - return utils.LogSetup(ctx) - } - - // Register `bridge-test` app for integration-test. - utils.RegisterSimulation(app, "bridge-test") -} - -func action(ctx *cli.Context) error { - // Load config file. - cfgFile := ctx.String(utils.ConfigFileFlag.Name) - cfg, err := config.NewConfig(cfgFile) - if err != nil { - log.Crit("failed to load config file", "config file", cfgFile, "error", err) - } - - // Start metrics server. - metrics.Serve(context.Background(), ctx) - - // Init db connection. - var ormFactory database.OrmFactory - if ormFactory, err = database.NewOrmFactory(cfg.DBConfig); err != nil { - log.Crit("failed to init db connection", "err", err) - } - - var ( - l1Backend *l1.Backend - l2Backend *l2.Backend - ) - // @todo change nil to actual client after https://scroll-tech/bridge/pull/40 merged - l1Backend, err = l1.New(ctx.Context, cfg.L1Config, ormFactory) - if err != nil { - return err - } - l2Backend, err = l2.New(ctx.Context, cfg.L2Config, ormFactory) - if err != nil { - return err - } - defer func() { - l1Backend.Stop() - l2Backend.Stop() - err = ormFactory.Close() - if err != nil { - log.Error("can not close ormFactory", "error", err) - } - }() - - // Start all modules. - if err = l1Backend.Start(); err != nil { - log.Crit("couldn't start l1 backend", "error", err) - } - if err = l2Backend.Start(); err != nil { - log.Crit("couldn't start l2 backend", "error", err) - } - - // Register api and start rpc service. - if ctx.Bool(httpEnabledFlag.Name) { - handler, addr, err := utils.StartHTTPEndpoint( - fmt.Sprintf( - "%s:%d", - ctx.String(httpListenAddrFlag.Name), - ctx.Int(httpPortFlag.Name)), - l2Backend.APIs()) - if err != nil { - log.Crit("Could not start RPC api", "error", err) - } - defer func() { - _ = handler.Shutdown(ctx.Context) - log.Info("HTTP endpoint closed", "url", fmt.Sprintf("http://%v/", addr)) - }() - log.Info("HTTP endpoint opened", "url", fmt.Sprintf("http://%v/", addr)) - } - - log.Info("Start bridge successfully") - - // Catch CTRL-C to ensure a graceful shutdown. - interrupt := make(chan os.Signal, 1) - signal.Notify(interrupt, os.Interrupt) - - // Wait until the interrupt signal is received from an OS signal. - <-interrupt - - return nil -} - -// Run run bridge cmd instance. -func Run() { - // Run the bridge. - if err := app.Run(os.Args); err != nil { - _, _ = fmt.Fprintln(os.Stderr, err) - os.Exit(1) - } -} diff --git a/bridge/cmd/app/app_test.go b/bridge/cmd/app/app_test.go deleted file mode 100644 index 64701eba2..000000000 --- a/bridge/cmd/app/app_test.go +++ /dev/null @@ -1,19 +0,0 @@ -package app - -import ( - "fmt" - "testing" - "time" - - "scroll-tech/common/cmd" - "scroll-tech/common/version" -) - -func TestRunBridge(t *testing.T) { - bridge := cmd.NewCmd("bridge-test", "--version") - defer bridge.WaitExit() - - // wait result - bridge.ExpectWithTimeout(t, true, time.Second*3, fmt.Sprintf("bridge version %s", version.Version)) - bridge.RunApp(nil) -} diff --git a/bridge/cmd/app/flags.go b/bridge/cmd/app/flags.go deleted file mode 100644 index 5a5c28174..000000000 --- a/bridge/cmd/app/flags.go +++ /dev/null @@ -1,31 +0,0 @@ -package app - -import ( - "github.com/urfave/cli/v2" -) - -var ( - apiFlags = []cli.Flag{ - &httpEnabledFlag, - &httpListenAddrFlag, - &httpPortFlag, - } - // httpEnabledFlag enable rpc server. - httpEnabledFlag = cli.BoolFlag{ - Name: "http", - Usage: "Enable the HTTP-RPC server", - Value: false, - } - // httpListenAddrFlag set the http address. - httpListenAddrFlag = cli.StringFlag{ - Name: "http.addr", - Usage: "HTTP-RPC server listening interface", - Value: "localhost", - } - // httpPortFlag set http.port. - httpPortFlag = cli.IntFlag{ - Name: "http.port", - Usage: "HTTP-RPC server listening port", - Value: 8290, - } -) diff --git a/bridge/cmd/event_watcher/app/app.go b/bridge/cmd/event_watcher/app/app.go new file mode 100644 index 000000000..24b96e489 --- /dev/null +++ b/bridge/cmd/event_watcher/app/app.go @@ -0,0 +1,114 @@ +package app + +import ( + "context" + "fmt" + "os" + "os/signal" + "time" + + "github.com/scroll-tech/go-ethereum/ethclient" + "github.com/scroll-tech/go-ethereum/log" + "github.com/urfave/cli/v2" + + "scroll-tech/database" + + "scroll-tech/common/metrics" + "scroll-tech/common/version" + + "scroll-tech/bridge/config" + "scroll-tech/bridge/watcher" + + cutils "scroll-tech/common/utils" +) + +var ( + app *cli.App +) + +func init() { + // Set up event-watcher app info. + app = cli.NewApp() + + app.Action = action + app.Name = "event-watcher" + app.Usage = "The Scroll Event Watcher" + app.Version = version.Version + app.Flags = append(app.Flags, cutils.CommonFlags...) + app.Commands = []*cli.Command{} + + app.Before = func(ctx *cli.Context) error { + return cutils.LogSetup(ctx) + } + + // Register `event-watcher-test` app for integration-test. + cutils.RegisterSimulation(app, "event-watcher-test") +} + +func action(ctx *cli.Context) error { + // Load config file. + cfgFile := ctx.String(cutils.ConfigFileFlag.Name) + cfg, err := config.NewConfig(cfgFile) + if err != nil { + log.Crit("failed to load config file", "config file", cfgFile, "error", err) + } + + subCtx, cancel := context.WithCancel(ctx.Context) + // Init db connection + var ormFactory database.OrmFactory + if ormFactory, err = database.NewOrmFactory(cfg.DBConfig); err != nil { + log.Crit("failed to init db connection", "err", err) + } + + defer func() { + cancel() + err = ormFactory.Close() + if err != nil { + log.Error("can not close ormFactory", "error", err) + } + }() + + // Start metrics server. + metrics.Serve(subCtx, ctx) + l1client, err := ethclient.Dial(cfg.L1Config.Endpoint) + if err != nil { + log.Error("failed to connect l1 geth", "config file", cfgFile, "error", err) + return err + } + + l2client, err := ethclient.Dial(cfg.L2Config.Endpoint) + if err != nil { + log.Error("failed to connect l2 geth", "config file", cfgFile, "error", err) + return err + } + l1watcher := watcher.NewL1WatcherClient(ctx.Context, l1client, cfg.L1Config.StartHeight, cfg.L1Config.Confirmations, cfg.L1Config.L1MessengerAddress, cfg.L1Config.L1MessageQueueAddress, cfg.L1Config.ScrollChainContractAddress, ormFactory) + l2watcher := watcher.NewL2WatcherClient(ctx.Context, l2client, cfg.L2Config.Confirmations, cfg.L2Config.L2MessengerAddress, cfg.L2Config.L2MessageQueueAddress, cfg.L2Config.WithdrawTrieRootSlot, ormFactory) + + go cutils.Loop(subCtx, 10*time.Second, func() { + if loopErr := l1watcher.FetchContractEvent(); loopErr != nil { + log.Error("Failed to fetch bridge contract", "err", loopErr) + } + }) + + // Start l2 watcher process + go cutils.Loop(subCtx, 2*time.Second, l2watcher.FetchContractEvent) + // Finish start all l2 functions + log.Info("Start event-watcher successfully") + + // Catch CTRL-C to ensure a graceful shutdown. + interrupt := make(chan os.Signal, 1) + signal.Notify(interrupt, os.Interrupt) + + // Wait until the interrupt signal is received from an OS signal. + <-interrupt + + return nil +} + +// Run event watcher cmd instance. +func Run() { + if err := app.Run(os.Args); err != nil { + _, _ = fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } +} diff --git a/bridge/cmd/event_watcher/main.go b/bridge/cmd/event_watcher/main.go new file mode 100644 index 000000000..7a29b61fd --- /dev/null +++ b/bridge/cmd/event_watcher/main.go @@ -0,0 +1,7 @@ +package main + +import "scroll-tech/bridge/cmd/event_watcher/app" + +func main() { + app.Run() +} diff --git a/bridge/cmd/gas_oracle/app/app.go b/bridge/cmd/gas_oracle/app/app.go new file mode 100644 index 000000000..5fbde6696 --- /dev/null +++ b/bridge/cmd/gas_oracle/app/app.go @@ -0,0 +1,136 @@ +package app + +import ( + "context" + "fmt" + "os" + "os/signal" + "time" + + "github.com/scroll-tech/go-ethereum/ethclient" + "github.com/scroll-tech/go-ethereum/log" + "github.com/urfave/cli/v2" + + "scroll-tech/database" + + "scroll-tech/common/metrics" + "scroll-tech/common/version" + + "scroll-tech/bridge/config" + "scroll-tech/bridge/relayer" + "scroll-tech/bridge/utils" + "scroll-tech/bridge/watcher" + + cutils "scroll-tech/common/utils" +) + +var ( + app *cli.App +) + +func init() { + // Set up gas-oracle app info. + app = cli.NewApp() + + app.Action = action + app.Name = "gas-oracle" + app.Usage = "The Scroll Gas Oracle" + app.Description = "Scroll Gas Oracle." + app.Version = version.Version + app.Flags = append(app.Flags, cutils.CommonFlags...) + app.Commands = []*cli.Command{} + + app.Before = func(ctx *cli.Context) error { + return cutils.LogSetup(ctx) + } + + // Register `gas-oracle-test` app for integration-test. + cutils.RegisterSimulation(app, "gas-oracle-test") +} + +func action(ctx *cli.Context) error { + // Load config file. + cfgFile := ctx.String(cutils.ConfigFileFlag.Name) + cfg, err := config.NewConfig(cfgFile) + if err != nil { + log.Crit("failed to load config file", "config file", cfgFile, "error", err) + } + subCtx, cancel := context.WithCancel(ctx.Context) + // Init db connection + var ormFactory database.OrmFactory + if ormFactory, err = database.NewOrmFactory(cfg.DBConfig); err != nil { + log.Crit("failed to init db connection", "err", err) + } + + defer func() { + cancel() + err = ormFactory.Close() + if err != nil { + log.Error("can not close ormFactory", "error", err) + } + }() + // Start metrics server. + metrics.Serve(subCtx, ctx) + + l1client, err := ethclient.Dial(cfg.L1Config.Endpoint) + if err != nil { + log.Error("failed to connect l1 geth", "config file", cfgFile, "error", err) + return err + } + + // Init l2geth connection + l2client, err := ethclient.Dial(cfg.L2Config.Endpoint) + if err != nil { + log.Error("failed to connect l2 geth", "config file", cfgFile, "error", err) + return err + } + + l1watcher := watcher.NewL1WatcherClient(ctx.Context, l1client, cfg.L1Config.StartHeight, cfg.L1Config.Confirmations, cfg.L1Config.L1MessengerAddress, cfg.L1Config.L1MessageQueueAddress, cfg.L1Config.ScrollChainContractAddress, ormFactory) + + l1relayer, err := relayer.NewLayer1Relayer(ctx.Context, ormFactory, cfg.L1Config.RelayerConfig) + if err != nil { + log.Error("failed to create new l1 relayer", "config file", cfgFile, "error", err) + return err + } + l2relayer, err := relayer.NewLayer2Relayer(ctx.Context, l2client, ormFactory, cfg.L2Config.RelayerConfig) + if err != nil { + log.Error("failed to create new l2 relayer", "config file", cfgFile, "error", err) + return err + } + // Start l1 watcher process + go cutils.LoopWithContext(subCtx, 10*time.Second, func(ctx context.Context) { + number, loopErr := utils.GetLatestConfirmedBlockNumber(ctx, l1client, cfg.L1Config.Confirmations) + if loopErr != nil { + log.Error("failed to get block number", "err", loopErr) + return + } + + if loopErr = l1watcher.FetchBlockHeader(number); loopErr != nil { + log.Error("Failed to fetch L1 block header", "lastest", number, "err", loopErr) + } + }) + + // Start l1relayer process + go cutils.Loop(subCtx, 10*time.Second, l1relayer.ProcessGasPriceOracle) + go cutils.Loop(subCtx, 2*time.Second, l2relayer.ProcessGasPriceOracle) + + // Finish start all message relayer functions + log.Info("Start gas-oracle successfully") + + // Catch CTRL-C to ensure a graceful shutdown. + interrupt := make(chan os.Signal, 1) + signal.Notify(interrupt, os.Interrupt) + + // Wait until the interrupt signal is received from an OS signal. + <-interrupt + + return nil +} + +// Run message_relayer cmd instance. +func Run() { + if err := app.Run(os.Args); err != nil { + _, _ = fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } +} diff --git a/bridge/cmd/gas_oracle/main.go b/bridge/cmd/gas_oracle/main.go new file mode 100644 index 000000000..18950bf81 --- /dev/null +++ b/bridge/cmd/gas_oracle/main.go @@ -0,0 +1,7 @@ +package main + +import "scroll-tech/bridge/cmd/gas_oracle/app" + +func main() { + app.Run() +} diff --git a/bridge/cmd/main.go b/bridge/cmd/main.go deleted file mode 100644 index 1438d2586..000000000 --- a/bridge/cmd/main.go +++ /dev/null @@ -1,7 +0,0 @@ -package main - -import "scroll-tech/bridge/cmd/app" - -func main() { - app.Run() -} diff --git a/bridge/cmd/msg_relayer/app/app.go b/bridge/cmd/msg_relayer/app/app.go new file mode 100644 index 000000000..d58b4f57a --- /dev/null +++ b/bridge/cmd/msg_relayer/app/app.go @@ -0,0 +1,118 @@ +package app + +import ( + "context" + "fmt" + "os" + "os/signal" + "time" + + "github.com/scroll-tech/go-ethereum/ethclient" + "github.com/scroll-tech/go-ethereum/log" + "github.com/urfave/cli/v2" + + "scroll-tech/database" + + "scroll-tech/common/metrics" + "scroll-tech/common/version" + + "scroll-tech/bridge/config" + "scroll-tech/bridge/relayer" + + cutils "scroll-tech/common/utils" +) + +var ( + app *cli.App +) + +func init() { + // Set up message-relayer app info. + app = cli.NewApp() + + app.Action = action + app.Name = "message-relayer" + app.Usage = "The Scroll Message Relayer" + app.Description = "Message Relayer contains two main service: 1) relay l1 message to l2. 2) relay l2 message to l1." + app.Version = version.Version + app.Flags = append(app.Flags, cutils.CommonFlags...) + app.Commands = []*cli.Command{} + + app.Before = func(ctx *cli.Context) error { + return cutils.LogSetup(ctx) + } + + // Register `message-relayer-test` app for integration-test. + cutils.RegisterSimulation(app, "message-relayer-test") +} + +func action(ctx *cli.Context) error { + // Load config file. + cfgFile := ctx.String(cutils.ConfigFileFlag.Name) + cfg, err := config.NewConfig(cfgFile) + if err != nil { + log.Crit("failed to load config file", "config file", cfgFile, "error", err) + } + subCtx, cancel := context.WithCancel(ctx.Context) + + // Init db connection + var ormFactory database.OrmFactory + if ormFactory, err = database.NewOrmFactory(cfg.DBConfig); err != nil { + log.Crit("failed to init db connection", "err", err) + } + + defer func() { + cancel() + err = ormFactory.Close() + if err != nil { + log.Error("can not close ormFactory", "error", err) + } + }() + + // Start metrics server. + metrics.Serve(subCtx, ctx) + + // Init l2geth connection + l2client, err := ethclient.Dial(cfg.L2Config.Endpoint) + if err != nil { + log.Error("failed to connect l2 geth", "config file", cfgFile, "error", err) + return err + } + + l1relayer, err := relayer.NewLayer1Relayer(ctx.Context, ormFactory, cfg.L1Config.RelayerConfig) + if err != nil { + log.Error("failed to create new l1 relayer", "config file", cfgFile, "error", err) + return err + } + l2relayer, err := relayer.NewLayer2Relayer(ctx.Context, l2client, ormFactory, cfg.L2Config.RelayerConfig) + if err != nil { + log.Error("failed to create new l2 relayer", "config file", cfgFile, "error", err) + return err + } + + // Start l1relayer process + go cutils.Loop(subCtx, 10*time.Second, l1relayer.ProcessSavedEvents) + + // Start l2relayer process + go cutils.Loop(subCtx, 2*time.Second, l2relayer.ProcessSavedEvents) + + // Finish start all message relayer functions + log.Info("Start message-relayer successfully") + + // Catch CTRL-C to ensure a graceful shutdown. + interrupt := make(chan os.Signal, 1) + signal.Notify(interrupt, os.Interrupt) + + // Wait until the interrupt signal is received from an OS signal. + <-interrupt + + return nil +} + +// Run message_relayer cmd instance. +func Run() { + if err := app.Run(os.Args); err != nil { + _, _ = fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } +} diff --git a/bridge/cmd/msg_relayer/main.go b/bridge/cmd/msg_relayer/main.go new file mode 100644 index 000000000..4ef8c79e1 --- /dev/null +++ b/bridge/cmd/msg_relayer/main.go @@ -0,0 +1,7 @@ +package main + +import "scroll-tech/bridge/cmd/msg_relayer/app" + +func main() { + app.Run() +} diff --git a/bridge/cmd/rollup_relayer/app/app.go b/bridge/cmd/rollup_relayer/app/app.go new file mode 100644 index 000000000..1257242e8 --- /dev/null +++ b/bridge/cmd/rollup_relayer/app/app.go @@ -0,0 +1,133 @@ +package app + +import ( + "context" + "fmt" + "os" + "os/signal" + "time" + + "github.com/scroll-tech/go-ethereum/ethclient" + "github.com/scroll-tech/go-ethereum/log" + "github.com/urfave/cli/v2" + + "scroll-tech/database" + + "scroll-tech/common/metrics" + "scroll-tech/common/version" + + "scroll-tech/bridge/config" + "scroll-tech/bridge/relayer" + "scroll-tech/bridge/utils" + "scroll-tech/bridge/watcher" + + cutils "scroll-tech/common/utils" +) + +var ( + app *cli.App +) + +func init() { + // Set up rollup-relayer app info. + app = cli.NewApp() + + app.Action = action + app.Name = "rollup-relayer" + app.Usage = "The Scroll Rollup Relayer" + app.Version = version.Version + app.Flags = append(app.Flags, cutils.CommonFlags...) + app.Commands = []*cli.Command{} + + app.Before = func(ctx *cli.Context) error { + return cutils.LogSetup(ctx) + } + // Register `rollup-relayer-test` app for integration-test. + cutils.RegisterSimulation(app, "rollup-relayer-test") +} + +func action(ctx *cli.Context) error { + // Load config file. + cfgFile := ctx.String(cutils.ConfigFileFlag.Name) + cfg, err := config.NewConfig(cfgFile) + if err != nil { + log.Crit("failed to load config file", "config file", cfgFile, "error", err) + } + + subCtx, cancel := context.WithCancel(ctx.Context) + + // init db connection + var ormFactory database.OrmFactory + if ormFactory, err = database.NewOrmFactory(cfg.DBConfig); err != nil { + log.Crit("failed to init db connection", "err", err) + } + defer func() { + cancel() + err = ormFactory.Close() + if err != nil { + log.Error("can not close ormFactory", "error", err) + } + }() + + // Start metrics server. + metrics.Serve(subCtx, ctx) + + // Init l2geth connection + l2client, err := ethclient.Dial(cfg.L2Config.Endpoint) + if err != nil { + log.Error("failed to connect l2 geth", "config file", cfgFile, "error", err) + return err + } + + l2relayer, err := relayer.NewLayer2Relayer(ctx.Context, l2client, ormFactory, cfg.L2Config.RelayerConfig) + if err != nil { + log.Error("failed to create l2 relayer", "config file", cfgFile, "error", err) + return err + } + + batchProposer := watcher.NewBatchProposer(subCtx, cfg.L2Config.BatchProposerConfig, l2relayer, ormFactory) + if err != nil { + log.Error("failed to create batchProposer", "config file", cfgFile, "error", err) + return err + } + + l2watcher := watcher.NewL2WatcherClient(subCtx, l2client, cfg.L2Config.Confirmations, cfg.L2Config.L2MessengerAddress, cfg.L2Config.L2MessageQueueAddress, cfg.L2Config.WithdrawTrieRootSlot, ormFactory) + + // Watcher loop to fetch missing blocks + go cutils.LoopWithContext(subCtx, 2*time.Second, func(ctx context.Context) { + number, loopErr := utils.GetLatestConfirmedBlockNumber(ctx, l2client, cfg.L2Config.Confirmations) + if loopErr != nil { + log.Error("failed to get block number", "err", loopErr) + return + } + l2watcher.TryFetchRunningMissingBlocks(ctx, number) + }) + + // Batch proposer loop + go cutils.Loop(subCtx, 2*time.Second, func() { + batchProposer.TryProposeBatch() + batchProposer.TryCommitBatches() + }) + + go cutils.Loop(subCtx, 2*time.Second, l2relayer.ProcessCommittedBatches) + + // Finish start all rollup relayer functions. + log.Info("Start rollup-relayer successfully") + + // Catch CTRL-C to ensure a graceful shutdown. + interrupt := make(chan os.Signal, 1) + signal.Notify(interrupt, os.Interrupt) + + // Wait until the interrupt signal is received from an OS signal. + <-interrupt + + return nil +} + +// Run rollup relayer cmd instance. +func Run() { + if err := app.Run(os.Args); err != nil { + _, _ = fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } +} diff --git a/bridge/cmd/rollup_relayer/main.go b/bridge/cmd/rollup_relayer/main.go new file mode 100644 index 000000000..8503d4a6d --- /dev/null +++ b/bridge/cmd/rollup_relayer/main.go @@ -0,0 +1,7 @@ +package main + +import "scroll-tech/bridge/cmd/rollup_relayer/app" + +func main() { + app.Run() +} diff --git a/bridge/l1/backend.go b/bridge/l1/backend.go deleted file mode 100644 index 2e6491613..000000000 --- a/bridge/l1/backend.go +++ /dev/null @@ -1,55 +0,0 @@ -package l1 - -import ( - "context" - - "github.com/scroll-tech/go-ethereum/ethclient" - - "scroll-tech/database" - - "scroll-tech/bridge/config" -) - -// Backend manage the resources and services of L1 backend. -// The backend should monitor events in layer 1 and relay transactions to layer 2 -type Backend struct { - cfg *config.L1Config - watcher *Watcher - relayer *Layer1Relayer - orm database.OrmFactory -} - -// New returns a new instance of Backend. -func New(ctx context.Context, cfg *config.L1Config, orm database.OrmFactory) (*Backend, error) { - client, err := ethclient.Dial(cfg.Endpoint) - if err != nil { - return nil, err - } - - relayer, err := NewLayer1Relayer(ctx, orm, cfg.RelayerConfig) - if err != nil { - return nil, err - } - - watcher := NewWatcher(ctx, client, cfg.StartHeight, cfg.Confirmations, cfg.L1MessengerAddress, cfg.L1MessageQueueAddress, cfg.ScrollChainContractAddress, orm) - - return &Backend{ - cfg: cfg, - watcher: watcher, - relayer: relayer, - orm: orm, - }, nil -} - -// Start Backend module. -func (l1 *Backend) Start() error { - l1.watcher.Start() - l1.relayer.Start() - return nil -} - -// Stop Backend module. -func (l1 *Backend) Stop() { - l1.watcher.Stop() - l1.relayer.Stop() -} diff --git a/bridge/l1/l1_test.go b/bridge/l1/l1_test.go deleted file mode 100644 index 458eee4d4..000000000 --- a/bridge/l1/l1_test.go +++ /dev/null @@ -1,46 +0,0 @@ -package l1 - -import ( - "testing" - - "github.com/stretchr/testify/assert" - - "scroll-tech/common/docker" - - "scroll-tech/bridge/config" -) - -var ( - // config - cfg *config.Config - - // docker consider handler. - base *docker.App -) - -func TestMain(m *testing.M) { - base = docker.NewDockerApp() - - m.Run() - - base.Free() -} - -func setupEnv(t *testing.T) { - // Load config. - var err error - cfg, err = config.NewConfig("../config.json") - assert.NoError(t, err) - base.RunImages(t) - - cfg.L2Config.RelayerConfig.SenderConfig.Endpoint = base.L1GethEndpoint() - cfg.L1Config.RelayerConfig.SenderConfig.Endpoint = base.L2GethEndpoint() - cfg.DBConfig.DSN = base.DBEndpoint() -} - -func TestL1(t *testing.T) { - setupEnv(t) - - t.Run("testCreateNewL1Relayer", testCreateNewL1Relayer) - t.Run("testStartWatcher", testStartWatcher) -} diff --git a/bridge/l2/backend.go b/bridge/l2/backend.go deleted file mode 100644 index 455e348da..000000000 --- a/bridge/l2/backend.go +++ /dev/null @@ -1,76 +0,0 @@ -package l2 - -import ( - "context" - - "github.com/scroll-tech/go-ethereum/ethclient" - "github.com/scroll-tech/go-ethereum/rpc" - - "scroll-tech/database" - - "scroll-tech/bridge/config" -) - -// Backend manage the resources and services of L2 backend. -// The backend should monitor events in layer 2 and relay transactions to layer 1 -type Backend struct { - cfg *config.L2Config - watcher *WatcherClient - relayer *Layer2Relayer - batchProposer *BatchProposer - orm database.OrmFactory -} - -// New returns a new instance of Backend. -func New(ctx context.Context, cfg *config.L2Config, orm database.OrmFactory) (*Backend, error) { - client, err := ethclient.Dial(cfg.Endpoint) - if err != nil { - return nil, err - } - - // Note: initialize watcher before relayer to keep DB consistent. - // Otherwise, there will be a race condition between watcher.initializeGenesis and relayer.ProcessPendingBatches. - watcher := NewL2WatcherClient(ctx, client, cfg.Confirmations, cfg.L2MessengerAddress, cfg.L2MessageQueueAddress, cfg.WithdrawTrieRootSlot, orm) - - relayer, err := NewLayer2Relayer(ctx, client, orm, cfg.RelayerConfig) - if err != nil { - return nil, err - } - - batchProposer := NewBatchProposer(ctx, cfg.BatchProposerConfig, relayer, orm) - - return &Backend{ - cfg: cfg, - watcher: watcher, - relayer: relayer, - batchProposer: batchProposer, - orm: orm, - }, nil -} - -// Start Backend module. -func (l2 *Backend) Start() error { - l2.watcher.Start() - l2.relayer.Start() - l2.batchProposer.Start() - return nil -} - -// Stop Backend module. -func (l2 *Backend) Stop() { - l2.batchProposer.Stop() - l2.relayer.Stop() - l2.watcher.Stop() -} - -// APIs collect API modules. -func (l2 *Backend) APIs() []rpc.API { - return []rpc.API{ - { - Namespace: "l2", - Version: "1.0", - Service: WatcherAPI(l2.watcher), - Public: true, - }, - } -} diff --git a/bridge/l2/watcher_api.go b/bridge/l2/watcher_api.go deleted file mode 100644 index 004bcf445..000000000 --- a/bridge/l2/watcher_api.go +++ /dev/null @@ -1,5 +0,0 @@ -package l2 - -// WatcherAPI watcher api service -type WatcherAPI interface { -} diff --git a/bridge/l1/relayer.go b/bridge/relayer/l1_relayer.go similarity index 72% rename from bridge/l1/relayer.go rename to bridge/relayer/l1_relayer.go index 13e75e8b1..ec2005bf3 100644 --- a/bridge/l1/relayer.go +++ b/bridge/relayer/l1_relayer.go @@ -1,10 +1,9 @@ -package l1 +package relayer import ( "context" "errors" "math/big" - "time" // not sure if this will make problems when relay with l1geth @@ -15,7 +14,6 @@ import ( geth_metrics "github.com/scroll-tech/go-ethereum/metrics" "scroll-tech/common/types" - "scroll-tech/common/utils" "scroll-tech/database" @@ -31,14 +29,6 @@ var ( bridgeL1MsgsRelayedConfirmedTotalCounter = geth_metrics.NewRegisteredCounter("bridge/l1/msgs/relayed/confirmed/total", metrics.ScrollRegistry) ) -const ( - gasPriceDiffPrecision = 1000000 - - defaultGasPriceDiff = 50000 // 5% - - defaultMessageRelayMinGasLimit = 130000 // should be enough for both ERC20 and ETH relay -) - // Layer1Relayer is responsible for // 1. fetch pending L1Message from db // 2. relay pending message to layer 2 node @@ -53,11 +43,9 @@ type Layer1Relayer struct { // channel used to communicate with transaction sender messageSender *sender.Sender - messageCh <-chan *sender.Confirmation l2MessengerABI *abi.ABI gasOracleSender *sender.Sender - gasOracleCh <-chan *sender.Confirmation l1GasOracleABI *abi.ABI minGasLimitForMessageRelay uint64 @@ -65,8 +53,6 @@ type Layer1Relayer struct { lastGasPrice uint64 minGasPrice uint64 gasPriceDiff uint64 - - stopCh chan struct{} } // NewLayer1Relayer will return a new instance of Layer1RelayerClient @@ -96,21 +82,19 @@ func NewLayer1Relayer(ctx context.Context, db database.OrmFactory, cfg *config.R gasPriceDiff = defaultGasPriceDiff } - minGasLimitForMessageRelay := uint64(defaultMessageRelayMinGasLimit) + minGasLimitForMessageRelay := uint64(defaultL1MessageRelayMinGasLimit) if cfg.MessageRelayMinGasLimit != 0 { minGasLimitForMessageRelay = cfg.MessageRelayMinGasLimit } - return &Layer1Relayer{ + l1Relayer := &Layer1Relayer{ ctx: ctx, db: db, messageSender: messageSender, - messageCh: messageSender.ConfirmChan(), l2MessengerABI: bridge_abi.L2ScrollMessengerABI, gasOracleSender: gasOracleSender, - gasOracleCh: gasOracleSender.ConfirmChan(), l1GasOracleABI: bridge_abi.L1GasPriceOracleABI, minGasLimitForMessageRelay: minGasLimitForMessageRelay, @@ -118,9 +102,11 @@ func NewLayer1Relayer(ctx context.Context, db database.OrmFactory, cfg *config.R minGasPrice: minGasPrice, gasPriceDiff: gasPriceDiff, - cfg: cfg, - stopCh: make(chan struct{}), - }, nil + cfg: cfg, + } + + go l1Relayer.handleConfirmLoop(ctx) + return l1Relayer, nil } // ProcessSavedEvents relays saved un-processed cross-domain transactions to desired blockchain @@ -220,61 +206,43 @@ func (r *Layer1Relayer) ProcessGasPriceOracle() { } } -// Start the relayer process -func (r *Layer1Relayer) Start() { - go func() { - ctx, cancel := context.WithCancel(r.ctx) - - go utils.Loop(ctx, 2*time.Second, r.ProcessSavedEvents) - go utils.Loop(ctx, 2*time.Second, r.ProcessGasPriceOracle) - - go func(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - case cfm := <-r.messageCh: - bridgeL1MsgsRelayedConfirmedTotalCounter.Inc(1) - if !cfm.IsSuccessful { - err := r.db.UpdateLayer1StatusAndLayer2Hash(r.ctx, cfm.ID, types.MsgRelayFailed, cfm.TxHash.String()) - if err != nil { - log.Warn("UpdateLayer1StatusAndLayer2Hash failed", "err", err) - } - log.Warn("transaction confirmed but failed in layer2", "confirmation", cfm) - } else { - // @todo handle db error - err := r.db.UpdateLayer1StatusAndLayer2Hash(r.ctx, cfm.ID, types.MsgConfirmed, cfm.TxHash.String()) - if err != nil { - log.Warn("UpdateLayer1StatusAndLayer2Hash failed", "err", err) - } - log.Info("transaction confirmed in layer2", "confirmation", cfm) - } - case cfm := <-r.gasOracleCh: - if !cfm.IsSuccessful { - // @discuss: maybe make it pending again? - err := r.db.UpdateL1GasOracleStatusAndOracleTxHash(r.ctx, cfm.ID, types.GasOracleFailed, cfm.TxHash.String()) - if err != nil { - log.Warn("UpdateL1GasOracleStatusAndOracleTxHash failed", "err", err) - } - log.Warn("transaction confirmed but failed in layer2", "confirmation", cfm) - } else { - // @todo handle db error - err := r.db.UpdateL1GasOracleStatusAndOracleTxHash(r.ctx, cfm.ID, types.GasOracleImported, cfm.TxHash.String()) - if err != nil { - log.Warn("UpdateGasOracleStatusAndOracleTxHash failed", "err", err) - } - log.Info("transaction confirmed in layer2", "confirmation", cfm) - } +func (r *Layer1Relayer) handleConfirmLoop(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case cfm := <-r.messageSender.ConfirmChan(): + bridgeL1MsgsRelayedConfirmedTotalCounter.Inc(1) + if !cfm.IsSuccessful { + err := r.db.UpdateLayer1StatusAndLayer2Hash(r.ctx, cfm.ID, types.MsgRelayFailed, cfm.TxHash.String()) + if err != nil { + log.Warn("UpdateLayer1StatusAndLayer2Hash failed", "err", err) } + log.Warn("transaction confirmed but failed in layer2", "confirmation", cfm) + } else { + // @todo handle db error + err := r.db.UpdateLayer1StatusAndLayer2Hash(r.ctx, cfm.ID, types.MsgConfirmed, cfm.TxHash.String()) + if err != nil { + log.Warn("UpdateLayer1StatusAndLayer2Hash failed", "err", err) + } + log.Info("transaction confirmed in layer2", "confirmation", cfm) } - }(ctx) - - <-r.stopCh - cancel() - }() -} - -// Stop the relayer module, for a graceful shutdown. -func (r *Layer1Relayer) Stop() { - close(r.stopCh) + case cfm := <-r.gasOracleSender.ConfirmChan(): + if !cfm.IsSuccessful { + // @discuss: maybe make it pending again? + err := r.db.UpdateL1GasOracleStatusAndOracleTxHash(r.ctx, cfm.ID, types.GasOracleFailed, cfm.TxHash.String()) + if err != nil { + log.Warn("UpdateL1GasOracleStatusAndOracleTxHash failed", "err", err) + } + log.Warn("transaction confirmed but failed in layer2", "confirmation", cfm) + } else { + // @todo handle db error + err := r.db.UpdateL1GasOracleStatusAndOracleTxHash(r.ctx, cfm.ID, types.GasOracleImported, cfm.TxHash.String()) + if err != nil { + log.Warn("UpdateGasOracleStatusAndOracleTxHash failed", "err", err) + } + log.Info("transaction confirmed in layer2", "confirmation", cfm) + } + } + } } diff --git a/bridge/l1/relayer_test.go b/bridge/relayer/l1_relayer_test.go similarity index 72% rename from bridge/l1/relayer_test.go rename to bridge/relayer/l1_relayer_test.go index 65a92ee2c..a151bcc6b 100644 --- a/bridge/l1/relayer_test.go +++ b/bridge/relayer/l1_relayer_test.go @@ -1,4 +1,4 @@ -package l1 +package relayer_test import ( "context" @@ -8,6 +8,8 @@ import ( "scroll-tech/database/migrate" + "scroll-tech/bridge/relayer" + "scroll-tech/database" ) @@ -19,9 +21,7 @@ func testCreateNewL1Relayer(t *testing.T) { assert.NoError(t, migrate.ResetDB(db.GetDB().DB)) defer db.Close() - relayer, err := NewLayer1Relayer(context.Background(), db, cfg.L2Config.RelayerConfig) + relayer, err := relayer.NewLayer1Relayer(context.Background(), db, cfg.L2Config.RelayerConfig) assert.NoError(t, err) - defer relayer.Stop() - - relayer.Start() + assert.NotNil(t, relayer) } diff --git a/bridge/l2/relayer.go b/bridge/relayer/l2_relayer.go similarity index 90% rename from bridge/l2/relayer.go rename to bridge/relayer/l2_relayer.go index 6d03588ab..1014d7e8a 100644 --- a/bridge/l2/relayer.go +++ b/bridge/relayer/l2_relayer.go @@ -1,4 +1,4 @@ -package l2 +package relayer import ( "context" @@ -7,7 +7,6 @@ import ( "math/big" "runtime" "sync" - "time" "github.com/scroll-tech/go-ethereum/accounts/abi" "github.com/scroll-tech/go-ethereum/common" @@ -22,8 +21,6 @@ import ( "scroll-tech/common/types" "scroll-tech/database" - cutil "scroll-tech/common/utils" - bridge_abi "scroll-tech/bridge/abi" "scroll-tech/bridge/config" "scroll-tech/bridge/sender" @@ -40,14 +37,6 @@ var ( bridgeL2BatchesSkippedTotalCounter = geth_metrics.NewRegisteredCounter("bridge/l2/batches/skipped/total", metrics.ScrollRegistry) ) -const ( - gasPriceDiffPrecision = 1000000 - - defaultGasPriceDiff = 50000 // 5% - - defaultMessageRelayMinGasLimit = 200000 // should be enough for both ERC20 and ETH relay -) - // Layer2Relayer is responsible for // 1. Committing and finalizing L2 blocks on L1 // 2. Relaying messages from L2 to L1 @@ -63,15 +52,12 @@ type Layer2Relayer struct { cfg *config.RelayerConfig messageSender *sender.Sender - messageCh <-chan *sender.Confirmation l1MessengerABI *abi.ABI rollupSender *sender.Sender - rollupCh <-chan *sender.Confirmation l1RollupABI *abi.ABI gasOracleSender *sender.Sender - gasOracleCh <-chan *sender.Confirmation l2GasOracleABI *abi.ABI minGasLimitForMessageRelay uint64 @@ -91,8 +77,6 @@ type Layer2Relayer struct { // A list of processing batch finalization. // key(string): confirmation ID, value(string): batch hash. processingFinalization sync.Map - - stopCh chan struct{} } // NewLayer2Relayer will return a new instance of Layer2RelayerClient @@ -126,27 +110,24 @@ func NewLayer2Relayer(ctx context.Context, l2Client *ethclient.Client, db databa gasPriceDiff = defaultGasPriceDiff } - minGasLimitForMessageRelay := uint64(defaultMessageRelayMinGasLimit) + minGasLimitForMessageRelay := uint64(defaultL2MessageRelayMinGasLimit) if cfg.MessageRelayMinGasLimit != 0 { minGasLimitForMessageRelay = cfg.MessageRelayMinGasLimit } - return &Layer2Relayer{ + layer2Relayer := &Layer2Relayer{ ctx: ctx, db: db, l2Client: l2Client, messageSender: messageSender, - messageCh: messageSender.ConfirmChan(), l1MessengerABI: bridge_abi.L1ScrollMessengerABI, rollupSender: rollupSender, - rollupCh: rollupSender.ConfirmChan(), l1RollupABI: bridge_abi.ScrollChainABI, gasOracleSender: gasOracleSender, - gasOracleCh: gasOracleSender.ConfirmChan(), l2GasOracleABI: bridge_abi.L2GasPriceOracleABI, minGasLimitForMessageRelay: minGasLimitForMessageRelay, @@ -158,8 +139,9 @@ func NewLayer2Relayer(ctx context.Context, l2Client *ethclient.Client, db databa processingMessage: sync.Map{}, processingBatchesCommitment: sync.Map{}, processingFinalization: sync.Map{}, - stopCh: make(chan struct{}), - }, nil + } + go layer2Relayer.handleConfirmLoop(ctx) + return layer2Relayer, nil } const processMsgLimit = 100 @@ -516,53 +498,6 @@ func (r *Layer2Relayer) ProcessCommittedBatches() { } } -// Start the relayer process -func (r *Layer2Relayer) Start() { - go func() { - ctx, cancel := context.WithCancel(r.ctx) - go cutil.Loop(ctx, time.Second, r.ProcessSavedEvents) - go cutil.Loop(ctx, time.Second, r.ProcessCommittedBatches) - go cutil.Loop(ctx, time.Second, r.ProcessGasPriceOracle) - - go func(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - case confirmation := <-r.messageCh: - r.handleConfirmation(confirmation) - case confirmation := <-r.rollupCh: - r.handleConfirmation(confirmation) - case cfm := <-r.gasOracleCh: - if !cfm.IsSuccessful { - // @discuss: maybe make it pending again? - err := r.db.UpdateL2GasOracleStatusAndOracleTxHash(r.ctx, cfm.ID, types.GasOracleFailed, cfm.TxHash.String()) - if err != nil { - log.Warn("UpdateL2GasOracleStatusAndOracleTxHash failed", "err", err) - } - log.Warn("transaction confirmed but failed in layer1", "confirmation", cfm) - } else { - // @todo handle db error - err := r.db.UpdateL2GasOracleStatusAndOracleTxHash(r.ctx, cfm.ID, types.GasOracleImported, cfm.TxHash.String()) - if err != nil { - log.Warn("UpdateL2GasOracleStatusAndOracleTxHash failed", "err", err) - } - log.Info("transaction confirmed in layer1", "confirmation", cfm) - } - } - } - }(ctx) - - <-r.stopCh - cancel() - }() -} - -// Stop the relayer module, for a graceful shutdown. -func (r *Layer2Relayer) Stop() { - close(r.stopCh) -} - func (r *Layer2Relayer) handleConfirmation(confirmation *sender.Confirmation) { transactionType := "Unknown" // check whether it is message relay transaction @@ -626,3 +561,32 @@ func (r *Layer2Relayer) handleConfirmation(confirmation *sender.Confirmation) { } log.Info("transaction confirmed in layer1", "type", transactionType, "confirmation", confirmation) } + +func (r *Layer2Relayer) handleConfirmLoop(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case confirmation := <-r.messageSender.ConfirmChan(): + r.handleConfirmation(confirmation) + case confirmation := <-r.rollupSender.ConfirmChan(): + r.handleConfirmation(confirmation) + case cfm := <-r.gasOracleSender.ConfirmChan(): + if !cfm.IsSuccessful { + // @discuss: maybe make it pending again? + err := r.db.UpdateL2GasOracleStatusAndOracleTxHash(r.ctx, cfm.ID, types.GasOracleFailed, cfm.TxHash.String()) + if err != nil { + log.Warn("UpdateL2GasOracleStatusAndOracleTxHash failed", "err", err) + } + log.Warn("transaction confirmed but failed in layer1", "confirmation", cfm) + } else { + // @todo handle db error + err := r.db.UpdateL2GasOracleStatusAndOracleTxHash(r.ctx, cfm.ID, types.GasOracleImported, cfm.TxHash.String()) + if err != nil { + log.Warn("UpdateL2GasOracleStatusAndOracleTxHash failed", "err", err) + } + log.Info("transaction confirmed in layer1", "confirmation", cfm) + } + } + } +} diff --git a/bridge/l2/relayer_test.go b/bridge/relayer/l2_relayer_test.go similarity index 93% rename from bridge/l2/relayer_test.go rename to bridge/relayer/l2_relayer_test.go index eaa4b7dbf..1d8df0cb1 100644 --- a/bridge/l2/relayer_test.go +++ b/bridge/relayer/l2_relayer_test.go @@ -1,4 +1,4 @@ -package l2 +package relayer_test import ( "context" @@ -14,6 +14,8 @@ import ( "scroll-tech/common/types" + "scroll-tech/bridge/relayer" + "scroll-tech/database" "scroll-tech/database/migrate" ) @@ -39,11 +41,9 @@ func testCreateNewRelayer(t *testing.T) { assert.NoError(t, migrate.ResetDB(db.GetDB().DB)) defer db.Close() - relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, cfg.L2Config.RelayerConfig) + relayer, err := relayer.NewLayer2Relayer(context.Background(), l2Cli, db, cfg.L2Config.RelayerConfig) assert.NoError(t, err) - defer relayer.Stop() - - relayer.Start() + assert.NotNil(t, relayer) } func testL2RelayerProcessSaveEvents(t *testing.T) { @@ -54,9 +54,8 @@ func testL2RelayerProcessSaveEvents(t *testing.T) { defer db.Close() l2Cfg := cfg.L2Config - relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, l2Cfg.RelayerConfig) + relayer, err := relayer.NewLayer2Relayer(context.Background(), l2Cli, db, l2Cfg.RelayerConfig) assert.NoError(t, err) - defer relayer.Stop() err = db.SaveL2Messages(context.Background(), templateL2Message) assert.NoError(t, err) @@ -104,9 +103,8 @@ func testL2RelayerProcessCommittedBatches(t *testing.T) { defer db.Close() l2Cfg := cfg.L2Config - relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, l2Cfg.RelayerConfig) + relayer, err := relayer.NewLayer2Relayer(context.Background(), l2Cli, db, l2Cfg.RelayerConfig) assert.NoError(t, err) - defer relayer.Stop() dbTx, err := db.Beginx() assert.NoError(t, err) @@ -140,9 +138,8 @@ func testL2RelayerSkipBatches(t *testing.T) { defer db.Close() l2Cfg := cfg.L2Config - relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, l2Cfg.RelayerConfig) + relayer, err := relayer.NewLayer2Relayer(context.Background(), l2Cli, db, l2Cfg.RelayerConfig) assert.NoError(t, err) - defer relayer.Stop() createBatch := func(rollupStatus types.RollupStatus, provingStatus types.ProvingStatus, index uint64) string { dbTx, err := db.Beginx() diff --git a/bridge/relayer/params.go b/bridge/relayer/params.go new file mode 100644 index 000000000..7feed613b --- /dev/null +++ b/bridge/relayer/params.go @@ -0,0 +1,11 @@ +package relayer + +const ( + gasPriceDiffPrecision = 1000000 + + defaultGasPriceDiff = 50000 // 5% + + defaultL1MessageRelayMinGasLimit = 130000 // should be enough for both ERC20 and ETH relay + + defaultL2MessageRelayMinGasLimit = 200000 +) diff --git a/bridge/relayer/relayer_test.go b/bridge/relayer/relayer_test.go new file mode 100644 index 000000000..bbf796319 --- /dev/null +++ b/bridge/relayer/relayer_test.go @@ -0,0 +1,108 @@ +package relayer_test + +import ( + "encoding/json" + "os" + "testing" + + "github.com/scroll-tech/go-ethereum/ethclient" + "github.com/scroll-tech/go-ethereum/log" + "github.com/stretchr/testify/assert" + + "scroll-tech/common/docker" + "scroll-tech/common/types" + + "scroll-tech/bridge/config" +) + +var ( + // config + cfg *config.Config + + base *docker.App + + // l2geth client + l2Cli *ethclient.Client + + // block trace + wrappedBlock1 *types.WrappedBlock + wrappedBlock2 *types.WrappedBlock + + // batch data + batchData1 *types.BatchData + batchData2 *types.BatchData +) + +func setupEnv(t *testing.T) (err error) { + // Load config. + cfg, err = config.NewConfig("../config.json") + assert.NoError(t, err) + + base.RunImages(t) + + cfg.L2Config.RelayerConfig.SenderConfig.Endpoint = base.L1GethEndpoint() + cfg.L1Config.RelayerConfig.SenderConfig.Endpoint = base.L2GethEndpoint() + cfg.DBConfig.DSN = base.DBEndpoint() + + // Create l2geth client. + l2Cli, err = base.L2Client() + assert.NoError(t, err) + + templateBlockTrace1, err := os.ReadFile("../../common/testdata/blockTrace_02.json") + if err != nil { + return err + } + // unmarshal blockTrace + wrappedBlock1 = &types.WrappedBlock{} + if err = json.Unmarshal(templateBlockTrace1, wrappedBlock1); err != nil { + return err + } + parentBatch1 := &types.BlockBatch{ + Index: 0, + Hash: "0x0cc6b102c2924402c14b2e3a19baccc316252bfdc44d9ec62e942d34e39ec729", + StateRoot: "0x2579122e8f9ec1e862e7d415cef2fb495d7698a8e5f0dddc5651ba4236336e7d", + } + batchData1 = types.NewBatchData(parentBatch1, []*types.WrappedBlock{wrappedBlock1}, nil) + + templateBlockTrace2, err := os.ReadFile("../../common/testdata/blockTrace_03.json") + if err != nil { + return err + } + // unmarshal blockTrace + wrappedBlock2 = &types.WrappedBlock{} + if err = json.Unmarshal(templateBlockTrace2, wrappedBlock2); err != nil { + return err + } + parentBatch2 := &types.BlockBatch{ + Index: batchData1.Batch.BatchIndex, + Hash: batchData1.Hash().Hex(), + StateRoot: batchData1.Batch.NewStateRoot.String(), + } + batchData2 = types.NewBatchData(parentBatch2, []*types.WrappedBlock{wrappedBlock2}, nil) + + log.Info("batchHash", "batchhash1", batchData1.Hash().Hex(), "batchhash2", batchData2.Hash().Hex()) + + return err +} + +func TestMain(m *testing.M) { + base = docker.NewDockerApp() + + m.Run() + + base.Free() +} + +func TestFunctions(t *testing.T) { + if err := setupEnv(t); err != nil { + t.Fatal(err) + } + // Run l1 relayer test cases. + t.Run("TestCreateNewL1Relayer", testCreateNewL1Relayer) + // Run l2 relayer test cases. + t.Run("TestCreateNewRelayer", testCreateNewRelayer) + t.Run("TestL2RelayerProcessSaveEvents", testL2RelayerProcessSaveEvents) + t.Run("TestL2RelayerProcessCommittedBatches", testL2RelayerProcessCommittedBatches) + t.Run("TestL2RelayerSkipBatches", testL2RelayerSkipBatches) + +} diff --git a/bridge/tests/gas_oracle_test.go b/bridge/tests/gas_oracle_test.go index a7a5b3b44..8c6cb78a0 100644 --- a/bridge/tests/gas_oracle_test.go +++ b/bridge/tests/gas_oracle_test.go @@ -11,8 +11,8 @@ import ( "scroll-tech/common/types" - "scroll-tech/bridge/l1" - "scroll-tech/bridge/l2" + "scroll-tech/bridge/relayer" + "scroll-tech/bridge/watcher" "scroll-tech/database" "scroll-tech/database/migrate" @@ -30,14 +30,13 @@ func testImportL1GasPrice(t *testing.T) { l1Cfg := cfg.L1Config // Create L1Relayer - l1Relayer, err := l1.NewLayer1Relayer(context.Background(), db, l1Cfg.RelayerConfig) + l1Relayer, err := relayer.NewLayer1Relayer(context.Background(), db, l1Cfg.RelayerConfig) assert.NoError(t, err) - defer l1Relayer.Stop() // Create L1Watcher startHeight, err := l1Client.BlockNumber(context.Background()) assert.NoError(t, err) - l1Watcher := l1.NewWatcher(context.Background(), l1Client, startHeight-1, 0, l1Cfg.L1MessengerAddress, l1Cfg.L1MessageQueueAddress, l1Cfg.ScrollChainContractAddress, db) + l1Watcher := watcher.NewL1WatcherClient(context.Background(), l1Client, startHeight-1, 0, l1Cfg.L1MessengerAddress, l1Cfg.L1MessageQueueAddress, l1Cfg.ScrollChainContractAddress, db) // fetch new blocks number, err := l1Client.BlockNumber(context.Background()) @@ -81,9 +80,8 @@ func testImportL2GasPrice(t *testing.T) { l2Cfg := cfg.L2Config // Create L2Relayer - l2Relayer, err := l2.NewLayer2Relayer(context.Background(), l2Client, db, l2Cfg.RelayerConfig) + l2Relayer, err := relayer.NewLayer2Relayer(context.Background(), l2Client, db, l2Cfg.RelayerConfig) assert.NoError(t, err) - defer l2Relayer.Stop() // add fake blocks traces := []*types.WrappedBlock{ diff --git a/bridge/tests/l1_message_relay_test.go b/bridge/tests/l1_message_relay_test.go index 7786b73be..b89a79a36 100644 --- a/bridge/tests/l1_message_relay_test.go +++ b/bridge/tests/l1_message_relay_test.go @@ -13,8 +13,8 @@ import ( "scroll-tech/common/types" - "scroll-tech/bridge/l1" - "scroll-tech/bridge/l2" + "scroll-tech/bridge/relayer" + "scroll-tech/bridge/watcher" "scroll-tech/database" "scroll-tech/database/migrate" @@ -33,16 +33,14 @@ func testRelayL1MessageSucceed(t *testing.T) { l2Cfg := cfg.L2Config // Create L1Relayer - l1Relayer, err := l1.NewLayer1Relayer(context.Background(), db, l1Cfg.RelayerConfig) + l1Relayer, err := relayer.NewLayer1Relayer(context.Background(), db, l1Cfg.RelayerConfig) assert.NoError(t, err) - defer l1Relayer.Stop() - // Create L1Watcher confirmations := rpc.LatestBlockNumber - l1Watcher := l1.NewWatcher(context.Background(), l1Client, 0, confirmations, l1Cfg.L1MessengerAddress, l1Cfg.L1MessageQueueAddress, l1Cfg.ScrollChainContractAddress, db) + l1Watcher := watcher.NewL1WatcherClient(context.Background(), l1Client, 0, confirmations, l1Cfg.L1MessengerAddress, l1Cfg.L1MessageQueueAddress, l1Cfg.ScrollChainContractAddress, db) // Create L2Watcher - l2Watcher := l2.NewL2WatcherClient(context.Background(), l2Client, confirmations, l2Cfg.L2MessengerAddress, l2Cfg.L2MessageQueueAddress, l2Cfg.WithdrawTrieRootSlot, db) + l2Watcher := watcher.NewL2WatcherClient(context.Background(), l2Client, confirmations, l2Cfg.L2MessengerAddress, l2Cfg.L2MessageQueueAddress, l2Cfg.WithdrawTrieRootSlot, db) // send message through l1 messenger contract nonce, err := l1MessengerInstance.MessageNonce(&bind.CallOpts{}) @@ -56,7 +54,7 @@ func testRelayL1MessageSucceed(t *testing.T) { } // l1 watch process events - l1Watcher.FetchContractEvent(sendReceipt.BlockNumber.Uint64()) + l1Watcher.FetchContractEvent() // check db status msg, err := db.GetL1MessageByQueueIndex(nonce.Uint64()) @@ -79,7 +77,7 @@ func testRelayL1MessageSucceed(t *testing.T) { assert.Equal(t, len(relayTxReceipt.Logs), 1) // fetch message relayed events - l2Watcher.FetchContractEvent(relayTxReceipt.BlockNumber.Uint64()) + l2Watcher.FetchContractEvent() msg, err = db.GetL1MessageByQueueIndex(nonce.Uint64()) assert.NoError(t, err) assert.Equal(t, msg.Status, types.MsgConfirmed) diff --git a/bridge/tests/l2_message_relay_test.go b/bridge/tests/l2_message_relay_test.go index 8b46f6558..dbe3bcd76 100644 --- a/bridge/tests/l2_message_relay_test.go +++ b/bridge/tests/l2_message_relay_test.go @@ -13,8 +13,8 @@ import ( "scroll-tech/common/types" - "scroll-tech/bridge/l1" - "scroll-tech/bridge/l2" + "scroll-tech/bridge/relayer" + "scroll-tech/bridge/watcher" "scroll-tech/database" "scroll-tech/database/migrate" @@ -33,15 +33,15 @@ func testRelayL2MessageSucceed(t *testing.T) { // Create L2Watcher confirmations := rpc.LatestBlockNumber - l2Watcher := l2.NewL2WatcherClient(context.Background(), l2Client, confirmations, l2Cfg.L2MessengerAddress, l2Cfg.L2MessageQueueAddress, l2Cfg.WithdrawTrieRootSlot, db) + l2Watcher := watcher.NewL2WatcherClient(context.Background(), l2Client, confirmations, l2Cfg.L2MessengerAddress, l2Cfg.L2MessageQueueAddress, l2Cfg.WithdrawTrieRootSlot, db) // Create L2Relayer - l2Relayer, err := l2.NewLayer2Relayer(context.Background(), l2Client, db, l2Cfg.RelayerConfig) + l2Relayer, err := relayer.NewLayer2Relayer(context.Background(), l2Client, db, l2Cfg.RelayerConfig) assert.NoError(t, err) // Create L1Watcher l1Cfg := cfg.L1Config - l1Watcher := l1.NewWatcher(context.Background(), l1Client, 0, confirmations, l1Cfg.L1MessengerAddress, l1Cfg.L1MessageQueueAddress, l1Cfg.ScrollChainContractAddress, db) + l1Watcher := watcher.NewL1WatcherClient(context.Background(), l1Client, 0, confirmations, l1Cfg.L1MessengerAddress, l1Cfg.L1MessageQueueAddress, l1Cfg.ScrollChainContractAddress, db) // send message through l2 messenger contract nonce, err := l2MessengerInstance.MessageNonce(&bind.CallOpts{}) @@ -55,7 +55,7 @@ func testRelayL2MessageSucceed(t *testing.T) { } // l2 watch process events - l2Watcher.FetchContractEvent(sendReceipt.BlockNumber.Uint64()) + l2Watcher.FetchContractEvent() // check db status msg, err := db.GetL2MessageByNonce(nonce.Uint64()) @@ -123,7 +123,7 @@ func testRelayL2MessageSucceed(t *testing.T) { assert.Equal(t, len(commitTxReceipt.Logs), 1) // fetch CommitBatch rollup events - err = l1Watcher.FetchContractEvent(commitTxReceipt.BlockNumber.Uint64()) + err = l1Watcher.FetchContractEvent() assert.NoError(t, err) status, err = db.GetRollupStatus(batchHash) assert.NoError(t, err) @@ -144,7 +144,7 @@ func testRelayL2MessageSucceed(t *testing.T) { assert.Equal(t, len(finalizeTxReceipt.Logs), 1) // fetch FinalizeBatch events - err = l1Watcher.FetchContractEvent(finalizeTxReceipt.BlockNumber.Uint64()) + err = l1Watcher.FetchContractEvent() assert.NoError(t, err) status, err = db.GetRollupStatus(batchHash) assert.NoError(t, err) @@ -165,7 +165,7 @@ func testRelayL2MessageSucceed(t *testing.T) { assert.Equal(t, len(relayTxReceipt.Logs), 1) // fetch message relayed events - err = l1Watcher.FetchContractEvent(relayTxReceipt.BlockNumber.Uint64()) + err = l1Watcher.FetchContractEvent() assert.NoError(t, err) msg, err = db.GetL2MessageByNonce(nonce.Uint64()) assert.NoError(t, err) diff --git a/bridge/tests/rollup_test.go b/bridge/tests/rollup_test.go index ca78b74d1..d000991c7 100644 --- a/bridge/tests/rollup_test.go +++ b/bridge/tests/rollup_test.go @@ -12,8 +12,8 @@ import ( "scroll-tech/common/types" - "scroll-tech/bridge/l1" - "scroll-tech/bridge/l2" + "scroll-tech/bridge/relayer" + "scroll-tech/bridge/watcher" "scroll-tech/database" "scroll-tech/database/migrate" @@ -30,13 +30,12 @@ func testCommitBatchAndFinalizeBatch(t *testing.T) { // Create L2Relayer l2Cfg := cfg.L2Config - l2Relayer, err := l2.NewLayer2Relayer(context.Background(), l2Client, db, l2Cfg.RelayerConfig) + l2Relayer, err := relayer.NewLayer2Relayer(context.Background(), l2Client, db, l2Cfg.RelayerConfig) assert.NoError(t, err) - defer l2Relayer.Stop() // Create L1Watcher l1Cfg := cfg.L1Config - l1Watcher := l1.NewWatcher(context.Background(), l1Client, 0, l1Cfg.Confirmations, l1Cfg.L1MessengerAddress, l1Cfg.L1MessageQueueAddress, l1Cfg.ScrollChainContractAddress, db) + l1Watcher := watcher.NewL1WatcherClient(context.Background(), l1Client, 0, l1Cfg.Confirmations, l1Cfg.L1MessengerAddress, l1Cfg.L1MessageQueueAddress, l1Cfg.ScrollChainContractAddress, db) // add some blocks to db var wrappedBlocks []*types.WrappedBlock @@ -96,7 +95,7 @@ func testCommitBatchAndFinalizeBatch(t *testing.T) { assert.Equal(t, len(commitTxReceipt.Logs), 1) // fetch rollup events - err = l1Watcher.FetchContractEvent(commitTxReceipt.BlockNumber.Uint64()) + err = l1Watcher.FetchContractEvent() assert.NoError(t, err) status, err = db.GetRollupStatus(batchHash) assert.NoError(t, err) @@ -126,7 +125,7 @@ func testCommitBatchAndFinalizeBatch(t *testing.T) { assert.Equal(t, len(finalizeTxReceipt.Logs), 1) // fetch rollup events - err = l1Watcher.FetchContractEvent(finalizeTxReceipt.BlockNumber.Uint64()) + err = l1Watcher.FetchContractEvent() assert.NoError(t, err) status, err = db.GetRollupStatus(batchHash) assert.NoError(t, err) diff --git a/bridge/l2/check_trace.go b/bridge/utils/check_trace.go similarity index 99% rename from bridge/l2/check_trace.go rename to bridge/utils/check_trace.go index 76e6ae3e7..2ab42830a 100644 --- a/bridge/l2/check_trace.go +++ b/bridge/utils/check_trace.go @@ -1,4 +1,4 @@ -package l2 +package utils import ( "fmt" diff --git a/bridge/l2/batch_proposer.go b/bridge/watcher/batch_proposer.go similarity index 93% rename from bridge/l2/batch_proposer.go rename to bridge/watcher/batch_proposer.go index 1638f9a62..4532bd963 100644 --- a/bridge/l2/batch_proposer.go +++ b/bridge/watcher/batch_proposer.go @@ -1,10 +1,9 @@ -package l2 +package watcher import ( "context" "fmt" "math" - "reflect" "sync" "time" @@ -13,12 +12,12 @@ import ( "scroll-tech/common/metrics" "scroll-tech/common/types" - "scroll-tech/common/utils" "scroll-tech/database" bridgeabi "scroll-tech/bridge/abi" "scroll-tech/bridge/config" + "scroll-tech/bridge/relayer" ) var ( @@ -83,15 +82,13 @@ type BatchProposer struct { proofGenerationFreq uint64 batchDataBuffer []*types.BatchData - relayer *Layer2Relayer + relayer *relayer.Layer2Relayer piCfg *types.PublicInputHashConfig - - stopCh chan struct{} } // NewBatchProposer will return a new instance of BatchProposer. -func NewBatchProposer(ctx context.Context, cfg *config.BatchProposerConfig, relayer *Layer2Relayer, orm database.OrmFactory) *BatchProposer { +func NewBatchProposer(ctx context.Context, cfg *config.BatchProposerConfig, relayer *relayer.Layer2Relayer, orm database.OrmFactory) *BatchProposer { p := &BatchProposer{ mutex: sync.Mutex{}, ctx: ctx, @@ -107,42 +104,17 @@ func NewBatchProposer(ctx context.Context, cfg *config.BatchProposerConfig, rela proofGenerationFreq: cfg.ProofGenerationFreq, piCfg: cfg.PublicInputConfig, relayer: relayer, - stopCh: make(chan struct{}), } // for graceful restart. p.recoverBatchDataBuffer() // try to commit the leftover pending batches - p.tryCommitBatches() + p.TryCommitBatches() return p } -// Start the Listening process -func (p *BatchProposer) Start() { - go func() { - if reflect.ValueOf(p.orm).IsNil() { - panic("must run BatchProposer with DB") - } - - ctx, cancel := context.WithCancel(p.ctx) - - go utils.Loop(ctx, 2*time.Second, func() { - p.tryProposeBatch() - p.tryCommitBatches() - }) - - <-p.stopCh - cancel() - }() -} - -// Stop the Watcher module, for a graceful shutdown. -func (p *BatchProposer) Stop() { - p.stopCh <- struct{}{} -} - func (p *BatchProposer) recoverBatchDataBuffer() { // batches are sorted by batch index in increasing order batchHashes, err := p.orm.GetPendingBatches(math.MaxInt32) @@ -214,7 +186,8 @@ func (p *BatchProposer) recoverBatchDataBuffer() { } } -func (p *BatchProposer) tryProposeBatch() { +// TryProposeBatch will try to propose a batch. +func (p *BatchProposer) TryProposeBatch() { p.mutex.Lock() defer p.mutex.Unlock() @@ -243,7 +216,8 @@ func (p *BatchProposer) tryProposeBatch() { } } -func (p *BatchProposer) tryCommitBatches() { +// TryCommitBatches will try to commit the pending batches. +func (p *BatchProposer) TryCommitBatches() { p.mutex.Lock() defer p.mutex.Unlock() diff --git a/bridge/l2/batch_proposer_test.go b/bridge/watcher/batch_proposer_test.go similarity index 76% rename from bridge/l2/batch_proposer_test.go rename to bridge/watcher/batch_proposer_test.go index 434e371f8..9f9007505 100644 --- a/bridge/l2/batch_proposer_test.go +++ b/bridge/watcher/batch_proposer_test.go @@ -1,4 +1,4 @@ -package l2 +package watcher_test import ( "context" @@ -12,6 +12,8 @@ import ( "scroll-tech/database/migrate" "scroll-tech/bridge/config" + "scroll-tech/bridge/relayer" + "scroll-tech/bridge/watcher" "scroll-tech/common/types" ) @@ -21,27 +23,32 @@ func testBatchProposerProposeBatch(t *testing.T) { db, err := database.NewOrmFactory(cfg.DBConfig) assert.NoError(t, err) assert.NoError(t, migrate.ResetDB(db.GetDB().DB)) - defer db.Close() + ctx := context.Background() + subCtx, cancel := context.WithCancel(ctx) + + defer func() { + cancel() + db.Close() + }() // Insert traces into db. assert.NoError(t, db.InsertWrappedBlocks([]*types.WrappedBlock{wrappedBlock1})) l2cfg := cfg.L2Config - wc := NewL2WatcherClient(context.Background(), l2Cli, l2cfg.Confirmations, l2cfg.L2MessengerAddress, l2cfg.L2MessageQueueAddress, l2cfg.WithdrawTrieRootSlot, db) - wc.Start() - defer wc.Stop() + wc := watcher.NewL2WatcherClient(context.Background(), l2Cli, l2cfg.Confirmations, l2cfg.L2MessengerAddress, l2cfg.L2MessageQueueAddress, l2cfg.WithdrawTrieRootSlot, db) + loopToFetchEvent(subCtx, wc) - relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, cfg.L2Config.RelayerConfig) + relayer, err := relayer.NewLayer2Relayer(context.Background(), l2Cli, db, cfg.L2Config.RelayerConfig) assert.NoError(t, err) - proposer := NewBatchProposer(context.Background(), &config.BatchProposerConfig{ + proposer := watcher.NewBatchProposer(context.Background(), &config.BatchProposerConfig{ ProofGenerationFreq: 1, BatchGasThreshold: 3000000, BatchTxNumThreshold: 135, BatchTimeSec: 1, BatchBlocksLimit: 100, }, relayer, db) - proposer.tryProposeBatch() + proposer.TryProposeBatch() infos, err := db.GetUnbatchedL2Blocks(map[string]interface{}{}, fmt.Sprintf("order by number ASC LIMIT %d", 100)) @@ -60,7 +67,7 @@ func testBatchProposerGracefulRestart(t *testing.T) { assert.NoError(t, migrate.ResetDB(db.GetDB().DB)) defer db.Close() - relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, cfg.L2Config.RelayerConfig) + relayer, err := relayer.NewLayer2Relayer(context.Background(), l2Cli, db, cfg.L2Config.RelayerConfig) assert.NoError(t, err) // Insert traces into db. @@ -84,7 +91,7 @@ func testBatchProposerGracefulRestart(t *testing.T) { assert.Equal(t, 1, len(batchHashes)) assert.Equal(t, batchData2.Hash().Hex(), batchHashes[0]) // test p.recoverBatchDataBuffer(). - _ = NewBatchProposer(context.Background(), &config.BatchProposerConfig{ + _ = watcher.NewBatchProposer(context.Background(), &config.BatchProposerConfig{ ProofGenerationFreq: 1, BatchGasThreshold: 3000000, BatchTxNumThreshold: 135, diff --git a/bridge/watcher/common.go b/bridge/watcher/common.go new file mode 100644 index 000000000..8739b2e11 --- /dev/null +++ b/bridge/watcher/common.go @@ -0,0 +1,11 @@ +package watcher + +import "github.com/scroll-tech/go-ethereum/common" + +const contractEventsBlocksFetchLimit = int64(10) + +type relayedMessage struct { + msgHash common.Hash + txHash common.Hash + isSuccessful bool +} diff --git a/bridge/l1/watcher.go b/bridge/watcher/l1_watcher.go similarity index 84% rename from bridge/l1/watcher.go rename to bridge/watcher/l1_watcher.go index 2eae1fe45..38a5dbb47 100644 --- a/bridge/l1/watcher.go +++ b/bridge/watcher/l1_watcher.go @@ -1,9 +1,8 @@ -package l1 +package watcher import ( "context" "math/big" - "time" geth "github.com/scroll-tech/go-ethereum" "github.com/scroll-tech/go-ethereum/accounts/abi" @@ -19,8 +18,6 @@ import ( "scroll-tech/common/types" "scroll-tech/database" - cutil "scroll-tech/common/utils" - bridge_abi "scroll-tech/bridge/abi" "scroll-tech/bridge/utils" ) @@ -33,20 +30,14 @@ var ( bridgeL1MsgsRollupEventsTotalCounter = geth_metrics.NewRegisteredCounter("bridge/l1/msgs/rollup/events/total", metrics.ScrollRegistry) ) -type relayedMessage struct { - msgHash common.Hash - txHash common.Hash - isSuccessful bool -} - type rollupEvent struct { batchHash common.Hash txHash common.Hash status types.RollupStatus } -// Watcher will listen for smart contract events from Eth L1. -type Watcher struct { +// L1WatcherClient will listen for smart contract events from Eth L1. +type L1WatcherClient struct { ctx context.Context client *ethclient.Client db database.OrmFactory @@ -67,13 +58,10 @@ type Watcher struct { processedMsgHeight uint64 // The height of the block that the watcher has retrieved header rlp processedBlockHeight uint64 - - stopCh chan bool } -// NewWatcher returns a new instance of Watcher. The instance will be not fully prepared, -// and still needs to be finalized and ran by calling `watcher.Start`. -func NewWatcher(ctx context.Context, client *ethclient.Client, startHeight uint64, confirmations rpc.BlockNumber, messengerAddress, messageQueueAddress, scrollChainAddress common.Address, db database.OrmFactory) *Watcher { +// NewL1WatcherClient returns a new instance of L1WatcherClient. +func NewL1WatcherClient(ctx context.Context, client *ethclient.Client, startHeight uint64, confirmations rpc.BlockNumber, messengerAddress, messageQueueAddress, scrollChainAddress common.Address, db database.OrmFactory) *L1WatcherClient { savedHeight, err := db.GetLayer1LatestWatchedHeight() if err != nil { log.Warn("Failed to fetch height from db", "err", err) @@ -92,9 +80,7 @@ func NewWatcher(ctx context.Context, client *ethclient.Client, startHeight uint6 savedL1BlockHeight = startHeight } - stopCh := make(chan bool) - - return &Watcher{ + return &L1WatcherClient{ ctx: ctx, client: client, db: db, @@ -111,51 +97,11 @@ func NewWatcher(ctx context.Context, client *ethclient.Client, startHeight uint6 processedMsgHeight: uint64(savedHeight), processedBlockHeight: savedL1BlockHeight, - stopCh: stopCh, } } -// Start the Watcher module. -func (w *Watcher) Start() { - go func() { - ctx, cancel := context.WithCancel(w.ctx) - - go cutil.LoopWithContext(ctx, 2*time.Second, func(subCtx context.Context) { - number, err := utils.GetLatestConfirmedBlockNumber(subCtx, w.client, w.confirmations) - if err != nil { - log.Error("failed to get block number", "err", err) - } else { - if err := w.FetchBlockHeader(number); err != nil { - log.Error("Failed to fetch L1 block header", "lastest", number, "err", err) - } - } - }) - - go cutil.LoopWithContext(ctx, 2*time.Second, func(subCtx context.Context) { - number, err := utils.GetLatestConfirmedBlockNumber(subCtx, w.client, w.confirmations) - if err != nil { - log.Error("failed to get block number", "err", err) - } else { - if err := w.FetchContractEvent(number); err != nil { - log.Error("Failed to fetch bridge contract", "err", err) - } - } - }) - - <-w.stopCh - cancel() - }() -} - -// Stop the Watcher module, for a graceful shutdown. -func (w *Watcher) Stop() { - w.stopCh <- true -} - -const contractEventsBlocksFetchLimit = int64(10) - // FetchBlockHeader pull latest L1 blocks and save in DB -func (w *Watcher) FetchBlockHeader(blockHeight uint64) error { +func (w *L1WatcherClient) FetchBlockHeader(blockHeight uint64) error { fromBlock := int64(w.processedBlockHeight) + 1 toBlock := int64(blockHeight) if toBlock < fromBlock { @@ -201,10 +147,15 @@ func (w *Watcher) FetchBlockHeader(blockHeight uint64) error { } // FetchContractEvent pull latest event logs from given contract address and save in DB -func (w *Watcher) FetchContractEvent(blockHeight uint64) error { +func (w *L1WatcherClient) FetchContractEvent() error { defer func() { log.Info("l1 watcher fetchContractEvent", "w.processedMsgHeight", w.processedMsgHeight) }() + blockHeight, err := utils.GetLatestConfirmedBlockNumber(w.ctx, w.client, w.confirmations) + if err != nil { + log.Error("failed to get block number", "err", err) + return err + } fromBlock := int64(w.processedMsgHeight) + 1 toBlock := int64(blockHeight) @@ -317,7 +268,7 @@ func (w *Watcher) FetchContractEvent(blockHeight uint64) error { return nil } -func (w *Watcher) parseBridgeEventLogs(logs []geth_types.Log) ([]*types.L1Message, []relayedMessage, []rollupEvent, error) { +func (w *L1WatcherClient) parseBridgeEventLogs(logs []geth_types.Log) ([]*types.L1Message, []relayedMessage, []rollupEvent, error) { // Need use contract abi to parse event Log // Can only be tested after we have our contracts set up diff --git a/bridge/l1/watcher_test.go b/bridge/watcher/l1_watcher_test.go similarity index 61% rename from bridge/l1/watcher_test.go rename to bridge/watcher/l1_watcher_test.go index 8d6996adc..2bc6381d1 100644 --- a/bridge/l1/watcher_test.go +++ b/bridge/watcher/l1_watcher_test.go @@ -1,4 +1,4 @@ -package l1 +package watcher_test import ( "context" @@ -9,6 +9,8 @@ import ( "scroll-tech/database" "scroll-tech/database/migrate" + + "scroll-tech/bridge/watcher" ) func testStartWatcher(t *testing.T) { @@ -23,7 +25,6 @@ func testStartWatcher(t *testing.T) { l1Cfg := cfg.L1Config - watcher := NewWatcher(context.Background(), client, l1Cfg.StartHeight, l1Cfg.Confirmations, l1Cfg.L1MessengerAddress, l1Cfg.L1MessageQueueAddress, l1Cfg.RelayerConfig.RollupContractAddress, db) - watcher.Start() - defer watcher.Stop() + watcher := watcher.NewL1WatcherClient(context.Background(), client, l1Cfg.StartHeight, l1Cfg.Confirmations, l1Cfg.L1MessengerAddress, l1Cfg.L1MessageQueueAddress, l1Cfg.RelayerConfig.RollupContractAddress, db) + assert.NoError(t, watcher.FetchContractEvent()) } diff --git a/bridge/l2/watcher.go b/bridge/watcher/l2_watcher.go similarity index 87% rename from bridge/l2/watcher.go rename to bridge/watcher/l2_watcher.go index 8758d14bb..d09118e6c 100644 --- a/bridge/l2/watcher.go +++ b/bridge/watcher/l2_watcher.go @@ -1,12 +1,10 @@ -package l2 +package watcher import ( "context" "errors" "fmt" "math/big" - "reflect" - "time" geth "github.com/scroll-tech/go-ethereum" "github.com/scroll-tech/go-ethereum/accounts/abi" @@ -21,7 +19,6 @@ import ( "scroll-tech/common/metrics" "scroll-tech/common/types" - cutil "scroll-tech/common/utils" "scroll-tech/database" bridge_abi "scroll-tech/bridge/abi" @@ -39,14 +36,8 @@ var ( bridgeL2MsgsRelayedEventsTotalCounter = geth_metrics.NewRegisteredCounter("bridge/l2/msgs/relayed/events/total", metrics.ScrollRegistry) ) -type relayedMessage struct { - msgHash common.Hash - txHash common.Hash - isSuccessful bool -} - -// WatcherClient provide APIs which support others to subscribe to various event from l2geth -type WatcherClient struct { +// L2WatcherClient provide APIs which support others to subscribe to various event from l2geth +type L2WatcherClient struct { ctx context.Context event.Feed @@ -67,18 +58,17 @@ type WatcherClient struct { processedMsgHeight uint64 stopped uint64 - stopCh chan struct{} } // NewL2WatcherClient take a l2geth instance to generate a l2watcherclient instance -func NewL2WatcherClient(ctx context.Context, client *ethclient.Client, confirmations rpc.BlockNumber, messengerAddress, messageQueueAddress common.Address, withdrawTrieRootSlot common.Hash, orm database.OrmFactory) *WatcherClient { +func NewL2WatcherClient(ctx context.Context, client *ethclient.Client, confirmations rpc.BlockNumber, messengerAddress, messageQueueAddress common.Address, withdrawTrieRootSlot common.Hash, orm database.OrmFactory) *L2WatcherClient { savedHeight, err := orm.GetLayer2LatestWatchedHeight() if err != nil { log.Warn("fetch height from db failed", "err", err) savedHeight = 0 } - w := WatcherClient{ + w := L2WatcherClient{ ctx: ctx, Client: client, orm: orm, @@ -92,7 +82,6 @@ func NewL2WatcherClient(ctx context.Context, client *ethclient.Client, confirmat messageQueueABI: bridge_abi.L2MessageQueueABI, withdrawTrieRootSlot: withdrawTrieRootSlot, - stopCh: make(chan struct{}), stopped: 0, } @@ -104,7 +93,7 @@ func NewL2WatcherClient(ctx context.Context, client *ethclient.Client, confirmat return &w } -func (w *WatcherClient) initializeGenesis() error { +func (w *L2WatcherClient) initializeGenesis() error { if count, err := w.orm.GetBatchCount(); err != nil { return fmt.Errorf("failed to get batch count: %v", err) } else if count > 0 { @@ -142,46 +131,10 @@ func (w *WatcherClient) initializeGenesis() error { return nil } -// Start the Listening process -func (w *WatcherClient) Start() { - go func() { - if reflect.ValueOf(w.orm).IsNil() { - panic("must run L2 watcher with DB") - } - - ctx, cancel := context.WithCancel(w.ctx) - go cutil.LoopWithContext(ctx, 2*time.Second, func(subCtx context.Context) { - number, err := utils.GetLatestConfirmedBlockNumber(subCtx, w.Client, w.confirmations) - if err != nil { - log.Error("failed to get block number", "err", err) - } else { - w.tryFetchRunningMissingBlocks(ctx, number) - } - }) - - go cutil.LoopWithContext(ctx, 2*time.Second, func(subCtx context.Context) { - number, err := utils.GetLatestConfirmedBlockNumber(subCtx, w.Client, w.confirmations) - if err != nil { - log.Error("failed to get block number", "err", err) - } else { - w.FetchContractEvent(number) - } - }) - - <-w.stopCh - cancel() - }() -} - -// Stop the Watcher module, for a graceful shutdown. -func (w *WatcherClient) Stop() { - w.stopCh <- struct{}{} -} - const blockTracesFetchLimit = uint64(10) -// try fetch missing blocks if inconsistent -func (w *WatcherClient) tryFetchRunningMissingBlocks(ctx context.Context, blockHeight uint64) { +// TryFetchRunningMissingBlocks try fetch missing blocks if inconsistent +func (w *L2WatcherClient) TryFetchRunningMissingBlocks(ctx context.Context, blockHeight uint64) { // Get newest block in DB. must have blocks at that time. // Don't use "block_trace" table "trace" column's BlockTrace.Number, // because it might be empty if the corresponding rollup_result is finalized/finalization_skipped @@ -237,7 +190,7 @@ func txsToTxsData(txs geth_types.Transactions) []*geth_types.TransactionData { return txsData } -func (w *WatcherClient) getAndStoreBlockTraces(ctx context.Context, from, to uint64) error { +func (w *L2WatcherClient) getAndStoreBlockTraces(ctx context.Context, from, to uint64) error { var blocks []*types.WrappedBlock for number := from; number <= to; number++ { @@ -270,14 +223,18 @@ func (w *WatcherClient) getAndStoreBlockTraces(ctx context.Context, from, to uin return nil } -const contractEventsBlocksFetchLimit = int64(10) - // FetchContractEvent pull latest event logs from given contract address and save in DB -func (w *WatcherClient) FetchContractEvent(blockHeight uint64) { +func (w *L2WatcherClient) FetchContractEvent() { defer func() { log.Info("l2 watcher fetchContractEvent", "w.processedMsgHeight", w.processedMsgHeight) }() + blockHeight, err := utils.GetLatestConfirmedBlockNumber(w.ctx, w.Client, w.confirmations) + if err != nil { + log.Error("failed to get block number", "err", err) + return + } + fromBlock := int64(w.processedMsgHeight) + 1 toBlock := int64(blockHeight) @@ -353,7 +310,7 @@ func (w *WatcherClient) FetchContractEvent(blockHeight uint64) { } } -func (w *WatcherClient) parseBridgeEventLogs(logs []geth_types.Log) ([]*types.L2Message, []relayedMessage, error) { +func (w *L2WatcherClient) parseBridgeEventLogs(logs []geth_types.Log) ([]*types.L2Message, []relayedMessage, error) { // Need use contract abi to parse event Log // Can only be tested after we have our contracts set up diff --git a/bridge/l2/watcher_test.go b/bridge/watcher/l2_watcher_test.go similarity index 84% rename from bridge/l2/watcher_test.go rename to bridge/watcher/l2_watcher_test.go index e224e0a84..9f470c7dc 100644 --- a/bridge/l2/watcher_test.go +++ b/bridge/watcher/l2_watcher_test.go @@ -1,4 +1,4 @@ -package l2 +package watcher_test import ( "context" @@ -19,7 +19,9 @@ import ( "scroll-tech/bridge/mock_bridge" "scroll-tech/bridge/sender" + "scroll-tech/bridge/watcher" + cutils "scroll-tech/common/utils" "scroll-tech/database" "scroll-tech/database/migrate" ) @@ -29,12 +31,16 @@ func testCreateNewWatcherAndStop(t *testing.T) { l2db, err := database.NewOrmFactory(cfg.DBConfig) assert.NoError(t, err) assert.NoError(t, migrate.ResetDB(l2db.GetDB().DB)) - defer l2db.Close() + ctx := context.Background() + subCtx, cancel := context.WithCancel(ctx) + defer func() { + cancel() + l2db.Close() + }() l2cfg := cfg.L2Config - rc := NewL2WatcherClient(context.Background(), l2Cli, l2cfg.Confirmations, l2cfg.L2MessengerAddress, l2cfg.L2MessageQueueAddress, l2cfg.WithdrawTrieRootSlot, l2db) - rc.Start() - defer rc.Stop() + rc := watcher.NewL2WatcherClient(context.Background(), l2Cli, l2cfg.Confirmations, l2cfg.L2MessengerAddress, l2cfg.L2MessageQueueAddress, l2cfg.WithdrawTrieRootSlot, l2db) + loopToFetchEvent(subCtx, rc) l1cfg := cfg.L1Config l1cfg.RelayerConfig.SenderConfig.Confirmations = rpc.LatestBlockNumber @@ -60,12 +66,17 @@ func testMonitorBridgeContract(t *testing.T) { db, err := database.NewOrmFactory(cfg.DBConfig) assert.NoError(t, err) assert.NoError(t, migrate.ResetDB(db.GetDB().DB)) - defer db.Close() + ctx := context.Background() + subCtx, cancel := context.WithCancel(ctx) + + defer func() { + cancel() + db.Close() + }() l2cfg := cfg.L2Config - wc := NewL2WatcherClient(context.Background(), l2Cli, l2cfg.Confirmations, l2cfg.L2MessengerAddress, l2cfg.L2MessageQueueAddress, l2cfg.WithdrawTrieRootSlot, db) - wc.Start() - defer wc.Stop() + wc := watcher.NewL2WatcherClient(context.Background(), l2Cli, l2cfg.Confirmations, l2cfg.L2MessengerAddress, l2cfg.L2MessageQueueAddress, l2cfg.WithdrawTrieRootSlot, db) + loopToFetchEvent(subCtx, wc) previousHeight, err := l2Cli.BlockNumber(context.Background()) assert.NoError(t, err) @@ -79,9 +90,7 @@ func testMonitorBridgeContract(t *testing.T) { assert.NoError(t, err) rc := prepareWatcherClient(l2Cli, db, address) - rc.Start() - defer rc.Stop() - + loopToFetchEvent(subCtx, rc) // Call mock_bridge instance sendMessage to trigger emit events toAddress := common.HexToAddress("0x4592d8f8d7b001e72cb26a73e4fa1806a51ac79d") message := []byte("testbridgecontract") @@ -128,7 +137,13 @@ func testFetchMultipleSentMessageInOneBlock(t *testing.T) { db, err := database.NewOrmFactory(cfg.DBConfig) assert.NoError(t, err) assert.NoError(t, migrate.ResetDB(db.GetDB().DB)) - defer db.Close() + ctx := context.Background() + subCtx, cancel := context.WithCancel(ctx) + + defer func() { + cancel() + db.Close() + }() previousHeight, err := l2Cli.BlockNumber(context.Background()) // shallow the global previousHeight assert.NoError(t, err) @@ -141,8 +156,7 @@ func testFetchMultipleSentMessageInOneBlock(t *testing.T) { assert.NoError(t, err) rc := prepareWatcherClient(l2Cli, db, address) - rc.Start() - defer rc.Stop() + loopToFetchEvent(subCtx, rc) // Call mock_bridge instance sendMessage to trigger emit events multiple times numTransactions := 4 @@ -195,9 +209,9 @@ func testFetchMultipleSentMessageInOneBlock(t *testing.T) { assert.Equal(t, 5, len(msgs)) } -func prepareWatcherClient(l2Cli *ethclient.Client, db database.OrmFactory, contractAddr common.Address) *WatcherClient { +func prepareWatcherClient(l2Cli *ethclient.Client, db database.OrmFactory, contractAddr common.Address) *watcher.L2WatcherClient { confirmations := rpc.LatestBlockNumber - return NewL2WatcherClient(context.Background(), l2Cli, confirmations, contractAddr, contractAddr, common.Hash{}, db) + return watcher.NewL2WatcherClient(context.Background(), l2Cli, confirmations, contractAddr, contractAddr, common.Hash{}, db) } func prepareAuth(t *testing.T, l2Cli *ethclient.Client, privateKey *ecdsa.PrivateKey) *bind.TransactOpts { @@ -209,3 +223,7 @@ func prepareAuth(t *testing.T, l2Cli *ethclient.Client, privateKey *ecdsa.Privat assert.NoError(t, err) return auth } + +func loopToFetchEvent(subCtx context.Context, watcher *watcher.L2WatcherClient) { + go cutils.Loop(subCtx, 2*time.Second, watcher.FetchContractEvent) +} diff --git a/bridge/l2/l2_test.go b/bridge/watcher/watcher_test.go similarity index 89% rename from bridge/l2/l2_test.go rename to bridge/watcher/watcher_test.go index 3d7758580..8f19ba5ab 100644 --- a/bridge/l2/l2_test.go +++ b/bridge/watcher/watcher_test.go @@ -1,4 +1,4 @@ -package l2 +package watcher_test import ( "encoding/json" @@ -97,18 +97,13 @@ func TestFunction(t *testing.T) { if err := setupEnv(t); err != nil { t.Fatal(err) } - + // Run l1 watcher test cases. + t.Run("TestStartWatcher", testStartWatcher) // Run l2 watcher test cases. t.Run("TestCreateNewWatcherAndStop", testCreateNewWatcherAndStop) t.Run("TestMonitorBridgeContract", testMonitorBridgeContract) t.Run("TestFetchMultipleSentMessageInOneBlock", testFetchMultipleSentMessageInOneBlock) - // Run l2 relayer test cases. - t.Run("TestCreateNewRelayer", testCreateNewRelayer) - t.Run("TestL2RelayerProcessSaveEvents", testL2RelayerProcessSaveEvents) - t.Run("TestL2RelayerProcessCommittedBatches", testL2RelayerProcessCommittedBatches) - t.Run("TestL2RelayerSkipBatches", testL2RelayerSkipBatches) - // Run batch proposer test cases. t.Run("TestBatchProposerProposeBatch", testBatchProposerProposeBatch) t.Run("TestBatchProposerGracefulRestart", testBatchProposerGracefulRestart) diff --git a/build/dockerfiles/event_watcher.Dockerfile b/build/dockerfiles/event_watcher.Dockerfile new file mode 100644 index 000000000..cd17f7399 --- /dev/null +++ b/build/dockerfiles/event_watcher.Dockerfile @@ -0,0 +1,26 @@ +# Download Go dependencies +FROM scrolltech/go-alpine-builder:1.18 as base + +WORKDIR /src +COPY go.work* ./ +COPY ./bridge/go.* ./bridge/ +COPY ./common/go.* ./common/ +COPY ./coordinator/go.* ./coordinator/ +COPY ./database/go.* ./database/ +COPY ./roller/go.* ./roller/ +COPY ./tests/integration-test/go.* ./tests/integration-test/ +RUN go mod download -x + +# Build event_watcher +FROM base as builder + +RUN --mount=target=. \ + --mount=type=cache,target=/root/.cache/go-build \ + cd /src/bridge/cmd/event_watcher/ && go build -v -p 4 -o /bin/event_watcher + +# Pull event_watcher into a second stage deploy alpine container +FROM alpine:latest + +COPY --from=builder /bin/event_watcher /bin/ + +ENTRYPOINT ["event_watcher"] \ No newline at end of file diff --git a/build/dockerfiles/bridge.Dockerfile.dockerignore b/build/dockerfiles/event_watcher.Dockerfile.dockerignore similarity index 77% rename from build/dockerfiles/bridge.Dockerfile.dockerignore rename to build/dockerfiles/event_watcher.Dockerfile.dockerignore index 5348dd31a..8734d3f9b 100644 --- a/build/dockerfiles/bridge.Dockerfile.dockerignore +++ b/build/dockerfiles/event_watcher.Dockerfile.dockerignore @@ -2,4 +2,4 @@ assets/ docs/ l2geth/ rpc-gateway/ -*target/* +*target/* \ No newline at end of file diff --git a/build/dockerfiles/bridge.Dockerfile b/build/dockerfiles/gas_oracle.Dockerfile similarity index 67% rename from build/dockerfiles/bridge.Dockerfile rename to build/dockerfiles/gas_oracle.Dockerfile index 0a044aeb3..86bf762db 100644 --- a/build/dockerfiles/bridge.Dockerfile +++ b/build/dockerfiles/gas_oracle.Dockerfile @@ -11,16 +11,16 @@ COPY ./roller/go.* ./roller/ COPY ./tests/integration-test/go.* ./tests/integration-test/ RUN go mod download -x -# Build bridge +# Build gas_oracle FROM base as builder RUN --mount=target=. \ --mount=type=cache,target=/root/.cache/go-build \ - cd /src/bridge/cmd && go build -v -p 4 -o /bin/bridge + cd /src/bridge/cmd/gas_oracle/ && go build -v -p 4 -o /bin/gas_oracle -# Pull bridge into a second stage deploy alpine container +# Pull gas_oracle into a second stage deploy alpine container FROM alpine:latest -COPY --from=builder /bin/bridge /bin/ +COPY --from=builder /bin/gas_oracle /bin/ -ENTRYPOINT ["bridge"] +ENTRYPOINT ["gas_oracle"] \ No newline at end of file diff --git a/build/dockerfiles/gas_oracle.Dockerfile.dockerignore b/build/dockerfiles/gas_oracle.Dockerfile.dockerignore new file mode 100644 index 000000000..8734d3f9b --- /dev/null +++ b/build/dockerfiles/gas_oracle.Dockerfile.dockerignore @@ -0,0 +1,5 @@ +assets/ +docs/ +l2geth/ +rpc-gateway/ +*target/* \ No newline at end of file diff --git a/build/dockerfiles/msg_relayer.Dockerfile b/build/dockerfiles/msg_relayer.Dockerfile new file mode 100644 index 000000000..4cc2f5f07 --- /dev/null +++ b/build/dockerfiles/msg_relayer.Dockerfile @@ -0,0 +1,26 @@ +# Download Go dependencies +FROM scrolltech/go-alpine-builder:1.18 as base + +WORKDIR /src +COPY go.work* ./ +COPY ./bridge/go.* ./bridge/ +COPY ./common/go.* ./common/ +COPY ./coordinator/go.* ./coordinator/ +COPY ./database/go.* ./database/ +COPY ./roller/go.* ./roller/ +COPY ./tests/integration-test/go.* ./tests/integration-test/ +RUN go mod download -x + +# Build msg_relayer +FROM base as builder + +RUN --mount=target=. \ + --mount=type=cache,target=/root/.cache/go-build \ + cd /src/bridge/cmd/msg_relayer/ && go build -v -p 4 -o /bin/msg_relayer + +# Pull msg_relayer into a second stage deploy alpine container +FROM alpine:latest + +COPY --from=builder /bin/msg_relayer /bin/ + +ENTRYPOINT ["msg_relayer"] \ No newline at end of file diff --git a/build/dockerfiles/msg_relayer.Dockerfile.dockerignore b/build/dockerfiles/msg_relayer.Dockerfile.dockerignore new file mode 100644 index 000000000..8734d3f9b --- /dev/null +++ b/build/dockerfiles/msg_relayer.Dockerfile.dockerignore @@ -0,0 +1,5 @@ +assets/ +docs/ +l2geth/ +rpc-gateway/ +*target/* \ No newline at end of file diff --git a/build/dockerfiles/rollup_relayer.Dockerfile b/build/dockerfiles/rollup_relayer.Dockerfile new file mode 100644 index 000000000..3a3f3c674 --- /dev/null +++ b/build/dockerfiles/rollup_relayer.Dockerfile @@ -0,0 +1,26 @@ +# Download Go dependencies +FROM scrolltech/go-alpine-builder:1.18 as base + +WORKDIR /src +COPY go.work* ./ +COPY ./bridge/go.* ./bridge/ +COPY ./common/go.* ./common/ +COPY ./coordinator/go.* ./coordinator/ +COPY ./database/go.* ./database/ +COPY ./roller/go.* ./roller/ +COPY ./tests/integration-test/go.* ./tests/integration-test/ +RUN go mod download -x + +# Build rollup_relayer +FROM base as builder + +RUN --mount=target=. \ + --mount=type=cache,target=/root/.cache/go-build \ + cd /src/bridge/cmd/rollup_relayer/ && go build -v -p 4 -o /bin/rollup_relayer + +# Pull rollup_relayer into a second stage deploy alpine container +FROM alpine:latest + +COPY --from=builder /bin/rollup_relayer /bin/ + +ENTRYPOINT ["rollup_relayer"] \ No newline at end of file diff --git a/build/dockerfiles/rollup_relayer.Dockerfile.dockerignore b/build/dockerfiles/rollup_relayer.Dockerfile.dockerignore new file mode 100644 index 000000000..8734d3f9b --- /dev/null +++ b/build/dockerfiles/rollup_relayer.Dockerfile.dockerignore @@ -0,0 +1,5 @@ +assets/ +docs/ +l2geth/ +rpc-gateway/ +*target/* \ No newline at end of file diff --git a/common/version/version.go b/common/version/version.go index 9967c8040..4cd01f10e 100644 --- a/common/version/version.go +++ b/common/version/version.go @@ -5,7 +5,7 @@ import ( "runtime/debug" ) -var tag = "v3.0.2" +var tag = "v3.0.3" var commit = func() string { if info, ok := debug.ReadBuildInfo(); ok { diff --git a/tests/integration-test/common.go b/tests/integration-test/common.go index c2fc8fb8b..3663eaba6 100644 --- a/tests/integration-test/common.go +++ b/tests/integration-test/common.go @@ -22,7 +22,10 @@ import ( _ "scroll-tech/roller/cmd/app" rollerConfig "scroll-tech/roller/config" - _ "scroll-tech/bridge/cmd/app" + _ "scroll-tech/bridge/cmd/event_watcher/app" + _ "scroll-tech/bridge/cmd/gas_oracle/app" + _ "scroll-tech/bridge/cmd/msg_relayer/app" + _ "scroll-tech/bridge/cmd/rollup_relayer/app" bridgeConfig "scroll-tech/bridge/config" "scroll-tech/bridge/sender" @@ -81,9 +84,24 @@ type appAPI interface { ExpectWithTimeout(t *testing.T, parallel bool, timeout time.Duration, keyword string) } -func runBridgeApp(t *testing.T, args ...string) appAPI { +func runMsgRelayerApp(t *testing.T, args ...string) appAPI { args = append(args, "--log.debug", "--config", bridgeFile) - return cmd.NewCmd("bridge-test", args...) + return cmd.NewCmd("message-relayer-test", args...) +} + +func runGasOracleApp(t *testing.T, args ...string) appAPI { + args = append(args, "--log.debug", "--config", bridgeFile) + return cmd.NewCmd("gas-oracle-test", args...) +} + +func runRollupRelayerApp(t *testing.T, args ...string) appAPI { + args = append(args, "--log.debug", "--config", bridgeFile) + return cmd.NewCmd("rollup-relayer-test", args...) +} + +func runEventWatcherApp(t *testing.T, args ...string) appAPI { + args = append(args, "--log.debug", "--config", bridgeFile) + return cmd.NewCmd("event-watcher-test", args...) } func runCoordinatorApp(t *testing.T, args ...string) appAPI { diff --git a/tests/integration-test/integration_test.go b/tests/integration-test/integration_test.go index e710edcc4..5ace458b2 100644 --- a/tests/integration-test/integration_test.go +++ b/tests/integration-test/integration_test.go @@ -41,8 +41,17 @@ func testStartProcess(t *testing.T) { runDBCliApp(t, "migrate", "current version:") // Start bridge process. - bridgeCmd := runBridgeApp(t) - bridgeCmd.RunApp(func() bool { return bridgeCmd.WaitResult(t, time.Second*20, "Start bridge successfully") }) + ewCmd := runEventWatcherApp(t) + ewCmd.RunApp(func() bool { return ewCmd.WaitResult(t, time.Second*20, "Start event-watcher successfully") }) + + goCmd := runGasOracleApp(t) + goCmd.RunApp(func() bool { return goCmd.WaitResult(t, time.Second*20, "Start gas-oracle successfully") }) + + mrCmd := runMsgRelayerApp(t) + mrCmd.RunApp(func() bool { return mrCmd.WaitResult(t, time.Second*20, "Start message-relayer successfully") }) + + rrCmd := runRollupRelayerApp(t) + rrCmd.RunApp(func() bool { return rrCmd.WaitResult(t, time.Second*20, "Start rollup-relayer successfully") }) // Start coordinator process. coordinatorCmd := runCoordinatorApp(t, "--ws", "--ws.port", "8391") @@ -53,8 +62,11 @@ func testStartProcess(t *testing.T) { rollerCmd.ExpectWithTimeout(t, true, time.Second*60, "register to coordinator successfully!") rollerCmd.RunApp(func() bool { return rollerCmd.WaitResult(t, time.Second*40, "roller start successfully") }) + ewCmd.WaitExit() + goCmd.WaitExit() + mrCmd.WaitExit() + rrCmd.WaitExit() rollerCmd.WaitExit() - bridgeCmd.WaitExit() coordinatorCmd.WaitExit() } @@ -63,16 +75,30 @@ func testMonitorMetrics(t *testing.T) { runDBCliApp(t, "reset", "successful to reset") runDBCliApp(t, "migrate", "current version:") - // Start bridge process with metrics server. port1, _ := rand.Int(rand.Reader, big.NewInt(2000)) svrPort1 := strconv.FormatInt(port1.Int64()+50000, 10) - bridgeCmd := runBridgeApp(t, "--metrics", "--metrics.addr", "localhost", "--metrics.port", svrPort1) - bridgeCmd.RunApp(func() bool { return bridgeCmd.WaitResult(t, time.Second*20, "Start bridge successfully") }) + ewCmd := runEventWatcherApp(t, "--metrics", "--metrics.addr", "localhost", "--metrics.port", svrPort1) + ewCmd.RunApp(func() bool { return ewCmd.WaitResult(t, time.Second*20, "Start event-watcher successfully") }) + + port2, _ := rand.Int(rand.Reader, big.NewInt(2000)) + svrPort2 := strconv.FormatInt(port2.Int64()+50000, 10) + goCmd := runGasOracleApp(t, "--metrics", "--metrics.addr", "localhost", "--metrics.port", svrPort2) + goCmd.RunApp(func() bool { return goCmd.WaitResult(t, time.Second*20, "Start gas-oracle successfully") }) + + port3, _ := rand.Int(rand.Reader, big.NewInt(2000)) + svrPort3 := strconv.FormatInt(port3.Int64()+50000, 10) + mrCmd := runMsgRelayerApp(t, "--metrics", "--metrics.addr", "localhost", "--metrics.port", svrPort3) + mrCmd.RunApp(func() bool { return mrCmd.WaitResult(t, time.Second*20, "Start message-relayer successfully") }) + + port4, _ := rand.Int(rand.Reader, big.NewInt(2000)) + svrPort4 := strconv.FormatInt(port4.Int64()+50000, 10) + rrCmd := runRollupRelayerApp(t, "--metrics", "--metrics.addr", "localhost", "--metrics.port", svrPort4) + rrCmd.RunApp(func() bool { return rrCmd.WaitResult(t, time.Second*20, "Start rollup-relayer successfully") }) // Start coordinator process with metrics server. - port, _ := rand.Int(rand.Reader, big.NewInt(2000)) - svrPort2 := strconv.FormatInt(port.Int64()+52000, 10) - coordinatorCmd := runCoordinatorApp(t, "--metrics", "--metrics.addr", "localhost", "--metrics.port", svrPort2) + port5, _ := rand.Int(rand.Reader, big.NewInt(2000)) + svrPort5 := strconv.FormatInt(port5.Int64()+52000, 10) + coordinatorCmd := runCoordinatorApp(t, "--metrics", "--metrics.addr", "localhost", "--metrics.port", svrPort5) coordinatorCmd.RunApp(func() bool { return coordinatorCmd.WaitResult(t, time.Second*20, "Start coordinator successfully") }) // Get bridge monitor metrics. @@ -87,7 +113,7 @@ func testMonitorMetrics(t *testing.T) { assert.Equal(t, true, strings.Contains(bodyStr, "bridge_l2_msgs_sync_height")) // Get coordinator monitor metrics. - resp, err = http.Get("http://localhost:" + svrPort2) + resp, err = http.Get("http://localhost:" + svrPort5) assert.NoError(t, err) defer resp.Body.Close() body, err = ioutil.ReadAll(resp.Body) @@ -98,6 +124,9 @@ func testMonitorMetrics(t *testing.T) { assert.Equal(t, true, strings.Contains(bodyStr, "coordinator_rollers_disconnects_total")) // Exit. - bridgeCmd.WaitExit() + ewCmd.WaitExit() + goCmd.WaitExit() + mrCmd.WaitExit() + rrCmd.WaitExit() coordinatorCmd.WaitExit() }