Compare commits

...

12 Commits

Author SHA1 Message Date
colin
bad77eac2f feat(coordinator): prover monitoring (#392)
Co-authored-by: HAOYUatHZ <37070449+HAOYUatHZ@users.noreply.github.com>
2023-04-07 09:06:58 +08:00
Péter Garamvölgyi
5d761ad812 Make sure attempts can be deserialized from db on startup (#410) 2023-04-05 19:00:54 +02:00
Nazarii Denha
4042bea6db retry proving timeout batch (#313)
Co-authored-by: Péter Garamvölgyi <peter@scroll.io>
Co-authored-by: colin <102356659+colinlyguo@users.noreply.github.com>
2023-04-05 16:42:06 +02:00
maskpp
de7c38a903 feat(test): let integration-test log verbosity be configurable (#409) 2023-04-04 16:20:12 +08:00
Péter Garamvölgyi
41e2d960d8 Fix already executed revert message (#408) 2023-04-03 21:26:30 +08:00
HAOYUatHZ
170bc08207 build(docker): auto docker push when pushing git tags (#406) 2023-04-03 16:52:51 +08:00
maskpp
d3fc4e1606 feat(pending limit): Let sender's pending limit be configurable. (#398)
Co-authored-by: Péter Garamvölgyi <peter@scroll.io>
Co-authored-by: ChuhanJin <60994121+ChuhanJin@users.noreply.github.com>
Co-authored-by: vincent <419436363@qq.com>
Co-authored-by: colinlyguo <651734127@qq.com>
2023-04-03 14:24:47 +08:00
HAOYUatHZ
77749477db build(docker): only build docker images when push github tags (#404) 2023-04-01 11:54:56 +08:00
HAOYUatHZ
1a5df6f4d7 fix(build): move docker build from jenkins to github to avoid unknown errors (#403) 2023-03-31 15:55:55 +08:00
maskpp
826280253a fix(test): fix bug in testBatchProposerProposeBatch (#399)
Co-authored-by: colinlyguo <651734127@qq.com>
2023-03-31 13:58:46 +08:00
ChuhanJin
d376c903af feat(bridge): separate bridge into subcomponents (#397)
Co-authored-by: vincent <419436363@qq.com>
Co-authored-by: colinlyguo <651734127@qq.com>
2023-03-31 11:04:24 +08:00
Max Wolff
179c6ee978 add failed relay status to db (#384)
Co-authored-by: Péter Garamvölgyi <peter@scroll.io>
2023-03-27 11:52:07 +02:00
69 changed files with 1614 additions and 921 deletions

View File

@@ -66,3 +66,11 @@ jobs:
if [ -n "$(git status --porcelain)" ]; then
exit 1
fi
# docker-build:
# runs-on: ubuntu-latest
# steps:
# - name: Checkout code
# uses: actions/checkout@v2
# - name: Set up Docker Buildx
# uses: docker/setup-buildx-action@v2
# - run: make docker

View File

@@ -62,3 +62,18 @@ jobs:
if [ -n "$(git status --porcelain)" ]; then
exit 1
fi
# docker-build:
# runs-on: ubuntu-latest
# steps:
# - name: Checkout code
# uses: actions/checkout@v2
# - name: Set up Docker Buildx
# uses: docker/setup-buildx-action@v2
# - name: Build and push
# uses: docker/build-push-action@v2
# with:
# context: .
# file: ./build/dockerfiles/coordinator.Dockerfile
# push: false
# # cache-from: type=gha,scope=${{ github.workflow }}
# # cache-to: type=gha,scope=${{ github.workflow }}

.github/workflows/docker.yaml vendored Normal file
View File

@@ -0,0 +1,65 @@
name: Docker
on:
push:
tags:
- v**
jobs:
build-and-push:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v2
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
- name: Login to Docker Hub
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Build and push coordinator docker
uses: docker/build-push-action@v2
with:
context: .
file: ./build/dockerfiles/coordinator.Dockerfile
push: true
tags: scrolltech/coordinator:${{github.ref_name}}
# cache-from: type=gha,scope=${{ github.workflow }}
# cache-to: type=gha,scope=${{ github.workflow }}
- name: Build and push event_watcher docker
uses: docker/build-push-action@v2
with:
context: .
file: ./build/dockerfiles/event_watcher.Dockerfile
push: true
tags: scrolltech/event-watcher:${{github.ref_name}}
# cache-from: type=gha,scope=${{ github.workflow }}
# cache-to: type=gha,scope=${{ github.workflow }}
- name: Build and push gas_oracle docker
uses: docker/build-push-action@v2
with:
context: .
file: ./build/dockerfiles/gas_oracle.Dockerfile
push: true
tags: scrolltech/gas-oracle:${{github.ref_name}}
# cache-from: type=gha,scope=${{ github.workflow }}
# cache-to: type=gha,scope=${{ github.workflow }}
- name: Build and push msg_relayer docker
uses: docker/build-push-action@v2
with:
context: .
file: ./build/dockerfiles/msg_relayer.Dockerfile
push: true
tags: scrolltech/msg-relayer:${{github.ref_name}}
# cache-from: type=gha,scope=${{ github.workflow }}
# cache-to: type=gha,scope=${{ github.workflow }}
- name: Build and push rollup_relayer docker
uses: docker/build-push-action@v2
with:
context: .
file: ./build/dockerfiles/rollup_relayer.Dockerfile
push: true
tags: scrolltech/rollup-relayer:${{github.ref_name}}
# cache-from: type=gha,scope=${{ github.workflow }}
# cache-to: type=gha,scope=${{ github.workflow }}

Jenkinsfile vendored
View File

@@ -28,7 +28,7 @@ pipeline {
}
stage('Check Bridge Compilation') {
steps {
sh 'make -C bridge bridge'
sh 'make -C bridge bridge_bins'
}
}
stage('Check Coordinator Compilation') {
@@ -42,16 +42,6 @@ pipeline {
sh 'make -C database db_cli'
}
}
stage('Check Bridge Docker Build') {
steps {
sh 'make -C bridge docker'
}
}
stage('Check Coordinator Docker Build') {
steps {
sh 'make -C coordinator docker'
}
}
stage('Check Database Docker Build') {
steps {
sh 'make -C database docker'

View File

@@ -8,8 +8,23 @@ mock_abi:
go run github.com/scroll-tech/go-ethereum/cmd/abigen --sol mock_bridge/MockBridgeL1.sol --pkg mock_bridge --out mock_bridge/MockBridgeL1.go
go run github.com/scroll-tech/go-ethereum/cmd/abigen --sol mock_bridge/MockBridgeL2.sol --pkg mock_bridge --out mock_bridge/MockBridgeL2.go
bridge: ## Builds the Bridge instance.
go build -o $(PWD)/build/bin/bridge ./cmd
bridge_bins: ## Builds the Bridge bins.
go build -o $(PWD)/build/bin/event_watcher ./cmd/event_watcher/
go build -o $(PWD)/build/bin/gas_oracle ./cmd/gas_oracle/
go build -o $(PWD)/build/bin/message_relayer ./cmd/msg_relayer/
go build -o $(PWD)/build/bin/rollup_relayer ./cmd/rollup_relayer/
event_watcher: ## Builds the event_watcher bin
go build -o $(PWD)/build/bin/event_watcher ./cmd/event_watcher/
gas_oracle: ## Builds the gas_oracle bin
go build -o $(PWD)/build/bin/gas_oracle ./cmd/gas_oracle/
message_relayer: ## Builds the message_relayer bin
go build -o $(PWD)/build/bin/message_relayer ./cmd/msg_relayer/
rollup_relayer: ## Builds the rollup_relayer bin
go build -o $(PWD)/build/bin/rollup_relayer ./cmd/rollup_relayer/
test:
go test -v -race -coverprofile=coverage.txt -covermode=atomic -p 1 $(PWD)/...
@@ -20,8 +35,14 @@ lint: ## Lint the files - used for CI
clean: ## Empty out the bin folder
@rm -rf build/bin
docker:
DOCKER_BUILDKIT=1 docker build -t scrolltech/${IMAGE_NAME}:${IMAGE_VERSION} ${REPO_ROOT_DIR}/ -f ${REPO_ROOT_DIR}/build/dockerfiles/bridge.Dockerfile
docker_push:
docker push scrolltech/${IMAGE_NAME}:${IMAGE_VERSION}
docker push scrolltech/gas-oracle:${IMAGE_VERSION}
docker push scrolltech/event-watcher:${IMAGE_VERSION}
docker push scrolltech/rollup-relayer:${IMAGE_VERSION}
docker push scrolltech/msg-relayer:${IMAGE_VERSION}
docker:
DOCKER_BUILDKIT=1 docker build -t scrolltech/gas-oracle:${IMAGE_VERSION} ${REPO_ROOT_DIR}/ -f ${REPO_ROOT_DIR}/build/dockerfiles/gas_oracle.Dockerfile
DOCKER_BUILDKIT=1 docker build -t scrolltech/event-watcher:${IMAGE_VERSION} ${REPO_ROOT_DIR}/ -f ${REPO_ROOT_DIR}/build/dockerfiles/event_watcher.Dockerfile
DOCKER_BUILDKIT=1 docker build -t scrolltech/rollup-relayer:${IMAGE_VERSION} ${REPO_ROOT_DIR}/ -f ${REPO_ROOT_DIR}/build/dockerfiles/rollup_relayer.Dockerfile
DOCKER_BUILDKIT=1 docker build -t scrolltech/msg-relayer:${IMAGE_VERSION} ${REPO_ROOT_DIR}/ -f ${REPO_ROOT_DIR}/build/dockerfiles/msg_relayer.Dockerfile

View File

@@ -1,130 +0,0 @@
package app
import (
"context"
"fmt"
"os"
"os/signal"
"github.com/scroll-tech/go-ethereum/log"
"github.com/urfave/cli/v2"
"scroll-tech/database"
"scroll-tech/common/metrics"
"scroll-tech/common/utils"
"scroll-tech/common/version"
"scroll-tech/bridge/config"
"scroll-tech/bridge/l1"
"scroll-tech/bridge/l2"
)
var (
app *cli.App
)
func init() {
// Set up Bridge app info.
app = cli.NewApp()
app.Action = action
app.Name = "bridge"
app.Usage = "The Scroll Bridge"
app.Version = version.Version
app.Flags = append(app.Flags, utils.CommonFlags...)
app.Flags = append(app.Flags, apiFlags...)
app.Before = func(ctx *cli.Context) error {
return utils.LogSetup(ctx)
}
// Register `bridge-test` app for integration-test.
utils.RegisterSimulation(app, "bridge-test")
}
func action(ctx *cli.Context) error {
// Load config file.
cfgFile := ctx.String(utils.ConfigFileFlag.Name)
cfg, err := config.NewConfig(cfgFile)
if err != nil {
log.Crit("failed to load config file", "config file", cfgFile, "error", err)
}
// Start metrics server.
metrics.Serve(context.Background(), ctx)
// Init db connection.
var ormFactory database.OrmFactory
if ormFactory, err = database.NewOrmFactory(cfg.DBConfig); err != nil {
log.Crit("failed to init db connection", "err", err)
}
var (
l1Backend *l1.Backend
l2Backend *l2.Backend
)
// @todo change nil to actual client after https://scroll-tech/bridge/pull/40 merged
l1Backend, err = l1.New(ctx.Context, cfg.L1Config, ormFactory)
if err != nil {
return err
}
l2Backend, err = l2.New(ctx.Context, cfg.L2Config, ormFactory)
if err != nil {
return err
}
defer func() {
l1Backend.Stop()
l2Backend.Stop()
err = ormFactory.Close()
if err != nil {
log.Error("can not close ormFactory", "error", err)
}
}()
// Start all modules.
if err = l1Backend.Start(); err != nil {
log.Crit("couldn't start l1 backend", "error", err)
}
if err = l2Backend.Start(); err != nil {
log.Crit("couldn't start l2 backend", "error", err)
}
// Register api and start rpc service.
if ctx.Bool(httpEnabledFlag.Name) {
handler, addr, err := utils.StartHTTPEndpoint(
fmt.Sprintf(
"%s:%d",
ctx.String(httpListenAddrFlag.Name),
ctx.Int(httpPortFlag.Name)),
l2Backend.APIs())
if err != nil {
log.Crit("Could not start RPC api", "error", err)
}
defer func() {
_ = handler.Shutdown(ctx.Context)
log.Info("HTTP endpoint closed", "url", fmt.Sprintf("http://%v/", addr))
}()
log.Info("HTTP endpoint opened", "url", fmt.Sprintf("http://%v/", addr))
}
log.Info("Start bridge successfully")
// Catch CTRL-C to ensure a graceful shutdown.
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
// Wait until the interrupt signal is received from an OS signal.
<-interrupt
return nil
}
// Run run bridge cmd instance.
func Run() {
// Run the bridge.
if err := app.Run(os.Args); err != nil {
_, _ = fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
}

View File

@@ -1,19 +0,0 @@
package app
import (
"fmt"
"testing"
"time"
"scroll-tech/common/cmd"
"scroll-tech/common/version"
)
func TestRunBridge(t *testing.T) {
bridge := cmd.NewCmd("bridge-test", "--version")
defer bridge.WaitExit()
// wait result
bridge.ExpectWithTimeout(t, true, time.Second*3, fmt.Sprintf("bridge version %s", version.Version))
bridge.RunApp(nil)
}

View File

@@ -1,31 +0,0 @@
package app
import (
"github.com/urfave/cli/v2"
)
var (
apiFlags = []cli.Flag{
&httpEnabledFlag,
&httpListenAddrFlag,
&httpPortFlag,
}
// httpEnabledFlag enable rpc server.
httpEnabledFlag = cli.BoolFlag{
Name: "http",
Usage: "Enable the HTTP-RPC server",
Value: false,
}
// httpListenAddrFlag set the http address.
httpListenAddrFlag = cli.StringFlag{
Name: "http.addr",
Usage: "HTTP-RPC server listening interface",
Value: "localhost",
}
// httpPortFlag set http.port.
httpPortFlag = cli.IntFlag{
Name: "http.port",
Usage: "HTTP-RPC server listening port",
Value: 8290,
}
)

View File

@@ -0,0 +1,114 @@
package app
import (
"context"
"fmt"
"os"
"os/signal"
"time"
"github.com/scroll-tech/go-ethereum/ethclient"
"github.com/scroll-tech/go-ethereum/log"
"github.com/urfave/cli/v2"
"scroll-tech/database"
"scroll-tech/common/metrics"
"scroll-tech/common/version"
"scroll-tech/bridge/config"
"scroll-tech/bridge/watcher"
cutils "scroll-tech/common/utils"
)
var (
app *cli.App
)
func init() {
// Set up event-watcher app info.
app = cli.NewApp()
app.Action = action
app.Name = "event-watcher"
app.Usage = "The Scroll Event Watcher"
app.Version = version.Version
app.Flags = append(app.Flags, cutils.CommonFlags...)
app.Commands = []*cli.Command{}
app.Before = func(ctx *cli.Context) error {
return cutils.LogSetup(ctx)
}
// Register `event-watcher-test` app for integration-test.
cutils.RegisterSimulation(app, "event-watcher-test")
}
func action(ctx *cli.Context) error {
// Load config file.
cfgFile := ctx.String(cutils.ConfigFileFlag.Name)
cfg, err := config.NewConfig(cfgFile)
if err != nil {
log.Crit("failed to load config file", "config file", cfgFile, "error", err)
}
subCtx, cancel := context.WithCancel(ctx.Context)
// Init db connection
var ormFactory database.OrmFactory
if ormFactory, err = database.NewOrmFactory(cfg.DBConfig); err != nil {
log.Crit("failed to init db connection", "err", err)
}
defer func() {
cancel()
err = ormFactory.Close()
if err != nil {
log.Error("can not close ormFactory", "error", err)
}
}()
// Start metrics server.
metrics.Serve(subCtx, ctx)
l1client, err := ethclient.Dial(cfg.L1Config.Endpoint)
if err != nil {
log.Error("failed to connect l1 geth", "config file", cfgFile, "error", err)
return err
}
l2client, err := ethclient.Dial(cfg.L2Config.Endpoint)
if err != nil {
log.Error("failed to connect l2 geth", "config file", cfgFile, "error", err)
return err
}
l1watcher := watcher.NewL1WatcherClient(ctx.Context, l1client, cfg.L1Config.StartHeight, cfg.L1Config.Confirmations, cfg.L1Config.L1MessengerAddress, cfg.L1Config.L1MessageQueueAddress, cfg.L1Config.ScrollChainContractAddress, ormFactory)
l2watcher := watcher.NewL2WatcherClient(ctx.Context, l2client, cfg.L2Config.Confirmations, cfg.L2Config.L2MessengerAddress, cfg.L2Config.L2MessageQueueAddress, cfg.L2Config.WithdrawTrieRootSlot, ormFactory)
go cutils.Loop(subCtx, 10*time.Second, func() {
if loopErr := l1watcher.FetchContractEvent(); loopErr != nil {
log.Error("Failed to fetch bridge contract", "err", loopErr)
}
})
// Start l2 watcher process
go cutils.Loop(subCtx, 2*time.Second, l2watcher.FetchContractEvent)
// Finish start all event-watcher functions
log.Info("Start event-watcher successfully")
// Catch CTRL-C to ensure a graceful shutdown.
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
// Wait until the interrupt signal is received from an OS signal.
<-interrupt
return nil
}
// Run event watcher cmd instance.
func Run() {
if err := app.Run(os.Args); err != nil {
_, _ = fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
}
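
The event-watcher's polling loops above are driven by cutils.Loop and cutils.LoopWithContext from scroll-tech/common/utils. Those helpers are not part of this diff; as a rough sketch (an assumption about their shape, not the repo's actual code), a ticker-driven helper like the following would behave the way these call sites expect:

package utils // hypothetical sketch of the loop helpers assumed above

import (
	"context"
	"time"
)

// Loop calls f once every period until ctx is cancelled.
func Loop(ctx context.Context, period time.Duration, f func()) {
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			f()
		}
	}
}

// LoopWithContext is identical but hands the context to the callback.
func LoopWithContext(ctx context.Context, period time.Duration, f func(context.Context)) {
	Loop(ctx, period, func() { f(ctx) })
}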

View File

@@ -0,0 +1,7 @@
package main
import "scroll-tech/bridge/cmd/event_watcher/app"
func main() {
app.Run()
}

View File

@@ -0,0 +1,136 @@
package app
import (
"context"
"fmt"
"os"
"os/signal"
"time"
"github.com/scroll-tech/go-ethereum/ethclient"
"github.com/scroll-tech/go-ethereum/log"
"github.com/urfave/cli/v2"
"scroll-tech/database"
"scroll-tech/common/metrics"
"scroll-tech/common/version"
"scroll-tech/bridge/config"
"scroll-tech/bridge/relayer"
"scroll-tech/bridge/utils"
"scroll-tech/bridge/watcher"
cutils "scroll-tech/common/utils"
)
var (
app *cli.App
)
func init() {
// Set up gas-oracle app info.
app = cli.NewApp()
app.Action = action
app.Name = "gas-oracle"
app.Usage = "The Scroll Gas Oracle"
app.Description = "Scroll Gas Oracle."
app.Version = version.Version
app.Flags = append(app.Flags, cutils.CommonFlags...)
app.Commands = []*cli.Command{}
app.Before = func(ctx *cli.Context) error {
return cutils.LogSetup(ctx)
}
// Register `gas-oracle-test` app for integration-test.
cutils.RegisterSimulation(app, "gas-oracle-test")
}
func action(ctx *cli.Context) error {
// Load config file.
cfgFile := ctx.String(cutils.ConfigFileFlag.Name)
cfg, err := config.NewConfig(cfgFile)
if err != nil {
log.Crit("failed to load config file", "config file", cfgFile, "error", err)
}
subCtx, cancel := context.WithCancel(ctx.Context)
// Init db connection
var ormFactory database.OrmFactory
if ormFactory, err = database.NewOrmFactory(cfg.DBConfig); err != nil {
log.Crit("failed to init db connection", "err", err)
}
defer func() {
cancel()
err = ormFactory.Close()
if err != nil {
log.Error("can not close ormFactory", "error", err)
}
}()
// Start metrics server.
metrics.Serve(subCtx, ctx)
l1client, err := ethclient.Dial(cfg.L1Config.Endpoint)
if err != nil {
log.Error("failed to connect l1 geth", "config file", cfgFile, "error", err)
return err
}
// Init l2geth connection
l2client, err := ethclient.Dial(cfg.L2Config.Endpoint)
if err != nil {
log.Error("failed to connect l2 geth", "config file", cfgFile, "error", err)
return err
}
l1watcher := watcher.NewL1WatcherClient(ctx.Context, l1client, cfg.L1Config.StartHeight, cfg.L1Config.Confirmations, cfg.L1Config.L1MessengerAddress, cfg.L1Config.L1MessageQueueAddress, cfg.L1Config.ScrollChainContractAddress, ormFactory)
l1relayer, err := relayer.NewLayer1Relayer(ctx.Context, ormFactory, cfg.L1Config.RelayerConfig)
if err != nil {
log.Error("failed to create new l1 relayer", "config file", cfgFile, "error", err)
return err
}
l2relayer, err := relayer.NewLayer2Relayer(ctx.Context, l2client, ormFactory, cfg.L2Config.RelayerConfig)
if err != nil {
log.Error("failed to create new l2 relayer", "config file", cfgFile, "error", err)
return err
}
// Start l1 watcher process
go cutils.LoopWithContext(subCtx, 10*time.Second, func(ctx context.Context) {
number, loopErr := utils.GetLatestConfirmedBlockNumber(ctx, l1client, cfg.L1Config.Confirmations)
if loopErr != nil {
log.Error("failed to get block number", "err", loopErr)
return
}
if loopErr = l1watcher.FetchBlockHeader(number); loopErr != nil {
log.Error("Failed to fetch L1 block header", "lastest", number, "err", loopErr)
}
})
// Start l1relayer process
go cutils.Loop(subCtx, 10*time.Second, l1relayer.ProcessGasPriceOracle)
go cutils.Loop(subCtx, 2*time.Second, l2relayer.ProcessGasPriceOracle)
// Finish start all gas-oracle functions
log.Info("Start gas-oracle successfully")
// Catch CTRL-C to ensure a graceful shutdown.
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
// Wait until the interrupt signal is received from an OS signal.
<-interrupt
return nil
}
// Run gas_oracle cmd instance.
func Run() {
if err := app.Run(os.Args); err != nil {
_, _ = fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
}

View File

@@ -0,0 +1,7 @@
package main
import "scroll-tech/bridge/cmd/gas_oracle/app"
func main() {
app.Run()
}

View File

@@ -1,7 +0,0 @@
package main
import "scroll-tech/bridge/cmd/app"
func main() {
app.Run()
}

View File

@@ -0,0 +1,118 @@
package app
import (
"context"
"fmt"
"os"
"os/signal"
"time"
"github.com/scroll-tech/go-ethereum/ethclient"
"github.com/scroll-tech/go-ethereum/log"
"github.com/urfave/cli/v2"
"scroll-tech/database"
"scroll-tech/common/metrics"
"scroll-tech/common/version"
"scroll-tech/bridge/config"
"scroll-tech/bridge/relayer"
cutils "scroll-tech/common/utils"
)
var (
app *cli.App
)
func init() {
// Set up message-relayer app info.
app = cli.NewApp()
app.Action = action
app.Name = "message-relayer"
app.Usage = "The Scroll Message Relayer"
app.Description = "Message Relayer contains two main service: 1) relay l1 message to l2. 2) relay l2 message to l1."
app.Version = version.Version
app.Flags = append(app.Flags, cutils.CommonFlags...)
app.Commands = []*cli.Command{}
app.Before = func(ctx *cli.Context) error {
return cutils.LogSetup(ctx)
}
// Register `message-relayer-test` app for integration-test.
cutils.RegisterSimulation(app, "message-relayer-test")
}
func action(ctx *cli.Context) error {
// Load config file.
cfgFile := ctx.String(cutils.ConfigFileFlag.Name)
cfg, err := config.NewConfig(cfgFile)
if err != nil {
log.Crit("failed to load config file", "config file", cfgFile, "error", err)
}
subCtx, cancel := context.WithCancel(ctx.Context)
// Init db connection
var ormFactory database.OrmFactory
if ormFactory, err = database.NewOrmFactory(cfg.DBConfig); err != nil {
log.Crit("failed to init db connection", "err", err)
}
defer func() {
cancel()
err = ormFactory.Close()
if err != nil {
log.Error("can not close ormFactory", "error", err)
}
}()
// Start metrics server.
metrics.Serve(subCtx, ctx)
// Init l2geth connection
l2client, err := ethclient.Dial(cfg.L2Config.Endpoint)
if err != nil {
log.Error("failed to connect l2 geth", "config file", cfgFile, "error", err)
return err
}
l1relayer, err := relayer.NewLayer1Relayer(ctx.Context, ormFactory, cfg.L1Config.RelayerConfig)
if err != nil {
log.Error("failed to create new l1 relayer", "config file", cfgFile, "error", err)
return err
}
l2relayer, err := relayer.NewLayer2Relayer(ctx.Context, l2client, ormFactory, cfg.L2Config.RelayerConfig)
if err != nil {
log.Error("failed to create new l2 relayer", "config file", cfgFile, "error", err)
return err
}
// Start l1relayer process
go cutils.Loop(subCtx, 10*time.Second, l1relayer.ProcessSavedEvents)
// Start l2relayer process
go cutils.Loop(subCtx, 2*time.Second, l2relayer.ProcessSavedEvents)
// Finish start all message relayer functions
log.Info("Start message-relayer successfully")
// Catch CTRL-C to ensure a graceful shutdown.
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
// Wait until the interrupt signal is received from an OS signal.
<-interrupt
return nil
}
// Run message_relayer cmd instance.
func Run() {
if err := app.Run(os.Args); err != nil {
_, _ = fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
}

View File

@@ -0,0 +1,7 @@
package main
import "scroll-tech/bridge/cmd/msg_relayer/app"
func main() {
app.Run()
}

View File

@@ -0,0 +1,133 @@
package app
import (
"context"
"fmt"
"os"
"os/signal"
"time"
"github.com/scroll-tech/go-ethereum/ethclient"
"github.com/scroll-tech/go-ethereum/log"
"github.com/urfave/cli/v2"
"scroll-tech/database"
"scroll-tech/common/metrics"
"scroll-tech/common/version"
"scroll-tech/bridge/config"
"scroll-tech/bridge/relayer"
"scroll-tech/bridge/utils"
"scroll-tech/bridge/watcher"
cutils "scroll-tech/common/utils"
)
var (
app *cli.App
)
func init() {
// Set up rollup-relayer app info.
app = cli.NewApp()
app.Action = action
app.Name = "rollup-relayer"
app.Usage = "The Scroll Rollup Relayer"
app.Version = version.Version
app.Flags = append(app.Flags, cutils.CommonFlags...)
app.Commands = []*cli.Command{}
app.Before = func(ctx *cli.Context) error {
return cutils.LogSetup(ctx)
}
// Register `rollup-relayer-test` app for integration-test.
cutils.RegisterSimulation(app, "rollup-relayer-test")
}
func action(ctx *cli.Context) error {
// Load config file.
cfgFile := ctx.String(cutils.ConfigFileFlag.Name)
cfg, err := config.NewConfig(cfgFile)
if err != nil {
log.Crit("failed to load config file", "config file", cfgFile, "error", err)
}
subCtx, cancel := context.WithCancel(ctx.Context)
// init db connection
var ormFactory database.OrmFactory
if ormFactory, err = database.NewOrmFactory(cfg.DBConfig); err != nil {
log.Crit("failed to init db connection", "err", err)
}
defer func() {
cancel()
err = ormFactory.Close()
if err != nil {
log.Error("can not close ormFactory", "error", err)
}
}()
// Start metrics server.
metrics.Serve(subCtx, ctx)
// Init l2geth connection
l2client, err := ethclient.Dial(cfg.L2Config.Endpoint)
if err != nil {
log.Error("failed to connect l2 geth", "config file", cfgFile, "error", err)
return err
}
l2relayer, err := relayer.NewLayer2Relayer(ctx.Context, l2client, ormFactory, cfg.L2Config.RelayerConfig)
if err != nil {
log.Error("failed to create l2 relayer", "config file", cfgFile, "error", err)
return err
}
batchProposer := watcher.NewBatchProposer(subCtx, cfg.L2Config.BatchProposerConfig, l2relayer, ormFactory)
if err != nil {
log.Error("failed to create batchProposer", "config file", cfgFile, "error", err)
return err
}
l2watcher := watcher.NewL2WatcherClient(subCtx, l2client, cfg.L2Config.Confirmations, cfg.L2Config.L2MessengerAddress, cfg.L2Config.L2MessageQueueAddress, cfg.L2Config.WithdrawTrieRootSlot, ormFactory)
// Watcher loop to fetch missing blocks
go cutils.LoopWithContext(subCtx, 2*time.Second, func(ctx context.Context) {
number, loopErr := utils.GetLatestConfirmedBlockNumber(ctx, l2client, cfg.L2Config.Confirmations)
if loopErr != nil {
log.Error("failed to get block number", "err", loopErr)
return
}
l2watcher.TryFetchRunningMissingBlocks(ctx, number)
})
// Batch proposer loop
go cutils.Loop(subCtx, 2*time.Second, func() {
batchProposer.TryProposeBatch()
batchProposer.TryCommitBatches()
})
go cutils.Loop(subCtx, 2*time.Second, l2relayer.ProcessCommittedBatches)
// Finish start all rollup relayer functions.
log.Info("Start rollup-relayer successfully")
// Catch CTRL-C to ensure a graceful shutdown.
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
// Wait until the interrupt signal is received from an OS signal.
<-interrupt
return nil
}
// Run rollup relayer cmd instance.
func Run() {
if err := app.Run(os.Args); err != nil {
_, _ = fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
}

View File

@@ -0,0 +1,7 @@
package main
import "scroll-tech/bridge/cmd/rollup_relayer/app"
func main() {
app.Run()
}

View File

@@ -19,7 +19,8 @@
"escalate_multiple_den": 10,
"max_gas_price": 10000000000,
"tx_type": "LegacyTx",
"min_balance": 100000000000000000000
"min_balance": 100000000000000000000,
"pending_limit": 10
},
"gas_oracle_config": {
"min_gas_price": 0,
@@ -53,7 +54,8 @@
"escalate_multiple_den": 10,
"max_gas_price": 10000000000,
"tx_type": "LegacyTx",
"min_balance": 100000000000000000000
"min_balance": 100000000000000000000,
"pending_limit": 10
},
"gas_oracle_config": {
"min_gas_price": 0,

View File

@@ -33,6 +33,8 @@ type SenderConfig struct {
MinBalance *big.Int `json:"min_balance,omitempty"`
// The interval (in seconds) to check balance and top up sender's accounts
CheckBalanceTime uint64 `json:"check_balance_time"`
// The sender's pending count limit.
PendingLimit int `json:"pending_limit,omitempty"`
}
// RelayerConfig loads relayer configuration items.

View File

@@ -4,6 +4,7 @@ go 1.18
require (
github.com/orcaman/concurrent-map v1.0.0
github.com/orcaman/concurrent-map/v2 v2.0.1
github.com/scroll-tech/go-ethereum v1.10.14-0.20230321020420-127af384ed04
github.com/stretchr/testify v1.8.2
github.com/urfave/cli/v2 v2.17.2-0.20221006022127-8f469abc00aa

View File

@@ -66,6 +66,8 @@ github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/orcaman/concurrent-map v1.0.0 h1:I/2A2XPCb4IuQWcQhBhSwGfiuybl/J0ev9HDbW65HOY=
github.com/orcaman/concurrent-map v1.0.0/go.mod h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI=
github.com/orcaman/concurrent-map/v2 v2.0.1 h1:jOJ5Pg2w1oeB6PeDurIYf6k9PQ+aTITr/6lP/L/zp6c=
github.com/orcaman/concurrent-map/v2 v2.0.1/go.mod h1:9Eq3TG2oBe5FirmYWQfYO5iH1q0Jv47PLaNK++uCdOM=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
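
The new github.com/orcaman/concurrent-map/v2 dependency backs the sender's pendingTxs map later in this diff. The handful of calls used there (New, SetIfAbsent, Set, Remove, Count, IterBuffered) work roughly as in this minimal, standalone sketch:

package main

import (
	"fmt"

	cmapV2 "github.com/orcaman/concurrent-map/v2"
)

type pendingTx struct{ id string }

func main() {
	pending := cmapV2.New[*pendingTx]()

	// SetIfAbsent reserves a key exactly once, the way the sender occupies a tx ID.
	fmt.Println(pending.SetIfAbsent("tx-1", nil)) // true
	fmt.Println(pending.SetIfAbsent("tx-1", nil)) // false: already reserved

	pending.Set("tx-1", &pendingTx{id: "tx-1"}) // overwrite the placeholder
	fmt.Println(pending.Count())                // 1

	// IterBuffered snapshots the map into a channel of Key/Val tuples.
	for item := range pending.IterBuffered() {
		fmt.Println(item.Key, item.Val.id)
	}

	pending.Remove("tx-1")
}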

View File

@@ -1,55 +0,0 @@
package l1
import (
"context"
"github.com/scroll-tech/go-ethereum/ethclient"
"scroll-tech/database"
"scroll-tech/bridge/config"
)
// Backend manage the resources and services of L1 backend.
// The backend should monitor events in layer 1 and relay transactions to layer 2
type Backend struct {
cfg *config.L1Config
watcher *Watcher
relayer *Layer1Relayer
orm database.OrmFactory
}
// New returns a new instance of Backend.
func New(ctx context.Context, cfg *config.L1Config, orm database.OrmFactory) (*Backend, error) {
client, err := ethclient.Dial(cfg.Endpoint)
if err != nil {
return nil, err
}
relayer, err := NewLayer1Relayer(ctx, orm, cfg.RelayerConfig)
if err != nil {
return nil, err
}
watcher := NewWatcher(ctx, client, cfg.StartHeight, cfg.Confirmations, cfg.L1MessengerAddress, cfg.L1MessageQueueAddress, cfg.ScrollChainContractAddress, orm)
return &Backend{
cfg: cfg,
watcher: watcher,
relayer: relayer,
orm: orm,
}, nil
}
// Start Backend module.
func (l1 *Backend) Start() error {
l1.watcher.Start()
l1.relayer.Start()
return nil
}
// Stop Backend module.
func (l1 *Backend) Stop() {
l1.watcher.Stop()
l1.relayer.Stop()
}

View File

@@ -1,46 +0,0 @@
package l1
import (
"testing"
"github.com/stretchr/testify/assert"
"scroll-tech/common/docker"
"scroll-tech/bridge/config"
)
var (
// config
cfg *config.Config
// docker consider handler.
base *docker.App
)
func TestMain(m *testing.M) {
base = docker.NewDockerApp()
m.Run()
base.Free()
}
func setupEnv(t *testing.T) {
// Load config.
var err error
cfg, err = config.NewConfig("../config.json")
assert.NoError(t, err)
base.RunImages(t)
cfg.L2Config.RelayerConfig.SenderConfig.Endpoint = base.L1GethEndpoint()
cfg.L1Config.RelayerConfig.SenderConfig.Endpoint = base.L2GethEndpoint()
cfg.DBConfig.DSN = base.DBEndpoint()
}
func TestL1(t *testing.T) {
setupEnv(t)
t.Run("testCreateNewL1Relayer", testCreateNewL1Relayer)
t.Run("testStartWatcher", testStartWatcher)
}

View File

@@ -1,76 +0,0 @@
package l2
import (
"context"
"github.com/scroll-tech/go-ethereum/ethclient"
"github.com/scroll-tech/go-ethereum/rpc"
"scroll-tech/database"
"scroll-tech/bridge/config"
)
// Backend manage the resources and services of L2 backend.
// The backend should monitor events in layer 2 and relay transactions to layer 1
type Backend struct {
cfg *config.L2Config
watcher *WatcherClient
relayer *Layer2Relayer
batchProposer *BatchProposer
orm database.OrmFactory
}
// New returns a new instance of Backend.
func New(ctx context.Context, cfg *config.L2Config, orm database.OrmFactory) (*Backend, error) {
client, err := ethclient.Dial(cfg.Endpoint)
if err != nil {
return nil, err
}
// Note: initialize watcher before relayer to keep DB consistent.
// Otherwise, there will be a race condition between watcher.initializeGenesis and relayer.ProcessPendingBatches.
watcher := NewL2WatcherClient(ctx, client, cfg.Confirmations, cfg.L2MessengerAddress, cfg.L2MessageQueueAddress, cfg.WithdrawTrieRootSlot, orm)
relayer, err := NewLayer2Relayer(ctx, client, orm, cfg.RelayerConfig)
if err != nil {
return nil, err
}
batchProposer := NewBatchProposer(ctx, cfg.BatchProposerConfig, relayer, orm)
return &Backend{
cfg: cfg,
watcher: watcher,
relayer: relayer,
batchProposer: batchProposer,
orm: orm,
}, nil
}
// Start Backend module.
func (l2 *Backend) Start() error {
l2.watcher.Start()
l2.relayer.Start()
l2.batchProposer.Start()
return nil
}
// Stop Backend module.
func (l2 *Backend) Stop() {
l2.batchProposer.Stop()
l2.relayer.Stop()
l2.watcher.Stop()
}
// APIs collect API modules.
func (l2 *Backend) APIs() []rpc.API {
return []rpc.API{
{
Namespace: "l2",
Version: "1.0",
Service: WatcherAPI(l2.watcher),
Public: true,
},
}
}

View File

@@ -1,5 +0,0 @@
package l2
// WatcherAPI watcher api service
type WatcherAPI interface {
}

View File

@@ -1,10 +1,9 @@
package l1
package relayer
import (
"context"
"errors"
"math/big"
"time"
// not sure if this will make problems when relay with l1geth
@@ -15,7 +14,6 @@ import (
geth_metrics "github.com/scroll-tech/go-ethereum/metrics"
"scroll-tech/common/types"
"scroll-tech/common/utils"
"scroll-tech/database"
@@ -31,14 +29,6 @@ var (
bridgeL1MsgsRelayedConfirmedTotalCounter = geth_metrics.NewRegisteredCounter("bridge/l1/msgs/relayed/confirmed/total", metrics.ScrollRegistry)
)
const (
gasPriceDiffPrecision = 1000000
defaultGasPriceDiff = 50000 // 5%
defaultMessageRelayMinGasLimit = 130000 // should be enough for both ERC20 and ETH relay
)
// Layer1Relayer is responsible for
// 1. fetch pending L1Message from db
// 2. relay pending message to layer 2 node
@@ -53,11 +43,9 @@ type Layer1Relayer struct {
// channel used to communicate with transaction sender
messageSender *sender.Sender
messageCh <-chan *sender.Confirmation
l2MessengerABI *abi.ABI
gasOracleSender *sender.Sender
gasOracleCh <-chan *sender.Confirmation
l1GasOracleABI *abi.ABI
minGasLimitForMessageRelay uint64
@@ -65,8 +53,6 @@ type Layer1Relayer struct {
lastGasPrice uint64
minGasPrice uint64
gasPriceDiff uint64
stopCh chan struct{}
}
// NewLayer1Relayer will return a new instance of Layer1RelayerClient
@@ -96,21 +82,19 @@ func NewLayer1Relayer(ctx context.Context, db database.OrmFactory, cfg *config.R
gasPriceDiff = defaultGasPriceDiff
}
minGasLimitForMessageRelay := uint64(defaultMessageRelayMinGasLimit)
minGasLimitForMessageRelay := uint64(defaultL1MessageRelayMinGasLimit)
if cfg.MessageRelayMinGasLimit != 0 {
minGasLimitForMessageRelay = cfg.MessageRelayMinGasLimit
}
return &Layer1Relayer{
l1Relayer := &Layer1Relayer{
ctx: ctx,
db: db,
messageSender: messageSender,
messageCh: messageSender.ConfirmChan(),
l2MessengerABI: bridge_abi.L2ScrollMessengerABI,
gasOracleSender: gasOracleSender,
gasOracleCh: gasOracleSender.ConfirmChan(),
l1GasOracleABI: bridge_abi.L1GasPriceOracleABI,
minGasLimitForMessageRelay: minGasLimitForMessageRelay,
@@ -118,9 +102,11 @@ func NewLayer1Relayer(ctx context.Context, db database.OrmFactory, cfg *config.R
minGasPrice: minGasPrice,
gasPriceDiff: gasPriceDiff,
cfg: cfg,
stopCh: make(chan struct{}),
}, nil
cfg: cfg,
}
go l1Relayer.handleConfirmLoop(ctx)
return l1Relayer, nil
}
// ProcessSavedEvents relays saved un-processed cross-domain transactions to desired blockchain
@@ -138,7 +124,7 @@ func (r *Layer1Relayer) ProcessSavedEvents() {
for _, msg := range msgs {
if err = r.processSavedEvent(msg); err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
if !errors.Is(err, sender.ErrNoAvailableAccount) && !errors.Is(err, sender.ErrFullPending) {
log.Error("failed to process event", "msg.msgHash", msg.MsgHash, "err", err)
}
return
@@ -153,7 +139,7 @@ func (r *Layer1Relayer) processSavedEvent(msg *types.L1Message) error {
if err != nil && err.Error() == "execution reverted: Message expired" {
return r.db.UpdateLayer1Status(r.ctx, msg.MsgHash, types.MsgExpired)
}
if err != nil && err.Error() == "execution reverted: Message successfully executed" {
if err != nil && err.Error() == "execution reverted: Message was already successfully executed" {
return r.db.UpdateLayer1Status(r.ctx, msg.MsgHash, types.MsgConfirmed)
}
if err != nil {
@@ -203,7 +189,7 @@ func (r *Layer1Relayer) ProcessGasPriceOracle() {
hash, err := r.gasOracleSender.SendTransaction(block.Hash, &r.cfg.GasPriceOracleContractAddress, big.NewInt(0), data, 0)
if err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
if !errors.Is(err, sender.ErrNoAvailableAccount) && !errors.Is(err, sender.ErrFullPending) {
log.Error("Failed to send setL1BaseFee tx to layer2 ", "block.Hash", block.Hash, "block.Height", block.Number, "err", err)
}
return
@@ -220,57 +206,43 @@ func (r *Layer1Relayer) ProcessGasPriceOracle() {
}
}
// Start the relayer process
func (r *Layer1Relayer) Start() {
go func() {
ctx, cancel := context.WithCancel(r.ctx)
go utils.Loop(ctx, 2*time.Second, r.ProcessSavedEvents)
go utils.Loop(ctx, 2*time.Second, r.ProcessGasPriceOracle)
go func(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case cfm := <-r.messageCh:
bridgeL1MsgsRelayedConfirmedTotalCounter.Inc(1)
if !cfm.IsSuccessful {
log.Warn("transaction confirmed but failed in layer2", "confirmation", cfm)
} else {
// @todo handle db error
err := r.db.UpdateLayer1StatusAndLayer2Hash(r.ctx, cfm.ID, types.MsgConfirmed, cfm.TxHash.String())
if err != nil {
log.Warn("UpdateLayer1StatusAndLayer2Hash failed", "err", err)
}
log.Info("transaction confirmed in layer2", "confirmation", cfm)
}
case cfm := <-r.gasOracleCh:
if !cfm.IsSuccessful {
// @discuss: maybe make it pending again?
err := r.db.UpdateL1GasOracleStatusAndOracleTxHash(r.ctx, cfm.ID, types.GasOracleFailed, cfm.TxHash.String())
if err != nil {
log.Warn("UpdateL1GasOracleStatusAndOracleTxHash failed", "err", err)
}
log.Warn("transaction confirmed but failed in layer2", "confirmation", cfm)
} else {
// @todo handle db error
err := r.db.UpdateL1GasOracleStatusAndOracleTxHash(r.ctx, cfm.ID, types.GasOracleImported, cfm.TxHash.String())
if err != nil {
log.Warn("UpdateGasOracleStatusAndOracleTxHash failed", "err", err)
}
log.Info("transaction confirmed in layer2", "confirmation", cfm)
}
func (r *Layer1Relayer) handleConfirmLoop(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case cfm := <-r.messageSender.ConfirmChan():
bridgeL1MsgsRelayedConfirmedTotalCounter.Inc(1)
if !cfm.IsSuccessful {
err := r.db.UpdateLayer1StatusAndLayer2Hash(r.ctx, cfm.ID, types.MsgRelayFailed, cfm.TxHash.String())
if err != nil {
log.Warn("UpdateLayer1StatusAndLayer2Hash failed", "err", err)
}
log.Warn("transaction confirmed but failed in layer2", "confirmation", cfm)
} else {
// @todo handle db error
err := r.db.UpdateLayer1StatusAndLayer2Hash(r.ctx, cfm.ID, types.MsgConfirmed, cfm.TxHash.String())
if err != nil {
log.Warn("UpdateLayer1StatusAndLayer2Hash failed", "err", err)
}
log.Info("transaction confirmed in layer2", "confirmation", cfm)
}
}(ctx)
<-r.stopCh
cancel()
}()
}
// Stop the relayer module, for a graceful shutdown.
func (r *Layer1Relayer) Stop() {
close(r.stopCh)
case cfm := <-r.gasOracleSender.ConfirmChan():
if !cfm.IsSuccessful {
// @discuss: maybe make it pending again?
err := r.db.UpdateL1GasOracleStatusAndOracleTxHash(r.ctx, cfm.ID, types.GasOracleFailed, cfm.TxHash.String())
if err != nil {
log.Warn("UpdateL1GasOracleStatusAndOracleTxHash failed", "err", err)
}
log.Warn("transaction confirmed but failed in layer2", "confirmation", cfm)
} else {
// @todo handle db error
err := r.db.UpdateL1GasOracleStatusAndOracleTxHash(r.ctx, cfm.ID, types.GasOracleImported, cfm.TxHash.String())
if err != nil {
log.Warn("UpdateGasOracleStatusAndOracleTxHash failed", "err", err)
}
log.Info("transaction confirmed in layer2", "confirmation", cfm)
}
}
}
}
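
Net effect of this file: Layer1Relayer no longer exposes Start and Stop; NewLayer1Relayer launches handleConfirmLoop itself, the loop exits when the caller's context is cancelled, and the periodic ProcessSavedEvents / ProcessGasPriceOracle calls are scheduled by the per-binary apps above. A minimal, self-contained sketch of that constructor-owned-loop pattern (illustrative names, not the repo's types):

package main

import (
	"context"
	"fmt"
	"time"
)

type confirmation struct{ id string }

type relayer struct {
	confirmCh chan confirmation
}

// newRelayer starts its confirmation loop immediately; its lifetime is bound
// to ctx rather than to an explicit stop channel.
func newRelayer(ctx context.Context) *relayer {
	r := &relayer{confirmCh: make(chan confirmation, 16)}
	go r.handleConfirmLoop(ctx)
	return r
}

func (r *relayer) handleConfirmLoop(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case cfm := <-r.confirmCh:
			fmt.Println("confirmed:", cfm.id)
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	r := newRelayer(ctx)
	r.confirmCh <- confirmation{id: "demo"}
	time.Sleep(100 * time.Millisecond)
	cancel() // cancelling the context ends the loop; no Stop() is needed
}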

View File

@@ -1,4 +1,4 @@
package l1
package relayer_test
import (
"context"
@@ -8,6 +8,8 @@ import (
"scroll-tech/database/migrate"
"scroll-tech/bridge/relayer"
"scroll-tech/database"
)
@@ -19,9 +21,7 @@ func testCreateNewL1Relayer(t *testing.T) {
assert.NoError(t, migrate.ResetDB(db.GetDB().DB))
defer db.Close()
relayer, err := NewLayer1Relayer(context.Background(), db, cfg.L2Config.RelayerConfig)
relayer, err := relayer.NewLayer1Relayer(context.Background(), db, cfg.L2Config.RelayerConfig)
assert.NoError(t, err)
defer relayer.Stop()
relayer.Start()
assert.NotNil(t, relayer)
}

View File

@@ -1,4 +1,4 @@
package l2
package relayer
import (
"context"
@@ -7,7 +7,6 @@ import (
"math/big"
"runtime"
"sync"
"time"
"github.com/scroll-tech/go-ethereum/accounts/abi"
"github.com/scroll-tech/go-ethereum/common"
@@ -22,8 +21,6 @@ import (
"scroll-tech/common/types"
"scroll-tech/database"
cutil "scroll-tech/common/utils"
bridge_abi "scroll-tech/bridge/abi"
"scroll-tech/bridge/config"
"scroll-tech/bridge/sender"
@@ -40,14 +37,6 @@ var (
bridgeL2BatchesSkippedTotalCounter = geth_metrics.NewRegisteredCounter("bridge/l2/batches/skipped/total", metrics.ScrollRegistry)
)
const (
gasPriceDiffPrecision = 1000000
defaultGasPriceDiff = 50000 // 5%
defaultMessageRelayMinGasLimit = 200000 // should be enough for both ERC20 and ETH relay
)
// Layer2Relayer is responsible for
// 1. Committing and finalizing L2 blocks on L1
// 2. Relaying messages from L2 to L1
@@ -63,15 +52,12 @@ type Layer2Relayer struct {
cfg *config.RelayerConfig
messageSender *sender.Sender
messageCh <-chan *sender.Confirmation
l1MessengerABI *abi.ABI
rollupSender *sender.Sender
rollupCh <-chan *sender.Confirmation
l1RollupABI *abi.ABI
gasOracleSender *sender.Sender
gasOracleCh <-chan *sender.Confirmation
l2GasOracleABI *abi.ABI
minGasLimitForMessageRelay uint64
@@ -91,8 +77,6 @@ type Layer2Relayer struct {
// A list of processing batch finalization.
// key(string): confirmation ID, value(string): batch hash.
processingFinalization sync.Map
stopCh chan struct{}
}
// NewLayer2Relayer will return a new instance of Layer2RelayerClient
@@ -126,27 +110,24 @@ func NewLayer2Relayer(ctx context.Context, l2Client *ethclient.Client, db databa
gasPriceDiff = defaultGasPriceDiff
}
minGasLimitForMessageRelay := uint64(defaultMessageRelayMinGasLimit)
minGasLimitForMessageRelay := uint64(defaultL2MessageRelayMinGasLimit)
if cfg.MessageRelayMinGasLimit != 0 {
minGasLimitForMessageRelay = cfg.MessageRelayMinGasLimit
}
return &Layer2Relayer{
layer2Relayer := &Layer2Relayer{
ctx: ctx,
db: db,
l2Client: l2Client,
messageSender: messageSender,
messageCh: messageSender.ConfirmChan(),
l1MessengerABI: bridge_abi.L1ScrollMessengerABI,
rollupSender: rollupSender,
rollupCh: rollupSender.ConfirmChan(),
l1RollupABI: bridge_abi.ScrollChainABI,
gasOracleSender: gasOracleSender,
gasOracleCh: gasOracleSender.ConfirmChan(),
l2GasOracleABI: bridge_abi.L2GasPriceOracleABI,
minGasLimitForMessageRelay: minGasLimitForMessageRelay,
@@ -158,8 +139,9 @@ func NewLayer2Relayer(ctx context.Context, l2Client *ethclient.Client, db databa
processingMessage: sync.Map{},
processingBatchesCommitment: sync.Map{},
processingFinalization: sync.Map{},
stopCh: make(chan struct{}),
}, nil
}
go layer2Relayer.handleConfirmLoop(ctx)
return layer2Relayer, nil
}
const processMsgLimit = 100
@@ -198,7 +180,7 @@ func (r *Layer2Relayer) ProcessSavedEvents() {
})
}
if err := g.Wait(); err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
if !errors.Is(err, sender.ErrNoAvailableAccount) && !errors.Is(err, sender.ErrFullPending) {
log.Error("failed to process l2 saved event", "err", err)
}
return
@@ -247,11 +229,11 @@ func (r *Layer2Relayer) processSavedEvent(msg *types.L2Message) error {
if err != nil && err.Error() == "execution reverted: Message expired" {
return r.db.UpdateLayer2Status(r.ctx, msg.MsgHash, types.MsgExpired)
}
if err != nil && err.Error() == "execution reverted: Message successfully executed" {
if err != nil && err.Error() == "execution reverted: Message was already successfully executed" {
return r.db.UpdateLayer2Status(r.ctx, msg.MsgHash, types.MsgConfirmed)
}
if err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
if !errors.Is(err, sender.ErrNoAvailableAccount) && !errors.Is(err, sender.ErrFullPending) {
log.Error("Failed to send relayMessageWithProof tx to layer1 ", "msg.height", msg.Height, "msg.MsgHash", msg.MsgHash, "err", err)
}
return err
@@ -297,7 +279,7 @@ func (r *Layer2Relayer) ProcessGasPriceOracle() {
hash, err := r.gasOracleSender.SendTransaction(batch.Hash, &r.cfg.GasPriceOracleContractAddress, big.NewInt(0), data, 0)
if err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
if !errors.Is(err, sender.ErrNoAvailableAccount) && !errors.Is(err, sender.ErrFullPending) {
log.Error("Failed to send setL2BaseFee tx to layer2 ", "batch.Hash", batch.Hash, "err", err)
}
return
@@ -343,7 +325,7 @@ func (r *Layer2Relayer) SendCommitTx(batchData []*types.BatchData) error {
txID := crypto.Keccak256Hash(bytes).String()
txHash, err := r.rollupSender.SendTransaction(txID, &r.cfg.RollupContractAddress, big.NewInt(0), calldata, 0)
if err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
if !errors.Is(err, sender.ErrNoAvailableAccount) && !errors.Is(err, sender.ErrFullPending) {
log.Error("Failed to send commitBatches tx to layer1 ", "err", err)
}
return err
@@ -493,7 +475,7 @@ func (r *Layer2Relayer) ProcessCommittedBatches() {
txHash, err := r.rollupSender.SendTransaction(txID, &r.cfg.RollupContractAddress, big.NewInt(0), data, 0)
finalizeTxHash := &txHash
if err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
if !errors.Is(err, sender.ErrNoAvailableAccount) && !errors.Is(err, sender.ErrFullPending) {
log.Error("finalizeBatchWithProof in layer1 failed", "hash", hash, "err", err)
}
return
@@ -516,65 +498,20 @@ func (r *Layer2Relayer) ProcessCommittedBatches() {
}
}
// Start the relayer process
func (r *Layer2Relayer) Start() {
go func() {
ctx, cancel := context.WithCancel(r.ctx)
go cutil.Loop(ctx, time.Second, r.ProcessSavedEvents)
go cutil.Loop(ctx, time.Second, r.ProcessCommittedBatches)
go cutil.Loop(ctx, time.Second, r.ProcessGasPriceOracle)
go func(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case confirmation := <-r.messageCh:
r.handleConfirmation(confirmation)
case confirmation := <-r.rollupCh:
r.handleConfirmation(confirmation)
case cfm := <-r.gasOracleCh:
if !cfm.IsSuccessful {
// @discuss: maybe make it pending again?
err := r.db.UpdateL2GasOracleStatusAndOracleTxHash(r.ctx, cfm.ID, types.GasOracleFailed, cfm.TxHash.String())
if err != nil {
log.Warn("UpdateL2GasOracleStatusAndOracleTxHash failed", "err", err)
}
log.Warn("transaction confirmed but failed in layer1", "confirmation", cfm)
} else {
// @todo handle db error
err := r.db.UpdateL2GasOracleStatusAndOracleTxHash(r.ctx, cfm.ID, types.GasOracleImported, cfm.TxHash.String())
if err != nil {
log.Warn("UpdateL2GasOracleStatusAndOracleTxHash failed", "err", err)
}
log.Info("transaction confirmed in layer1", "confirmation", cfm)
}
}
}
}(ctx)
<-r.stopCh
cancel()
}()
}
// Stop the relayer module, for a graceful shutdown.
func (r *Layer2Relayer) Stop() {
close(r.stopCh)
}
func (r *Layer2Relayer) handleConfirmation(confirmation *sender.Confirmation) {
if !confirmation.IsSuccessful {
log.Warn("transaction confirmed but failed in layer1", "confirmation", confirmation)
return
}
transactionType := "Unknown"
// check whether it is message relay transaction
if msgHash, ok := r.processingMessage.Load(confirmation.ID); ok {
transactionType = "MessageRelay"
var status types.MsgStatus
if confirmation.IsSuccessful {
status = types.MsgConfirmed
} else {
status = types.MsgRelayFailed
log.Warn("transaction confirmed but failed in layer1", "confirmation", confirmation)
}
// @todo handle db error
err := r.db.UpdateLayer2StatusAndLayer1Hash(r.ctx, msgHash.(string), types.MsgConfirmed, confirmation.TxHash.String())
err := r.db.UpdateLayer2StatusAndLayer1Hash(r.ctx, msgHash.(string), status, confirmation.TxHash.String())
if err != nil {
log.Warn("UpdateLayer2StatusAndLayer1Hash failed", "msgHash", msgHash.(string), "err", err)
}
@@ -586,9 +523,16 @@ func (r *Layer2Relayer) handleConfirmation(confirmation *sender.Confirmation) {
if batchBatches, ok := r.processingBatchesCommitment.Load(confirmation.ID); ok {
transactionType = "BatchesCommitment"
batchHashes := batchBatches.([]string)
var status types.RollupStatus
if confirmation.IsSuccessful {
status = types.RollupCommitted
} else {
status = types.RollupCommitFailed
log.Warn("transaction confirmed but failed in layer1", "confirmation", confirmation)
}
for _, batchHash := range batchHashes {
// @todo handle db error
err := r.db.UpdateCommitTxHashAndRollupStatus(r.ctx, batchHash, confirmation.TxHash.String(), types.RollupCommitted)
err := r.db.UpdateCommitTxHashAndRollupStatus(r.ctx, batchHash, confirmation.TxHash.String(), status)
if err != nil {
log.Warn("UpdateCommitTxHashAndRollupStatus failed", "batch_hash", batchHash, "err", err)
}
@@ -600,8 +544,15 @@ func (r *Layer2Relayer) handleConfirmation(confirmation *sender.Confirmation) {
// check whether it is proof finalization transaction
if batchHash, ok := r.processingFinalization.Load(confirmation.ID); ok {
transactionType = "ProofFinalization"
var status types.RollupStatus
if confirmation.IsSuccessful {
status = types.RollupFinalized
} else {
status = types.RollupFinalizeFailed
log.Warn("transaction confirmed but failed in layer1", "confirmation", confirmation)
}
// @todo handle db error
err := r.db.UpdateFinalizeTxHashAndRollupStatus(r.ctx, batchHash.(string), confirmation.TxHash.String(), types.RollupFinalized)
err := r.db.UpdateFinalizeTxHashAndRollupStatus(r.ctx, batchHash.(string), confirmation.TxHash.String(), status)
if err != nil {
log.Warn("UpdateFinalizeTxHashAndRollupStatus failed", "batch_hash", batchHash.(string), "err", err)
}
@@ -610,3 +561,32 @@ func (r *Layer2Relayer) handleConfirmation(confirmation *sender.Confirmation) {
}
log.Info("transaction confirmed in layer1", "type", transactionType, "confirmation", confirmation)
}
func (r *Layer2Relayer) handleConfirmLoop(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case confirmation := <-r.messageSender.ConfirmChan():
r.handleConfirmation(confirmation)
case confirmation := <-r.rollupSender.ConfirmChan():
r.handleConfirmation(confirmation)
case cfm := <-r.gasOracleSender.ConfirmChan():
if !cfm.IsSuccessful {
// @discuss: maybe make it pending again?
err := r.db.UpdateL2GasOracleStatusAndOracleTxHash(r.ctx, cfm.ID, types.GasOracleFailed, cfm.TxHash.String())
if err != nil {
log.Warn("UpdateL2GasOracleStatusAndOracleTxHash failed", "err", err)
}
log.Warn("transaction confirmed but failed in layer1", "confirmation", cfm)
} else {
// @todo handle db error
err := r.db.UpdateL2GasOracleStatusAndOracleTxHash(r.ctx, cfm.ID, types.GasOracleImported, cfm.TxHash.String())
if err != nil {
log.Warn("UpdateL2GasOracleStatusAndOracleTxHash failed", "err", err)
}
log.Info("transaction confirmed in layer1", "confirmation", cfm)
}
}
}
}

View File

@@ -1,4 +1,4 @@
package l2
package relayer_test
import (
"context"
@@ -14,6 +14,8 @@ import (
"scroll-tech/common/types"
"scroll-tech/bridge/relayer"
"scroll-tech/database"
"scroll-tech/database/migrate"
)
@@ -39,11 +41,9 @@ func testCreateNewRelayer(t *testing.T) {
assert.NoError(t, migrate.ResetDB(db.GetDB().DB))
defer db.Close()
relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, cfg.L2Config.RelayerConfig)
relayer, err := relayer.NewLayer2Relayer(context.Background(), l2Cli, db, cfg.L2Config.RelayerConfig)
assert.NoError(t, err)
defer relayer.Stop()
relayer.Start()
assert.NotNil(t, relayer)
}
func testL2RelayerProcessSaveEvents(t *testing.T) {
@@ -54,9 +54,8 @@ func testL2RelayerProcessSaveEvents(t *testing.T) {
defer db.Close()
l2Cfg := cfg.L2Config
relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, l2Cfg.RelayerConfig)
relayer, err := relayer.NewLayer2Relayer(context.Background(), l2Cli, db, l2Cfg.RelayerConfig)
assert.NoError(t, err)
defer relayer.Stop()
err = db.SaveL2Messages(context.Background(), templateL2Message)
assert.NoError(t, err)
@@ -79,6 +78,12 @@ func testL2RelayerProcessSaveEvents(t *testing.T) {
}
assert.NoError(t, db.InsertWrappedBlocks(traces))
parentBatch1 := &types.BlockBatch{
Index: 0,
Hash: common.Hash{}.String(),
StateRoot: common.Hash{}.String(),
}
batchData1 := types.NewBatchData(parentBatch1, []*types.WrappedBlock{wrappedBlock1}, nil)
dbTx, err := db.Beginx()
assert.NoError(t, err)
assert.NoError(t, db.NewBatchInDBTx(dbTx, batchData1))
@@ -104,10 +109,15 @@ func testL2RelayerProcessCommittedBatches(t *testing.T) {
defer db.Close()
l2Cfg := cfg.L2Config
relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, l2Cfg.RelayerConfig)
relayer, err := relayer.NewLayer2Relayer(context.Background(), l2Cli, db, l2Cfg.RelayerConfig)
assert.NoError(t, err)
defer relayer.Stop()
parentBatch1 := &types.BlockBatch{
Index: 0,
Hash: common.Hash{}.String(),
StateRoot: common.Hash{}.String(),
}
batchData1 := types.NewBatchData(parentBatch1, []*types.WrappedBlock{wrappedBlock1}, nil)
dbTx, err := db.Beginx()
assert.NoError(t, err)
assert.NoError(t, db.NewBatchInDBTx(dbTx, batchData1))
@@ -140,9 +150,8 @@ func testL2RelayerSkipBatches(t *testing.T) {
defer db.Close()
l2Cfg := cfg.L2Config
relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, l2Cfg.RelayerConfig)
relayer, err := relayer.NewLayer2Relayer(context.Background(), l2Cli, db, l2Cfg.RelayerConfig)
assert.NoError(t, err)
defer relayer.Stop()
createBatch := func(rollupStatus types.RollupStatus, provingStatus types.ProvingStatus, index uint64) string {
dbTx, err := db.Beginx()

11
bridge/relayer/params.go Normal file
View File

@@ -0,0 +1,11 @@
package relayer
const (
gasPriceDiffPrecision = 1000000
defaultGasPriceDiff = 50000 // 5%
defaultL1MessageRelayMinGasLimit = 130000 // should be enough for both ERC20 and ETH relay
defaultL2MessageRelayMinGasLimit = 200000
)

View File

@@ -1,4 +1,4 @@
package l2
package relayer_test
import (
"encoding/json"
@@ -93,24 +93,16 @@ func TestMain(m *testing.M) {
base.Free()
}
func TestFunction(t *testing.T) {
func TestFunctions(t *testing.T) {
if err := setupEnv(t); err != nil {
t.Fatal(err)
}
// Run l2 watcher test cases.
t.Run("TestCreateNewWatcherAndStop", testCreateNewWatcherAndStop)
t.Run("TestMonitorBridgeContract", testMonitorBridgeContract)
t.Run("TestFetchMultipleSentMessageInOneBlock", testFetchMultipleSentMessageInOneBlock)
// Run l1 relayer test cases.
t.Run("TestCreateNewL1Relayer", testCreateNewL1Relayer)
// Run l2 relayer test cases.
t.Run("TestCreateNewRelayer", testCreateNewRelayer)
t.Run("TestL2RelayerProcessSaveEvents", testL2RelayerProcessSaveEvents)
t.Run("TestL2RelayerProcessCommittedBatches", testL2RelayerProcessCommittedBatches)
t.Run("TestL2RelayerSkipBatches", testL2RelayerSkipBatches)
// Run batch proposer test cases.
t.Run("TestBatchProposerProposeBatch", testBatchProposerProposeBatch)
t.Run("TestBatchProposerGracefulRestart", testBatchProposerGracefulRestart)
}

View File

@@ -6,21 +6,19 @@ import (
"errors"
"fmt"
"math/big"
"reflect"
"strings"
"sync"
"sync/atomic"
"time"
cmapV2 "github.com/orcaman/concurrent-map/v2"
"github.com/scroll-tech/go-ethereum/accounts/abi/bind"
"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/core/types"
"github.com/scroll-tech/go-ethereum/ethclient"
"github.com/scroll-tech/go-ethereum/log"
"scroll-tech/bridge/utils"
"scroll-tech/bridge/config"
"scroll-tech/bridge/utils"
)
const (
@@ -37,6 +35,12 @@ const (
var (
// ErrNoAvailableAccount indicates no available account error in the account pool.
ErrNoAvailableAccount = errors.New("sender has no available account to send transaction")
// ErrFullPending sender's pending pool is full.
ErrFullPending = errors.New("sender's pending pool is full")
)
var (
defaultPendingLimit = 10
)
// Confirmation struct used to indicate transaction confirmation details
@@ -74,9 +78,9 @@ type Sender struct {
// account fields.
auths *accountPool
blockNumber uint64 // Current block number on chain.
baseFeePerGas uint64 // Current base fee per gas on chain
pendingTxs sync.Map // Mapping from nonce to pending transaction
blockNumber uint64 // Current block number on chain.
baseFeePerGas uint64 // Current base fee per gas on chain
pendingTxs cmapV2.ConcurrentMap[string, *PendingTransaction] // Mapping from nonce to pending transaction
confirmCh chan *Confirmation
stopCh chan struct{}
@@ -116,6 +120,11 @@ func NewSender(ctx context.Context, config *config.SenderConfig, privs []*ecdsa.
}
}
// initialize pending limit with a default value
if config.PendingLimit == 0 {
config.PendingLimit = defaultPendingLimit
}
sender := &Sender{
ctx: ctx,
config: config,
@@ -125,7 +134,7 @@ func NewSender(ctx context.Context, config *config.SenderConfig, privs []*ecdsa.
confirmCh: make(chan *Confirmation, 128),
blockNumber: header.Number.Uint64(),
baseFeePerGas: baseFeePerGas,
pendingTxs: sync.Map{},
pendingTxs: cmapV2.New[*PendingTransaction](),
stopCh: make(chan struct{}),
}
@@ -134,6 +143,21 @@ func NewSender(ctx context.Context, config *config.SenderConfig, privs []*ecdsa.
return sender, nil
}
// PendingCount returns the current number of pending txs.
func (s *Sender) PendingCount() int {
return s.pendingTxs.Count()
}
// PendingLimit returns the maximum number of pending txs the sender can handle.
func (s *Sender) PendingLimit() int {
return s.config.PendingLimit
}
// IsFull returns true if the sender's pending tx pool is full.
func (s *Sender) IsFull() bool {
return s.pendingTxs.Count() >= s.config.PendingLimit
}
// Stop stop the sender module.
func (s *Sender) Stop() {
close(s.stopCh)
@@ -159,21 +183,24 @@ func (s *Sender) getFeeData(auth *bind.TransactOpts, target *common.Address, val
// SendTransaction send a signed L2tL1 transaction.
func (s *Sender) SendTransaction(ID string, target *common.Address, value *big.Int, data []byte, minGasLimit uint64) (hash common.Hash, err error) {
if s.IsFull() {
return common.Hash{}, ErrFullPending
}
// We occupy the ID, in case some other threads call with the same ID in the same time
if _, loaded := s.pendingTxs.LoadOrStore(ID, nil); loaded {
if ok := s.pendingTxs.SetIfAbsent(ID, nil); !ok {
return common.Hash{}, fmt.Errorf("has the repeat tx ID, ID: %s", ID)
}
// get
auth := s.auths.getAccount()
if auth == nil {
s.pendingTxs.Delete(ID) // release the ID on failure
s.pendingTxs.Remove(ID) // release the ID on failure
return common.Hash{}, ErrNoAvailableAccount
}
defer s.auths.releaseAccount(auth)
defer func() {
if err != nil {
s.pendingTxs.Delete(ID) // release the ID on failure
s.pendingTxs.Remove(ID) // release the ID on failure
}
}()
@@ -194,7 +221,7 @@ func (s *Sender) SendTransaction(ID string, target *common.Address, value *big.I
submitAt: atomic.LoadUint64(&s.blockNumber),
feeData: feeData,
}
s.pendingTxs.Store(ID, pending)
s.pendingTxs.Set(ID, pending)
return tx.Hash(), nil
}
@@ -335,17 +362,17 @@ func (s *Sender) checkPendingTransaction(header *types.Header, confirmed uint64)
}
}
s.pendingTxs.Range(func(key, value interface{}) bool {
for item := range s.pendingTxs.IterBuffered() {
key, pending := item.Key, item.Val
// ignore empty id, since we use empty id to occupy pending task
if value == nil || reflect.ValueOf(value).IsNil() {
return true
if pending == nil {
continue
}
pending := value.(*PendingTransaction)
receipt, err := s.client.TransactionReceipt(s.ctx, pending.tx.Hash())
if (err == nil) && (receipt != nil) {
if receipt.BlockNumber.Uint64() <= confirmed {
s.pendingTxs.Delete(key)
s.pendingTxs.Remove(key)
// send confirm message
s.confirmCh <- &Confirmation{
ID: pending.id,
@@ -376,7 +403,7 @@ func (s *Sender) checkPendingTransaction(header *types.Header, confirmed uint64)
// We need to stop the program and manually handle the situation.
if strings.Contains(err.Error(), "nonce") {
// This key can be deleted
s.pendingTxs.Delete(key)
s.pendingTxs.Remove(key)
// Try get receipt by the latest replaced tx hash
receipt, err := s.client.TransactionReceipt(s.ctx, pending.tx.Hash())
if (err == nil) && (receipt != nil) {
@@ -398,8 +425,7 @@ func (s *Sender) checkPendingTransaction(header *types.Header, confirmed uint64)
pending.submitAt = number
}
}
return true
})
}
}
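
For reference, the same access pattern in isolation. This assumes cmapV2 is github.com/orcaman/concurrent-map/v2, which matches the SetIfAbsent/IterBuffered/Remove calls above but is not named in this diff:

package main

import (
	"fmt"

	cmapV2 "github.com/orcaman/concurrent-map/v2"
)

type pendingTx struct{ id string }

func main() {
	pendingTxs := cmapV2.New[*pendingTx]()

	// Reserve an ID with a nil placeholder; SetIfAbsent reports whether the key was free.
	if ok := pendingTxs.SetIfAbsent("tx-1", nil); !ok {
		fmt.Println("duplicate ID")
		return
	}
	pendingTxs.Set("tx-1", &pendingTx{id: "tx-1"}) // fill in the real value later

	// IterBuffered snapshots the map, so entries can be removed while ranging.
	for item := range pendingTxs.IterBuffered() {
		if item.Val == nil {
			continue // still just a placeholder
		}
		fmt.Println("pending:", item.Key, item.Val.id)
		pendingTxs.Remove(item.Key)
	}
	fmt.Println("remaining:", pendingTxs.Count())
}
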
// Loop is the main event loop

View File

@@ -57,6 +57,8 @@ func TestSender(t *testing.T) {
// Setup
setupEnv(t)
t.Run("test pending limit", func(t *testing.T) { testPendLimit(t) })
t.Run("test min gas limit", func(t *testing.T) { testMinGasLimit(t) })
t.Run("test 1 account sender", func(t *testing.T) { testBatchSender(t, 1) })
@@ -64,6 +66,21 @@ func TestSender(t *testing.T) {
t.Run("test 8 account sender", func(t *testing.T) { testBatchSender(t, 8) })
}
func testPendLimit(t *testing.T) {
senderCfg := cfg.L1Config.RelayerConfig.SenderConfig
senderCfg.Confirmations = rpc.LatestBlockNumber
senderCfg.PendingLimit = 2
newSender, err := sender.NewSender(context.Background(), senderCfg, privateKeys)
assert.NoError(t, err)
defer newSender.Stop()
for i := 0; i < newSender.PendingLimit(); i++ {
_, err = newSender.SendTransaction(strconv.Itoa(i), &common.Address{}, big.NewInt(1), nil, 0)
assert.NoError(t, err)
}
assert.True(t, newSender.PendingCount() <= newSender.PendingLimit())
}
func testMinGasLimit(t *testing.T) {
senderCfg := cfg.L1Config.RelayerConfig.SenderConfig
senderCfg.Confirmations = rpc.LatestBlockNumber
@@ -100,6 +117,7 @@ func testBatchSender(t *testing.T, batchSize int) {
senderCfg := cfg.L1Config.RelayerConfig.SenderConfig
senderCfg.Confirmations = rpc.LatestBlockNumber
senderCfg.PendingLimit = batchSize * TXBatch
newSender, err := sender.NewSender(context.Background(), senderCfg, privateKeys)
if err != nil {
t.Fatal(err)
@@ -119,7 +137,7 @@ func testBatchSender(t *testing.T, batchSize int) {
toAddr := common.HexToAddress("0x4592d8f8d7b001e72cb26a73e4fa1806a51ac79d")
id := strconv.Itoa(i + index*1000)
_, err := newSender.SendTransaction(id, &toAddr, big.NewInt(1), nil, 0)
if errors.Is(err, sender.ErrNoAvailableAccount) {
if errors.Is(err, sender.ErrNoAvailableAccount) || errors.Is(err, sender.ErrFullPending) {
<-time.After(time.Second)
continue
}

View File

@@ -11,8 +11,8 @@ import (
"scroll-tech/common/types"
"scroll-tech/bridge/l1"
"scroll-tech/bridge/l2"
"scroll-tech/bridge/relayer"
"scroll-tech/bridge/watcher"
"scroll-tech/database"
"scroll-tech/database/migrate"
@@ -30,14 +30,13 @@ func testImportL1GasPrice(t *testing.T) {
l1Cfg := cfg.L1Config
// Create L1Relayer
l1Relayer, err := l1.NewLayer1Relayer(context.Background(), db, l1Cfg.RelayerConfig)
l1Relayer, err := relayer.NewLayer1Relayer(context.Background(), db, l1Cfg.RelayerConfig)
assert.NoError(t, err)
defer l1Relayer.Stop()
// Create L1Watcher
startHeight, err := l1Client.BlockNumber(context.Background())
assert.NoError(t, err)
l1Watcher := l1.NewWatcher(context.Background(), l1Client, startHeight-1, 0, l1Cfg.L1MessengerAddress, l1Cfg.L1MessageQueueAddress, l1Cfg.ScrollChainContractAddress, db)
l1Watcher := watcher.NewL1WatcherClient(context.Background(), l1Client, startHeight-1, 0, l1Cfg.L1MessengerAddress, l1Cfg.L1MessageQueueAddress, l1Cfg.ScrollChainContractAddress, db)
// fetch new blocks
number, err := l1Client.BlockNumber(context.Background())
@@ -81,9 +80,8 @@ func testImportL2GasPrice(t *testing.T) {
l2Cfg := cfg.L2Config
// Create L2Relayer
l2Relayer, err := l2.NewLayer2Relayer(context.Background(), l2Client, db, l2Cfg.RelayerConfig)
l2Relayer, err := relayer.NewLayer2Relayer(context.Background(), l2Client, db, l2Cfg.RelayerConfig)
assert.NoError(t, err)
defer l2Relayer.Stop()
// add fake blocks
traces := []*types.WrappedBlock{

View File

@@ -13,8 +13,8 @@ import (
"scroll-tech/common/types"
"scroll-tech/bridge/l1"
"scroll-tech/bridge/l2"
"scroll-tech/bridge/relayer"
"scroll-tech/bridge/watcher"
"scroll-tech/database"
"scroll-tech/database/migrate"
@@ -33,16 +33,14 @@ func testRelayL1MessageSucceed(t *testing.T) {
l2Cfg := cfg.L2Config
// Create L1Relayer
l1Relayer, err := l1.NewLayer1Relayer(context.Background(), db, l1Cfg.RelayerConfig)
l1Relayer, err := relayer.NewLayer1Relayer(context.Background(), db, l1Cfg.RelayerConfig)
assert.NoError(t, err)
defer l1Relayer.Stop()
// Create L1Watcher
confirmations := rpc.LatestBlockNumber
l1Watcher := l1.NewWatcher(context.Background(), l1Client, 0, confirmations, l1Cfg.L1MessengerAddress, l1Cfg.L1MessageQueueAddress, l1Cfg.ScrollChainContractAddress, db)
l1Watcher := watcher.NewL1WatcherClient(context.Background(), l1Client, 0, confirmations, l1Cfg.L1MessengerAddress, l1Cfg.L1MessageQueueAddress, l1Cfg.ScrollChainContractAddress, db)
// Create L2Watcher
l2Watcher := l2.NewL2WatcherClient(context.Background(), l2Client, confirmations, l2Cfg.L2MessengerAddress, l2Cfg.L2MessageQueueAddress, l2Cfg.WithdrawTrieRootSlot, db)
l2Watcher := watcher.NewL2WatcherClient(context.Background(), l2Client, confirmations, l2Cfg.L2MessengerAddress, l2Cfg.L2MessageQueueAddress, l2Cfg.WithdrawTrieRootSlot, db)
// send message through l1 messenger contract
nonce, err := l1MessengerInstance.MessageNonce(&bind.CallOpts{})
@@ -56,7 +54,7 @@ func testRelayL1MessageSucceed(t *testing.T) {
}
// l1 watch process events
l1Watcher.FetchContractEvent(sendReceipt.BlockNumber.Uint64())
l1Watcher.FetchContractEvent()
// check db status
msg, err := db.GetL1MessageByQueueIndex(nonce.Uint64())
@@ -79,7 +77,7 @@ func testRelayL1MessageSucceed(t *testing.T) {
assert.Equal(t, len(relayTxReceipt.Logs), 1)
// fetch message relayed events
l2Watcher.FetchContractEvent(relayTxReceipt.BlockNumber.Uint64())
l2Watcher.FetchContractEvent()
msg, err = db.GetL1MessageByQueueIndex(nonce.Uint64())
assert.NoError(t, err)
assert.Equal(t, msg.Status, types.MsgConfirmed)

View File

@@ -13,8 +13,8 @@ import (
"scroll-tech/common/types"
"scroll-tech/bridge/l1"
"scroll-tech/bridge/l2"
"scroll-tech/bridge/relayer"
"scroll-tech/bridge/watcher"
"scroll-tech/database"
"scroll-tech/database/migrate"
@@ -33,15 +33,15 @@ func testRelayL2MessageSucceed(t *testing.T) {
// Create L2Watcher
confirmations := rpc.LatestBlockNumber
l2Watcher := l2.NewL2WatcherClient(context.Background(), l2Client, confirmations, l2Cfg.L2MessengerAddress, l2Cfg.L2MessageQueueAddress, l2Cfg.WithdrawTrieRootSlot, db)
l2Watcher := watcher.NewL2WatcherClient(context.Background(), l2Client, confirmations, l2Cfg.L2MessengerAddress, l2Cfg.L2MessageQueueAddress, l2Cfg.WithdrawTrieRootSlot, db)
// Create L2Relayer
l2Relayer, err := l2.NewLayer2Relayer(context.Background(), l2Client, db, l2Cfg.RelayerConfig)
l2Relayer, err := relayer.NewLayer2Relayer(context.Background(), l2Client, db, l2Cfg.RelayerConfig)
assert.NoError(t, err)
// Create L1Watcher
l1Cfg := cfg.L1Config
l1Watcher := l1.NewWatcher(context.Background(), l1Client, 0, confirmations, l1Cfg.L1MessengerAddress, l1Cfg.L1MessageQueueAddress, l1Cfg.ScrollChainContractAddress, db)
l1Watcher := watcher.NewL1WatcherClient(context.Background(), l1Client, 0, confirmations, l1Cfg.L1MessengerAddress, l1Cfg.L1MessageQueueAddress, l1Cfg.ScrollChainContractAddress, db)
// send message through l2 messenger contract
nonce, err := l2MessengerInstance.MessageNonce(&bind.CallOpts{})
@@ -55,7 +55,7 @@ func testRelayL2MessageSucceed(t *testing.T) {
}
// l2 watch process events
l2Watcher.FetchContractEvent(sendReceipt.BlockNumber.Uint64())
l2Watcher.FetchContractEvent()
// check db status
msg, err := db.GetL2MessageByNonce(nonce.Uint64())
@@ -123,7 +123,7 @@ func testRelayL2MessageSucceed(t *testing.T) {
assert.Equal(t, len(commitTxReceipt.Logs), 1)
// fetch CommitBatch rollup events
err = l1Watcher.FetchContractEvent(commitTxReceipt.BlockNumber.Uint64())
err = l1Watcher.FetchContractEvent()
assert.NoError(t, err)
status, err = db.GetRollupStatus(batchHash)
assert.NoError(t, err)
@@ -144,7 +144,7 @@ func testRelayL2MessageSucceed(t *testing.T) {
assert.Equal(t, len(finalizeTxReceipt.Logs), 1)
// fetch FinalizeBatch events
err = l1Watcher.FetchContractEvent(finalizeTxReceipt.BlockNumber.Uint64())
err = l1Watcher.FetchContractEvent()
assert.NoError(t, err)
status, err = db.GetRollupStatus(batchHash)
assert.NoError(t, err)
@@ -165,7 +165,7 @@ func testRelayL2MessageSucceed(t *testing.T) {
assert.Equal(t, len(relayTxReceipt.Logs), 1)
// fetch message relayed events
err = l1Watcher.FetchContractEvent(relayTxReceipt.BlockNumber.Uint64())
err = l1Watcher.FetchContractEvent()
assert.NoError(t, err)
msg, err = db.GetL2MessageByNonce(nonce.Uint64())
assert.NoError(t, err)

View File

@@ -12,8 +12,8 @@ import (
"scroll-tech/common/types"
"scroll-tech/bridge/l1"
"scroll-tech/bridge/l2"
"scroll-tech/bridge/relayer"
"scroll-tech/bridge/watcher"
"scroll-tech/database"
"scroll-tech/database/migrate"
@@ -30,13 +30,12 @@ func testCommitBatchAndFinalizeBatch(t *testing.T) {
// Create L2Relayer
l2Cfg := cfg.L2Config
l2Relayer, err := l2.NewLayer2Relayer(context.Background(), l2Client, db, l2Cfg.RelayerConfig)
l2Relayer, err := relayer.NewLayer2Relayer(context.Background(), l2Client, db, l2Cfg.RelayerConfig)
assert.NoError(t, err)
defer l2Relayer.Stop()
// Create L1Watcher
l1Cfg := cfg.L1Config
l1Watcher := l1.NewWatcher(context.Background(), l1Client, 0, l1Cfg.Confirmations, l1Cfg.L1MessengerAddress, l1Cfg.L1MessageQueueAddress, l1Cfg.ScrollChainContractAddress, db)
l1Watcher := watcher.NewL1WatcherClient(context.Background(), l1Client, 0, l1Cfg.Confirmations, l1Cfg.L1MessengerAddress, l1Cfg.L1MessageQueueAddress, l1Cfg.ScrollChainContractAddress, db)
// add some blocks to db
var wrappedBlocks []*types.WrappedBlock
@@ -96,7 +95,7 @@ func testCommitBatchAndFinalizeBatch(t *testing.T) {
assert.Equal(t, len(commitTxReceipt.Logs), 1)
// fetch rollup events
err = l1Watcher.FetchContractEvent(commitTxReceipt.BlockNumber.Uint64())
err = l1Watcher.FetchContractEvent()
assert.NoError(t, err)
status, err = db.GetRollupStatus(batchHash)
assert.NoError(t, err)
@@ -126,7 +125,7 @@ func testCommitBatchAndFinalizeBatch(t *testing.T) {
assert.Equal(t, len(finalizeTxReceipt.Logs), 1)
// fetch rollup events
err = l1Watcher.FetchContractEvent(finalizeTxReceipt.BlockNumber.Uint64())
err = l1Watcher.FetchContractEvent()
assert.NoError(t, err)
status, err = db.GetRollupStatus(batchHash)
assert.NoError(t, err)

View File

@@ -1,4 +1,4 @@
package l2
package utils
import (
"fmt"

View File

@@ -1,10 +1,9 @@
package l2
package watcher
import (
"context"
"fmt"
"math"
"reflect"
"sync"
"time"
@@ -13,22 +12,22 @@ import (
"scroll-tech/common/metrics"
"scroll-tech/common/types"
"scroll-tech/common/utils"
"scroll-tech/database"
bridgeabi "scroll-tech/bridge/abi"
"scroll-tech/bridge/config"
"scroll-tech/bridge/relayer"
)
var (
bridgeL2BatchesGasOverThresholdTotalCounter = geth_metrics.NewRegisteredCounter("bridge/l2/batches/gas/over/threshold/total", metrics.ScrollRegistry)
bridgeL2BatchesTxsOverThresholdTotalCounter = geth_metrics.NewRegisteredCounter("bridge/l2/batches/txs/over/threshold/total", metrics.ScrollRegistry)
bridgeL2BatchesCommitTotalCounter = geth_metrics.NewRegisteredCounter("bridge/l2/batches/commit/total", metrics.ScrollRegistry)
bridgeL2BatchesBlocksCreatedTotalCounter = geth_metrics.NewRegisteredCounter("bridge/l2/batches/blocks/created/total", metrics.ScrollRegistry)
bridgeL2BatchesCommitsSentTotalCounter = geth_metrics.NewRegisteredCounter("bridge/l2/batches/commits/sent/total", metrics.ScrollRegistry)
bridgeL2BatchesCreatedRateMeter = geth_metrics.NewRegisteredMeter("bridge/l2/batches/blocks/created/rate", metrics.ScrollRegistry)
bridgeL2BatchesTxsCreatedRateMeter = geth_metrics.NewRegisteredMeter("bridge/l2/batches/txs/created/rate", metrics.ScrollRegistry)
bridgeL2BatchesGasCreatedRateMeter = geth_metrics.NewRegisteredMeter("bridge/l2/batches/gas/created/rate", metrics.ScrollRegistry)
bridgeL2BatchesTxsCreatedPerBatchGauge = geth_metrics.NewRegisteredGauge("bridge/l2/batches/txs/created/per/batch", metrics.ScrollRegistry)
bridgeL2BatchesGasCreatedPerBatchGauge = geth_metrics.NewRegisteredGauge("bridge/l2/batches/gas/created/per/batch", metrics.ScrollRegistry)
)
// AddBatchInfoToDB inserts the batch information to the BlockBatch table and updates the batch_hash
@@ -83,15 +82,13 @@ type BatchProposer struct {
proofGenerationFreq uint64
batchDataBuffer []*types.BatchData
relayer *Layer2Relayer
relayer *relayer.Layer2Relayer
piCfg *types.PublicInputHashConfig
stopCh chan struct{}
}
// NewBatchProposer will return a new instance of BatchProposer.
func NewBatchProposer(ctx context.Context, cfg *config.BatchProposerConfig, relayer *Layer2Relayer, orm database.OrmFactory) *BatchProposer {
func NewBatchProposer(ctx context.Context, cfg *config.BatchProposerConfig, relayer *relayer.Layer2Relayer, orm database.OrmFactory) *BatchProposer {
p := &BatchProposer{
mutex: sync.Mutex{},
ctx: ctx,
@@ -107,42 +104,17 @@ func NewBatchProposer(ctx context.Context, cfg *config.BatchProposerConfig, rela
proofGenerationFreq: cfg.ProofGenerationFreq,
piCfg: cfg.PublicInputConfig,
relayer: relayer,
stopCh: make(chan struct{}),
}
// for graceful restart.
p.recoverBatchDataBuffer()
// try to commit the leftover pending batches
p.tryCommitBatches()
p.TryCommitBatches()
return p
}
// Start the Listening process
func (p *BatchProposer) Start() {
go func() {
if reflect.ValueOf(p.orm).IsNil() {
panic("must run BatchProposer with DB")
}
ctx, cancel := context.WithCancel(p.ctx)
go utils.Loop(ctx, 2*time.Second, func() {
p.tryProposeBatch()
p.tryCommitBatches()
})
<-p.stopCh
cancel()
}()
}
// Stop the Watcher module, for a graceful shutdown.
func (p *BatchProposer) Stop() {
p.stopCh <- struct{}{}
}
func (p *BatchProposer) recoverBatchDataBuffer() {
// batches are sorted by batch index in increasing order
batchHashes, err := p.orm.GetPendingBatches(math.MaxInt32)
@@ -214,7 +186,8 @@ func (p *BatchProposer) recoverBatchDataBuffer() {
}
}
func (p *BatchProposer) tryProposeBatch() {
// TryProposeBatch will try to propose a batch.
func (p *BatchProposer) TryProposeBatch() {
p.mutex.Lock()
defer p.mutex.Unlock()
@@ -243,7 +216,8 @@ func (p *BatchProposer) tryProposeBatch() {
}
}
func (p *BatchProposer) tryCommitBatches() {
// TryCommitBatches will try to commit the pending batches.
func (p *BatchProposer) TryCommitBatches() {
p.mutex.Lock()
defer p.mutex.Unlock()
@@ -283,7 +257,7 @@ func (p *BatchProposer) tryCommitBatches() {
log.Error("SendCommitTx failed", "error", err)
} else {
// pop the processed batches from the buffer
bridgeL2BatchesCommitTotalCounter.Inc(1)
bridgeL2BatchesCommitsSentTotalCounter.Inc(1)
p.batchDataBuffer = p.batchDataBuffer[index:]
}
}
@@ -299,9 +273,9 @@ func (p *BatchProposer) proposeBatch(blocks []*types.BlockInfo) bool {
if err := p.createBatchForBlocks(blocks[:1]); err != nil {
log.Error("failed to create batch", "number", blocks[0].Number, "err", err)
} else {
bridgeL2BatchesTxsCreatedRateMeter.Mark(int64(blocks[0].TxNum))
bridgeL2BatchesGasCreatedRateMeter.Mark(int64(blocks[0].GasUsed))
bridgeL2BatchesCreatedRateMeter.Mark(1)
bridgeL2BatchesTxsCreatedPerBatchGauge.Update(int64(blocks[0].TxNum))
bridgeL2BatchesGasCreatedPerBatchGauge.Update(int64(blocks[0].GasUsed))
bridgeL2BatchesBlocksCreatedTotalCounter.Inc(1)
}
return true
}
@@ -312,9 +286,9 @@ func (p *BatchProposer) proposeBatch(blocks []*types.BlockInfo) bool {
if err := p.createBatchForBlocks(blocks[:1]); err != nil {
log.Error("failed to create batch", "number", blocks[0].Number, "err", err)
} else {
bridgeL2BatchesTxsCreatedRateMeter.Mark(int64(blocks[0].TxNum))
bridgeL2BatchesGasCreatedRateMeter.Mark(int64(blocks[0].GasUsed))
bridgeL2BatchesCreatedRateMeter.Mark(1)
bridgeL2BatchesTxsCreatedPerBatchGauge.Update(int64(blocks[0].TxNum))
bridgeL2BatchesGasCreatedPerBatchGauge.Update(int64(blocks[0].GasUsed))
bridgeL2BatchesBlocksCreatedTotalCounter.Inc(1)
}
return true
}
@@ -342,9 +316,9 @@ func (p *BatchProposer) proposeBatch(blocks []*types.BlockInfo) bool {
if err := p.createBatchForBlocks(blocks); err != nil {
log.Error("failed to create batch", "from", blocks[0].Number, "to", blocks[len(blocks)-1].Number, "err", err)
} else {
bridgeL2BatchesTxsCreatedRateMeter.Mark(int64(txNum))
bridgeL2BatchesGasCreatedRateMeter.Mark(int64(gasUsed))
bridgeL2BatchesCreatedRateMeter.Mark(int64(len(blocks)))
bridgeL2BatchesTxsCreatedPerBatchGauge.Update(int64(txNum))
bridgeL2BatchesGasCreatedPerBatchGauge.Update(int64(gasUsed))
bridgeL2BatchesBlocksCreatedTotalCounter.Inc(int64(len(blocks)))
}
return true
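
Since the proposer no longer runs its own Start/Stop goroutine, the newly exported TryProposeBatch and TryCommitBatches are expected to be ticked by the owning process. A hedged sketch of such a driver, reusing the Loop helper from scroll-tech/common/utils shown in the removed code; runBatchProposer is an illustrative name:

package example

import (
	"context"
	"time"

	"scroll-tech/bridge/watcher"
	"scroll-tech/common/utils"
)

func runBatchProposer(ctx context.Context, p *watcher.BatchProposer) {
	go utils.Loop(ctx, 2*time.Second, func() {
		p.TryProposeBatch()  // batch up unbatched L2 blocks
		p.TryCommitBatches() // commit any pending batch data
	})
}
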

View File

@@ -1,4 +1,4 @@
package l2
package watcher_test
import (
"context"
@@ -6,12 +6,15 @@ import (
"math"
"testing"
"github.com/scroll-tech/go-ethereum/common"
"github.com/stretchr/testify/assert"
"scroll-tech/database"
"scroll-tech/database/migrate"
"scroll-tech/bridge/config"
"scroll-tech/bridge/relayer"
"scroll-tech/bridge/watcher"
"scroll-tech/common/types"
)
@@ -21,34 +24,49 @@ func testBatchProposerProposeBatch(t *testing.T) {
db, err := database.NewOrmFactory(cfg.DBConfig)
assert.NoError(t, err)
assert.NoError(t, migrate.ResetDB(db.GetDB().DB))
defer db.Close()
ctx := context.Background()
subCtx, cancel := context.WithCancel(ctx)
defer func() {
cancel()
db.Close()
}()
// Insert traces into db.
assert.NoError(t, db.InsertWrappedBlocks([]*types.WrappedBlock{wrappedBlock1}))
l2cfg := cfg.L2Config
wc := NewL2WatcherClient(context.Background(), l2Cli, l2cfg.Confirmations, l2cfg.L2MessengerAddress, l2cfg.L2MessageQueueAddress, l2cfg.WithdrawTrieRootSlot, db)
wc.Start()
defer wc.Stop()
wc := watcher.NewL2WatcherClient(context.Background(), l2Cli, l2cfg.Confirmations, l2cfg.L2MessengerAddress, l2cfg.L2MessageQueueAddress, l2cfg.WithdrawTrieRootSlot, db)
loopToFetchEvent(subCtx, wc)
relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, cfg.L2Config.RelayerConfig)
batch, err := db.GetLatestBatch()
assert.NoError(t, err)
proposer := NewBatchProposer(context.Background(), &config.BatchProposerConfig{
// Create a new batch.
batchData := types.NewBatchData(&types.BlockBatch{
Index: 0,
Hash: batch.Hash,
StateRoot: batch.StateRoot,
}, []*types.WrappedBlock{wrappedBlock1}, nil)
relayer, err := relayer.NewLayer2Relayer(context.Background(), l2Cli, db, cfg.L2Config.RelayerConfig)
assert.NoError(t, err)
proposer := watcher.NewBatchProposer(context.Background(), &config.BatchProposerConfig{
ProofGenerationFreq: 1,
BatchGasThreshold: 3000000,
BatchTxNumThreshold: 135,
BatchTimeSec: 1,
BatchBlocksLimit: 100,
}, relayer, db)
proposer.tryProposeBatch()
proposer.TryProposeBatch()
infos, err := db.GetUnbatchedL2Blocks(map[string]interface{}{},
fmt.Sprintf("order by number ASC LIMIT %d", 100))
assert.NoError(t, err)
assert.Equal(t, 0, len(infos))
exist, err := db.BatchRecordExist(batchData1.Hash().Hex())
exist, err := db.BatchRecordExist(batchData.Hash().Hex())
assert.NoError(t, err)
assert.Equal(t, true, exist)
}
@@ -60,13 +78,26 @@ func testBatchProposerGracefulRestart(t *testing.T) {
assert.NoError(t, migrate.ResetDB(db.GetDB().DB))
defer db.Close()
relayer, err := NewLayer2Relayer(context.Background(), l2Cli, db, cfg.L2Config.RelayerConfig)
relayer, err := relayer.NewLayer2Relayer(context.Background(), l2Cli, db, cfg.L2Config.RelayerConfig)
assert.NoError(t, err)
// Insert traces into db.
assert.NoError(t, db.InsertWrappedBlocks([]*types.WrappedBlock{wrappedBlock2}))
// Insert block batch into db.
batchData1 := types.NewBatchData(&types.BlockBatch{
Index: 0,
Hash: common.Hash{}.String(),
StateRoot: common.Hash{}.String(),
}, []*types.WrappedBlock{wrappedBlock1}, nil)
parentBatch2 := &types.BlockBatch{
Index: batchData1.Batch.BatchIndex,
Hash: batchData1.Hash().Hex(),
StateRoot: batchData1.Batch.NewStateRoot.String(),
}
batchData2 := types.NewBatchData(parentBatch2, []*types.WrappedBlock{wrappedBlock2}, nil)
dbTx, err := db.Beginx()
assert.NoError(t, err)
assert.NoError(t, db.NewBatchInDBTx(dbTx, batchData1))
@@ -84,7 +115,7 @@ func testBatchProposerGracefulRestart(t *testing.T) {
assert.Equal(t, 1, len(batchHashes))
assert.Equal(t, batchData2.Hash().Hex(), batchHashes[0])
// test p.recoverBatchDataBuffer().
_ = NewBatchProposer(context.Background(), &config.BatchProposerConfig{
_ = watcher.NewBatchProposer(context.Background(), &config.BatchProposerConfig{
ProofGenerationFreq: 1,
BatchGasThreshold: 3000000,
BatchTxNumThreshold: 135,

bridge/watcher/common.go Normal file
View File

@@ -0,0 +1,11 @@
package watcher
import "github.com/scroll-tech/go-ethereum/common"
const contractEventsBlocksFetchLimit = int64(10)
type relayedMessage struct {
msgHash common.Hash
txHash common.Hash
isSuccessful bool
}

View File

@@ -1,9 +1,8 @@
package l1
package watcher
import (
"context"
"math/big"
"time"
geth "github.com/scroll-tech/go-ethereum"
"github.com/scroll-tech/go-ethereum/accounts/abi"
@@ -19,8 +18,6 @@ import (
"scroll-tech/common/types"
"scroll-tech/database"
cutil "scroll-tech/common/utils"
bridge_abi "scroll-tech/bridge/abi"
"scroll-tech/bridge/utils"
)
@@ -33,20 +30,14 @@ var (
bridgeL1MsgsRollupEventsTotalCounter = geth_metrics.NewRegisteredCounter("bridge/l1/msgs/rollup/events/total", metrics.ScrollRegistry)
)
type relayedMessage struct {
msgHash common.Hash
txHash common.Hash
isSuccessful bool
}
type rollupEvent struct {
batchHash common.Hash
txHash common.Hash
status types.RollupStatus
}
// Watcher will listen for smart contract events from Eth L1.
type Watcher struct {
// L1WatcherClient will listen for smart contract events from Eth L1.
type L1WatcherClient struct {
ctx context.Context
client *ethclient.Client
db database.OrmFactory
@@ -67,13 +58,10 @@ type Watcher struct {
processedMsgHeight uint64
// The height of the block that the watcher has retrieved header rlp
processedBlockHeight uint64
stopCh chan bool
}
// NewWatcher returns a new instance of Watcher. The instance will be not fully prepared,
// and still needs to be finalized and ran by calling `watcher.Start`.
func NewWatcher(ctx context.Context, client *ethclient.Client, startHeight uint64, confirmations rpc.BlockNumber, messengerAddress, messageQueueAddress, scrollChainAddress common.Address, db database.OrmFactory) *Watcher {
// NewL1WatcherClient returns a new instance of L1WatcherClient.
func NewL1WatcherClient(ctx context.Context, client *ethclient.Client, startHeight uint64, confirmations rpc.BlockNumber, messengerAddress, messageQueueAddress, scrollChainAddress common.Address, db database.OrmFactory) *L1WatcherClient {
savedHeight, err := db.GetLayer1LatestWatchedHeight()
if err != nil {
log.Warn("Failed to fetch height from db", "err", err)
@@ -92,9 +80,7 @@ func NewWatcher(ctx context.Context, client *ethclient.Client, startHeight uint6
savedL1BlockHeight = startHeight
}
stopCh := make(chan bool)
return &Watcher{
return &L1WatcherClient{
ctx: ctx,
client: client,
db: db,
@@ -111,51 +97,11 @@ func NewWatcher(ctx context.Context, client *ethclient.Client, startHeight uint6
processedMsgHeight: uint64(savedHeight),
processedBlockHeight: savedL1BlockHeight,
stopCh: stopCh,
}
}
// Start the Watcher module.
func (w *Watcher) Start() {
go func() {
ctx, cancel := context.WithCancel(w.ctx)
go cutil.LoopWithContext(ctx, 2*time.Second, func(subCtx context.Context) {
number, err := utils.GetLatestConfirmedBlockNumber(subCtx, w.client, w.confirmations)
if err != nil {
log.Error("failed to get block number", "err", err)
} else {
if err := w.FetchBlockHeader(number); err != nil {
log.Error("Failed to fetch L1 block header", "lastest", number, "err", err)
}
}
})
go cutil.LoopWithContext(ctx, 2*time.Second, func(subCtx context.Context) {
number, err := utils.GetLatestConfirmedBlockNumber(subCtx, w.client, w.confirmations)
if err != nil {
log.Error("failed to get block number", "err", err)
} else {
if err := w.FetchContractEvent(number); err != nil {
log.Error("Failed to fetch bridge contract", "err", err)
}
}
})
<-w.stopCh
cancel()
}()
}
// Stop the Watcher module, for a graceful shutdown.
func (w *Watcher) Stop() {
w.stopCh <- true
}
const contractEventsBlocksFetchLimit = int64(10)
// FetchBlockHeader pulls the latest L1 blocks and saves them in the DB
func (w *Watcher) FetchBlockHeader(blockHeight uint64) error {
func (w *L1WatcherClient) FetchBlockHeader(blockHeight uint64) error {
fromBlock := int64(w.processedBlockHeight) + 1
toBlock := int64(blockHeight)
if toBlock < fromBlock {
@@ -201,10 +147,15 @@ func (w *Watcher) FetchBlockHeader(blockHeight uint64) error {
}
// FetchContractEvent pulls the latest event logs from the given contract address and saves them in the DB
func (w *Watcher) FetchContractEvent(blockHeight uint64) error {
func (w *L1WatcherClient) FetchContractEvent() error {
defer func() {
log.Info("l1 watcher fetchContractEvent", "w.processedMsgHeight", w.processedMsgHeight)
}()
blockHeight, err := utils.GetLatestConfirmedBlockNumber(w.ctx, w.client, w.confirmations)
if err != nil {
log.Error("failed to get block number", "err", err)
return err
}
fromBlock := int64(w.processedMsgHeight) + 1
toBlock := int64(blockHeight)
@@ -317,7 +268,7 @@ func (w *Watcher) FetchContractEvent(blockHeight uint64) error {
return nil
}
func (w *Watcher) parseBridgeEventLogs(logs []geth_types.Log) ([]*types.L1Message, []relayedMessage, []rollupEvent, error) {
func (w *L1WatcherClient) parseBridgeEventLogs(logs []geth_types.Log) ([]*types.L1Message, []relayedMessage, []rollupEvent, error) {
// Need use contract abi to parse event Log
// Can only be tested after we have our contracts set up
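
With the Start/Stop goroutines removed, FetchContractEvent now resolves the confirmed block height itself and is meant to be driven by the owning process. A hedged driver sketch; runL1Watcher is illustrative and Loop is the helper from scroll-tech/common/utils used elsewhere in this PR:

package example

import (
	"context"
	"time"

	"github.com/scroll-tech/go-ethereum/log"

	"scroll-tech/bridge/watcher"
	cutils "scroll-tech/common/utils"
)

func runL1Watcher(ctx context.Context, w *watcher.L1WatcherClient) {
	// Periodically fetch bridge contract events; the confirmed height is now computed inside.
	go cutils.Loop(ctx, 2*time.Second, func() {
		if err := w.FetchContractEvent(); err != nil {
			log.Error("failed to fetch L1 bridge contract events", "err", err)
		}
	})
}
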

View File

@@ -1,4 +1,4 @@
package l1
package watcher_test
import (
"context"
@@ -9,6 +9,8 @@ import (
"scroll-tech/database"
"scroll-tech/database/migrate"
"scroll-tech/bridge/watcher"
)
func testStartWatcher(t *testing.T) {
@@ -23,7 +25,6 @@ func testStartWatcher(t *testing.T) {
l1Cfg := cfg.L1Config
watcher := NewWatcher(context.Background(), client, l1Cfg.StartHeight, l1Cfg.Confirmations, l1Cfg.L1MessengerAddress, l1Cfg.L1MessageQueueAddress, l1Cfg.RelayerConfig.RollupContractAddress, db)
watcher.Start()
defer watcher.Stop()
watcher := watcher.NewL1WatcherClient(context.Background(), client, l1Cfg.StartHeight, l1Cfg.Confirmations, l1Cfg.L1MessengerAddress, l1Cfg.L1MessageQueueAddress, l1Cfg.RelayerConfig.RollupContractAddress, db)
assert.NoError(t, watcher.FetchContractEvent())
}

View File

@@ -1,12 +1,10 @@
package l2
package watcher
import (
"context"
"errors"
"fmt"
"math/big"
"reflect"
"time"
geth "github.com/scroll-tech/go-ethereum"
"github.com/scroll-tech/go-ethereum/accounts/abi"
@@ -21,7 +19,6 @@ import (
"scroll-tech/common/metrics"
"scroll-tech/common/types"
cutil "scroll-tech/common/utils"
"scroll-tech/database"
bridge_abi "scroll-tech/bridge/abi"
@@ -31,22 +28,16 @@ import (
// Metrics
var (
bridgeL2MsgsSyncHeightGauge = geth_metrics.NewRegisteredGauge("bridge/l2/msgs/sync/height", metrics.ScrollRegistry)
bridgeL2TracesFetchedHeightGauge = geth_metrics.NewRegisteredGauge("bridge/l2/traces/fetched/height", metrics.ScrollRegistry)
bridgeL2TracesFetchedGapGauge = geth_metrics.NewRegisteredGauge("bridge/l2/traces/fetched/gap", metrics.ScrollRegistry)
bridgeL2BlocksFetchedHeightGauge = geth_metrics.NewRegisteredGauge("bridge/l2/blocks/fetched/height", metrics.ScrollRegistry)
bridgeL2BlocksFetchedGapGauge = geth_metrics.NewRegisteredGauge("bridge/l2/blocks/fetched/gap", metrics.ScrollRegistry)
bridgeL2MsgsSentEventsTotalCounter = geth_metrics.NewRegisteredCounter("bridge/l2/msgs/sent/events/total", metrics.ScrollRegistry)
bridgeL2MsgsAppendEventsTotalCounter = geth_metrics.NewRegisteredCounter("bridge/l2/msgs/append/events/total", metrics.ScrollRegistry)
bridgeL2MsgsRelayedEventsTotalCounter = geth_metrics.NewRegisteredCounter("bridge/l2/msgs/relayed/events/total", metrics.ScrollRegistry)
)
type relayedMessage struct {
msgHash common.Hash
txHash common.Hash
isSuccessful bool
}
// WatcherClient provide APIs which support others to subscribe to various event from l2geth
type WatcherClient struct {
// L2WatcherClient provides APIs that allow others to subscribe to various events from l2geth
type L2WatcherClient struct {
ctx context.Context
event.Feed
@@ -67,18 +58,17 @@ type WatcherClient struct {
processedMsgHeight uint64
stopped uint64
stopCh chan struct{}
}
// NewL2WatcherClient take a l2geth instance to generate a l2watcherclient instance
func NewL2WatcherClient(ctx context.Context, client *ethclient.Client, confirmations rpc.BlockNumber, messengerAddress, messageQueueAddress common.Address, withdrawTrieRootSlot common.Hash, orm database.OrmFactory) *WatcherClient {
func NewL2WatcherClient(ctx context.Context, client *ethclient.Client, confirmations rpc.BlockNumber, messengerAddress, messageQueueAddress common.Address, withdrawTrieRootSlot common.Hash, orm database.OrmFactory) *L2WatcherClient {
savedHeight, err := orm.GetLayer2LatestWatchedHeight()
if err != nil {
log.Warn("fetch height from db failed", "err", err)
savedHeight = 0
}
w := WatcherClient{
w := L2WatcherClient{
ctx: ctx,
Client: client,
orm: orm,
@@ -92,7 +82,6 @@ func NewL2WatcherClient(ctx context.Context, client *ethclient.Client, confirmat
messageQueueABI: bridge_abi.L2MessageQueueABI,
withdrawTrieRootSlot: withdrawTrieRootSlot,
stopCh: make(chan struct{}),
stopped: 0,
}
@@ -104,7 +93,7 @@ func NewL2WatcherClient(ctx context.Context, client *ethclient.Client, confirmat
return &w
}
func (w *WatcherClient) initializeGenesis() error {
func (w *L2WatcherClient) initializeGenesis() error {
if count, err := w.orm.GetBatchCount(); err != nil {
return fmt.Errorf("failed to get batch count: %v", err)
} else if count > 0 {
@@ -142,46 +131,10 @@ func (w *WatcherClient) initializeGenesis() error {
return nil
}
// Start the Listening process
func (w *WatcherClient) Start() {
go func() {
if reflect.ValueOf(w.orm).IsNil() {
panic("must run L2 watcher with DB")
}
ctx, cancel := context.WithCancel(w.ctx)
go cutil.LoopWithContext(ctx, 2*time.Second, func(subCtx context.Context) {
number, err := utils.GetLatestConfirmedBlockNumber(subCtx, w.Client, w.confirmations)
if err != nil {
log.Error("failed to get block number", "err", err)
} else {
w.tryFetchRunningMissingBlocks(ctx, number)
}
})
go cutil.LoopWithContext(ctx, 2*time.Second, func(subCtx context.Context) {
number, err := utils.GetLatestConfirmedBlockNumber(subCtx, w.Client, w.confirmations)
if err != nil {
log.Error("failed to get block number", "err", err)
} else {
w.FetchContractEvent(number)
}
})
<-w.stopCh
cancel()
}()
}
// Stop the Watcher module, for a graceful shutdown.
func (w *WatcherClient) Stop() {
w.stopCh <- struct{}{}
}
const blockTracesFetchLimit = uint64(10)
// try fetch missing blocks if inconsistent
func (w *WatcherClient) tryFetchRunningMissingBlocks(ctx context.Context, blockHeight uint64) {
// TryFetchRunningMissingBlocks tries to fetch missing blocks if the DB is inconsistent with the chain height
func (w *L2WatcherClient) TryFetchRunningMissingBlocks(ctx context.Context, blockHeight uint64) {
// Get newest block in DB. must have blocks at that time.
// Don't use "block_trace" table "trace" column's BlockTrace.Number,
// because it might be empty if the corresponding rollup_result is finalized/finalization_skipped
@@ -209,8 +162,8 @@ func (w *WatcherClient) tryFetchRunningMissingBlocks(ctx context.Context, blockH
log.Error("fail to getAndStoreBlockTraces", "from", from, "to", to, "err", err)
return
}
bridgeL2TracesFetchedHeightGauge.Update(int64(to))
bridgeL2TracesFetchedGapGauge.Update(int64(blockHeight - to))
bridgeL2BlocksFetchedHeightGauge.Update(int64(to))
bridgeL2BlocksFetchedGapGauge.Update(int64(blockHeight - to))
}
}
@@ -237,7 +190,7 @@ func txsToTxsData(txs geth_types.Transactions) []*geth_types.TransactionData {
return txsData
}
func (w *WatcherClient) getAndStoreBlockTraces(ctx context.Context, from, to uint64) error {
func (w *L2WatcherClient) getAndStoreBlockTraces(ctx context.Context, from, to uint64) error {
var blocks []*types.WrappedBlock
for number := from; number <= to; number++ {
@@ -270,14 +223,18 @@ func (w *WatcherClient) getAndStoreBlockTraces(ctx context.Context, from, to uin
return nil
}
const contractEventsBlocksFetchLimit = int64(10)
// FetchContractEvent pulls the latest event logs from the given contract address and saves them in the DB
func (w *WatcherClient) FetchContractEvent(blockHeight uint64) {
func (w *L2WatcherClient) FetchContractEvent() {
defer func() {
log.Info("l2 watcher fetchContractEvent", "w.processedMsgHeight", w.processedMsgHeight)
}()
blockHeight, err := utils.GetLatestConfirmedBlockNumber(w.ctx, w.Client, w.confirmations)
if err != nil {
log.Error("failed to get block number", "err", err)
return
}
fromBlock := int64(w.processedMsgHeight) + 1
toBlock := int64(blockHeight)
@@ -353,7 +310,7 @@ func (w *WatcherClient) FetchContractEvent(blockHeight uint64) {
}
}
func (w *WatcherClient) parseBridgeEventLogs(logs []geth_types.Log) ([]*types.L2Message, []relayedMessage, error) {
func (w *L2WatcherClient) parseBridgeEventLogs(logs []geth_types.Log) ([]*types.L2Message, []relayedMessage, error) {
// Need use contract abi to parse event Log
// Can only be tested after we have our contracts set up
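
The L2 watcher follows the same pattern: the removed Start loop moves to the caller. A sketch of such a driver, mirroring the deleted code above; runL2Watcher is illustrative, and GetLatestConfirmedBlockNumber is assumed to live in scroll-tech/bridge/utils as elsewhere in this PR:

package example

import (
	"context"
	"time"

	"github.com/scroll-tech/go-ethereum/log"
	"github.com/scroll-tech/go-ethereum/rpc"

	"scroll-tech/bridge/utils"
	"scroll-tech/bridge/watcher"
	cutil "scroll-tech/common/utils"
)

func runL2Watcher(ctx context.Context, w *watcher.L2WatcherClient, confirmations rpc.BlockNumber) {
	// Fetch missing block traces up to the latest confirmed block.
	go cutil.LoopWithContext(ctx, 2*time.Second, func(subCtx context.Context) {
		number, err := utils.GetLatestConfirmedBlockNumber(subCtx, w.Client, confirmations)
		if err != nil {
			log.Error("failed to get confirmed block number", "err", err)
			return
		}
		w.TryFetchRunningMissingBlocks(subCtx, number)
	})
	// Fetch bridge contract events; the confirmed height is now computed inside.
	go cutil.Loop(ctx, 2*time.Second, w.FetchContractEvent)
}
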

View File

@@ -1,4 +1,4 @@
package l2
package watcher_test
import (
"context"
@@ -19,7 +19,9 @@ import (
"scroll-tech/bridge/mock_bridge"
"scroll-tech/bridge/sender"
"scroll-tech/bridge/watcher"
cutils "scroll-tech/common/utils"
"scroll-tech/database"
"scroll-tech/database/migrate"
)
@@ -29,12 +31,16 @@ func testCreateNewWatcherAndStop(t *testing.T) {
l2db, err := database.NewOrmFactory(cfg.DBConfig)
assert.NoError(t, err)
assert.NoError(t, migrate.ResetDB(l2db.GetDB().DB))
defer l2db.Close()
ctx := context.Background()
subCtx, cancel := context.WithCancel(ctx)
defer func() {
cancel()
l2db.Close()
}()
l2cfg := cfg.L2Config
rc := NewL2WatcherClient(context.Background(), l2Cli, l2cfg.Confirmations, l2cfg.L2MessengerAddress, l2cfg.L2MessageQueueAddress, l2cfg.WithdrawTrieRootSlot, l2db)
rc.Start()
defer rc.Stop()
rc := watcher.NewL2WatcherClient(context.Background(), l2Cli, l2cfg.Confirmations, l2cfg.L2MessengerAddress, l2cfg.L2MessageQueueAddress, l2cfg.WithdrawTrieRootSlot, l2db)
loopToFetchEvent(subCtx, rc)
l1cfg := cfg.L1Config
l1cfg.RelayerConfig.SenderConfig.Confirmations = rpc.LatestBlockNumber
@@ -60,12 +66,17 @@ func testMonitorBridgeContract(t *testing.T) {
db, err := database.NewOrmFactory(cfg.DBConfig)
assert.NoError(t, err)
assert.NoError(t, migrate.ResetDB(db.GetDB().DB))
defer db.Close()
ctx := context.Background()
subCtx, cancel := context.WithCancel(ctx)
defer func() {
cancel()
db.Close()
}()
l2cfg := cfg.L2Config
wc := NewL2WatcherClient(context.Background(), l2Cli, l2cfg.Confirmations, l2cfg.L2MessengerAddress, l2cfg.L2MessageQueueAddress, l2cfg.WithdrawTrieRootSlot, db)
wc.Start()
defer wc.Stop()
wc := watcher.NewL2WatcherClient(context.Background(), l2Cli, l2cfg.Confirmations, l2cfg.L2MessengerAddress, l2cfg.L2MessageQueueAddress, l2cfg.WithdrawTrieRootSlot, db)
loopToFetchEvent(subCtx, wc)
previousHeight, err := l2Cli.BlockNumber(context.Background())
assert.NoError(t, err)
@@ -79,9 +90,7 @@ func testMonitorBridgeContract(t *testing.T) {
assert.NoError(t, err)
rc := prepareWatcherClient(l2Cli, db, address)
rc.Start()
defer rc.Stop()
loopToFetchEvent(subCtx, rc)
// Call mock_bridge instance sendMessage to trigger emit events
toAddress := common.HexToAddress("0x4592d8f8d7b001e72cb26a73e4fa1806a51ac79d")
message := []byte("testbridgecontract")
@@ -128,7 +137,13 @@ func testFetchMultipleSentMessageInOneBlock(t *testing.T) {
db, err := database.NewOrmFactory(cfg.DBConfig)
assert.NoError(t, err)
assert.NoError(t, migrate.ResetDB(db.GetDB().DB))
defer db.Close()
ctx := context.Background()
subCtx, cancel := context.WithCancel(ctx)
defer func() {
cancel()
db.Close()
}()
previousHeight, err := l2Cli.BlockNumber(context.Background()) // shallow the global previousHeight
assert.NoError(t, err)
@@ -141,8 +156,7 @@ func testFetchMultipleSentMessageInOneBlock(t *testing.T) {
assert.NoError(t, err)
rc := prepareWatcherClient(l2Cli, db, address)
rc.Start()
defer rc.Stop()
loopToFetchEvent(subCtx, rc)
// Call mock_bridge instance sendMessage to trigger emit events multiple times
numTransactions := 4
@@ -195,9 +209,9 @@ func testFetchMultipleSentMessageInOneBlock(t *testing.T) {
assert.Equal(t, 5, len(msgs))
}
func prepareWatcherClient(l2Cli *ethclient.Client, db database.OrmFactory, contractAddr common.Address) *WatcherClient {
func prepareWatcherClient(l2Cli *ethclient.Client, db database.OrmFactory, contractAddr common.Address) *watcher.L2WatcherClient {
confirmations := rpc.LatestBlockNumber
return NewL2WatcherClient(context.Background(), l2Cli, confirmations, contractAddr, contractAddr, common.Hash{}, db)
return watcher.NewL2WatcherClient(context.Background(), l2Cli, confirmations, contractAddr, contractAddr, common.Hash{}, db)
}
func prepareAuth(t *testing.T, l2Cli *ethclient.Client, privateKey *ecdsa.PrivateKey) *bind.TransactOpts {
@@ -209,3 +223,7 @@ func prepareAuth(t *testing.T, l2Cli *ethclient.Client, privateKey *ecdsa.Privat
assert.NoError(t, err)
return auth
}
func loopToFetchEvent(subCtx context.Context, watcher *watcher.L2WatcherClient) {
go cutils.Loop(subCtx, 2*time.Second, watcher.FetchContractEvent)
}

View File

@@ -0,0 +1,91 @@
package watcher_test
import (
"encoding/json"
"os"
"testing"
"github.com/scroll-tech/go-ethereum/ethclient"
"github.com/stretchr/testify/assert"
"scroll-tech/common/docker"
"scroll-tech/common/types"
"scroll-tech/bridge/config"
)
var (
// config
cfg *config.Config
base *docker.App
// l2geth client
l2Cli *ethclient.Client
// block trace
wrappedBlock1 *types.WrappedBlock
wrappedBlock2 *types.WrappedBlock
)
func setupEnv(t *testing.T) (err error) {
// Load config.
cfg, err = config.NewConfig("../config.json")
assert.NoError(t, err)
base.RunImages(t)
cfg.L2Config.RelayerConfig.SenderConfig.Endpoint = base.L1GethEndpoint()
cfg.L1Config.RelayerConfig.SenderConfig.Endpoint = base.L2GethEndpoint()
cfg.DBConfig.DSN = base.DBEndpoint()
// Create l2geth client.
l2Cli, err = base.L2Client()
assert.NoError(t, err)
templateBlockTrace1, err := os.ReadFile("../../common/testdata/blockTrace_02.json")
if err != nil {
return err
}
// unmarshal blockTrace
wrappedBlock1 = &types.WrappedBlock{}
if err = json.Unmarshal(templateBlockTrace1, wrappedBlock1); err != nil {
return err
}
templateBlockTrace2, err := os.ReadFile("../../common/testdata/blockTrace_03.json")
if err != nil {
return err
}
// unmarshal blockTrace
wrappedBlock2 = &types.WrappedBlock{}
if err = json.Unmarshal(templateBlockTrace2, wrappedBlock2); err != nil {
return err
}
return err
}
func TestMain(m *testing.M) {
base = docker.NewDockerApp()
m.Run()
base.Free()
}
func TestFunction(t *testing.T) {
if err := setupEnv(t); err != nil {
t.Fatal(err)
}
// Run l1 watcher test cases.
t.Run("TestStartWatcher", testStartWatcher)
// Run l2 watcher test cases.
t.Run("TestCreateNewWatcherAndStop", testCreateNewWatcherAndStop)
t.Run("TestMonitorBridgeContract", testMonitorBridgeContract)
t.Run("TestFetchMultipleSentMessageInOneBlock", testFetchMultipleSentMessageInOneBlock)
// Run batch proposer test cases.
t.Run("TestBatchProposerProposeBatch", testBatchProposerProposeBatch)
t.Run("TestBatchProposerGracefulRestart", testBatchProposerGracefulRestart)
}

View File

@@ -0,0 +1,26 @@
# Download Go dependencies
FROM scrolltech/go-alpine-builder:1.18 as base
WORKDIR /src
COPY go.work* ./
COPY ./bridge/go.* ./bridge/
COPY ./common/go.* ./common/
COPY ./coordinator/go.* ./coordinator/
COPY ./database/go.* ./database/
COPY ./roller/go.* ./roller/
COPY ./tests/integration-test/go.* ./tests/integration-test/
RUN go mod download -x
# Build event_watcher
FROM base as builder
RUN --mount=target=. \
--mount=type=cache,target=/root/.cache/go-build \
cd /src/bridge/cmd/event_watcher/ && go build -v -p 4 -o /bin/event_watcher
# Pull event_watcher into a second stage deploy alpine container
FROM alpine:latest
COPY --from=builder /bin/event_watcher /bin/
ENTRYPOINT ["event_watcher"]

View File

@@ -2,4 +2,4 @@ assets/
docs/
l2geth/
rpc-gateway/
*target/*
*target/*

View File

@@ -11,16 +11,16 @@ COPY ./roller/go.* ./roller/
COPY ./tests/integration-test/go.* ./tests/integration-test/
RUN go mod download -x
# Build bridge
# Build gas_oracle
FROM base as builder
RUN --mount=target=. \
--mount=type=cache,target=/root/.cache/go-build \
cd /src/bridge/cmd && go build -v -p 4 -o /bin/bridge
cd /src/bridge/cmd/gas_oracle/ && go build -v -p 4 -o /bin/gas_oracle
# Pull bridge into a second stage deploy alpine container
# Pull gas_oracle into a second stage deploy alpine container
FROM alpine:latest
COPY --from=builder /bin/bridge /bin/
COPY --from=builder /bin/gas_oracle /bin/
ENTRYPOINT ["bridge"]
ENTRYPOINT ["gas_oracle"]

View File

@@ -0,0 +1,5 @@
assets/
docs/
l2geth/
rpc-gateway/
*target/*

View File

@@ -0,0 +1,26 @@
# Download Go dependencies
FROM scrolltech/go-alpine-builder:1.18 as base
WORKDIR /src
COPY go.work* ./
COPY ./bridge/go.* ./bridge/
COPY ./common/go.* ./common/
COPY ./coordinator/go.* ./coordinator/
COPY ./database/go.* ./database/
COPY ./roller/go.* ./roller/
COPY ./tests/integration-test/go.* ./tests/integration-test/
RUN go mod download -x
# Build msg_relayer
FROM base as builder
RUN --mount=target=. \
--mount=type=cache,target=/root/.cache/go-build \
cd /src/bridge/cmd/msg_relayer/ && go build -v -p 4 -o /bin/msg_relayer
# Pull msg_relayer into a second stage deploy alpine container
FROM alpine:latest
COPY --from=builder /bin/msg_relayer /bin/
ENTRYPOINT ["msg_relayer"]

View File

@@ -0,0 +1,5 @@
assets/
docs/
l2geth/
rpc-gateway/
*target/*

View File

@@ -0,0 +1,26 @@
# Download Go dependencies
FROM scrolltech/go-alpine-builder:1.18 as base
WORKDIR /src
COPY go.work* ./
COPY ./bridge/go.* ./bridge/
COPY ./common/go.* ./common/
COPY ./coordinator/go.* ./coordinator/
COPY ./database/go.* ./database/
COPY ./roller/go.* ./roller/
COPY ./tests/integration-test/go.* ./tests/integration-test/
RUN go mod download -x
# Build rollup_relayer
FROM base as builder
RUN --mount=target=. \
--mount=type=cache,target=/root/.cache/go-build \
cd /src/bridge/cmd/rollup_relayer/ && go build -v -p 4 -o /bin/rollup_relayer
# Pull rollup_relayer into a second stage deploy alpine container
FROM alpine:latest
COPY --from=builder /bin/rollup_relayer /bin/
ENTRYPOINT ["rollup_relayer"]

View File

@@ -0,0 +1,5 @@
assets/
docs/
l2geth/
rpc-gateway/
*target/*

View File

@@ -31,6 +31,8 @@ type Cmd struct {
checkFuncs cmap.ConcurrentMap //map[string]checkFunc
// open log flag.
openLog bool
// error channel
ErrChan chan error
}
@@ -64,7 +66,7 @@ func (c *Cmd) runCmd() {
// RunCmd parallel running when parallel is true.
func (c *Cmd) RunCmd(parallel bool) {
fmt.Println("cmd: ", c.args)
fmt.Println("cmd:", c.args)
if parallel {
go c.runCmd()
} else {
@@ -72,12 +74,17 @@ func (c *Cmd) RunCmd(parallel bool) {
}
}
// OpenLog opens the cmd log output via this API.
func (c *Cmd) OpenLog(open bool) {
c.openLog = open
}
func (c *Cmd) Write(data []byte) (int, error) {
out := string(data)
if verbose {
fmt.Printf("%s: %v", c.name, out)
if verbose || c.openLog {
fmt.Printf("%s:\n\t%v", c.name, out)
} else if strings.Contains(out, "error") || strings.Contains(out, "warning") {
fmt.Printf("%s: %v", c.name, out)
fmt.Printf("%s:\n\t%v", c.name, out)
}
go c.checkFuncs.IterCb(func(_ string, value interface{}) {
check := value.(checkFunc)

View File

@@ -82,6 +82,9 @@ const (
// MsgExpired represents the from_layer message status is expired
MsgExpired
// MsgRelayFailed represents the from_layer message status is relay failed
MsgRelayFailed
)
// L1Message is structure of stored layer1 bridge message
@@ -159,6 +162,7 @@ type SessionInfo struct {
ID string `json:"id"`
Rollers map[string]*RollerStatus `json:"rollers"`
StartTimestamp int64 `json:"start_timestamp"`
Attempts uint8 `json:"attempts,omitempty"`
}
// ProvingStatus block_batch proving_status (unassigned, assigned, proved, verified, submitted)
@@ -200,7 +204,7 @@ func (ps ProvingStatus) String() string {
}
}
// RollupStatus block_batch rollup_status (pending, committing, committed, finalizing, finalized)
// RollupStatus block_batch rollup_status (pending, committing, committed, commit_failed, finalizing, finalized, finalize_skipped, finalize_failed)
type RollupStatus int
const (
@@ -218,6 +222,10 @@ const (
RollupFinalized
// RollupFinalizationSkipped : batch finalization is skipped
RollupFinalizationSkipped
// RollupCommitFailed : rollup commit transaction confirmed but failed
RollupCommitFailed
// RollupFinalizeFailed : rollup finalize transaction is confirmed but failed
RollupFinalizeFailed
)
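
The two new statuses distinguish a transaction that was confirmed but reverted from the normal pending/committing flow. A hypothetical mapping a confirmation handler might use; the isFinalize/txSuccess inputs and the RollupCommitted constant are assumptions, not taken from this diff:

package example

import "scroll-tech/common/types"

// rollupStatusForConfirmation is an illustrative helper, not part of this PR.
func rollupStatusForConfirmation(isFinalize, txSuccess bool) types.RollupStatus {
	switch {
	case isFinalize && txSuccess:
		return types.RollupFinalized
	case isFinalize && !txSuccess:
		return types.RollupFinalizeFailed // finalize tx confirmed but reverted
	case txSuccess:
		return types.RollupCommitted
	default:
		return types.RollupCommitFailed // commit tx confirmed but reverted
	}
}
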
// BlockBatch is structure of stored block_batch

View File

@@ -5,7 +5,7 @@ import (
"runtime/debug"
)
var tag = "v3.0.1"
var tag = "v3.0.7"
var commit = func() string {
if info, ok := debug.ReadBuildInfo(); ok {

View File

@@ -117,6 +117,8 @@ func (m *Manager) SubmitProof(proof *message.ProofMsg) (bool, error) {
return false, fmt.Errorf("the roller or session id doesn't exist, pubkey: %s, ID: %s", pubkey, proof.ID)
}
m.updateMetricRollerProofsLastFinishedTimestampGauge(pubkey)
err := m.handleZkProof(pubkey, proof.ProofDetail)
if err != nil {
return false, err

View File

@@ -2,6 +2,7 @@
"roller_manager_config": {
"compression_level": 9,
"rollers_per_session": 1,
"session_attempts": 2,
"collection_time": 180,
"token_time_to_live": 60,
"verifier": {

View File

@@ -11,7 +11,8 @@ import (
)
const (
defaultNumberOfVerifierWorkers = 10
defaultNumberOfVerifierWorkers = 10
defaultNumberOfSessionRetryAttempts = 2
)
// RollerManagerConfig loads sequencer configuration items.
@@ -21,6 +22,9 @@ type RollerManagerConfig struct {
OrderSession string `json:"order_session,omitempty"`
// The amount of rollers to pick per proof generation session.
RollersPerSession uint8 `json:"rollers_per_session"`
// Number of times a session can be retried if previous attempts failed.
// Currently we only consider proving timeout as failure here.
SessionAttempts uint8 `json:"session_attempts,omitempty"`
// Zk verifier config.
Verifier *VerifierConfig `json:"verifier,omitempty"`
// Proof collection time (in minutes).
@@ -74,6 +78,9 @@ func NewConfig(file string) (*Config, error) {
if cfg.RollerManagerConfig.MaxVerifierWorkers == 0 {
cfg.RollerManagerConfig.MaxVerifierWorkers = defaultNumberOfVerifierWorkers
}
if cfg.RollerManagerConfig.SessionAttempts == 0 {
cfg.RollerManagerConfig.SessionAttempts = defaultNumberOfSessionRetryAttempts
}
return cfg, nil
}

View File

@@ -30,10 +30,19 @@ import (
)
var (
coordinatorSessionsTimeoutTotalCounter = geth_metrics.NewRegisteredCounter("coordinator/sessions/timeout/total", metrics.ScrollRegistry)
coordinatorProofsReceivedTotalCounter = geth_metrics.NewRegisteredCounter("coordinator/proofs/received/total", metrics.ScrollRegistry)
coordinatorProofsVerifiedTotalCounter = geth_metrics.NewRegisteredCounter("coordinator/proofs/verified/total", metrics.ScrollRegistry)
coordinatorProofsVerifiedFailedTotalCounter = geth_metrics.NewRegisteredCounter("coordinator/proofs/verified/failed/total", metrics.ScrollRegistry)
// proofs
coordinatorProofsReceivedTotalCounter = geth_metrics.NewRegisteredCounter("coordinator/proofs/received/total", metrics.ScrollRegistry)
coordinatorProofsVerifiedSuccessTimeTimer = geth_metrics.NewRegisteredTimer("coordinator/proofs/verified/success/time", metrics.ScrollRegistry)
coordinatorProofsVerifiedFailedTimeTimer = geth_metrics.NewRegisteredTimer("coordinator/proofs/verified/failed/time", metrics.ScrollRegistry)
coordinatorProofsGeneratedFailedTimeTimer = geth_metrics.NewRegisteredTimer("coordinator/proofs/generated/failed/time", metrics.ScrollRegistry)
// sessions
coordinatorSessionsSuccessTotalCounter = geth_metrics.NewRegisteredCounter("coordinator/sessions/success/total", metrics.ScrollRegistry)
coordinatorSessionsTimeoutTotalCounter = geth_metrics.NewRegisteredCounter("coordinator/sessions/timeout/total", metrics.ScrollRegistry)
coordinatorSessionsFailedTotalCounter = geth_metrics.NewRegisteredCounter("coordinator/sessions/failed/total", metrics.ScrollRegistry)
coordinatorSessionsActiveNumberGauge = geth_metrics.NewRegisteredCounter("coordinator/sessions/active/number", metrics.ScrollRegistry)
)
const (
@@ -176,7 +185,7 @@ func (m *Manager) Loop() {
}
}
// Select roller and send message
for len(tasks) > 0 && m.StartProofGenerationSession(tasks[0]) {
for len(tasks) > 0 && m.StartProofGenerationSession(tasks[0], nil) {
tasks = tasks[1:]
}
case <-m.ctx.Done():
@@ -242,7 +251,8 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
if !ok {
return fmt.Errorf("proof generation session for id %v does not existID", msg.ID)
}
proofTimeSec := uint64(time.Since(time.Unix(sess.info.StartTimestamp, 0)).Seconds())
proofTime := time.Since(time.Unix(sess.info.StartTimestamp, 0))
proofTimeSec := uint64(proofTime.Seconds())
// Ensure this roller is eligible to participate in the session.
roller, ok := sess.info.Rollers[pk]
@@ -267,6 +277,7 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
"proof id", msg.ID,
"roller name", roller.Name,
"roller pk", roller.PublicKey,
"proof time", proofTimeSec,
)
defer func() {
@@ -287,11 +298,13 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
}()
if msg.Status != message.StatusOk {
log.Error(
"Roller failed to generate proof",
"msg.ID", msg.ID,
coordinatorProofsGeneratedFailedTimeTimer.Update(proofTime)
log.Info(
"proof generated by roller failed",
"proof id", msg.ID,
"roller name", roller.Name,
"roller pk", roller.PublicKey,
"proof time", proofTimeSec,
"error", msg.Error,
)
return nil
@@ -329,29 +342,40 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
"error", dbErr)
return dbErr
}
coordinatorProofsVerifiedTotalCounter.Inc(1)
coordinatorProofsVerifiedSuccessTimeTimer.Update(proofTime)
m.updateMetricRollerProofsVerifiedSuccessTimeTimer(roller.PublicKey, proofTime)
log.Info("proof verified by coordinator success", "proof id", msg.ID, "roller name", roller.Name,
"roller pk", roller.PublicKey, "proof time", proofTimeSec)
} else {
coordinatorProofsVerifiedFailedTotalCounter.Inc(1)
coordinatorProofsVerifiedFailedTimeTimer.Update(proofTime)
m.updateMetricRollerProofsVerifiedFailedTimeTimer(roller.PublicKey, proofTime)
log.Info("proof verified by coordinator failed", "proof id", msg.ID, "roller name", roller.Name,
"roller pk", roller.PublicKey, "proof time", proofTimeSec)
}
return nil
}
// CollectProofs collects proofs corresponding to a proof generation session.
func (m *Manager) CollectProofs(sess *session) {
//Cleanup roller sessions before return.
defer func() {
// TODO: remove the clean-up, rollers report healthy status.
m.mu.Lock()
for pk := range sess.info.Rollers {
m.freeTaskIDForRoller(pk, sess.info.ID)
}
delete(m.sessions, sess.info.ID)
m.mu.Unlock()
}()
coordinatorSessionsActiveNumberGauge.Inc(1)
defer coordinatorSessionsActiveNumberGauge.Dec(1)
for {
select {
//Execute after timeout, set in config.json. Consider all rollers failed.
case <-time.After(time.Duration(m.cfg.CollectionTime) * time.Minute):
// Check if session can be replayed
if sess.info.Attempts < m.cfg.SessionAttempts {
if m.StartProofGenerationSession(nil, sess) {
m.mu.Lock()
for pk := range sess.info.Rollers {
m.freeTaskIDForRoller(pk, sess.info.ID)
}
m.mu.Unlock()
log.Info("Retrying session", "session id:", sess.info.ID)
return
}
}
// record failed session.
errMsg := "proof generation session ended without receiving any valid proofs"
m.addFailedSession(sess, errMsg)
@@ -363,6 +387,13 @@ func (m *Manager) CollectProofs(sess *session) {
if err := m.orm.UpdateProvingStatus(sess.info.ID, types.ProvingTaskFailed); err != nil {
log.Error("fail to reset task_status as Unassigned", "id", sess.info.ID, "err", err)
}
m.mu.Lock()
for pk := range sess.info.Rollers {
m.freeTaskIDForRoller(pk, sess.info.ID)
}
delete(m.sessions, sess.info.ID)
m.mu.Unlock()
coordinatorSessionsTimeoutTotalCounter.Inc(1)
return
//Execute after one of the roller finishes sending proof, return early if all rollers had sent results.
@@ -373,6 +404,7 @@ func (m *Manager) CollectProofs(sess *session) {
if err := m.orm.UpdateProvingStatus(ret.id, types.ProvingTaskFailed); err != nil {
log.Error("failed to update proving_status as failed", "msg.ID", ret.id, "error", err)
}
coordinatorSessionsFailedTotalCounter.Inc(1)
}
if err := m.orm.SetSessionInfo(sess.info); err != nil {
log.Error("db set session info fail", "pk", ret.pk, "error", err)
@@ -386,7 +418,14 @@ func (m *Manager) CollectProofs(sess *session) {
randIndex := mathrand.Intn(len(validRollers))
_ = validRollers[randIndex]
// TODO: reward winner
for pk := range sess.info.Rollers {
m.freeTaskIDForRoller(pk, sess.info.ID)
}
delete(m.sessions, sess.info.ID)
m.mu.Unlock()
coordinatorSessionsSuccessTotalCounter.Inc(1)
return
}
m.mu.Unlock()
@@ -439,27 +478,39 @@ func (m *Manager) APIs() []rpc.API {
}
// StartProofGenerationSession starts a proof generation session
func (m *Manager) StartProofGenerationSession(task *types.BlockBatch) (success bool) {
func (m *Manager) StartProofGenerationSession(task *types.BlockBatch, prevSession *session) (success bool) {
var taskId string
if task != nil {
taskId = task.Hash
} else {
taskId = prevSession.info.ID
}
if m.GetNumberOfIdleRollers() == 0 {
log.Warn("no idle roller when starting proof generation session", "id", task.Hash)
log.Warn("no idle roller when starting proof generation session", "id", taskId)
return false
}
log.Info("start proof generation session", "id", task.Hash)
log.Info("start proof generation session", "id", taskId)
defer func() {
if !success {
if err := m.orm.UpdateProvingStatus(task.Hash, types.ProvingTaskUnassigned); err != nil {
log.Error("fail to reset task_status as Unassigned", "id", task.Hash, "err", err)
if task != nil {
if err := m.orm.UpdateProvingStatus(taskId, types.ProvingTaskUnassigned); err != nil {
log.Error("fail to reset task_status as Unassigned", "id", taskId, "err", err)
}
} else {
if err := m.orm.UpdateProvingStatus(taskId, types.ProvingTaskFailed); err != nil {
log.Error("fail to reset task_status as Failed", "id", taskId, "err", err)
}
}
}
}()
// Get block traces.
blockInfos, err := m.orm.GetL2BlockInfos(map[string]interface{}{"batch_hash": task.Hash})
blockInfos, err := m.orm.GetL2BlockInfos(map[string]interface{}{"batch_hash": taskId})
if err != nil {
log.Error(
"could not GetBlockInfos",
"batch_hash", task.Hash,
"batch_hash", taskId,
"error", err,
)
return false
@@ -486,52 +537,58 @@ func (m *Manager) StartProofGenerationSession(task *types.BlockBatch) (success b
log.Info("selectRoller returns nil")
break
}
log.Info("roller is picked", "session id", task.Hash, "name", roller.Name, "public key", roller.PublicKey)
log.Info("roller is picked", "session id", taskId, "name", roller.Name, "public key", roller.PublicKey)
// send trace to roller
if !roller.sendTask(task.Hash, traces) {
log.Error("send task failed", "roller name", roller.Name, "public key", roller.PublicKey, "id", task.Hash)
if !roller.sendTask(taskId, traces) {
log.Error("send task failed", "roller name", roller.Name, "public key", roller.PublicKey, "id", taskId)
continue
}
m.updateMetricRollerProofsLastAssignedTimestampGauge(roller.PublicKey)
rollers[roller.PublicKey] = &types.RollerStatus{PublicKey: roller.PublicKey, Name: roller.Name, Status: types.RollerAssigned}
}
// No roller assigned.
if len(rollers) == 0 {
log.Error("no roller assigned", "id", task.Hash, "number of idle rollers", m.GetNumberOfIdleRollers())
log.Error("no roller assigned", "id", taskId, "number of idle rollers", m.GetNumberOfIdleRollers())
return false
}
// Update session proving status as assigned.
if err = m.orm.UpdateProvingStatus(task.Hash, types.ProvingTaskAssigned); err != nil {
log.Error("failed to update task status", "id", task.Hash, "err", err)
if err = m.orm.UpdateProvingStatus(taskId, types.ProvingTaskAssigned); err != nil {
log.Error("failed to update task status", "id", taskId, "err", err)
return false
}
// Create a proof generation session.
sess := &session{
info: &types.SessionInfo{
ID: task.Hash,
ID: taskId,
Rollers: rollers,
StartTimestamp: time.Now().Unix(),
Attempts: 1,
},
finishChan: make(chan rollerProofStatus, proofAndPkBufferSize),
}
if prevSession != nil {
sess.info.Attempts += prevSession.info.Attempts
}
for _, roller := range sess.info.Rollers {
log.Info(
"assigned proof to roller",
"session id", sess.info.ID,
"roller name", roller.Name,
"roller pk", roller.PublicKey,
"proof status", roller.Status)
}
// Store session info.
if err = m.orm.SetSessionInfo(sess.info); err != nil {
log.Error("db set session info fail", "error", err)
for _, roller := range sess.info.Rollers {
log.Error(
"restore roller info for session",
"session id", sess.info.ID,
"roller name", roller.Name,
"public key", roller.PublicKey,
"proof status", roller.Status)
}
log.Error("db set session info fail", "session id", sess.info.ID, "error", err)
return false
}
m.mu.Lock()
m.sessions[task.Hash] = sess
m.sessions[taskId] = sess
m.mu.Unlock()
go m.CollectProofs(sess)
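Taken together, the new prevSession parameter changes the call shape roughly as follows (a sketch only; the retry trigger itself sits in CollectProofs, outside this hunk, and the attempt cap comes from the SessionAttempts config used in the tests below):

	// First dispatch of a fresh batch: the task id comes from task.Hash and the
	// session starts with Attempts = 1.
	m.StartProofGenerationSession(task, nil)

	// Retry after a proving timeout: task is nil, so the id is read from
	// prevSession.info.ID and prevSession.info.Attempts is added to the new
	// session's counter. If this retry cannot be started, the deferred error
	// path above marks the batch ProvingTaskFailed instead of resetting it to
	// Unassigned.
	m.StartProofGenerationSession(nil, prevSession)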


@@ -87,6 +87,7 @@ func TestApis(t *testing.T) {
t.Run("TestSeveralConnections", testSeveralConnections)
t.Run("TestValidProof", testValidProof)
t.Run("TestInvalidProof", testInvalidProof)
t.Run("TestTimedoutProof", testTimedoutProof)
t.Run("TestIdleRollerSelection", testIdleRollerSelection)
// TODO: Restart roller alone when received task, can add this test case in integration-test.
//t.Run("TestRollerReconnect", testRollerReconnect)
@@ -356,6 +357,86 @@ func testInvalidProof(t *testing.T) {
}
}
func testTimedoutProof(t *testing.T) {
// Create db handler and reset db.
l2db, err := database.NewOrmFactory(cfg.DBConfig)
assert.NoError(t, err)
assert.NoError(t, migrate.ResetDB(l2db.GetDB().DB))
defer l2db.Close()
// Setup coordinator and ws server.
wsURL := "ws://" + randomURL()
rollerManager, handler := setupCoordinator(t, cfg.DBConfig, 1, wsURL)
defer func() {
handler.Shutdown(context.Background())
rollerManager.Stop()
}()
// create first mock roller, that will not send any proof.
roller1 := newMockRoller(t, "roller_test"+strconv.Itoa(0), wsURL)
defer func() {
// close connection
roller1.close()
}()
assert.Equal(t, 1, rollerManager.GetNumberOfIdleRollers())
var hashes = make([]string, 1)
dbTx, err := l2db.Beginx()
assert.NoError(t, err)
for i := range hashes {
assert.NoError(t, l2db.NewBatchInDBTx(dbTx, batchData))
hashes[i] = batchData.Hash().Hex()
}
assert.NoError(t, dbTx.Commit())
// verify proof status, it should be assigned, because roller didn't send any proof
var (
tick = time.Tick(500 * time.Millisecond)
tickStop = time.Tick(10 * time.Second)
)
for len(hashes) > 0 {
select {
case <-tick:
status, err := l2db.GetProvingStatusByHash(hashes[0])
assert.NoError(t, err)
if status == types.ProvingTaskAssigned {
hashes = hashes[1:]
}
case <-tickStop:
t.Error("failed to check proof status")
return
}
}
// create second mock roller, that will send valid proof.
roller2 := newMockRoller(t, "roller_test"+strconv.Itoa(1), wsURL)
roller2.waitTaskAndSendProof(t, time.Second, false, true)
defer func() {
// close connection
roller2.close()
}()
assert.Equal(t, 1, rollerManager.GetNumberOfIdleRollers())
// wait manager to finish first CollectProofs
<-time.After(60 * time.Second)
// verify proof status, it should be verified now, because second roller sent valid proof
for len(hashes) > 0 {
select {
case <-tick:
status, err := l2db.GetProvingStatusByHash(hashes[0])
assert.NoError(t, err)
if status == types.ProvingTaskVerified {
hashes = hashes[1:]
}
case <-tickStop:
t.Error("failed to check proof status")
return
}
}
}
func testIdleRollerSelection(t *testing.T) {
// Create db handler and reset db.
l2db, err := database.NewOrmFactory(cfg.DBConfig)
@@ -505,6 +586,7 @@ func setupCoordinator(t *testing.T, dbCfg *database.DBConfig, rollersPerSession
CollectionTime: 1,
TokenTimeToLive: 5,
MaxVerifierWorkers: 10,
SessionAttempts: 2,
}, db, nil)
assert.NoError(t, err)
assert.NoError(t, rollerManager.Start())
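SessionAttempts is the new knob exercised by testTimedoutProof above. A minimal sketch of how it presumably sits in the coordinator's roller-manager config struct; only the field name comes from this test, while the struct name, field type, and JSON tag are assumptions:

	type RollerManagerConfig struct {
		// ... existing fields such as CollectionTime, TokenTimeToLive, MaxVerifierWorkers ...

		// SessionAttempts caps how many times a batch may be dispatched in total
		// (the first assignment plus timeout retries) before it is marked Failed.
		SessionAttempts uint8 `json:"session_attempts"` // type and tag assumed
	}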


@@ -0,0 +1,60 @@
package coordinator
import (
"time"
"github.com/scroll-tech/go-ethereum/log"
geth_metrics "github.com/scroll-tech/go-ethereum/metrics"
)
type rollerMetrics struct {
rollerProofsVerifiedSuccessTimeTimer geth_metrics.Timer
rollerProofsVerifiedFailedTimeTimer geth_metrics.Timer
rollerProofsGeneratedFailedTimeTimer geth_metrics.Timer
rollerProofsLastAssignedTimestampGauge geth_metrics.Gauge
rollerProofsLastFinishedTimestampGauge geth_metrics.Gauge
}
func (m *Manager) updateMetricRollerProofsLastFinishedTimestampGauge(pk string) {
if node, ok := m.rollerPool.Get(pk); ok {
rMs := node.(*rollerNode).rollerMetrics
if rMs != nil {
rMs.rollerProofsLastFinishedTimestampGauge.Update(time.Now().Unix())
} else {
log.Error("rollerProofsLastFinishedTimestampGauge is nil", "roller pk", pk)
}
}
}
func (m *Manager) updateMetricRollerProofsLastAssignedTimestampGauge(pk string) {
if node, ok := m.rollerPool.Get(pk); ok {
rMs := node.(*rollerNode).rollerMetrics
if rMs != nil {
rMs.rollerProofsLastAssignedTimestampGauge.Update(time.Now().Unix())
} else {
log.Error("rollerProofsLastAssignedTimestampGauge is nil", "roller pk", pk)
}
}
}
func (m *Manager) updateMetricRollerProofsVerifiedSuccessTimeTimer(pk string, d time.Duration) {
if node, ok := m.rollerPool.Get(pk); ok {
rMs := node.(*rollerNode).rollerMetrics
if rMs != nil {
rMs.rollerProofsVerifiedSuccessTimeTimer.Update(d)
} else {
log.Error("rollerProofsVerifiedSuccessTimeTimer is nil", "roller pk", pk)
}
}
}
func (m *Manager) updateMetricRollerProofsVerifiedFailedTimeTimer(pk string, d time.Duration) {
if node, ok := m.rollerPool.Get(pk); ok {
rMs := node.(*rollerNode).rollerMetrics
if rMs != nil {
rMs.rollerProofsVerifiedFailedTimeTimer.Update(d)
} else {
log.Error("rollerProofsVerifiedFailedTimeTimer is nil", "roller pk", pk)
}
}
}
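The helpers above wrap per-roller timers and gauges registered under names that embed the roller's public key (see the register hunk below). A self-contained sketch of the underlying go-ethereum metrics API with a hypothetical key; real values require metrics collection to be enabled in that package:

	package main

	import (
		"fmt"
		"time"

		geth_metrics "github.com/scroll-tech/go-ethereum/metrics"
	)

	func main() {
		registry := geth_metrics.NewRegistry()
		pk := "0xabcdef" // hypothetical roller public key

		// Same naming scheme as rollerMetrics: one timer / gauge per roller.
		timer := geth_metrics.GetOrRegisterTimer(fmt.Sprintf("roller/proofs/verified/success/time/%s", pk), registry)
		gauge := geth_metrics.GetOrRegisterGauge(fmt.Sprintf("roller/proofs/last/finished/timestamp/%s", pk), registry)

		// A verified proof would update both: the timer with the proving duration,
		// the gauge with the wall-clock time it finished.
		timer.Update(42 * time.Second)
		gauge.Update(time.Now().Unix())

		fmt.Println("count:", timer.Count(), "mean(ns):", timer.Mean())
	}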


@@ -9,8 +9,10 @@ import (
cmap "github.com/orcaman/concurrent-map"
geth_types "github.com/scroll-tech/go-ethereum/core/types"
"github.com/scroll-tech/go-ethereum/log"
geth_metrics "github.com/scroll-tech/go-ethereum/metrics"
"scroll-tech/common/message"
"scroll-tech/common/metrics"
"scroll-tech/common/types"
)
@@ -30,6 +32,8 @@ type rollerNode struct {
// Time of message creation
registerTime time.Time
*rollerMetrics
}
func (r *rollerNode) sendTask(id string, traces []*geth_types.BlockTrace) bool {
@@ -64,12 +68,20 @@ func (m *Manager) register(pubkey string, identity *message.Identity) (<-chan *m
node, ok := m.rollerPool.Get(pubkey)
if !ok {
taskIDs := m.reloadRollerAssignedTasks(pubkey)
rMs := &rollerMetrics{
rollerProofsVerifiedSuccessTimeTimer: geth_metrics.GetOrRegisterTimer(fmt.Sprintf("roller/proofs/verified/success/time/%s", pubkey), metrics.ScrollRegistry),
rollerProofsVerifiedFailedTimeTimer: geth_metrics.GetOrRegisterTimer(fmt.Sprintf("roller/proofs/verified/failed/time/%s", pubkey), metrics.ScrollRegistry),
rollerProofsGeneratedFailedTimeTimer: geth_metrics.GetOrRegisterTimer(fmt.Sprintf("roller/proofs/generated/failed/time/%s", pubkey), metrics.ScrollRegistry),
rollerProofsLastAssignedTimestampGauge: geth_metrics.GetOrRegisterGauge(fmt.Sprintf("roller/proofs/last/assigned/timestamp/%s", pubkey), metrics.ScrollRegistry),
rollerProofsLastFinishedTimestampGauge: geth_metrics.GetOrRegisterGauge(fmt.Sprintf("roller/proofs/last/finished/timestamp/%s", pubkey), metrics.ScrollRegistry),
}
node = &rollerNode{
Name: identity.Name,
Version: identity.Version,
PublicKey: pubkey,
TaskIDs: *taskIDs,
taskChan: make(chan *message.TaskMsg, 4),
Name: identity.Name,
Version: identity.Version,
PublicKey: pubkey,
TaskIDs: *taskIDs,
taskChan: make(chan *message.TaskMsg, 4),
rollerMetrics: rMs,
}
m.rollerPool.Set(pubkey, node)
}


@@ -18,7 +18,7 @@ create table l1_message
);
comment
on column l1_message.status is 'undefined, pending, submitted, confirmed, failed, expired';
on column l1_message.status is 'undefined, pending, submitted, confirmed, failed, expired, relay_failed';
create unique index l1_message_hash_uindex
on l1_message (msg_hash);


@@ -18,7 +18,7 @@ create table l2_message
);
comment
on column l2_message.status is 'undefined, pending, submitted, confirmed, failed, expired';
on column l2_message.status is 'undefined, pending, submitted, confirmed, failed, expired, relay_failed';
create unique index l2_message_hash_uindex
on l2_message (msg_hash);


@@ -33,7 +33,7 @@ create table block_batch
comment
on column block_batch.proving_status is 'undefined, unassigned, skipped, assigned, proved, verified, failed';
comment
on column block_batch.rollup_status is 'undefined, pending, committing, committed, finalizing, finalized, finalization_skipped';
on column block_batch.rollup_status is 'undefined, pending, committing, committed, finalizing, finalized, finalization_skipped, commit_failed, finalize_failed';
comment
on column block_batch.oracle_status is 'undefined, pending, importing, imported, failed';


@@ -22,7 +22,10 @@ import (
_ "scroll-tech/roller/cmd/app"
rollerConfig "scroll-tech/roller/config"
_ "scroll-tech/bridge/cmd/app"
_ "scroll-tech/bridge/cmd/event_watcher/app"
_ "scroll-tech/bridge/cmd/gas_oracle/app"
_ "scroll-tech/bridge/cmd/msg_relayer/app"
_ "scroll-tech/bridge/cmd/rollup_relayer/app"
bridgeConfig "scroll-tech/bridge/config"
"scroll-tech/bridge/sender"
@@ -75,26 +78,53 @@ func free(t *testing.T) {
}
type appAPI interface {
OpenLog(open bool)
WaitResult(t *testing.T, timeout time.Duration, keyword string) bool
RunApp(waitResult func() bool)
WaitExit()
ExpectWithTimeout(t *testing.T, parallel bool, timeout time.Duration, keyword string)
}
func runBridgeApp(t *testing.T, args ...string) appAPI {
func runMsgRelayerApp(t *testing.T, args ...string) appAPI {
args = append(args, "--log.debug", "--config", bridgeFile)
return cmd.NewCmd("bridge-test", args...)
app := cmd.NewCmd("message-relayer-test", args...)
app.OpenLog(true)
return app
}
func runGasOracleApp(t *testing.T, args ...string) appAPI {
args = append(args, "--log.debug", "--config", bridgeFile)
app := cmd.NewCmd("gas-oracle-test", args...)
app.OpenLog(true)
return app
}
func runRollupRelayerApp(t *testing.T, args ...string) appAPI {
args = append(args, "--log.debug", "--config", bridgeFile)
app := cmd.NewCmd("rollup-relayer-test", args...)
app.OpenLog(true)
return app
}
func runEventWatcherApp(t *testing.T, args ...string) appAPI {
args = append(args, "--log.debug", "--config", bridgeFile)
app := cmd.NewCmd("event-watcher-test", args...)
app.OpenLog(true)
return app
}
func runCoordinatorApp(t *testing.T, args ...string) appAPI {
args = append(args, "--log.debug", "--config", coordinatorFile, "--ws", "--ws.port", strconv.Itoa(int(wsPort)))
// start process
return cmd.NewCmd("coordinator-test", args...)
app := cmd.NewCmd("coordinator-test", args...)
app.OpenLog(true)
return app
}
func runDBCliApp(t *testing.T, option, keyword string) {
args := []string{option, "--config", dbFile}
app := cmd.NewCmd("db_cli-test", args...)
app.OpenLog(true)
defer app.WaitExit()
// Wait expect result.
@@ -104,7 +134,9 @@ func runDBCliApp(t *testing.T, option, keyword string) {
func runRollerApp(t *testing.T, args ...string) appAPI {
args = append(args, "--log.debug", "--config", rollerFile)
return cmd.NewCmd("roller-test", args...)
app := cmd.NewCmd("roller-test", args...)
app.OpenLog(true)
return app
}
func runSender(t *testing.T, endpoint string) *sender.Sender {


@@ -41,8 +41,17 @@ func testStartProcess(t *testing.T) {
runDBCliApp(t, "migrate", "current version:")
// Start bridge process.
bridgeCmd := runBridgeApp(t)
bridgeCmd.RunApp(func() bool { return bridgeCmd.WaitResult(t, time.Second*20, "Start bridge successfully") })
ewCmd := runEventWatcherApp(t)
ewCmd.RunApp(func() bool { return ewCmd.WaitResult(t, time.Second*20, "Start event-watcher successfully") })
goCmd := runGasOracleApp(t)
goCmd.RunApp(func() bool { return goCmd.WaitResult(t, time.Second*20, "Start gas-oracle successfully") })
mrCmd := runMsgRelayerApp(t)
mrCmd.RunApp(func() bool { return mrCmd.WaitResult(t, time.Second*20, "Start message-relayer successfully") })
rrCmd := runRollupRelayerApp(t)
rrCmd.RunApp(func() bool { return rrCmd.WaitResult(t, time.Second*20, "Start rollup-relayer successfully") })
// Start coordinator process.
coordinatorCmd := runCoordinatorApp(t, "--ws", "--ws.port", "8391")
@@ -53,8 +62,11 @@ func testStartProcess(t *testing.T) {
rollerCmd.ExpectWithTimeout(t, true, time.Second*60, "register to coordinator successfully!")
rollerCmd.RunApp(func() bool { return rollerCmd.WaitResult(t, time.Second*40, "roller start successfully") })
ewCmd.WaitExit()
goCmd.WaitExit()
mrCmd.WaitExit()
rrCmd.WaitExit()
rollerCmd.WaitExit()
bridgeCmd.WaitExit()
coordinatorCmd.WaitExit()
}
@@ -63,16 +75,30 @@ func testMonitorMetrics(t *testing.T) {
runDBCliApp(t, "reset", "successful to reset")
runDBCliApp(t, "migrate", "current version:")
// Start bridge process with metrics server.
port1, _ := rand.Int(rand.Reader, big.NewInt(2000))
svrPort1 := strconv.FormatInt(port1.Int64()+50000, 10)
bridgeCmd := runBridgeApp(t, "--metrics", "--metrics.addr", "localhost", "--metrics.port", svrPort1)
bridgeCmd.RunApp(func() bool { return bridgeCmd.WaitResult(t, time.Second*20, "Start bridge successfully") })
ewCmd := runEventWatcherApp(t, "--metrics", "--metrics.addr", "localhost", "--metrics.port", svrPort1)
ewCmd.RunApp(func() bool { return ewCmd.WaitResult(t, time.Second*20, "Start event-watcher successfully") })
port2, _ := rand.Int(rand.Reader, big.NewInt(2000))
svrPort2 := strconv.FormatInt(port2.Int64()+50000, 10)
goCmd := runGasOracleApp(t, "--metrics", "--metrics.addr", "localhost", "--metrics.port", svrPort2)
goCmd.RunApp(func() bool { return goCmd.WaitResult(t, time.Second*20, "Start gas-oracle successfully") })
port3, _ := rand.Int(rand.Reader, big.NewInt(2000))
svrPort3 := strconv.FormatInt(port3.Int64()+50000, 10)
mrCmd := runMsgRelayerApp(t, "--metrics", "--metrics.addr", "localhost", "--metrics.port", svrPort3)
mrCmd.RunApp(func() bool { return mrCmd.WaitResult(t, time.Second*20, "Start message-relayer successfully") })
port4, _ := rand.Int(rand.Reader, big.NewInt(2000))
svrPort4 := strconv.FormatInt(port4.Int64()+50000, 10)
rrCmd := runRollupRelayerApp(t, "--metrics", "--metrics.addr", "localhost", "--metrics.port", svrPort4)
rrCmd.RunApp(func() bool { return rrCmd.WaitResult(t, time.Second*20, "Start rollup-relayer successfully") })
// Start coordinator process with metrics server.
port, _ := rand.Int(rand.Reader, big.NewInt(2000))
svrPort2 := strconv.FormatInt(port.Int64()+52000, 10)
coordinatorCmd := runCoordinatorApp(t, "--metrics", "--metrics.addr", "localhost", "--metrics.port", svrPort2)
port5, _ := rand.Int(rand.Reader, big.NewInt(2000))
svrPort5 := strconv.FormatInt(port5.Int64()+52000, 10)
coordinatorCmd := runCoordinatorApp(t, "--metrics", "--metrics.addr", "localhost", "--metrics.port", svrPort5)
coordinatorCmd.RunApp(func() bool { return coordinatorCmd.WaitResult(t, time.Second*20, "Start coordinator successfully") })
// Get bridge monitor metrics.
@@ -87,7 +113,7 @@ func testMonitorMetrics(t *testing.T) {
assert.Equal(t, true, strings.Contains(bodyStr, "bridge_l2_msgs_sync_height"))
// Get coordinator monitor metrics.
resp, err = http.Get("http://localhost:" + svrPort2)
resp, err = http.Get("http://localhost:" + svrPort5)
assert.NoError(t, err)
defer resp.Body.Close()
body, err = ioutil.ReadAll(resp.Body)
@@ -98,6 +124,9 @@ func testMonitorMetrics(t *testing.T) {
assert.Equal(t, true, strings.Contains(bodyStr, "coordinator_rollers_disconnects_total"))
// Exit.
bridgeCmd.WaitExit()
ewCmd.WaitExit()
goCmd.WaitExit()
mrCmd.WaitExit()
rrCmd.WaitExit()
coordinatorCmd.WaitExit()
}