Compare commits

..

49 Commits

Author SHA1 Message Date
colinlyguo
edffc64bcc fix 2023-12-17 18:16:12 +08:00
colinlyguo
acd7845383 fix 2023-12-17 14:39:31 +08:00
colinlyguo
5e2fe452d2 change start height 2023-12-16 23:52:43 +08:00
colinlyguo
a5ca547961 bug fixes 2023-12-16 23:33:34 +08:00
georgehao
8d034de87e feat: tweak some code (#1052)
Co-authored-by: colin <102356659+colinlyguo@users.noreply.github.com>
2023-12-15 18:22:23 +08:00
colinlyguo
9484c1c85f Merge branch 'develop' into fix-bridge-history-api-write-db 2023-12-14 22:43:42 +08:00
georgehao
7011c3ee2e feat: update some layer logic (#1050)
Co-authored-by: colinlyguo <colinlyguo@scroll.io>
2023-12-14 22:11:17 +08:00
colinlyguo
2f7e329aec fix duplicated update reverted txs 2023-12-14 09:05:19 +08:00
colinlyguo
33ec1e1dde add withdraw root check 2023-12-14 00:22:17 +08:00
colinlyguo
5c1de26ccc split relayed failed to failed relayed and relayed tx reverted 2023-12-13 23:47:16 +08:00
colinlyguo
bb1a7f6921 add 0x prefix in merkle proof 2023-12-13 23:17:38 +08:00
colinlyguo
fef9f83788 go mod tidy 2023-12-13 17:44:26 +08:00
colinlyguo
1075d7d671 chore: auto version bump [bot] 2023-12-13 09:42:52 +00:00
colinlyguo
420c96d15a go mod tidy 2023-12-13 17:42:27 +08:00
colinlyguo
3f9e50052e Merge branch 'develop' into fix-bridge-history-api-write-db 2023-12-13 17:41:18 +08:00
colinlyguo
04006b3ead update message_type in relayed messages 2023-12-13 17:30:00 +08:00
colinlyguo
7fe751f7c5 fix ERROR: ON CONFLICT DO UPDATE command cannot affect row a second time (SQLSTATE 21000) error 2023-12-13 17:12:21 +08:00
colinlyguo
0da1355c4d tweak Makefile 2023-12-13 16:43:38 +08:00
colinlyguo
9b5a7f7b62 fix go mod tidy 2023-12-13 15:53:24 +08:00
colinlyguo
b534aed080 Merge branch 'develop' into fix-bridge-history-api-write-db 2023-12-13 15:15:13 +08:00
colinlyguo
1d358fa4ed tweak logs 2023-12-13 15:06:18 +08:00
colinlyguo
ebf7a46242 remove useless logs 2023-12-13 13:35:47 +08:00
colinlyguo
cd883a36c2 add batch sync height in db 2023-12-13 05:15:53 +08:00
colin
f11ce33687 perf(bridge-history-api): add db indices and concurrent blocks fetching (#1043) 2023-12-13 04:46:08 +08:00
colinlyguo
e00d5a9548 fix bugs 2023-12-12 16:07:58 +08:00
colinlyguo
faec5a5e35 trigger ci 2023-12-11 20:10:07 +08:00
colinlyguo
3a090eecdd chore: auto version bump [bot] 2023-12-11 09:39:06 +00:00
colinlyguo
bee9489c42 Merge branch 'develop' into fix-bridge-history-api-write-db 2023-12-11 17:38:32 +08:00
colinlyguo
c4c68ef12d chore: auto version bump [bot] 2023-12-11 07:16:40 +00:00
colin
7f5c823a58 Merge branch 'develop' into fix-bridge-history-api-write-db 2023-12-11 15:15:47 +08:00
colin
367636843f fix: avoid duplicated batch updates (#1029)
Co-authored-by: georgehao <haohongfan@gmail.com>
2023-12-11 14:32:13 +08:00
colin
2f1abc06f6 address comments of new bridge history api (#1027) 2023-12-08 11:16:00 +08:00
colinlyguo
cc5fd778c7 tweak 2023-12-07 23:51:50 +08:00
colin
8587116ff8 Update bridge-history-api/internal/config/config.go
Co-authored-by: georgehao <haohongfan@gmail.com>
2023-12-07 23:49:56 +08:00
colinlyguo
a75813e6c1 add PageSize limit 2023-12-07 22:58:29 +08:00
colinlyguo
01d35d7449 add fetching reverted relayed messages and fixes 2023-12-07 17:59:08 +08:00
colinlyguo
59c8470c04 fix batch index update 2023-12-07 16:41:17 +08:00
colinlyguo
5622f8f627 tweak 2023-12-07 14:10:08 +08:00
colinlyguo
5936cd61d6 fixes 2023-12-07 02:43:01 +08:00
colin
f031c38890 Apply suggestions from code review
Co-authored-by: georgehao <haohongfan@gmail.com>
2023-12-06 23:48:56 +08:00
colinlyguo
9bfefa0474 update l1 start block height to first queue event 2023-12-06 18:44:23 +08:00
colinlyguo
a6c72a2ce2 fix 2023-12-06 18:03:25 +08:00
colinlyguo
299f5e46fe change Makefile 2023-12-06 17:14:24 +08:00
colinlyguo
7629f06243 change config default value 2023-12-06 17:12:15 +08:00
colinlyguo
5902bee340 fix 2023-12-06 16:58:53 +08:00
colinlyguo
e79900e24c refactor 2023-12-06 16:55:17 +08:00
colinlyguo
0803dd97fb refactor 2023-12-06 14:33:54 +08:00
colinlyguo
bdbddc38f5 update go mod 2023-12-06 13:39:44 +08:00
colinlyguo
f011fd5ac6 feat: new bridge-history apis 2023-12-06 12:31:28 +08:00
32 changed files with 397 additions and 903 deletions

View File

@@ -33,7 +33,7 @@ Examples of unacceptable behavior include:
* Public or private harassment
* Publishing others' private information, such as a physical or email
address, without their explicit permission
* Other conduct that could reasonably be considered inappropriate in a
* Other conduct which could reasonably be considered inappropriate in a
professional setting
## Enforcement Responsibilities

View File

@@ -13,7 +13,7 @@
## Directory Structure
<pre>
├── <a href="./bridge-history-api/">bridge-history-api</a>: Bridge history service that collects deposit and withdraw events from both L1 and L2 chains and generates withdrawal proofs
├── <a href="./bridge-history-api/">bridge-history-api</a>: Bridge history service that collects deposit and withdraw events from both L1 and L2 chain and generates withdrawal proofs
├── <a href="./common/">common</a>: Common libraries and types
├── <a href="./coordinator/">coordinator</a>: Prover coordinator service that dispatches proving tasks to provers
├── <a href="./database">database</a>: Database client and schema definition

View File

@@ -7,7 +7,7 @@ The bridge-history-api contains three distinct components
### bridgehistoryapi-db-cli
Provide init, show version, rollback, and check status services of DB
Provide init, show version, rollback, check status services of DB
```
cd ./bridge-history-api
make bridgehistoryapi-db-cli
@@ -36,7 +36,7 @@ provides REST APIs. Please refer to the API details below.
1. `/api/txs`
```
// @Summary get all txs under the given address
// @Summary get all txs under given address
// @Accept plain
// @Produce plain
// @Param address query string true "wallet address"
@@ -60,7 +60,7 @@ provides REST APIs. Please refer to the API details below.
3. `/api/l2/unclaimed/withdrawals`
```
// @Summary get all L2 unclaimed withdrawals under the given address
// @Summary get all L2 unclaimed withdrawals under given address
// @Accept plain
// @Produce plain
// @Param address query string true "wallet address"

View File

@@ -68,10 +68,19 @@ func action(ctx *cli.Context) error {
observability.Server(ctx, db)
l1MessageFetcher := fetcher.NewL1MessageFetcher(subCtx, cfg.L1, db, l1Client)
// syncInfo is used to store the shared info between L1 fetcher and L2 fetcher, e.g., the sync height.
syncInfo := &fetcher.SyncInfo{}
l1MessageFetcher, err := fetcher.NewL1MessageFetcher(subCtx, cfg.L1, db, l1Client, syncInfo)
if err != nil {
log.Crit("failed to create L1 cross message fetcher", "error", err)
}
go l1MessageFetcher.Start()
l2MessageFetcher := fetcher.NewL2MessageFetcher(subCtx, cfg.L2, db, l2Client)
l2MessageFetcher, err := fetcher.NewL2MessageFetcher(subCtx, cfg.L2, db, l2Client, syncInfo)
if err != nil {
log.Crit("failed to create L2 cross message fetcher", "error", err)
}
go l2MessageFetcher.Start()
// Catch CTRL-C to ensure a graceful shutdown.

View File

@@ -1,40 +1,36 @@
{
"L1": {
"confirmation": 0,
"endpoint": "https://rpc.ankr.com/eth",
"startHeight": 18306000,
"blockTime": 10,
"fetchLimit": 30,
"MessengerAddr": "0x6774Bcbd5ceCeF1336b5300fb5186a12DDD8b367",
"ETHGatewayAddr": "0x7F2b8C31F88B6006c382775eea88297Ec1e3E905",
"WETHGatewayAddr": "0x7AC440cAe8EB6328de4fA621163a792c1EA9D4fE",
"StandardERC20GatewayAddr": "0xD8A791fE2bE73eb6E6cF1eb0cb3F36adC9B3F8f9",
"CustomERC20GatewayAddr": "0xb2b10a289A229415a124EFDeF310C10cb004B6ff",
"ERC721GatewayAddr": "0x6260aF48e8948617b8FA17F4e5CEa2d21D21554B",
"ERC1155GatewayAddr": "0xb94f7F6ABcb811c5Ac709dE14E37590fcCd975B6",
"USDCGatewayAddr": "0xf1AF3b23DE0A5Ca3CAb7261cb0061C0D779A5c7B",
"LIDOGatewayAddr": "0x6625C6332c9F91F2D27c304E729B86db87A3f504",
"DAIGatewayAddr": "0x67260A8B73C5B77B55c1805218A42A7A6F98F515",
"ScrollChainAddr": "0xa13BAF47339d63B743e7Da8741db5456DAc1E556",
"GatewayRouterAddr": "0xF8B1378579659D8F7EE5f3C929c2f3E332E41Fd6",
"MessageQueueAddr": "0x0d7E906BD9cAFa154b048cFa766Cc1E54E39AF9B"
"confirmation": 2,
"endpoint": "L1-URL",
"startHeight": 4038000,
"blockTime": 12,
"fetchLimit": 32,
"MessengerAddr": "0x50c7d3e7f7c656493D1D76aaa1a836CedfCBB16A",
"ETHGatewayAddr": "0x8A54A2347Da2562917304141ab67324615e9866d",
"WETHGatewayAddr": "0x3dA0BF44814cfC678376b3311838272158211695",
"StandardERC20GatewayAddr": "0x65D123d6389b900d954677c26327bfc1C3e88A13",
"CustomERC20GatewayAddr": "0x31C994F2017E71b82fd4D8118F140c81215bbb37",
"ERC721GatewayAddr": "0xEF27A5E63aa3f1B8312f744b9b4DcEB910Ba77AC",
"ERC1155GatewayAddr": "0xa5Df8530766A85936EE3E139dECE3bF081c83146",
"DAIGatewayAddr": "0x8b0B9c4e9f41b9bbDEfFee24F9f11C328093d248",
"ScrollChainAddr": "0x2D567EcE699Eabe5afCd141eDB7A4f2D0D6ce8a0",
"GatewayRouterAddr": "0x13FBE0D0e5552b8c9c4AE9e2435F38f37355998a",
"MessageQueueAddr": "0xF0B2293F5D834eAe920c6974D50957A1732de763"
},
"L2": {
"confirmation": 0,
"endpoint": "https://rpc.scroll.io",
"confirmation": 1,
"endpoint": "L2-URL",
"blockTime": 3,
"fetchLimit": 100,
"MessengerAddr": "0x781e90f1c8Fc4611c9b7497C3B47F99Ef6969CbC",
"ETHGatewayAddr": "0x6EA73e05AdC79974B931123675ea8F78FfdacDF0",
"WETHGatewayAddr": "0x7003E7B7186f0E6601203b99F7B8DECBfA391cf9",
"StandardERC20GatewayAddr": "0xE2b4795039517653c5Ae8C2A9BFdd783b48f447A",
"CustomERC20GatewayAddr": "0x64CCBE37c9A82D85A1F2E74649b7A42923067988",
"ERC721GatewayAddr": "0x7bC08E1c04fb41d75F1410363F0c5746Eae80582",
"ERC1155GatewayAddr": "0x62597Cc19703aF10B58feF87B0d5D29eFE263bcc",
"USDCGatewayAddr": "0x33B60d5Dd260d453cAC3782b0bDC01ce84672142",
"LIDOGatewayAddr": "0x8aE8f22226B9d789A36AC81474e633f8bE2856c9",
"DAIGatewayAddr": "0xaC78dff3A87b5b534e366A93E785a0ce8fA6Cc62",
"GatewayRouterAddr": "0x4C0926FF5252A435FD19e10ED15e5a249Ba19d79",
"fetchLimit": 128,
"MessengerAddr": "0xBa50f5340FB9F3Bd074bD638c9BE13eCB36E603d",
"ETHGatewayAddr": "0x91e8ADDFe1358aCa5314c644312d38237fC1101C",
"WETHGatewayAddr": "0x481B20A927206aF7A754dB8b904B052e2781ea27",
"StandardERC20GatewayAddr": "0xaDcA915971A336EA2f5b567e662F5bd74AEf9582",
"CustomERC20GatewayAddr": "0x058dec71E53079F9ED053F3a0bBca877F6f3eAcf",
"ERC721GatewayAddr": "0x179B9415194B67DC3c0b8760E075cD4415785c97",
"ERC1155GatewayAddr": "0xe17C9b9C66FAF07753cdB04316D09f52144612A5",
"DAIGatewayAddr": "0xbF28c28490988026Dca2396148DE50136A54534e",
"GatewayRouterAddr": "0x9aD3c5617eCAa556d6E166787A97081907171230",
"MessageQueueAddr": "0x5300000000000000000000000000000000000000"
},
"db": {

View File

@@ -12,7 +12,7 @@ import (
type LayerConfig struct {
Confirmation uint64 `json:"confirmation"`
Endpoint string `json:"endpoint"`
StartHeight uint64 `json:"startHeight"` // Can only be configured to contract deployment height, otherwise in the current implementation, the message proof could not be successfully updated.
StartHeight uint64 `json:"startHeight"`
BlockTime int64 `json:"blockTime"`
FetchLimit uint64 `json:"fetchLimit"`
MessengerAddr string `json:"MessengerAddr"`

View File

@@ -2,12 +2,8 @@ package fetcher
import (
"context"
"math/big"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/ethclient"
"github.com/scroll-tech/go-ethereum/log"
"gorm.io/gorm"
@@ -23,75 +19,41 @@ type L1MessageFetcher struct {
cfg *config.LayerConfig
client *ethclient.Client
l1SyncHeight uint64
l1LastSyncBlockHash common.Hash
syncInfo *SyncInfo
l1ScanHeight uint64
eventUpdateLogic *logic.EventUpdateLogic
l1FetcherLogic *logic.L1FetcherLogic
l1MessageFetcherRunningTotal prometheus.Counter
l1MessageFetcherReorgTotal prometheus.Counter
l1MessageFetcherSyncHeight prometheus.Gauge
}
// NewL1MessageFetcher creates a new L1MessageFetcher instance.
func NewL1MessageFetcher(ctx context.Context, cfg *config.LayerConfig, db *gorm.DB, client *ethclient.Client) *L1MessageFetcher {
c := &L1MessageFetcher{
func NewL1MessageFetcher(ctx context.Context, cfg *config.LayerConfig, db *gorm.DB, client *ethclient.Client, syncInfo *SyncInfo) (*L1MessageFetcher, error) {
return &L1MessageFetcher{
ctx: ctx,
cfg: cfg,
client: client,
eventUpdateLogic: logic.NewEventUpdateLogic(db, true),
syncInfo: syncInfo,
eventUpdateLogic: logic.NewEventUpdateLogic(db),
l1FetcherLogic: logic.NewL1FetcherLogic(cfg, db, client),
}
reg := prometheus.DefaultRegisterer
c.l1MessageFetcherRunningTotal = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "L1_message_fetcher_running_total",
Help: "Current count of running L1 message fetcher instances.",
})
c.l1MessageFetcherReorgTotal = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "L1_message_fetcher_reorg_total",
Help: "Total count of blockchain reorgs encountered by the L1 message fetcher.",
})
c.l1MessageFetcherSyncHeight = promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "L1_message_fetcher_sync_height",
Help: "Latest blockchain height the L1 message fetcher has synced with.",
})
return c
}, nil
}
// Start starts the L1 message fetching process.
func (c *L1MessageFetcher) Start() {
messageSyncedHeight, batchSyncedHeight, dbErr := c.eventUpdateLogic.GetL1SyncHeight(c.ctx)
if dbErr != nil {
log.Crit("L1MessageFetcher start failed", "err", dbErr)
}
l1SyncHeight := messageSyncedHeight
if batchSyncedHeight > l1SyncHeight {
l1SyncHeight = batchSyncedHeight
}
if c.cfg.StartHeight > l1SyncHeight {
l1SyncHeight = c.cfg.StartHeight - 1
}
// Sync from an older block to prevent reorg during restart.
if l1SyncHeight < logic.L1ReorgSafeDepth {
l1SyncHeight = 0
} else {
l1SyncHeight -= logic.L1ReorgSafeDepth
}
header, err := c.client.HeaderByNumber(c.ctx, new(big.Int).SetUint64(l1SyncHeight))
messageSyncedHeight, batchSyncedHeight, err := c.eventUpdateLogic.GetL1SyncHeight(c.ctx)
if err != nil {
log.Crit("failed to get L1 header by number", "block number", l1SyncHeight, "err", err)
return
log.Crit("L1MessageFetcher start failed", "error", err)
}
c.updateL1SyncHeight(l1SyncHeight, header.Hash())
c.l1ScanHeight = messageSyncedHeight
if batchSyncedHeight > c.l1ScanHeight {
c.l1ScanHeight = batchSyncedHeight
}
if c.cfg.StartHeight > c.l1ScanHeight {
c.l1ScanHeight = c.cfg.StartHeight - 1
}
log.Info("Start L1 message fetcher", "message synced height", messageSyncedHeight, "batch synced height", batchSyncedHeight, "config start height", c.cfg.StartHeight, "sync start height", c.l1SyncHeight+1)
log.Info("Start L1 message fetcher", "message synced height", messageSyncedHeight, "batch synced height", batchSyncedHeight, "config start height", c.cfg.StartHeight)
tick := time.NewTicker(time.Duration(c.cfg.BlockTime) * time.Second)
go func() {
@@ -108,15 +70,13 @@ func (c *L1MessageFetcher) Start() {
}
func (c *L1MessageFetcher) fetchAndSaveEvents(confirmation uint64) {
c.l1MessageFetcherRunningTotal.Inc()
startHeight := c.l1SyncHeight + 1
endHeight, rpcErr := utils.GetBlockNumber(c.ctx, c.client, confirmation)
if rpcErr != nil {
log.Error("failed to get L1 block number", "confirmation", confirmation, "err", rpcErr)
startHeight := c.l1ScanHeight + 1
endHeight, err := utils.GetBlockNumber(c.ctx, c.client, confirmation)
if err != nil {
log.Error("failed to get L1 safe block number", "err", err)
return
}
log.Info("fetch and save missing L1 events", "start height", startHeight, "end height", endHeight, "confirmation", confirmation)
log.Info("fetch and save missing L1 events", "start height", startHeight, "end height", endHeight)
for from := startHeight; from <= endHeight; from += c.cfg.FetchLimit {
to := from + c.cfg.FetchLimit - 1
@@ -124,30 +84,27 @@ func (c *L1MessageFetcher) fetchAndSaveEvents(confirmation uint64) {
to = endHeight
}
isReorg, resyncHeight, lastBlockHash, l1FetcherResult, fetcherErr := c.l1FetcherLogic.L1Fetcher(c.ctx, from, to, c.l1LastSyncBlockHash)
fetcherResult, fetcherErr := c.l1FetcherLogic.L1Fetcher(c.ctx, from, to)
if fetcherErr != nil {
log.Error("failed to fetch L1 events", "from", from, "to", to, "err", fetcherErr)
log.Error("failed to fetch L1 events", "from", from, "to", to, "err", err)
return
}
if isReorg {
c.l1MessageFetcherReorgTotal.Inc()
log.Warn("L1 reorg happened, exit and re-enter fetchAndSaveEvents", "re-sync height", resyncHeight)
c.updateL1SyncHeight(resyncHeight, lastBlockHash)
if insertUpdateErr := c.eventUpdateLogic.L1InsertOrUpdate(c.ctx, fetcherResult); insertUpdateErr != nil {
log.Error("failed to save L1 events", "from", from, "to", to, "err", err)
return
}
c.l1ScanHeight = to
l2ScannedHeight := c.syncInfo.GetL2ScanHeight()
if l2ScannedHeight == 0 {
log.Error("L2 fetcher has not successfully synced at least one round yet")
return
}
if insertUpdateErr := c.eventUpdateLogic.L1InsertOrUpdate(c.ctx, l1FetcherResult); insertUpdateErr != nil {
log.Error("failed to save L1 events", "from", from, "to", to, "err", insertUpdateErr)
if updateErr := c.eventUpdateLogic.UpdateL1BatchIndexAndStatus(c.ctx, l2ScannedHeight); updateErr != nil {
log.Error("failed to update L1 batch index and status", "from", from, "to", to, "err", err)
return
}
c.updateL1SyncHeight(to, lastBlockHash)
}
}
func (c *L1MessageFetcher) updateL1SyncHeight(height uint64, blockHash common.Hash) {
c.l1MessageFetcherSyncHeight.Set(float64(height))
c.l1LastSyncBlockHash = blockHash
c.l1SyncHeight = height
}

View File

@@ -2,11 +2,10 @@ package fetcher
import (
"context"
"fmt"
"math/big"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/ethclient"
"github.com/scroll-tech/go-ethereum/log"
@@ -14,79 +13,45 @@ import (
"scroll-tech/bridge-history-api/internal/config"
"scroll-tech/bridge-history-api/internal/logic"
"scroll-tech/bridge-history-api/internal/orm"
"scroll-tech/bridge-history-api/internal/utils"
)
// L2MessageFetcher fetches cross message events from L2 and saves them to database.
type L2MessageFetcher struct {
ctx context.Context
cfg *config.LayerConfig
db *gorm.DB
client *ethclient.Client
l2SyncHeight uint64
l2LastSyncBlockHash common.Hash
ctx context.Context
cfg *config.LayerConfig
db *gorm.DB
client *ethclient.Client
syncInfo *SyncInfo
eventUpdateLogic *logic.EventUpdateLogic
l2FetcherLogic *logic.L2FetcherLogic
l2MessageFetcherRunningTotal prometheus.Counter
l2MessageFetcherReorgTotal prometheus.Counter
l2MessageFetcherSyncHeight prometheus.Gauge
}
// NewL2MessageFetcher creates a new L2MessageFetcher instance.
func NewL2MessageFetcher(ctx context.Context, cfg *config.LayerConfig, db *gorm.DB, client *ethclient.Client) *L2MessageFetcher {
c := &L2MessageFetcher{
func NewL2MessageFetcher(ctx context.Context, cfg *config.LayerConfig, db *gorm.DB, client *ethclient.Client, syncInfo *SyncInfo) (*L2MessageFetcher, error) {
return &L2MessageFetcher{
ctx: ctx,
cfg: cfg,
db: db,
syncInfo: syncInfo,
client: client,
eventUpdateLogic: logic.NewEventUpdateLogic(db, false),
eventUpdateLogic: logic.NewEventUpdateLogic(db),
l2FetcherLogic: logic.NewL2FetcherLogic(cfg, db, client),
}
reg := prometheus.DefaultRegisterer
c.l2MessageFetcherRunningTotal = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "L2_message_fetcher_running_total",
Help: "Current count of running L2 message fetcher instances.",
})
c.l2MessageFetcherReorgTotal = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "L2_message_fetcher_reorg_total",
Help: "Total count of blockchain reorgs encountered by the L2 message fetcher.",
})
c.l2MessageFetcherSyncHeight = promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "L2_message_fetcher_sync_height",
Help: "Latest blockchain height the L2 message fetcher has synced with.",
})
return c
}, nil
}
// Start starts the L2 message fetching process.
func (c *L2MessageFetcher) Start() {
l2SentMessageSyncedHeight, dbErr := c.eventUpdateLogic.GetL2MessageSyncedHeightInDB(c.ctx)
if dbErr != nil {
log.Crit("failed to get L2 cross message processed height", "err", dbErr)
return
}
l2SyncHeight := l2SentMessageSyncedHeight
// Sync from an older block to prevent reorg during restart.
if l2SyncHeight < logic.L2ReorgSafeDepth {
l2SyncHeight = 0
} else {
l2SyncHeight -= logic.L2ReorgSafeDepth
}
header, err := c.client.HeaderByNumber(c.ctx, new(big.Int).SetUint64(l2SyncHeight))
l2SentMessageSyncedHeight, err := c.eventUpdateLogic.GetL2MessageSyncedHeightInDB(c.ctx)
if err != nil {
log.Crit("failed to get L2 header by number", "block number", l2SyncHeight, "err", err)
log.Error("failed to get L2 cross message processed height", "err", err)
return
}
c.updateL2SyncHeight(l2SyncHeight, header.Hash())
log.Info("Start L2 message fetcher", "message synced height", l2SentMessageSyncedHeight, "sync start height", l2SyncHeight+1)
c.syncInfo.SetL2ScanHeight(l2SentMessageSyncedHeight)
log.Info("Start L2 message fetcher", "message synced height", l2SentMessageSyncedHeight)
tick := time.NewTicker(time.Duration(c.cfg.BlockTime) * time.Second)
go func() {
@@ -103,14 +68,13 @@ func (c *L2MessageFetcher) Start() {
}
func (c *L2MessageFetcher) fetchAndSaveEvents(confirmation uint64) {
startHeight := c.l2SyncHeight + 1
endHeight, rpcErr := utils.GetBlockNumber(c.ctx, c.client, confirmation)
if rpcErr != nil {
log.Error("failed to get L2 block number", "confirmation", confirmation, "err", rpcErr)
startHeight := c.syncInfo.GetL2ScanHeight() + 1
endHeight, err := utils.GetBlockNumber(c.ctx, c.client, confirmation)
if err != nil {
log.Error("failed to get L1 safe block number", "err", err)
return
}
log.Info("fetch and save missing L2 events", "start height", startHeight, "end height", endHeight, "confirmation", confirmation)
c.l2MessageFetcherRunningTotal.Inc()
log.Info("fetch and save missing L2 events", "start height", startHeight, "end height", endHeight)
for from := startHeight; from <= endHeight; from += c.cfg.FetchLimit {
to := from + c.cfg.FetchLimit - 1
@@ -118,35 +82,67 @@ func (c *L2MessageFetcher) fetchAndSaveEvents(confirmation uint64) {
to = endHeight
}
isReorg, resyncHeight, lastBlockHash, l2FetcherResult, fetcherErr := c.l2FetcherLogic.L2Fetcher(c.ctx, from, to, c.l2LastSyncBlockHash)
if fetcherErr != nil {
log.Error("failed to fetch L2 events", "from", from, "to", to, "err", fetcherErr)
l2FilterResult, err := c.l2FetcherLogic.L2Fetcher(c.ctx, from, to)
if err != nil {
log.Error("failed to fetch L2 events", "from", from, "to", to, "err", err)
return
}
if isReorg {
c.l2MessageFetcherReorgTotal.Inc()
log.Warn("L2 reorg happened, exit and re-enter fetchAndSaveEvents", "re-sync height", resyncHeight)
c.updateL2SyncHeight(resyncHeight, lastBlockHash)
if updateWithdrawErr := c.updateL2WithdrawMessageProofs(c.ctx, l2FilterResult.WithdrawMessages, to); updateWithdrawErr != nil {
log.Error("failed to update L2 withdraw message", "from", from, "to", to, "err", err)
return
}
if insertUpdateErr := c.eventUpdateLogic.L2InsertOrUpdate(c.ctx, l2FetcherResult); insertUpdateErr != nil {
log.Error("failed to save L2 events", "from", from, "to", to, "err", insertUpdateErr)
if insertUpdateErr := c.eventUpdateLogic.L2InsertOrUpdate(c.ctx, l2FilterResult); insertUpdateErr != nil {
log.Error("failed to save L2 events", "from", from, "to", to, "err", err)
return
}
if updateErr := c.eventUpdateLogic.UpdateL1BatchIndexAndStatus(c.ctx, c.l2SyncHeight); updateErr != nil {
log.Error("failed to update L1 batch index and status", "from", from, "to", to, "err", updateErr)
return
}
c.updateL2SyncHeight(to, lastBlockHash)
c.syncInfo.SetL2ScanHeight(to)
}
}
func (c *L2MessageFetcher) updateL2SyncHeight(height uint64, blockHash common.Hash) {
c.l2MessageFetcherSyncHeight.Set(float64(height))
c.l2LastSyncBlockHash = blockHash
c.l2SyncHeight = height
func (c *L2MessageFetcher) updateL2WithdrawMessageProofs(ctx context.Context, l2WithdrawMessages []*orm.CrossMessage, endBlock uint64) error {
withdrawTrie := utils.NewWithdrawTrie()
message, err := c.eventUpdateLogic.GetL2LatestWithdrawal(ctx)
if err != nil {
log.Error("failed to get latest L2 sent message event", "err", err)
return err
}
if message != nil {
withdrawTrie.Initialize(message.MessageNonce, common.HexToHash(message.MessageHash), message.MerkleProof)
}
messageHashes := make([]common.Hash, len(l2WithdrawMessages))
for i, message := range l2WithdrawMessages {
messageHashes[i] = common.HexToHash(message.MessageHash)
}
for i, messageHash := range messageHashes {
proof := withdrawTrie.AppendMessages([]common.Hash{messageHash})
if err != nil {
log.Error("error generating proof", "messageHash", messageHash, "error", err)
return fmt.Errorf("error generating proof for messageHash %s: %v", messageHash, err)
}
if len(proof) != 1 {
log.Error("invalid proof len", "got", len(proof), "expected", 1)
return fmt.Errorf("invalid proof len, got: %v, expected: 1", len(proof))
}
l2WithdrawMessages[i].MerkleProof = proof[0]
}
// Verify if local info is correct.
withdrawRoot, err := c.client.StorageAt(ctx, common.HexToAddress(c.cfg.MessageQueueAddr), common.Hash{}, new(big.Int).SetUint64(endBlock))
if err != nil {
log.Error("failed to get withdraw root", "number", endBlock, "error", err)
return fmt.Errorf("failed to get withdraw root: %v, number: %v", err, endBlock)
}
if common.BytesToHash(withdrawRoot) != withdrawTrie.MessageRoot() {
log.Error("withdraw root mismatch", "expected", common.BytesToHash(withdrawRoot).String(), "got", withdrawTrie.MessageRoot().String())
return fmt.Errorf("withdraw root mismatch. expected: %v, got: %v", common.BytesToHash(withdrawRoot), withdrawTrie.MessageRoot())
}
return nil
}

View File

@@ -0,0 +1,18 @@
package fetcher
import "sync/atomic"
// SyncInfo is a struct that stores synchronization information shared between L1 fetcher and L2 fetcher.
type SyncInfo struct {
l2ScanHeight uint64
}
// SetL2ScanHeight is a method that sets the value of l2ScanHeight in SyncInfo.
func (s *SyncInfo) SetL2ScanHeight(height uint64) {
atomic.StoreUint64(&s.l2ScanHeight, height)
}
// GetL2ScanHeight is a method that retrieves the value of l2ScanHeight in SyncInfo.
func (s *SyncInfo) GetL2ScanHeight() uint64 {
return atomic.LoadUint64(&s.l2ScanHeight)
}

View File

@@ -2,16 +2,11 @@ package logic
import (
"context"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/log"
"gorm.io/gorm"
"scroll-tech/bridge-history-api/internal/orm"
"scroll-tech/bridge-history-api/internal/utils"
)
// EventUpdateLogic the logic of insert/update the database
@@ -19,35 +14,18 @@ type EventUpdateLogic struct {
db *gorm.DB
crossMessageOrm *orm.CrossMessage
batchEventOrm *orm.BatchEvent
eventUpdateLogicL1FinalizeBatchEventL2BlockUpdateHeight prometheus.Gauge
eventUpdateLogicL2MessageNonceUpdateHeight prometheus.Gauge
}
// NewEventUpdateLogic creates a EventUpdateLogic instance
func NewEventUpdateLogic(db *gorm.DB, isL1 bool) *EventUpdateLogic {
b := &EventUpdateLogic{
// NewEventUpdateLogic create a EventUpdateLogic instance
func NewEventUpdateLogic(db *gorm.DB) *EventUpdateLogic {
return &EventUpdateLogic{
db: db,
crossMessageOrm: orm.NewCrossMessage(db),
batchEventOrm: orm.NewBatchEvent(db),
}
if !isL1 {
reg := prometheus.DefaultRegisterer
b.eventUpdateLogicL1FinalizeBatchEventL2BlockUpdateHeight = promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "event_update_logic_L1_finalize_batch_event_L2_block_update_height",
Help: "L2 block height of the latest L1 batch event that has been finalized and updated in the message_table.",
})
b.eventUpdateLogicL2MessageNonceUpdateHeight = promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "event_update_logic_L2_message_nonce_update_height",
Help: "L2 message nonce height in the latest L1 batch event that has been finalized and updated in the message_table.",
})
}
return b
}
// GetL1SyncHeight gets the l1 sync height from db
// GetL1SyncHeight get the l1 sync height from db
func (b *EventUpdateLogic) GetL1SyncHeight(ctx context.Context) (uint64, uint64, error) {
messageSyncedHeight, err := b.crossMessageOrm.GetMessageSyncedHeightInDB(ctx, orm.MessageTypeL1SentMessage)
if err != nil {
@@ -64,7 +42,7 @@ func (b *EventUpdateLogic) GetL1SyncHeight(ctx context.Context) (uint64, uint64,
return messageSyncedHeight, batchSyncedHeight, nil
}
// GetL2MessageSyncedHeightInDB gets L2 messages synced height
// GetL2MessageSyncedHeightInDB get L2 messages synced height
func (b *EventUpdateLogic) GetL2MessageSyncedHeightInDB(ctx context.Context) (uint64, error) {
l2SentMessageSyncedHeight, err := b.crossMessageOrm.GetMessageSyncedHeightInDB(ctx, orm.MessageTypeL2SentMessage)
if err != nil {
@@ -74,7 +52,17 @@ func (b *EventUpdateLogic) GetL2MessageSyncedHeightInDB(ctx context.Context) (ui
return l2SentMessageSyncedHeight, nil
}
// L1InsertOrUpdate inserts or updates l1 messages
// GetL2LatestWithdrawal get L2 latest withdrawal message
func (b *EventUpdateLogic) GetL2LatestWithdrawal(ctx context.Context) (*orm.CrossMessage, error) {
message, err := b.crossMessageOrm.GetLatestL2Withdrawal(ctx)
if err != nil {
log.Error("failed to get latest L2 sent message event", "err", err)
return nil, err
}
return message, nil
}
// L1InsertOrUpdate insert or update l1 messages
func (b *EventUpdateLogic) L1InsertOrUpdate(ctx context.Context, l1FetcherResult *L1FilterResult) error {
err := b.db.Transaction(func(tx *gorm.DB) error {
if txErr := b.crossMessageOrm.InsertOrUpdateL1Messages(ctx, l1FetcherResult.DepositMessages, tx); txErr != nil {
@@ -97,7 +85,7 @@ func (b *EventUpdateLogic) L1InsertOrUpdate(ctx context.Context, l1FetcherResult
return txErr
}
if txErr := b.crossMessageOrm.InsertFailedGatewayRouterTxs(ctx, l1FetcherResult.RevertedTxs, tx); txErr != nil {
if txErr := b.crossMessageOrm.InsertFailedGatewayRouterTxs(ctx, l1FetcherResult.FailedGatewayRouterTxs, tx); txErr != nil {
log.Error("failed to insert L1 failed gateway router transactions", "err", txErr)
return txErr
}
@@ -112,79 +100,30 @@ func (b *EventUpdateLogic) L1InsertOrUpdate(ctx context.Context, l1FetcherResult
return nil
}
func (b *EventUpdateLogic) updateL2WithdrawMessageInfos(ctx context.Context, batchIndex, startBlock, endBlock uint64) error {
l2WithdrawMessages, err := b.crossMessageOrm.GetL2WithdrawalsByBlockRange(ctx, startBlock, endBlock)
if err != nil {
log.Error("failed to get L2 withdrawals by batch index", "batch index", batchIndex, "err", err)
return err
}
if len(l2WithdrawMessages) == 0 {
return nil
}
withdrawTrie := utils.NewWithdrawTrie()
lastMessage, err := b.crossMessageOrm.GetL2LatestFinalizedWithdrawal(ctx)
if err != nil {
log.Error("failed to get latest L2 finalized sent message event", "err", err)
return err
}
if lastMessage != nil {
withdrawTrie.Initialize(lastMessage.MessageNonce, common.HexToHash(lastMessage.MessageHash), lastMessage.MerkleProof)
}
if withdrawTrie.NextMessageNonce != l2WithdrawMessages[0].MessageNonce {
log.Error("nonce mismatch", "expected next message nonce", withdrawTrie.NextMessageNonce, "actuall next message nonce", l2WithdrawMessages[0].MessageNonce)
return fmt.Errorf("nonce mismatch")
}
messageHashes := make([]common.Hash, len(l2WithdrawMessages))
for i, message := range l2WithdrawMessages {
messageHashes[i] = common.HexToHash(message.MessageHash)
}
proofs := withdrawTrie.AppendMessages(messageHashes)
for i, message := range l2WithdrawMessages {
message.MerkleProof = proofs[i]
message.RollupStatus = int(orm.RollupStatusTypeFinalized)
message.BatchIndex = batchIndex
}
if dbErr := b.crossMessageOrm.UpdateBatchIndexRollupStatusMerkleProofOfL2Messages(ctx, l2WithdrawMessages); dbErr != nil {
log.Error("failed to update batch index and rollup status and merkle proof of L2 messages", "err", dbErr)
return dbErr
}
b.eventUpdateLogicL2MessageNonceUpdateHeight.Set(float64(withdrawTrie.NextMessageNonce - 1))
return nil
}
// UpdateL1BatchIndexAndStatus updates L1 finalized batch index and status
// UpdateL1BatchIndexAndStatus update l1 batch index and status
func (b *EventUpdateLogic) UpdateL1BatchIndexAndStatus(ctx context.Context, height uint64) error {
finalizedBatches, err := b.batchEventOrm.GetFinalizedBatchesLEBlockHeight(ctx, height)
batches, err := b.batchEventOrm.GetBatchesLEBlockHeight(ctx, height)
if err != nil {
log.Error("failed to get batches >= block height", "error", err)
return err
}
for _, finalizedBatch := range finalizedBatches {
log.Info("update finalized batch info of L2 withdrawals", "index", finalizedBatch.BatchIndex, "start", finalizedBatch.StartBlockNumber, "end", finalizedBatch.EndBlockNumber)
if updateErr := b.updateL2WithdrawMessageInfos(ctx, finalizedBatch.BatchIndex, finalizedBatch.StartBlockNumber, finalizedBatch.EndBlockNumber); updateErr != nil {
log.Error("failed to update L2 withdraw message infos", "index", finalizedBatch.BatchIndex, "start", finalizedBatch.StartBlockNumber, "end", finalizedBatch.EndBlockNumber, "error", updateErr)
return updateErr
}
if dbErr := b.batchEventOrm.UpdateBatchEventStatus(ctx, finalizedBatch.BatchIndex); dbErr != nil {
log.Error("failed to update batch event status as updated", "index", finalizedBatch.BatchIndex, "start", finalizedBatch.StartBlockNumber, "end", finalizedBatch.EndBlockNumber, "error", dbErr)
for _, batch := range batches {
log.Info("update batch info of L2 withdrawals", "index", batch.BatchIndex, "start", batch.StartBlockNumber, "end", batch.EndBlockNumber)
if dbErr := b.crossMessageOrm.UpdateBatchStatusOfL2Withdrawals(ctx, batch.StartBlockNumber, batch.EndBlockNumber, batch.BatchIndex); dbErr != nil {
log.Error("failed to update batch status of L2 sent messages", "start", batch.StartBlockNumber, "end", batch.EndBlockNumber, "index", batch.BatchIndex, "error", dbErr)
return dbErr
}
if dbErr := b.batchEventOrm.UpdateBatchEventStatus(ctx, batch.BatchIndex); dbErr != nil {
log.Error("failed to update batch event status as updated", "start", batch.StartBlockNumber, "end", batch.EndBlockNumber, "index", batch.BatchIndex, "error", dbErr)
return dbErr
}
b.eventUpdateLogicL1FinalizeBatchEventL2BlockUpdateHeight.Set(float64(finalizedBatch.EndBlockNumber))
}
return nil
}
// L2InsertOrUpdate inserts or updates L2 messages
// L2InsertOrUpdate insert or update L2 messages
func (b *EventUpdateLogic) L2InsertOrUpdate(ctx context.Context, l2FetcherResult *L2FilterResult) error {
err := b.db.Transaction(func(tx *gorm.DB) error {
if txErr := b.crossMessageOrm.InsertOrUpdateL2Messages(ctx, l2FetcherResult.WithdrawMessages, tx); txErr != nil {
@@ -195,7 +134,11 @@ func (b *EventUpdateLogic) L2InsertOrUpdate(ctx context.Context, l2FetcherResult
log.Error("failed to update L2 relayed messages of L1 deposits", "err", txErr)
return txErr
}
if txErr := b.crossMessageOrm.InsertFailedGatewayRouterTxs(ctx, l2FetcherResult.OtherRevertedTxs, tx); txErr != nil {
if txErr := b.crossMessageOrm.InsertOrUpdateL2RevertedRelayedMessagesOfL1Deposits(ctx, l2FetcherResult.RevertedRelayedMessages, tx); txErr != nil {
log.Error("failed to update L2 relayed messages of L1 deposits", "err", txErr)
return txErr
}
if txErr := b.crossMessageOrm.InsertFailedGatewayRouterTxs(ctx, l2FetcherResult.FailedGatewayRouterTxs, tx); txErr != nil {
log.Error("failed to insert L2 failed gateway router transactions", "err", txErr)
return txErr
}

View File

@@ -265,8 +265,6 @@ func getTxHistoryInfo(message *orm.CrossMessage) *types.TxHistoryInfo {
}
if txHistory.IsL1 {
txHistory.Hash = message.L1TxHash
txHistory.ReplayTxHash = message.L1ReplayTxHash
txHistory.RefundTxHash = message.L1RefundTxHash
txHistory.BlockNumber = message.L1BlockNumber
txHistory.FinalizeTx = &types.Finalized{
Hash: message.L2TxHash,

View File

@@ -3,9 +3,7 @@ package logic
import (
"context"
"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/core/types"
"github.com/scroll-tech/go-ethereum/crypto"
"github.com/scroll-tech/go-ethereum/ethclient"
"github.com/scroll-tech/go-ethereum/log"
@@ -18,7 +16,7 @@ import (
type L1EventParser struct {
}
// NewL1EventParser creates l1 event parser
// NewL1EventParser create l1 event parser
func NewL1EventParser() *L1EventParser {
return &L1EventParser{}
}
@@ -217,12 +215,7 @@ func (e *L1EventParser) ParseL1BatchEventLogs(ctx context.Context, logs []types.
}
// ParseL1MessageQueueEventLogs parses L1 watched message queue events.
func (e *L1EventParser) ParseL1MessageQueueEventLogs(logs []types.Log, l1DepositMessages []*orm.CrossMessage) ([]*orm.MessageQueueEvent, error) {
messageHashes := make(map[common.Hash]struct{})
for _, msg := range l1DepositMessages {
messageHashes[common.HexToHash(msg.MessageHash)] = struct{}{}
}
func (e *L1EventParser) ParseL1MessageQueueEventLogs(logs []types.Log) ([]*orm.MessageQueueEvent, error) {
var l1MessageQueueEvents []*orm.MessageQueueEvent
for _, vlog := range logs {
switch vlog.Topics[0] {
@@ -232,16 +225,13 @@ func (e *L1EventParser) ParseL1MessageQueueEventLogs(logs []types.Log, l1Deposit
log.Warn("Failed to unpack QueueTransaction event", "err", err)
return nil, err
}
messageHash := common.BytesToHash(crypto.Keccak256(event.Data))
// If the message hash is not found in the map, it's not a replayMessage or enforced tx (omitted); add it to the events.
if _, exists := messageHashes[messageHash]; !exists {
l1MessageQueueEvents = append(l1MessageQueueEvents, &orm.MessageQueueEvent{
EventType: orm.MessageQueueEventTypeQueueTransaction,
QueueIndex: event.QueueIndex,
MessageHash: messageHash,
TxHash: vlog.TxHash,
})
}
// 1. Update queue index of both sent message and replay message.
// 2. Update tx hash of replay message.
l1MessageQueueEvents = append(l1MessageQueueEvents, &orm.MessageQueueEvent{
EventType: orm.MessageQueueEventTypeQueueTransaction,
QueueIndex: event.QueueIndex,
TxHash: vlog.TxHash,
})
case backendabi.L1DequeueTransactionEventSig:
event := backendabi.L1DequeueTransactionEvent{}
if err := utils.UnpackLog(backendabi.IL1MessageQueueABI, &event, "DequeueTransaction", vlog); err != nil {
@@ -264,7 +254,6 @@ func (e *L1EventParser) ParseL1MessageQueueEventLogs(logs []types.Log, l1Deposit
l1MessageQueueEvents = append(l1MessageQueueEvents, &orm.MessageQueueEvent{
EventType: orm.MessageQueueEventTypeDropTransaction,
QueueIndex: event.Index.Uint64(),
TxHash: vlog.TxHash,
})
}
}

View File

@@ -4,8 +4,6 @@ import (
"context"
"math/big"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/scroll-tech/go-ethereum"
"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/core/types"
@@ -19,20 +17,16 @@ import (
"scroll-tech/bridge-history-api/internal/utils"
)
// L1ReorgSafeDepth represents the number of block confirmations considered safe against L1 chain reorganizations.
// Reorganizations at this depth under normal cases are extremely unlikely.
const L1ReorgSafeDepth = 64
// L1FilterResult L1 fetcher result
// L1FilterResult l1 fetcher result
type L1FilterResult struct {
DepositMessages []*orm.CrossMessage
RelayedMessages []*orm.CrossMessage
BatchEvents []*orm.BatchEvent
MessageQueueEvents []*orm.MessageQueueEvent
RevertedTxs []*orm.CrossMessage
FailedGatewayRouterTxs []*orm.CrossMessage
DepositMessages []*orm.CrossMessage
RelayedMessages []*orm.CrossMessage
BatchEvents []*orm.BatchEvent
MessageQueueEvents []*orm.MessageQueueEvent
}
// L1FetcherLogic the L1 fetcher logic
// L1FetcherLogic the l1 fetcher's logic
type L1FetcherLogic struct {
cfg *config.LayerConfig
client *ethclient.Client
@@ -41,11 +35,9 @@ type L1FetcherLogic struct {
db *gorm.DB
crossMessageOrm *orm.CrossMessage
batchEventOrm *orm.BatchEvent
l1FetcherLogicFetchedTotal *prometheus.CounterVec
}
// NewL1FetcherLogic creates L1 fetcher logic
// NewL1FetcherLogic create l1 fetcher logic
func NewL1FetcherLogic(cfg *config.LayerConfig, db *gorm.DB, client *ethclient.Client) *L1FetcherLogic {
addressList := []common.Address{
common.HexToAddress(cfg.ETHGatewayAddr),
@@ -74,7 +66,7 @@ func NewL1FetcherLogic(cfg *config.LayerConfig, db *gorm.DB, client *ethclient.C
addressList = append(addressList, common.HexToAddress(cfg.LIDOGatewayAddr))
}
f := &L1FetcherLogic{
return &L1FetcherLogic{
db: db,
crossMessageOrm: orm.NewCrossMessage(db),
batchEventOrm: orm.NewBatchEvent(db),
@@ -83,47 +75,17 @@ func NewL1FetcherLogic(cfg *config.LayerConfig, db *gorm.DB, client *ethclient.C
addressList: addressList,
parser: NewL1EventParser(),
}
reg := prometheus.DefaultRegisterer
f.l1FetcherLogicFetchedTotal = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "L1_fetcher_logic_fetched_total",
Help: "The total number of events or failed txs fetched in L1 fetcher logic.",
}, []string{"type"})
return f
}
func (f *L1FetcherLogic) getBlocksAndDetectReorg(ctx context.Context, from, to uint64, lastBlockHash common.Hash) (bool, uint64, common.Hash, []*types.Block, error) {
func (f *L1FetcherLogic) gatewayRouterFailedTxs(ctx context.Context, from, to uint64) (map[uint64]uint64, []*orm.CrossMessage, error) {
blocks, err := utils.GetL1BlocksInRange(ctx, f.client, from, to)
if err != nil {
log.Error("failed to get L1 blocks in range", "from", from, "to", to, "err", err)
return false, 0, common.Hash{}, nil, err
return nil, nil, err
}
for _, block := range blocks {
if block.ParentHash() != lastBlockHash {
log.Warn("L1 reorg detected", "reorg height", block.NumberU64()-1, "expected hash", block.ParentHash().String(), "local hash", lastBlockHash.String())
var resyncHeight uint64
if block.NumberU64() > L1ReorgSafeDepth+1 {
resyncHeight = block.NumberU64() - L1ReorgSafeDepth - 1
}
header, err := f.client.HeaderByNumber(ctx, new(big.Int).SetUint64(resyncHeight))
if err != nil {
log.Error("failed to get L1 header by number", "block number", resyncHeight, "err", err)
return false, 0, common.Hash{}, nil, err
}
return true, resyncHeight, header.Hash(), nil, nil
}
lastBlockHash = block.Hash()
}
return false, 0, lastBlockHash, blocks, nil
}
func (f *L1FetcherLogic) getRevertedTxs(ctx context.Context, from, to uint64, blocks []*types.Block) (map[uint64]uint64, []*orm.CrossMessage, error) {
var l1RevertedTxs []*orm.CrossMessage
blockTimestampsMap := make(map[uint64]uint64)
var l1FailedGatewayRouterTxs []*orm.CrossMessage
for i := from; i <= to; i++ {
block := blocks[i-from]
blockTimestampsMap[block.NumberU64()] = block.Time()
@@ -135,9 +97,7 @@ func (f *L1FetcherLogic) getRevertedTxs(ctx context.Context, from, to uint64, bl
}
toAddress := txTo.String()
// GatewayRouter: L1 deposit.
// Messenger: L1 deposit retry (replayMessage), L1 deposit refund (dropMessage), L2 withdrawal's claim (relayMessageWithProof).
if toAddress != f.cfg.GatewayRouterAddr && toAddress != f.cfg.MessengerAddr {
if toAddress != f.cfg.GatewayRouterAddr {
continue
}
@@ -148,7 +108,7 @@ func (f *L1FetcherLogic) getRevertedTxs(ctx context.Context, from, to uint64, bl
return nil, nil, receiptErr
}
// Check if the transaction is failed
// Check if the transaction failed
if receipt.Status != types.ReceiptStatusFailed {
continue
}
@@ -160,18 +120,18 @@ func (f *L1FetcherLogic) getRevertedTxs(ctx context.Context, from, to uint64, bl
return nil, nil, senderErr
}
l1RevertedTxs = append(l1RevertedTxs, &orm.CrossMessage{
l1FailedGatewayRouterTxs = append(l1FailedGatewayRouterTxs, &orm.CrossMessage{
L1TxHash: tx.Hash().String(),
MessageType: int(orm.MessageTypeL1SentMessage),
Sender: sender.String(),
Receiver: (*tx.To()).String(),
L1BlockNumber: receipt.BlockNumber.Uint64(),
BlockTimestamp: block.Time(),
TxStatus: int(orm.TxStatusTypeSentTxReverted),
TxStatus: int(orm.TxStatusTypeSentFailed),
})
}
}
return blockTimestampsMap, l1RevertedTxs, nil
return blockTimestampsMap, l1FailedGatewayRouterTxs, nil
}
func (f *L1FetcherLogic) l1FetcherLogs(ctx context.Context, from, to uint64) ([]types.Log, error) {
@@ -205,110 +165,46 @@ func (f *L1FetcherLogic) l1FetcherLogs(ctx context.Context, from, to uint64) ([]
return eventLogs, nil
}
// L1Fetcher L1 fetcher
func (f *L1FetcherLogic) L1Fetcher(ctx context.Context, from, to uint64, lastBlockHash common.Hash) (bool, uint64, common.Hash, *L1FilterResult, error) {
// L1Fetcher l1 fetcher
func (f *L1FetcherLogic) L1Fetcher(ctx context.Context, from, to uint64) (*L1FilterResult, error) {
log.Info("fetch and save L1 events", "from", from, "to", to)
isReorg, reorgHeight, blockHash, blocks, getErr := f.getBlocksAndDetectReorg(ctx, from, to, lastBlockHash)
if getErr != nil {
log.Error("L1Fetcher getBlocksAndDetectReorg failed", "from", from, "to", to, "error", getErr)
return false, 0, common.Hash{}, nil, getErr
}
if isReorg {
return isReorg, reorgHeight, blockHash, nil, nil
}
blockTimestampsMap, l1RevertedTxs, err := f.getRevertedTxs(ctx, from, to, blocks)
blockTimestampsMap, l1FailedGatewayRouterTxs, err := f.gatewayRouterFailedTxs(ctx, from, to)
if err != nil {
log.Error("L1Fetcher getRevertedTxs failed", "from", from, "to", to, "error", err)
return false, 0, common.Hash{}, nil, err
log.Error("L1Fetcher gatewayRouterFailedTxs failed", "from", from, "to", to, "error", err)
return nil, err
}
eventLogs, err := f.l1FetcherLogs(ctx, from, to)
if err != nil {
log.Error("L1Fetcher l1FetcherLogs failed", "from", from, "to", to, "error", err)
return false, 0, common.Hash{}, nil, err
return nil, err
}
l1DepositMessages, l1RelayedMessages, err := f.parser.ParseL1CrossChainEventLogs(eventLogs, blockTimestampsMap)
if err != nil {
log.Error("failed to parse L1 cross chain event logs", "from", from, "to", to, "err", err)
return false, 0, common.Hash{}, nil, err
return nil, err
}
l1BatchEvents, err := f.parser.ParseL1BatchEventLogs(ctx, eventLogs, f.client)
if err != nil {
log.Error("failed to parse L1 batch event logs", "from", from, "to", to, "err", err)
return false, 0, common.Hash{}, nil, err
return nil, err
}
l1MessageQueueEvents, err := f.parser.ParseL1MessageQueueEventLogs(eventLogs, l1DepositMessages)
l1MessageQueueEvents, err := f.parser.ParseL1MessageQueueEventLogs(eventLogs)
if err != nil {
log.Error("failed to parse L1 message queue event logs", "from", from, "to", to, "err", err)
return false, 0, common.Hash{}, nil, err
return nil, err
}
res := L1FilterResult{
DepositMessages: l1DepositMessages,
RelayedMessages: l1RelayedMessages,
BatchEvents: l1BatchEvents,
MessageQueueEvents: l1MessageQueueEvents,
RevertedTxs: l1RevertedTxs,
}
f.updateMetrics(res)
return false, 0, blockHash, &res, nil
}
func (f *L1FetcherLogic) updateMetrics(res L1FilterResult) {
f.l1FetcherLogicFetchedTotal.WithLabelValues("L1_failed_gateway_router_transaction").Add(float64(len(res.RevertedTxs)))
for _, depositMessage := range res.DepositMessages {
switch orm.TokenType(depositMessage.TokenType) {
case orm.TokenTypeETH:
f.l1FetcherLogicFetchedTotal.WithLabelValues("L1_deposit_eth").Add(1)
case orm.TokenTypeERC20:
f.l1FetcherLogicFetchedTotal.WithLabelValues("L1_deposit_erc20").Add(1)
case orm.TokenTypeERC721:
f.l1FetcherLogicFetchedTotal.WithLabelValues("L1_deposit_erc721").Add(1)
case orm.TokenTypeERC1155:
f.l1FetcherLogicFetchedTotal.WithLabelValues("L1_deposit_erc1155").Add(1)
}
}
for _, relayedMessage := range res.RelayedMessages {
switch orm.TxStatusType(relayedMessage.TxStatus) {
case orm.TxStatusTypeRelayed:
f.l1FetcherLogicFetchedTotal.WithLabelValues("L1_relayed_message").Add(1)
case orm.TxStatusTypeFailedRelayed:
f.l1FetcherLogicFetchedTotal.WithLabelValues("L1_failed_relayed_message").Add(1)
}
// Have not tracked L1 relayed message reverted transaction yet.
// 1. need to parse calldata of tx.
// 2. hard to track internal tx.
}
for _, batchEvent := range res.BatchEvents {
switch orm.BatchStatusType(batchEvent.BatchStatus) {
case orm.BatchStatusTypeCommitted:
f.l1FetcherLogicFetchedTotal.WithLabelValues("L1_commit_batch_event").Add(1)
case orm.BatchStatusTypeReverted:
f.l1FetcherLogicFetchedTotal.WithLabelValues("L1_revert_batch_event").Add(1)
case orm.BatchStatusTypeFinalized:
f.l1FetcherLogicFetchedTotal.WithLabelValues("L1_finalize_batch_event").Add(1)
}
}
for _, messageQueueEvent := range res.MessageQueueEvents {
switch messageQueueEvent.EventType {
case orm.MessageQueueEventTypeQueueTransaction: // sendMessage is filtered out, only leaving replayMessage or appendEnforcedTransaction.
f.l1FetcherLogicFetchedTotal.WithLabelValues("L1_replay_message_or_enforced_transaction").Add(1)
case orm.MessageQueueEventTypeDequeueTransaction:
f.l1FetcherLogicFetchedTotal.WithLabelValues("L1_skip_message").Add(1)
case orm.MessageQueueEventTypeDropTransaction:
f.l1FetcherLogicFetchedTotal.WithLabelValues("L1_drop_message").Add(1)
}
FailedGatewayRouterTxs: l1FailedGatewayRouterTxs,
DepositMessages: l1DepositMessages,
RelayedMessages: l1RelayedMessages,
BatchEvents: l1BatchEvents,
MessageQueueEvents: l1MessageQueueEvents,
}
return &res, nil
}

View File

@@ -14,7 +14,7 @@ import (
type L2EventParser struct {
}
// NewL2EventParser creates the L2 event parser
// NewL2EventParser create the L2 event parser
func NewL2EventParser() *L2EventParser {
return &L2EventParser{}
}

View File

@@ -4,8 +4,6 @@ import (
"context"
"math/big"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/scroll-tech/go-ethereum"
"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/core/types"
@@ -20,15 +18,12 @@ import (
"scroll-tech/bridge-history-api/internal/utils"
)
// L2ReorgSafeDepth represents the number of block confirmations considered safe against L2 chain reorganizations.
// Reorganizations at this depth under normal cases are extremely unlikely.
const L2ReorgSafeDepth = 256
// L2FilterResult the L2 filter result
type L2FilterResult struct {
WithdrawMessages []*orm.CrossMessage
RelayedMessages []*orm.CrossMessage // relayed, failed relayed, relay tx reverted.
OtherRevertedTxs []*orm.CrossMessage // reverted txs except relay tx reverted.
FailedGatewayRouterTxs []*orm.CrossMessage
RevertedRelayedMessages []*orm.CrossMessage
WithdrawMessages []*orm.CrossMessage
RelayedMessages []*orm.CrossMessage
}
// L2FetcherLogic the L2 fetcher logic
@@ -40,8 +35,6 @@ type L2FetcherLogic struct {
db *gorm.DB
crossMessageOrm *orm.CrossMessage
batchEventOrm *orm.BatchEvent
l2FetcherLogicFetchedTotal *prometheus.CounterVec
}
// NewL2FetcherLogic create L2 fetcher logic
@@ -69,7 +62,7 @@ func NewL2FetcherLogic(cfg *config.LayerConfig, db *gorm.DB, client *ethclient.C
addressList = append(addressList, common.HexToAddress(cfg.LIDOGatewayAddr))
}
f := &L2FetcherLogic{
return &L2FetcherLogic{
db: db,
crossMessageOrm: orm.NewCrossMessage(db),
batchEventOrm: orm.NewBatchEvent(db),
@@ -78,48 +71,19 @@ func NewL2FetcherLogic(cfg *config.LayerConfig, db *gorm.DB, client *ethclient.C
addressList: addressList,
parser: NewL2EventParser(),
}
reg := prometheus.DefaultRegisterer
f.l2FetcherLogicFetchedTotal = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "L2_fetcher_logic_fetched_total",
Help: "The total number of events or failed txs fetched in L2 fetcher logic.",
}, []string{"type"})
return f
}
func (f *L2FetcherLogic) getBlocksAndDetectReorg(ctx context.Context, from, to uint64, lastBlockHash common.Hash) (bool, uint64, common.Hash, []*types.BlockWithRowConsumption, error) {
func (f *L2FetcherLogic) gatewayRouterFailedTxs(ctx context.Context, from, to uint64) (map[uint64]uint64, []*orm.CrossMessage, []*orm.CrossMessage, error) {
var l2FailedGatewayRouterTxs []*orm.CrossMessage
var l2RevertedRelayedMessages []*orm.CrossMessage
blockTimestampsMap := make(map[uint64]uint64)
blocks, err := utils.GetL2BlocksInRange(ctx, f.client, from, to)
if err != nil {
log.Error("failed to get L2 blocks in range", "from", from, "to", to, "err", err)
return false, 0, common.Hash{}, nil, err
return nil, nil, nil, err
}
for _, block := range blocks {
if block.ParentHash() != lastBlockHash {
log.Warn("L2 reorg detected", "reorg height", block.NumberU64()-1, "expected hash", block.ParentHash().String(), "local hash", lastBlockHash.String())
var resyncHeight uint64
if block.NumberU64() > L2ReorgSafeDepth+1 {
resyncHeight = block.NumberU64() - L2ReorgSafeDepth - 1
}
header, err := f.client.HeaderByNumber(ctx, new(big.Int).SetUint64(resyncHeight))
if err != nil {
log.Error("failed to get L2 header by number", "block number", resyncHeight, "err", err)
return false, 0, common.Hash{}, nil, err
}
return true, resyncHeight, header.Hash(), nil, nil
}
lastBlockHash = block.Hash()
}
return false, 0, lastBlockHash, blocks, nil
}
func (f *L2FetcherLogic) getRevertedTxs(ctx context.Context, from, to uint64, blocks []*types.BlockWithRowConsumption) (map[uint64]uint64, []*orm.CrossMessage, []*orm.CrossMessage, error) {
var l2RevertedUserTxs []*orm.CrossMessage
var l2RevertedRelayedMessageTxs []*orm.CrossMessage
blockTimestampsMap := make(map[uint64]uint64)
for i := from; i <= to; i++ {
block := blocks[i-from]
blockTimestampsMap[block.NumberU64()] = block.Time()
@@ -131,7 +95,6 @@ func (f *L2FetcherLogic) getRevertedTxs(ctx context.Context, from, to uint64, bl
}
toAddress := txTo.String()
// GatewayRouter: L2 withdrawal.
if toAddress == f.cfg.GatewayRouterAddr {
receipt, receiptErr := f.client.TransactionReceipt(ctx, tx.Hash())
if receiptErr != nil {
@@ -139,7 +102,7 @@ func (f *L2FetcherLogic) getRevertedTxs(ctx context.Context, from, to uint64, bl
return nil, nil, nil, receiptErr
}
// Check if the transaction is failed
// Check if the transaction failed
if receipt.Status == types.ReceiptStatusFailed {
signer := types.LatestSignerForChainID(new(big.Int).SetUint64(tx.ChainId().Uint64()))
sender, signerErr := signer.Sender(tx)
@@ -148,14 +111,14 @@ func (f *L2FetcherLogic) getRevertedTxs(ctx context.Context, from, to uint64, bl
return nil, nil, nil, signerErr
}
l2RevertedUserTxs = append(l2RevertedUserTxs, &orm.CrossMessage{
l2FailedGatewayRouterTxs = append(l2FailedGatewayRouterTxs, &orm.CrossMessage{
L2TxHash: tx.Hash().String(),
MessageType: int(orm.MessageTypeL2SentMessage),
Sender: sender.String(),
Receiver: (*tx.To()).String(),
L2BlockNumber: receipt.BlockNumber.Uint64(),
BlockTimestamp: block.Time(),
TxStatus: int(orm.TxStatusTypeSentTxReverted),
TxStatus: int(orm.TxStatusTypeSentFailed),
})
}
}
@@ -167,12 +130,12 @@ func (f *L2FetcherLogic) getRevertedTxs(ctx context.Context, from, to uint64, bl
return nil, nil, nil, receiptErr
}
// Check if the transaction is failed
// Check if the transaction failed
if receipt.Status == types.ReceiptStatusFailed {
l2RevertedRelayedMessageTxs = append(l2RevertedRelayedMessageTxs, &orm.CrossMessage{
MessageHash: common.BytesToHash(crypto.Keccak256(tx.AsL1MessageTx().Data)).String(),
l2RevertedRelayedMessages = append(l2RevertedRelayedMessages, &orm.CrossMessage{
MessageHash: "0x" + common.Bytes2Hex(crypto.Keccak256(tx.AsL1MessageTx().Data)),
L2TxHash: tx.Hash().String(),
TxStatus: int(orm.TxStatusTypeRelayTxReverted),
TxStatus: int(orm.TxStatusTypeRelayedTxReverted),
L2BlockNumber: receipt.BlockNumber.Uint64(),
MessageType: int(orm.MessageTypeL1SentMessage),
})
@@ -180,7 +143,7 @@ func (f *L2FetcherLogic) getRevertedTxs(ctx context.Context, from, to uint64, bl
}
}
}
return blockTimestampsMap, l2RevertedUserTxs, l2RevertedRelayedMessageTxs, nil
return blockTimestampsMap, l2FailedGatewayRouterTxs, l2RevertedRelayedMessages, nil
}
func (f *L2FetcherLogic) l2FetcherLogs(ctx context.Context, from, to uint64) ([]types.Log, error) {
@@ -208,72 +171,32 @@ func (f *L2FetcherLogic) l2FetcherLogs(ctx context.Context, from, to uint64) ([]
}
// L2Fetcher L2 fetcher
func (f *L2FetcherLogic) L2Fetcher(ctx context.Context, from, to uint64, lastBlockHash common.Hash) (bool, uint64, common.Hash, *L2FilterResult, error) {
log.Info("fetch and save L2 events", "from", from, "to", to)
func (f *L2FetcherLogic) L2Fetcher(ctx context.Context, from, to uint64) (*L2FilterResult, error) {
log.Info("fetch and save L1 events", "from", from, "to", to)
isReorg, reorgHeight, blockHash, blocks, getErr := f.getBlocksAndDetectReorg(ctx, from, to, lastBlockHash)
if getErr != nil {
log.Error("L2Fetcher getBlocksAndDetectReorg failed", "from", from, "to", to, "error", getErr)
return false, 0, common.Hash{}, nil, getErr
}
if isReorg {
return isReorg, reorgHeight, blockHash, nil, nil
}
blockTimestampsMap, revertedUserTxs, revertedRelayMsgs, routerErr := f.getRevertedTxs(ctx, from, to, blocks)
blockTimestampsMap, l2FailedGatewayRouterTxs, l2RevertedRelayedMessages, routerErr := f.gatewayRouterFailedTxs(ctx, from, to)
if routerErr != nil {
log.Error("L2Fetcher getRevertedTxs failed", "from", from, "to", to, "error", routerErr)
return false, 0, common.Hash{}, nil, routerErr
log.Error("L2Fetcher gatewayRouterFailedTxs failed", "from", from, "to", to, "error", routerErr)
return nil, routerErr
}
eventLogs, err := f.l2FetcherLogs(ctx, from, to)
if err != nil {
log.Error("L2Fetcher l2FetcherLogs failed", "from", from, "to", to, "error", err)
return false, 0, common.Hash{}, nil, err
return nil, err
}
l2WithdrawMessages, l2RelayedMessages, err := f.parser.ParseL2EventLogs(eventLogs, blockTimestampsMap)
if err != nil {
log.Error("failed to parse L2 event logs", "from", from, "to", to, "err", err)
return false, 0, common.Hash{}, nil, err
return nil, err
}
res := L2FilterResult{
WithdrawMessages: l2WithdrawMessages,
RelayedMessages: append(l2RelayedMessages, revertedRelayMsgs...),
OtherRevertedTxs: revertedUserTxs,
}
f.updateMetrics(res)
return false, 0, blockHash, &res, nil
}
func (f *L2FetcherLogic) updateMetrics(res L2FilterResult) {
f.l2FetcherLogicFetchedTotal.WithLabelValues("L2_failed_gateway_router_transaction").Add(float64(len(res.OtherRevertedTxs)))
for _, withdrawMessage := range res.WithdrawMessages {
switch orm.TokenType(withdrawMessage.TokenType) {
case orm.TokenTypeETH:
f.l2FetcherLogicFetchedTotal.WithLabelValues("L2_withdraw_eth").Add(1)
case orm.TokenTypeERC20:
f.l2FetcherLogicFetchedTotal.WithLabelValues("L2_withdraw_erc20").Add(1)
case orm.TokenTypeERC721:
f.l2FetcherLogicFetchedTotal.WithLabelValues("L2_withdraw_erc721").Add(1)
case orm.TokenTypeERC1155:
f.l2FetcherLogicFetchedTotal.WithLabelValues("L2_withdraw_erc1155").Add(1)
}
}
for _, relayedMessage := range res.RelayedMessages {
switch orm.TxStatusType(relayedMessage.TxStatus) {
case orm.TxStatusTypeRelayed:
f.l2FetcherLogicFetchedTotal.WithLabelValues("L2_relayed_message").Add(1)
case orm.TxStatusTypeFailedRelayed:
f.l2FetcherLogicFetchedTotal.WithLabelValues("L2_failed_relayed_message").Add(1)
case orm.TxStatusTypeRelayTxReverted:
f.l2FetcherLogicFetchedTotal.WithLabelValues("L2_reverted_relayed_message_transaction").Add(1)
}
FailedGatewayRouterTxs: l2FailedGatewayRouterTxs,
RevertedRelayedMessages: l2RevertedRelayedMessages,
WithdrawMessages: l2WithdrawMessages,
RelayedMessages: l2RelayedMessages,
}
return &res, nil
}

View File

@@ -70,13 +70,12 @@ func (c *BatchEvent) GetBatchEventSyncedHeightInDB(ctx context.Context) (uint64,
return batch.L1BlockNumber, nil
}
// GetFinalizedBatchesLEBlockHeight returns the finalized batches with end block <= given block height in db.
func (c *BatchEvent) GetFinalizedBatchesLEBlockHeight(ctx context.Context, blockHeight uint64) ([]*BatchEvent, error) {
// GetBatchesLEBlockHeight returns the batches with end block <= given block height in db.
func (c *BatchEvent) GetBatchesLEBlockHeight(ctx context.Context, blockHeight uint64) ([]*BatchEvent, error) {
var batches []*BatchEvent
db := c.db.WithContext(ctx)
db = db.Model(&BatchEvent{})
db = db.Where("end_block_number <= ?", blockHeight)
db = db.Where("batch_status = ?", BatchStatusTypeFinalized)
db = db.Where("update_status = ?", UpdateStatusTypeUnupdated)
db = db.Order("batch_index asc")
if err := db.Find(&batches).Error; err != nil {

View File

@@ -38,21 +38,22 @@ type TxStatusType int
// Constants for TxStatusType.
const (
// TxStatusTypeSent is one of the initial statuses for cross-chain messages.
// TxStatusTypeSent is one of the initial statuses for cross-chain messages (the other one is TxStatusTypeSentFailed).
// It is used as the default value to prevent overwriting the transaction status in scenarios where the message status might change
// from a later status (e.g., relayed) back to "sent".
// Example flow (L1 -> L2 message, and L1 fetcher is slower than L2 fetcher):
// 1. The relayed message is first tracked and processed, setting tx_status to TxStatusTypeRelayed.
// 2. The sent message is later processed (same cross-chain message), the tx_status should not over-write TxStatusTypeRelayed.
TxStatusTypeSent TxStatusType = iota
TxStatusTypeSentTxReverted // Not track message hash, thus will not be processed again anymore.
TxStatusTypeRelayed // Terminal status.
// Example flow:
// 1. A relayed message is processed, setting tx_status to TxStatusTypeRelayed.
// 2. If a sent message is later processed for the same cross-chain message, the tx_status
// should remain as TxStatusTypeRelayed and not be modified back to TxStatusTypeSent.
TxStatusTypeSent TxStatusType = iota
TxStatusTypeSentFailed
TxStatusTypeRelayed
// FailedRelayedMessage event: encoded tx failed, cannot retry. e.g., https://sepolia.scrollscan.com/tx/0xfc7d3ea5ec8dc9b664a5a886c3b33d21e665355057601033481a439498efb79a
TxStatusTypeFailedRelayed // Terminal status.
TxStatusTypeFailedRelayed
// In some cases, user can retry with a larger gas limit. e.g., https://sepolia.scrollscan.com/tx/0x7323a7ba29492cb47d92206411be99b27896f2823cee0633a596b646b73f1b5b
TxStatusTypeRelayTxReverted
TxStatusTypeRelayedTxReverted
TxStatusTypeSkipped
TxStatusTypeDropped // Terminal status.
TxStatusTypeDropped
)
// RollupStatusType represents the status of a rollup.
@@ -79,12 +80,7 @@ const (
type MessageQueueEvent struct {
EventType MessageQueueEventType
QueueIndex uint64
// Track replay tx hash and refund tx hash.
TxHash common.Hash
// QueueTransaction only in replayMessage, to track which message is replayed.
MessageHash common.Hash
TxHash common.Hash
}
// CrossMessage represents a cross message.
@@ -99,10 +95,8 @@ type CrossMessage struct {
Sender string `json:"sender" gorm:"column:sender"`
Receiver string `json:"receiver" gorm:"column:receiver"`
MessageHash string `json:"message_hash" gorm:"column:message_hash"`
L1TxHash string `json:"l1_tx_hash" gorm:"column:l1_tx_hash"` // initial tx hash, if MessageType is MessageTypeL1SentMessage.
L1ReplayTxHash string `json:"l1_replay_tx_hash" gorm:"column:l1_replay_tx_hash"`
L1RefundTxHash string `json:"l1_refund_tx_hash" gorm:"column:l1_refund_tx_hash"`
L2TxHash string `json:"l2_tx_hash" gorm:"column:l2_tx_hash"` // initial tx hash, if MessageType is MessageTypeL2SentMessage.
L1TxHash string `json:"l1_tx_hash" gorm:"column:l1_tx_hash"`
L2TxHash string `json:"l2_tx_hash" gorm:"column:l2_tx_hash"`
L1BlockNumber uint64 `json:"l1_block_number" gorm:"column:l1_block_number"`
L2BlockNumber uint64 `json:"l2_block_number" gorm:"column:l2_block_number"`
L1TokenAddress string `json:"l1_token_address" gorm:"column:l1_token_address"`
@@ -160,42 +154,23 @@ func (c *CrossMessage) GetMessageSyncedHeightInDB(ctx context.Context, messageTy
}
}
// GetL2LatestFinalizedWithdrawal returns the latest finalized L2 withdrawal from the database.
func (c *CrossMessage) GetL2LatestFinalizedWithdrawal(ctx context.Context) (*CrossMessage, error) {
// GetLatestL2Withdrawal returns the latest processed L2 withdrawal from the database.
func (c *CrossMessage) GetLatestL2Withdrawal(ctx context.Context) (*CrossMessage, error) {
var message CrossMessage
db := c.db.WithContext(ctx)
db = db.Model(&CrossMessage{})
db = db.Where("message_type = ?", MessageTypeL2SentMessage)
db = db.Where("rollup_status = ?", RollupStatusTypeFinalized)
db = db.Where("tx_status != ?", TxStatusTypeSentFailed)
db = db.Order("message_nonce desc")
if err := db.First(&message).Error; err != nil {
if err == gorm.ErrRecordNotFound {
return nil, nil
}
return nil, fmt.Errorf("failed to get latest L2 finalized sent message event, error: %w", err)
return nil, fmt.Errorf("failed to get latest L2 sent message event, error: %w", err)
}
return &message, nil
}
// GetL2WithdrawalsByBlockRange returns the L2 withdrawals by block range from the database.
func (c *CrossMessage) GetL2WithdrawalsByBlockRange(ctx context.Context, startBlock, endBlock uint64) ([]*CrossMessage, error) {
var messages []*CrossMessage
db := c.db.WithContext(ctx)
db = db.Model(&CrossMessage{})
db = db.Where("l2_block_number >= ?", startBlock)
db = db.Where("l2_block_number <= ?", endBlock)
db = db.Where("tx_status != ?", TxStatusTypeSentTxReverted)
db = db.Where("message_type = ?", MessageTypeL2SentMessage)
db = db.Order("message_nonce asc")
if err := db.Find(&messages).Error; err != nil {
if err == gorm.ErrRecordNotFound {
return nil, nil
}
return nil, fmt.Errorf("failed to get latest L2 finalized sent message event, error: %w", err)
}
return messages, nil
}
// GetMessagesByTxHashes retrieves all cross messages from the database that match the provided transaction hashes.
func (c *CrossMessage) GetMessagesByTxHashes(ctx context.Context, txHashes []string) ([]*CrossMessage, error) {
var messages []*CrossMessage
@@ -255,7 +230,6 @@ func (c *CrossMessage) GetTxsByAddress(ctx context.Context, sender string) ([]*C
// UpdateL1MessageQueueEventsInfo updates the information about L1 message queue events in the database.
func (c *CrossMessage) UpdateL1MessageQueueEventsInfo(ctx context.Context, l1MessageQueueEvents []*MessageQueueEvent, dbTX ...*gorm.DB) error {
// update tx statuses.
for _, l1MessageQueueEvent := range l1MessageQueueEvents {
db := c.db
if len(dbTX) > 0 && dbTX[0] != nil {
@@ -263,62 +237,20 @@ func (c *CrossMessage) UpdateL1MessageQueueEventsInfo(ctx context.Context, l1Mes
}
db = db.WithContext(ctx)
db = db.Model(&CrossMessage{})
// do not over-write terminal statuses.
db = db.Where("tx_status != ?", TxStatusTypeRelayed)
db = db.Where("tx_status != ?", TxStatusTypeFailedRelayed)
db = db.Where("tx_status != ?", TxStatusTypeDropped)
txStatusUpdateFields := make(map[string]interface{})
db = db.Where("message_type = ?", MessageTypeL1SentMessage)
db = db.Where("message_nonce = ?", l1MessageQueueEvent.QueueIndex)
updateFields := make(map[string]interface{})
switch l1MessageQueueEvent.EventType {
case MessageQueueEventTypeQueueTransaction:
// only replayMessages or enforced txs (whose message hashes would not be found), sentMessages have been filtered out.
// replayMessage case:
// First SentMessage in L1: https://sepolia.etherscan.io/tx/0xbee4b631312448fcc2caac86e4dccf0a2ae0a88acd6c5fd8764d39d746e472eb
// Transaction reverted in L2: https://sepolia.scrollscan.com/tx/0xde6ef307a7da255888aad7a4c40a6b8c886e46a8a05883070bbf18b736cbfb8c
// replayMessage: https://sepolia.etherscan.io/tx/0xa5392891232bb32d98fcdbaca0d91b4d22ef2755380d07d982eebd47b147ce28
//
// Note: update l1_tx_hash if the user calls replayMessage, cannot use queue index here,
// because in replayMessage, queue index != message nonce.
// Ref: https://github.com/scroll-tech/scroll/blob/v4.3.44/contracts/src/L1/L1ScrollMessenger.sol#L187-L190
db = db.Where("message_hash = ?", l1MessageQueueEvent.MessageHash.String())
txStatusUpdateFields["tx_status"] = TxStatusTypeSent // reset status to "sent".
// Update l1_tx_hash if the user calls replayMessage.
updateFields["l1_tx_hash"] = l1MessageQueueEvent.TxHash.String()
case MessageQueueEventTypeDequeueTransaction:
db = db.Where("message_nonce = ?", l1MessageQueueEvent.QueueIndex)
db = db.Where("message_type = ?", MessageTypeL1SentMessage)
txStatusUpdateFields["tx_status"] = TxStatusTypeSkipped
updateFields["tx_status"] = TxStatusTypeSkipped
case MessageQueueEventTypeDropTransaction:
db = db.Where("message_nonce = ?", l1MessageQueueEvent.QueueIndex)
db = db.Where("message_type = ?", MessageTypeL1SentMessage)
txStatusUpdateFields["tx_status"] = TxStatusTypeDropped
updateFields["tx_status"] = TxStatusTypeDropped
}
if err := db.Updates(txStatusUpdateFields).Error; err != nil {
return fmt.Errorf("failed to update tx statuses of L1 message queue events, update fields: %v, error: %w", txStatusUpdateFields, err)
}
}
// update tx hashes of replay and refund.
for _, l1MessageQueueEvent := range l1MessageQueueEvents {
db := c.db
if len(dbTX) > 0 && dbTX[0] != nil {
db = dbTX[0]
}
db = db.WithContext(ctx)
db = db.Model(&CrossMessage{})
txHashUpdateFields := make(map[string]interface{})
switch l1MessageQueueEvent.EventType {
case MessageQueueEventTypeQueueTransaction:
// only replayMessages or enforced txs (whose message hashes would not be found), sentMessages have been filtered out.
db = db.Where("message_hash = ?", l1MessageQueueEvent.MessageHash.String())
txHashUpdateFields["l1_replay_tx_hash"] = l1MessageQueueEvent.TxHash.String()
case MessageQueueEventTypeDropTransaction:
db = db.Where("message_nonce = ?", l1MessageQueueEvent.QueueIndex)
db = db.Where("message_type = ?", MessageTypeL1SentMessage)
txHashUpdateFields["l1_refund_tx_hash"] = l1MessageQueueEvent.TxHash.String()
}
// Check if there are fields to update to avoid empty update operation (skip message).
if len(txHashUpdateFields) > 0 {
if err := db.Updates(txHashUpdateFields).Error; err != nil {
return fmt.Errorf("failed to update tx hashes of replay and refund in L1 message queue events info, update fields: %v, error: %w", txHashUpdateFields, err)
}
if err := db.Updates(updateFields).Error; err != nil {
return fmt.Errorf("failed to update L1 message queue events info, error: %w", err)
}
}
return nil
@@ -340,27 +272,6 @@ func (c *CrossMessage) UpdateBatchStatusOfL2Withdrawals(ctx context.Context, sta
return nil
}
// UpdateBatchIndexRollupStatusMerkleProofOfL2Messages updates the batch_index, rollup_status, and merkle_proof fields for a list of L2 cross messages.
func (c *CrossMessage) UpdateBatchIndexRollupStatusMerkleProofOfL2Messages(ctx context.Context, messages []*CrossMessage) error {
if len(messages) == 0 {
return nil
}
for _, message := range messages {
updateFields := map[string]interface{}{
"batch_index": message.BatchIndex,
"rollup_status": message.RollupStatus,
"merkle_proof": message.MerkleProof,
}
db := c.db.WithContext(ctx)
db = db.Model(&CrossMessage{})
db = db.Where("message_hash = ?", message.MessageHash)
if err := db.Updates(updateFields).Error; err != nil {
return fmt.Errorf("failed to update L2 message with message_hash %s, error: %w", message.MessageHash, err)
}
}
return nil
}
// InsertOrUpdateL1Messages inserts or updates a list of L1 cross messages into the database.
func (c *CrossMessage) InsertOrUpdateL1Messages(ctx context.Context, messages []*CrossMessage, dbTX ...*gorm.DB) error {
if len(messages) == 0 {
@@ -395,10 +306,9 @@ func (c *CrossMessage) InsertOrUpdateL2Messages(ctx context.Context, messages []
db = db.WithContext(ctx)
db = db.Model(&CrossMessage{})
// 'tx_status' column is not explicitly assigned during the update to prevent a later status from being overwritten back to "sent".
// The merkle_proof is updated separately in batch status updates and hence is not included here.
db = db.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "message_hash"}},
DoUpdates: clause.AssignmentColumns([]string{"sender", "receiver", "token_type", "l2_block_number", "l2_tx_hash", "l1_token_address", "l2_token_address", "token_ids", "token_amounts", "message_type", "block_timestamp", "message_from", "message_to", "message_value", "message_data", "message_nonce"}),
DoUpdates: clause.AssignmentColumns([]string{"sender", "receiver", "token_type", "l2_block_number", "l2_tx_hash", "l1_token_address", "l2_token_address", "token_ids", "token_amounts", "message_type", "block_timestamp", "message_from", "message_to", "message_value", "message_data", "merkle_proof", "message_nonce"}),
})
if err := db.Create(messages).Error; err != nil {
return fmt.Errorf("failed to insert message, error: %w", err)
@@ -434,35 +344,24 @@ func (c *CrossMessage) InsertOrUpdateL2RelayedMessagesOfL1Deposits(ctx context.C
if len(l2RelayedMessages) == 0 {
return nil
}
// Deduplicate messages, for each message_hash, retaining message with the highest block number.
// This is necessary as a single message, like a FailedRelayedMessage or a reverted relayed transaction,
// may be relayed multiple times within certain block ranges, potentially leading to the error:
// "ERROR: ON CONFLICT DO UPDATE command cannot affect row a second time (SQLSTATE 21000)".
// This happens if we attempt to insert multiple records with the same message_hash in a single db.Create operation.
// For example, see these transactions where the same message was relayed twice within certain block ranges:
// Reverted tx 1: https://sepolia.scrollscan.com/tx/0xcd6979277c3bc747445273a5e58ef1e9692fbe101d88cfefbbb69d3aef3193c0
// Reverted tx 2: https://sepolia.scrollscan.com/tx/0x43e28ed7cb71107c18c5d8ebbdb4a1d9cac73e60391d14d41e92985028faa337
// Another example:
// FailedRelayedMessage 1: https://sepolia.scrollscan.com/tx/0xfadb147fb211e5096446c5cac3ae0a8a705d2ece6c47c65135c8874f84638f17
// FailedRelayedMessage 2: https://sepolia.scrollscan.com/tx/0x6cb149b61afd07bf2e17561a59ebebde41e343b6610290c97515b2f862160b42
mergedL2RelayedMessages := make(map[string]*CrossMessage)
for _, message := range l2RelayedMessages {
if existing, found := mergedL2RelayedMessages[message.MessageHash]; found {
if TxStatusType(message.TxStatus) == TxStatusTypeRelayed || message.L2BlockNumber > existing.L2BlockNumber {
mergedL2RelayedMessages[message.MessageHash] = message
}
} else {
mergedL2RelayedMessages[message.MessageHash] = message
}
db := c.db.WithContext(ctx)
db = db.Model(&CrossMessage{})
db = db.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "message_hash"}},
DoUpdates: clause.AssignmentColumns([]string{"message_type", "l2_block_number", "l2_tx_hash", "tx_status"}),
})
if err := db.Create(l2RelayedMessages).Error; err != nil {
return fmt.Errorf("failed to update L2 relayed message of L1 deposit, error: %w", err)
}
uniqueL2RelayedMessages := make([]*CrossMessage, 0, len(mergedL2RelayedMessages))
for _, msg := range mergedL2RelayedMessages {
uniqueL2RelayedMessages = append(uniqueL2RelayedMessages, msg)
return nil
}
// InsertOrUpdateL2RevertedRelayedMessagesOfL1Deposits inserts or updates the database with a list of L2 relayed messages related to L1 deposits.
func (c *CrossMessage) InsertOrUpdateL2RevertedRelayedMessagesOfL1Deposits(ctx context.Context, l2RevertedRelayedMessages []*CrossMessage, dbTX ...*gorm.DB) error {
if len(l2RevertedRelayedMessages) == 0 {
return nil
}
// Do not update tx status of successfully or failed relayed messages,
// because if a message is handled, the later relayed message tx would be reverted.
// ref: https://github.com/scroll-tech/scroll/blob/v4.3.44/contracts/src/L2/L2ScrollMessenger.sol#L102
// e.g.,
// Do not update tx status of successfully relayed messages. e.g.,
// Successfully relayed: https://sepolia.scrollscan.com/tx/0x4eb7cb07ba76956259c0079819a34a146f8a93dd891dc94812e9b3d66b056ec7#eventlog
// Reverted tx 1 (Reason: Message was already successfully executed): https://sepolia.scrollscan.com/tx/0x1973cafa14eb40734df30da7bfd4d9aceb53f8f26e09d96198c16d0e2e4a95fd
// Reverted tx 2 (Reason: Message was already successfully executed): https://sepolia.scrollscan.com/tx/0x02fc3a28684a590aead2482022f56281539085bd3d273ac8dedc1ceccb2bc554
@@ -471,18 +370,9 @@ func (c *CrossMessage) InsertOrUpdateL2RelayedMessagesOfL1Deposits(ctx context.C
db = db.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "message_hash"}},
DoUpdates: clause.AssignmentColumns([]string{"message_type", "l2_block_number", "l2_tx_hash", "tx_status"}),
Where: clause.Where{
Exprs: []clause.Expression{
clause.And(
// do not over-write terminal statuses.
clause.Neq{Column: "cross_message.tx_status", Value: TxStatusTypeRelayed},
clause.Neq{Column: "cross_message.tx_status", Value: TxStatusTypeFailedRelayed},
clause.Neq{Column: "cross_message.tx_status", Value: TxStatusTypeDropped},
),
},
},
Where: clause.Where{Exprs: []clause.Expression{clause.Neq{Column: "cross_message.tx_status", Value: TxStatusTypeRelayed}}},
})
if err := db.Create(uniqueL2RelayedMessages).Error; err != nil {
if err := db.Create(l2RevertedRelayedMessages).Error; err != nil {
return fmt.Errorf("failed to update L2 reverted relayed message of L1 deposit, error: %w", err)
}
return nil
@@ -501,13 +391,10 @@ func (c *CrossMessage) InsertOrUpdateL1RelayedMessagesOfL2Withdrawals(ctx contex
// For example, see these transactions where the same message was relayed twice within certain block ranges:
// FailedRelayedMessage 1: https://sepolia.etherscan.io/tx/0x28b3212cda6ca0f3790f362a780257bbe2b37417ccf75a4eca6c3a08294c8f1b#eventlog
// FailedRelayedMessage 2: https://sepolia.etherscan.io/tx/0xc8a8254825dd2cab5caef58cfd8d88c077ceadadc78f2340214a86cf8ab88543#eventlog
// Another example (relayed success, then relayed again):
// Relay Message, and success: https://sepolia.etherscan.io/tx/0xcfdf2f5446719e3e123a8aa06e4d6b3809c3850a13adf875755c8b1e423aa448#eventlog
// Relay Message again, and reverted: https://sepolia.etherscan.io/tx/0xb1fcae7546f3de4cfd0b4d679f4075adb4eb69578b12e2b5673f5f24b1836578
mergedL1RelayedMessages := make(map[string]*CrossMessage)
for _, message := range l1RelayedMessages {
if existing, found := mergedL1RelayedMessages[message.MessageHash]; found {
if TxStatusType(message.TxStatus) == TxStatusTypeRelayed || message.L1BlockNumber > existing.L1BlockNumber {
if message.L1BlockNumber > existing.L1BlockNumber {
mergedL1RelayedMessages[message.MessageHash] = message
}
} else {
@@ -527,16 +414,6 @@ func (c *CrossMessage) InsertOrUpdateL1RelayedMessagesOfL2Withdrawals(ctx contex
db = db.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "message_hash"}},
DoUpdates: clause.AssignmentColumns([]string{"message_type", "l1_block_number", "l1_tx_hash", "tx_status"}),
Where: clause.Where{
Exprs: []clause.Expression{
clause.And(
// do not over-write terminal statuses.
clause.Neq{Column: "cross_message.tx_status", Value: TxStatusTypeRelayed},
clause.Neq{Column: "cross_message.tx_status", Value: TxStatusTypeFailedRelayed},
clause.Neq{Column: "cross_message.tx_status", Value: TxStatusTypeDropped},
),
},
},
})
if err := db.Create(uniqueL1RelayedMessages).Error; err != nil {
return fmt.Errorf("failed to update L1 relayed message of L2 withdrawal, error: %w", err)

View File

@@ -12,8 +12,6 @@ CREATE TABLE cross_message
message_hash VARCHAR DEFAULT NULL, -- NULL for failed txs
l1_tx_hash VARCHAR DEFAULT NULL,
l1_replay_tx_hash VARCHAR DEFAULT NULL,
l1_refund_tx_hash VARCHAR DEFAULT NULL,
l2_tx_hash VARCHAR DEFAULT NULL,
l1_block_number BIGINT DEFAULT NULL,
l2_block_number BIGINT DEFAULT NULL,
@@ -41,8 +39,7 @@ CREATE TABLE cross_message
CREATE UNIQUE INDEX IF NOT EXISTS idx_cm_message_hash ON cross_message (message_hash);
CREATE INDEX IF NOT EXISTS idx_cm_message_type_l1_block_number ON cross_message (message_type, l1_block_number DESC);
CREATE INDEX IF NOT EXISTS idx_cm_message_type_l2_block_number ON cross_message (message_type, l2_block_number DESC);
CREATE INDEX IF NOT EXISTS idx_cm_message_type_rollup_status_message_nonce ON cross_message (message_type, rollup_status, message_nonce DESC);
CREATE INDEX IF NOT EXISTS idx_cm_message_type_message_nonce_tx_status_l2_block_number ON cross_message (message_type, message_nonce, tx_status, l2_block_number);
CREATE INDEX IF NOT EXISTS idx_cm_message_type_message_nonce ON cross_message (message_type, message_nonce DESC);
CREATE INDEX IF NOT EXISTS idx_cm_l1_tx_hash ON cross_message (l1_tx_hash);
CREATE INDEX IF NOT EXISTS idx_cm_l2_tx_hash ON cross_message (l2_tx_hash);
CREATE INDEX IF NOT EXISTS idx_cm_message_type_tx_status_sender_block_timestamp ON cross_message (message_type, tx_status, sender, block_timestamp DESC);

View File

@@ -17,8 +17,8 @@ CREATE TABLE batch_event
CREATE INDEX IF NOT EXISTS idx_be_l1_block_number ON batch_event (l1_block_number);
CREATE INDEX IF NOT EXISTS idx_be_batch_index ON batch_event (batch_index);
CREATE INDEX IF NOT EXISTS idx_be_batch_index_batch_hash ON batch_event (batch_index, batch_hash);
CREATE INDEX IF NOT EXISTS idx_be_end_block_number_update_status_batch_status_batch_index ON batch_event (end_block_number, update_status, batch_status, batch_index);
CREATE UNIQUE INDEX IF NOT EXISTS unique_idx_be_batch_index_batch_hash ON batch_event (batch_index, batch_hash);
CREATE INDEX IF NOT EXISTS idx_be_end_block_number_update_status_batch_index ON batch_event (end_block_number, update_status, batch_index);
-- +goose StatementEnd

View File

@@ -69,8 +69,6 @@ type UserClaimInfo struct {
// TxHistoryInfo the schema of tx history infos
type TxHistoryInfo struct {
Hash string `json:"hash"`
ReplayTxHash string `json:"replayTxHash"`
RefundTxHash string `json:"refundTxHash"`
MsgHash string `json:"msgHash"`
Amount string `json:"amount"`
IsL1 bool `json:"isL1"`

View File

@@ -46,71 +46,15 @@ func (w *WithdrawTrie) Initialize(currentMessageNonce uint64, msgHash common.Has
}
// AppendMessages appends a list of new messages as leaf nodes to the rightest of the tree and returns the proofs for all messages.
// The function correctly returns the proofs for the entire tree after all messages have been inserted, not the individual proofs after each insertion.
func (w *WithdrawTrie) AppendMessages(hashes []common.Hash) [][]byte {
length := len(hashes)
if length == 0 {
return make([][]byte, 0)
}
cache := make([]map[uint64]common.Hash, MaxHeight)
for h := 0; h < MaxHeight; h++ {
cache[h] = make(map[uint64]common.Hash)
}
// cache all branches will be used later.
if w.NextMessageNonce != 0 {
index := w.NextMessageNonce
for h := 0; h <= w.height; h++ {
if index%2 == 1 {
// right child, `w.branches[h]` is the corresponding left child
// the index of left child should be `index ^ 1`.
cache[h][index^1] = w.branches[h]
}
index >>= 1
}
}
// cache all new leaves
for i := 0; i < length; i++ {
cache[0][w.NextMessageNonce+uint64(i)] = hashes[i]
}
// build withdraw trie with new hashes
minIndex := w.NextMessageNonce
maxIndex := w.NextMessageNonce + uint64(length) - 1
for h := 0; maxIndex > 0; h++ {
if minIndex%2 == 1 {
minIndex--
}
if maxIndex%2 == 0 {
cache[h][maxIndex^1] = w.zeroes[h]
}
for i := minIndex; i <= maxIndex; i += 2 {
cache[h+1][i>>1] = Keccak2(cache[h][i], cache[h][i^1])
}
minIndex >>= 1
maxIndex >>= 1
}
// update branches using hashes one by one
for i := 0; i < length; i++ {
proof := updateBranchWithNewMessage(w.zeroes, w.branches, w.NextMessageNonce, hashes[i])
w.NextMessageNonce++
w.height = len(proof)
}
proofs := make([][]byte, length)
// retrieve merkle proof from cache
for i := 0; i < length; i++ {
index := w.NextMessageNonce + uint64(i) - uint64(length)
var merkleProof []common.Hash
for h := 0; h < w.height; h++ {
merkleProof = append(merkleProof, cache[h][index^1])
index >>= 1
}
merkleProof := updateBranchWithNewMessage(w.zeroes, w.branches, w.NextMessageNonce, hashes[i])
w.NextMessageNonce++
w.height = len(merkleProof)
proofs[i] = encodeMerkleProofToBytes(merkleProof)
}
return proofs
}

View File

@@ -95,7 +95,8 @@ func (c *Cmd) Write(data []byte) (int, error) {
if verbose || c.openLog {
fmt.Printf("%s:\n\t%v", c.name, out)
} else if strings.Contains(strings.ToLower(out), "error") ||
strings.Contains(strings.ToLower(out), "warning") {
strings.Contains(strings.ToLower(out), "warning") ||
strings.Contains(strings.ToLower(out), "info") {
fmt.Printf("%s:\n\t%v", c.name, out)
}
go c.checkFuncs.IterCb(func(_ string, value interface{}) {

View File

@@ -1,6 +1,6 @@
# Scroll Contracts
This directory contains the solidity code for Scroll L1 bridge and rollup contracts and L2 bridge and pre-deployed contracts. You can also find contract APIs and more details in the [`docs`](./docs) folder.
This directory contains the solidity code for Scroll L1 bridge and rollup contracts and L2 bridge and pre-deployed contracts. The [`specs`](../specs/) folder describes the overall Scroll protocol including the cross-domain messaging and rollup process. You can also find contract APIs and more details in the [`docs`](./docs) folder.
## Directory Structure

View File

@@ -15,7 +15,7 @@ describe("ZkEvmVerifierV1", async () => {
beforeEach(async () => {
[deployer] = await ethers.getSigners();
const bytecode = hexlify(fs.readFileSync("./src/libraries/verifier/plonk-verifier/plonk_verifier_0.9.8.bin"));
const bytecode = hexlify(fs.readFileSync("./src/libraries/verifier/plonk-verifier/plonk_verifier_0.5.1.bin"));
const tx = await deployer.sendTransaction({ data: bytecode });
const receipt = await tx.wait();
@@ -25,15 +25,47 @@ describe("ZkEvmVerifierV1", async () => {
});
it("should succeed", async () => {
const proof = hexlify(fs.readFileSync("./integration-test/testdata/plonk_verifier_0.9.8_proof.data"));
const instances = fs.readFileSync("./integration-test/testdata/plonk_verifier_0.9.8_pi.data");
const proof = hexlify(fs.readFileSync("./integration-test/testdata/plonk_verifier_0.5.1_proof.data"));
const instances = fs.readFileSync("./integration-test/testdata/plonk_verifier_0.5.1_pi.data");
const publicInputHash = new Uint8Array(32);
for (let i = 0; i < 32; i++) {
publicInputHash[i] = instances[i * 32 + 31];
}
expect(hexlify(publicInputHash)).to.eq("0x31b430667bc9e8a8b7eda5e5c76f2250c64023f5f8e0689ac9f4e53f5362da66");
// chunk1: https://github.com/scroll-tech/test-traces/blob/674ad743beab04b57da369fa5958fb6824155bfe/erc20/1_transfer.json
// 0000000000000005 blockNumber
// 0000000064c3ca7c timestamp
// 0000000000000000000000000000000000000000000000000000000000000000 baseFee
// 00000000007a1200 gasLimit
// 0001 numTransactions
// 8da3fedb103b6da8ccc2514094336d1a76df166238f4d8e8558fbe54cce2516a tx hash 0
// chunk2: https://github.com/scroll-tech/test-traces/blob/674ad743beab04b57da369fa5958fb6824155bfe/erc20/10_transfer.json
// 0000000000000006 blockNumber
// 0000000064c3ca7f timestamp
// 0000000000000000000000000000000000000000000000000000000000000000 baseFee
// 00000000007a1200 gasLimit
// 000a numTransactions
// 419164c1a7213e4e52f8578463c47a01549f69a7ff220d93221ce02909f5b919 tx hash 0
// 6c1b03d1a9b5156e189ad2e7ba73ba71d9a83b24f9830f38dd7a597fe1e67167 tx hash 1
// 94f981938d02b2c1d91ff370b3ed759dadc617c7347cd4b8552b275edbffd767 tx hash 2
// bfe98147fc808a916bdff90e838e77609fd59634787443f6fc58f9a371790d09 tx hash 3
// beb9dd0259e7c4f0a8d5ac3ba6aa3940c3e53947395f64e8ee88c7067c6d210e tx hash 4
// 208c6c767356552ad8085fa77a99d9154e0c8cf8777e329cb76bcbc969d21fca tx hash 5
// 37c8969833fbc6cbb88a63ccef324d7b42d0607ac0094f14e1f6d4e50f84d87f tx hash 6
// 088c5ad45a990694ac783207fe6bda9bf97da40e1f3eb468c73941d51b99932c tx hash 7
// c3d8ddbdfc67877a253255b9357aabfd062ce80d39eba67547f964c288660065 tx hash 8
// ff26ca52c02b97b1a6677263d5d6dec0321fb7b49be44ae0a66ba5482b1180b4 tx hash 9
// => chunk 0 data hash: 9390886a7d22aa43aae87e62a350c904fabc5db4487d9b25bdca446ba7ed15a1
// => chunk 1 data hash: a8846bf9bc53f30a391ae452b5fd456cb86a99ab7bd2e1e47898ffbe3509e8eb
// => batch data hash: ee64d77c2f2e0b2c4ac952a0f54fdba4a217c42eb26a07b28de9fbc7b009acae
// 000000000000cf55 layer2ChainId
// 02040e949809e8d2e56d35b4dfb876e08ee7b4608d22f23f52052425857c31ba prevStateRoot
// 1532cdb7732da0a4ca3044914c6959b7e2b7ba4e913a9f5f0b55051e467412d9 postStateRoot
// 0000000000000000000000000000000000000000000000000000000000000000 withdrawRoot
// ee64d77c2f2e0b2c4ac952a0f54fdba4a217c42eb26a07b28de9fbc7b009acae batchDataHash
// public input hash: 9ea439164727042e029464a40901e52800095c1ade301b63b4b7453880f5723e
expect(hexlify(publicInputHash)).to.eq("0x9ea439164727042e029464a40901e52800095c1ade301b63b4b7453880f5723e");
// verify ok
await zkEvmVerifier.verify(proof, publicInputHash);

Binary file not shown.

Binary file not shown.

View File

@@ -222,13 +222,12 @@ func (s *Sender) SendTransaction(ID string, target *common.Address, value *big.I
if feeData, err = s.getFeeData(s.auth, target, value, data, fallbackGasLimit); err != nil {
s.metrics.sendTransactionFailureGetFee.WithLabelValues(s.service, s.name).Inc()
log.Error("failed to get fee data", "from", s.auth.From.String(), "nonce", s.auth.Nonce.Uint64(), "fallback gas limit", fallbackGasLimit, "err", err)
log.Error("failed to get fee data", "err", err)
return common.Hash{}, fmt.Errorf("failed to get fee data, err: %w", err)
}
if tx, err = s.createAndSendTx(s.auth, feeData, target, value, data, nil); err != nil {
s.metrics.sendTransactionFailureSendTx.WithLabelValues(s.service, s.name).Inc()
log.Error("failed to create and send tx (non-resubmit case)", "from", s.auth.From.String(), "nonce", s.auth.Nonce.Uint64(), "err", err)
return common.Hash{}, fmt.Errorf("failed to create and send transaction, err: %w", err)
}
@@ -304,11 +303,11 @@ func (s *Sender) createAndSendTx(auth *bind.TransactOpts, feeData *FeeData, targ
// sign and send
tx, err := auth.Signer(auth.From, types.NewTx(txData))
if err != nil {
log.Error("failed to sign tx", "address", auth.From.String(), "err", err)
log.Error("failed to sign tx", "err", err)
return nil, err
}
if err = s.client.SendTransaction(s.ctx, tx); err != nil {
log.Error("failed to send tx", "tx hash", tx.Hash().String(), "from", auth.From.String(), "nonce", tx.Nonce(), "err", err)
log.Error("failed to send tx", "tx hash", tx.Hash().String(), "err", err)
// Check if contain nonce, and reset nonce
// only reset nonce when it is not from resubmit
if strings.Contains(err.Error(), "nonce") && overrideNonce == nil {
@@ -357,7 +356,6 @@ func (s *Sender) resubmitTransaction(feeData *FeeData, auth *bind.TransactOpts,
"tx_hash": tx.Hash().String(),
"tx_type": s.config.TxType,
"from": auth.From.String(),
"nonce": tx.Nonce(),
}
switch s.config.TxType {
@@ -371,15 +369,10 @@ func (s *Sender) resubmitTransaction(feeData *FeeData, auth *bind.TransactOpts,
if gasPrice.Cmp(maxGasPrice) > 0 {
gasPrice = maxGasPrice
}
if originalGasPrice.Cmp(gasPrice) == 0 {
log.Warn("gas price bump corner case, add 1 wei", "original", originalGasPrice.Uint64(), "adjusted", gasPrice.Uint64())
gasPrice = new(big.Int).Add(gasPrice, big.NewInt(1))
}
feeData.gasPrice = gasPrice
txInfo["original_gas_price"] = originalGasPrice.Uint64()
txInfo["adjusted_gas_price"] = gasPrice.Uint64()
txInfo["original_gas_price"] = originalGasPrice
txInfo["adjusted_gas_price"] = gasPrice
default:
originalGasTipCap := big.NewInt(feeData.gasTipCap.Int64())
originalGasFeeCap := big.NewInt(feeData.gasFeeCap.Int64())
@@ -413,35 +406,20 @@ func (s *Sender) resubmitTransaction(feeData *FeeData, auth *bind.TransactOpts,
if gasFeeCap.Cmp(maxGasPrice) > 0 {
gasFeeCap = maxGasPrice
}
if originalGasTipCap.Cmp(gasTipCap) == 0 {
log.Warn("gas tip cap bump corner case, add 1 wei", "original", originalGasTipCap.Uint64(), "adjusted", gasTipCap.Uint64())
gasTipCap = new(big.Int).Add(gasTipCap, big.NewInt(1))
}
if originalGasFeeCap.Cmp(gasFeeCap) == 0 {
log.Warn("gas fee cap bump corner case, add 1 wei", "original", originalGasFeeCap.Uint64(), "adjusted", gasFeeCap.Uint64())
gasFeeCap = new(big.Int).Add(gasFeeCap, big.NewInt(1))
}
feeData.gasFeeCap = gasFeeCap
feeData.gasTipCap = gasTipCap
txInfo["original_gas_tip_cap"] = originalGasTipCap.Uint64()
txInfo["adjusted_gas_tip_cap"] = gasTipCap.Uint64()
txInfo["original_gas_fee_cap"] = originalGasFeeCap.Uint64()
txInfo["adjusted_gas_fee_cap"] = gasFeeCap.Uint64()
txInfo["original_gas_tip_cap"] = originalGasTipCap
txInfo["adjusted_gas_tip_cap"] = gasTipCap
txInfo["original_gas_fee_cap"] = originalGasFeeCap
txInfo["adjusted_gas_fee_cap"] = gasFeeCap
}
log.Info("Transaction gas adjustment details", txInfo)
log.Debug("Transaction gas adjustment details", txInfo)
nonce := tx.Nonce()
s.metrics.resubmitTransactionTotal.WithLabelValues(s.service, s.name).Inc()
tx, err := s.createAndSendTx(auth, feeData, tx.To(), tx.Value(), tx.Data(), &nonce)
if err != nil {
log.Error("failed to create and send tx (resubmit case)", "from", s.auth.From.String(), "nonce", nonce, "err", err)
return nil, err
}
return tx, nil
return s.createAndSendTx(auth, feeData, tx.To(), tx.Value(), tx.Data(), &nonce)
}
// checkPendingTransaction checks the confirmation status of pending transactions against the latest confirmed block number.
@@ -477,15 +455,15 @@ func (s *Sender) checkPendingTransaction(header *types.Header, confirmed uint64)
}
}
} else if s.config.EscalateBlocks+pending.submitAt < number {
log.Info("resubmit transaction",
"hash", pending.tx.Hash().String(),
"from", pending.signer.From.String(),
"nonce", pending.tx.Nonce(),
log.Debug("resubmit transaction",
"tx hash", pending.tx.Hash().String(),
"submit block number", pending.submitAt,
"current block number", number,
"configured escalateBlocks", s.config.EscalateBlocks)
"escalateBlocks", s.config.EscalateBlocks)
if tx, err := s.resubmitTransaction(pending.feeData, pending.signer, pending.tx); err != nil {
var tx *types.Transaction
tx, err := s.resubmitTransaction(pending.feeData, pending.signer, pending.tx)
if err != nil {
// If account pool is empty, it will try again in next loop.
if !errors.Is(err, ErrNoAvailableAccount) {
log.Error("failed to resubmit transaction, reset submitAt", "tx hash", pending.tx.Hash().String(), "err", err)

View File

@@ -43,7 +43,7 @@ func setupEnv(t *testing.T) {
var err error
cfg, err = config.NewConfig("../../../conf/config.json")
assert.NoError(t, err)
base.RunL1Geth(t)
base.RunImages(t)
priv, err := crypto.HexToECDSA("1212121212121212121212121212121212121212121212121212121212121212")
assert.NoError(t, err)
// Load default private key.
@@ -60,9 +60,7 @@ func TestSender(t *testing.T) {
t.Run("test new sender", testNewSender)
t.Run("test pending limit", testPendLimit)
t.Run("test fallback gas limit", testFallbackGasLimit)
t.Run("test resubmit zero gas price transaction", testResubmitZeroGasPriceTransaction)
t.Run("test resubmit non-zero gas price transaction", testResubmitNonZeroGasPriceTransaction)
t.Run("test resubmit under priced transaction", testResubmitUnderpricedTransaction)
t.Run("test resubmit transaction", testResubmitTransaction)
t.Run("test resubmit transaction with rising base fee", testResubmitTransactionWithRisingBaseFee)
t.Run("test check pending transaction", testCheckPendingTransaction)
}
@@ -140,76 +138,21 @@ func testFallbackGasLimit(t *testing.T) {
}
}
func testResubmitZeroGasPriceTransaction(t *testing.T) {
func testResubmitTransaction(t *testing.T) {
for _, txType := range txTypes {
cfgCopy := *cfg.L1Config.RelayerConfig.SenderConfig
cfgCopy.TxType = txType
s, err := NewSender(context.Background(), &cfgCopy, privateKey, "test", "test", nil)
assert.NoError(t, err)
feeData := &FeeData{
gasPrice: big.NewInt(0),
gasTipCap: big.NewInt(0),
gasFeeCap: big.NewInt(0),
gasLimit: 50000,
}
tx, err := s.createAndSendTx(s.auth, feeData, &common.Address{}, big.NewInt(0), nil, nil)
tx := types.NewTransaction(s.auth.Nonce.Uint64(), common.Address{}, big.NewInt(0), 0, big.NewInt(0), nil)
feeData, err := s.getFeeData(s.auth, &common.Address{}, big.NewInt(0), nil, 0)
assert.NoError(t, err)
assert.NotNil(t, tx)
// Increase at least 1 wei in gas price, gas tip cap and gas fee cap.
_, err = s.resubmitTransaction(feeData, s.auth, tx)
assert.NoError(t, err)
s.Stop()
}
}
func testResubmitNonZeroGasPriceTransaction(t *testing.T) {
for _, txType := range txTypes {
cfgCopy := *cfg.L1Config.RelayerConfig.SenderConfig
// Bump gas price, gas tip cap and gas fee cap just touch the minimum threshold of 10% (default config of geth).
cfgCopy.EscalateMultipleNum = 110
cfgCopy.EscalateMultipleDen = 100
cfgCopy.TxType = txType
s, err := NewSender(context.Background(), &cfgCopy, privateKey, "test", "test", nil)
assert.NoError(t, err)
feeData := &FeeData{
gasPrice: big.NewInt(100000),
gasTipCap: big.NewInt(100000),
gasFeeCap: big.NewInt(100000),
gasLimit: 50000,
}
tx, err := s.createAndSendTx(s.auth, feeData, &common.Address{}, big.NewInt(0), nil, nil)
assert.NoError(t, err)
assert.NotNil(t, tx)
_, err = s.resubmitTransaction(feeData, s.auth, tx)
assert.NoError(t, err)
s.Stop()
}
}
func testResubmitUnderpricedTransaction(t *testing.T) {
for _, txType := range txTypes {
cfgCopy := *cfg.L1Config.RelayerConfig.SenderConfig
// Bump gas price, gas tip cap and gas fee cap less than 10% (default config of geth).
cfgCopy.EscalateMultipleNum = 109
cfgCopy.EscalateMultipleDen = 100
cfgCopy.TxType = txType
s, err := NewSender(context.Background(), &cfgCopy, privateKey, "test", "test", nil)
assert.NoError(t, err)
feeData := &FeeData{
gasPrice: big.NewInt(100000),
gasTipCap: big.NewInt(100000),
gasFeeCap: big.NewInt(100000),
gasLimit: 50000,
}
tx, err := s.createAndSendTx(s.auth, feeData, &common.Address{}, big.NewInt(0), nil, nil)
assert.NoError(t, err)
assert.NotNil(t, tx)
_, err = s.resubmitTransaction(feeData, s.auth, tx)
assert.Error(t, err, "replacement transaction underpriced")
s.Stop()
}
}
func testResubmitTransactionWithRisingBaseFee(t *testing.T) {
txType := "DynamicFeeTx"
@@ -304,7 +247,7 @@ func testCheckPendingTransaction(t *testing.T) {
},
)
pendingTx := &PendingTransaction{id: "abc", tx: tx, signer: s.auth, submitAt: header.Number.Uint64() - s.config.EscalateBlocks - 1}
pendingTx := &PendingTransaction{id: "abc", tx: tx, submitAt: header.Number.Uint64() - s.config.EscalateBlocks - 1}
s.pendingTxs.Set(pendingTx.id, pendingTx)
s.checkPendingTransaction(header, confirmed)