Compare commits

..

9 Commits

Author SHA1 Message Date
Péter Garamvölgyi
5d761ad812 Make sure attempts can be deserialized from db on startup (#410) 2023-04-05 19:00:54 +02:00
Nazarii Denha
4042bea6db retry proving timeout batch (#313)
Co-authored-by: Péter Garamvölgyi <peter@scroll.io>
Co-authored-by: colin <102356659+colinlyguo@users.noreply.github.com>
2023-04-05 16:42:06 +02:00
maskpp
de7c38a903 feat(test): let integration-test log verbosity be configurable (#409) 2023-04-04 16:20:12 +08:00
Péter Garamvölgyi
41e2d960d8 Fix already executed revert message (#408) 2023-04-03 21:26:30 +08:00
HAOYUatHZ
170bc08207 build(docker): auto docker push when pushing git tags (#406) 2023-04-03 16:52:51 +08:00
maskpp
d3fc4e1606 feat(pending limit): Let sender's pending limit be configurable. (#398)
Co-authored-by: Péter Garamvölgyi <peter@scroll.io>
Co-authored-by: ChuhanJin <60994121+ChuhanJin@users.noreply.github.com>
Co-authored-by: vincent <419436363@qq.com>
Co-authored-by: colinlyguo <651734127@qq.com>
2023-04-03 14:24:47 +08:00
HAOYUatHZ
77749477db build(docker): only build docker images when push github tags (#404) 2023-04-01 11:54:56 +08:00
HAOYUatHZ
1a5df6f4d7 fix(build): move docker build from jenkins to github to avoid unknown errors (#403) 2023-03-31 15:55:55 +08:00
maskpp
826280253a fix(test): fix bug in testBatchProposerProposeBatch (#399)
Co-authored-by: colinlyguo <651734127@qq.com>
2023-03-31 13:58:46 +08:00
23 changed files with 387 additions and 101 deletions

View File

@@ -66,3 +66,11 @@ jobs:
if [ -n "$(git status --porcelain)" ]; then
exit 1
fi
# docker-build:
# runs-on: ubuntu-latest
# steps:
# - name: Checkout code
# uses: actions/checkout@v2
# - name: Set up Docker Buildx
# uses: docker/setup-buildx-action@v2
# - run: make docker

View File

@@ -62,3 +62,18 @@ jobs:
if [ -n "$(git status --porcelain)" ]; then
exit 1
fi
# docker-build:
# runs-on: ubuntu-latest
# steps:
# - name: Checkout code
# uses: actions/checkout@v2
# - name: Set up Docker Buildx
# uses: docker/setup-buildx-action@v2
# - name: Build and push
# uses: docker/build-push-action@v2
# with:
# context: .
# file: ./build/dockerfiles/coordinator.Dockerfile
# push: false
# # cache-from: type=gha,scope=${{ github.workflow }}
# # cache-to: type=gha,scope=${{ github.workflow }}

.github/workflows/docker.yaml (new file, +65 lines)
View File

@@ -0,0 +1,65 @@
name: Docker
on:
push:
tags:
- v**
jobs:
build-and-push:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v2
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
- name: Login to Docker Hub
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Build and push coordinator docker
uses: docker/build-push-action@v2
with:
context: .
file: ./build/dockerfiles/coordinator.Dockerfile
push: true
tags: scrolltech/coordinator:${{github.ref_name}}
# cache-from: type=gha,scope=${{ github.workflow }}
# cache-to: type=gha,scope=${{ github.workflow }}
- name: Build and push event_watcher docker
uses: docker/build-push-action@v2
with:
context: .
file: ./build/dockerfiles/event_watcher.Dockerfile
push: true
tags: scrolltech/event-watcher:${{github.ref_name}}
# cache-from: type=gha,scope=${{ github.workflow }}
# cache-to: type=gha,scope=${{ github.workflow }}
- name: Build and push gas_oracle docker
uses: docker/build-push-action@v2
with:
context: .
file: ./build/dockerfiles/gas_oracle.Dockerfile
push: true
tags: scrolltech/gas-oracle:${{github.ref_name}}
# cache-from: type=gha,scope=${{ github.workflow }}
# cache-to: type=gha,scope=${{ github.workflow }}
- name: Build and push msg_relayer docker
uses: docker/build-push-action@v2
with:
context: .
file: ./build/dockerfiles/msg_relayer.Dockerfile
push: true
tags: scrolltech/msg-relayer:${{github.ref_name}}
# cache-from: type=gha,scope=${{ github.workflow }}
# cache-to: type=gha,scope=${{ github.workflow }}
- name: Build and push rollup_relayer docker
uses: docker/build-push-action@v2
with:
context: .
file: ./build/dockerfiles/rollup_relayer.Dockerfile
push: true
tags: scrolltech/rollup-relayer:${{github.ref_name}}
# cache-from: type=gha,scope=${{ github.workflow }}
# cache-to: type=gha,scope=${{ github.workflow }}

Jenkinsfile (10 lines changed)
View File

@@ -42,16 +42,6 @@ pipeline {
sh 'make -C database db_cli'
}
}
stage('Check Bridge Docker Build') {
steps {
sh 'make -C bridge docker'
}
}
stage('Check Coordinator Docker Build') {
steps {
sh 'make -C coordinator docker'
}
}
stage('Check Database Docker Build') {
steps {
sh 'make -C database docker'

View File

@@ -19,7 +19,8 @@
"escalate_multiple_den": 10,
"max_gas_price": 10000000000,
"tx_type": "LegacyTx",
"min_balance": 100000000000000000000
"min_balance": 100000000000000000000,
"pending_limit": 10
},
"gas_oracle_config": {
"min_gas_price": 0,
@@ -53,7 +54,8 @@
"escalate_multiple_den": 10,
"max_gas_price": 10000000000,
"tx_type": "LegacyTx",
"min_balance": 100000000000000000000
"min_balance": 100000000000000000000,
"pending_limit": 10
},
"gas_oracle_config": {
"min_gas_price": 0,

View File

@@ -33,6 +33,8 @@ type SenderConfig struct {
MinBalance *big.Int `json:"min_balance,omitempty"`
// The interval (in seconds) to check balance and top up sender's accounts
CheckBalanceTime uint64 `json:"check_balance_time"`
// The sender's pending count limit.
PendingLimit int `json:"pending_limit,omitempty"`
}
// RelayerConfig loads relayer configuration items.
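For reference, a minimal sketch of how the new pending_limit field decodes and gets defaulted; the field name and json tag are taken from this hunk, the fallback of 10 mirrors defaultPendingLimit added in sender.go below, and the surrounding wrapper type is illustrative only:

package main

import (
	"encoding/json"
	"fmt"
)

// Illustrative subset of SenderConfig, reduced to the new field.
type senderConfig struct {
	PendingLimit int `json:"pending_limit,omitempty"`
}

const defaultPendingLimit = 10 // mirrors the fallback added in sender.go below

func main() {
	for _, raw := range []string{`{"pending_limit": 2}`, `{}`} {
		var cfg senderConfig
		if err := json.Unmarshal([]byte(raw), &cfg); err != nil {
			panic(err)
		}
		// An absent pending_limit decodes to 0, which NewSender treats as
		// "use the default" rather than "no pending transactions allowed".
		if cfg.PendingLimit == 0 {
			cfg.PendingLimit = defaultPendingLimit
		}
		fmt.Println(cfg.PendingLimit) // 2, then 10
	}
}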

View File

@@ -4,6 +4,7 @@ go 1.18
require (
github.com/orcaman/concurrent-map v1.0.0
github.com/orcaman/concurrent-map/v2 v2.0.1
github.com/scroll-tech/go-ethereum v1.10.14-0.20230321020420-127af384ed04
github.com/stretchr/testify v1.8.2
github.com/urfave/cli/v2 v2.17.2-0.20221006022127-8f469abc00aa

View File

@@ -66,6 +66,8 @@ github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/orcaman/concurrent-map v1.0.0 h1:I/2A2XPCb4IuQWcQhBhSwGfiuybl/J0ev9HDbW65HOY=
github.com/orcaman/concurrent-map v1.0.0/go.mod h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI=
github.com/orcaman/concurrent-map/v2 v2.0.1 h1:jOJ5Pg2w1oeB6PeDurIYf6k9PQ+aTITr/6lP/L/zp6c=
github.com/orcaman/concurrent-map/v2 v2.0.1/go.mod h1:9Eq3TG2oBe5FirmYWQfYO5iH1q0Jv47PLaNK++uCdOM=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
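The sender diff further down swaps sync.Map for github.com/orcaman/concurrent-map/v2. A small standalone sketch of the calls it relies on (New, SetIfAbsent, Set, Count, Remove, IterBuffered); the int value type here is just a placeholder:

package main

import (
	"fmt"

	cmap "github.com/orcaman/concurrent-map/v2"
)

func main() {
	m := cmap.New[int]() // ConcurrentMap[string, int]

	// SetIfAbsent atomically claims a key; it returns false if the key already exists.
	fmt.Println(m.SetIfAbsent("tx-1", 0)) // true
	fmt.Println(m.SetIfAbsent("tx-1", 0)) // false: already occupied

	m.Set("tx-1", 42)      // overwrite the placeholder value
	fmt.Println(m.Count()) // 1: sums per-shard sizes, no full Range pass as with sync.Map

	// IterBuffered snapshots the map into a channel of key/value tuples.
	for item := range m.IterBuffered() {
		fmt.Println(item.Key, item.Val)
	}

	m.Remove("tx-1")
	fmt.Println(m.Count()) // 0
}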

View File

@@ -124,7 +124,7 @@ func (r *Layer1Relayer) ProcessSavedEvents() {
for _, msg := range msgs {
if err = r.processSavedEvent(msg); err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
if !errors.Is(err, sender.ErrNoAvailableAccount) && !errors.Is(err, sender.ErrFullPending) {
log.Error("failed to process event", "msg.msgHash", msg.MsgHash, "err", err)
}
return
@@ -139,7 +139,7 @@ func (r *Layer1Relayer) processSavedEvent(msg *types.L1Message) error {
if err != nil && err.Error() == "execution reverted: Message expired" {
return r.db.UpdateLayer1Status(r.ctx, msg.MsgHash, types.MsgExpired)
}
if err != nil && err.Error() == "execution reverted: Message successfully executed" {
if err != nil && err.Error() == "execution reverted: Message was already successfully executed" {
return r.db.UpdateLayer1Status(r.ctx, msg.MsgHash, types.MsgConfirmed)
}
if err != nil {
@@ -189,7 +189,7 @@ func (r *Layer1Relayer) ProcessGasPriceOracle() {
hash, err := r.gasOracleSender.SendTransaction(block.Hash, &r.cfg.GasPriceOracleContractAddress, big.NewInt(0), data, 0)
if err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
if !errors.Is(err, sender.ErrNoAvailableAccount) && !errors.Is(err, sender.ErrFullPending) {
log.Error("Failed to send setL1BaseFee tx to layer2 ", "block.Hash", block.Hash, "block.Height", block.Number, "err", err)
}
return
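The pattern applied throughout both relayers: ErrNoAvailableAccount and the new ErrFullPending both mean "back off and retry later", so they are returned without an error log. A minimal, self-contained sketch of that gating; the process helper and the error variables are stand-ins, not the bridge's API:

package main

import (
	"errors"
	"log"
)

// Stand-ins for sender.ErrNoAvailableAccount and sender.ErrFullPending.
var (
	errNoAvailableAccount = errors.New("sender has no available account to send transaction")
	errFullPending        = errors.New("sender's pending pool is full")
)

func process(send func() error) {
	if err := send(); err != nil {
		// Transient back-pressure conditions are expected; only log real failures.
		if !errors.Is(err, errNoAvailableAccount) && !errors.Is(err, errFullPending) {
			log.Printf("failed to process event: %v", err)
		}
		return
	}
}

func main() {
	process(func() error { return errFullPending })            // silent
	process(func() error { return errors.New("rpc timeout") }) // logged
}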

View File

@@ -180,7 +180,7 @@ func (r *Layer2Relayer) ProcessSavedEvents() {
})
}
if err := g.Wait(); err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
if !errors.Is(err, sender.ErrNoAvailableAccount) && !errors.Is(err, sender.ErrFullPending) {
log.Error("failed to process l2 saved event", "err", err)
}
return
@@ -229,11 +229,11 @@ func (r *Layer2Relayer) processSavedEvent(msg *types.L2Message) error {
if err != nil && err.Error() == "execution reverted: Message expired" {
return r.db.UpdateLayer2Status(r.ctx, msg.MsgHash, types.MsgExpired)
}
if err != nil && err.Error() == "execution reverted: Message successfully executed" {
if err != nil && err.Error() == "execution reverted: Message was already successfully executed" {
return r.db.UpdateLayer2Status(r.ctx, msg.MsgHash, types.MsgConfirmed)
}
if err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
if !errors.Is(err, sender.ErrNoAvailableAccount) && !errors.Is(err, sender.ErrFullPending) {
log.Error("Failed to send relayMessageWithProof tx to layer1 ", "msg.height", msg.Height, "msg.MsgHash", msg.MsgHash, "err", err)
}
return err
@@ -279,7 +279,7 @@ func (r *Layer2Relayer) ProcessGasPriceOracle() {
hash, err := r.gasOracleSender.SendTransaction(batch.Hash, &r.cfg.GasPriceOracleContractAddress, big.NewInt(0), data, 0)
if err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
if !errors.Is(err, sender.ErrNoAvailableAccount) && !errors.Is(err, sender.ErrFullPending) {
log.Error("Failed to send setL2BaseFee tx to layer2 ", "batch.Hash", batch.Hash, "err", err)
}
return
@@ -325,7 +325,7 @@ func (r *Layer2Relayer) SendCommitTx(batchData []*types.BatchData) error {
txID := crypto.Keccak256Hash(bytes).String()
txHash, err := r.rollupSender.SendTransaction(txID, &r.cfg.RollupContractAddress, big.NewInt(0), calldata, 0)
if err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
if !errors.Is(err, sender.ErrNoAvailableAccount) && !errors.Is(err, sender.ErrFullPending) {
log.Error("Failed to send commitBatches tx to layer1 ", "err", err)
}
return err
@@ -475,7 +475,7 @@ func (r *Layer2Relayer) ProcessCommittedBatches() {
txHash, err := r.rollupSender.SendTransaction(txID, &r.cfg.RollupContractAddress, big.NewInt(0), data, 0)
finalizeTxHash := &txHash
if err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
if !errors.Is(err, sender.ErrNoAvailableAccount) && !errors.Is(err, sender.ErrFullPending) {
log.Error("finalizeBatchWithProof in layer1 failed", "hash", hash, "err", err)
}
return

View File

@@ -78,6 +78,12 @@ func testL2RelayerProcessSaveEvents(t *testing.T) {
}
assert.NoError(t, db.InsertWrappedBlocks(traces))
parentBatch1 := &types.BlockBatch{
Index: 0,
Hash: common.Hash{}.String(),
StateRoot: common.Hash{}.String(),
}
batchData1 := types.NewBatchData(parentBatch1, []*types.WrappedBlock{wrappedBlock1}, nil)
dbTx, err := db.Beginx()
assert.NoError(t, err)
assert.NoError(t, db.NewBatchInDBTx(dbTx, batchData1))
@@ -106,6 +112,12 @@ func testL2RelayerProcessCommittedBatches(t *testing.T) {
relayer, err := relayer.NewLayer2Relayer(context.Background(), l2Cli, db, l2Cfg.RelayerConfig)
assert.NoError(t, err)
parentBatch1 := &types.BlockBatch{
Index: 0,
Hash: common.Hash{}.String(),
StateRoot: common.Hash{}.String(),
}
batchData1 := types.NewBatchData(parentBatch1, []*types.WrappedBlock{wrappedBlock1}, nil)
dbTx, err := db.Beginx()
assert.NoError(t, err)
assert.NoError(t, db.NewBatchInDBTx(dbTx, batchData1))

View File

@@ -6,21 +6,19 @@ import (
"errors"
"fmt"
"math/big"
"reflect"
"strings"
"sync"
"sync/atomic"
"time"
cmapV2 "github.com/orcaman/concurrent-map/v2"
"github.com/scroll-tech/go-ethereum/accounts/abi/bind"
"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/core/types"
"github.com/scroll-tech/go-ethereum/ethclient"
"github.com/scroll-tech/go-ethereum/log"
"scroll-tech/bridge/utils"
"scroll-tech/bridge/config"
"scroll-tech/bridge/utils"
)
const (
@@ -37,6 +35,12 @@ const (
var (
// ErrNoAvailableAccount indicates no available account error in the account pool.
ErrNoAvailableAccount = errors.New("sender has no available account to send transaction")
// ErrFullPending sender's pending pool is full.
ErrFullPending = errors.New("sender's pending pool is full")
)
var (
defaultPendingLimit = 10
)
// Confirmation struct used to indicate transaction confirmation details
@@ -74,9 +78,9 @@ type Sender struct {
// account fields.
auths *accountPool
blockNumber uint64 // Current block number on chain.
baseFeePerGas uint64 // Current base fee per gas on chain
pendingTxs sync.Map // Mapping from nonce to pending transaction
blockNumber uint64 // Current block number on chain.
baseFeePerGas uint64 // Current base fee per gas on chain
pendingTxs cmapV2.ConcurrentMap[string, *PendingTransaction] // Mapping from nonce to pending transaction
confirmCh chan *Confirmation
stopCh chan struct{}
@@ -116,6 +120,11 @@ func NewSender(ctx context.Context, config *config.SenderConfig, privs []*ecdsa.
}
}
// initialize pending limit with a default value
if config.PendingLimit == 0 {
config.PendingLimit = defaultPendingLimit
}
sender := &Sender{
ctx: ctx,
config: config,
@@ -125,7 +134,7 @@ func NewSender(ctx context.Context, config *config.SenderConfig, privs []*ecdsa.
confirmCh: make(chan *Confirmation, 128),
blockNumber: header.Number.Uint64(),
baseFeePerGas: baseFeePerGas,
pendingTxs: sync.Map{},
pendingTxs: cmapV2.New[*PendingTransaction](),
stopCh: make(chan struct{}),
}
@@ -134,6 +143,21 @@ func NewSender(ctx context.Context, config *config.SenderConfig, privs []*ecdsa.
return sender, nil
}
// PendingCount returns the current number of pending txs.
func (s *Sender) PendingCount() int {
return s.pendingTxs.Count()
}
// PendingLimit returns the maximum number of pending txs the sender can handle.
func (s *Sender) PendingLimit() int {
return s.config.PendingLimit
}
// IsFull returns true if the sender's pending tx pool is full.
func (s *Sender) IsFull() bool {
return s.pendingTxs.Count() >= s.config.PendingLimit
}
// Stop stop the sender module.
func (s *Sender) Stop() {
close(s.stopCh)
@@ -159,21 +183,24 @@ func (s *Sender) getFeeData(auth *bind.TransactOpts, target *common.Address, val
// SendTransaction send a signed L2tL1 transaction.
func (s *Sender) SendTransaction(ID string, target *common.Address, value *big.Int, data []byte, minGasLimit uint64) (hash common.Hash, err error) {
if s.IsFull() {
return common.Hash{}, ErrFullPending
}
// We occupy the ID, in case some other threads call with the same ID in the same time
if _, loaded := s.pendingTxs.LoadOrStore(ID, nil); loaded {
if ok := s.pendingTxs.SetIfAbsent(ID, nil); !ok {
return common.Hash{}, fmt.Errorf("has the repeat tx ID, ID: %s", ID)
}
// get
auth := s.auths.getAccount()
if auth == nil {
s.pendingTxs.Delete(ID) // release the ID on failure
s.pendingTxs.Remove(ID) // release the ID on failure
return common.Hash{}, ErrNoAvailableAccount
}
defer s.auths.releaseAccount(auth)
defer func() {
if err != nil {
s.pendingTxs.Delete(ID) // release the ID on failure
s.pendingTxs.Remove(ID) // release the ID on failure
}
}()
@@ -194,7 +221,7 @@ func (s *Sender) SendTransaction(ID string, target *common.Address, value *big.I
submitAt: atomic.LoadUint64(&s.blockNumber),
feeData: feeData,
}
s.pendingTxs.Store(ID, pending)
s.pendingTxs.Set(ID, pending)
return tx.Hash(), nil
}
@@ -335,17 +362,17 @@ func (s *Sender) checkPendingTransaction(header *types.Header, confirmed uint64)
}
}
s.pendingTxs.Range(func(key, value interface{}) bool {
for item := range s.pendingTxs.IterBuffered() {
key, pending := item.Key, item.Val
// ignore empty id, since we use empty id to occupy pending task
if value == nil || reflect.ValueOf(value).IsNil() {
return true
if pending == nil {
continue
}
pending := value.(*PendingTransaction)
receipt, err := s.client.TransactionReceipt(s.ctx, pending.tx.Hash())
if (err == nil) && (receipt != nil) {
if receipt.BlockNumber.Uint64() <= confirmed {
s.pendingTxs.Delete(key)
s.pendingTxs.Remove(key)
// send confirm message
s.confirmCh <- &Confirmation{
ID: pending.id,
@@ -376,7 +403,7 @@ func (s *Sender) checkPendingTransaction(header *types.Header, confirmed uint64)
// We need to stop the program and manually handle the situation.
if strings.Contains(err.Error(), "nonce") {
// This key can be deleted
s.pendingTxs.Delete(key)
s.pendingTxs.Remove(key)
// Try get receipt by the latest replaced tx hash
receipt, err := s.client.TransactionReceipt(s.ctx, pending.tx.Hash())
if (err == nil) && (receipt != nil) {
@@ -398,8 +425,7 @@ func (s *Sender) checkPendingTransaction(header *types.Header, confirmed uint64)
pending.submitAt = number
}
}
return true
})
}
}
// Loop is the main event loop
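Stripped of signing, fee, and confirmation logic, the admission path added to SendTransaction looks roughly like this; a sketch with placeholder types, assuming the cmap-backed pendingTxs shown above:

package main

import (
	"errors"
	"fmt"

	cmap "github.com/orcaman/concurrent-map/v2"
)

var errFullPending = errors.New("sender's pending pool is full")

type pendingTx struct{ id string } // placeholder for *PendingTransaction

type miniSender struct {
	limit      int
	pendingTxs cmap.ConcurrentMap[string, *pendingTx]
}

func (s *miniSender) send(id string) error {
	// 1. Reject up front when the pending pool is full.
	if s.pendingTxs.Count() >= s.limit {
		return errFullPending
	}
	// 2. Atomically occupy the ID so a concurrent caller with the same ID fails fast.
	if ok := s.pendingTxs.SetIfAbsent(id, nil); !ok {
		return fmt.Errorf("has the repeat tx ID, ID: %s", id)
	}
	// 3. ... sign and broadcast; on any failure, release the ID with s.pendingTxs.Remove(id).
	// 4. On success, replace the nil placeholder with the real pending record.
	s.pendingTxs.Set(id, &pendingTx{id: id})
	return nil
}

func main() {
	s := &miniSender{limit: 2, pendingTxs: cmap.New[*pendingTx]()}
	fmt.Println(s.send("a")) // <nil>
	fmt.Println(s.send("a")) // has the repeat tx ID, ID: a
	fmt.Println(s.send("b")) // <nil>
	fmt.Println(s.send("c")) // sender's pending pool is full
}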

View File

@@ -57,6 +57,8 @@ func TestSender(t *testing.T) {
// Setup
setupEnv(t)
t.Run("test pending limit", func(t *testing.T) { testPendLimit(t) })
t.Run("test min gas limit", func(t *testing.T) { testMinGasLimit(t) })
t.Run("test 1 account sender", func(t *testing.T) { testBatchSender(t, 1) })
@@ -64,6 +66,21 @@ func TestSender(t *testing.T) {
t.Run("test 8 account sender", func(t *testing.T) { testBatchSender(t, 8) })
}
func testPendLimit(t *testing.T) {
senderCfg := cfg.L1Config.RelayerConfig.SenderConfig
senderCfg.Confirmations = rpc.LatestBlockNumber
senderCfg.PendingLimit = 2
newSender, err := sender.NewSender(context.Background(), senderCfg, privateKeys)
assert.NoError(t, err)
defer newSender.Stop()
for i := 0; i < newSender.PendingLimit(); i++ {
_, err = newSender.SendTransaction(strconv.Itoa(i), &common.Address{}, big.NewInt(1), nil, 0)
assert.NoError(t, err)
}
assert.True(t, newSender.PendingCount() <= newSender.PendingLimit())
}
func testMinGasLimit(t *testing.T) {
senderCfg := cfg.L1Config.RelayerConfig.SenderConfig
senderCfg.Confirmations = rpc.LatestBlockNumber
@@ -100,6 +117,7 @@ func testBatchSender(t *testing.T, batchSize int) {
senderCfg := cfg.L1Config.RelayerConfig.SenderConfig
senderCfg.Confirmations = rpc.LatestBlockNumber
senderCfg.PendingLimit = batchSize * TXBatch
newSender, err := sender.NewSender(context.Background(), senderCfg, privateKeys)
if err != nil {
t.Fatal(err)
@@ -119,7 +137,7 @@ func testBatchSender(t *testing.T, batchSize int) {
toAddr := common.HexToAddress("0x4592d8f8d7b001e72cb26a73e4fa1806a51ac79d")
id := strconv.Itoa(i + index*1000)
_, err := newSender.SendTransaction(id, &toAddr, big.NewInt(1), nil, 0)
if errors.Is(err, sender.ErrNoAvailableAccount) {
if errors.Is(err, sender.ErrNoAvailableAccount) || errors.Is(err, sender.ErrFullPending) {
<-time.After(time.Second)
continue
}

View File

@@ -6,6 +6,7 @@ import (
"math"
"testing"
"github.com/scroll-tech/go-ethereum/common"
"github.com/stretchr/testify/assert"
"scroll-tech/database"
@@ -38,6 +39,16 @@ func testBatchProposerProposeBatch(t *testing.T) {
wc := watcher.NewL2WatcherClient(context.Background(), l2Cli, l2cfg.Confirmations, l2cfg.L2MessengerAddress, l2cfg.L2MessageQueueAddress, l2cfg.WithdrawTrieRootSlot, db)
loopToFetchEvent(subCtx, wc)
batch, err := db.GetLatestBatch()
assert.NoError(t, err)
// Create a new batch.
batchData := types.NewBatchData(&types.BlockBatch{
Index: 0,
Hash: batch.Hash,
StateRoot: batch.StateRoot,
}, []*types.WrappedBlock{wrappedBlock1}, nil)
relayer, err := relayer.NewLayer2Relayer(context.Background(), l2Cli, db, cfg.L2Config.RelayerConfig)
assert.NoError(t, err)
@@ -55,7 +66,7 @@ func testBatchProposerProposeBatch(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 0, len(infos))
exist, err := db.BatchRecordExist(batchData1.Hash().Hex())
exist, err := db.BatchRecordExist(batchData.Hash().Hex())
assert.NoError(t, err)
assert.Equal(t, true, exist)
}
@@ -74,6 +85,19 @@ func testBatchProposerGracefulRestart(t *testing.T) {
assert.NoError(t, db.InsertWrappedBlocks([]*types.WrappedBlock{wrappedBlock2}))
// Insert block batch into db.
batchData1 := types.NewBatchData(&types.BlockBatch{
Index: 0,
Hash: common.Hash{}.String(),
StateRoot: common.Hash{}.String(),
}, []*types.WrappedBlock{wrappedBlock1}, nil)
parentBatch2 := &types.BlockBatch{
Index: batchData1.Batch.BatchIndex,
Hash: batchData1.Hash().Hex(),
StateRoot: batchData1.Batch.NewStateRoot.String(),
}
batchData2 := types.NewBatchData(parentBatch2, []*types.WrappedBlock{wrappedBlock2}, nil)
dbTx, err := db.Beginx()
assert.NoError(t, err)
assert.NoError(t, db.NewBatchInDBTx(dbTx, batchData1))

View File

@@ -6,7 +6,6 @@ import (
"testing"
"github.com/scroll-tech/go-ethereum/ethclient"
"github.com/scroll-tech/go-ethereum/log"
"github.com/stretchr/testify/assert"
"scroll-tech/common/docker"
@@ -27,10 +26,6 @@ var (
// block trace
wrappedBlock1 *types.WrappedBlock
wrappedBlock2 *types.WrappedBlock
// batch data
batchData1 *types.BatchData
batchData2 *types.BatchData
)
func setupEnv(t *testing.T) (err error) {
@@ -57,12 +52,6 @@ func setupEnv(t *testing.T) (err error) {
if err = json.Unmarshal(templateBlockTrace1, wrappedBlock1); err != nil {
return err
}
parentBatch1 := &types.BlockBatch{
Index: 0,
Hash: "0x0cc6b102c2924402c14b2e3a19baccc316252bfdc44d9ec62e942d34e39ec729",
StateRoot: "0x2579122e8f9ec1e862e7d415cef2fb495d7698a8e5f0dddc5651ba4236336e7d",
}
batchData1 = types.NewBatchData(parentBatch1, []*types.WrappedBlock{wrappedBlock1}, nil)
templateBlockTrace2, err := os.ReadFile("../../common/testdata/blockTrace_03.json")
if err != nil {
@@ -73,15 +62,6 @@ func setupEnv(t *testing.T) (err error) {
if err = json.Unmarshal(templateBlockTrace2, wrappedBlock2); err != nil {
return err
}
parentBatch2 := &types.BlockBatch{
Index: batchData1.Batch.BatchIndex,
Hash: batchData1.Hash().Hex(),
StateRoot: batchData1.Batch.NewStateRoot.String(),
}
batchData2 = types.NewBatchData(parentBatch2, []*types.WrappedBlock{wrappedBlock2}, nil)
log.Info("batchHash", "batchhash1", batchData1.Hash().Hex(), "batchhash2", batchData2.Hash().Hex())
return err
}

View File

@@ -31,6 +31,8 @@ type Cmd struct {
checkFuncs cmap.ConcurrentMap //map[string]checkFunc
// open log flag.
openLog bool
// error channel
ErrChan chan error
}
@@ -64,7 +66,7 @@ func (c *Cmd) runCmd() {
// RunCmd parallel running when parallel is true.
func (c *Cmd) RunCmd(parallel bool) {
fmt.Println("cmd: ", c.args)
fmt.Println("cmd:", c.args)
if parallel {
go c.runCmd()
} else {
@@ -72,12 +74,17 @@ func (c *Cmd) RunCmd(parallel bool) {
}
}
// OpenLog open cmd log by this api.
func (c *Cmd) OpenLog(open bool) {
c.openLog = open
}
func (c *Cmd) Write(data []byte) (int, error) {
out := string(data)
if verbose {
fmt.Printf("%s: %v", c.name, out)
if verbose || c.openLog {
fmt.Printf("%s:\n\t%v", c.name, out)
} else if strings.Contains(out, "error") || strings.Contains(out, "warning") {
fmt.Printf("%s: %v", c.name, out)
fmt.Printf("%s:\n\t%v", c.name, out)
}
go c.checkFuncs.IterCb(func(_ string, value interface{}) {
check := value.(checkFunc)
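The effect of the verbosity change, in isolation: Write forwards everything when either the package-level verbose flag or the per-command openLog flag is set, and otherwise only lines containing "error" or "warning". A simplified sketch, with the struct reduced to what this diff touches:

package main

import (
	"fmt"
	"strings"
)

var verbose bool // package-level flag, normally wired to the test binary's flags

type Cmd struct {
	name    string
	openLog bool
}

// OpenLog enables full log forwarding for this command only.
func (c *Cmd) OpenLog(open bool) { c.openLog = open }

// Write mirrors the io.Writer hook the real Cmd attaches to the child process output.
func (c *Cmd) Write(data []byte) (int, error) {
	out := string(data)
	if verbose || c.openLog {
		fmt.Printf("%s:\n\t%v", c.name, out)
	} else if strings.Contains(out, "error") || strings.Contains(out, "warning") {
		fmt.Printf("%s:\n\t%v", c.name, out)
	}
	return len(data), nil
}

func main() {
	quiet := &Cmd{name: "coordinator-test"}
	quiet.Write([]byte("starting ws server\n"))         // suppressed
	quiet.Write([]byte("error: port already in use\n")) // printed

	loud := &Cmd{name: "roller-test"}
	loud.OpenLog(true)
	loud.Write([]byte("starting prover\n")) // printed
}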

View File

@@ -162,6 +162,7 @@ type SessionInfo struct {
ID string `json:"id"`
Rollers map[string]*RollerStatus `json:"rollers"`
StartTimestamp int64 `json:"start_timestamp"`
Attempts uint8 `json:"attempts,omitempty"`
}
// ProvingStatus block_batch proving_status (unassigned, assigned, proved, verified, submitted)
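Commit #410 concerns this new Attempts field surviving the JSON round-trip through the sessions table when the coordinator restarts. A minimal sketch of that round-trip, with the struct cut down to the fields visible in this hunk:

package main

import (
	"encoding/json"
	"fmt"
)

// Cut-down SessionInfo with only the fields shown in this diff.
type SessionInfo struct {
	ID             string `json:"id"`
	StartTimestamp int64  `json:"start_timestamp"`
	Attempts       uint8  `json:"attempts,omitempty"`
}

func main() {
	// What the coordinator would persist after the first retry.
	stored, err := json.Marshal(SessionInfo{ID: "batch-1", StartTimestamp: 1680000000, Attempts: 2})
	if err != nil {
		panic(err)
	}

	// On startup the row is read back; Attempts must be restored rather than reset
	// to 0, or a session would regain its full retry budget after every restart.
	var restored SessionInfo
	if err := json.Unmarshal(stored, &restored); err != nil {
		panic(err)
	}
	fmt.Println(restored.ID, restored.Attempts) // batch-1 2
}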

View File

@@ -5,7 +5,7 @@ import (
"runtime/debug"
)
var tag = "v3.0.3"
var tag = "v3.0.6"
var commit = func() string {
if info, ok := debug.ReadBuildInfo(); ok {

View File

@@ -2,6 +2,7 @@
"roller_manager_config": {
"compression_level": 9,
"rollers_per_session": 1,
"session_attempts": 2,
"collection_time": 180,
"token_time_to_live": 60,
"verifier": {

View File

@@ -11,7 +11,8 @@ import (
)
const (
defaultNumberOfVerifierWorkers = 10
defaultNumberOfVerifierWorkers = 10
defaultNumberOfSessionRetryAttempts = 2
)
// RollerManagerConfig loads sequencer configuration items.
@@ -21,6 +22,9 @@ type RollerManagerConfig struct {
OrderSession string `json:"order_session,omitempty"`
// The amount of rollers to pick per proof generation session.
RollersPerSession uint8 `json:"rollers_per_session"`
// Number of attempts that a session can be retried if previous attempts failed.
// Currently we only consider proving timeout as failure here.
SessionAttempts uint8 `json:"session_attempts,omitempty"`
// Zk verifier config.
Verifier *VerifierConfig `json:"verifier,omitempty"`
// Proof collection time (in minutes).
@@ -74,6 +78,9 @@ func NewConfig(file string) (*Config, error) {
if cfg.RollerManagerConfig.MaxVerifierWorkers == 0 {
cfg.RollerManagerConfig.MaxVerifierWorkers = defaultNumberOfVerifierWorkers
}
if cfg.RollerManagerConfig.SessionAttempts == 0 {
cfg.RollerManagerConfig.SessionAttempts = defaultNumberOfSessionRetryAttempts
}
return cfg, nil
}

View File

@@ -176,7 +176,7 @@ func (m *Manager) Loop() {
}
}
// Select roller and send message
for len(tasks) > 0 && m.StartProofGenerationSession(tasks[0]) {
for len(tasks) > 0 && m.StartProofGenerationSession(tasks[0], nil) {
tasks = tasks[1:]
}
case <-m.ctx.Done():
@@ -338,20 +338,22 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
// CollectProofs collects proofs corresponding to a proof generation session.
func (m *Manager) CollectProofs(sess *session) {
//Cleanup roller sessions before return.
defer func() {
// TODO: remove the clean-up, rollers report healthy status.
m.mu.Lock()
for pk := range sess.info.Rollers {
m.freeTaskIDForRoller(pk, sess.info.ID)
}
delete(m.sessions, sess.info.ID)
m.mu.Unlock()
}()
for {
select {
//Execute after timeout, set in config.json. Consider all rollers failed.
case <-time.After(time.Duration(m.cfg.CollectionTime) * time.Minute):
// Check if session can be replayed
if sess.info.Attempts < m.cfg.SessionAttempts {
if m.StartProofGenerationSession(nil, sess) {
m.mu.Lock()
for pk := range sess.info.Rollers {
m.freeTaskIDForRoller(pk, sess.info.ID)
}
m.mu.Unlock()
log.Info("Retrying session", "session id:", sess.info.ID)
return
}
}
// record failed session.
errMsg := "proof generation session ended without receiving any valid proofs"
m.addFailedSession(sess, errMsg)
@@ -363,6 +365,12 @@ func (m *Manager) CollectProofs(sess *session) {
if err := m.orm.UpdateProvingStatus(sess.info.ID, types.ProvingTaskFailed); err != nil {
log.Error("fail to reset task_status as Unassigned", "id", sess.info.ID, "err", err)
}
m.mu.Lock()
for pk := range sess.info.Rollers {
m.freeTaskIDForRoller(pk, sess.info.ID)
}
delete(m.sessions, sess.info.ID)
m.mu.Unlock()
return
//Execute after one of the roller finishes sending proof, return early if all rollers had sent results.
@@ -386,6 +394,11 @@ func (m *Manager) CollectProofs(sess *session) {
randIndex := mathrand.Intn(len(validRollers))
_ = validRollers[randIndex]
// TODO: reward winner
for pk := range sess.info.Rollers {
m.freeTaskIDForRoller(pk, sess.info.ID)
}
delete(m.sessions, sess.info.ID)
m.mu.Unlock()
return
}
@@ -439,27 +452,39 @@ func (m *Manager) APIs() []rpc.API {
}
// StartProofGenerationSession starts a proof generation session
func (m *Manager) StartProofGenerationSession(task *types.BlockBatch) (success bool) {
func (m *Manager) StartProofGenerationSession(task *types.BlockBatch, prevSession *session) (success bool) {
var taskId string
if task != nil {
taskId = task.Hash
} else {
taskId = prevSession.info.ID
}
if m.GetNumberOfIdleRollers() == 0 {
log.Warn("no idle roller when starting proof generation session", "id", task.Hash)
log.Warn("no idle roller when starting proof generation session", "id", taskId)
return false
}
log.Info("start proof generation session", "id", task.Hash)
log.Info("start proof generation session", "id", taskId)
defer func() {
if !success {
if err := m.orm.UpdateProvingStatus(task.Hash, types.ProvingTaskUnassigned); err != nil {
log.Error("fail to reset task_status as Unassigned", "id", task.Hash, "err", err)
if task != nil {
if err := m.orm.UpdateProvingStatus(taskId, types.ProvingTaskUnassigned); err != nil {
log.Error("fail to reset task_status as Unassigned", "id", taskId, "err", err)
}
} else {
if err := m.orm.UpdateProvingStatus(taskId, types.ProvingTaskFailed); err != nil {
log.Error("fail to reset task_status as Failed", "id", taskId, "err", err)
}
}
}
}()
// Get block traces.
blockInfos, err := m.orm.GetL2BlockInfos(map[string]interface{}{"batch_hash": task.Hash})
blockInfos, err := m.orm.GetL2BlockInfos(map[string]interface{}{"batch_hash": taskId})
if err != nil {
log.Error(
"could not GetBlockInfos",
"batch_hash", task.Hash,
"batch_hash", taskId,
"error", err,
)
return false
@@ -486,35 +511,39 @@ func (m *Manager) StartProofGenerationSession(task *types.BlockBatch) (success b
log.Info("selectRoller returns nil")
break
}
log.Info("roller is picked", "session id", task.Hash, "name", roller.Name, "public key", roller.PublicKey)
log.Info("roller is picked", "session id", taskId, "name", roller.Name, "public key", roller.PublicKey)
// send trace to roller
if !roller.sendTask(task.Hash, traces) {
log.Error("send task failed", "roller name", roller.Name, "public key", roller.PublicKey, "id", task.Hash)
if !roller.sendTask(taskId, traces) {
log.Error("send task failed", "roller name", roller.Name, "public key", roller.PublicKey, "id", taskId)
continue
}
rollers[roller.PublicKey] = &types.RollerStatus{PublicKey: roller.PublicKey, Name: roller.Name, Status: types.RollerAssigned}
}
// No roller assigned.
if len(rollers) == 0 {
log.Error("no roller assigned", "id", task.Hash, "number of idle rollers", m.GetNumberOfIdleRollers())
log.Error("no roller assigned", "id", taskId, "number of idle rollers", m.GetNumberOfIdleRollers())
return false
}
// Update session proving status as assigned.
if err = m.orm.UpdateProvingStatus(task.Hash, types.ProvingTaskAssigned); err != nil {
log.Error("failed to update task status", "id", task.Hash, "err", err)
if err = m.orm.UpdateProvingStatus(taskId, types.ProvingTaskAssigned); err != nil {
log.Error("failed to update task status", "id", taskId, "err", err)
return false
}
// Create a proof generation session.
sess := &session{
info: &types.SessionInfo{
ID: task.Hash,
ID: taskId,
Rollers: rollers,
StartTimestamp: time.Now().Unix(),
Attempts: 1,
},
finishChan: make(chan rollerProofStatus, proofAndPkBufferSize),
}
if prevSession != nil {
sess.info.Attempts += prevSession.info.Attempts
}
// Store session info.
if err = m.orm.SetSessionInfo(sess.info); err != nil {
@@ -531,7 +560,7 @@ func (m *Manager) StartProofGenerationSession(task *types.BlockBatch) (success b
}
m.mu.Lock()
m.sessions[task.Hash] = sess
m.sessions[taskId] = sess
m.mu.Unlock()
go m.CollectProofs(sess)
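Put together, the retry bookkeeping works out as follows: a fresh session starts with Attempts = 1; when the collection timer fires and Attempts < SessionAttempts, StartProofGenerationSession is called again with the previous session, and the replayed session carries Attempts = 1 + previous. With the default session_attempts of 2, a task therefore gets exactly one replay before it is marked failed. A compressed sketch of just that decision, with rollers, ORM calls, and channels elided:

package main

import "fmt"

const sessionAttempts = 2 // mirrors session_attempts / defaultNumberOfSessionRetryAttempts

type session struct{ attempts uint8 }

// onTimeout models the branch CollectProofs takes when the collection timer fires.
func onTimeout(sess *session) string {
	if sess.attempts < sessionAttempts {
		// Replay: the new session inherits the old attempt count plus one.
		next := &session{attempts: 1 + sess.attempts}
		return fmt.Sprintf("retrying with attempts=%d", next.attempts)
	}
	return "marking session failed"
}

func main() {
	fmt.Println(onTimeout(&session{attempts: 1})) // retrying with attempts=2
	fmt.Println(onTimeout(&session{attempts: 2})) // marking session failed
}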

View File

@@ -87,6 +87,7 @@ func TestApis(t *testing.T) {
t.Run("TestSeveralConnections", testSeveralConnections)
t.Run("TestValidProof", testValidProof)
t.Run("TestInvalidProof", testInvalidProof)
t.Run("TestTimedoutProof", testTimedoutProof)
t.Run("TestIdleRollerSelection", testIdleRollerSelection)
// TODO: Restart roller alone when received task, can add this test case in integration-test.
//t.Run("TestRollerReconnect", testRollerReconnect)
@@ -356,6 +357,86 @@ func testInvalidProof(t *testing.T) {
}
}
func testTimedoutProof(t *testing.T) {
// Create db handler and reset db.
l2db, err := database.NewOrmFactory(cfg.DBConfig)
assert.NoError(t, err)
assert.NoError(t, migrate.ResetDB(l2db.GetDB().DB))
defer l2db.Close()
// Setup coordinator and ws server.
wsURL := "ws://" + randomURL()
rollerManager, handler := setupCoordinator(t, cfg.DBConfig, 1, wsURL)
defer func() {
handler.Shutdown(context.Background())
rollerManager.Stop()
}()
// create first mock roller, that will not send any proof.
roller1 := newMockRoller(t, "roller_test"+strconv.Itoa(0), wsURL)
defer func() {
// close connection
roller1.close()
}()
assert.Equal(t, 1, rollerManager.GetNumberOfIdleRollers())
var hashes = make([]string, 1)
dbTx, err := l2db.Beginx()
assert.NoError(t, err)
for i := range hashes {
assert.NoError(t, l2db.NewBatchInDBTx(dbTx, batchData))
hashes[i] = batchData.Hash().Hex()
}
assert.NoError(t, dbTx.Commit())
// verify proof status, it should be assigned, because roller didn't send any proof
var (
tick = time.Tick(500 * time.Millisecond)
tickStop = time.Tick(10 * time.Second)
)
for len(hashes) > 0 {
select {
case <-tick:
status, err := l2db.GetProvingStatusByHash(hashes[0])
assert.NoError(t, err)
if status == types.ProvingTaskAssigned {
hashes = hashes[1:]
}
case <-tickStop:
t.Error("failed to check proof status")
return
}
}
// create second mock roller, that will send valid proof.
roller2 := newMockRoller(t, "roller_test"+strconv.Itoa(1), wsURL)
roller2.waitTaskAndSendProof(t, time.Second, false, true)
defer func() {
// close connection
roller2.close()
}()
assert.Equal(t, 1, rollerManager.GetNumberOfIdleRollers())
// wait manager to finish first CollectProofs
<-time.After(60 * time.Second)
// verify proof status, it should be verified now, because second roller sent valid proof
for len(hashes) > 0 {
select {
case <-tick:
status, err := l2db.GetProvingStatusByHash(hashes[0])
assert.NoError(t, err)
if status == types.ProvingTaskVerified {
hashes = hashes[1:]
}
case <-tickStop:
t.Error("failed to check proof status")
return
}
}
}
func testIdleRollerSelection(t *testing.T) {
// Create db handler and reset db.
l2db, err := database.NewOrmFactory(cfg.DBConfig)
@@ -505,6 +586,7 @@ func setupCoordinator(t *testing.T, dbCfg *database.DBConfig, rollersPerSession
CollectionTime: 1,
TokenTimeToLive: 5,
MaxVerifierWorkers: 10,
SessionAttempts: 2,
}, db, nil)
assert.NoError(t, err)
assert.NoError(t, rollerManager.Start())

View File

@@ -78,6 +78,7 @@ func free(t *testing.T) {
}
type appAPI interface {
OpenLog(open bool)
WaitResult(t *testing.T, timeout time.Duration, keyword string) bool
RunApp(waitResult func() bool)
WaitExit()
@@ -86,33 +87,44 @@ type appAPI interface {
func runMsgRelayerApp(t *testing.T, args ...string) appAPI {
args = append(args, "--log.debug", "--config", bridgeFile)
return cmd.NewCmd("message-relayer-test", args...)
app := cmd.NewCmd("message-relayer-test", args...)
app.OpenLog(true)
return app
}
func runGasOracleApp(t *testing.T, args ...string) appAPI {
args = append(args, "--log.debug", "--config", bridgeFile)
return cmd.NewCmd("gas-oracle-test", args...)
app := cmd.NewCmd("gas-oracle-test", args...)
app.OpenLog(true)
return app
}
func runRollupRelayerApp(t *testing.T, args ...string) appAPI {
args = append(args, "--log.debug", "--config", bridgeFile)
return cmd.NewCmd("rollup-relayer-test", args...)
app := cmd.NewCmd("rollup-relayer-test", args...)
app.OpenLog(true)
return app
}
func runEventWatcherApp(t *testing.T, args ...string) appAPI {
args = append(args, "--log.debug", "--config", bridgeFile)
return cmd.NewCmd("event-watcher-test", args...)
app := cmd.NewCmd("event-watcher-test", args...)
app.OpenLog(true)
return app
}
func runCoordinatorApp(t *testing.T, args ...string) appAPI {
args = append(args, "--log.debug", "--config", coordinatorFile, "--ws", "--ws.port", strconv.Itoa(int(wsPort)))
// start process
return cmd.NewCmd("coordinator-test", args...)
app := cmd.NewCmd("coordinator-test", args...)
app.OpenLog(true)
return app
}
func runDBCliApp(t *testing.T, option, keyword string) {
args := []string{option, "--config", dbFile}
app := cmd.NewCmd("db_cli-test", args...)
app.OpenLog(true)
defer app.WaitExit()
// Wait expect result.
@@ -122,7 +134,9 @@ func runDBCliApp(t *testing.T, option, keyword string) {
func runRollerApp(t *testing.T, args ...string) appAPI {
args = append(args, "--log.debug", "--config", rollerFile)
return cmd.NewCmd("roller-test", args...)
app := cmd.NewCmd("roller-test", args...)
app.OpenLog(true)
return app
}
func runSender(t *testing.T, endpoint string) *sender.Sender {