mirror of
https://github.com/scroll-tech/scroll.git
synced 2026-01-11 15:08:09 -05:00
Compare commits
71 Commits
fix_insert
...
maskpp/res
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5cf492d3c9 | ||
|
|
b6f51af61b | ||
|
|
3f93ec16f6 | ||
|
|
e1aef7ca3b | ||
|
|
a15560f9fa | ||
|
|
e26854285e | ||
|
|
6cc3e59b7c | ||
|
|
9fab743a87 | ||
|
|
9047512b90 | ||
|
|
cd70cd7008 | ||
|
|
ccf73c54dc | ||
|
|
4f9eaa8381 | ||
|
|
92e70c0c61 | ||
|
|
e07c3e52f8 | ||
|
|
4328cc59a8 | ||
|
|
db1fdc5c97 | ||
|
|
d1694bf110 | ||
|
|
2a5f3ec5e9 | ||
|
|
046c41c536 | ||
|
|
e988e6da4c | ||
|
|
a3654580a7 | ||
|
|
af7c3b0827 | ||
|
|
397e3edb3b | ||
|
|
44b66e94fa | ||
|
|
42fc5c5f6d | ||
|
|
fd6d9cbd3b | ||
|
|
5519b6f578 | ||
|
|
1e313493b8 | ||
|
|
f7a8d0d14c | ||
|
|
0cb3c295ef | ||
|
|
14c2f5cea8 | ||
|
|
7687a03cab | ||
|
|
cd9968cdd9 | ||
|
|
8b49a6b785 | ||
|
|
8c20474d33 | ||
|
|
8b94e6aad9 | ||
|
|
d8c1cd0038 | ||
|
|
90472ddb34 | ||
|
|
b9c9327803 | ||
|
|
2934ab8930 | ||
|
|
042bb1f357 | ||
|
|
62eb8e484f | ||
|
|
2638c59896 | ||
|
|
6eae95925a | ||
|
|
1ae7f35040 | ||
|
|
5b2ed914ac | ||
|
|
21bd99248e | ||
|
|
e5e279a900 | ||
|
|
bc6bab305c | ||
|
|
8727613d8c | ||
|
|
b635a419ca | ||
|
|
865fb79240 | ||
|
|
c8dc81516e | ||
|
|
84cfea7d87 | ||
|
|
2538bcdc7b | ||
|
|
60c01c371e | ||
|
|
678e0adc0d | ||
|
|
cf157bc670 | ||
|
|
6f0042735a | ||
|
|
610f952f40 | ||
|
|
50fb641b9c | ||
|
|
57c4e703db | ||
|
|
08d707dfd2 | ||
|
|
856af6735d | ||
|
|
ad487af82f | ||
|
|
ba71153e5a | ||
|
|
c43999e150 | ||
|
|
a88459dafb | ||
|
|
0c5515473b | ||
|
|
9f58c46206 | ||
|
|
30211250d3 |
@@ -19,7 +19,8 @@
|
||||
"escalate_multiple_den": 10,
|
||||
"max_gas_price": 10000000000,
|
||||
"tx_type": "LegacyTx",
|
||||
"min_balance": 100000000000000000000
|
||||
"min_balance": 100000000000000000000,
|
||||
"pending_limit": 500
|
||||
},
|
||||
"gas_oracle_config": {
|
||||
"min_gas_price": 0,
|
||||
@@ -53,7 +54,8 @@
|
||||
"escalate_multiple_den": 10,
|
||||
"max_gas_price": 10000000000,
|
||||
"tx_type": "LegacyTx",
|
||||
"min_balance": 100000000000000000000
|
||||
"min_balance": 100000000000000000000,
|
||||
"pending_limit": 500
|
||||
},
|
||||
"gas_oracle_config": {
|
||||
"min_gas_price": 0,
|
||||
|
||||
@@ -30,7 +30,8 @@ type SenderConfig struct {
|
||||
// The transaction type to use: LegacyTx, AccessListTx, DynamicFeeTx
|
||||
TxType string `json:"tx_type"`
|
||||
// The min balance set for check and set balance for sender's accounts.
|
||||
MinBalance *big.Int `json:"min_balance,omitempty"`
|
||||
MinBalance *big.Int `json:"min_balance,omitempty"`
|
||||
PendingLimit int64 `json:"pending_limit,omitempty"`
|
||||
// The interval (in seconds) to check balance and top up sender's accounts
|
||||
CheckBalanceTime uint64 `json:"check_balance_time"`
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ module scroll-tech/bridge
|
||||
go 1.18
|
||||
|
||||
require (
|
||||
github.com/modern-go/reflect2 v1.0.2
|
||||
github.com/orcaman/concurrent-map v1.0.0
|
||||
github.com/scroll-tech/go-ethereum v1.10.14-0.20230321020420-127af384ed04
|
||||
github.com/stretchr/testify v1.8.2
|
||||
|
||||
@@ -60,6 +60,8 @@ github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/Qd
|
||||
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
|
||||
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
|
||||
github.com/mitchellh/pointerstructure v1.2.0 h1:O+i9nHnXS3l/9Wu7r4NrEdwA2VFTicjUEN1uBnDo34A=
|
||||
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
|
||||
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
|
||||
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
|
||||
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
|
||||
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
|
||||
|
||||
@@ -2,14 +2,11 @@ package l1
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"math/big"
|
||||
"time"
|
||||
|
||||
// not sure if this will make problems when relay with l1geth
|
||||
|
||||
"github.com/scroll-tech/go-ethereum/accounts/abi"
|
||||
"github.com/scroll-tech/go-ethereum/common"
|
||||
"github.com/scroll-tech/go-ethereum/crypto"
|
||||
"github.com/scroll-tech/go-ethereum/log"
|
||||
geth_metrics "github.com/scroll-tech/go-ethereum/metrics"
|
||||
@@ -101,7 +98,7 @@ func NewLayer1Relayer(ctx context.Context, db database.OrmFactory, cfg *config.R
|
||||
minGasLimitForMessageRelay = cfg.MessageRelayMinGasLimit
|
||||
}
|
||||
|
||||
return &Layer1Relayer{
|
||||
relayer := &Layer1Relayer{
|
||||
ctx: ctx,
|
||||
db: db,
|
||||
|
||||
@@ -120,104 +117,10 @@ func NewLayer1Relayer(ctx context.Context, db database.OrmFactory, cfg *config.R
|
||||
|
||||
cfg: cfg,
|
||||
stopCh: make(chan struct{}),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// ProcessSavedEvents relays saved un-processed cross-domain transactions to desired blockchain
|
||||
func (r *Layer1Relayer) ProcessSavedEvents() {
|
||||
// msgs are sorted by nonce in increasing order
|
||||
msgs, err := r.db.GetL1MessagesByStatus(types.MsgPending, 100)
|
||||
if err != nil {
|
||||
log.Error("Failed to fetch unprocessed L1 messages", "err", err)
|
||||
return
|
||||
}
|
||||
go relayer.confirmLoop(ctx)
|
||||
|
||||
if len(msgs) > 0 {
|
||||
log.Info("Processing L1 messages", "count", len(msgs))
|
||||
}
|
||||
|
||||
for _, msg := range msgs {
|
||||
if err = r.processSavedEvent(msg); err != nil {
|
||||
if !errors.Is(err, sender.ErrNoAvailableAccount) {
|
||||
log.Error("failed to process event", "msg.msgHash", msg.MsgHash, "err", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Layer1Relayer) processSavedEvent(msg *types.L1Message) error {
|
||||
calldata := common.Hex2Bytes(msg.Calldata)
|
||||
|
||||
hash, err := r.messageSender.SendTransaction(msg.MsgHash, &r.cfg.MessengerContractAddress, big.NewInt(0), calldata, r.minGasLimitForMessageRelay)
|
||||
if err != nil && err.Error() == "execution reverted: Message expired" {
|
||||
return r.db.UpdateLayer1Status(r.ctx, msg.MsgHash, types.MsgExpired)
|
||||
}
|
||||
if err != nil && err.Error() == "execution reverted: Message successfully executed" {
|
||||
return r.db.UpdateLayer1Status(r.ctx, msg.MsgHash, types.MsgConfirmed)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
bridgeL1MsgsRelayedTotalCounter.Inc(1)
|
||||
log.Info("relayMessage to layer2", "msg hash", msg.MsgHash, "tx hash", hash)
|
||||
|
||||
err = r.db.UpdateLayer1StatusAndLayer2Hash(r.ctx, msg.MsgHash, types.MsgSubmitted, hash.String())
|
||||
if err != nil {
|
||||
log.Error("UpdateLayer1StatusAndLayer2Hash failed", "msg.msgHash", msg.MsgHash, "msg.height", msg.Height, "err", err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// ProcessGasPriceOracle imports gas price to layer2
|
||||
func (r *Layer1Relayer) ProcessGasPriceOracle() {
|
||||
latestBlockHeight, err := r.db.GetLatestL1BlockHeight()
|
||||
if err != nil {
|
||||
log.Warn("Failed to fetch latest L1 block height from db", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
blocks, err := r.db.GetL1BlockInfos(map[string]interface{}{
|
||||
"number": latestBlockHeight,
|
||||
})
|
||||
if err != nil {
|
||||
log.Error("Failed to GetL1BlockInfos from db", "height", latestBlockHeight, "err", err)
|
||||
return
|
||||
}
|
||||
if len(blocks) != 1 {
|
||||
log.Error("Block not exist", "height", latestBlockHeight)
|
||||
return
|
||||
}
|
||||
block := blocks[0]
|
||||
|
||||
if block.GasOracleStatus == types.GasOraclePending {
|
||||
expectedDelta := r.lastGasPrice * r.gasPriceDiff / gasPriceDiffPrecision
|
||||
// last is undefine or (block.BaseFee >= minGasPrice && exceed diff)
|
||||
if r.lastGasPrice == 0 || (block.BaseFee >= r.minGasPrice && (block.BaseFee >= r.lastGasPrice+expectedDelta || block.BaseFee <= r.lastGasPrice-expectedDelta)) {
|
||||
baseFee := big.NewInt(int64(block.BaseFee))
|
||||
data, err := r.l1GasOracleABI.Pack("setL1BaseFee", baseFee)
|
||||
if err != nil {
|
||||
log.Error("Failed to pack setL1BaseFee", "block.Hash", block.Hash, "block.Height", block.Number, "block.BaseFee", block.BaseFee, "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
hash, err := r.gasOracleSender.SendTransaction(block.Hash, &r.cfg.GasPriceOracleContractAddress, big.NewInt(0), data, 0)
|
||||
if err != nil {
|
||||
if !errors.Is(err, sender.ErrNoAvailableAccount) {
|
||||
log.Error("Failed to send setL1BaseFee tx to layer2 ", "block.Hash", block.Hash, "block.Height", block.Number, "err", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
err = r.db.UpdateL1GasOracleStatusAndOracleTxHash(r.ctx, block.Hash, types.GasOracleImporting, hash.String())
|
||||
if err != nil {
|
||||
log.Error("UpdateGasOracleStatusAndOracleTxHash failed", "block.Hash", block.Hash, "block.Height", block.Number, "err", err)
|
||||
return
|
||||
}
|
||||
r.lastGasPrice = block.BaseFee
|
||||
log.Info("Update l1 base fee", "txHash", hash.String(), "baseFee", baseFee)
|
||||
}
|
||||
}
|
||||
return relayer, nil
|
||||
}
|
||||
|
||||
// Start the relayer process
|
||||
@@ -225,55 +128,65 @@ func (r *Layer1Relayer) Start() {
|
||||
go func() {
|
||||
ctx, cancel := context.WithCancel(r.ctx)
|
||||
|
||||
go utils.Loop(ctx, 2*time.Second, r.ProcessSavedEvents)
|
||||
go utils.Loop(ctx, 2*time.Second, r.ProcessGasPriceOracle)
|
||||
|
||||
go func(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case cfm := <-r.messageCh:
|
||||
bridgeL1MsgsRelayedConfirmedTotalCounter.Inc(1)
|
||||
if !cfm.IsSuccessful {
|
||||
err := r.db.UpdateLayer1StatusAndLayer2Hash(r.ctx, cfm.ID, types.MsgRelayFailed, cfm.TxHash.String())
|
||||
if err != nil {
|
||||
log.Warn("UpdateLayer1StatusAndLayer2Hash failed", "err", err)
|
||||
}
|
||||
log.Warn("transaction confirmed but failed in layer2", "confirmation", cfm)
|
||||
} else {
|
||||
// @todo handle db error
|
||||
err := r.db.UpdateLayer1StatusAndLayer2Hash(r.ctx, cfm.ID, types.MsgConfirmed, cfm.TxHash.String())
|
||||
if err != nil {
|
||||
log.Warn("UpdateLayer1StatusAndLayer2Hash failed", "err", err)
|
||||
}
|
||||
log.Info("transaction confirmed in layer2", "confirmation", cfm)
|
||||
}
|
||||
case cfm := <-r.gasOracleCh:
|
||||
if !cfm.IsSuccessful {
|
||||
// @discuss: maybe make it pending again?
|
||||
err := r.db.UpdateL1GasOracleStatusAndOracleTxHash(r.ctx, cfm.ID, types.GasOracleFailed, cfm.TxHash.String())
|
||||
if err != nil {
|
||||
log.Warn("UpdateL1GasOracleStatusAndOracleTxHash failed", "err", err)
|
||||
}
|
||||
log.Warn("transaction confirmed but failed in layer2", "confirmation", cfm)
|
||||
} else {
|
||||
// @todo handle db error
|
||||
err := r.db.UpdateL1GasOracleStatusAndOracleTxHash(r.ctx, cfm.ID, types.GasOracleImported, cfm.TxHash.String())
|
||||
if err != nil {
|
||||
log.Warn("UpdateGasOracleStatusAndOracleTxHash failed", "err", err)
|
||||
}
|
||||
log.Info("transaction confirmed in layer2", "confirmation", cfm)
|
||||
}
|
||||
}
|
||||
go func() {
|
||||
if err := r.checkSubmittedMessages(); err != nil {
|
||||
log.Error("failed to init layer1 submitted tx", "err", err)
|
||||
}
|
||||
}(ctx)
|
||||
// Wait until sender pool is clean.
|
||||
utils.TryTimes(-1, func() bool {
|
||||
return r.messageSender.PendingCount() == 0
|
||||
})
|
||||
go utils.Loop(ctx, 2*time.Second, r.ProcessSavedEvents)
|
||||
}()
|
||||
|
||||
go utils.Loop(ctx, 2*time.Second, r.ProcessGasPriceOracle)
|
||||
|
||||
<-r.stopCh
|
||||
cancel()
|
||||
}()
|
||||
}
|
||||
|
||||
func (r *Layer1Relayer) confirmLoop(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case cfm := <-r.messageCh:
|
||||
bridgeL1MsgsRelayedConfirmedTotalCounter.Inc(1)
|
||||
if !cfm.IsSuccessful {
|
||||
err := r.db.UpdateLayer1StatusAndLayer2Hash(r.ctx, cfm.ID, types.MsgRelayFailed, cfm.TxHash.String())
|
||||
if err != nil {
|
||||
log.Warn("UpdateLayer1StatusAndLayer2Hash failed", "err", err)
|
||||
}
|
||||
log.Warn("transaction confirmed but failed in layer2", "confirmation", cfm)
|
||||
} else {
|
||||
// @todo handle db error
|
||||
err := r.db.UpdateLayer1StatusAndLayer2Hash(r.ctx, cfm.ID, types.MsgConfirmed, cfm.TxHash.String())
|
||||
if err != nil {
|
||||
log.Warn("UpdateLayer1StatusAndLayer2Hash failed", "err", err)
|
||||
}
|
||||
log.Info("transaction confirmed in layer2", "confirmation", cfm)
|
||||
}
|
||||
case cfm := <-r.gasOracleCh:
|
||||
if !cfm.IsSuccessful {
|
||||
// @discuss: maybe make it pending again?
|
||||
err := r.db.UpdateL1GasOracleStatusAndOracleTxHash(r.ctx, cfm.ID, types.GasOracleFailed, cfm.TxHash.String())
|
||||
if err != nil {
|
||||
log.Warn("UpdateL1GasOracleStatusAndOracleTxHash failed", "err", err)
|
||||
}
|
||||
log.Warn("transaction confirmed but failed in layer2", "confirmation", cfm)
|
||||
} else {
|
||||
// @todo handle db error
|
||||
err := r.db.UpdateL1GasOracleStatusAndOracleTxHash(r.ctx, cfm.ID, types.GasOracleImported, cfm.TxHash.String())
|
||||
if err != nil {
|
||||
log.Warn("UpdateGasOracleStatusAndOracleTxHash failed", "err", err)
|
||||
}
|
||||
log.Info("transaction confirmed in layer2", "confirmation", cfm)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Stop the relayer module, for a graceful shutdown.
|
||||
func (r *Layer1Relayer) Stop() {
|
||||
close(r.stopCh)
|
||||
|
||||
63
bridge/l1/relayer_gasoracle.go
Normal file
63
bridge/l1/relayer_gasoracle.go
Normal file
@@ -0,0 +1,63 @@
|
||||
package l1
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"math/big"
|
||||
|
||||
"github.com/scroll-tech/go-ethereum/log"
|
||||
|
||||
"scroll-tech/common/types"
|
||||
|
||||
"scroll-tech/bridge/sender"
|
||||
)
|
||||
|
||||
// ProcessGasPriceOracle imports gas price to layer2
|
||||
func (r *Layer1Relayer) ProcessGasPriceOracle() {
|
||||
latestBlockHeight, err := r.db.GetLatestL1BlockHeight()
|
||||
if err != nil {
|
||||
log.Warn("Failed to fetch latest L1 block height from db", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
blocks, err := r.db.GetL1BlockInfos(map[string]interface{}{
|
||||
"number": latestBlockHeight,
|
||||
})
|
||||
if err != nil {
|
||||
log.Error("Failed to GetL1BlockInfos from db", "height", latestBlockHeight, "err", err)
|
||||
return
|
||||
}
|
||||
if len(blocks) != 1 {
|
||||
log.Error("Block not exist", "height", latestBlockHeight)
|
||||
return
|
||||
}
|
||||
block := blocks[0]
|
||||
|
||||
if block.GasOracleStatus == types.GasOraclePending {
|
||||
expectedDelta := r.lastGasPrice * r.gasPriceDiff / gasPriceDiffPrecision
|
||||
// last is undefine or (block.BaseFee >= minGasPrice && exceed diff)
|
||||
if r.lastGasPrice == 0 || (block.BaseFee >= r.minGasPrice && (block.BaseFee >= r.lastGasPrice+expectedDelta || block.BaseFee <= r.lastGasPrice-expectedDelta)) {
|
||||
baseFee := big.NewInt(int64(block.BaseFee))
|
||||
data, err := r.l1GasOracleABI.Pack("setL1BaseFee", baseFee)
|
||||
if err != nil {
|
||||
log.Error("Failed to pack setL1BaseFee", "block.Hash", block.Hash, "block.Height", block.Number, "block.BaseFee", block.BaseFee, "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
hash, err := r.gasOracleSender.SendTransaction(block.Hash, &r.cfg.GasPriceOracleContractAddress, big.NewInt(0), data, 0)
|
||||
if err != nil {
|
||||
if !errors.Is(err, sender.ErrNoAvailableAccount) {
|
||||
log.Error("Failed to send setL1BaseFee tx to layer2 ", "block.Hash", block.Hash, "block.Height", block.Number, "err", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
err = r.db.UpdateL1GasOracleStatusAndOracleTxHash(r.ctx, block.Hash, types.GasOracleImporting, hash.String())
|
||||
if err != nil {
|
||||
log.Error("UpdateGasOracleStatusAndOracleTxHash failed", "block.Hash", block.Hash, "block.Height", block.Number, "err", err)
|
||||
return
|
||||
}
|
||||
r.lastGasPrice = block.BaseFee
|
||||
log.Info("Update l1 base fee", "txHash", hash.String(), "baseFee", baseFee)
|
||||
}
|
||||
}
|
||||
}
|
||||
113
bridge/l1/relayer_message.go
Normal file
113
bridge/l1/relayer_message.go
Normal file
@@ -0,0 +1,113 @@
|
||||
package l1
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/big"
|
||||
|
||||
"github.com/scroll-tech/go-ethereum/common"
|
||||
"github.com/scroll-tech/go-ethereum/log"
|
||||
|
||||
"scroll-tech/common/types"
|
||||
"scroll-tech/common/utils"
|
||||
|
||||
"scroll-tech/bridge/sender"
|
||||
)
|
||||
|
||||
func (r *Layer1Relayer) checkSubmittedMessages() error {
|
||||
var (
|
||||
index uint64
|
||||
msgsSize = 100
|
||||
)
|
||||
for {
|
||||
msgs, err := r.db.GetL1Messages(
|
||||
map[string]interface{}{"status": types.MsgSubmitted},
|
||||
fmt.Sprintf("AND queue_index > %d", index),
|
||||
fmt.Sprintf("ORDER BY queue_index ASC LIMIT %d", msgsSize),
|
||||
)
|
||||
if err != nil {
|
||||
log.Error("failed to get l1 submitted messages", "queue_index", index, "err", err)
|
||||
return err
|
||||
}
|
||||
if len(msgs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
index = msgs[len(msgs)-1].QueueIndex
|
||||
for _, msg := range msgs {
|
||||
// If pending txs pool is full, wait until pending pool is available.
|
||||
utils.TryTimes(-1, func() bool {
|
||||
return !r.messageSender.IsFull()
|
||||
})
|
||||
|
||||
err = r.messageSender.LoadOrSendTx(
|
||||
common.HexToHash(msg.Layer2Hash),
|
||||
msg.MsgHash,
|
||||
&r.cfg.MessengerContractAddress,
|
||||
big.NewInt(0),
|
||||
common.Hex2Bytes(msg.Calldata),
|
||||
r.minGasLimitForMessageRelay,
|
||||
)
|
||||
switch true {
|
||||
case err == nil:
|
||||
case err.Error() == "execution reverted: Message expired":
|
||||
if err = r.db.UpdateLayer1Status(r.ctx, msg.MsgHash, types.MsgExpired); err != nil {
|
||||
return err
|
||||
}
|
||||
case err.Error() == "execution reverted: Message successfully executed":
|
||||
if err = r.db.UpdateLayer1Status(r.ctx, msg.MsgHash, types.MsgConfirmed); err != nil {
|
||||
return err
|
||||
}
|
||||
default:
|
||||
log.Error("failed to load or send l1 submitted tx", "msg hash", msg.MsgHash, "err", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ProcessSavedEvents relays saved un-processed cross-domain transactions to desired blockchain
|
||||
func (r *Layer1Relayer) ProcessSavedEvents() {
|
||||
// msgs are sorted by nonce in increasing order
|
||||
msgs, err := r.db.GetL1MessagesByStatus(types.MsgPending, 100)
|
||||
if err != nil {
|
||||
log.Error("Failed to fetch unprocessed L1 messages", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
if len(msgs) > 0 {
|
||||
log.Info("Processing L1 messages", "count", len(msgs))
|
||||
}
|
||||
|
||||
for _, msg := range msgs {
|
||||
if err = r.processSavedEvent(msg); err != nil {
|
||||
if !errors.Is(err, sender.ErrNoAvailableAccount) {
|
||||
log.Error("failed to process event", "msg.msgHash", msg.MsgHash, "err", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Layer1Relayer) processSavedEvent(msg *types.L1Message) error {
|
||||
calldata := common.Hex2Bytes(msg.Calldata)
|
||||
|
||||
hash, err := r.messageSender.SendTransaction(msg.MsgHash, &r.cfg.MessengerContractAddress, big.NewInt(0), calldata, r.minGasLimitForMessageRelay)
|
||||
if err != nil && err.Error() == "execution reverted: Message expired" {
|
||||
return r.db.UpdateLayer1Status(r.ctx, msg.MsgHash, types.MsgExpired)
|
||||
}
|
||||
if err != nil && err.Error() == "execution reverted: Message successfully executed" {
|
||||
return r.db.UpdateLayer1Status(r.ctx, msg.MsgHash, types.MsgConfirmed)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
bridgeL1MsgsRelayedTotalCounter.Inc(1)
|
||||
log.Info("relayMessage to layer2", "msg hash", msg.MsgHash, "tx hash", hash)
|
||||
|
||||
err = r.db.UpdateLayer1StatusAndLayer2Hash(r.ctx, msg.MsgHash, types.MsgSubmitted, hash.String())
|
||||
if err != nil {
|
||||
log.Error("UpdateLayer1StatusAndLayer2Hash failed", "msg.msgHash", msg.MsgHash, "msg.height", msg.Height, "err", err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
@@ -15,9 +15,10 @@ import (
|
||||
geth_metrics "github.com/scroll-tech/go-ethereum/metrics"
|
||||
"github.com/scroll-tech/go-ethereum/rpc"
|
||||
|
||||
"scroll-tech/database"
|
||||
|
||||
"scroll-tech/common/metrics"
|
||||
"scroll-tech/common/types"
|
||||
"scroll-tech/database"
|
||||
|
||||
cutil "scroll-tech/common/utils"
|
||||
|
||||
|
||||
@@ -32,18 +32,19 @@ func New(ctx context.Context, cfg *config.L2Config, orm database.OrmFactory) (*B
|
||||
// Otherwise, there will be a race condition between watcher.initializeGenesis and relayer.ProcessPendingBatches.
|
||||
watcher := NewL2WatcherClient(ctx, client, cfg.Confirmations, cfg.L2MessengerAddress, cfg.L2MessageQueueAddress, cfg.WithdrawTrieRootSlot, orm)
|
||||
|
||||
proposer := NewBatchProposer(ctx, cfg.BatchProposerConfig, orm)
|
||||
relayer, err := NewLayer2Relayer(ctx, client, orm, cfg.RelayerConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
batchProposer := NewBatchProposer(ctx, cfg.BatchProposerConfig, relayer, orm)
|
||||
proposer.SetLayer2Relayer(relayer)
|
||||
relayer.SetBatchProposer(proposer)
|
||||
|
||||
return &Backend{
|
||||
cfg: cfg,
|
||||
watcher: watcher,
|
||||
relayer: relayer,
|
||||
batchProposer: batchProposer,
|
||||
batchProposer: proposer,
|
||||
orm: orm,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"reflect"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -21,6 +20,10 @@ import (
|
||||
"scroll-tech/bridge/config"
|
||||
)
|
||||
|
||||
type relayerInterface interface {
|
||||
SendCommitTx(batchData []*types.BatchData) error
|
||||
}
|
||||
|
||||
var (
|
||||
bridgeL2BatchesGasOverThresholdTotalCounter = geth_metrics.NewRegisteredCounter("bridge/l2/batches/gas/over/threshold/total", metrics.ScrollRegistry)
|
||||
bridgeL2BatchesTxsOverThresholdTotalCounter = geth_metrics.NewRegisteredCounter("bridge/l2/batches/txs/over/threshold/total", metrics.ScrollRegistry)
|
||||
@@ -83,7 +86,8 @@ type BatchProposer struct {
|
||||
|
||||
proofGenerationFreq uint64
|
||||
batchDataBuffer []*types.BatchData
|
||||
relayer *Layer2Relayer
|
||||
|
||||
relayerInterface
|
||||
|
||||
piCfg *types.PublicInputHashConfig
|
||||
|
||||
@@ -91,8 +95,8 @@ type BatchProposer struct {
|
||||
}
|
||||
|
||||
// NewBatchProposer will return a new instance of BatchProposer.
|
||||
func NewBatchProposer(ctx context.Context, cfg *config.BatchProposerConfig, relayer *Layer2Relayer, orm database.OrmFactory) *BatchProposer {
|
||||
p := &BatchProposer{
|
||||
func NewBatchProposer(ctx context.Context, cfg *config.BatchProposerConfig, orm database.OrmFactory) *BatchProposer {
|
||||
return &BatchProposer{
|
||||
mutex: sync.Mutex{},
|
||||
ctx: ctx,
|
||||
orm: orm,
|
||||
@@ -106,26 +110,24 @@ func NewBatchProposer(ctx context.Context, cfg *config.BatchProposerConfig, rela
|
||||
batchDataBufferSizeLimit: 100*cfg.CommitTxCalldataSizeLimit + 1*1024*1024, // @todo: determine the value.
|
||||
proofGenerationFreq: cfg.ProofGenerationFreq,
|
||||
piCfg: cfg.PublicInputConfig,
|
||||
relayer: relayer,
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// SetLayer2Relayer set interface from layer2.
|
||||
func (p *BatchProposer) SetLayer2Relayer(relayer relayerInterface) {
|
||||
p.relayerInterface = relayer
|
||||
}
|
||||
|
||||
// Start the Listening process
|
||||
func (p *BatchProposer) Start() {
|
||||
// for graceful restart.
|
||||
p.recoverBatchDataBuffer()
|
||||
|
||||
// try to commit the leftover pending batches
|
||||
p.tryCommitBatches()
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
// Start the Listening process
|
||||
func (p *BatchProposer) Start() {
|
||||
go func() {
|
||||
if reflect.ValueOf(p.orm).IsNil() {
|
||||
panic("must run BatchProposer with DB")
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(p.ctx)
|
||||
|
||||
go utils.Loop(ctx, 2*time.Second, func() {
|
||||
@@ -199,7 +201,7 @@ func (p *BatchProposer) recoverBatchDataBuffer() {
|
||||
continue
|
||||
}
|
||||
|
||||
batchData, err := p.generateBatchData(parentBatch, blockInfos)
|
||||
batchData, err := p.GenerateBatchData(parentBatch, blockInfos)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
@@ -277,7 +279,7 @@ func (p *BatchProposer) tryCommitBatches() {
|
||||
// Send commit tx for batchDataBuffer[0:index]
|
||||
log.Info("Commit batches", "start_index", p.batchDataBuffer[0].Batch.BatchIndex,
|
||||
"end_index", p.batchDataBuffer[index-1].Batch.BatchIndex)
|
||||
err := p.relayer.SendCommitTx(p.batchDataBuffer[:index])
|
||||
err := p.SendCommitTx(p.batchDataBuffer[:index])
|
||||
if err != nil {
|
||||
// leave the retry to the next ticker
|
||||
log.Error("SendCommitTx failed", "error", err)
|
||||
@@ -357,7 +359,7 @@ func (p *BatchProposer) createBatchForBlocks(blocks []*types.BlockInfo) error {
|
||||
return err
|
||||
}
|
||||
|
||||
batchData, err := p.generateBatchData(lastBatch, blocks)
|
||||
batchData, err := p.GenerateBatchData(lastBatch, blocks)
|
||||
if err != nil {
|
||||
log.Error("createBatchData failed", "error", err)
|
||||
return err
|
||||
@@ -372,7 +374,8 @@ func (p *BatchProposer) createBatchForBlocks(blocks []*types.BlockInfo) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *BatchProposer) generateBatchData(parentBatch *types.BlockBatch, blocks []*types.BlockInfo) (*types.BatchData, error) {
|
||||
// GenerateBatchData as a public function can be called by layer2 relayer.
|
||||
func (p *BatchProposer) GenerateBatchData(parentBatch *types.BlockBatch, blocks []*types.BlockInfo) (*types.BatchData, error) {
|
||||
var wrappedBlocks []*types.WrappedBlock
|
||||
for _, block := range blocks {
|
||||
trs, err := p.orm.GetL2WrappedBlocks(map[string]interface{}{"hash": block.Hash})
|
||||
|
||||
@@ -40,7 +40,10 @@ func testBatchProposerProposeBatch(t *testing.T) {
|
||||
BatchTxNumThreshold: 135,
|
||||
BatchTimeSec: 1,
|
||||
BatchBlocksLimit: 100,
|
||||
}, relayer, db)
|
||||
}, db)
|
||||
relayer.SetBatchProposer(proposer)
|
||||
proposer.SetLayer2Relayer(relayer)
|
||||
|
||||
proposer.tryProposeBatch()
|
||||
|
||||
infos, err := db.GetUnbatchedL2Blocks(map[string]interface{}{},
|
||||
@@ -84,13 +87,15 @@ func testBatchProposerGracefulRestart(t *testing.T) {
|
||||
assert.Equal(t, 1, len(batchHashes))
|
||||
assert.Equal(t, batchData2.Hash().Hex(), batchHashes[0])
|
||||
// test p.recoverBatchDataBuffer().
|
||||
_ = NewBatchProposer(context.Background(), &config.BatchProposerConfig{
|
||||
proposer := NewBatchProposer(context.Background(), &config.BatchProposerConfig{
|
||||
ProofGenerationFreq: 1,
|
||||
BatchGasThreshold: 3000000,
|
||||
BatchTxNumThreshold: 135,
|
||||
BatchTimeSec: 1,
|
||||
BatchBlocksLimit: 100,
|
||||
}, relayer, db)
|
||||
}, db)
|
||||
proposer.SetLayer2Relayer(relayer)
|
||||
proposer.Start()
|
||||
|
||||
batchHashes, err = db.GetPendingBatches(math.MaxInt32)
|
||||
assert.NoError(t, err)
|
||||
|
||||
@@ -2,32 +2,26 @@ package l2
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/big"
|
||||
"runtime"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/scroll-tech/go-ethereum/accounts/abi"
|
||||
"github.com/scroll-tech/go-ethereum/common"
|
||||
"github.com/scroll-tech/go-ethereum/crypto"
|
||||
"github.com/scroll-tech/go-ethereum/ethclient"
|
||||
"github.com/scroll-tech/go-ethereum/log"
|
||||
geth_metrics "github.com/scroll-tech/go-ethereum/metrics"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"modernc.org/mathutil"
|
||||
|
||||
"scroll-tech/common/utils"
|
||||
|
||||
"scroll-tech/database"
|
||||
|
||||
"scroll-tech/common/metrics"
|
||||
"scroll-tech/common/types"
|
||||
"scroll-tech/database"
|
||||
|
||||
cutil "scroll-tech/common/utils"
|
||||
|
||||
bridge_abi "scroll-tech/bridge/abi"
|
||||
"scroll-tech/bridge/config"
|
||||
"scroll-tech/bridge/sender"
|
||||
"scroll-tech/bridge/utils"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -48,6 +42,10 @@ const (
|
||||
defaultMessageRelayMinGasLimit = 200000 // should be enough for both ERC20 and ETH relay
|
||||
)
|
||||
|
||||
type batchInterface interface {
|
||||
GenerateBatchData(parentBatch *types.BlockBatch, blocks []*types.BlockInfo) (*types.BatchData, error)
|
||||
}
|
||||
|
||||
// Layer2Relayer is responsible for
|
||||
// 1. Committing and finalizing L2 blocks on L1
|
||||
// 2. Relaying messages from L2 to L1
|
||||
@@ -92,6 +90,9 @@ type Layer2Relayer struct {
|
||||
// key(string): confirmation ID, value(string): batch hash.
|
||||
processingFinalization sync.Map
|
||||
|
||||
// Use batch_proposer's GenerateBatchData interface.
|
||||
batchInterface
|
||||
|
||||
stopCh chan struct{}
|
||||
}
|
||||
|
||||
@@ -131,7 +132,7 @@ func NewLayer2Relayer(ctx context.Context, l2Client *ethclient.Client, db databa
|
||||
minGasLimitForMessageRelay = cfg.MessageRelayMinGasLimit
|
||||
}
|
||||
|
||||
return &Layer2Relayer{
|
||||
relayer := &Layer2Relayer{
|
||||
ctx: ctx,
|
||||
db: db,
|
||||
|
||||
@@ -159,399 +160,52 @@ func NewLayer2Relayer(ctx context.Context, l2Client *ethclient.Client, db databa
|
||||
processingBatchesCommitment: sync.Map{},
|
||||
processingFinalization: sync.Map{},
|
||||
stopCh: make(chan struct{}),
|
||||
}, nil
|
||||
}
|
||||
go relayer.confirmLoop(ctx)
|
||||
|
||||
return relayer, nil
|
||||
}
|
||||
|
||||
const processMsgLimit = 100
|
||||
|
||||
// ProcessSavedEvents relays saved un-processed cross-domain transactions to desired blockchain
|
||||
func (r *Layer2Relayer) ProcessSavedEvents() {
|
||||
batch, err := r.db.GetLatestFinalizedBatch()
|
||||
if err != nil {
|
||||
log.Error("GetLatestFinalizedBatch failed", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
// msgs are sorted by nonce in increasing order
|
||||
msgs, err := r.db.GetL2Messages(
|
||||
map[string]interface{}{"status": types.MsgPending},
|
||||
fmt.Sprintf("AND height<=%d", batch.EndBlockNumber),
|
||||
fmt.Sprintf("ORDER BY nonce ASC LIMIT %d", processMsgLimit),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
log.Error("Failed to fetch unprocessed L2 messages", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
// process messages in batches
|
||||
batchSize := mathutil.Min((runtime.GOMAXPROCS(0)+1)/2, r.messageSender.NumberOfAccounts())
|
||||
for size := 0; len(msgs) > 0; msgs = msgs[size:] {
|
||||
if size = len(msgs); size > batchSize {
|
||||
size = batchSize
|
||||
}
|
||||
var g errgroup.Group
|
||||
for _, msg := range msgs[:size] {
|
||||
msg := msg
|
||||
g.Go(func() error {
|
||||
return r.processSavedEvent(msg)
|
||||
})
|
||||
}
|
||||
if err := g.Wait(); err != nil {
|
||||
if !errors.Is(err, sender.ErrNoAvailableAccount) {
|
||||
log.Error("failed to process l2 saved event", "err", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Layer2Relayer) processSavedEvent(msg *types.L2Message) error {
|
||||
// @todo fetch merkle proof from l2geth
|
||||
log.Info("Processing L2 Message", "msg.nonce", msg.Nonce, "msg.height", msg.Height)
|
||||
|
||||
// Get the block info that contains the message
|
||||
blockInfos, err := r.db.GetL2BlockInfos(map[string]interface{}{"number": msg.Height})
|
||||
if err != nil {
|
||||
log.Error("Failed to GetL2BlockInfos from DB", "number", msg.Height)
|
||||
}
|
||||
blockInfo := blockInfos[0]
|
||||
if !blockInfo.BatchHash.Valid {
|
||||
log.Error("Block has not been batched yet", "number", blockInfo.Number, "msg.nonce", msg.Nonce)
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO: rebuild the withdraw trie to generate the merkle proof
|
||||
proof := bridge_abi.IL1ScrollMessengerL2MessageProof{
|
||||
BatchHash: common.HexToHash(blockInfo.BatchHash.String),
|
||||
MerkleProof: make([]byte, 0),
|
||||
}
|
||||
from := common.HexToAddress(msg.Sender)
|
||||
target := common.HexToAddress(msg.Target)
|
||||
value, ok := big.NewInt(0).SetString(msg.Value, 10)
|
||||
if !ok {
|
||||
// @todo maybe panic?
|
||||
log.Error("Failed to parse message value", "msg.nonce", msg.Nonce, "msg.height", msg.Height)
|
||||
// TODO: need to skip this message by changing its status to MsgError
|
||||
}
|
||||
msgNonce := big.NewInt(int64(msg.Nonce))
|
||||
calldata := common.Hex2Bytes(msg.Calldata)
|
||||
data, err := r.l1MessengerABI.Pack("relayMessageWithProof", from, target, value, msgNonce, calldata, proof)
|
||||
if err != nil {
|
||||
log.Error("Failed to pack relayMessageWithProof", "msg.nonce", msg.Nonce, "err", err)
|
||||
// TODO: need to skip this message by changing its status to MsgError
|
||||
return err
|
||||
}
|
||||
|
||||
hash, err := r.messageSender.SendTransaction(msg.MsgHash, &r.cfg.MessengerContractAddress, big.NewInt(0), data, r.minGasLimitForMessageRelay)
|
||||
if err != nil && err.Error() == "execution reverted: Message expired" {
|
||||
return r.db.UpdateLayer2Status(r.ctx, msg.MsgHash, types.MsgExpired)
|
||||
}
|
||||
if err != nil && err.Error() == "execution reverted: Message successfully executed" {
|
||||
return r.db.UpdateLayer2Status(r.ctx, msg.MsgHash, types.MsgConfirmed)
|
||||
}
|
||||
if err != nil {
|
||||
if !errors.Is(err, sender.ErrNoAvailableAccount) {
|
||||
log.Error("Failed to send relayMessageWithProof tx to layer1 ", "msg.height", msg.Height, "msg.MsgHash", msg.MsgHash, "err", err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
bridgeL2MsgsRelayedTotalCounter.Inc(1)
|
||||
log.Info("relayMessageWithProof to layer1", "msgHash", msg.MsgHash, "txhash", hash.String())
|
||||
|
||||
// save status in db
|
||||
// @todo handle db error
|
||||
err = r.db.UpdateLayer2StatusAndLayer1Hash(r.ctx, msg.MsgHash, types.MsgSubmitted, hash.String())
|
||||
if err != nil {
|
||||
log.Error("UpdateLayer2StatusAndLayer1Hash failed", "msgHash", msg.MsgHash, "err", err)
|
||||
return err
|
||||
}
|
||||
r.processingMessage.Store(msg.MsgHash, msg.MsgHash)
|
||||
return nil
|
||||
}
|
||||
|
||||
// ProcessGasPriceOracle imports gas price to layer1
|
||||
func (r *Layer2Relayer) ProcessGasPriceOracle() {
|
||||
batch, err := r.db.GetLatestBatch()
|
||||
if err != nil {
|
||||
log.Error("Failed to GetLatestBatch", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
if batch.OracleStatus == types.GasOraclePending {
|
||||
suggestGasPrice, err := r.l2Client.SuggestGasPrice(r.ctx)
|
||||
if err != nil {
|
||||
log.Error("Failed to fetch SuggestGasPrice from l2geth", "err", err)
|
||||
return
|
||||
}
|
||||
suggestGasPriceUint64 := uint64(suggestGasPrice.Int64())
|
||||
expectedDelta := r.lastGasPrice * r.gasPriceDiff / gasPriceDiffPrecision
|
||||
|
||||
// last is undefine or (suggestGasPriceUint64 >= minGasPrice && exceed diff)
|
||||
if r.lastGasPrice == 0 || (suggestGasPriceUint64 >= r.minGasPrice && (suggestGasPriceUint64 >= r.lastGasPrice+expectedDelta || suggestGasPriceUint64 <= r.lastGasPrice-expectedDelta)) {
|
||||
data, err := r.l2GasOracleABI.Pack("setL2BaseFee", suggestGasPrice)
|
||||
if err != nil {
|
||||
log.Error("Failed to pack setL2BaseFee", "batch.Hash", batch.Hash, "GasPrice", suggestGasPrice.Uint64(), "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
hash, err := r.gasOracleSender.SendTransaction(batch.Hash, &r.cfg.GasPriceOracleContractAddress, big.NewInt(0), data, 0)
|
||||
if err != nil {
|
||||
if !errors.Is(err, sender.ErrNoAvailableAccount) {
|
||||
log.Error("Failed to send setL2BaseFee tx to layer2 ", "batch.Hash", batch.Hash, "err", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
err = r.db.UpdateL2GasOracleStatusAndOracleTxHash(r.ctx, batch.Hash, types.GasOracleImporting, hash.String())
|
||||
if err != nil {
|
||||
log.Error("UpdateGasOracleStatusAndOracleTxHash failed", "batch.Hash", batch.Hash, "err", err)
|
||||
return
|
||||
}
|
||||
r.lastGasPrice = suggestGasPriceUint64
|
||||
log.Info("Update l2 gas price", "txHash", hash.String(), "GasPrice", suggestGasPrice)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// SendCommitTx sends commitBatches tx to L1.
|
||||
func (r *Layer2Relayer) SendCommitTx(batchData []*types.BatchData) error {
|
||||
if len(batchData) == 0 {
|
||||
log.Error("SendCommitTx receives empty batch")
|
||||
return nil
|
||||
}
|
||||
|
||||
// pack calldata
|
||||
commitBatches := make([]bridge_abi.IScrollChainBatch, len(batchData))
|
||||
for i, batch := range batchData {
|
||||
commitBatches[i] = batch.Batch
|
||||
}
|
||||
calldata, err := r.l1RollupABI.Pack("commitBatches", commitBatches)
|
||||
if err != nil {
|
||||
log.Error("Failed to pack commitBatches",
|
||||
"error", err,
|
||||
"start_batch_index", commitBatches[0].BatchIndex,
|
||||
"end_batch_index", commitBatches[len(commitBatches)-1].BatchIndex)
|
||||
return err
|
||||
}
|
||||
|
||||
// generate a unique txID and send transaction
|
||||
var bytes []byte
|
||||
for _, batch := range batchData {
|
||||
bytes = append(bytes, batch.Hash().Bytes()...)
|
||||
}
|
||||
txID := crypto.Keccak256Hash(bytes).String()
|
||||
txHash, err := r.rollupSender.SendTransaction(txID, &r.cfg.RollupContractAddress, big.NewInt(0), calldata, 0)
|
||||
if err != nil {
|
||||
if !errors.Is(err, sender.ErrNoAvailableAccount) {
|
||||
log.Error("Failed to send commitBatches tx to layer1 ", "err", err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
bridgeL2BatchesCommittedTotalCounter.Inc(int64(len(commitBatches)))
|
||||
log.Info("Sent the commitBatches tx to layer1",
|
||||
"tx_hash", txHash.Hex(),
|
||||
"start_batch_index", commitBatches[0].BatchIndex,
|
||||
"end_batch_index", commitBatches[len(commitBatches)-1].BatchIndex)
|
||||
|
||||
// record and sync with db, @todo handle db error
|
||||
batchHashes := make([]string, len(batchData))
|
||||
for i, batch := range batchData {
|
||||
batchHashes[i] = batch.Hash().Hex()
|
||||
err = r.db.UpdateCommitTxHashAndRollupStatus(r.ctx, batchHashes[i], txHash.String(), types.RollupCommitting)
|
||||
if err != nil {
|
||||
log.Error("UpdateCommitTxHashAndRollupStatus failed", "hash", batchHashes[i], "index", batch.Batch.BatchIndex, "err", err)
|
||||
}
|
||||
}
|
||||
r.processingBatchesCommitment.Store(txID, batchHashes)
|
||||
return nil
|
||||
}
|
||||
|
||||
// ProcessCommittedBatches submit proof to layer 1 rollup contract
|
||||
func (r *Layer2Relayer) ProcessCommittedBatches() {
|
||||
// set skipped batches in a single db operation
|
||||
if count, err := r.db.UpdateSkippedBatches(); err != nil {
|
||||
log.Error("UpdateSkippedBatches failed", "err", err)
|
||||
// continue anyway
|
||||
} else if count > 0 {
|
||||
bridgeL2BatchesSkippedTotalCounter.Inc(count)
|
||||
log.Info("Skipping batches", "count", count)
|
||||
}
|
||||
|
||||
// batches are sorted by batch index in increasing order
|
||||
batchHashes, err := r.db.GetCommittedBatches(1)
|
||||
if err != nil {
|
||||
log.Error("Failed to fetch committed L2 batches", "err", err)
|
||||
return
|
||||
}
|
||||
if len(batchHashes) == 0 {
|
||||
return
|
||||
}
|
||||
hash := batchHashes[0]
|
||||
// @todo add support to relay multiple batches
|
||||
|
||||
batches, err := r.db.GetBlockBatches(map[string]interface{}{"hash": hash}, "LIMIT 1")
|
||||
if err != nil {
|
||||
log.Error("Failed to fetch committed L2 batch", "hash", hash, "err", err)
|
||||
return
|
||||
}
|
||||
if len(batches) == 0 {
|
||||
log.Error("Unexpected result for GetBlockBatches", "hash", hash, "len", 0)
|
||||
return
|
||||
}
|
||||
|
||||
batch := batches[0]
|
||||
status := batch.ProvingStatus
|
||||
|
||||
switch status {
|
||||
case types.ProvingTaskUnassigned, types.ProvingTaskAssigned:
|
||||
// The proof for this block is not ready yet.
|
||||
return
|
||||
|
||||
case types.ProvingTaskProved:
|
||||
// It's an intermediate state. The roller manager received the proof but has not verified
|
||||
// the proof yet. We don't roll up the proof until it's verified.
|
||||
return
|
||||
|
||||
case types.ProvingTaskFailed, types.ProvingTaskSkipped:
|
||||
// note: this is covered by UpdateSkippedBatches, but we keep it for completeness's sake
|
||||
|
||||
if err = r.db.UpdateRollupStatus(r.ctx, hash, types.RollupFinalizationSkipped); err != nil {
|
||||
log.Warn("UpdateRollupStatus failed", "hash", hash, "err", err)
|
||||
}
|
||||
|
||||
case types.ProvingTaskVerified:
|
||||
log.Info("Start to roll up zk proof", "hash", hash)
|
||||
success := false
|
||||
|
||||
previousBatch, err := r.db.GetLatestFinalizingOrFinalizedBatch()
|
||||
|
||||
// skip submitting proof
|
||||
if err == nil && uint64(batch.CreatedAt.Sub(*previousBatch.CreatedAt).Seconds()) < r.cfg.FinalizeBatchIntervalSec {
|
||||
log.Info(
|
||||
"Not enough time passed, skipping",
|
||||
"hash", hash,
|
||||
"createdAt", batch.CreatedAt,
|
||||
"lastFinalizingHash", previousBatch.Hash,
|
||||
"lastFinalizingStatus", previousBatch.RollupStatus,
|
||||
"lastFinalizingCreatedAt", previousBatch.CreatedAt,
|
||||
)
|
||||
|
||||
if err = r.db.UpdateRollupStatus(r.ctx, hash, types.RollupFinalizationSkipped); err != nil {
|
||||
log.Warn("UpdateRollupStatus failed", "hash", hash, "err", err)
|
||||
} else {
|
||||
success = true
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// handle unexpected db error
|
||||
if err != nil && err.Error() != "sql: no rows in result set" {
|
||||
log.Error("Failed to get latest finalized batch", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
defer func() {
|
||||
// TODO: need to revisit this and have a more fine-grained error handling
|
||||
if !success {
|
||||
log.Info("Failed to upload the proof, change rollup status to FinalizationSkipped", "hash", hash)
|
||||
if err = r.db.UpdateRollupStatus(r.ctx, hash, types.RollupFinalizationSkipped); err != nil {
|
||||
log.Warn("UpdateRollupStatus failed", "hash", hash, "err", err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
proofBuffer, instanceBuffer, err := r.db.GetVerifiedProofAndInstanceByHash(hash)
|
||||
if err != nil {
|
||||
log.Warn("fetch get proof by hash failed", "hash", hash, "err", err)
|
||||
return
|
||||
}
|
||||
if proofBuffer == nil || instanceBuffer == nil {
|
||||
log.Warn("proof or instance not ready", "hash", hash)
|
||||
return
|
||||
}
|
||||
if len(proofBuffer)%32 != 0 {
|
||||
log.Error("proof buffer has wrong length", "hash", hash, "length", len(proofBuffer))
|
||||
return
|
||||
}
|
||||
if len(instanceBuffer)%32 != 0 {
|
||||
log.Warn("instance buffer has wrong length", "hash", hash, "length", len(instanceBuffer))
|
||||
return
|
||||
}
|
||||
|
||||
proof := utils.BufferToUint256Le(proofBuffer)
|
||||
instance := utils.BufferToUint256Le(instanceBuffer)
|
||||
data, err := r.l1RollupABI.Pack("finalizeBatchWithProof", common.HexToHash(hash), proof, instance)
|
||||
if err != nil {
|
||||
log.Error("Pack finalizeBatchWithProof failed", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
txID := hash + "-finalize"
|
||||
// add suffix `-finalize` to avoid duplication with commit tx in unit tests
|
||||
txHash, err := r.rollupSender.SendTransaction(txID, &r.cfg.RollupContractAddress, big.NewInt(0), data, 0)
|
||||
finalizeTxHash := &txHash
|
||||
if err != nil {
|
||||
if !errors.Is(err, sender.ErrNoAvailableAccount) {
|
||||
log.Error("finalizeBatchWithProof in layer1 failed", "hash", hash, "err", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
bridgeL2BatchesFinalizedTotalCounter.Inc(1)
|
||||
log.Info("finalizeBatchWithProof in layer1", "batch_hash", hash, "tx_hash", hash)
|
||||
|
||||
// record and sync with db, @todo handle db error
|
||||
err = r.db.UpdateFinalizeTxHashAndRollupStatus(r.ctx, hash, finalizeTxHash.String(), types.RollupFinalizing)
|
||||
if err != nil {
|
||||
log.Warn("UpdateFinalizeTxHashAndRollupStatus failed", "batch_hash", hash, "err", err)
|
||||
}
|
||||
success = true
|
||||
r.processingFinalization.Store(txID, hash)
|
||||
|
||||
default:
|
||||
log.Error("encounter unreachable case in ProcessCommittedBatches",
|
||||
"block_status", status,
|
||||
)
|
||||
}
|
||||
// SetBatchProposer set interface from batch_proposer.
|
||||
func (r *Layer2Relayer) SetBatchProposer(proposer batchInterface) {
|
||||
r.batchInterface = proposer
|
||||
}
|
||||
|
||||
// Start the relayer process
|
||||
func (r *Layer2Relayer) Start() {
|
||||
go func() {
|
||||
ctx, cancel := context.WithCancel(r.ctx)
|
||||
go cutil.Loop(ctx, time.Second, r.ProcessSavedEvents)
|
||||
go cutil.Loop(ctx, time.Second, r.ProcessCommittedBatches)
|
||||
go cutil.Loop(ctx, time.Second, r.ProcessGasPriceOracle)
|
||||
|
||||
go func(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case confirmation := <-r.messageCh:
|
||||
r.handleConfirmation(confirmation)
|
||||
case confirmation := <-r.rollupCh:
|
||||
r.handleConfirmation(confirmation)
|
||||
case cfm := <-r.gasOracleCh:
|
||||
if !cfm.IsSuccessful {
|
||||
// @discuss: maybe make it pending again?
|
||||
err := r.db.UpdateL2GasOracleStatusAndOracleTxHash(r.ctx, cfm.ID, types.GasOracleFailed, cfm.TxHash.String())
|
||||
if err != nil {
|
||||
log.Warn("UpdateL2GasOracleStatusAndOracleTxHash failed", "err", err)
|
||||
}
|
||||
log.Warn("transaction confirmed but failed in layer1", "confirmation", cfm)
|
||||
} else {
|
||||
// @todo handle db error
|
||||
err := r.db.UpdateL2GasOracleStatusAndOracleTxHash(r.ctx, cfm.ID, types.GasOracleImported, cfm.TxHash.String())
|
||||
if err != nil {
|
||||
log.Warn("UpdateL2GasOracleStatusAndOracleTxHash failed", "err", err)
|
||||
}
|
||||
log.Info("transaction confirmed in layer1", "confirmation", cfm)
|
||||
}
|
||||
}
|
||||
go func() {
|
||||
if err := r.checkSubmittedMessages(); err != nil {
|
||||
log.Error("failed to init layer2 submitted messages", "err", err)
|
||||
}
|
||||
}(ctx)
|
||||
// Wait until sender pool is clean.
|
||||
utils.TryTimes(-1, func() bool {
|
||||
return r.messageSender.PendingCount() == 0
|
||||
})
|
||||
go cutil.Loop(ctx, time.Second, r.ProcessSavedEvents)
|
||||
}()
|
||||
|
||||
go func() {
|
||||
if err := r.checkRollupBatches(); err != nil {
|
||||
log.Error("failed to init layer2 rollupCommitting messages", "err", err)
|
||||
}
|
||||
utils.TryTimes(-1, func() bool {
|
||||
return r.rollupSender.PendingCount() == 0
|
||||
})
|
||||
|
||||
if err := r.checkFinalizingBatches(); err != nil {
|
||||
log.Error("failed to init layer2 finalizing batches", "err", err)
|
||||
}
|
||||
// Wait until sender pool is clean.
|
||||
utils.TryTimes(-1, func() bool {
|
||||
return r.rollupSender.PendingCount() == 0
|
||||
})
|
||||
go cutil.Loop(ctx, time.Second, r.ProcessCommittedBatches)
|
||||
}()
|
||||
|
||||
go cutil.Loop(ctx, time.Second, r.ProcessGasPriceOracle)
|
||||
|
||||
<-r.stopCh
|
||||
cancel()
|
||||
@@ -563,6 +217,35 @@ func (r *Layer2Relayer) Stop() {
|
||||
close(r.stopCh)
|
||||
}
|
||||
|
||||
func (r *Layer2Relayer) confirmLoop(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case confirmation := <-r.messageCh:
|
||||
r.handleConfirmation(confirmation)
|
||||
case confirmation := <-r.rollupCh:
|
||||
r.handleConfirmation(confirmation)
|
||||
case cfm := <-r.gasOracleCh:
|
||||
if !cfm.IsSuccessful {
|
||||
// @discuss: maybe make it pending again?
|
||||
err := r.db.UpdateL2GasOracleStatusAndOracleTxHash(r.ctx, cfm.ID, types.GasOracleFailed, cfm.TxHash.String())
|
||||
if err != nil {
|
||||
log.Warn("UpdateL2GasOracleStatusAndOracleTxHash failed", "err", err)
|
||||
}
|
||||
log.Warn("transaction confirmed but failed in layer1", "confirmation", cfm)
|
||||
} else {
|
||||
// @todo handle db error
|
||||
err := r.db.UpdateL2GasOracleStatusAndOracleTxHash(r.ctx, cfm.ID, types.GasOracleImported, cfm.TxHash.String())
|
||||
if err != nil {
|
||||
log.Warn("UpdateL2GasOracleStatusAndOracleTxHash failed", "err", err)
|
||||
}
|
||||
log.Info("transaction confirmed in layer1", "confirmation", cfm)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Layer2Relayer) handleConfirmation(confirmation *sender.Confirmation) {
|
||||
transactionType := "Unknown"
|
||||
// check whether it is message relay transaction
|
||||
|
||||
187
bridge/l2/relayer_commit.go
Normal file
187
bridge/l2/relayer_commit.go
Normal file
@@ -0,0 +1,187 @@
|
||||
package l2
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/big"
|
||||
|
||||
"github.com/modern-go/reflect2"
|
||||
"github.com/scroll-tech/go-ethereum/common"
|
||||
"github.com/scroll-tech/go-ethereum/crypto"
|
||||
"github.com/scroll-tech/go-ethereum/log"
|
||||
|
||||
bridge_abi "scroll-tech/bridge/abi"
|
||||
|
||||
"scroll-tech/common/types"
|
||||
"scroll-tech/common/utils"
|
||||
|
||||
"scroll-tech/bridge/sender"
|
||||
)
|
||||
|
||||
func (r *Layer2Relayer) checkRollupBatches() error {
|
||||
// Check mock generateBatch interface.
|
||||
if reflect2.IsNil(r.batchInterface) {
|
||||
return nil
|
||||
}
|
||||
|
||||
blockBatchCache := make(map[string]*types.BlockBatch)
|
||||
getBlockBatch := func(batchHash string) (*types.BlockBatch, error) {
|
||||
if blockBatch, ok := blockBatchCache[batchHash]; ok {
|
||||
return blockBatch, nil
|
||||
}
|
||||
blockBatches, err := r.db.GetBlockBatches(map[string]interface{}{"hash": batchHash})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(blockBatches) == 0 {
|
||||
return nil, fmt.Errorf("don't has such blockBatch, batchHash: %s", batchHash)
|
||||
}
|
||||
blockBatchCache[batchHash] = blockBatches[0]
|
||||
return blockBatches[0], nil
|
||||
}
|
||||
|
||||
var batchIndex uint64
|
||||
for {
|
||||
blockBatches, err := r.db.GetBlockBatches(
|
||||
map[string]interface{}{"rollup_status": types.RollupCommitting},
|
||||
fmt.Sprintf("AND commit_tx_hash IN (SELECT commit_tx_hash FROM block_batch WHERE index > %d GROUP BY index, commit_tx_hash ORDER BY index LIMIT 1)", batchIndex),
|
||||
fmt.Sprintf("AND index > %d", batchIndex),
|
||||
"ORDER BY index ASC",
|
||||
)
|
||||
if err != nil {
|
||||
log.Error("failed to get rollup committing block batches", "batch index", batchIndex, "err", err)
|
||||
return err
|
||||
}
|
||||
if len(blockBatches) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
var batchDataBuffer []*types.BatchData
|
||||
batchIndex = blockBatches[len(blockBatches)-1].Index
|
||||
for _, blockBatch := range blockBatches {
|
||||
// Wait until sender's pending is not full.
|
||||
utils.TryTimes(-1, func() bool {
|
||||
return !r.rollupSender.IsFull()
|
||||
})
|
||||
|
||||
var (
|
||||
parentBatch *types.BlockBatch
|
||||
blockInfos []*types.BlockInfo
|
||||
)
|
||||
parentBatch, err = getBlockBatch(blockBatch.ParentHash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
blockInfos, err = r.db.GetL2BlockInfos(
|
||||
map[string]interface{}{"batch_hash": blockBatch.Hash},
|
||||
"order by number ASC",
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(blockInfos) != int(blockBatch.EndBlockNumber-blockBatch.StartBlockNumber+1) {
|
||||
log.Error("the number of block info retrieved from DB mistmatches the blockBatch info in the DB",
|
||||
"len(blockInfos)", len(blockInfos),
|
||||
"expected", blockBatch.EndBlockNumber-blockBatch.StartBlockNumber+1)
|
||||
continue
|
||||
}
|
||||
var batchData *types.BatchData
|
||||
batchData, err = r.GenerateBatchData(parentBatch, blockInfos)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
batchDataBuffer = append(batchDataBuffer, batchData)
|
||||
}
|
||||
batchHashes, txID, callData, err := r.packBatchData(batchDataBuffer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Handle tx.
|
||||
err = r.rollupSender.LoadOrSendTx(
|
||||
common.HexToHash(blockBatches[0].CommitTxHash.String),
|
||||
txID,
|
||||
&r.cfg.RollupContractAddress,
|
||||
big.NewInt(0),
|
||||
callData,
|
||||
0,
|
||||
)
|
||||
switch true {
|
||||
case err == nil:
|
||||
r.processingBatchesCommitment.Store(txID, batchHashes)
|
||||
case err.Error() == "execution reverted: Batch already commited": //nolint:misspell
|
||||
log.Warn("blockBatches already committed", "start index", blockBatches[0].Index, "end index", blockBatches[len(blockBatches)-1].Index)
|
||||
default:
|
||||
log.Error("failed to load or send batchData tx")
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Layer2Relayer) packBatchData(batchData []*types.BatchData) ([]string, string, []byte, error) {
|
||||
// pack calldata
|
||||
commitBatches := make([]bridge_abi.IScrollChainBatch, len(batchData))
|
||||
for i, batch := range batchData {
|
||||
commitBatches[i] = batch.Batch
|
||||
}
|
||||
calldata, err := r.l1RollupABI.Pack("commitBatches", commitBatches)
|
||||
if err != nil {
|
||||
log.Error("Failed to pack commitBatches",
|
||||
"error", err,
|
||||
"start_batch_index", commitBatches[0].BatchIndex,
|
||||
"end_batch_index", commitBatches[len(commitBatches)-1].BatchIndex)
|
||||
return nil, "", nil, err
|
||||
}
|
||||
|
||||
// generate a unique txID and send transaction
|
||||
var (
|
||||
bytes []byte
|
||||
batchHashes = make([]string, len(batchData))
|
||||
)
|
||||
for i, batch := range batchData {
|
||||
bytes = append(bytes, batch.Hash().Bytes()...)
|
||||
batchHashes[i] = batch.Hash().Hex()
|
||||
}
|
||||
return batchHashes, crypto.Keccak256Hash(bytes).String(), calldata, nil
|
||||
}
|
||||
|
||||
// SendCommitTx sends commitBatches tx to L1.
|
||||
func (r *Layer2Relayer) SendCommitTx(batchData []*types.BatchData) error {
|
||||
if len(batchData) == 0 {
|
||||
log.Error("SendCommitTx receives empty batch")
|
||||
return nil
|
||||
}
|
||||
|
||||
// pack calldata
|
||||
batchHashes, txID, calldata, err := r.packBatchData(batchData)
|
||||
if err != nil {
|
||||
log.Error("Failed to pack commitBatches",
|
||||
"error", err,
|
||||
"start_batch_index", batchData[0].Batch.BatchIndex,
|
||||
"end_batch_index", batchData[len(batchData)-1].Batch.BatchIndex)
|
||||
return err
|
||||
}
|
||||
|
||||
txHash, err := r.rollupSender.SendTransaction(txID, &r.cfg.RollupContractAddress, big.NewInt(0), calldata, 0)
|
||||
if err != nil {
|
||||
if !errors.Is(err, sender.ErrNoAvailableAccount) {
|
||||
log.Error("Failed to send commitBatches tx to layer1 ", "err", err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
bridgeL2BatchesCommittedTotalCounter.Inc(int64(len(batchHashes)))
|
||||
log.Info("Sent the commitBatches tx to layer1",
|
||||
"tx_hash", txHash.Hex(),
|
||||
"start_batch_index", batchData[0].Batch.BatchIndex,
|
||||
"end_batch_index", batchData[len(batchData)-1].Batch.BatchIndex)
|
||||
|
||||
// record and sync with db, @todo handle db error
|
||||
for i := range batchData {
|
||||
err = r.db.UpdateCommitTxHashAndRollupStatus(r.ctx, batchHashes[i], txHash.String(), types.RollupCommitting)
|
||||
if err != nil {
|
||||
log.Error("UpdateCommitTxHashAndRollupStatus failed", "hash", batchHashes[i], "index", batchData[i].Batch.BatchIndex, "err", err)
|
||||
}
|
||||
}
|
||||
r.processingBatchesCommitment.Store(txID, batchHashes)
|
||||
return nil
|
||||
}
|
||||
240
bridge/l2/relayer_finalize.go
Normal file
240
bridge/l2/relayer_finalize.go
Normal file
@@ -0,0 +1,240 @@
|
||||
package l2
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/big"
|
||||
|
||||
"github.com/scroll-tech/go-ethereum/common"
|
||||
"github.com/scroll-tech/go-ethereum/log"
|
||||
|
||||
"scroll-tech/common/types"
|
||||
comutiles "scroll-tech/common/utils"
|
||||
|
||||
"scroll-tech/bridge/sender"
|
||||
"scroll-tech/bridge/utils"
|
||||
)
|
||||
|
||||
func (r *Layer2Relayer) checkFinalizingBatches() error {
|
||||
var (
|
||||
batchLimit = 10
|
||||
batchIndex uint64
|
||||
)
|
||||
for {
|
||||
batches, err := r.db.GetBlockBatches(
|
||||
map[string]interface{}{"rollup_status": types.RollupFinalizing},
|
||||
fmt.Sprintf("AND index > %d", batchIndex),
|
||||
fmt.Sprintf("ORDER BY index ASC LIMIT %d", batchLimit),
|
||||
)
|
||||
if err != nil {
|
||||
log.Error("failed to get Rollup finalizing batches", "batch index", batchIndex, "err", err)
|
||||
return err
|
||||
}
|
||||
if len(batches) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
batchIndex = batches[len(batches)-1].Index
|
||||
for _, batch := range batches { //nolint:staticcheck
|
||||
// Wait until sender's pending is not full.
|
||||
comutiles.TryTimes(-1, func() bool {
|
||||
return !r.rollupSender.IsFull()
|
||||
})
|
||||
|
||||
var (
|
||||
txHash common.Hash
|
||||
hash = batch.Hash
|
||||
)
|
||||
// Use empty txHash can let tx resent, if tx is already on block will be checked.
|
||||
if batch.CommitTxHash.Valid {
|
||||
txHash = common.HexToHash(batch.CommitTxHash.String)
|
||||
}
|
||||
|
||||
data, err := r.packFinalizeBatch(hash)
|
||||
if err != nil {
|
||||
log.Error("failed to pack finalize data", "err", err)
|
||||
continue
|
||||
}
|
||||
|
||||
txID := hash + "-finalize"
|
||||
err = r.rollupSender.LoadOrSendTx(
|
||||
txHash,
|
||||
txID,
|
||||
&r.cfg.RollupContractAddress,
|
||||
big.NewInt(0),
|
||||
data,
|
||||
0,
|
||||
)
|
||||
switch true {
|
||||
case err == nil:
|
||||
r.processingFinalization.Store(txID, hash)
|
||||
case err.Error() == "execution reverted: Batch is already finalized":
|
||||
log.Warn("block batch already rollup finalized", "batch index", batch.Index)
|
||||
if err = r.db.UpdateRollupStatus(r.ctx, hash, types.RollupFinalized); err != nil {
|
||||
return err
|
||||
}
|
||||
default:
|
||||
log.Error("failed to load or send finalized tx", "batch hash", hash, "err", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Layer2Relayer) packFinalizeBatch(hash string) ([]byte, error) {
|
||||
proofBuffer, instanceBuffer, err := r.db.GetVerifiedProofAndInstanceByHash(hash)
|
||||
if err != nil {
|
||||
log.Warn("fetch get proof by hash failed", "hash", hash, "err", err)
|
||||
return nil, err
|
||||
}
|
||||
if proofBuffer == nil || instanceBuffer == nil {
|
||||
log.Warn("proof or instance not ready", "hash", hash)
|
||||
return nil, err
|
||||
}
|
||||
if len(proofBuffer)%32 != 0 {
|
||||
log.Error("proof buffer has wrong length", "hash", hash, "length", len(proofBuffer))
|
||||
return nil, err
|
||||
}
|
||||
if len(instanceBuffer)%32 != 0 {
|
||||
log.Warn("instance buffer has wrong length", "hash", hash, "length", len(instanceBuffer))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
proof := utils.BufferToUint256Le(proofBuffer)
|
||||
instance := utils.BufferToUint256Le(instanceBuffer)
|
||||
data, err := r.l1RollupABI.Pack("finalizeBatchWithProof", common.HexToHash(hash), proof, instance)
|
||||
if err != nil {
|
||||
log.Error("Pack finalizeBatchWithProof failed", "err", err)
|
||||
return nil, err
|
||||
}
|
||||
return data, nil
|
||||
}
|
||||
|
||||
// ProcessCommittedBatches submit proof to layer 1 rollup contract
|
||||
func (r *Layer2Relayer) ProcessCommittedBatches() {
|
||||
// set skipped batches in a single db operation
|
||||
if count, err := r.db.UpdateSkippedBatches(); err != nil {
|
||||
log.Error("UpdateSkippedBatches failed", "err", err)
|
||||
// continue anyway
|
||||
} else if count > 0 {
|
||||
bridgeL2BatchesSkippedTotalCounter.Inc(count)
|
||||
log.Info("Skipping batches", "count", count)
|
||||
}
|
||||
|
||||
// batches are sorted by batch index in increasing order
|
||||
batchHashes, err := r.db.GetCommittedBatches(1)
|
||||
if err != nil {
|
||||
log.Error("Failed to fetch committed L2 batches", "err", err)
|
||||
return
|
||||
}
|
||||
if len(batchHashes) == 0 {
|
||||
return
|
||||
}
|
||||
hash := batchHashes[0]
|
||||
// @todo add support to relay multiple batches
|
||||
|
||||
batches, err := r.db.GetBlockBatches(map[string]interface{}{"hash": hash}, "LIMIT 1")
|
||||
if err != nil {
|
||||
log.Error("Failed to fetch committed L2 batch", "hash", hash, "err", err)
|
||||
return
|
||||
}
|
||||
if len(batches) == 0 {
|
||||
log.Error("Unexpected result for GetBlockBatches", "hash", hash, "len", 0)
|
||||
return
|
||||
}
|
||||
|
||||
batch := batches[0]
|
||||
status := batch.ProvingStatus
|
||||
|
||||
switch status {
|
||||
case types.ProvingTaskUnassigned, types.ProvingTaskAssigned:
|
||||
// The proof for this block is not ready yet.
|
||||
return
|
||||
|
||||
case types.ProvingTaskProved:
|
||||
// It's an intermediate state. The roller manager received the proof but has not verified
|
||||
// the proof yet. We don't roll up the proof until it's verified.
|
||||
return
|
||||
|
||||
case types.ProvingTaskFailed, types.ProvingTaskSkipped:
|
||||
// note: this is covered by UpdateSkippedBatches, but we keep it for completeness's sake
|
||||
|
||||
if err = r.db.UpdateRollupStatus(r.ctx, hash, types.RollupFinalizationSkipped); err != nil {
|
||||
log.Warn("UpdateRollupStatus failed", "hash", hash, "err", err)
|
||||
}
|
||||
|
||||
case types.ProvingTaskVerified:
|
||||
log.Info("Start to roll up zk proof", "hash", hash)
|
||||
success := false
|
||||
|
||||
previousBatch, err := r.db.GetLatestFinalizingOrFinalizedBatch()
|
||||
|
||||
// skip submitting proof
|
||||
if err == nil && uint64(batch.CreatedAt.Sub(*previousBatch.CreatedAt).Seconds()) < r.cfg.FinalizeBatchIntervalSec {
|
||||
log.Info(
|
||||
"Not enough time passed, skipping",
|
||||
"hash", hash,
|
||||
"createdAt", batch.CreatedAt,
|
||||
"lastFinalizingHash", previousBatch.Hash,
|
||||
"lastFinalizingStatus", previousBatch.RollupStatus,
|
||||
"lastFinalizingCreatedAt", previousBatch.CreatedAt,
|
||||
)
|
||||
|
||||
if err = r.db.UpdateRollupStatus(r.ctx, hash, types.RollupFinalizationSkipped); err != nil {
|
||||
log.Warn("UpdateRollupStatus failed", "hash", hash, "err", err)
|
||||
} else {
|
||||
success = true
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// handle unexpected db error
|
||||
if err != nil && err.Error() != "sql: no rows in result set" {
|
||||
log.Error("Failed to get latest finalized batch", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
defer func() {
|
||||
// TODO: need to revisit this and have a more fine-grained error handling
|
||||
if !success {
|
||||
log.Info("Failed to upload the proof, change rollup status to FinalizationSkipped", "hash", hash)
|
||||
if err = r.db.UpdateRollupStatus(r.ctx, hash, types.RollupFinalizationSkipped); err != nil {
|
||||
log.Warn("UpdateRollupStatus failed", "hash", hash, "err", err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
data, err := r.packFinalizeBatch(hash)
|
||||
if err != nil {
|
||||
log.Error("Pack finalizeBatchWithProof failed", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
txID := hash + "-finalize"
|
||||
// add suffix `-finalize` to avoid duplication with commit tx in unit tests
|
||||
txHash, err := r.rollupSender.SendTransaction(txID, &r.cfg.RollupContractAddress, big.NewInt(0), data, 0)
|
||||
finalizeTxHash := &txHash
|
||||
if err != nil {
|
||||
if !errors.Is(err, sender.ErrNoAvailableAccount) {
|
||||
log.Error("finalizeBatchWithProof in layer1 failed", "hash", hash, "err", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
bridgeL2BatchesFinalizedTotalCounter.Inc(1)
|
||||
log.Info("finalizeBatchWithProof in layer1", "batch_hash", hash, "tx_hash", hash)
|
||||
|
||||
// record and sync with db, @todo handle db error
|
||||
err = r.db.UpdateFinalizeTxHashAndRollupStatus(r.ctx, hash, finalizeTxHash.String(), types.RollupFinalizing)
|
||||
if err != nil {
|
||||
log.Warn("UpdateFinalizeTxHashAndRollupStatus failed", "batch_hash", hash, "err", err)
|
||||
}
|
||||
success = true
|
||||
r.processingFinalization.Store(txID, hash)
|
||||
|
||||
default:
|
||||
log.Error("encounter unreachable case in ProcessCommittedBatches",
|
||||
"block_status", status,
|
||||
)
|
||||
}
|
||||
}
|
||||
207
bridge/l2/relayer_message.go
Normal file
207
bridge/l2/relayer_message.go
Normal file
@@ -0,0 +1,207 @@
|
||||
package l2
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/big"
|
||||
"runtime"
|
||||
|
||||
"github.com/scroll-tech/go-ethereum/common"
|
||||
"github.com/scroll-tech/go-ethereum/log"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"modernc.org/mathutil"
|
||||
|
||||
"scroll-tech/common/utils"
|
||||
|
||||
"scroll-tech/common/types"
|
||||
|
||||
bridge_abi "scroll-tech/bridge/abi"
|
||||
"scroll-tech/bridge/sender"
|
||||
)
|
||||
|
||||
const processMsgLimit = 100
|
||||
|
||||
func (r *Layer2Relayer) checkSubmittedMessages() error {
|
||||
var nonce uint64
|
||||
for {
|
||||
// msgs are sorted by nonce in increasing order
|
||||
msgs, err := r.db.GetL2Messages(
|
||||
map[string]interface{}{"status": types.MsgSubmitted},
|
||||
fmt.Sprintf("AND nonce > %d", nonce),
|
||||
fmt.Sprintf("ORDER BY nonce ASC LIMIT %d", processMsgLimit),
|
||||
)
|
||||
if err != nil {
|
||||
log.Error("failed to get l2 submitted messages", "message nonce", nonce, "err", err)
|
||||
return err
|
||||
}
|
||||
if len(msgs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
var batch *types.BlockBatch
|
||||
nonce = msgs[len(msgs)-1].Nonce
|
||||
for _, msg := range msgs { //nolint:staticcheck
|
||||
// Wait until sender's pending is not full.
|
||||
utils.TryTimes(-1, func() bool {
|
||||
return !r.messageSender.IsFull()
|
||||
})
|
||||
|
||||
// Get batch by block number.
|
||||
if batch == nil || msg.Height < batch.StartBlockNumber || msg.Height > batch.EndBlockNumber {
|
||||
batches, err := r.db.GetBlockBatches(
|
||||
map[string]interface{}{},
|
||||
fmt.Sprintf("AND start_block_number <= %d AND end_block_number >= %d", msg.Height, msg.Height),
|
||||
)
|
||||
// If get batch failed, stop and return immediately.
|
||||
if err != nil || len(batches) == 0 {
|
||||
return err
|
||||
}
|
||||
batch = batches[0]
|
||||
}
|
||||
|
||||
data, err := r.packRelayMessage(msg, common.HexToHash(batch.Hash))
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
err = r.messageSender.LoadOrSendTx(
|
||||
common.HexToHash(msg.Layer1Hash),
|
||||
msg.MsgHash,
|
||||
&r.cfg.MessengerContractAddress,
|
||||
big.NewInt(0),
|
||||
data,
|
||||
r.minGasLimitForMessageRelay,
|
||||
)
|
||||
switch true {
|
||||
case err == nil:
|
||||
r.processingMessage.Store(msg.MsgHash, msg.MsgHash)
|
||||
case err.Error() == "execution reverted: Message expired":
|
||||
if err = r.db.UpdateLayer2Status(r.ctx, msg.MsgHash, types.MsgExpired); err != nil {
|
||||
return err
|
||||
}
|
||||
case err.Error() == "execution reverted: Message successfully executed":
|
||||
if err = r.db.UpdateLayer2Status(r.ctx, msg.MsgHash, types.MsgConfirmed); err != nil {
|
||||
return err
|
||||
}
|
||||
default:
|
||||
log.Error("failed to load or send l2 submitted tx", "batch hash", batch.Hash, "msg hash", msg.MsgHash, "err", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ProcessSavedEvents relays saved un-processed cross-domain transactions to desired blockchain
|
||||
func (r *Layer2Relayer) ProcessSavedEvents() {
|
||||
batch, err := r.db.GetLatestFinalizedBatch()
|
||||
if err != nil {
|
||||
log.Error("GetLatestFinalizedBatch failed", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
// msgs are sorted by nonce in increasing order
|
||||
msgs, err := r.db.GetL2Messages(
|
||||
map[string]interface{}{"status": types.MsgPending},
|
||||
fmt.Sprintf("AND height<=%d", batch.EndBlockNumber),
|
||||
fmt.Sprintf("ORDER BY nonce ASC LIMIT %d", processMsgLimit),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
log.Error("Failed to fetch unprocessed L2 messages", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
// process messages in batches
|
||||
batchSize := mathutil.Min((runtime.GOMAXPROCS(0)+1)/2, r.messageSender.NumberOfAccounts())
|
||||
for size := 0; len(msgs) > 0; msgs = msgs[size:] {
|
||||
if size = len(msgs); size > batchSize {
|
||||
size = batchSize
|
||||
}
|
||||
var g errgroup.Group
|
||||
for _, msg := range msgs[:size] {
|
||||
msg := msg
|
||||
g.Go(func() error {
|
||||
return r.processSavedEvent(msg)
|
||||
})
|
||||
}
|
||||
if err := g.Wait(); err != nil {
|
||||
if !errors.Is(err, sender.ErrNoAvailableAccount) {
|
||||
log.Error("failed to process l2 saved event", "err", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Layer2Relayer) packRelayMessage(msg *types.L2Message, batchHash common.Hash) ([]byte, error) {
|
||||
// TODO: rebuild the withdraw trie to generate the merkle proof
|
||||
proof := bridge_abi.IL1ScrollMessengerL2MessageProof{
|
||||
BatchHash: batchHash,
|
||||
MerkleProof: make([]byte, 0),
|
||||
}
|
||||
from := common.HexToAddress(msg.Sender)
|
||||
target := common.HexToAddress(msg.Target)
|
||||
value, ok := big.NewInt(0).SetString(msg.Value, 10)
|
||||
if !ok {
|
||||
// @todo maybe panic?
|
||||
log.Error("Failed to parse message value", "msg.nonce", msg.Nonce, "msg.height", msg.Height)
|
||||
// TODO: need to skip this message by changing its status to MsgError
|
||||
}
|
||||
msgNonce := big.NewInt(int64(msg.Nonce))
|
||||
calldata := common.Hex2Bytes(msg.Calldata)
|
||||
data, err := r.l1MessengerABI.Pack("relayMessageWithProof", from, target, value, msgNonce, calldata, proof)
|
||||
if err != nil {
|
||||
log.Error("Failed to pack relayMessageWithProof", "msg.nonce", msg.Nonce, "err", err)
|
||||
// TODO: need to skip this message by changing its status to MsgError
|
||||
return nil, err
|
||||
}
|
||||
return data, nil
|
||||
}
|
||||
|
||||
func (r *Layer2Relayer) processSavedEvent(msg *types.L2Message) error {
|
||||
// @todo fetch merkle proof from l2geth
|
||||
log.Info("Processing L2 Message", "msg.nonce", msg.Nonce, "msg.height", msg.Height)
|
||||
|
||||
// Get the block info that contains the message
|
||||
blockInfos, err := r.db.GetL2BlockInfos(map[string]interface{}{"number": msg.Height})
|
||||
if err != nil {
|
||||
log.Error("Failed to GetL2BlockInfos from DB", "number", msg.Height)
|
||||
}
|
||||
blockInfo := blockInfos[0]
|
||||
if !blockInfo.BatchHash.Valid {
|
||||
log.Error("Block has not been batched yet", "number", blockInfo.Number, "msg.nonce", msg.Nonce)
|
||||
return nil
|
||||
}
|
||||
|
||||
data, err := r.packRelayMessage(msg, common.HexToHash(blockInfo.BatchHash.String))
|
||||
if err != nil {
|
||||
log.Error("Failed to pack relayMessageWithProof", "msg.nonce", msg.Nonce, "err", err)
|
||||
// TODO: need to skip this message by changing its status to MsgError
|
||||
return err
|
||||
}
|
||||
|
||||
hash, err := r.messageSender.SendTransaction(msg.MsgHash, &r.cfg.MessengerContractAddress, big.NewInt(0), data, r.minGasLimitForMessageRelay)
|
||||
if err != nil && err.Error() == "execution reverted: Message expired" {
|
||||
return r.db.UpdateLayer2Status(r.ctx, msg.MsgHash, types.MsgExpired)
|
||||
}
|
||||
if err != nil && err.Error() == "execution reverted: Message successfully executed" {
|
||||
return r.db.UpdateLayer2Status(r.ctx, msg.MsgHash, types.MsgConfirmed)
|
||||
}
|
||||
if err != nil {
|
||||
if !errors.Is(err, sender.ErrNoAvailableAccount) {
|
||||
log.Error("Failed to send relayMessageWithProof tx to layer1 ", "msg.height", msg.Height, "msg.MsgHash", msg.MsgHash, "err", err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
bridgeL2MsgsRelayedTotalCounter.Inc(1)
|
||||
log.Info("relayMessageWithProof to layer1", "msgHash", msg.MsgHash, "txhash", hash.String())
|
||||
|
||||
// save status in db
|
||||
// @todo handle db error
|
||||
err = r.db.UpdateLayer2StatusAndLayer1Hash(r.ctx, msg.MsgHash, types.MsgSubmitted, hash.String())
|
||||
if err != nil {
|
||||
log.Error("UpdateLayer2StatusAndLayer1Hash failed", "msgHash", msg.MsgHash, "err", err)
|
||||
return err
|
||||
}
|
||||
r.processingMessage.Store(msg.MsgHash, msg.MsgHash)
|
||||
return nil
|
||||
}
|
||||
56
bridge/l2/relayer_priceoracle.go
Normal file
56
bridge/l2/relayer_priceoracle.go
Normal file
@@ -0,0 +1,56 @@
|
||||
package l2
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"math/big"
|
||||
|
||||
"github.com/scroll-tech/go-ethereum/log"
|
||||
|
||||
"scroll-tech/common/types"
|
||||
|
||||
"scroll-tech/bridge/sender"
|
||||
)
|
||||
|
||||
// ProcessGasPriceOracle imports gas price to layer1
|
||||
func (r *Layer2Relayer) ProcessGasPriceOracle() {
|
||||
batch, err := r.db.GetLatestBatch()
|
||||
if err != nil {
|
||||
log.Error("Failed to GetLatestBatch", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
if batch.OracleStatus == types.GasOraclePending {
|
||||
suggestGasPrice, err := r.l2Client.SuggestGasPrice(r.ctx)
|
||||
if err != nil {
|
||||
log.Error("Failed to fetch SuggestGasPrice from l2geth", "err", err)
|
||||
return
|
||||
}
|
||||
suggestGasPriceUint64 := uint64(suggestGasPrice.Int64())
|
||||
expectedDelta := r.lastGasPrice * r.gasPriceDiff / gasPriceDiffPrecision
|
||||
|
||||
// last is undefine or (suggestGasPriceUint64 >= minGasPrice && exceed diff)
|
||||
if r.lastGasPrice == 0 || (suggestGasPriceUint64 >= r.minGasPrice && (suggestGasPriceUint64 >= r.lastGasPrice+expectedDelta || suggestGasPriceUint64 <= r.lastGasPrice-expectedDelta)) {
|
||||
data, err := r.l2GasOracleABI.Pack("setL2BaseFee", suggestGasPrice)
|
||||
if err != nil {
|
||||
log.Error("Failed to pack setL2BaseFee", "batch.Hash", batch.Hash, "GasPrice", suggestGasPrice.Uint64(), "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
hash, err := r.gasOracleSender.SendTransaction(batch.Hash, &r.cfg.GasPriceOracleContractAddress, big.NewInt(0), data, 0)
|
||||
if err != nil {
|
||||
if !errors.Is(err, sender.ErrNoAvailableAccount) {
|
||||
log.Error("Failed to send setL2BaseFee tx to layer2 ", "batch.Hash", batch.Hash, "err", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
err = r.db.UpdateL2GasOracleStatusAndOracleTxHash(r.ctx, batch.Hash, types.GasOracleImporting, hash.String())
|
||||
if err != nil {
|
||||
log.Error("UpdateGasOracleStatusAndOracleTxHash failed", "batch.Hash", batch.Hash, "err", err)
|
||||
return
|
||||
}
|
||||
r.lastGasPrice = suggestGasPriceUint64
|
||||
log.Info("Update l2 gas price", "txHash", hash.String(), "GasPrice", suggestGasPrice)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -19,10 +19,11 @@ import (
|
||||
geth_metrics "github.com/scroll-tech/go-ethereum/metrics"
|
||||
"github.com/scroll-tech/go-ethereum/rpc"
|
||||
|
||||
"scroll-tech/database"
|
||||
|
||||
"scroll-tech/common/metrics"
|
||||
"scroll-tech/common/types"
|
||||
cutil "scroll-tech/common/utils"
|
||||
"scroll-tech/database"
|
||||
|
||||
bridge_abi "scroll-tech/bridge/abi"
|
||||
"scroll-tech/bridge/utils"
|
||||
|
||||
@@ -76,6 +76,7 @@ type Sender struct {
|
||||
|
||||
blockNumber uint64 // Current block number on chain.
|
||||
baseFeePerGas uint64 // Current base fee per gas on chain
|
||||
pendingNum int64 // current pending tx count.
|
||||
pendingTxs sync.Map // Mapping from nonce to pending transaction
|
||||
confirmCh chan *Confirmation
|
||||
|
||||
@@ -134,6 +135,16 @@ func NewSender(ctx context.Context, config *config.SenderConfig, privs []*ecdsa.
|
||||
return sender, nil
|
||||
}
|
||||
|
||||
// PendingCount return the current pending txs num.
|
||||
func (s *Sender) PendingCount() int64 {
|
||||
return atomic.LoadInt64(&s.pendingNum)
|
||||
}
|
||||
|
||||
// PendingLimit return the maximum pendingTxs can handle.
|
||||
func (s *Sender) PendingLimit() int64 {
|
||||
return s.config.PendingLimit
|
||||
}
|
||||
|
||||
// Stop stop the sender module.
|
||||
func (s *Sender) Stop() {
|
||||
close(s.stopCh)
|
||||
@@ -157,16 +168,27 @@ func (s *Sender) getFeeData(auth *bind.TransactOpts, target *common.Address, val
|
||||
return s.estimateLegacyGas(auth, target, value, data, minGasLimit)
|
||||
}
|
||||
|
||||
// IsFull If pendingTxs pool is full return true.
|
||||
func (s *Sender) IsFull() bool {
|
||||
return atomic.LoadInt64(&s.pendingNum) >= s.config.PendingLimit
|
||||
}
|
||||
|
||||
// SendTransaction send a signed L2tL1 transaction.
|
||||
func (s *Sender) SendTransaction(ID string, target *common.Address, value *big.Int, data []byte, minGasLimit uint64) (hash common.Hash, err error) {
|
||||
if s.IsFull() {
|
||||
return common.Hash{}, fmt.Errorf("pending txs is full, pending size: %d", s.config.PendingLimit)
|
||||
}
|
||||
// We occupy the ID, in case some other threads call with the same ID in the same time
|
||||
if _, loaded := s.pendingTxs.LoadOrStore(ID, nil); loaded {
|
||||
return common.Hash{}, fmt.Errorf("has the repeat tx ID, ID: %s", ID)
|
||||
}
|
||||
atomic.AddInt64(&s.pendingNum, 1)
|
||||
|
||||
// get
|
||||
auth := s.auths.getAccount()
|
||||
if auth == nil {
|
||||
s.pendingTxs.Delete(ID) // release the ID on failure
|
||||
atomic.AddInt64(&s.pendingNum, -1)
|
||||
return common.Hash{}, ErrNoAvailableAccount
|
||||
}
|
||||
|
||||
@@ -174,6 +196,7 @@ func (s *Sender) SendTransaction(ID string, target *common.Address, value *big.I
|
||||
defer func() {
|
||||
if err != nil {
|
||||
s.pendingTxs.Delete(ID) // release the ID on failure
|
||||
atomic.AddInt64(&s.pendingNum, -1)
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -201,6 +224,61 @@ func (s *Sender) SendTransaction(ID string, target *common.Address, value *big.I
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Sender) getTxAndAddr(txHash common.Hash) (*types.Transaction, uint64, common.Address, error) {
|
||||
tx, isPending, err := s.client.TransactionByHash(s.ctx, txHash)
|
||||
if err != nil {
|
||||
return nil, 0, common.Address{}, err
|
||||
}
|
||||
|
||||
sender, err := types.Sender(types.LatestSignerForChainID(s.chainID), tx)
|
||||
if err != nil {
|
||||
return nil, 0, common.Address{}, err
|
||||
}
|
||||
|
||||
if isPending {
|
||||
return tx, s.blockNumber, sender, nil
|
||||
}
|
||||
|
||||
receipt, err := s.client.TransactionReceipt(s.ctx, txHash)
|
||||
if err != nil {
|
||||
return nil, 0, common.Address{}, err
|
||||
}
|
||||
return tx, receipt.BlockNumber.Uint64(), sender, nil
|
||||
}
|
||||
|
||||
// LoadOrSendTx loads a transaction if it already exists on chain and resends it otherwise.
|
||||
func (s *Sender) LoadOrSendTx(destTxHash common.Hash, ID string, target *common.Address, value *big.Int, data []byte, minGasLimit uint64) error {
|
||||
tx, blockNumber, from, err := s.getTxAndAddr(destTxHash)
|
||||
// If this tx already exist load it to the pending.
|
||||
if err == nil && tx != nil {
|
||||
auth := s.auths.accounts[from]
|
||||
var feeData *FeeData
|
||||
feeData, err = s.getFeeData(auth, target, value, data, minGasLimit)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// We occupy the ID, in case some other threads call with the same ID in the same time
|
||||
if _, loaded := s.pendingTxs.LoadOrStore(ID, nil); loaded {
|
||||
return fmt.Errorf("has the repeat tx ID, ID: %s", ID)
|
||||
}
|
||||
atomic.AddInt64(&s.pendingNum, 1)
|
||||
s.pendingTxs.Store(ID, &PendingTransaction{
|
||||
tx: tx,
|
||||
id: ID,
|
||||
signer: auth,
|
||||
// Record the transaction's block blockNumber.
|
||||
submitAt: blockNumber,
|
||||
feeData: feeData,
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
// Tx is dropped from chain node, resend it.
|
||||
_, err = s.SendTransaction(ID, target, value, data, minGasLimit)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Sender) createAndSendTx(auth *bind.TransactOpts, feeData *FeeData, target *common.Address, value *big.Int, data []byte, overrideNonce *uint64) (tx *types.Transaction, err error) {
|
||||
var (
|
||||
nonce = auth.Nonce.Uint64()
|
||||
@@ -346,6 +424,7 @@ func (s *Sender) checkPendingTransaction(header *types.Header, confirmed uint64)
|
||||
if (err == nil) && (receipt != nil) {
|
||||
if receipt.BlockNumber.Uint64() <= confirmed {
|
||||
s.pendingTxs.Delete(key)
|
||||
atomic.AddInt64(&s.pendingNum, -1)
|
||||
// send confirm message
|
||||
s.confirmCh <- &Confirmation{
|
||||
ID: pending.id,
|
||||
@@ -377,6 +456,7 @@ func (s *Sender) checkPendingTransaction(header *types.Header, confirmed uint64)
|
||||
if strings.Contains(err.Error(), "nonce") {
|
||||
// This key can be deleted
|
||||
s.pendingTxs.Delete(key)
|
||||
atomic.AddInt64(&s.pendingNum, -1)
|
||||
// Try get receipt by the latest replaced tx hash
|
||||
receipt, err := s.client.TransactionReceipt(s.ctx, pending.tx.Hash())
|
||||
if (err == nil) && (receipt != nil) {
|
||||
|
||||
@@ -82,7 +82,7 @@ func (i *ImgDB) Endpoint() string {
|
||||
}
|
||||
|
||||
func (i *ImgDB) prepare() []string {
|
||||
cmd := []string{"docker", "run", "--name", i.name, "-p", fmt.Sprintf("%d:5432", i.port)}
|
||||
cmd := []string{"docker", "container", "run", "--name", i.name, "-p", fmt.Sprintf("%d:5432", i.port)}
|
||||
envs := []string{
|
||||
"-e", "POSTGRES_PASSWORD=" + i.password,
|
||||
"-e", fmt.Sprintf("POSTGRES_DB=%s", i.dbName),
|
||||
|
||||
@@ -125,7 +125,7 @@ func (i *ImgGeth) Stop() error {
|
||||
}
|
||||
|
||||
func (i *ImgGeth) prepare() []string {
|
||||
cmds := []string{"docker", "run", "--name", i.name}
|
||||
cmds := []string{"docker", "container", "run", "--name", i.name}
|
||||
var ports []string
|
||||
if i.httpPort != 0 {
|
||||
ports = append(ports, []string{"-p", strconv.Itoa(i.httpPort) + ":8545"}...)
|
||||
|
||||
@@ -98,6 +98,7 @@ type L1Message struct {
|
||||
Calldata string `json:"calldata" db:"calldata"`
|
||||
GasLimit uint64 `json:"gas_limit" db:"gas_limit"`
|
||||
Layer1Hash string `json:"layer1_hash" db:"layer1_hash"`
|
||||
Layer2Hash string `json:"layer2_hash" db:"layer2_hash"`
|
||||
Status MsgStatus `json:"status" db:"status"`
|
||||
}
|
||||
|
||||
@@ -110,6 +111,7 @@ type L2Message struct {
|
||||
Value string `json:"value" db:"value"`
|
||||
Target string `json:"target" db:"target"`
|
||||
Calldata string `json:"calldata" db:"calldata"`
|
||||
Layer1Hash string `json:"layer1_hash" db:"layer1_hash"`
|
||||
Layer2Hash string `json:"layer2_hash" db:"layer2_hash"`
|
||||
Status MsgStatus `json:"status" db:"status"`
|
||||
}
|
||||
|
||||
@@ -7,7 +7,7 @@ import (
|
||||
|
||||
// TryTimes try run several times until the function return true.
|
||||
func TryTimes(times int, run func() bool) {
|
||||
for i := 0; i < times; i++ {
|
||||
for i := 0; times == -1 || i < times; i++ {
|
||||
if run() {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -18,10 +18,11 @@ import (
|
||||
geth_metrics "github.com/scroll-tech/go-ethereum/metrics"
|
||||
"github.com/scroll-tech/go-ethereum/rpc"
|
||||
|
||||
"scroll-tech/database"
|
||||
|
||||
"scroll-tech/common/message"
|
||||
"scroll-tech/common/metrics"
|
||||
"scroll-tech/common/types"
|
||||
"scroll-tech/database"
|
||||
|
||||
"scroll-tech/common/utils/workerpool"
|
||||
|
||||
|
||||
@@ -80,6 +80,7 @@ type BlockBatchOrm interface {
|
||||
|
||||
// L1MessageOrm is layer1 message db interface
|
||||
type L1MessageOrm interface {
|
||||
GetL1Messages(fields map[string]interface{}, args ...string) ([]*types.L1Message, error)
|
||||
GetL1MessageByQueueIndex(queueIndex uint64) (*types.L1Message, error)
|
||||
GetL1MessageByMsgHash(msgHash string) (*types.L1Message, error)
|
||||
GetL1MessagesByStatus(status types.MsgStatus, limit uint64) ([]*types.L1Message, error)
|
||||
|
||||
@@ -4,6 +4,8 @@ import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/scroll-tech/go-ethereum/log"
|
||||
@@ -22,6 +24,37 @@ func NewL1MessageOrm(db *sqlx.DB) L1MessageOrm {
|
||||
return &l1MessageOrm{db: db}
|
||||
}
|
||||
|
||||
// GetL1Messages get l1 messages by k-v map and args.
|
||||
func (m *l1MessageOrm) GetL1Messages(fields map[string]interface{}, args ...string) ([]*types.L1Message, error) {
|
||||
query := "SELECT queue_index, msg_hash, height, sender, target, value, calldata, layer1_hash, layer2_hash, status FROM l1_message WHERE 1 = 1 "
|
||||
for key := range fields {
|
||||
query += fmt.Sprintf(" AND %s=:%s ", key, key)
|
||||
}
|
||||
query = strings.Join(append([]string{query}, args...), " ")
|
||||
|
||||
db := m.db
|
||||
rows, err := db.NamedQuery(db.Rebind(query), fields)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var msgs []*types.L1Message
|
||||
for rows.Next() {
|
||||
msg := &types.L1Message{}
|
||||
if err = rows.StructScan(&msg); err != nil {
|
||||
break
|
||||
}
|
||||
msgs = append(msgs, msg)
|
||||
}
|
||||
if len(msgs) == 0 || errors.Is(err, sql.ErrNoRows) {
|
||||
// log.Warn("no unprocessed layer2 messages in db", "err", err)
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return msgs, rows.Close()
|
||||
}
|
||||
|
||||
// GetL1MessageByMsgHash fetch message by queue_index
|
||||
func (m *l1MessageOrm) GetL1MessageByMsgHash(msgHash string) (*types.L1Message, error) {
|
||||
msg := types.L1Message{}
|
||||
@@ -48,7 +81,7 @@ func (m *l1MessageOrm) GetL1MessageByQueueIndex(queueIndex uint64) (*types.L1Mes
|
||||
|
||||
// GetL1MessagesByStatus fetch list of unprocessed messages given msg status
|
||||
func (m *l1MessageOrm) GetL1MessagesByStatus(status types.MsgStatus, limit uint64) ([]*types.L1Message, error) {
|
||||
rows, err := m.db.Queryx(`SELECT queue_index, msg_hash, height, sender, target, value, calldata, layer1_hash, status FROM l1_message WHERE status = $1 ORDER BY queue_index ASC LIMIT $2;`, status, limit)
|
||||
rows, err := m.db.Queryx(`SELECT queue_index, msg_hash, height, sender, target, value, calldata, layer1_hash, layer2_hash, status FROM l1_message WHERE status = $1 ORDER BY queue_index ASC LIMIT $2;`, status, limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -108,9 +141,10 @@ func (m *l1MessageOrm) SaveL1Messages(ctx context.Context, messages []*types.L1M
|
||||
"gas_limit": msg.GasLimit,
|
||||
"calldata": msg.Calldata,
|
||||
"layer1_hash": msg.Layer1Hash,
|
||||
"layer2_hash": msg.Layer2Hash,
|
||||
}
|
||||
}
|
||||
_, err := m.db.NamedExec(`INSERT INTO public.l1_message (queue_index, msg_hash, height, sender, target, value, gas_limit, calldata, layer1_hash) VALUES (:queue_index, :msg_hash, :height, :sender, :target, :value, :gas_limit, :calldata, :layer1_hash);`, messageMaps)
|
||||
_, err := m.db.NamedExec(`INSERT INTO public.l1_message (queue_index, msg_hash, height, sender, target, value, gas_limit, calldata, layer1_hash, layer2_hash) VALUES (:queue_index, :msg_hash, :height, :sender, :target, :value, :gas_limit, :calldata, :layer1_hash, :layer2_hash);`, messageMaps)
|
||||
if err != nil {
|
||||
queueIndices := make([]uint64, 0, len(messages))
|
||||
heights := make([]uint64, 0, len(messages))
|
||||
|
||||
@@ -92,7 +92,7 @@ func (m *layer2MessageOrm) GetL2ProcessedNonce() (int64, error) {
|
||||
|
||||
// GetL2MessagesByStatus fetch list of messages given msg status
|
||||
func (m *layer2MessageOrm) GetL2Messages(fields map[string]interface{}, args ...string) ([]*types.L2Message, error) {
|
||||
query := "SELECT nonce, msg_hash, height, sender, target, value, calldata, layer2_hash FROM l2_message WHERE 1 = 1 "
|
||||
query := "SELECT nonce, msg_hash, height, sender, target, value, calldata, layer1_hash, layer2_hash FROM l2_message WHERE 1 = 1 "
|
||||
for key := range fields {
|
||||
query += fmt.Sprintf("AND %s=:%s ", key, key)
|
||||
}
|
||||
@@ -137,11 +137,12 @@ func (m *layer2MessageOrm) SaveL2Messages(ctx context.Context, messages []*types
|
||||
"target": msg.Target,
|
||||
"value": msg.Value,
|
||||
"calldata": msg.Calldata,
|
||||
"layer1_hash": msg.Layer1Hash,
|
||||
"layer2_hash": msg.Layer2Hash,
|
||||
}
|
||||
}
|
||||
|
||||
_, err := m.db.NamedExec(`INSERT INTO public.l2_message (nonce, msg_hash, height, sender, target, value, calldata, layer2_hash) VALUES (:nonce, :msg_hash, :height, :sender, :target, :value, :calldata, :layer2_hash);`, messageMaps)
|
||||
_, err := m.db.NamedExec(`INSERT INTO public.l2_message (nonce, msg_hash, height, sender, target, value, calldata, layer1_hash, layer2_hash) VALUES (:nonce, :msg_hash, :height, :sender, :target, :value, :calldata, :layer1_hash, :layer2_hash);`, messageMaps)
|
||||
if err != nil {
|
||||
nonces := make([]uint64, 0, len(messages))
|
||||
heights := make([]uint64, 0, len(messages))
|
||||
|
||||
@@ -603,6 +603,4 @@ gopkg.in/yaml.v3 v3.0.0-20191120175047-4206685974f2/go.mod h1:K4uyk7z7BCEPqu6E+C
|
||||
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
honnef.co/go/tools v0.1.3 h1:qTakTkI6ni6LFD5sBwwsdSO+AQqbSIxOauHTTQKZ/7o=
|
||||
honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las=
|
||||
rsc.io/binaryregexp v0.2.0 h1:HfqmD5MEmC0zvwBuF187nq9mdnXjXsSivRiXN7SmRkE=
|
||||
rsc.io/pdf v0.1.1 h1:k1MczvYDUvJBe93bYd7wrZLLUEcLZAuF824/I4e5Xr4=
|
||||
rsc.io/tmplfunc v0.0.3/go.mod h1:AG3sTPzElb1Io3Yg4voV9AGZJuleGAwaVRxL9M49PhA=
|
||||
|
||||
@@ -3,7 +3,7 @@ module scroll-tech/integration-test
|
||||
go 1.18
|
||||
|
||||
require (
|
||||
github.com/scroll-tech/go-ethereum v1.10.14-0.20230306131930-03b4de32b78b
|
||||
github.com/scroll-tech/go-ethereum v1.10.14-0.20230321020420-127af384ed04
|
||||
github.com/stretchr/testify v1.8.2
|
||||
)
|
||||
|
||||
|
||||
@@ -34,7 +34,7 @@ github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1Cpa
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
|
||||
github.com/scroll-tech/go-ethereum v1.10.14-0.20230306131930-03b4de32b78b h1:shNTzAnD2oDcDCrM4aaVCTzQNVfYxF1An08R2H2DLAg=
|
||||
github.com/scroll-tech/go-ethereum v1.10.14-0.20230321020420-127af384ed04 h1:PpI31kaBVm6+7sZtyK03Ex0QIg3P821Ktae0FHFh7IM=
|
||||
github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
|
||||
|
||||
Reference in New Issue
Block a user