fix(bridge-history): incorrect status update and duplicated failed txs (#1088)

Co-authored-by: zzq0826 <770166635@qq.com>
This commit is contained in:
colin
2024-01-23 17:50:12 +08:00
committed by GitHub
parent c68f4283b1
commit c6e8fcb2d3
14 changed files with 288 additions and 198 deletions

View File

@@ -3,8 +3,8 @@
"confirmation": 0,
"endpoint": "https://rpc.ankr.com/eth",
"startHeight": 18306000,
"blockTime": 10,
"fetchLimit": 30,
"blockTime": 12,
"fetchLimit": 16,
"MessengerAddr": "0x6774Bcbd5ceCeF1336b5300fb5186a12DDD8b367",
"ETHGatewayAddr": "0x7F2b8C31F88B6006c382775eea88297Ec1e3E905",
"WETHGatewayAddr": "0x7AC440cAe8EB6328de4fA621163a792c1EA9D4fE",
@@ -23,7 +23,7 @@
"confirmation": 0,
"endpoint": "https://rpc.scroll.io",
"blockTime": 3,
"fetchLimit": 100,
"fetchLimit": 64,
"MessengerAddr": "0x781e90f1c8Fc4611c9b7497C3B47F99Ef6969CbC",
"ETHGatewayAddr": "0x6EA73e05AdC79974B931123675ea8F78FfdacDF0",
"WETHGatewayAddr": "0x7003E7B7186f0E6601203b99F7B8DECBfA391cf9",

View File

@@ -6,7 +6,6 @@ require (
github.com/gin-contrib/cors v1.5.0
github.com/gin-gonic/gin v1.9.1
github.com/go-redis/redis/v8 v8.11.5
github.com/google/uuid v1.4.0
github.com/pressly/goose/v3 v3.16.0
github.com/prometheus/client_golang v1.14.0
github.com/scroll-tech/go-ethereum v1.10.14-0.20231130005111-38a3a9c9198c
@@ -40,6 +39,7 @@ require (
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/uuid v1.4.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/holiman/uint256 v1.2.4 // indirect
github.com/huin/goupnp v1.3.0 // indirect

View File

@@ -8,11 +8,11 @@ import (
"scroll-tech/common/database"
)
// LayerConfig is the configuration of Layer1/Layer2
type LayerConfig struct {
// FetcherConfig is the configuration of Layer1 or Layer2 fetcher.
type FetcherConfig struct {
Confirmation uint64 `json:"confirmation"`
Endpoint string `json:"endpoint"`
StartHeight uint64 `json:"startHeight"` // Can only be configured to contract deployment height, otherwise in the current implementation, the message proof could not be successfully updated.
StartHeight uint64 `json:"startHeight"` // Can only be configured to contract deployment height, message proof should be updated from the very beginning.
BlockTime int64 `json:"blockTime"`
FetchLimit uint64 `json:"fetchLimit"`
MessengerAddr string `json:"MessengerAddr"`
@@ -43,8 +43,8 @@ type RedisConfig struct {
// Config is the configuration of the bridge history backend
type Config struct {
L1 *LayerConfig `json:"L1"`
L2 *LayerConfig `json:"L2"`
L1 *FetcherConfig `json:"L1"`
L2 *FetcherConfig `json:"L2"`
DB *database.Config `json:"db"`
Redis *RedisConfig `json:"redis"`
}

View File

@@ -20,7 +20,7 @@ import (
// L1MessageFetcher fetches cross message events from L1 and saves them to database.
type L1MessageFetcher struct {
ctx context.Context
cfg *config.LayerConfig
cfg *config.FetcherConfig
client *ethclient.Client
l1SyncHeight uint64
@@ -35,7 +35,7 @@ type L1MessageFetcher struct {
}
// NewL1MessageFetcher creates a new L1MessageFetcher instance.
func NewL1MessageFetcher(ctx context.Context, cfg *config.LayerConfig, db *gorm.DB, client *ethclient.Client) *L1MessageFetcher {
func NewL1MessageFetcher(ctx context.Context, cfg *config.FetcherConfig, db *gorm.DB, client *ethclient.Client) *L1MessageFetcher {
c := &L1MessageFetcher{
ctx: ctx,
cfg: cfg,

View File

@@ -20,7 +20,7 @@ import (
// L2MessageFetcher fetches cross message events from L2 and saves them to database.
type L2MessageFetcher struct {
ctx context.Context
cfg *config.LayerConfig
cfg *config.FetcherConfig
db *gorm.DB
client *ethclient.Client
l2SyncHeight uint64
@@ -35,7 +35,7 @@ type L2MessageFetcher struct {
}
// NewL2MessageFetcher creates a new L2MessageFetcher instance.
func NewL2MessageFetcher(ctx context.Context, cfg *config.LayerConfig, db *gorm.DB, client *ethclient.Client) *L2MessageFetcher {
func NewL2MessageFetcher(ctx context.Context, cfg *config.FetcherConfig, db *gorm.DB, client *ethclient.Client) *L2MessageFetcher {
c := &L2MessageFetcher{
ctx: ctx,
cfg: cfg,

View File

@@ -76,39 +76,30 @@ func (b *EventUpdateLogic) GetL2MessageSyncedHeightInDB(ctx context.Context) (ui
// L1InsertOrUpdate inserts or updates l1 messages
func (b *EventUpdateLogic) L1InsertOrUpdate(ctx context.Context, l1FetcherResult *L1FilterResult) error {
err := b.db.Transaction(func(tx *gorm.DB) error {
if txErr := b.crossMessageOrm.InsertOrUpdateL1Messages(ctx, l1FetcherResult.DepositMessages, tx); txErr != nil {
log.Error("failed to insert L1 deposit messages", "err", txErr)
return txErr
}
if txErr := b.crossMessageOrm.InsertOrUpdateL1RelayedMessagesOfL2Withdrawals(ctx, l1FetcherResult.RelayedMessages, tx); txErr != nil {
log.Error("failed to update L1 relayed messages of L2 withdrawals", "err", txErr)
return txErr
}
if txErr := b.batchEventOrm.InsertOrUpdateBatchEvents(ctx, l1FetcherResult.BatchEvents, tx); txErr != nil {
log.Error("failed to insert or update batch events", "err", txErr)
return txErr
}
if txErr := b.crossMessageOrm.UpdateL1MessageQueueEventsInfo(ctx, l1FetcherResult.MessageQueueEvents, tx); txErr != nil {
log.Error("failed to insert L1 message queue events", "err", txErr)
return txErr
}
if txErr := b.crossMessageOrm.InsertFailedGatewayRouterTxs(ctx, l1FetcherResult.RevertedTxs, tx); txErr != nil {
log.Error("failed to insert L1 failed gateway router transactions", "err", txErr)
return txErr
}
return nil
})
if err != nil {
log.Error("failed to update db of L1 events", "err", err)
if err := b.crossMessageOrm.InsertOrUpdateL1Messages(ctx, l1FetcherResult.DepositMessages); err != nil {
log.Error("failed to insert L1 deposit messages", "err", err)
return err
}
if err := b.crossMessageOrm.InsertOrUpdateL1RelayedMessagesOfL2Withdrawals(ctx, l1FetcherResult.RelayedMessages); err != nil {
log.Error("failed to update L1 relayed messages of L2 withdrawals", "err", err)
return err
}
if err := b.batchEventOrm.InsertOrUpdateBatchEvents(ctx, l1FetcherResult.BatchEvents); err != nil {
log.Error("failed to insert or update batch events", "err", err)
return err
}
if err := b.crossMessageOrm.UpdateL1MessageQueueEventsInfo(ctx, l1FetcherResult.MessageQueueEvents); err != nil {
log.Error("failed to insert L1 message queue events", "err", err)
return err
}
if err := b.crossMessageOrm.InsertFailedL1GatewayTxs(ctx, l1FetcherResult.RevertedTxs); err != nil {
log.Error("failed to insert failed L1 gateway transactions", "err", err)
return err
}
return nil
}
@@ -186,24 +177,18 @@ func (b *EventUpdateLogic) UpdateL1BatchIndexAndStatus(ctx context.Context, heig
// L2InsertOrUpdate inserts or updates L2 messages
func (b *EventUpdateLogic) L2InsertOrUpdate(ctx context.Context, l2FetcherResult *L2FilterResult) error {
err := b.db.Transaction(func(tx *gorm.DB) error {
if txErr := b.crossMessageOrm.InsertOrUpdateL2Messages(ctx, l2FetcherResult.WithdrawMessages, tx); txErr != nil {
log.Error("failed to insert L2 withdrawal messages", "err", txErr)
return txErr
}
if txErr := b.crossMessageOrm.InsertOrUpdateL2RelayedMessagesOfL1Deposits(ctx, l2FetcherResult.RelayedMessages, tx); txErr != nil {
log.Error("failed to update L2 relayed messages of L1 deposits", "err", txErr)
return txErr
}
if txErr := b.crossMessageOrm.InsertFailedGatewayRouterTxs(ctx, l2FetcherResult.OtherRevertedTxs, tx); txErr != nil {
log.Error("failed to insert L2 failed gateway router transactions", "err", txErr)
return txErr
}
return nil
})
if err := b.crossMessageOrm.InsertOrUpdateL2Messages(ctx, l2FetcherResult.WithdrawMessages); err != nil {
log.Error("failed to insert L2 withdrawal messages", "err", err)
return err
}
if err != nil {
log.Error("failed to update db of L2 events", "err", err)
if err := b.crossMessageOrm.InsertOrUpdateL2RelayedMessagesOfL1Deposits(ctx, l2FetcherResult.RelayedMessages); err != nil {
log.Error("failed to update L2 relayed messages of L1 deposits", "err", err)
return err
}
if err := b.crossMessageOrm.InsertFailedL2GatewayTxs(ctx, l2FetcherResult.OtherRevertedTxs); err != nil {
log.Error("failed to insert failed L2 gateway transactions", "err", err)
return err
}
return nil

View File

@@ -2,6 +2,7 @@ package logic
import (
"context"
"math/big"
"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/core/types"
@@ -10,21 +11,27 @@ import (
"github.com/scroll-tech/go-ethereum/log"
backendabi "scroll-tech/bridge-history-api/abi"
"scroll-tech/bridge-history-api/internal/config"
"scroll-tech/bridge-history-api/internal/orm"
"scroll-tech/bridge-history-api/internal/utils"
)
// L1EventParser the l1 event parser
type L1EventParser struct {
cfg *config.FetcherConfig
client *ethclient.Client
}
// NewL1EventParser creates l1 event parser
func NewL1EventParser() *L1EventParser {
return &L1EventParser{}
func NewL1EventParser(cfg *config.FetcherConfig, client *ethclient.Client) *L1EventParser {
return &L1EventParser{
cfg: cfg,
client: client,
}
}
// ParseL1CrossChainEventLogs parses L1 watched cross chain events.
func (e *L1EventParser) ParseL1CrossChainEventLogs(logs []types.Log, blockTimestampsMap map[uint64]uint64) ([]*orm.CrossMessage, []*orm.CrossMessage, error) {
func (e *L1EventParser) ParseL1CrossChainEventLogs(ctx context.Context, logs []types.Log, blockTimestampsMap map[uint64]uint64) ([]*orm.CrossMessage, []*orm.CrossMessage, error) {
var l1DepositMessages []*orm.CrossMessage
var l1RelayedMessages []*orm.CrossMessage
for _, vlog := range logs {
@@ -32,7 +39,7 @@ func (e *L1EventParser) ParseL1CrossChainEventLogs(logs []types.Log, blockTimest
case backendabi.L1DepositETHSig:
event := backendabi.ETHMessageEvent{}
if err := utils.UnpackLog(backendabi.IL1ETHGatewayABI, &event, "DepositETH", vlog); err != nil {
log.Warn("Failed to unpack DepositETH event", "err", err)
log.Error("Failed to unpack DepositETH event", "err", err)
return nil, nil, err
}
lastMessage := l1DepositMessages[len(l1DepositMessages)-1]
@@ -44,7 +51,7 @@ func (e *L1EventParser) ParseL1CrossChainEventLogs(logs []types.Log, blockTimest
event := backendabi.ERC20MessageEvent{}
err := utils.UnpackLog(backendabi.IL1ERC20GatewayABI, &event, "DepositERC20", vlog)
if err != nil {
log.Warn("Failed to unpack DepositERC20 event", "err", err)
log.Error("Failed to unpack DepositERC20 event", "err", err)
return nil, nil, err
}
lastMessage := l1DepositMessages[len(l1DepositMessages)-1]
@@ -57,7 +64,7 @@ func (e *L1EventParser) ParseL1CrossChainEventLogs(logs []types.Log, blockTimest
case backendabi.L1DepositERC721Sig:
event := backendabi.ERC721MessageEvent{}
if err := utils.UnpackLog(backendabi.IL1ERC721GatewayABI, &event, "DepositERC721", vlog); err != nil {
log.Warn("Failed to unpack DepositERC721 event", "err", err)
log.Error("Failed to unpack DepositERC721 event", "err", err)
return nil, nil, err
}
lastMessage := l1DepositMessages[len(l1DepositMessages)-1]
@@ -70,7 +77,7 @@ func (e *L1EventParser) ParseL1CrossChainEventLogs(logs []types.Log, blockTimest
case backendabi.L1BatchDepositERC721Sig:
event := backendabi.BatchERC721MessageEvent{}
if err := utils.UnpackLog(backendabi.IL1ERC721GatewayABI, &event, "BatchDepositERC721", vlog); err != nil {
log.Warn("Failed to unpack BatchDepositERC721 event", "err", err)
log.Error("Failed to unpack BatchDepositERC721 event", "err", err)
return nil, nil, err
}
lastMessage := l1DepositMessages[len(l1DepositMessages)-1]
@@ -83,7 +90,7 @@ func (e *L1EventParser) ParseL1CrossChainEventLogs(logs []types.Log, blockTimest
case backendabi.L1DepositERC1155Sig:
event := backendabi.ERC1155MessageEvent{}
if err := utils.UnpackLog(backendabi.IL1ERC1155GatewayABI, &event, "DepositERC1155", vlog); err != nil {
log.Warn("Failed to unpack DepositERC1155 event", "err", err)
log.Error("Failed to unpack DepositERC1155 event", "err", err)
return nil, nil, err
}
lastMessage := l1DepositMessages[len(l1DepositMessages)-1]
@@ -97,7 +104,7 @@ func (e *L1EventParser) ParseL1CrossChainEventLogs(logs []types.Log, blockTimest
case backendabi.L1BatchDepositERC1155Sig:
event := backendabi.BatchERC1155MessageEvent{}
if err := utils.UnpackLog(backendabi.IL1ERC1155GatewayABI, &event, "BatchDepositERC1155", vlog); err != nil {
log.Warn("Failed to unpack BatchDepositERC1155 event", "err", err)
log.Error("Failed to unpack BatchDepositERC1155 event", "err", err)
return nil, nil, err
}
lastMessage := l1DepositMessages[len(l1DepositMessages)-1]
@@ -111,12 +118,17 @@ func (e *L1EventParser) ParseL1CrossChainEventLogs(logs []types.Log, blockTimest
case backendabi.L1SentMessageEventSig:
event := backendabi.L1SentMessageEvent{}
if err := utils.UnpackLog(backendabi.IL1ScrollMessengerABI, &event, "SentMessage", vlog); err != nil {
log.Warn("Failed to unpack SentMessage event", "err", err)
log.Error("Failed to unpack SentMessage event", "err", err)
return nil, nil, err
}
from, err := getRealFromAddress(ctx, event.Sender, e.client, vlog.TxHash, e.cfg.GatewayRouterAddr)
if err != nil {
log.Error("Failed to get real 'from' address", "err", err)
return nil, nil, err
}
l1DepositMessages = append(l1DepositMessages, &orm.CrossMessage{
L1BlockNumber: vlog.BlockNumber,
Sender: event.Sender.String(),
Sender: from,
Receiver: event.Target.String(),
TokenType: int(orm.TokenTypeETH),
L1TxHash: vlog.TxHash.String(),
@@ -130,7 +142,7 @@ func (e *L1EventParser) ParseL1CrossChainEventLogs(logs []types.Log, blockTimest
case backendabi.L1RelayedMessageEventSig:
event := backendabi.L1RelayedMessageEvent{}
if err := utils.UnpackLog(backendabi.IL1ScrollMessengerABI, &event, "RelayedMessage", vlog); err != nil {
log.Warn("Failed to unpack RelayedMessage event", "err", err)
log.Error("Failed to unpack RelayedMessage event", "err", err)
return nil, nil, err
}
l1RelayedMessages = append(l1RelayedMessages, &orm.CrossMessage{
@@ -143,7 +155,7 @@ func (e *L1EventParser) ParseL1CrossChainEventLogs(logs []types.Log, blockTimest
case backendabi.L1FailedRelayedMessageEventSig:
event := backendabi.L1FailedRelayedMessageEvent{}
if err := utils.UnpackLog(backendabi.IL1ScrollMessengerABI, &event, "FailedRelayedMessage", vlog); err != nil {
log.Warn("Failed to unpack FailedRelayedMessage event", "err", err)
log.Error("Failed to unpack FailedRelayedMessage event", "err", err)
return nil, nil, err
}
l1RelayedMessages = append(l1RelayedMessages, &orm.CrossMessage{
@@ -166,17 +178,17 @@ func (e *L1EventParser) ParseL1BatchEventLogs(ctx context.Context, logs []types.
case backendabi.L1CommitBatchEventSig:
event := backendabi.L1CommitBatchEvent{}
if err := utils.UnpackLog(backendabi.IScrollChainABI, &event, "CommitBatch", vlog); err != nil {
log.Warn("Failed to unpack CommitBatch event", "err", err)
log.Error("Failed to unpack CommitBatch event", "err", err)
return nil, err
}
commitTx, isPending, err := client.TransactionByHash(ctx, vlog.TxHash)
if err != nil || isPending {
log.Warn("Failed to get commit Batch tx receipt or the tx is still pending", "err", err)
log.Error("Failed to get commit batch tx or the tx is still pending", "err", err, "isPending", isPending)
return nil, err
}
startBlock, endBlock, err := utils.GetBatchRangeFromCalldata(commitTx.Data())
if err != nil {
log.Warn("Failed to get batch range from calldata", "hash", commitTx.Hash().String(), "height", vlog.BlockNumber)
log.Error("Failed to get batch range from calldata", "hash", commitTx.Hash().String(), "height", vlog.BlockNumber)
return nil, err
}
l1BatchEvents = append(l1BatchEvents, &orm.BatchEvent{
@@ -190,7 +202,7 @@ func (e *L1EventParser) ParseL1BatchEventLogs(ctx context.Context, logs []types.
case backendabi.L1RevertBatchEventSig:
event := backendabi.L1RevertBatchEvent{}
if err := utils.UnpackLog(backendabi.IScrollChainABI, &event, "RevertBatch", vlog); err != nil {
log.Warn("Failed to unpack RevertBatch event", "err", err)
log.Error("Failed to unpack RevertBatch event", "err", err)
return nil, err
}
l1BatchEvents = append(l1BatchEvents, &orm.BatchEvent{
@@ -202,7 +214,7 @@ func (e *L1EventParser) ParseL1BatchEventLogs(ctx context.Context, logs []types.
case backendabi.L1FinalizeBatchEventSig:
event := backendabi.L1FinalizeBatchEvent{}
if err := utils.UnpackLog(backendabi.IScrollChainABI, &event, "FinalizeBatch", vlog); err != nil {
log.Warn("Failed to unpack FinalizeBatch event", "err", err)
log.Error("Failed to unpack FinalizeBatch event", "err", err)
return nil, err
}
l1BatchEvents = append(l1BatchEvents, &orm.BatchEvent{
@@ -229,7 +241,7 @@ func (e *L1EventParser) ParseL1MessageQueueEventLogs(logs []types.Log, l1Deposit
case backendabi.L1QueueTransactionEventSig:
event := backendabi.L1QueueTransactionEvent{}
if err := utils.UnpackLog(backendabi.IL1MessageQueueABI, &event, "QueueTransaction", vlog); err != nil {
log.Warn("Failed to unpack QueueTransaction event", "err", err)
log.Error("Failed to unpack QueueTransaction event", "err", err)
return nil, err
}
messageHash := common.BytesToHash(crypto.Keccak256(event.Data))
@@ -245,7 +257,7 @@ func (e *L1EventParser) ParseL1MessageQueueEventLogs(logs []types.Log, l1Deposit
case backendabi.L1DequeueTransactionEventSig:
event := backendabi.L1DequeueTransactionEvent{}
if err := utils.UnpackLog(backendabi.IL1MessageQueueABI, &event, "DequeueTransaction", vlog); err != nil {
log.Warn("Failed to unpack DequeueTransaction event", "err", err)
log.Error("Failed to unpack DequeueTransaction event", "err", err)
return nil, err
}
skippedIndices := utils.GetSkippedQueueIndices(event.StartIndex.Uint64(), event.SkippedBitmap)
@@ -258,7 +270,7 @@ func (e *L1EventParser) ParseL1MessageQueueEventLogs(logs []types.Log, l1Deposit
case backendabi.L1DropTransactionEventSig:
event := backendabi.L1DropTransactionEvent{}
if err := utils.UnpackLog(backendabi.IL1MessageQueueABI, &event, "DropTransaction", vlog); err != nil {
log.Warn("Failed to unpack DropTransaction event", "err", err)
log.Error("Failed to unpack DropTransaction event", "err", err)
return nil, err
}
l1MessageQueueEvents = append(l1MessageQueueEvents, &orm.MessageQueueEvent{
@@ -270,3 +282,27 @@ func (e *L1EventParser) ParseL1MessageQueueEventLogs(logs []types.Log, l1Deposit
}
return l1MessageQueueEvents, nil
}
func getRealFromAddress(ctx context.Context, eventSender common.Address, client *ethclient.Client, txHash common.Hash, gatewayRouterAddr string) (string, error) {
from := eventSender.String()
if from == gatewayRouterAddr {
tx, isPending, rpcErr := client.TransactionByHash(ctx, txHash)
if rpcErr != nil || isPending {
log.Error("Failed to get transaction or the transaction is still pending", "rpcErr", rpcErr, "isPending", isPending)
return "", rpcErr
}
// Case 1: deposit/withdraw ETH: EOA -> multisig -> gateway router -> messenger.
if tx.To() != nil && (*tx.To()).String() != gatewayRouterAddr {
return (*tx.To()).String(), nil
}
// Case 2: deposit/withdraw ETH: EOA -> gateway router -> messenger.
signer := types.LatestSignerForChainID(new(big.Int).SetUint64(tx.ChainId().Uint64()))
sender, err := signer.Sender(tx)
if err != nil {
log.Error("Get sender failed", "chain id", tx.ChainId().Uint64(), "tx hash", tx.Hash().String(), "err", err)
return "", err
}
return sender.String(), nil
}
return from, nil
}

View File

@@ -34,9 +34,10 @@ type L1FilterResult struct {
// L1FetcherLogic the L1 fetcher logic
type L1FetcherLogic struct {
cfg *config.LayerConfig
cfg *config.FetcherConfig
client *ethclient.Client
addressList []common.Address
gatewayList []common.Address
parser *L1EventParser
db *gorm.DB
crossMessageOrm *orm.CrossMessage
@@ -46,7 +47,7 @@ type L1FetcherLogic struct {
}
// NewL1FetcherLogic creates L1 fetcher logic
func NewL1FetcherLogic(cfg *config.LayerConfig, db *gorm.DB, client *ethclient.Client) *L1FetcherLogic {
func NewL1FetcherLogic(cfg *config.FetcherConfig, db *gorm.DB, client *ethclient.Client) *L1FetcherLogic {
addressList := []common.Address{
common.HexToAddress(cfg.ETHGatewayAddr),
@@ -65,16 +66,34 @@ func NewL1FetcherLogic(cfg *config.LayerConfig, db *gorm.DB, client *ethclient.C
common.HexToAddress(cfg.MessageQueueAddr),
}
gatewayList := []common.Address{
common.HexToAddress(cfg.ETHGatewayAddr),
common.HexToAddress(cfg.StandardERC20GatewayAddr),
common.HexToAddress(cfg.CustomERC20GatewayAddr),
common.HexToAddress(cfg.WETHGatewayAddr),
common.HexToAddress(cfg.DAIGatewayAddr),
common.HexToAddress(cfg.ERC721GatewayAddr),
common.HexToAddress(cfg.ERC1155GatewayAddr),
common.HexToAddress(cfg.MessengerAddr),
common.HexToAddress(cfg.GatewayRouterAddr),
}
// Optional erc20 gateways.
if common.HexToAddress(cfg.USDCGatewayAddr) != (common.Address{}) {
addressList = append(addressList, common.HexToAddress(cfg.USDCGatewayAddr))
gatewayList = append(gatewayList, common.HexToAddress(cfg.USDCGatewayAddr))
}
if common.HexToAddress(cfg.LIDOGatewayAddr) != (common.Address{}) {
addressList = append(addressList, common.HexToAddress(cfg.LIDOGatewayAddr))
gatewayList = append(gatewayList, common.HexToAddress(cfg.LIDOGatewayAddr))
}
log.Info("L1 Fetcher configured with the following address list", "addresses", addressList)
log.Info("L1 Fetcher configured with the following address list", "addresses", addressList, "gateways", gatewayList)
f := &L1FetcherLogic{
db: db,
@@ -83,7 +102,8 @@ func NewL1FetcherLogic(cfg *config.LayerConfig, db *gorm.DB, client *ethclient.C
cfg: cfg,
client: client,
addressList: addressList,
parser: NewL1EventParser(),
gatewayList: gatewayList,
parser: NewL1EventParser(cfg, client),
}
reg := prometheus.DefaultRegisterer
@@ -131,15 +151,9 @@ func (f *L1FetcherLogic) getRevertedTxs(ctx context.Context, from, to uint64, bl
blockTimestampsMap[block.NumberU64()] = block.Time()
for _, tx := range block.Transactions() {
txTo := tx.To()
if txTo == nil {
continue
}
toAddress := txTo.String()
// GatewayRouter: L1 deposit.
// Gateways: L1 deposit.
// Messenger: L1 deposit retry (replayMessage), L1 deposit refund (dropMessage), L2 withdrawal's claim (relayMessageWithProof).
if toAddress != f.cfg.GatewayRouterAddr && toAddress != f.cfg.MessengerAddr {
if !isTransactionToGateway(tx, f.gatewayList) {
continue
}
@@ -233,7 +247,7 @@ func (f *L1FetcherLogic) L1Fetcher(ctx context.Context, from, to uint64, lastBlo
return false, 0, common.Hash{}, nil, err
}
l1DepositMessages, l1RelayedMessages, err := f.parser.ParseL1CrossChainEventLogs(eventLogs, blockTimestampsMap)
l1DepositMessages, l1RelayedMessages, err := f.parser.ParseL1CrossChainEventLogs(ctx, eventLogs, blockTimestampsMap)
if err != nil {
log.Error("failed to parse L1 cross chain event logs", "from", from, "to", to, "err", err)
return false, 0, common.Hash{}, nil, err

View File

@@ -1,26 +1,35 @@
package logic
import (
"context"
"github.com/scroll-tech/go-ethereum/common/hexutil"
"github.com/scroll-tech/go-ethereum/core/types"
"github.com/scroll-tech/go-ethereum/ethclient"
"github.com/scroll-tech/go-ethereum/log"
backendabi "scroll-tech/bridge-history-api/abi"
"scroll-tech/bridge-history-api/internal/config"
"scroll-tech/bridge-history-api/internal/orm"
"scroll-tech/bridge-history-api/internal/utils"
)
// L2EventParser the L2 event parser
type L2EventParser struct {
cfg *config.FetcherConfig
client *ethclient.Client
}
// NewL2EventParser creates the L2 event parser
func NewL2EventParser() *L2EventParser {
return &L2EventParser{}
func NewL2EventParser(cfg *config.FetcherConfig, client *ethclient.Client) *L2EventParser {
return &L2EventParser{
cfg: cfg,
client: client,
}
}
// ParseL2EventLogs parses L2 watched events
func (e *L2EventParser) ParseL2EventLogs(logs []types.Log, blockTimestampsMap map[uint64]uint64) ([]*orm.CrossMessage, []*orm.CrossMessage, error) {
func (e *L2EventParser) ParseL2EventLogs(ctx context.Context, logs []types.Log, blockTimestampsMap map[uint64]uint64) ([]*orm.CrossMessage, []*orm.CrossMessage, error) {
var l2WithdrawMessages []*orm.CrossMessage
var l2RelayedMessages []*orm.CrossMessage
for _, vlog := range logs {
@@ -29,7 +38,7 @@ func (e *L2EventParser) ParseL2EventLogs(logs []types.Log, blockTimestampsMap ma
event := backendabi.ETHMessageEvent{}
err := utils.UnpackLog(backendabi.IL2ETHGatewayABI, &event, "WithdrawETH", vlog)
if err != nil {
log.Warn("Failed to unpack WithdrawETH event", "err", err)
log.Error("Failed to unpack WithdrawETH event", "err", err)
return nil, nil, err
}
lastMessage := l2WithdrawMessages[len(l2WithdrawMessages)-1]
@@ -41,7 +50,7 @@ func (e *L2EventParser) ParseL2EventLogs(logs []types.Log, blockTimestampsMap ma
event := backendabi.ERC20MessageEvent{}
err := utils.UnpackLog(backendabi.IL2ERC20GatewayABI, &event, "WithdrawERC20", vlog)
if err != nil {
log.Warn("Failed to unpack WithdrawERC20 event", "err", err)
log.Error("Failed to unpack WithdrawERC20 event", "err", err)
return nil, nil, err
}
lastMessage := l2WithdrawMessages[len(l2WithdrawMessages)-1]
@@ -55,7 +64,7 @@ func (e *L2EventParser) ParseL2EventLogs(logs []types.Log, blockTimestampsMap ma
event := backendabi.ERC721MessageEvent{}
err := utils.UnpackLog(backendabi.IL2ERC721GatewayABI, &event, "WithdrawERC721", vlog)
if err != nil {
log.Warn("Failed to unpack WithdrawERC721 event", "err", err)
log.Error("Failed to unpack WithdrawERC721 event", "err", err)
return nil, nil, err
}
lastMessage := l2WithdrawMessages[len(l2WithdrawMessages)-1]
@@ -69,7 +78,7 @@ func (e *L2EventParser) ParseL2EventLogs(logs []types.Log, blockTimestampsMap ma
event := backendabi.BatchERC721MessageEvent{}
err := utils.UnpackLog(backendabi.IL2ERC721GatewayABI, &event, "BatchWithdrawERC721", vlog)
if err != nil {
log.Warn("Failed to unpack BatchWithdrawERC721 event", "err", err)
log.Error("Failed to unpack BatchWithdrawERC721 event", "err", err)
return nil, nil, err
}
lastMessage := l2WithdrawMessages[len(l2WithdrawMessages)-1]
@@ -83,7 +92,7 @@ func (e *L2EventParser) ParseL2EventLogs(logs []types.Log, blockTimestampsMap ma
event := backendabi.ERC1155MessageEvent{}
err := utils.UnpackLog(backendabi.IL2ERC1155GatewayABI, &event, "WithdrawERC1155", vlog)
if err != nil {
log.Warn("Failed to unpack WithdrawERC1155 event", "err", err)
log.Error("Failed to unpack WithdrawERC1155 event", "err", err)
return nil, nil, err
}
lastMessage := l2WithdrawMessages[len(l2WithdrawMessages)-1]
@@ -98,7 +107,7 @@ func (e *L2EventParser) ParseL2EventLogs(logs []types.Log, blockTimestampsMap ma
event := backendabi.BatchERC1155MessageEvent{}
err := utils.UnpackLog(backendabi.IL2ERC1155GatewayABI, &event, "BatchWithdrawERC1155", vlog)
if err != nil {
log.Warn("Failed to unpack BatchWithdrawERC1155 event", "err", err)
log.Error("Failed to unpack BatchWithdrawERC1155 event", "err", err)
return nil, nil, err
}
lastMessage := l2WithdrawMessages[len(l2WithdrawMessages)-1]
@@ -113,12 +122,17 @@ func (e *L2EventParser) ParseL2EventLogs(logs []types.Log, blockTimestampsMap ma
event := backendabi.L2SentMessageEvent{}
err := utils.UnpackLog(backendabi.IL2ScrollMessengerABI, &event, "SentMessage", vlog)
if err != nil {
log.Warn("Failed to unpack SentMessage event", "err", err)
log.Error("Failed to unpack SentMessage event", "err", err)
return nil, nil, err
}
from, err := getRealFromAddress(ctx, event.Sender, e.client, vlog.TxHash, e.cfg.GatewayRouterAddr)
if err != nil {
log.Error("Failed to get real 'from' address", "err", err)
return nil, nil, err
}
l2WithdrawMessages = append(l2WithdrawMessages, &orm.CrossMessage{
MessageHash: utils.ComputeMessageHash(event.Sender, event.Target, event.Value, event.MessageNonce, event.Message).String(),
Sender: event.Sender.String(),
Sender: from,
Receiver: event.Target.String(),
TokenType: int(orm.TokenTypeETH),
L2TxHash: vlog.TxHash.String(),
@@ -137,7 +151,7 @@ func (e *L2EventParser) ParseL2EventLogs(logs []types.Log, blockTimestampsMap ma
event := backendabi.L2RelayedMessageEvent{}
err := utils.UnpackLog(backendabi.IL2ScrollMessengerABI, &event, "RelayedMessage", vlog)
if err != nil {
log.Warn("Failed to unpack RelayedMessage event", "err", err)
log.Error("Failed to unpack RelayedMessage event", "err", err)
return nil, nil, err
}
l2RelayedMessages = append(l2RelayedMessages, &orm.CrossMessage{
@@ -151,7 +165,7 @@ func (e *L2EventParser) ParseL2EventLogs(logs []types.Log, blockTimestampsMap ma
event := backendabi.L2RelayedMessageEvent{}
err := utils.UnpackLog(backendabi.IL2ScrollMessengerABI, &event, "FailedRelayedMessage", vlog)
if err != nil {
log.Warn("Failed to unpack FailedRelayedMessage event", "err", err)
log.Error("Failed to unpack FailedRelayedMessage event", "err", err)
return nil, nil, err
}
l2RelayedMessages = append(l2RelayedMessages, &orm.CrossMessage{

View File

@@ -33,9 +33,10 @@ type L2FilterResult struct {
// L2FetcherLogic the L2 fetcher logic
type L2FetcherLogic struct {
cfg *config.LayerConfig
cfg *config.FetcherConfig
client *ethclient.Client
addressList []common.Address
gatewayList []common.Address
parser *L2EventParser
db *gorm.DB
crossMessageOrm *orm.CrossMessage
@@ -45,7 +46,7 @@ type L2FetcherLogic struct {
}
// NewL2FetcherLogic create L2 fetcher logic
func NewL2FetcherLogic(cfg *config.LayerConfig, db *gorm.DB, client *ethclient.Client) *L2FetcherLogic {
func NewL2FetcherLogic(cfg *config.FetcherConfig, db *gorm.DB, client *ethclient.Client) *L2FetcherLogic {
addressList := []common.Address{
common.HexToAddress(cfg.ETHGatewayAddr),
@@ -60,16 +61,34 @@ func NewL2FetcherLogic(cfg *config.LayerConfig, db *gorm.DB, client *ethclient.C
common.HexToAddress(cfg.MessengerAddr),
}
gatewayList := []common.Address{
common.HexToAddress(cfg.ETHGatewayAddr),
common.HexToAddress(cfg.StandardERC20GatewayAddr),
common.HexToAddress(cfg.CustomERC20GatewayAddr),
common.HexToAddress(cfg.WETHGatewayAddr),
common.HexToAddress(cfg.DAIGatewayAddr),
common.HexToAddress(cfg.ERC721GatewayAddr),
common.HexToAddress(cfg.ERC1155GatewayAddr),
common.HexToAddress(cfg.MessengerAddr),
common.HexToAddress(cfg.GatewayRouterAddr),
}
// Optional erc20 gateways.
if common.HexToAddress(cfg.USDCGatewayAddr) != (common.Address{}) {
addressList = append(addressList, common.HexToAddress(cfg.USDCGatewayAddr))
gatewayList = append(gatewayList, common.HexToAddress(cfg.USDCGatewayAddr))
}
if common.HexToAddress(cfg.LIDOGatewayAddr) != (common.Address{}) {
addressList = append(addressList, common.HexToAddress(cfg.LIDOGatewayAddr))
gatewayList = append(gatewayList, common.HexToAddress(cfg.USDCGatewayAddr))
}
log.Info("L2 Fetcher configured with the following address list", "addresses", addressList)
log.Info("L2 Fetcher configured with the following address list", "addresses", addressList, "gateways", gatewayList)
f := &L2FetcherLogic{
db: db,
@@ -78,7 +97,8 @@ func NewL2FetcherLogic(cfg *config.LayerConfig, db *gorm.DB, client *ethclient.C
cfg: cfg,
client: client,
addressList: addressList,
parser: NewL2EventParser(),
gatewayList: gatewayList,
parser: NewL2EventParser(cfg, client),
}
reg := prometheus.DefaultRegisterer
@@ -127,42 +147,7 @@ func (f *L2FetcherLogic) getRevertedTxs(ctx context.Context, from, to uint64, bl
blockTimestampsMap[block.NumberU64()] = block.Time()
for _, tx := range block.Transactions() {
txTo := tx.To()
if txTo == nil {
continue
}
toAddress := txTo.String()
// GatewayRouter: L2 withdrawal.
if toAddress == f.cfg.GatewayRouterAddr {
receipt, receiptErr := f.client.TransactionReceipt(ctx, tx.Hash())
if receiptErr != nil {
log.Error("Failed to get transaction receipt", "txHash", tx.Hash().String(), "err", receiptErr)
return nil, nil, nil, receiptErr
}
// Check if the transaction is failed
if receipt.Status == types.ReceiptStatusFailed {
signer := types.LatestSignerForChainID(new(big.Int).SetUint64(tx.ChainId().Uint64()))
sender, signerErr := signer.Sender(tx)
if signerErr != nil {
log.Error("get sender failed", "chain id", tx.ChainId().Uint64(), "tx hash", tx.Hash().String(), "err", signerErr)
return nil, nil, nil, signerErr
}
l2RevertedUserTxs = append(l2RevertedUserTxs, &orm.CrossMessage{
L2TxHash: tx.Hash().String(),
MessageType: int(orm.MessageTypeL2SentMessage),
Sender: sender.String(),
Receiver: (*tx.To()).String(),
L2BlockNumber: receipt.BlockNumber.Uint64(),
BlockTimestamp: block.Time(),
TxStatus: int(orm.TxStatusTypeSentTxReverted),
})
}
}
if tx.Type() == types.L1MessageTxType {
if tx.IsL1MessageTx() {
receipt, receiptErr := f.client.TransactionReceipt(ctx, tx.Hash())
if receiptErr != nil {
log.Error("Failed to get transaction receipt", "txHash", tx.Hash().String(), "err", receiptErr)
@@ -179,6 +164,38 @@ func (f *L2FetcherLogic) getRevertedTxs(ctx context.Context, from, to uint64, bl
MessageType: int(orm.MessageTypeL1SentMessage),
})
}
continue
}
// Gateways: L2 withdrawal.
if !isTransactionToGateway(tx, f.gatewayList) {
continue
}
receipt, receiptErr := f.client.TransactionReceipt(ctx, tx.Hash())
if receiptErr != nil {
log.Error("Failed to get transaction receipt", "txHash", tx.Hash().String(), "err", receiptErr)
return nil, nil, nil, receiptErr
}
// Check if the transaction is failed
if receipt.Status == types.ReceiptStatusFailed {
signer := types.LatestSignerForChainID(new(big.Int).SetUint64(tx.ChainId().Uint64()))
sender, signerErr := signer.Sender(tx)
if signerErr != nil {
log.Error("get sender failed", "chain id", tx.ChainId().Uint64(), "tx hash", tx.Hash().String(), "err", signerErr)
return nil, nil, nil, signerErr
}
l2RevertedUserTxs = append(l2RevertedUserTxs, &orm.CrossMessage{
L2TxHash: tx.Hash().String(),
MessageType: int(orm.MessageTypeL2SentMessage),
Sender: sender.String(),
Receiver: (*tx.To()).String(),
L2BlockNumber: receipt.BlockNumber.Uint64(),
BlockTimestamp: block.Time(),
TxStatus: int(orm.TxStatusTypeSentTxReverted),
})
}
}
}
@@ -235,7 +252,7 @@ func (f *L2FetcherLogic) L2Fetcher(ctx context.Context, from, to uint64, lastBlo
return false, 0, common.Hash{}, nil, err
}
l2WithdrawMessages, l2RelayedMessages, err := f.parser.ParseL2EventLogs(eventLogs, blockTimestampsMap)
l2WithdrawMessages, l2RelayedMessages, err := f.parser.ParseL2EventLogs(ctx, eventLogs, blockTimestampsMap)
if err != nil {
log.Error("failed to parse L2 event logs", "from", from, "to", to, "err", err)
return false, 0, common.Hash{}, nil, err
@@ -279,3 +296,15 @@ func (f *L2FetcherLogic) updateMetrics(res L2FilterResult) {
}
}
}
func isTransactionToGateway(tx *types.Transaction, gatewayList []common.Address) bool {
if tx.To() == nil {
return false
}
for _, gateway := range gatewayList {
if *tx.To() == gateway {
return true
}
}
return false
}

View File

@@ -6,6 +6,7 @@ import (
"time"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
// BatchStatusType represents the type of batch status.
@@ -89,19 +90,21 @@ func (c *BatchEvent) GetFinalizedBatchesLEBlockHeight(ctx context.Context, block
}
// InsertOrUpdateBatchEvents inserts a new batch event or updates an existing one based on the BatchStatusType.
func (c *BatchEvent) InsertOrUpdateBatchEvents(ctx context.Context, l1BatchEvents []*BatchEvent, dbTX ...*gorm.DB) error {
func (c *BatchEvent) InsertOrUpdateBatchEvents(ctx context.Context, l1BatchEvents []*BatchEvent) error {
for _, l1BatchEvent := range l1BatchEvents {
db := c.db
if len(dbTX) > 0 && dbTX[0] != nil {
db = dbTX[0]
}
db = db.WithContext(ctx)
db = db.Model(&BatchEvent{})
updateFields := make(map[string]interface{})
switch BatchStatusType(l1BatchEvent.BatchStatus) {
case BatchStatusTypeCommitted:
// Use the clause to either insert or ignore on conflict
db = db.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "batch_hash"}},
DoNothing: true,
})
if err := db.Create(l1BatchEvent).Error; err != nil {
return fmt.Errorf("failed to insert batch event, error: %w", err)
return fmt.Errorf("failed to insert or ignore batch event, error: %w", err)
}
case BatchStatusTypeFinalized:
db = db.Where("batch_index = ?", l1BatchEvent.BatchIndex)

View File

@@ -5,7 +5,6 @@ import (
"fmt"
"time"
"github.com/google/uuid"
"github.com/scroll-tech/go-ethereum/common"
"gorm.io/gorm"
"gorm.io/gorm/clause"
@@ -47,9 +46,9 @@ const (
TxStatusTypeSent TxStatusType = iota
TxStatusTypeSentTxReverted // Not track message hash, thus will not be processed again anymore.
TxStatusTypeRelayed // Terminal status.
// FailedRelayedMessage event: encoded tx failed, cannot retry. e.g., https://sepolia.scrollscan.com/tx/0xfc7d3ea5ec8dc9b664a5a886c3b33d21e665355057601033481a439498efb79a
TxStatusTypeFailedRelayed // Terminal status.
// In some cases, user can retry with a larger gas limit. e.g., https://sepolia.scrollscan.com/tx/0x7323a7ba29492cb47d92206411be99b27896f2823cee0633a596b646b73f1b5b
// Retry: this often occurs due to an out of gas (OOG) issue if the transaction was initiated via the frontend.
TxStatusTypeFailedRelayed
// Retry: this often occurs due to an out of gas (OOG) issue if the transaction was initiated via the frontend.
TxStatusTypeRelayTxReverted
TxStatusTypeSkipped
TxStatusTypeDropped // Terminal status.
@@ -254,18 +253,14 @@ func (c *CrossMessage) GetTxsByAddress(ctx context.Context, sender string) ([]*C
}
// UpdateL1MessageQueueEventsInfo updates the information about L1 message queue events in the database.
func (c *CrossMessage) UpdateL1MessageQueueEventsInfo(ctx context.Context, l1MessageQueueEvents []*MessageQueueEvent, dbTX ...*gorm.DB) error {
func (c *CrossMessage) UpdateL1MessageQueueEventsInfo(ctx context.Context, l1MessageQueueEvents []*MessageQueueEvent) error {
// update tx statuses.
for _, l1MessageQueueEvent := range l1MessageQueueEvents {
db := c.db
if len(dbTX) > 0 && dbTX[0] != nil {
db = dbTX[0]
}
db = db.WithContext(ctx)
db = db.Model(&CrossMessage{})
// do not over-write terminal statuses.
db = db.Where("tx_status != ?", TxStatusTypeRelayed)
db = db.Where("tx_status != ?", TxStatusTypeFailedRelayed)
db = db.Where("tx_status != ?", TxStatusTypeDropped)
txStatusUpdateFields := make(map[string]interface{})
switch l1MessageQueueEvent.EventType {
@@ -298,13 +293,12 @@ func (c *CrossMessage) UpdateL1MessageQueueEventsInfo(ctx context.Context, l1Mes
// update tx hashes of replay and refund.
for _, l1MessageQueueEvent := range l1MessageQueueEvents {
db := c.db
if len(dbTX) > 0 && dbTX[0] != nil {
db = dbTX[0]
}
db = db.WithContext(ctx)
db = db.Model(&CrossMessage{})
txHashUpdateFields := make(map[string]interface{})
switch l1MessageQueueEvent.EventType {
case MessageQueueEventTypeDequeueTransaction:
continue
case MessageQueueEventTypeQueueTransaction:
// only replayMessages or enforced txs (whose message hashes would not be found), sentMessages have been filtered out.
db = db.Where("message_hash = ?", l1MessageQueueEvent.MessageHash.String())
@@ -314,11 +308,8 @@ func (c *CrossMessage) UpdateL1MessageQueueEventsInfo(ctx context.Context, l1Mes
db = db.Where("message_type = ?", MessageTypeL1SentMessage)
txHashUpdateFields["l1_refund_tx_hash"] = l1MessageQueueEvent.TxHash.String()
}
// Check if there are fields to update to avoid empty update operation (skip message).
if len(txHashUpdateFields) > 0 {
if err := db.Updates(txHashUpdateFields).Error; err != nil {
return fmt.Errorf("failed to update tx hashes of replay and refund in L1 message queue events info, update fields: %v, error: %w", txHashUpdateFields, err)
}
if err := db.Updates(txHashUpdateFields).Error; err != nil {
return fmt.Errorf("failed to update tx hashes of replay and refund in L1 message queue events info, update fields: %v, error: %w", txHashUpdateFields, err)
}
}
return nil
@@ -362,14 +353,11 @@ func (c *CrossMessage) UpdateBatchIndexRollupStatusMerkleProofOfL2Messages(ctx c
}
// InsertOrUpdateL1Messages inserts or updates a list of L1 cross messages into the database.
func (c *CrossMessage) InsertOrUpdateL1Messages(ctx context.Context, messages []*CrossMessage, dbTX ...*gorm.DB) error {
func (c *CrossMessage) InsertOrUpdateL1Messages(ctx context.Context, messages []*CrossMessage) error {
if len(messages) == 0 {
return nil
}
db := c.db
if len(dbTX) > 0 && dbTX[0] != nil {
db = dbTX[0]
}
db = db.WithContext(ctx)
db = db.Model(&CrossMessage{})
// 'tx_status' column is not explicitly assigned during the update to prevent a later status from being overwritten back to "sent".
@@ -384,18 +372,14 @@ func (c *CrossMessage) InsertOrUpdateL1Messages(ctx context.Context, messages []
}
// InsertOrUpdateL2Messages inserts or updates a list of L2 cross messages into the database.
func (c *CrossMessage) InsertOrUpdateL2Messages(ctx context.Context, messages []*CrossMessage, dbTX ...*gorm.DB) error {
func (c *CrossMessage) InsertOrUpdateL2Messages(ctx context.Context, messages []*CrossMessage) error {
if len(messages) == 0 {
return nil
}
db := c.db
if len(dbTX) > 0 && dbTX[0] != nil {
db = dbTX[0]
}
db = db.WithContext(ctx)
db = db.Model(&CrossMessage{})
// 'tx_status' column is not explicitly assigned during the update to prevent a later status from being overwritten back to "sent".
// The merkle_proof is updated separately in batch status updates and hence is not included here.
db = db.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "message_hash"}},
DoUpdates: clause.AssignmentColumns([]string{"sender", "receiver", "token_type", "l2_block_number", "l2_tx_hash", "l1_token_address", "l2_token_address", "token_ids", "token_amounts", "message_type", "block_timestamp", "message_from", "message_to", "message_value", "message_data", "message_nonce"}),
@@ -406,31 +390,60 @@ func (c *CrossMessage) InsertOrUpdateL2Messages(ctx context.Context, messages []
return nil
}
// InsertFailedGatewayRouterTxs inserts a list of transactions that failed to interact with the gateway router into the database.
// These failed transactions are only fetched once, so they are inserted without checking for duplicates.
// To resolve unique index confliction, a random UUID will be generated and used as the MessageHash.
func (c *CrossMessage) InsertFailedGatewayRouterTxs(ctx context.Context, messages []*CrossMessage, dbTX ...*gorm.DB) error {
// InsertFailedL2GatewayTxs inserts a list of transactions that failed to interact with the L2 gateways into the database.
// To resolve unique index confliction, L2 tx hash is used as the MessageHash.
// The OnConflict clause is used to prevent inserting same failed transactions multiple times.
func (c *CrossMessage) InsertFailedL2GatewayTxs(ctx context.Context, messages []*CrossMessage) error {
if len(messages) == 0 {
return nil
}
db := c.db
if len(dbTX) > 0 && dbTX[0] != nil {
db = dbTX[0]
for _, message := range messages {
message.MessageHash = message.L2TxHash
}
db := c.db
db = db.WithContext(ctx)
db = db.Model(&CrossMessage{})
for _, message := range messages {
message.MessageHash = uuid.New().String()
db = db.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "message_hash"}},
DoNothing: true,
})
if err := db.Create(&messages).Error; err != nil {
return fmt.Errorf("failed to insert failed gateway router txs, error: %w", err)
}
if err := db.Create(messages).Error; err != nil {
return nil
}
// InsertFailedL1GatewayTxs inserts a list of transactions that failed to interact with the L1 gateways into the database.
// To resolve unique index confliction, L1 tx hash is used as the MessageHash.
// The OnConflict clause is used to prevent inserting same failed transactions multiple times.
func (c *CrossMessage) InsertFailedL1GatewayTxs(ctx context.Context, messages []*CrossMessage) error {
if len(messages) == 0 {
return nil
}
for _, message := range messages {
message.MessageHash = message.L1TxHash
}
db := c.db
db = db.WithContext(ctx)
db = db.Model(&CrossMessage{})
db = db.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "message_hash"}},
DoNothing: true,
})
if err := db.Create(&messages).Error; err != nil {
return fmt.Errorf("failed to insert failed gateway router txs, error: %w", err)
}
return nil
}
// InsertOrUpdateL2RelayedMessagesOfL1Deposits inserts or updates the database with a list of L2 relayed messages related to L1 deposits.
func (c *CrossMessage) InsertOrUpdateL2RelayedMessagesOfL1Deposits(ctx context.Context, l2RelayedMessages []*CrossMessage, dbTX ...*gorm.DB) error {
func (c *CrossMessage) InsertOrUpdateL2RelayedMessagesOfL1Deposits(ctx context.Context, l2RelayedMessages []*CrossMessage) error {
if len(l2RelayedMessages) == 0 {
return nil
}
@@ -459,7 +472,7 @@ func (c *CrossMessage) InsertOrUpdateL2RelayedMessagesOfL1Deposits(ctx context.C
for _, msg := range mergedL2RelayedMessages {
uniqueL2RelayedMessages = append(uniqueL2RelayedMessages, msg)
}
// Do not update tx status of successfully or failed relayed messages,
// Do not update tx status of successfully relayed messages,
// because if a message is handled, the later relayed message tx would be reverted.
// ref: https://github.com/scroll-tech/scroll/blob/v4.3.44/contracts/src/L2/L2ScrollMessenger.sol#L102
// e.g.,
@@ -476,7 +489,6 @@ func (c *CrossMessage) InsertOrUpdateL2RelayedMessagesOfL1Deposits(ctx context.C
clause.And(
// do not over-write terminal statuses.
clause.Neq{Column: "cross_message_v2.tx_status", Value: TxStatusTypeRelayed},
clause.Neq{Column: "cross_message_v2.tx_status", Value: TxStatusTypeFailedRelayed},
clause.Neq{Column: "cross_message_v2.tx_status", Value: TxStatusTypeDropped},
),
},
@@ -489,7 +501,7 @@ func (c *CrossMessage) InsertOrUpdateL2RelayedMessagesOfL1Deposits(ctx context.C
}
// InsertOrUpdateL1RelayedMessagesOfL2Withdrawals inserts or updates the database with a list of L1 relayed messages related to L2 withdrawals.
func (c *CrossMessage) InsertOrUpdateL1RelayedMessagesOfL2Withdrawals(ctx context.Context, l1RelayedMessages []*CrossMessage, dbTX ...*gorm.DB) error {
func (c *CrossMessage) InsertOrUpdateL1RelayedMessagesOfL2Withdrawals(ctx context.Context, l1RelayedMessages []*CrossMessage) error {
if len(l1RelayedMessages) == 0 {
return nil
}
@@ -519,9 +531,6 @@ func (c *CrossMessage) InsertOrUpdateL1RelayedMessagesOfL2Withdrawals(ctx contex
uniqueL1RelayedMessages = append(uniqueL1RelayedMessages, msg)
}
db := c.db
if len(dbTX) > 0 && dbTX[0] != nil {
db = dbTX[0]
}
db = db.WithContext(ctx)
db = db.Model(&CrossMessage{})
db = db.Clauses(clause.OnConflict{
@@ -532,7 +541,6 @@ func (c *CrossMessage) InsertOrUpdateL1RelayedMessagesOfL2Withdrawals(ctx contex
clause.And(
// do not over-write terminal statuses.
clause.Neq{Column: "cross_message_v2.tx_status", Value: TxStatusTypeRelayed},
clause.Neq{Column: "cross_message_v2.tx_status", Value: TxStatusTypeFailedRelayed},
clause.Neq{Column: "cross_message_v2.tx_status", Value: TxStatusTypeDropped},
),
},

View File

@@ -15,6 +15,7 @@ CREATE TABLE batch_event_v2
deleted_at TIMESTAMP(0) DEFAULT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS unique_idx_be_batch_hash ON batch_event_v2 (batch_hash);
CREATE INDEX IF NOT EXISTS idx_be_l1_block_number ON batch_event_v2 (l1_block_number);
CREATE INDEX IF NOT EXISTS idx_be_batch_index ON batch_event_v2 (batch_index);
CREATE INDEX IF NOT EXISTS idx_be_batch_index_batch_hash ON batch_event_v2 (batch_index, batch_hash);

View File

@@ -5,7 +5,7 @@ import (
"runtime/debug"
)
var tag = "v4.3.55"
var tag = "v4.3.56"
var commit = func() string {
if info, ok := debug.ReadBuildInfo(); ok {