Compare commits

...

71 Commits

Author SHA1 Message Date
maskpp
5cf492d3c9 merge develop branch and fix conflict 2023-03-28 10:22:49 +08:00
maskpp
b6f51af61b trigger ci 2023-03-27 09:22:38 +08:00
maskpp
3f93ec16f6 merge develop branch and fix conflict 2023-03-27 09:19:15 +08:00
maskpp
e1aef7ca3b trigger ci 2023-03-27 09:13:02 +08:00
maskpp
a15560f9fa Merge branch 'develop' into maskpp/resender_2 2023-03-24 10:00:18 +08:00
maskpp
e26854285e Merge branch 'develop' into maskpp/resender_2 2023-03-23 17:03:53 +08:00
maskpp
6cc3e59b7c trigger ci 2023-03-23 10:02:36 +08:00
maskpp
9fab743a87 Merge branch 'develop' into maskpp/resender_2 2023-03-22 22:19:21 +08:00
maskpp
9047512b90 Merge branch 'develop' into maskpp/resender_2 2023-03-22 19:48:10 +08:00
maskpp
cd70cd7008 Merge branch 'develop' into maskpp/resender_2 2023-03-22 16:04:48 +08:00
maskpp
ccf73c54dc fix comments. 2023-03-22 16:04:22 +08:00
maskpp
4f9eaa8381 fix ci 2023-03-21 21:53:07 +08:00
maskpp
92e70c0c61 fix ci 2023-03-21 21:42:04 +08:00
maskpp
e07c3e52f8 fix comments. 2023-03-21 21:37:58 +08:00
maskpp
4328cc59a8 fix comments. 2023-03-21 21:32:26 +08:00
maskpp
db1fdc5c97 revert change 2023-03-21 15:53:35 +08:00
maskpp
d1694bf110 trigger ci 2023-03-21 15:41:28 +08:00
maskpp
2a5f3ec5e9 open log docker to look more ci detail 2023-03-21 15:39:46 +08:00
maskpp
046c41c536 fix ci 2023-03-20 22:05:44 +08:00
maskpp
e988e6da4c fix ci 2023-03-20 21:51:34 +08:00
maskpp
a3654580a7 Merge branch 'develop' into maskpp/resender_2 2023-03-20 21:49:57 +08:00
maskpp
af7c3b0827 fix comments. 2023-03-20 21:49:28 +08:00
maskpp
397e3edb3b fix comments. 2023-03-20 20:30:11 +08:00
maskpp
44b66e94fa fix comments. 2023-03-20 20:30:11 +08:00
maskpp
42fc5c5f6d Update bridge/sender/sender.go
Co-authored-by: Péter Garamvölgyi <peter@scroll.io>
2023-03-20 20:06:34 +08:00
maskpp
fd6d9cbd3b Merge branch 'develop' into maskpp/resender_2 2023-03-20 15:04:31 +08:00
maskpp
5519b6f578 trigger ci 2023-03-20 13:26:01 +08:00
maskpp
1e313493b8 Merge branch 'develop' into maskpp/resender_2 2023-03-19 15:22:56 +08:00
maskpp
f7a8d0d14c trigger ci 2023-03-17 10:11:45 +08:00
maskpp
0cb3c295ef merge develop branch and fix conflict 2023-03-17 10:05:56 +08:00
maskpp
14c2f5cea8 merge develop branch and fix conflict 2023-03-16 17:38:39 +08:00
maskpp
7687a03cab fix comments. 2023-03-16 16:58:33 +08:00
maskpp
cd9968cdd9 fix comments. 2023-03-16 15:59:17 +08:00
maskpp
8b49a6b785 trigger ci 2023-03-16 10:37:07 +08:00
maskpp
8c20474d33 Merge branch 'develop' into maskpp/resender_2 2023-03-16 10:29:41 +08:00
maskpp
8b94e6aad9 Merge branch 'develop' into maskpp/resender_2 2023-03-16 10:08:00 +08:00
maskpp
d8c1cd0038 trigger ci 2023-03-15 17:35:51 +08:00
maskpp
90472ddb34 fix ci 2023-03-15 17:33:26 +08:00
maskpp
b9c9327803 trigger ci 2023-03-15 16:21:33 +08:00
maskpp
2934ab8930 fix ci 2023-03-15 14:37:07 +08:00
maskpp
042bb1f357 merge develop branch and fix conflict 2023-03-15 14:34:26 +08:00
maskpp
62eb8e484f Merge branch 'develop' into maskpp/resender_2 2023-03-15 10:27:15 +08:00
maskpp
2638c59896 change to use loop {} 2023-03-15 10:27:00 +08:00
maskpp
6eae95925a trigger ci 2023-03-14 19:42:19 +08:00
maskpp
1ae7f35040 trigger ci 2023-03-14 19:37:55 +08:00
maskpp
5b2ed914ac fix the latest number's messages 2023-03-14 17:14:20 +08:00
maskpp
21bd99248e Merge branch 'develop' into maskpp/resender_2 2023-03-14 17:07:52 +08:00
maskpp
e5e279a900 Merge branch 'develop' into maskpp/resender_2 2023-03-14 15:17:01 +08:00
maskpp
bc6bab305c fix comments. 2023-03-14 15:15:50 +08:00
maskpp
8727613d8c fix comments. 2023-03-14 14:11:20 +08:00
maskpp
b635a419ca fix comments. 2023-03-14 14:03:34 +08:00
maskpp
865fb79240 Merge branch 'develop' into maskpp/resender_2 2023-03-14 11:30:33 +08:00
Péter Garamvölgyi
c8dc81516e Merge branch 'staging' into maskpp/resender_2 2023-03-13 13:03:34 +01:00
maskpp
84cfea7d87 Merge branch 'staging' into maskpp/resender_2 2023-03-13 11:03:34 +08:00
maskpp
2538bcdc7b Merge branch 'staging' into maskpp/resender_2 2023-03-10 18:18:43 +08:00
maskpp
60c01c371e Merge branch 'staging' into maskpp/resender_2 2023-03-10 14:57:44 +08:00
maskpp
678e0adc0d fix comments. 2023-03-10 11:03:20 +08:00
maskpp
cf157bc670 trigger ci 2023-03-10 10:56:45 +08:00
HAOYUatHZ
6f0042735a revert Jenkinsfile 2023-03-10 10:12:09 +08:00
HAOYUatHZ
610f952f40 Merge remote-tracking branch 'origin/staging' into maskpp/resender_2 2023-03-10 10:11:51 +08:00
maskpp
50fb641b9c fix ci 2023-03-08 10:23:32 +08:00
maskpp
57c4e703db fix ci 2023-03-08 10:12:45 +08:00
maskpp
08d707dfd2 Add batches check. 2023-03-08 09:56:42 +08:00
maskpp
856af6735d temporary close coordinator in order to ignore cargo error. 2023-03-07 09:44:15 +08:00
maskpp
ad487af82f Fix ci 2023-03-07 09:23:56 +08:00
maskpp
ba71153e5a Merge branch 'staging' into maskpp/resender_2 2023-03-07 09:12:47 +08:00
maskpp
c43999e150 Merge branch 'staging' into maskpp/resender_2 2023-03-06 16:00:04 +08:00
maskpp
a88459dafb merge staging branch and fix conflict 2023-03-06 15:59:40 +08:00
maskpp
0c5515473b update resender by the latest status 2023-03-01 21:49:25 +08:00
maskpp
9f58c46206 update resender by the latest status 2023-03-01 18:21:25 +08:00
maskpp
30211250d3 update resender by the latest status 2023-03-01 18:17:32 +08:00
29 changed files with 1173 additions and 577 deletions

View File

@@ -19,7 +19,8 @@
"escalate_multiple_den": 10,
"max_gas_price": 10000000000,
"tx_type": "LegacyTx",
"min_balance": 100000000000000000000
"min_balance": 100000000000000000000,
"pending_limit": 500
},
"gas_oracle_config": {
"min_gas_price": 0,
@@ -53,7 +54,8 @@
"escalate_multiple_den": 10,
"max_gas_price": 10000000000,
"tx_type": "LegacyTx",
"min_balance": 100000000000000000000
"min_balance": 100000000000000000000,
"pending_limit": 500
},
"gas_oracle_config": {
"min_gas_price": 0,

View File

@@ -30,7 +30,8 @@ type SenderConfig struct {
// The transaction type to use: LegacyTx, AccessListTx, DynamicFeeTx
TxType string `json:"tx_type"`
// The min balance set for check and set balance for sender's accounts.
MinBalance *big.Int `json:"min_balance,omitempty"`
MinBalance *big.Int `json:"min_balance,omitempty"`
PendingLimit int64 `json:"pending_limit,omitempty"`
// The interval (in seconds) to check balance and top up sender's accounts
CheckBalanceTime uint64 `json:"check_balance_time"`
}
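
The config and struct changes above add a new `pending_limit` knob per sender. Later hunks in this PR gate work on `messageSender.IsFull()` and `messageSender.PendingCount()`, and go.mod (below) picks up `github.com/orcaman/concurrent-map`, so a plausible reading is that the sender caps its in-flight transactions at `pending_limit`. A minimal sketch of that idea follows; the type and field names are assumptions, not the PR's actual sender implementation.

```go
// Hypothetical sketch of how pending_limit might be enforced inside the
// sender. Names and fields are assumptions inferred from the relayer-side
// calls added in this PR (IsFull, PendingCount) and from the new
// concurrent-map dependency in go.mod; this is not the PR's actual code.
package main

import (
	"fmt"

	cmap "github.com/orcaman/concurrent-map"
)

type sketchSender struct {
	pendingLimit int64              // SenderConfig.PendingLimit ("pending_limit")
	pendingTxs   cmap.ConcurrentMap // txID -> in-flight transaction
}

// PendingCount returns how many transactions still await confirmation.
func (s *sketchSender) PendingCount() int64 { return int64(s.pendingTxs.Count()) }

// IsFull reports whether the sender has reached pending_limit; the relayers in
// this PR spin on !IsFull() before submitting more work.
func (s *sketchSender) IsFull() bool { return s.PendingCount() >= s.pendingLimit }

func main() {
	s := &sketchSender{pendingLimit: 500, pendingTxs: cmap.New()}
	fmt.Println(s.IsFull()) // false until 500 transactions are pending
}
```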

View File

@@ -3,6 +3,7 @@ module scroll-tech/bridge
go 1.18
require (
github.com/modern-go/reflect2 v1.0.2
github.com/orcaman/concurrent-map v1.0.0
github.com/scroll-tech/go-ethereum v1.10.14-0.20230321020420-127af384ed04
github.com/stretchr/testify v1.8.2

View File

@@ -60,6 +60,8 @@ github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/Qd
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mitchellh/pointerstructure v1.2.0 h1:O+i9nHnXS3l/9Wu7r4NrEdwA2VFTicjUEN1uBnDo34A=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=

View File

@@ -2,14 +2,11 @@ package l1
import (
"context"
"errors"
"math/big"
"time"
// not sure if this will cause problems when relaying with l1geth
"github.com/scroll-tech/go-ethereum/accounts/abi"
"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/crypto"
"github.com/scroll-tech/go-ethereum/log"
geth_metrics "github.com/scroll-tech/go-ethereum/metrics"
@@ -101,7 +98,7 @@ func NewLayer1Relayer(ctx context.Context, db database.OrmFactory, cfg *config.R
minGasLimitForMessageRelay = cfg.MessageRelayMinGasLimit
}
return &Layer1Relayer{
relayer := &Layer1Relayer{
ctx: ctx,
db: db,
@@ -120,104 +117,10 @@ func NewLayer1Relayer(ctx context.Context, db database.OrmFactory, cfg *config.R
cfg: cfg,
stopCh: make(chan struct{}),
}, nil
}
// ProcessSavedEvents relays saved un-processed cross-domain transactions to desired blockchain
func (r *Layer1Relayer) ProcessSavedEvents() {
// msgs are sorted by nonce in increasing order
msgs, err := r.db.GetL1MessagesByStatus(types.MsgPending, 100)
if err != nil {
log.Error("Failed to fetch unprocessed L1 messages", "err", err)
return
}
go relayer.confirmLoop(ctx)
if len(msgs) > 0 {
log.Info("Processing L1 messages", "count", len(msgs))
}
for _, msg := range msgs {
if err = r.processSavedEvent(msg); err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
log.Error("failed to process event", "msg.msgHash", msg.MsgHash, "err", err)
}
return
}
}
}
func (r *Layer1Relayer) processSavedEvent(msg *types.L1Message) error {
calldata := common.Hex2Bytes(msg.Calldata)
hash, err := r.messageSender.SendTransaction(msg.MsgHash, &r.cfg.MessengerContractAddress, big.NewInt(0), calldata, r.minGasLimitForMessageRelay)
if err != nil && err.Error() == "execution reverted: Message expired" {
return r.db.UpdateLayer1Status(r.ctx, msg.MsgHash, types.MsgExpired)
}
if err != nil && err.Error() == "execution reverted: Message successfully executed" {
return r.db.UpdateLayer1Status(r.ctx, msg.MsgHash, types.MsgConfirmed)
}
if err != nil {
return err
}
bridgeL1MsgsRelayedTotalCounter.Inc(1)
log.Info("relayMessage to layer2", "msg hash", msg.MsgHash, "tx hash", hash)
err = r.db.UpdateLayer1StatusAndLayer2Hash(r.ctx, msg.MsgHash, types.MsgSubmitted, hash.String())
if err != nil {
log.Error("UpdateLayer1StatusAndLayer2Hash failed", "msg.msgHash", msg.MsgHash, "msg.height", msg.Height, "err", err)
}
return err
}
// ProcessGasPriceOracle imports gas price to layer2
func (r *Layer1Relayer) ProcessGasPriceOracle() {
latestBlockHeight, err := r.db.GetLatestL1BlockHeight()
if err != nil {
log.Warn("Failed to fetch latest L1 block height from db", "err", err)
return
}
blocks, err := r.db.GetL1BlockInfos(map[string]interface{}{
"number": latestBlockHeight,
})
if err != nil {
log.Error("Failed to GetL1BlockInfos from db", "height", latestBlockHeight, "err", err)
return
}
if len(blocks) != 1 {
log.Error("Block not exist", "height", latestBlockHeight)
return
}
block := blocks[0]
if block.GasOracleStatus == types.GasOraclePending {
expectedDelta := r.lastGasPrice * r.gasPriceDiff / gasPriceDiffPrecision
// last is undefined, or block.BaseFee >= minGasPrice and the diff threshold is exceeded
if r.lastGasPrice == 0 || (block.BaseFee >= r.minGasPrice && (block.BaseFee >= r.lastGasPrice+expectedDelta || block.BaseFee <= r.lastGasPrice-expectedDelta)) {
baseFee := big.NewInt(int64(block.BaseFee))
data, err := r.l1GasOracleABI.Pack("setL1BaseFee", baseFee)
if err != nil {
log.Error("Failed to pack setL1BaseFee", "block.Hash", block.Hash, "block.Height", block.Number, "block.BaseFee", block.BaseFee, "err", err)
return
}
hash, err := r.gasOracleSender.SendTransaction(block.Hash, &r.cfg.GasPriceOracleContractAddress, big.NewInt(0), data, 0)
if err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
log.Error("Failed to send setL1BaseFee tx to layer2 ", "block.Hash", block.Hash, "block.Height", block.Number, "err", err)
}
return
}
err = r.db.UpdateL1GasOracleStatusAndOracleTxHash(r.ctx, block.Hash, types.GasOracleImporting, hash.String())
if err != nil {
log.Error("UpdateGasOracleStatusAndOracleTxHash failed", "block.Hash", block.Hash, "block.Height", block.Number, "err", err)
return
}
r.lastGasPrice = block.BaseFee
log.Info("Update l1 base fee", "txHash", hash.String(), "baseFee", baseFee)
}
}
return relayer, nil
}
// Start the relayer process
@@ -225,55 +128,65 @@ func (r *Layer1Relayer) Start() {
go func() {
ctx, cancel := context.WithCancel(r.ctx)
go utils.Loop(ctx, 2*time.Second, r.ProcessSavedEvents)
go utils.Loop(ctx, 2*time.Second, r.ProcessGasPriceOracle)
go func(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case cfm := <-r.messageCh:
bridgeL1MsgsRelayedConfirmedTotalCounter.Inc(1)
if !cfm.IsSuccessful {
err := r.db.UpdateLayer1StatusAndLayer2Hash(r.ctx, cfm.ID, types.MsgRelayFailed, cfm.TxHash.String())
if err != nil {
log.Warn("UpdateLayer1StatusAndLayer2Hash failed", "err", err)
}
log.Warn("transaction confirmed but failed in layer2", "confirmation", cfm)
} else {
// @todo handle db error
err := r.db.UpdateLayer1StatusAndLayer2Hash(r.ctx, cfm.ID, types.MsgConfirmed, cfm.TxHash.String())
if err != nil {
log.Warn("UpdateLayer1StatusAndLayer2Hash failed", "err", err)
}
log.Info("transaction confirmed in layer2", "confirmation", cfm)
}
case cfm := <-r.gasOracleCh:
if !cfm.IsSuccessful {
// @discuss: maybe make it pending again?
err := r.db.UpdateL1GasOracleStatusAndOracleTxHash(r.ctx, cfm.ID, types.GasOracleFailed, cfm.TxHash.String())
if err != nil {
log.Warn("UpdateL1GasOracleStatusAndOracleTxHash failed", "err", err)
}
log.Warn("transaction confirmed but failed in layer2", "confirmation", cfm)
} else {
// @todo handle db error
err := r.db.UpdateL1GasOracleStatusAndOracleTxHash(r.ctx, cfm.ID, types.GasOracleImported, cfm.TxHash.String())
if err != nil {
log.Warn("UpdateGasOracleStatusAndOracleTxHash failed", "err", err)
}
log.Info("transaction confirmed in layer2", "confirmation", cfm)
}
}
go func() {
if err := r.checkSubmittedMessages(); err != nil {
log.Error("failed to init layer1 submitted tx", "err", err)
}
}(ctx)
// Wait until sender pool is clean.
utils.TryTimes(-1, func() bool {
return r.messageSender.PendingCount() == 0
})
go utils.Loop(ctx, 2*time.Second, r.ProcessSavedEvents)
}()
go utils.Loop(ctx, 2*time.Second, r.ProcessGasPriceOracle)
<-r.stopCh
cancel()
}()
}
func (r *Layer1Relayer) confirmLoop(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case cfm := <-r.messageCh:
bridgeL1MsgsRelayedConfirmedTotalCounter.Inc(1)
if !cfm.IsSuccessful {
err := r.db.UpdateLayer1StatusAndLayer2Hash(r.ctx, cfm.ID, types.MsgRelayFailed, cfm.TxHash.String())
if err != nil {
log.Warn("UpdateLayer1StatusAndLayer2Hash failed", "err", err)
}
log.Warn("transaction confirmed but failed in layer2", "confirmation", cfm)
} else {
// @todo handle db error
err := r.db.UpdateLayer1StatusAndLayer2Hash(r.ctx, cfm.ID, types.MsgConfirmed, cfm.TxHash.String())
if err != nil {
log.Warn("UpdateLayer1StatusAndLayer2Hash failed", "err", err)
}
log.Info("transaction confirmed in layer2", "confirmation", cfm)
}
case cfm := <-r.gasOracleCh:
if !cfm.IsSuccessful {
// @discuss: maybe make it pending again?
err := r.db.UpdateL1GasOracleStatusAndOracleTxHash(r.ctx, cfm.ID, types.GasOracleFailed, cfm.TxHash.String())
if err != nil {
log.Warn("UpdateL1GasOracleStatusAndOracleTxHash failed", "err", err)
}
log.Warn("transaction confirmed but failed in layer2", "confirmation", cfm)
} else {
// @todo handle db error
err := r.db.UpdateL1GasOracleStatusAndOracleTxHash(r.ctx, cfm.ID, types.GasOracleImported, cfm.TxHash.String())
if err != nil {
log.Warn("UpdateGasOracleStatusAndOracleTxHash failed", "err", err)
}
log.Info("transaction confirmed in layer2", "confirmation", cfm)
}
}
}
}
// Stop the relayer module, for a graceful shutdown.
func (r *Layer1Relayer) Stop() {
close(r.stopCh)
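
The restructured `Start` above changes the startup order: the relayer first replays transactions already recorded as submitted (`checkSubmittedMessages`), waits for the sender's pending pool to drain, and only then starts the regular `ProcessSavedEvents` loop, while confirmation handling moves into `confirmLoop`, which the constructor now starts. The waiting step relies on `utils.TryTimes(-1, ...)`. Below is a minimal sketch of the retry semantics this code appears to assume (retry until the closure returns true, a negative count meaning retry forever); the real helper in scroll-tech/common/utils may differ in backoff and return behaviour.

```go
// Minimal sketch of the retry helper used above to block until the pending
// pool drains. It assumes TryTimes(times, fn) keeps calling fn until it
// returns true, a negative times meaning "retry forever"; the real helper in
// scroll-tech/common/utils may use a different backoff or return value.
package main

import (
	"fmt"
	"time"
)

func tryTimes(times int, fn func() bool) bool {
	for i := 0; times < 0 || i < times; i++ {
		if fn() {
			return true
		}
		time.Sleep(500 * time.Millisecond) // assumed fixed backoff
	}
	return false
}

func main() {
	pending := 3
	tryTimes(-1, func() bool {
		pending-- // stand-in for messageSender.PendingCount() falling to zero
		return pending == 0
	})
	fmt.Println("pending pool drained; safe to start ProcessSavedEvents loop")
}
```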

View File

@@ -0,0 +1,63 @@
package l1
import (
"errors"
"math/big"
"github.com/scroll-tech/go-ethereum/log"
"scroll-tech/common/types"
"scroll-tech/bridge/sender"
)
// ProcessGasPriceOracle imports gas price to layer2
func (r *Layer1Relayer) ProcessGasPriceOracle() {
latestBlockHeight, err := r.db.GetLatestL1BlockHeight()
if err != nil {
log.Warn("Failed to fetch latest L1 block height from db", "err", err)
return
}
blocks, err := r.db.GetL1BlockInfos(map[string]interface{}{
"number": latestBlockHeight,
})
if err != nil {
log.Error("Failed to GetL1BlockInfos from db", "height", latestBlockHeight, "err", err)
return
}
if len(blocks) != 1 {
log.Error("Block not exist", "height", latestBlockHeight)
return
}
block := blocks[0]
if block.GasOracleStatus == types.GasOraclePending {
expectedDelta := r.lastGasPrice * r.gasPriceDiff / gasPriceDiffPrecision
// last is undefined, or block.BaseFee >= minGasPrice and the diff threshold is exceeded
if r.lastGasPrice == 0 || (block.BaseFee >= r.minGasPrice && (block.BaseFee >= r.lastGasPrice+expectedDelta || block.BaseFee <= r.lastGasPrice-expectedDelta)) {
baseFee := big.NewInt(int64(block.BaseFee))
data, err := r.l1GasOracleABI.Pack("setL1BaseFee", baseFee)
if err != nil {
log.Error("Failed to pack setL1BaseFee", "block.Hash", block.Hash, "block.Height", block.Number, "block.BaseFee", block.BaseFee, "err", err)
return
}
hash, err := r.gasOracleSender.SendTransaction(block.Hash, &r.cfg.GasPriceOracleContractAddress, big.NewInt(0), data, 0)
if err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
log.Error("Failed to send setL1BaseFee tx to layer2 ", "block.Hash", block.Hash, "block.Height", block.Number, "err", err)
}
return
}
err = r.db.UpdateL1GasOracleStatusAndOracleTxHash(r.ctx, block.Hash, types.GasOracleImporting, hash.String())
if err != nil {
log.Error("UpdateGasOracleStatusAndOracleTxHash failed", "block.Hash", block.Hash, "block.Height", block.Number, "err", err)
return
}
r.lastGasPrice = block.BaseFee
log.Info("Update l1 base fee", "txHash", hash.String(), "baseFee", baseFee)
}
}
}

View File

@@ -0,0 +1,113 @@
package l1
import (
"errors"
"fmt"
"math/big"
"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/log"
"scroll-tech/common/types"
"scroll-tech/common/utils"
"scroll-tech/bridge/sender"
)
func (r *Layer1Relayer) checkSubmittedMessages() error {
var (
index uint64
msgsSize = 100
)
for {
msgs, err := r.db.GetL1Messages(
map[string]interface{}{"status": types.MsgSubmitted},
fmt.Sprintf("AND queue_index > %d", index),
fmt.Sprintf("ORDER BY queue_index ASC LIMIT %d", msgsSize),
)
if err != nil {
log.Error("failed to get l1 submitted messages", "queue_index", index, "err", err)
return err
}
if len(msgs) == 0 {
return nil
}
index = msgs[len(msgs)-1].QueueIndex
for _, msg := range msgs {
// If pending txs pool is full, wait until pending pool is available.
utils.TryTimes(-1, func() bool {
return !r.messageSender.IsFull()
})
err = r.messageSender.LoadOrSendTx(
common.HexToHash(msg.Layer2Hash),
msg.MsgHash,
&r.cfg.MessengerContractAddress,
big.NewInt(0),
common.Hex2Bytes(msg.Calldata),
r.minGasLimitForMessageRelay,
)
switch true {
case err == nil:
case err.Error() == "execution reverted: Message expired":
if err = r.db.UpdateLayer1Status(r.ctx, msg.MsgHash, types.MsgExpired); err != nil {
return err
}
case err.Error() == "execution reverted: Message successfully executed":
if err = r.db.UpdateLayer1Status(r.ctx, msg.MsgHash, types.MsgConfirmed); err != nil {
return err
}
default:
log.Error("failed to load or send l1 submitted tx", "msg hash", msg.MsgHash, "err", err)
return err
}
}
}
}
// ProcessSavedEvents relays saved un-processed cross-domain transactions to desired blockchain
func (r *Layer1Relayer) ProcessSavedEvents() {
// msgs are sorted by nonce in increasing order
msgs, err := r.db.GetL1MessagesByStatus(types.MsgPending, 100)
if err != nil {
log.Error("Failed to fetch unprocessed L1 messages", "err", err)
return
}
if len(msgs) > 0 {
log.Info("Processing L1 messages", "count", len(msgs))
}
for _, msg := range msgs {
if err = r.processSavedEvent(msg); err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
log.Error("failed to process event", "msg.msgHash", msg.MsgHash, "err", err)
}
return
}
}
}
func (r *Layer1Relayer) processSavedEvent(msg *types.L1Message) error {
calldata := common.Hex2Bytes(msg.Calldata)
hash, err := r.messageSender.SendTransaction(msg.MsgHash, &r.cfg.MessengerContractAddress, big.NewInt(0), calldata, r.minGasLimitForMessageRelay)
if err != nil && err.Error() == "execution reverted: Message expired" {
return r.db.UpdateLayer1Status(r.ctx, msg.MsgHash, types.MsgExpired)
}
if err != nil && err.Error() == "execution reverted: Message successfully executed" {
return r.db.UpdateLayer1Status(r.ctx, msg.MsgHash, types.MsgConfirmed)
}
if err != nil {
return err
}
bridgeL1MsgsRelayedTotalCounter.Inc(1)
log.Info("relayMessage to layer2", "msg hash", msg.MsgHash, "tx hash", hash)
err = r.db.UpdateLayer1StatusAndLayer2Hash(r.ctx, msg.MsgHash, types.MsgSubmitted, hash.String())
if err != nil {
log.Error("UpdateLayer1StatusAndLayer2Hash failed", "msg.msgHash", msg.MsgHash, "msg.height", msg.Height, "err", err)
}
return err
}
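
Both the L1 recovery path above and the L2 paths later in this PR call `messageSender.LoadOrSendTx`, passing the transaction hash previously stored in the database together with the usual `SendTransaction` arguments. The sender implementation itself is not part of this section, but the call sites suggest the following behaviour: re-attach to an already broadcast transaction when its hash is known, otherwise fall back to a normal send under the same transaction ID. The sketch below is only a guess at that behaviour; the receiver type, fields, and error handling are assumptions.

```go
// Rough sketch of the behaviour LoadOrSendTx appears to have, based only on
// its call sites in this PR. The real implementation lives in bridge/sender;
// the receiver type, fields, and error handling below are assumptions.
package sender

import (
	"context"
	"math/big"

	"github.com/scroll-tech/go-ethereum/common"
	"github.com/scroll-tech/go-ethereum/core/types"
	"github.com/scroll-tech/go-ethereum/ethclient"
)

type resender struct {
	ctx     context.Context
	client  *ethclient.Client
	pending map[string]*types.Transaction // txID -> tracked tx (simplified)
	send    func(id string, target *common.Address, value *big.Int, data []byte, minGasLimit uint64) (common.Hash, error)
}

// LoadOrSendTx resumes tracking a previously broadcast transaction when its
// hash is known (e.g. recorded in the DB before a restart); otherwise it falls
// back to a normal send under the same transaction ID.
func (s *resender) LoadOrSendTx(txHash common.Hash, id string, target *common.Address,
	value *big.Int, data []byte, minGasLimit uint64) error {
	if txHash != (common.Hash{}) {
		if tx, _, err := s.client.TransactionByHash(s.ctx, txHash); err == nil && tx != nil {
			s.pending[id] = tx // re-attach instead of broadcasting a duplicate
			return nil
		}
	}
	_, err := s.send(id, target, value, data, minGasLimit)
	return err
}
```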

View File

@@ -15,9 +15,10 @@ import (
geth_metrics "github.com/scroll-tech/go-ethereum/metrics"
"github.com/scroll-tech/go-ethereum/rpc"
"scroll-tech/database"
"scroll-tech/common/metrics"
"scroll-tech/common/types"
"scroll-tech/database"
cutil "scroll-tech/common/utils"

View File

@@ -32,18 +32,19 @@ func New(ctx context.Context, cfg *config.L2Config, orm database.OrmFactory) (*B
// Otherwise, there will be a race condition between watcher.initializeGenesis and relayer.ProcessPendingBatches.
watcher := NewL2WatcherClient(ctx, client, cfg.Confirmations, cfg.L2MessengerAddress, cfg.L2MessageQueueAddress, cfg.WithdrawTrieRootSlot, orm)
proposer := NewBatchProposer(ctx, cfg.BatchProposerConfig, orm)
relayer, err := NewLayer2Relayer(ctx, client, orm, cfg.RelayerConfig)
if err != nil {
return nil, err
}
batchProposer := NewBatchProposer(ctx, cfg.BatchProposerConfig, relayer, orm)
proposer.SetLayer2Relayer(relayer)
relayer.SetBatchProposer(proposer)
return &Backend{
cfg: cfg,
watcher: watcher,
relayer: relayer,
batchProposer: batchProposer,
batchProposer: proposer,
orm: orm,
}, nil
}
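
The constructor wiring above replaces the old `NewBatchProposer(..., relayer, ...)` dependency with two setters, breaking the construction cycle between `BatchProposer` and `Layer2Relayer`: each side now depends only on a one-method interface and is connected after both objects exist. Condensed below, using the `relayerInterface` and `batchInterface` definitions from the batch_proposer.go and relayer.go hunks further down (all unrelated fields omitted):

```go
// Condensed view of the interface wiring; everything unrelated to the wiring
// is omitted. relayerInterface and batchInterface are defined verbatim in the
// batch_proposer.go and relayer.go hunks below.
package l2

import "scroll-tech/common/types"

// Implemented by *Layer2Relayer.
type relayerInterface interface {
	SendCommitTx(batchData []*types.BatchData) error
}

// Implemented by *BatchProposer.
type batchInterface interface {
	GenerateBatchData(parentBatch *types.BlockBatch, blocks []*types.BlockInfo) (*types.BatchData, error)
}

type BatchProposer struct{ relayerInterface }
type Layer2Relayer struct{ batchInterface }

// Setters connect the two halves after both are constructed, as New does above:
//
//	proposer.SetLayer2Relayer(relayer)
//	relayer.SetBatchProposer(proposer)
func (p *BatchProposer) SetLayer2Relayer(r relayerInterface) { p.relayerInterface = r }
func (r *Layer2Relayer) SetBatchProposer(b batchInterface)   { r.batchInterface = b }
```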

View File

@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"math"
"reflect"
"sync"
"time"
@@ -21,6 +20,10 @@ import (
"scroll-tech/bridge/config"
)
type relayerInterface interface {
SendCommitTx(batchData []*types.BatchData) error
}
var (
bridgeL2BatchesGasOverThresholdTotalCounter = geth_metrics.NewRegisteredCounter("bridge/l2/batches/gas/over/threshold/total", metrics.ScrollRegistry)
bridgeL2BatchesTxsOverThresholdTotalCounter = geth_metrics.NewRegisteredCounter("bridge/l2/batches/txs/over/threshold/total", metrics.ScrollRegistry)
@@ -83,7 +86,8 @@ type BatchProposer struct {
proofGenerationFreq uint64
batchDataBuffer []*types.BatchData
relayer *Layer2Relayer
relayerInterface
piCfg *types.PublicInputHashConfig
@@ -91,8 +95,8 @@ type BatchProposer struct {
}
// NewBatchProposer will return a new instance of BatchProposer.
func NewBatchProposer(ctx context.Context, cfg *config.BatchProposerConfig, relayer *Layer2Relayer, orm database.OrmFactory) *BatchProposer {
p := &BatchProposer{
func NewBatchProposer(ctx context.Context, cfg *config.BatchProposerConfig, orm database.OrmFactory) *BatchProposer {
return &BatchProposer{
mutex: sync.Mutex{},
ctx: ctx,
orm: orm,
@@ -106,26 +110,24 @@ func NewBatchProposer(ctx context.Context, cfg *config.BatchProposerConfig, rela
batchDataBufferSizeLimit: 100*cfg.CommitTxCalldataSizeLimit + 1*1024*1024, // @todo: determine the value.
proofGenerationFreq: cfg.ProofGenerationFreq,
piCfg: cfg.PublicInputConfig,
relayer: relayer,
stopCh: make(chan struct{}),
}
}
// SetLayer2Relayer set interface from layer2.
func (p *BatchProposer) SetLayer2Relayer(relayer relayerInterface) {
p.relayerInterface = relayer
}
// Start the Listening process
func (p *BatchProposer) Start() {
// for graceful restart.
p.recoverBatchDataBuffer()
// try to commit the leftover pending batches
p.tryCommitBatches()
return p
}
// Start the Listening process
func (p *BatchProposer) Start() {
go func() {
if reflect.ValueOf(p.orm).IsNil() {
panic("must run BatchProposer with DB")
}
ctx, cancel := context.WithCancel(p.ctx)
go utils.Loop(ctx, 2*time.Second, func() {
@@ -199,7 +201,7 @@ func (p *BatchProposer) recoverBatchDataBuffer() {
continue
}
batchData, err := p.generateBatchData(parentBatch, blockInfos)
batchData, err := p.GenerateBatchData(parentBatch, blockInfos)
if err != nil {
continue
}
@@ -277,7 +279,7 @@ func (p *BatchProposer) tryCommitBatches() {
// Send commit tx for batchDataBuffer[0:index]
log.Info("Commit batches", "start_index", p.batchDataBuffer[0].Batch.BatchIndex,
"end_index", p.batchDataBuffer[index-1].Batch.BatchIndex)
err := p.relayer.SendCommitTx(p.batchDataBuffer[:index])
err := p.SendCommitTx(p.batchDataBuffer[:index])
if err != nil {
// leave the retry to the next ticker
log.Error("SendCommitTx failed", "error", err)
@@ -357,7 +359,7 @@ func (p *BatchProposer) createBatchForBlocks(blocks []*types.BlockInfo) error {
return err
}
batchData, err := p.generateBatchData(lastBatch, blocks)
batchData, err := p.GenerateBatchData(lastBatch, blocks)
if err != nil {
log.Error("createBatchData failed", "error", err)
return err
@@ -372,7 +374,8 @@ func (p *BatchProposer) createBatchForBlocks(blocks []*types.BlockInfo) error {
return nil
}
func (p *BatchProposer) generateBatchData(parentBatch *types.BlockBatch, blocks []*types.BlockInfo) (*types.BatchData, error) {
// GenerateBatchData as a public function can be called by layer2 relayer.
func (p *BatchProposer) GenerateBatchData(parentBatch *types.BlockBatch, blocks []*types.BlockInfo) (*types.BatchData, error) {
var wrappedBlocks []*types.WrappedBlock
for _, block := range blocks {
trs, err := p.orm.GetL2WrappedBlocks(map[string]interface{}{"hash": block.Hash})

View File

@@ -40,7 +40,10 @@ func testBatchProposerProposeBatch(t *testing.T) {
BatchTxNumThreshold: 135,
BatchTimeSec: 1,
BatchBlocksLimit: 100,
}, relayer, db)
}, db)
relayer.SetBatchProposer(proposer)
proposer.SetLayer2Relayer(relayer)
proposer.tryProposeBatch()
infos, err := db.GetUnbatchedL2Blocks(map[string]interface{}{},
@@ -84,13 +87,15 @@ func testBatchProposerGracefulRestart(t *testing.T) {
assert.Equal(t, 1, len(batchHashes))
assert.Equal(t, batchData2.Hash().Hex(), batchHashes[0])
// test p.recoverBatchDataBuffer().
_ = NewBatchProposer(context.Background(), &config.BatchProposerConfig{
proposer := NewBatchProposer(context.Background(), &config.BatchProposerConfig{
ProofGenerationFreq: 1,
BatchGasThreshold: 3000000,
BatchTxNumThreshold: 135,
BatchTimeSec: 1,
BatchBlocksLimit: 100,
}, relayer, db)
}, db)
proposer.SetLayer2Relayer(relayer)
proposer.Start()
batchHashes, err = db.GetPendingBatches(math.MaxInt32)
assert.NoError(t, err)

View File

@@ -2,32 +2,26 @@ package l2
import (
"context"
"errors"
"fmt"
"math/big"
"runtime"
"sync"
"time"
"github.com/scroll-tech/go-ethereum/accounts/abi"
"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/crypto"
"github.com/scroll-tech/go-ethereum/ethclient"
"github.com/scroll-tech/go-ethereum/log"
geth_metrics "github.com/scroll-tech/go-ethereum/metrics"
"golang.org/x/sync/errgroup"
"modernc.org/mathutil"
"scroll-tech/common/utils"
"scroll-tech/database"
"scroll-tech/common/metrics"
"scroll-tech/common/types"
"scroll-tech/database"
cutil "scroll-tech/common/utils"
bridge_abi "scroll-tech/bridge/abi"
"scroll-tech/bridge/config"
"scroll-tech/bridge/sender"
"scroll-tech/bridge/utils"
)
var (
@@ -48,6 +42,10 @@ const (
defaultMessageRelayMinGasLimit = 200000 // should be enough for both ERC20 and ETH relay
)
type batchInterface interface {
GenerateBatchData(parentBatch *types.BlockBatch, blocks []*types.BlockInfo) (*types.BatchData, error)
}
// Layer2Relayer is responsible for
// 1. Committing and finalizing L2 blocks on L1
// 2. Relaying messages from L2 to L1
@@ -92,6 +90,9 @@ type Layer2Relayer struct {
// key(string): confirmation ID, value(string): batch hash.
processingFinalization sync.Map
// Use batch_proposer's GenerateBatchData interface.
batchInterface
stopCh chan struct{}
}
@@ -131,7 +132,7 @@ func NewLayer2Relayer(ctx context.Context, l2Client *ethclient.Client, db databa
minGasLimitForMessageRelay = cfg.MessageRelayMinGasLimit
}
return &Layer2Relayer{
relayer := &Layer2Relayer{
ctx: ctx,
db: db,
@@ -159,399 +160,52 @@ func NewLayer2Relayer(ctx context.Context, l2Client *ethclient.Client, db databa
processingBatchesCommitment: sync.Map{},
processingFinalization: sync.Map{},
stopCh: make(chan struct{}),
}, nil
}
go relayer.confirmLoop(ctx)
return relayer, nil
}
const processMsgLimit = 100
// ProcessSavedEvents relays saved un-processed cross-domain transactions to desired blockchain
func (r *Layer2Relayer) ProcessSavedEvents() {
batch, err := r.db.GetLatestFinalizedBatch()
if err != nil {
log.Error("GetLatestFinalizedBatch failed", "err", err)
return
}
// msgs are sorted by nonce in increasing order
msgs, err := r.db.GetL2Messages(
map[string]interface{}{"status": types.MsgPending},
fmt.Sprintf("AND height<=%d", batch.EndBlockNumber),
fmt.Sprintf("ORDER BY nonce ASC LIMIT %d", processMsgLimit),
)
if err != nil {
log.Error("Failed to fetch unprocessed L2 messages", "err", err)
return
}
// process messages in batches
batchSize := mathutil.Min((runtime.GOMAXPROCS(0)+1)/2, r.messageSender.NumberOfAccounts())
for size := 0; len(msgs) > 0; msgs = msgs[size:] {
if size = len(msgs); size > batchSize {
size = batchSize
}
var g errgroup.Group
for _, msg := range msgs[:size] {
msg := msg
g.Go(func() error {
return r.processSavedEvent(msg)
})
}
if err := g.Wait(); err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
log.Error("failed to process l2 saved event", "err", err)
}
return
}
}
}
func (r *Layer2Relayer) processSavedEvent(msg *types.L2Message) error {
// @todo fetch merkle proof from l2geth
log.Info("Processing L2 Message", "msg.nonce", msg.Nonce, "msg.height", msg.Height)
// Get the block info that contains the message
blockInfos, err := r.db.GetL2BlockInfos(map[string]interface{}{"number": msg.Height})
if err != nil {
log.Error("Failed to GetL2BlockInfos from DB", "number", msg.Height)
}
blockInfo := blockInfos[0]
if !blockInfo.BatchHash.Valid {
log.Error("Block has not been batched yet", "number", blockInfo.Number, "msg.nonce", msg.Nonce)
return nil
}
// TODO: rebuild the withdraw trie to generate the merkle proof
proof := bridge_abi.IL1ScrollMessengerL2MessageProof{
BatchHash: common.HexToHash(blockInfo.BatchHash.String),
MerkleProof: make([]byte, 0),
}
from := common.HexToAddress(msg.Sender)
target := common.HexToAddress(msg.Target)
value, ok := big.NewInt(0).SetString(msg.Value, 10)
if !ok {
// @todo maybe panic?
log.Error("Failed to parse message value", "msg.nonce", msg.Nonce, "msg.height", msg.Height)
// TODO: need to skip this message by changing its status to MsgError
}
msgNonce := big.NewInt(int64(msg.Nonce))
calldata := common.Hex2Bytes(msg.Calldata)
data, err := r.l1MessengerABI.Pack("relayMessageWithProof", from, target, value, msgNonce, calldata, proof)
if err != nil {
log.Error("Failed to pack relayMessageWithProof", "msg.nonce", msg.Nonce, "err", err)
// TODO: need to skip this message by changing its status to MsgError
return err
}
hash, err := r.messageSender.SendTransaction(msg.MsgHash, &r.cfg.MessengerContractAddress, big.NewInt(0), data, r.minGasLimitForMessageRelay)
if err != nil && err.Error() == "execution reverted: Message expired" {
return r.db.UpdateLayer2Status(r.ctx, msg.MsgHash, types.MsgExpired)
}
if err != nil && err.Error() == "execution reverted: Message successfully executed" {
return r.db.UpdateLayer2Status(r.ctx, msg.MsgHash, types.MsgConfirmed)
}
if err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
log.Error("Failed to send relayMessageWithProof tx to layer1 ", "msg.height", msg.Height, "msg.MsgHash", msg.MsgHash, "err", err)
}
return err
}
bridgeL2MsgsRelayedTotalCounter.Inc(1)
log.Info("relayMessageWithProof to layer1", "msgHash", msg.MsgHash, "txhash", hash.String())
// save status in db
// @todo handle db error
err = r.db.UpdateLayer2StatusAndLayer1Hash(r.ctx, msg.MsgHash, types.MsgSubmitted, hash.String())
if err != nil {
log.Error("UpdateLayer2StatusAndLayer1Hash failed", "msgHash", msg.MsgHash, "err", err)
return err
}
r.processingMessage.Store(msg.MsgHash, msg.MsgHash)
return nil
}
// ProcessGasPriceOracle imports gas price to layer1
func (r *Layer2Relayer) ProcessGasPriceOracle() {
batch, err := r.db.GetLatestBatch()
if err != nil {
log.Error("Failed to GetLatestBatch", "err", err)
return
}
if batch.OracleStatus == types.GasOraclePending {
suggestGasPrice, err := r.l2Client.SuggestGasPrice(r.ctx)
if err != nil {
log.Error("Failed to fetch SuggestGasPrice from l2geth", "err", err)
return
}
suggestGasPriceUint64 := uint64(suggestGasPrice.Int64())
expectedDelta := r.lastGasPrice * r.gasPriceDiff / gasPriceDiffPrecision
// last is undefined, or suggestGasPriceUint64 >= minGasPrice and the diff threshold is exceeded
if r.lastGasPrice == 0 || (suggestGasPriceUint64 >= r.minGasPrice && (suggestGasPriceUint64 >= r.lastGasPrice+expectedDelta || suggestGasPriceUint64 <= r.lastGasPrice-expectedDelta)) {
data, err := r.l2GasOracleABI.Pack("setL2BaseFee", suggestGasPrice)
if err != nil {
log.Error("Failed to pack setL2BaseFee", "batch.Hash", batch.Hash, "GasPrice", suggestGasPrice.Uint64(), "err", err)
return
}
hash, err := r.gasOracleSender.SendTransaction(batch.Hash, &r.cfg.GasPriceOracleContractAddress, big.NewInt(0), data, 0)
if err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
log.Error("Failed to send setL2BaseFee tx to layer2 ", "batch.Hash", batch.Hash, "err", err)
}
return
}
err = r.db.UpdateL2GasOracleStatusAndOracleTxHash(r.ctx, batch.Hash, types.GasOracleImporting, hash.String())
if err != nil {
log.Error("UpdateGasOracleStatusAndOracleTxHash failed", "batch.Hash", batch.Hash, "err", err)
return
}
r.lastGasPrice = suggestGasPriceUint64
log.Info("Update l2 gas price", "txHash", hash.String(), "GasPrice", suggestGasPrice)
}
}
}
// SendCommitTx sends commitBatches tx to L1.
func (r *Layer2Relayer) SendCommitTx(batchData []*types.BatchData) error {
if len(batchData) == 0 {
log.Error("SendCommitTx receives empty batch")
return nil
}
// pack calldata
commitBatches := make([]bridge_abi.IScrollChainBatch, len(batchData))
for i, batch := range batchData {
commitBatches[i] = batch.Batch
}
calldata, err := r.l1RollupABI.Pack("commitBatches", commitBatches)
if err != nil {
log.Error("Failed to pack commitBatches",
"error", err,
"start_batch_index", commitBatches[0].BatchIndex,
"end_batch_index", commitBatches[len(commitBatches)-1].BatchIndex)
return err
}
// generate a unique txID and send transaction
var bytes []byte
for _, batch := range batchData {
bytes = append(bytes, batch.Hash().Bytes()...)
}
txID := crypto.Keccak256Hash(bytes).String()
txHash, err := r.rollupSender.SendTransaction(txID, &r.cfg.RollupContractAddress, big.NewInt(0), calldata, 0)
if err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
log.Error("Failed to send commitBatches tx to layer1 ", "err", err)
}
return err
}
bridgeL2BatchesCommittedTotalCounter.Inc(int64(len(commitBatches)))
log.Info("Sent the commitBatches tx to layer1",
"tx_hash", txHash.Hex(),
"start_batch_index", commitBatches[0].BatchIndex,
"end_batch_index", commitBatches[len(commitBatches)-1].BatchIndex)
// record and sync with db, @todo handle db error
batchHashes := make([]string, len(batchData))
for i, batch := range batchData {
batchHashes[i] = batch.Hash().Hex()
err = r.db.UpdateCommitTxHashAndRollupStatus(r.ctx, batchHashes[i], txHash.String(), types.RollupCommitting)
if err != nil {
log.Error("UpdateCommitTxHashAndRollupStatus failed", "hash", batchHashes[i], "index", batch.Batch.BatchIndex, "err", err)
}
}
r.processingBatchesCommitment.Store(txID, batchHashes)
return nil
}
// ProcessCommittedBatches submit proof to layer 1 rollup contract
func (r *Layer2Relayer) ProcessCommittedBatches() {
// set skipped batches in a single db operation
if count, err := r.db.UpdateSkippedBatches(); err != nil {
log.Error("UpdateSkippedBatches failed", "err", err)
// continue anyway
} else if count > 0 {
bridgeL2BatchesSkippedTotalCounter.Inc(count)
log.Info("Skipping batches", "count", count)
}
// batches are sorted by batch index in increasing order
batchHashes, err := r.db.GetCommittedBatches(1)
if err != nil {
log.Error("Failed to fetch committed L2 batches", "err", err)
return
}
if len(batchHashes) == 0 {
return
}
hash := batchHashes[0]
// @todo add support to relay multiple batches
batches, err := r.db.GetBlockBatches(map[string]interface{}{"hash": hash}, "LIMIT 1")
if err != nil {
log.Error("Failed to fetch committed L2 batch", "hash", hash, "err", err)
return
}
if len(batches) == 0 {
log.Error("Unexpected result for GetBlockBatches", "hash", hash, "len", 0)
return
}
batch := batches[0]
status := batch.ProvingStatus
switch status {
case types.ProvingTaskUnassigned, types.ProvingTaskAssigned:
// The proof for this block is not ready yet.
return
case types.ProvingTaskProved:
// It's an intermediate state. The roller manager received the proof but has not verified
// the proof yet. We don't roll up the proof until it's verified.
return
case types.ProvingTaskFailed, types.ProvingTaskSkipped:
// note: this is covered by UpdateSkippedBatches, but we keep it for completeness's sake
if err = r.db.UpdateRollupStatus(r.ctx, hash, types.RollupFinalizationSkipped); err != nil {
log.Warn("UpdateRollupStatus failed", "hash", hash, "err", err)
}
case types.ProvingTaskVerified:
log.Info("Start to roll up zk proof", "hash", hash)
success := false
previousBatch, err := r.db.GetLatestFinalizingOrFinalizedBatch()
// skip submitting proof
if err == nil && uint64(batch.CreatedAt.Sub(*previousBatch.CreatedAt).Seconds()) < r.cfg.FinalizeBatchIntervalSec {
log.Info(
"Not enough time passed, skipping",
"hash", hash,
"createdAt", batch.CreatedAt,
"lastFinalizingHash", previousBatch.Hash,
"lastFinalizingStatus", previousBatch.RollupStatus,
"lastFinalizingCreatedAt", previousBatch.CreatedAt,
)
if err = r.db.UpdateRollupStatus(r.ctx, hash, types.RollupFinalizationSkipped); err != nil {
log.Warn("UpdateRollupStatus failed", "hash", hash, "err", err)
} else {
success = true
}
return
}
// handle unexpected db error
if err != nil && err.Error() != "sql: no rows in result set" {
log.Error("Failed to get latest finalized batch", "err", err)
return
}
defer func() {
// TODO: need to revisit this and have a more fine-grained error handling
if !success {
log.Info("Failed to upload the proof, change rollup status to FinalizationSkipped", "hash", hash)
if err = r.db.UpdateRollupStatus(r.ctx, hash, types.RollupFinalizationSkipped); err != nil {
log.Warn("UpdateRollupStatus failed", "hash", hash, "err", err)
}
}
}()
proofBuffer, instanceBuffer, err := r.db.GetVerifiedProofAndInstanceByHash(hash)
if err != nil {
log.Warn("fetch get proof by hash failed", "hash", hash, "err", err)
return
}
if proofBuffer == nil || instanceBuffer == nil {
log.Warn("proof or instance not ready", "hash", hash)
return
}
if len(proofBuffer)%32 != 0 {
log.Error("proof buffer has wrong length", "hash", hash, "length", len(proofBuffer))
return
}
if len(instanceBuffer)%32 != 0 {
log.Warn("instance buffer has wrong length", "hash", hash, "length", len(instanceBuffer))
return
}
proof := utils.BufferToUint256Le(proofBuffer)
instance := utils.BufferToUint256Le(instanceBuffer)
data, err := r.l1RollupABI.Pack("finalizeBatchWithProof", common.HexToHash(hash), proof, instance)
if err != nil {
log.Error("Pack finalizeBatchWithProof failed", "err", err)
return
}
txID := hash + "-finalize"
// add suffix `-finalize` to avoid duplication with commit tx in unit tests
txHash, err := r.rollupSender.SendTransaction(txID, &r.cfg.RollupContractAddress, big.NewInt(0), data, 0)
finalizeTxHash := &txHash
if err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
log.Error("finalizeBatchWithProof in layer1 failed", "hash", hash, "err", err)
}
return
}
bridgeL2BatchesFinalizedTotalCounter.Inc(1)
log.Info("finalizeBatchWithProof in layer1", "batch_hash", hash, "tx_hash", hash)
// record and sync with db, @todo handle db error
err = r.db.UpdateFinalizeTxHashAndRollupStatus(r.ctx, hash, finalizeTxHash.String(), types.RollupFinalizing)
if err != nil {
log.Warn("UpdateFinalizeTxHashAndRollupStatus failed", "batch_hash", hash, "err", err)
}
success = true
r.processingFinalization.Store(txID, hash)
default:
log.Error("encounter unreachable case in ProcessCommittedBatches",
"block_status", status,
)
}
// SetBatchProposer set interface from batch_proposer.
func (r *Layer2Relayer) SetBatchProposer(proposer batchInterface) {
r.batchInterface = proposer
}
// Start the relayer process
func (r *Layer2Relayer) Start() {
go func() {
ctx, cancel := context.WithCancel(r.ctx)
go cutil.Loop(ctx, time.Second, r.ProcessSavedEvents)
go cutil.Loop(ctx, time.Second, r.ProcessCommittedBatches)
go cutil.Loop(ctx, time.Second, r.ProcessGasPriceOracle)
go func(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case confirmation := <-r.messageCh:
r.handleConfirmation(confirmation)
case confirmation := <-r.rollupCh:
r.handleConfirmation(confirmation)
case cfm := <-r.gasOracleCh:
if !cfm.IsSuccessful {
// @discuss: maybe make it pending again?
err := r.db.UpdateL2GasOracleStatusAndOracleTxHash(r.ctx, cfm.ID, types.GasOracleFailed, cfm.TxHash.String())
if err != nil {
log.Warn("UpdateL2GasOracleStatusAndOracleTxHash failed", "err", err)
}
log.Warn("transaction confirmed but failed in layer1", "confirmation", cfm)
} else {
// @todo handle db error
err := r.db.UpdateL2GasOracleStatusAndOracleTxHash(r.ctx, cfm.ID, types.GasOracleImported, cfm.TxHash.String())
if err != nil {
log.Warn("UpdateL2GasOracleStatusAndOracleTxHash failed", "err", err)
}
log.Info("transaction confirmed in layer1", "confirmation", cfm)
}
}
go func() {
if err := r.checkSubmittedMessages(); err != nil {
log.Error("failed to init layer2 submitted messages", "err", err)
}
}(ctx)
// Wait until sender pool is clean.
utils.TryTimes(-1, func() bool {
return r.messageSender.PendingCount() == 0
})
go cutil.Loop(ctx, time.Second, r.ProcessSavedEvents)
}()
go func() {
if err := r.checkRollupBatches(); err != nil {
log.Error("failed to init layer2 rollupCommitting messages", "err", err)
}
utils.TryTimes(-1, func() bool {
return r.rollupSender.PendingCount() == 0
})
if err := r.checkFinalizingBatches(); err != nil {
log.Error("failed to init layer2 finalizing batches", "err", err)
}
// Wait until sender pool is clean.
utils.TryTimes(-1, func() bool {
return r.rollupSender.PendingCount() == 0
})
go cutil.Loop(ctx, time.Second, r.ProcessCommittedBatches)
}()
go cutil.Loop(ctx, time.Second, r.ProcessGasPriceOracle)
<-r.stopCh
cancel()
@@ -563,6 +217,35 @@ func (r *Layer2Relayer) Stop() {
close(r.stopCh)
}
func (r *Layer2Relayer) confirmLoop(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case confirmation := <-r.messageCh:
r.handleConfirmation(confirmation)
case confirmation := <-r.rollupCh:
r.handleConfirmation(confirmation)
case cfm := <-r.gasOracleCh:
if !cfm.IsSuccessful {
// @discuss: maybe make it pending again?
err := r.db.UpdateL2GasOracleStatusAndOracleTxHash(r.ctx, cfm.ID, types.GasOracleFailed, cfm.TxHash.String())
if err != nil {
log.Warn("UpdateL2GasOracleStatusAndOracleTxHash failed", "err", err)
}
log.Warn("transaction confirmed but failed in layer1", "confirmation", cfm)
} else {
// @todo handle db error
err := r.db.UpdateL2GasOracleStatusAndOracleTxHash(r.ctx, cfm.ID, types.GasOracleImported, cfm.TxHash.String())
if err != nil {
log.Warn("UpdateL2GasOracleStatusAndOracleTxHash failed", "err", err)
}
log.Info("transaction confirmed in layer1", "confirmation", cfm)
}
}
}
}
func (r *Layer2Relayer) handleConfirmation(confirmation *sender.Confirmation) {
transactionType := "Unknown"
// check whether it is message relay transaction

bridge/l2/relayer_commit.go (new file, 187 lines added)
View File

@@ -0,0 +1,187 @@
package l2
import (
"errors"
"fmt"
"math/big"
"github.com/modern-go/reflect2"
"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/crypto"
"github.com/scroll-tech/go-ethereum/log"
bridge_abi "scroll-tech/bridge/abi"
"scroll-tech/common/types"
"scroll-tech/common/utils"
"scroll-tech/bridge/sender"
)
func (r *Layer2Relayer) checkRollupBatches() error {
// Check mock generateBatch interface.
if reflect2.IsNil(r.batchInterface) {
return nil
}
blockBatchCache := make(map[string]*types.BlockBatch)
getBlockBatch := func(batchHash string) (*types.BlockBatch, error) {
if blockBatch, ok := blockBatchCache[batchHash]; ok {
return blockBatch, nil
}
blockBatches, err := r.db.GetBlockBatches(map[string]interface{}{"hash": batchHash})
if err != nil {
return nil, err
}
if len(blockBatches) == 0 {
return nil, fmt.Errorf("no such blockBatch, batchHash: %s", batchHash)
}
blockBatchCache[batchHash] = blockBatches[0]
return blockBatches[0], nil
}
var batchIndex uint64
for {
blockBatches, err := r.db.GetBlockBatches(
map[string]interface{}{"rollup_status": types.RollupCommitting},
fmt.Sprintf("AND commit_tx_hash IN (SELECT commit_tx_hash FROM block_batch WHERE index > %d GROUP BY index, commit_tx_hash ORDER BY index LIMIT 1)", batchIndex),
fmt.Sprintf("AND index > %d", batchIndex),
"ORDER BY index ASC",
)
if err != nil {
log.Error("failed to get rollup committing block batches", "batch index", batchIndex, "err", err)
return err
}
if len(blockBatches) == 0 {
return nil
}
var batchDataBuffer []*types.BatchData
batchIndex = blockBatches[len(blockBatches)-1].Index
for _, blockBatch := range blockBatches {
// Wait until sender's pending is not full.
utils.TryTimes(-1, func() bool {
return !r.rollupSender.IsFull()
})
var (
parentBatch *types.BlockBatch
blockInfos []*types.BlockInfo
)
parentBatch, err = getBlockBatch(blockBatch.ParentHash)
if err != nil {
return err
}
blockInfos, err = r.db.GetL2BlockInfos(
map[string]interface{}{"batch_hash": blockBatch.Hash},
"order by number ASC",
)
if err != nil {
return err
}
if len(blockInfos) != int(blockBatch.EndBlockNumber-blockBatch.StartBlockNumber+1) {
log.Error("the number of block info retrieved from DB mismatches the blockBatch info in the DB",
"len(blockInfos)", len(blockInfos),
"expected", blockBatch.EndBlockNumber-blockBatch.StartBlockNumber+1)
continue
}
var batchData *types.BatchData
batchData, err = r.GenerateBatchData(parentBatch, blockInfos)
if err != nil {
return err
}
batchDataBuffer = append(batchDataBuffer, batchData)
}
batchHashes, txID, callData, err := r.packBatchData(batchDataBuffer)
if err != nil {
return err
}
// Handle tx.
err = r.rollupSender.LoadOrSendTx(
common.HexToHash(blockBatches[0].CommitTxHash.String),
txID,
&r.cfg.RollupContractAddress,
big.NewInt(0),
callData,
0,
)
switch true {
case err == nil:
r.processingBatchesCommitment.Store(txID, batchHashes)
case err.Error() == "execution reverted: Batch already commited": //nolint:misspell
log.Warn("blockBatches already committed", "start index", blockBatches[0].Index, "end index", blockBatches[len(blockBatches)-1].Index)
default:
log.Error("failed to load or send batchData tx")
return err
}
}
}
func (r *Layer2Relayer) packBatchData(batchData []*types.BatchData) ([]string, string, []byte, error) {
// pack calldata
commitBatches := make([]bridge_abi.IScrollChainBatch, len(batchData))
for i, batch := range batchData {
commitBatches[i] = batch.Batch
}
calldata, err := r.l1RollupABI.Pack("commitBatches", commitBatches)
if err != nil {
log.Error("Failed to pack commitBatches",
"error", err,
"start_batch_index", commitBatches[0].BatchIndex,
"end_batch_index", commitBatches[len(commitBatches)-1].BatchIndex)
return nil, "", nil, err
}
// generate a unique txID and send transaction
var (
bytes []byte
batchHashes = make([]string, len(batchData))
)
for i, batch := range batchData {
bytes = append(bytes, batch.Hash().Bytes()...)
batchHashes[i] = batch.Hash().Hex()
}
return batchHashes, crypto.Keccak256Hash(bytes).String(), calldata, nil
}
// SendCommitTx sends commitBatches tx to L1.
func (r *Layer2Relayer) SendCommitTx(batchData []*types.BatchData) error {
if len(batchData) == 0 {
log.Error("SendCommitTx receives empty batch")
return nil
}
// pack calldata
batchHashes, txID, calldata, err := r.packBatchData(batchData)
if err != nil {
log.Error("Failed to pack commitBatches",
"error", err,
"start_batch_index", batchData[0].Batch.BatchIndex,
"end_batch_index", batchData[len(batchData)-1].Batch.BatchIndex)
return err
}
txHash, err := r.rollupSender.SendTransaction(txID, &r.cfg.RollupContractAddress, big.NewInt(0), calldata, 0)
if err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
log.Error("Failed to send commitBatches tx to layer1 ", "err", err)
}
return err
}
bridgeL2BatchesCommittedTotalCounter.Inc(int64(len(batchHashes)))
log.Info("Sent the commitBatches tx to layer1",
"tx_hash", txHash.Hex(),
"start_batch_index", batchData[0].Batch.BatchIndex,
"end_batch_index", batchData[len(batchData)-1].Batch.BatchIndex)
// record and sync with db, @todo handle db error
for i := range batchData {
err = r.db.UpdateCommitTxHashAndRollupStatus(r.ctx, batchHashes[i], txHash.String(), types.RollupCommitting)
if err != nil {
log.Error("UpdateCommitTxHashAndRollupStatus failed", "hash", batchHashes[i], "index", batchData[i].Batch.BatchIndex, "err", err)
}
}
r.processingBatchesCommitment.Store(txID, batchHashes)
return nil
}
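
A point worth noting about `packBatchData`: the commit transaction ID is derived purely from the batch hashes (the keccak hash of their concatenation), so after a restart `checkRollupBatches` recomputes exactly the same ID that `SendCommitTx` used before it, which is what lets `LoadOrSendTx` re-attach to the earlier transaction instead of double-committing. A tiny self-contained illustration of that determinism, with made-up hashes:

```go
// Tiny illustration of why the commit transaction ID is stable across
// restarts: it is derived only from the batch hashes, so the recovery path
// recomputes the same ID the live path used. Hashes here are made up.
package main

import (
	"fmt"

	"github.com/scroll-tech/go-ethereum/common"
	"github.com/scroll-tech/go-ethereum/crypto"
)

func commitTxID(batchHashes []common.Hash) string {
	var b []byte
	for _, h := range batchHashes {
		b = append(b, h.Bytes()...)
	}
	return crypto.Keccak256Hash(b).String()
}

func main() {
	hashes := []common.Hash{common.HexToHash("0x01"), common.HexToHash("0x02")}
	fmt.Println(commitTxID(hashes) == commitTxID(hashes)) // always true
}
```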

View File

@@ -0,0 +1,240 @@
package l2
import (
"errors"
"fmt"
"math/big"
"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/log"
"scroll-tech/common/types"
comutiles "scroll-tech/common/utils"
"scroll-tech/bridge/sender"
"scroll-tech/bridge/utils"
)
func (r *Layer2Relayer) checkFinalizingBatches() error {
var (
batchLimit = 10
batchIndex uint64
)
for {
batches, err := r.db.GetBlockBatches(
map[string]interface{}{"rollup_status": types.RollupFinalizing},
fmt.Sprintf("AND index > %d", batchIndex),
fmt.Sprintf("ORDER BY index ASC LIMIT %d", batchLimit),
)
if err != nil {
log.Error("failed to get Rollup finalizing batches", "batch index", batchIndex, "err", err)
return err
}
if len(batches) == 0 {
return nil
}
batchIndex = batches[len(batches)-1].Index
for _, batch := range batches { //nolint:staticcheck
// Wait until sender's pending is not full.
comutiles.TryTimes(-1, func() bool {
return !r.rollupSender.IsFull()
})
var (
txHash common.Hash
hash = batch.Hash
)
// Using an empty txHash lets the tx be re-sent; if the tx is already included in a block, that will be detected.
if batch.CommitTxHash.Valid {
txHash = common.HexToHash(batch.CommitTxHash.String)
}
data, err := r.packFinalizeBatch(hash)
if err != nil {
log.Error("failed to pack finalize data", "err", err)
continue
}
txID := hash + "-finalize"
err = r.rollupSender.LoadOrSendTx(
txHash,
txID,
&r.cfg.RollupContractAddress,
big.NewInt(0),
data,
0,
)
switch true {
case err == nil:
r.processingFinalization.Store(txID, hash)
case err.Error() == "execution reverted: Batch is already finalized":
log.Warn("block batch already rollup finalized", "batch index", batch.Index)
if err = r.db.UpdateRollupStatus(r.ctx, hash, types.RollupFinalized); err != nil {
return err
}
default:
log.Error("failed to load or send finalized tx", "batch hash", hash, "err", err)
return err
}
}
}
}
func (r *Layer2Relayer) packFinalizeBatch(hash string) ([]byte, error) {
proofBuffer, instanceBuffer, err := r.db.GetVerifiedProofAndInstanceByHash(hash)
if err != nil {
log.Warn("fetch get proof by hash failed", "hash", hash, "err", err)
return nil, err
}
if proofBuffer == nil || instanceBuffer == nil {
log.Warn("proof or instance not ready", "hash", hash)
return nil, err
}
if len(proofBuffer)%32 != 0 {
log.Error("proof buffer has wrong length", "hash", hash, "length", len(proofBuffer))
return nil, err
}
if len(instanceBuffer)%32 != 0 {
log.Warn("instance buffer has wrong length", "hash", hash, "length", len(instanceBuffer))
return nil, err
}
proof := utils.BufferToUint256Le(proofBuffer)
instance := utils.BufferToUint256Le(instanceBuffer)
data, err := r.l1RollupABI.Pack("finalizeBatchWithProof", common.HexToHash(hash), proof, instance)
if err != nil {
log.Error("Pack finalizeBatchWithProof failed", "err", err)
return nil, err
}
return data, nil
}
// ProcessCommittedBatches submit proof to layer 1 rollup contract
func (r *Layer2Relayer) ProcessCommittedBatches() {
// set skipped batches in a single db operation
if count, err := r.db.UpdateSkippedBatches(); err != nil {
log.Error("UpdateSkippedBatches failed", "err", err)
// continue anyway
} else if count > 0 {
bridgeL2BatchesSkippedTotalCounter.Inc(count)
log.Info("Skipping batches", "count", count)
}
// batches are sorted by batch index in increasing order
batchHashes, err := r.db.GetCommittedBatches(1)
if err != nil {
log.Error("Failed to fetch committed L2 batches", "err", err)
return
}
if len(batchHashes) == 0 {
return
}
hash := batchHashes[0]
// @todo add support to relay multiple batches
batches, err := r.db.GetBlockBatches(map[string]interface{}{"hash": hash}, "LIMIT 1")
if err != nil {
log.Error("Failed to fetch committed L2 batch", "hash", hash, "err", err)
return
}
if len(batches) == 0 {
log.Error("Unexpected result for GetBlockBatches", "hash", hash, "len", 0)
return
}
batch := batches[0]
status := batch.ProvingStatus
switch status {
case types.ProvingTaskUnassigned, types.ProvingTaskAssigned:
// The proof for this block is not ready yet.
return
case types.ProvingTaskProved:
// It's an intermediate state. The roller manager received the proof but has not verified
// the proof yet. We don't roll up the proof until it's verified.
return
case types.ProvingTaskFailed, types.ProvingTaskSkipped:
// note: this is covered by UpdateSkippedBatches, but we keep it for completeness's sake
if err = r.db.UpdateRollupStatus(r.ctx, hash, types.RollupFinalizationSkipped); err != nil {
log.Warn("UpdateRollupStatus failed", "hash", hash, "err", err)
}
case types.ProvingTaskVerified:
log.Info("Start to roll up zk proof", "hash", hash)
success := false
previousBatch, err := r.db.GetLatestFinalizingOrFinalizedBatch()
// skip submitting proof
if err == nil && uint64(batch.CreatedAt.Sub(*previousBatch.CreatedAt).Seconds()) < r.cfg.FinalizeBatchIntervalSec {
log.Info(
"Not enough time passed, skipping",
"hash", hash,
"createdAt", batch.CreatedAt,
"lastFinalizingHash", previousBatch.Hash,
"lastFinalizingStatus", previousBatch.RollupStatus,
"lastFinalizingCreatedAt", previousBatch.CreatedAt,
)
if err = r.db.UpdateRollupStatus(r.ctx, hash, types.RollupFinalizationSkipped); err != nil {
log.Warn("UpdateRollupStatus failed", "hash", hash, "err", err)
} else {
success = true
}
return
}
// handle unexpected db error
if err != nil && err.Error() != "sql: no rows in result set" {
log.Error("Failed to get latest finalized batch", "err", err)
return
}
defer func() {
// TODO: need to revisit this and have a more fine-grained error handling
if !success {
log.Info("Failed to upload the proof, change rollup status to FinalizationSkipped", "hash", hash)
if err = r.db.UpdateRollupStatus(r.ctx, hash, types.RollupFinalizationSkipped); err != nil {
log.Warn("UpdateRollupStatus failed", "hash", hash, "err", err)
}
}
}()
data, err := r.packFinalizeBatch(hash)
if err != nil {
log.Error("Pack finalizeBatchWithProof failed", "err", err)
return
}
txID := hash + "-finalize"
// add suffix `-finalize` to avoid duplication with commit tx in unit tests
txHash, err := r.rollupSender.SendTransaction(txID, &r.cfg.RollupContractAddress, big.NewInt(0), data, 0)
finalizeTxHash := &txHash
if err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
log.Error("finalizeBatchWithProof in layer1 failed", "hash", hash, "err", err)
}
return
}
bridgeL2BatchesFinalizedTotalCounter.Inc(1)
log.Info("finalizeBatchWithProof in layer1", "batch_hash", hash, "tx_hash", hash)
// record and sync with db, @todo handle db error
err = r.db.UpdateFinalizeTxHashAndRollupStatus(r.ctx, hash, finalizeTxHash.String(), types.RollupFinalizing)
if err != nil {
log.Warn("UpdateFinalizeTxHashAndRollupStatus failed", "batch_hash", hash, "err", err)
}
success = true
r.processingFinalization.Store(txID, hash)
default:
log.Error("encounter unreachable case in ProcessCommittedBatches",
"block_status", status,
)
}
}

View File

@@ -0,0 +1,207 @@
package l2
import (
"errors"
"fmt"
"math/big"
"runtime"
"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/log"
"golang.org/x/sync/errgroup"
"modernc.org/mathutil"
"scroll-tech/common/utils"
"scroll-tech/common/types"
bridge_abi "scroll-tech/bridge/abi"
"scroll-tech/bridge/sender"
)
const processMsgLimit = 100
func (r *Layer2Relayer) checkSubmittedMessages() error {
var nonce uint64
for {
// msgs are sorted by nonce in increasing order
msgs, err := r.db.GetL2Messages(
map[string]interface{}{"status": types.MsgSubmitted},
fmt.Sprintf("AND nonce > %d", nonce),
fmt.Sprintf("ORDER BY nonce ASC LIMIT %d", processMsgLimit),
)
if err != nil {
log.Error("failed to get l2 submitted messages", "message nonce", nonce, "err", err)
return err
}
if len(msgs) == 0 {
return nil
}
var batch *types.BlockBatch
nonce = msgs[len(msgs)-1].Nonce
for _, msg := range msgs { //nolint:staticcheck
// Wait until sender's pending is not full.
utils.TryTimes(-1, func() bool {
return !r.messageSender.IsFull()
})
// Get batch by block number.
if batch == nil || msg.Height < batch.StartBlockNumber || msg.Height > batch.EndBlockNumber {
batches, err := r.db.GetBlockBatches(
map[string]interface{}{},
fmt.Sprintf("AND start_block_number <= %d AND end_block_number >= %d", msg.Height, msg.Height),
)
// If fetching the batch failed, or no batch contains this block, stop and return immediately.
if err != nil || len(batches) == 0 {
return err
}
batch = batches[0]
}
data, err := r.packRelayMessage(msg, common.HexToHash(batch.Hash))
if err != nil {
continue
}
err = r.messageSender.LoadOrSendTx(
common.HexToHash(msg.Layer1Hash),
msg.MsgHash,
&r.cfg.MessengerContractAddress,
big.NewInt(0),
data,
r.minGasLimitForMessageRelay,
)
switch {
case err == nil:
r.processingMessage.Store(msg.MsgHash, msg.MsgHash)
case err.Error() == "execution reverted: Message expired":
if err = r.db.UpdateLayer2Status(r.ctx, msg.MsgHash, types.MsgExpired); err != nil {
return err
}
case err.Error() == "execution reverted: Message successfully executed":
if err = r.db.UpdateLayer2Status(r.ctx, msg.MsgHash, types.MsgConfirmed); err != nil {
return err
}
default:
log.Error("failed to load or send l2 submitted tx", "batch hash", batch.Hash, "msg hash", msg.MsgHash, "err", err)
}
}
}
}
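checkSubmittedMessages pages through the l2_message table by nonce. A minimal sketch of that paging pattern, with hypothetical fetch/handle callbacks that are not part of this PR:

package l2

// pageByNonce fetches up to processMsgLimit rows ordered by nonce, remembers
// the last nonce seen, and loops until a page comes back empty.
// (Illustrative sketch only.)
func pageByNonce(fetch func(afterNonce uint64, limit int) ([]uint64, error), handle func(nonce uint64) error) error {
	var last uint64
	for {
		nonces, err := fetch(last, processMsgLimit)
		if err != nil {
			return err
		}
		if len(nonces) == 0 {
			return nil
		}
		last = nonces[len(nonces)-1]
		for _, n := range nonces {
			if err := handle(n); err != nil {
				return err
			}
		}
	}
}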
// ProcessSavedEvents relays saved, unprocessed cross-domain messages to the destination chain
func (r *Layer2Relayer) ProcessSavedEvents() {
batch, err := r.db.GetLatestFinalizedBatch()
if err != nil {
log.Error("GetLatestFinalizedBatch failed", "err", err)
return
}
// msgs are sorted by nonce in increasing order
msgs, err := r.db.GetL2Messages(
map[string]interface{}{"status": types.MsgPending},
fmt.Sprintf("AND height<=%d", batch.EndBlockNumber),
fmt.Sprintf("ORDER BY nonce ASC LIMIT %d", processMsgLimit),
)
if err != nil {
log.Error("Failed to fetch unprocessed L2 messages", "err", err)
return
}
// process messages in batches
batchSize := mathutil.Min((runtime.GOMAXPROCS(0)+1)/2, r.messageSender.NumberOfAccounts())
for size := 0; len(msgs) > 0; msgs = msgs[size:] {
if size = len(msgs); size > batchSize {
size = batchSize
}
var g errgroup.Group
for _, msg := range msgs[:size] {
msg := msg
g.Go(func() error {
return r.processSavedEvent(msg)
})
}
if err := g.Wait(); err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
log.Error("failed to process l2 saved event", "err", err)
}
return
}
}
}
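The errgroup batch size above is bounded both by CPU count and by the number of sender accounts, so each goroutine can hold its own signing key. A minimal sketch of that sizing rule (hypothetical helper, not part of this PR):

package l2

import (
	"runtime"

	"modernc.org/mathutil"
)

// relayBatchSize mirrors the sizing used by ProcessSavedEvents: roughly half
// the available CPUs, capped by the number of sender accounts.
func relayBatchSize(numAccounts int) int {
	return mathutil.Min((runtime.GOMAXPROCS(0)+1)/2, numAccounts)
}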
func (r *Layer2Relayer) packRelayMessage(msg *types.L2Message, batchHash common.Hash) ([]byte, error) {
// TODO: rebuild the withdraw trie to generate the merkle proof
proof := bridge_abi.IL1ScrollMessengerL2MessageProof{
BatchHash: batchHash,
MerkleProof: make([]byte, 0),
}
from := common.HexToAddress(msg.Sender)
target := common.HexToAddress(msg.Target)
value, ok := big.NewInt(0).SetString(msg.Value, 10)
if !ok {
log.Error("Failed to parse message value", "msg.nonce", msg.Nonce, "msg.height", msg.Height)
// TODO: need to skip this message by changing its status to MsgError
return nil, fmt.Errorf("failed to parse message value %q, nonce %d", msg.Value, msg.Nonce)
}
msgNonce := big.NewInt(int64(msg.Nonce))
calldata := common.Hex2Bytes(msg.Calldata)
data, err := r.l1MessengerABI.Pack("relayMessageWithProof", from, target, value, msgNonce, calldata, proof)
if err != nil {
log.Error("Failed to pack relayMessageWithProof", "msg.nonce", msg.Nonce, "err", err)
// TODO: need to skip this message by changing its status to MsgError
return nil, err
}
return data, nil
}
func (r *Layer2Relayer) processSavedEvent(msg *types.L2Message) error {
// @todo fetch merkle proof from l2geth
log.Info("Processing L2 Message", "msg.nonce", msg.Nonce, "msg.height", msg.Height)
// Get the block info that contains the message
blockInfos, err := r.db.GetL2BlockInfos(map[string]interface{}{"number": msg.Height})
if err != nil {
log.Error("Failed to GetL2BlockInfos from DB", "number", msg.Height, "err", err)
return err
}
if len(blockInfos) == 0 {
log.Error("No block info found for message", "number", msg.Height, "msg.nonce", msg.Nonce)
return nil
}
blockInfo := blockInfos[0]
if !blockInfo.BatchHash.Valid {
log.Error("Block has not been batched yet", "number", blockInfo.Number, "msg.nonce", msg.Nonce)
return nil
}
data, err := r.packRelayMessage(msg, common.HexToHash(blockInfo.BatchHash.String))
if err != nil {
log.Error("Failed to pack relayMessageWithProof", "msg.nonce", msg.Nonce, "err", err)
// TODO: need to skip this message by changing its status to MsgError
return err
}
hash, err := r.messageSender.SendTransaction(msg.MsgHash, &r.cfg.MessengerContractAddress, big.NewInt(0), data, r.minGasLimitForMessageRelay)
if err != nil && err.Error() == "execution reverted: Message expired" {
return r.db.UpdateLayer2Status(r.ctx, msg.MsgHash, types.MsgExpired)
}
if err != nil && err.Error() == "execution reverted: Message successfully executed" {
return r.db.UpdateLayer2Status(r.ctx, msg.MsgHash, types.MsgConfirmed)
}
if err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
log.Error("Failed to send relayMessageWithProof tx to layer1 ", "msg.height", msg.Height, "msg.MsgHash", msg.MsgHash, "err", err)
}
return err
}
bridgeL2MsgsRelayedTotalCounter.Inc(1)
log.Info("relayMessageWithProof to layer1", "msgHash", msg.MsgHash, "txhash", hash.String())
// save status in db
// @todo handle db error
err = r.db.UpdateLayer2StatusAndLayer1Hash(r.ctx, msg.MsgHash, types.MsgSubmitted, hash.String())
if err != nil {
log.Error("UpdateLayer2StatusAndLayer1Hash failed", "msgHash", msg.MsgHash, "err", err)
return err
}
r.processingMessage.Store(msg.MsgHash, msg.MsgHash)
return nil
}
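Both checkSubmittedMessages and processSavedEvent map the same two revert strings onto message statuses. A minimal sketch of that mapping (hypothetical helper, not part of this PR; the zero value means no status change):

package l2

import "scroll-tech/common/types"

// statusForRelayError maps known relayMessageWithProof revert strings to the
// message status they imply; ok is false when the error needs no status change.
func statusForRelayError(err error) (status types.MsgStatus, ok bool) {
	if err == nil {
		return 0, false
	}
	switch err.Error() {
	case "execution reverted: Message expired":
		return types.MsgExpired, true
	case "execution reverted: Message successfully executed":
		return types.MsgConfirmed, true
	default:
		return 0, false
	}
}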

View File

@@ -0,0 +1,56 @@
package l2
import (
"errors"
"math/big"
"github.com/scroll-tech/go-ethereum/log"
"scroll-tech/common/types"
"scroll-tech/bridge/sender"
)
// ProcessGasPriceOracle relays the latest L2 gas price to the gas price oracle on layer1
func (r *Layer2Relayer) ProcessGasPriceOracle() {
batch, err := r.db.GetLatestBatch()
if err != nil {
log.Error("Failed to GetLatestBatch", "err", err)
return
}
if batch.OracleStatus == types.GasOraclePending {
suggestGasPrice, err := r.l2Client.SuggestGasPrice(r.ctx)
if err != nil {
log.Error("Failed to fetch SuggestGasPrice from l2geth", "err", err)
return
}
suggestGasPriceUint64 := uint64(suggestGasPrice.Int64())
expectedDelta := r.lastGasPrice * r.gasPriceDiff / gasPriceDiffPrecision
// the last price is undefined, or (suggestGasPriceUint64 >= minGasPrice && the change exceeds the expected delta)
if r.lastGasPrice == 0 || (suggestGasPriceUint64 >= r.minGasPrice && (suggestGasPriceUint64 >= r.lastGasPrice+expectedDelta || suggestGasPriceUint64 <= r.lastGasPrice-expectedDelta)) {
data, err := r.l2GasOracleABI.Pack("setL2BaseFee", suggestGasPrice)
if err != nil {
log.Error("Failed to pack setL2BaseFee", "batch.Hash", batch.Hash, "GasPrice", suggestGasPrice.Uint64(), "err", err)
return
}
hash, err := r.gasOracleSender.SendTransaction(batch.Hash, &r.cfg.GasPriceOracleContractAddress, big.NewInt(0), data, 0)
if err != nil {
if !errors.Is(err, sender.ErrNoAvailableAccount) {
log.Error("Failed to send setL2BaseFee tx to layer2 ", "batch.Hash", batch.Hash, "err", err)
}
return
}
err = r.db.UpdateL2GasOracleStatusAndOracleTxHash(r.ctx, batch.Hash, types.GasOracleImporting, hash.String())
if err != nil {
log.Error("UpdateGasOracleStatusAndOracleTxHash failed", "batch.Hash", batch.Hash, "err", err)
return
}
r.lastGasPrice = suggestGasPriceUint64
log.Info("Update l2 gas price", "txHash", hash.String(), "GasPrice", suggestGasPrice)
}
}
}
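The update condition above reduces to a threshold test on the suggested price. A minimal sketch, assuming the same uint64 fields (hypothetical helper, not part of this PR):

package l2

// shouldUpdateGasPrice reports whether a new base fee should be relayed:
// either no price has been sent yet, or the suggested price is at least the
// configured minimum and moved by more than last*diff/precision in either
// direction (mirroring the condition above, including its unsigned arithmetic).
func shouldUpdateGasPrice(suggested, last, minPrice, diff, precision uint64) bool {
	if last == 0 {
		return true
	}
	delta := last * diff / precision
	return suggested >= minPrice && (suggested >= last+delta || suggested <= last-delta)
}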

View File

@@ -19,10 +19,11 @@ import (
geth_metrics "github.com/scroll-tech/go-ethereum/metrics"
"github.com/scroll-tech/go-ethereum/rpc"
"scroll-tech/database"
"scroll-tech/common/metrics"
"scroll-tech/common/types"
cutil "scroll-tech/common/utils"
"scroll-tech/database"
bridge_abi "scroll-tech/bridge/abi"
"scroll-tech/bridge/utils"

View File

@@ -76,6 +76,7 @@ type Sender struct {
blockNumber uint64 // Current block number on chain.
baseFeePerGas uint64 // Current base fee per gas on chain
pendingNum int64 // current pending tx count.
pendingTxs sync.Map // Mapping from nonce to pending transaction
confirmCh chan *Confirmation
@@ -134,6 +135,16 @@ func NewSender(ctx context.Context, config *config.SenderConfig, privs []*ecdsa.
return sender, nil
}
// PendingCount returns the current number of pending txs.
func (s *Sender) PendingCount() int64 {
return atomic.LoadInt64(&s.pendingNum)
}
// PendingLimit returns the maximum number of pending txs the sender can hold.
func (s *Sender) PendingLimit() int64 {
return s.config.PendingLimit
}
// Stop stops the sender module.
func (s *Sender) Stop() {
close(s.stopCh)
@@ -157,16 +168,27 @@ func (s *Sender) getFeeData(auth *bind.TransactOpts, target *common.Address, val
return s.estimateLegacyGas(auth, target, value, data, minGasLimit)
}
// IsFull returns true if the pendingTxs pool is full.
func (s *Sender) IsFull() bool {
return atomic.LoadInt64(&s.pendingNum) >= s.config.PendingLimit
}
// SendTransaction sends a signed L2-to-L1 transaction.
func (s *Sender) SendTransaction(ID string, target *common.Address, value *big.Int, data []byte, minGasLimit uint64) (hash common.Hash, err error) {
if s.IsFull() {
return common.Hash{}, fmt.Errorf("pending txs is full, pending size: %d", s.config.PendingLimit)
}
// Occupy the ID up front, in case another thread calls with the same ID at the same time
if _, loaded := s.pendingTxs.LoadOrStore(ID, nil); loaded {
return common.Hash{}, fmt.Errorf("has the repeat tx ID, ID: %s", ID)
}
atomic.AddInt64(&s.pendingNum, 1)
// get an available account to sign the transaction
auth := s.auths.getAccount()
if auth == nil {
s.pendingTxs.Delete(ID) // release the ID on failure
atomic.AddInt64(&s.pendingNum, -1)
return common.Hash{}, ErrNoAvailableAccount
}
@@ -174,6 +196,7 @@ func (s *Sender) SendTransaction(ID string, target *common.Address, value *big.I
defer func() {
if err != nil {
s.pendingTxs.Delete(ID) // release the ID on failure
atomic.AddInt64(&s.pendingNum, -1)
}
}()
@@ -201,6 +224,61 @@ func (s *Sender) SendTransaction(ID string, target *common.Address, value *big.I
return
}
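The new pendingNum counter stays consistent with the pendingTxs map by pairing every successful LoadOrStore with an atomic increment and every Delete with a decrement. A minimal sketch of that invariant, with hypothetical names (not part of this PR):

package sender

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// pendingSet pairs an ID map with a counter, as this diff does for
// pendingTxs/pendingNum. (Illustrative sketch only.)
type pendingSet struct {
	txs sync.Map
	num int64
}

// reserve occupies an ID before the tx is built, so a concurrent call with the
// same ID fails fast instead of double-sending.
func (p *pendingSet) reserve(id string) error {
	if _, loaded := p.txs.LoadOrStore(id, nil); loaded {
		return fmt.Errorf("duplicate tx ID: %s", id)
	}
	atomic.AddInt64(&p.num, 1)
	return nil
}

// release drops an ID when a send fails or a pending tx is confirmed.
func (p *pendingSet) release(id string) {
	p.txs.Delete(id)
	atomic.AddInt64(&p.num, -1)
}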
func (s *Sender) getTxAndAddr(txHash common.Hash) (*types.Transaction, uint64, common.Address, error) {
tx, isPending, err := s.client.TransactionByHash(s.ctx, txHash)
if err != nil {
return nil, 0, common.Address{}, err
}
sender, err := types.Sender(types.LatestSignerForChainID(s.chainID), tx)
if err != nil {
return nil, 0, common.Address{}, err
}
if isPending {
return tx, s.blockNumber, sender, nil
}
receipt, err := s.client.TransactionReceipt(s.ctx, txHash)
if err != nil {
return nil, 0, common.Address{}, err
}
return tx, receipt.BlockNumber.Uint64(), sender, nil
}
// LoadOrSendTx loads a transaction if it already exists on chain and resends it otherwise.
func (s *Sender) LoadOrSendTx(destTxHash common.Hash, ID string, target *common.Address, value *big.Int, data []byte, minGasLimit uint64) error {
tx, blockNumber, from, err := s.getTxAndAddr(destTxHash)
// If this tx already exists on chain, load it into the pending map.
if err == nil && tx != nil {
auth := s.auths.accounts[from]
var feeData *FeeData
feeData, err = s.getFeeData(auth, target, value, data, minGasLimit)
if err != nil {
return err
}
// Occupy the ID up front, in case another thread calls with the same ID at the same time
if _, loaded := s.pendingTxs.LoadOrStore(ID, nil); loaded {
return fmt.Errorf("has the repeat tx ID, ID: %s", ID)
}
atomic.AddInt64(&s.pendingNum, 1)
s.pendingTxs.Store(ID, &PendingTransaction{
tx: tx,
id: ID,
signer: auth,
// Record the block number at which the transaction was observed.
submitAt: blockNumber,
feeData: feeData,
})
return nil
}
// The tx has been dropped by the chain node, resend it.
_, err = s.SendTransaction(ID, target, value, data, minGasLimit)
return err
}
func (s *Sender) createAndSendTx(auth *bind.TransactOpts, feeData *FeeData, target *common.Address, value *big.Int, data []byte, overrideNonce *uint64) (tx *types.Transaction, err error) {
var (
nonce = auth.Nonce.Uint64()
@@ -346,6 +424,7 @@ func (s *Sender) checkPendingTransaction(header *types.Header, confirmed uint64)
if (err == nil) && (receipt != nil) {
if receipt.BlockNumber.Uint64() <= confirmed {
s.pendingTxs.Delete(key)
atomic.AddInt64(&s.pendingNum, -1)
// send confirm message
s.confirmCh <- &Confirmation{
ID: pending.id,
@@ -377,6 +456,7 @@ func (s *Sender) checkPendingTransaction(header *types.Header, confirmed uint64)
if strings.Contains(err.Error(), "nonce") {
// This key can be deleted
s.pendingTxs.Delete(key)
atomic.AddInt64(&s.pendingNum, -1)
// Try get receipt by the latest replaced tx hash
receipt, err := s.client.TransactionReceipt(s.ctx, pending.tx.Hash())
if (err == nil) && (receipt != nil) {

View File

@@ -82,7 +82,7 @@ func (i *ImgDB) Endpoint() string {
}
func (i *ImgDB) prepare() []string {
cmd := []string{"docker", "run", "--name", i.name, "-p", fmt.Sprintf("%d:5432", i.port)}
cmd := []string{"docker", "container", "run", "--name", i.name, "-p", fmt.Sprintf("%d:5432", i.port)}
envs := []string{
"-e", "POSTGRES_PASSWORD=" + i.password,
"-e", fmt.Sprintf("POSTGRES_DB=%s", i.dbName),

View File

@@ -125,7 +125,7 @@ func (i *ImgGeth) Stop() error {
}
func (i *ImgGeth) prepare() []string {
cmds := []string{"docker", "run", "--name", i.name}
cmds := []string{"docker", "container", "run", "--name", i.name}
var ports []string
if i.httpPort != 0 {
ports = append(ports, []string{"-p", strconv.Itoa(i.httpPort) + ":8545"}...)

View File

@@ -98,6 +98,7 @@ type L1Message struct {
Calldata string `json:"calldata" db:"calldata"`
GasLimit uint64 `json:"gas_limit" db:"gas_limit"`
Layer1Hash string `json:"layer1_hash" db:"layer1_hash"`
Layer2Hash string `json:"layer2_hash" db:"layer2_hash"`
Status MsgStatus `json:"status" db:"status"`
}
@@ -110,6 +111,7 @@ type L2Message struct {
Value string `json:"value" db:"value"`
Target string `json:"target" db:"target"`
Calldata string `json:"calldata" db:"calldata"`
Layer1Hash string `json:"layer1_hash" db:"layer1_hash"`
Layer2Hash string `json:"layer2_hash" db:"layer2_hash"`
Status MsgStatus `json:"status" db:"status"`
}

View File

@@ -7,7 +7,7 @@ import (
// TryTimes runs the function repeatedly until it returns true or the retry limit is reached; times == -1 retries without limit.
func TryTimes(times int, run func() bool) {
for i := 0; i < times; i++ {
for i := 0; times == -1 || i < times; i++ {
if run() {
return
}
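With this change, times == -1 retries without limit, which is how the relayer blocks while the sender's pending pool is full. A minimal usage sketch (hypothetical wrapper, not part of this PR):

package utils

// waitUntil retries the condition forever until it reports true, relying on
// the new TryTimes(-1, ...) semantics.
func waitUntil(cond func() bool) {
	TryTimes(-1, cond)
}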

View File

@@ -18,10 +18,11 @@ import (
geth_metrics "github.com/scroll-tech/go-ethereum/metrics"
"github.com/scroll-tech/go-ethereum/rpc"
"scroll-tech/database"
"scroll-tech/common/message"
"scroll-tech/common/metrics"
"scroll-tech/common/types"
"scroll-tech/database"
"scroll-tech/common/utils/workerpool"

View File

@@ -80,6 +80,7 @@ type BlockBatchOrm interface {
// L1MessageOrm is layer1 message db interface
type L1MessageOrm interface {
GetL1Messages(fields map[string]interface{}, args ...string) ([]*types.L1Message, error)
GetL1MessageByQueueIndex(queueIndex uint64) (*types.L1Message, error)
GetL1MessageByMsgHash(msgHash string) (*types.L1Message, error)
GetL1MessagesByStatus(status types.MsgStatus, limit uint64) ([]*types.L1Message, error)

View File

@@ -4,6 +4,8 @@ import (
"context"
"database/sql"
"errors"
"fmt"
"strings"
"github.com/jmoiron/sqlx"
"github.com/scroll-tech/go-ethereum/log"
@@ -22,6 +24,37 @@ func NewL1MessageOrm(db *sqlx.DB) L1MessageOrm {
return &l1MessageOrm{db: db}
}
// GetL1Messages gets L1 messages filtered by the k-v map and extra SQL args.
func (m *l1MessageOrm) GetL1Messages(fields map[string]interface{}, args ...string) ([]*types.L1Message, error) {
query := "SELECT queue_index, msg_hash, height, sender, target, value, calldata, layer1_hash, layer2_hash, status FROM l1_message WHERE 1 = 1 "
for key := range fields {
query += fmt.Sprintf(" AND %s=:%s ", key, key)
}
query = strings.Join(append([]string{query}, args...), " ")
db := m.db
rows, err := db.NamedQuery(db.Rebind(query), fields)
if err != nil {
return nil, err
}
var msgs []*types.L1Message
for rows.Next() {
msg := &types.L1Message{}
if err = rows.StructScan(&msg); err != nil {
break
}
msgs = append(msgs, msg)
}
if len(msgs) == 0 || errors.Is(err, sql.ErrNoRows) {
// log.Warn("no unprocessed layer2 messages in db", "err", err)
} else if err != nil {
return nil, err
}
return msgs, rows.Close()
}
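The new GetL1Messages API takes a k-v filter plus free-form SQL fragments for paging and ordering, mirroring the existing L2 query. A minimal call sketch (hypothetical helper and package placement, not part of this PR):

package orm

import (
	"fmt"

	"scroll-tech/common/types"
)

// submittedL1Messages pages submitted L1 messages by queue_index using the
// new filter-plus-args form. (Illustrative sketch only.)
func submittedL1Messages(o L1MessageOrm, lastIndex uint64, limit int) ([]*types.L1Message, error) {
	return o.GetL1Messages(
		map[string]interface{}{"status": types.MsgSubmitted},
		fmt.Sprintf("AND queue_index > %d", lastIndex),
		fmt.Sprintf("ORDER BY queue_index ASC LIMIT %d", limit),
	)
}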
// GetL1MessageByMsgHash fetches the message by msg_hash
func (m *l1MessageOrm) GetL1MessageByMsgHash(msgHash string) (*types.L1Message, error) {
msg := types.L1Message{}
@@ -48,7 +81,7 @@ func (m *l1MessageOrm) GetL1MessageByQueueIndex(queueIndex uint64) (*types.L1Mes
// GetL1MessagesByStatus fetches a list of messages with the given msg status
func (m *l1MessageOrm) GetL1MessagesByStatus(status types.MsgStatus, limit uint64) ([]*types.L1Message, error) {
rows, err := m.db.Queryx(`SELECT queue_index, msg_hash, height, sender, target, value, calldata, layer1_hash, status FROM l1_message WHERE status = $1 ORDER BY queue_index ASC LIMIT $2;`, status, limit)
rows, err := m.db.Queryx(`SELECT queue_index, msg_hash, height, sender, target, value, calldata, layer1_hash, layer2_hash, status FROM l1_message WHERE status = $1 ORDER BY queue_index ASC LIMIT $2;`, status, limit)
if err != nil {
return nil, err
}
@@ -108,9 +141,10 @@ func (m *l1MessageOrm) SaveL1Messages(ctx context.Context, messages []*types.L1M
"gas_limit": msg.GasLimit,
"calldata": msg.Calldata,
"layer1_hash": msg.Layer1Hash,
"layer2_hash": msg.Layer2Hash,
}
}
_, err := m.db.NamedExec(`INSERT INTO public.l1_message (queue_index, msg_hash, height, sender, target, value, gas_limit, calldata, layer1_hash) VALUES (:queue_index, :msg_hash, :height, :sender, :target, :value, :gas_limit, :calldata, :layer1_hash);`, messageMaps)
_, err := m.db.NamedExec(`INSERT INTO public.l1_message (queue_index, msg_hash, height, sender, target, value, gas_limit, calldata, layer1_hash, layer2_hash) VALUES (:queue_index, :msg_hash, :height, :sender, :target, :value, :gas_limit, :calldata, :layer1_hash, :layer2_hash);`, messageMaps)
if err != nil {
queueIndices := make([]uint64, 0, len(messages))
heights := make([]uint64, 0, len(messages))

View File

@@ -92,7 +92,7 @@ func (m *layer2MessageOrm) GetL2ProcessedNonce() (int64, error) {
// GetL2Messages fetches messages filtered by the k-v map and extra SQL args
func (m *layer2MessageOrm) GetL2Messages(fields map[string]interface{}, args ...string) ([]*types.L2Message, error) {
query := "SELECT nonce, msg_hash, height, sender, target, value, calldata, layer2_hash FROM l2_message WHERE 1 = 1 "
query := "SELECT nonce, msg_hash, height, sender, target, value, calldata, layer1_hash, layer2_hash FROM l2_message WHERE 1 = 1 "
for key := range fields {
query += fmt.Sprintf("AND %s=:%s ", key, key)
}
@@ -137,11 +137,12 @@ func (m *layer2MessageOrm) SaveL2Messages(ctx context.Context, messages []*types
"target": msg.Target,
"value": msg.Value,
"calldata": msg.Calldata,
"layer1_hash": msg.Layer1Hash,
"layer2_hash": msg.Layer2Hash,
}
}
_, err := m.db.NamedExec(`INSERT INTO public.l2_message (nonce, msg_hash, height, sender, target, value, calldata, layer2_hash) VALUES (:nonce, :msg_hash, :height, :sender, :target, :value, :calldata, :layer2_hash);`, messageMaps)
_, err := m.db.NamedExec(`INSERT INTO public.l2_message (nonce, msg_hash, height, sender, target, value, calldata, layer1_hash, layer2_hash) VALUES (:nonce, :msg_hash, :height, :sender, :target, :value, :calldata, :layer1_hash, :layer2_hash);`, messageMaps)
if err != nil {
nonces := make([]uint64, 0, len(messages))
heights := make([]uint64, 0, len(messages))

View File

@@ -603,6 +603,4 @@ gopkg.in/yaml.v3 v3.0.0-20191120175047-4206685974f2/go.mod h1:K4uyk7z7BCEPqu6E+C
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.1.3 h1:qTakTkI6ni6LFD5sBwwsdSO+AQqbSIxOauHTTQKZ/7o=
honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las=
rsc.io/binaryregexp v0.2.0 h1:HfqmD5MEmC0zvwBuF187nq9mdnXjXsSivRiXN7SmRkE=
rsc.io/pdf v0.1.1 h1:k1MczvYDUvJBe93bYd7wrZLLUEcLZAuF824/I4e5Xr4=
rsc.io/tmplfunc v0.0.3/go.mod h1:AG3sTPzElb1Io3Yg4voV9AGZJuleGAwaVRxL9M49PhA=

View File

@@ -3,7 +3,7 @@ module scroll-tech/integration-test
go 1.18
require (
github.com/scroll-tech/go-ethereum v1.10.14-0.20230306131930-03b4de32b78b
github.com/scroll-tech/go-ethereum v1.10.14-0.20230321020420-127af384ed04
github.com/stretchr/testify v1.8.2
)

View File

@@ -34,7 +34,7 @@ github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1Cpa
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/scroll-tech/go-ethereum v1.10.14-0.20230306131930-03b4de32b78b h1:shNTzAnD2oDcDCrM4aaVCTzQNVfYxF1An08R2H2DLAg=
github.com/scroll-tech/go-ethereum v1.10.14-0.20230321020420-127af384ed04 h1:PpI31kaBVm6+7sZtyK03Ex0QIg3P821Ktae0FHFh7IM=
github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=