From d7d09c2ac7cc6470e0042323386ba300cc9c5ffc Mon Sep 17 00:00:00 2001 From: ChuhanJin <60994121+ChuhanJin@users.noreply.github.com> Date: Wed, 28 Jun 2023 10:21:42 +0800 Subject: [PATCH] feat(bridge-history-api): support let user relay l2 msg (#563) Co-authored-by: vincent <419436363@qq.com> Co-authored-by: georgehao Co-authored-by: HAOYUatHZ <37070449+HAOYUatHZ@users.noreply.github.com> --- bridge-history-api/abi/backend_abi.go | 5 + .../cmd/cross_msg_fetcher/app/app.go | 21 +- bridge-history-api/config.json | 4 + bridge-history-api/config/config.go | 10 +- .../cross_msg/batch_info_fetcher.go | 105 ++++++ .../cross_msg/block_timestamp_fetcher.go | 37 ++- .../cross_msg/cross_msg_fetcher.go | 27 +- .../cross_msg/fetch_missing_event.go | 295 ++++------------- .../message_proof/msg_proof_updater.go | 241 ++++++++++++++ .../{ => message_proof}/withdraw_trie.go | 3 +- .../{ => message_proof}/withdraw_trie_test.go | 39 ++- bridge-history-api/cross_msg/reorg_handle.go | 5 + .../migrate/migrations/00003_l2_sent_msg.sql | 57 ++++ .../migrate/migrations/00004_rollup_batch.sql | 49 +++ bridge-history-api/db/orm/batch.go | 79 +++++ bridge-history-api/db/orm/interface.go | 32 +- bridge-history-api/db/orm/l1_cross_msg.go | 23 +- bridge-history-api/db/orm/l2_cross_msg.go | 24 +- bridge-history-api/db/orm/l2_sent_msg.go | 159 +++++++++ bridge-history-api/db/orm/relayed_msg.go | 14 +- bridge-history-api/db/orm_factory.go | 14 +- bridge-history-api/service/service.go | 56 +++- bridge-history-api/utils/parse_event.go | 305 ++++++++++++++++++ common/version/version.go | 2 +- 24 files changed, 1269 insertions(+), 337 deletions(-) create mode 100644 bridge-history-api/cross_msg/batch_info_fetcher.go create mode 100644 bridge-history-api/cross_msg/message_proof/msg_proof_updater.go rename bridge-history-api/cross_msg/{ => message_proof}/withdraw_trie.go (99%) rename bridge-history-api/cross_msg/{ => message_proof}/withdraw_trie_test.go (73%) create mode 100644 bridge-history-api/db/migrate/migrations/00003_l2_sent_msg.sql create mode 100644 bridge-history-api/db/migrate/migrations/00004_rollup_batch.sql create mode 100644 bridge-history-api/db/orm/batch.go create mode 100644 bridge-history-api/db/orm/l2_sent_msg.go create mode 100644 bridge-history-api/utils/parse_event.go diff --git a/bridge-history-api/abi/backend_abi.go b/bridge-history-api/abi/backend_abi.go index d179f475e..723792c6a 100644 --- a/bridge-history-api/abi/backend_abi.go +++ b/bridge-history-api/abi/backend_abi.go @@ -337,6 +337,11 @@ type IScrollChainBatch struct { L2Transactions []byte } +// L1CommitBatchEvent represents a CommitBatch event raised by the ScrollChain contract. +type L1CommitBatchEvent struct { + BatchHash common.Hash +} + // IScrollChainBlockContext is an auto generated low-level Go binding around an user-defined struct. type IScrollChainBlockContext struct { BlockHash common.Hash diff --git a/bridge-history-api/cmd/cross_msg_fetcher/app/app.go b/bridge-history-api/cmd/cross_msg_fetcher/app/app.go index 00c05d87f..4fdfa95a5 100644 --- a/bridge-history-api/cmd/cross_msg_fetcher/app/app.go +++ b/bridge-history-api/cmd/cross_msg_fetcher/app/app.go @@ -13,6 +13,7 @@ import ( "bridge-history-api/config" "bridge-history-api/cross_msg" + "bridge-history-api/cross_msg/message_proof" "bridge-history-api/db" cutils "bridge-history-api/utils" ) @@ -54,6 +55,7 @@ func action(ctx *cli.Context) error { log.Crit("failed to connect l2 geth", "config file", cfgFile, "error", err) } db, err := db.NewOrmFactory(cfg) + defer db.Close() if err != nil { log.Crit("failed to connect to db", "config file", cfgFile, "error", err) } @@ -98,13 +100,20 @@ func action(ctx *cli.Context) error { go l2crossMsgFetcher.Start() defer l2crossMsgFetcher.Stop() - l1BlocktimeFetcher := cross_msg.NewBlocktimestampFetcher(subCtx, uint(cfg.L1.Confirmation), int(cfg.L1.BlockTime), l1client, db.UpdateL1Blocktimestamp, db.GetL1EarliestNoBlocktimestampHeight) - go l1BlocktimeFetcher.Start() - defer l1BlocktimeFetcher.Stop() + // BlockTimestamp fetcher for l1 and l2 + l1BlockTimeFetcher := cross_msg.NewBlockTimestampFetcher(subCtx, cfg.L1.Confirmation, int(cfg.L1.BlockTime), l1client, db.UpdateL1BlockTimestamp, db.GetL1EarliestNoBlockTimestampHeight) + go l1BlockTimeFetcher.Start() + defer l1BlockTimeFetcher.Stop() - l2BlocktimeFetcher := cross_msg.NewBlocktimestampFetcher(subCtx, uint(cfg.L2.Confirmation), int(cfg.L2.BlockTime), l2client, db.UpdateL2Blocktimestamp, db.GetL2EarliestNoBlocktimestampHeight) - go l2BlocktimeFetcher.Start() - defer l2BlocktimeFetcher.Stop() + l2BlockTimeFetcher := cross_msg.NewBlockTimestampFetcher(subCtx, cfg.L2.Confirmation, int(cfg.L2.BlockTime), l2client, db.UpdateL2BlockTimestamp, db.GetL2EarliestNoBlockTimestampHeight) + go l2BlockTimeFetcher.Start() + defer l2BlockTimeFetcher.Stop() + + // Proof updater and batch fetcher + l2msgProofUpdater := message_proof.NewMsgProofUpdater(subCtx, cfg.L1.Confirmation, cfg.BatchInfoFetcher.BatchIndexStartBlock, db) + batchFetcher := cross_msg.NewBatchInfoFetcher(subCtx, common.HexToAddress(cfg.BatchInfoFetcher.ScrollChainAddr), cfg.BatchInfoFetcher.BatchIndexStartBlock, cfg.L1.Confirmation, int(cfg.L1.BlockTime), l1client, db, l2msgProofUpdater) + go batchFetcher.Start() + defer batchFetcher.Stop() // Catch CTRL-C to ensure a graceful shutdown. interrupt := make(chan os.Signal, 1) diff --git a/bridge-history-api/config.json b/bridge-history-api/config.json index 43f344a11..1c7061598 100644 --- a/bridge-history-api/config.json +++ b/bridge-history-api/config.json @@ -1,4 +1,8 @@ { + "batchInfoFetcher": { + "batchIndexStartBlock": 9091265, + "ScrollChainAddr": "0xcD00DB804C819175B381b2B44Aa16A391c8a01D6" + }, "l1": { "confirmation": 64, "endpoint": "https://rpc.ankr.com/eth_goerli", diff --git a/bridge-history-api/config/config.go b/bridge-history-api/config/config.go index 5cdcd2b86..cbc3d1e30 100644 --- a/bridge-history-api/config/config.go +++ b/bridge-history-api/config/config.go @@ -6,6 +6,11 @@ import ( "path/filepath" ) +type BatchInfoFetcherConfig struct { + BatchIndexStartBlock uint64 `json:"batchIndexStartBlock"` + ScrollChainAddr string `json:"ScrollChainAddr"` +} + // DBConfig db config type DBConfig struct { // data source name @@ -41,8 +46,9 @@ type Config struct { L2 *LayerConfig `json:"l2"` // data source name - DB *DBConfig `json:"db"` - Server *ServerConfig `json:"server"` + DB *DBConfig `json:"db"` + Server *ServerConfig `json:"server"` + BatchInfoFetcher *BatchInfoFetcherConfig `json:"batchInfoFetcher"` } // NewConfig returns a new instance of Config. diff --git a/bridge-history-api/cross_msg/batch_info_fetcher.go b/bridge-history-api/cross_msg/batch_info_fetcher.go new file mode 100644 index 000000000..aa43bd0cf --- /dev/null +++ b/bridge-history-api/cross_msg/batch_info_fetcher.go @@ -0,0 +1,105 @@ +package cross_msg + +import ( + "context" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/ethereum/go-ethereum/log" + + "bridge-history-api/cross_msg/message_proof" + "bridge-history-api/db" + "bridge-history-api/utils" +) + +type BatchInfoFetcher struct { + ctx context.Context + scrollChainAddr common.Address + batchInfoStartNumber uint64 + confirmation uint64 + blockTimeInSec int + client *ethclient.Client + db db.OrmFactory + msgProofUpdater *message_proof.MsgProofUpdater +} + +func NewBatchInfoFetcher(ctx context.Context, scrollChainAddr common.Address, batchInfoStartNumber uint64, confirmation uint64, blockTimeInSec int, client *ethclient.Client, db db.OrmFactory, msgProofUpdater *message_proof.MsgProofUpdater) *BatchInfoFetcher { + return &BatchInfoFetcher{ + ctx: ctx, + scrollChainAddr: scrollChainAddr, + batchInfoStartNumber: batchInfoStartNumber, + confirmation: confirmation, + blockTimeInSec: blockTimeInSec, + client: client, + db: db, + msgProofUpdater: msgProofUpdater, + } +} + +func (b *BatchInfoFetcher) Start() { + log.Info("BatchInfoFetcher Start") + // Fetch batch info at beginning + // Then start msg proof updater after db have some bridge batch + err := b.fetchBatchInfo() + if err != nil { + log.Error("fetch batch info at begining failed: ", "err", err) + } + + go b.msgProofUpdater.Start() + + go func() { + tick := time.NewTicker(time.Duration(b.blockTimeInSec) * time.Second) + for { + select { + case <-b.ctx.Done(): + tick.Stop() + return + case <-tick.C: + err := b.fetchBatchInfo() + if err != nil { + log.Error("fetch batch info failed: ", "err", err) + } + } + } + }() +} + +func (b *BatchInfoFetcher) Stop() { + log.Info("BatchInfoFetcher Stop") + b.msgProofUpdater.Stop() +} + +func (b *BatchInfoFetcher) fetchBatchInfo() error { + number, err := utils.GetSafeBlockNumber(b.ctx, b.client, b.confirmation) + if err != nil { + log.Error("Can not get latest block number: ", "err", err) + return err + } + latestBatch, err := b.db.GetLatestRollupBatch() + if err != nil { + log.Error("Can not get latest BatchInfo: ", "err", err) + return err + } + var startHeight uint64 + if latestBatch == nil { + startHeight = b.batchInfoStartNumber + } else { + startHeight = latestBatch.CommitHeight + 1 + } + for from := startHeight; number >= from; from += uint64(fetchLimit) { + to := from + uint64(fetchLimit) - 1 + // number - confirmation can never less than 0 since the for loop condition + // but watch out the overflow + if to > number { + to = number + } + // filter logs to fetch batches + err = FetchAndSaveBatchIndex(b.ctx, b.client, b.db, int64(from), int64(to), b.scrollChainAddr) + if err != nil { + log.Error("Can not fetch and save from chain: ", "err", err) + return err + } + } + return nil +} diff --git a/bridge-history-api/cross_msg/block_timestamp_fetcher.go b/bridge-history-api/cross_msg/block_timestamp_fetcher.go index e1f664f4e..61d01740c 100644 --- a/bridge-history-api/cross_msg/block_timestamp_fetcher.go +++ b/bridge-history-api/cross_msg/block_timestamp_fetcher.go @@ -9,30 +9,30 @@ import ( "github.com/ethereum/go-ethereum/log" ) -type GetEarliestNoBlocktimestampHeightFunc func() (uint64, error) -type UpdateBlocktimestampFunc func(height uint64, timestamp time.Time) error +type GetEarliestNoBlockTimestampHeightFunc func() (uint64, error) +type UpdateBlockTimestampFunc func(height uint64, timestamp time.Time) error -type BlocktimestampFetcher struct { +type BlockTimestampFetcher struct { ctx context.Context - confirmation uint + confirmation uint64 blockTimeInSec int client *ethclient.Client - updateBlocktimestampFunc UpdateBlocktimestampFunc - getEarliestNoBlocktimestampHeightFunc GetEarliestNoBlocktimestampHeightFunc + updateBlockTimestampFunc UpdateBlockTimestampFunc + getEarliestNoBlockTimestampHeightFunc GetEarliestNoBlockTimestampHeightFunc } -func NewBlocktimestampFetcher(ctx context.Context, confirmation uint, blockTimeInSec int, client *ethclient.Client, updateBlocktimestampFunc UpdateBlocktimestampFunc, getEarliestNoBlocktimestampHeightFunc GetEarliestNoBlocktimestampHeightFunc) *BlocktimestampFetcher { - return &BlocktimestampFetcher{ +func NewBlockTimestampFetcher(ctx context.Context, confirmation uint64, blockTimeInSec int, client *ethclient.Client, updateBlockTimestampFunc UpdateBlockTimestampFunc, getEarliestNoBlockTimestampHeightFunc GetEarliestNoBlockTimestampHeightFunc) *BlockTimestampFetcher { + return &BlockTimestampFetcher{ ctx: ctx, confirmation: confirmation, blockTimeInSec: blockTimeInSec, client: client, - getEarliestNoBlocktimestampHeightFunc: getEarliestNoBlocktimestampHeightFunc, - updateBlocktimestampFunc: updateBlocktimestampFunc, + getEarliestNoBlockTimestampHeightFunc: getEarliestNoBlockTimestampHeightFunc, + updateBlockTimestampFunc: updateBlockTimestampFunc, } } -func (b *BlocktimestampFetcher) Start() { +func (b *BlockTimestampFetcher) Start() { go func() { tick := time.NewTicker(time.Duration(b.blockTimeInSec) * time.Second) for { @@ -46,23 +46,23 @@ func (b *BlocktimestampFetcher) Start() { log.Error("Can not get latest block number", "err", err) continue } - startHeight, err := b.getEarliestNoBlocktimestampHeightFunc() + startHeight, err := b.getEarliestNoBlockTimestampHeightFunc() if err != nil { log.Error("Can not get latest record without block timestamp", "err", err) continue } - for height := startHeight; number >= height+uint64(b.confirmation) && height > 0; { + for height := startHeight; number >= height+b.confirmation && height > 0; { block, err := b.client.HeaderByNumber(b.ctx, new(big.Int).SetUint64(height)) if err != nil { log.Error("Can not get block by number", "err", err) break } - err = b.updateBlocktimestampFunc(height, time.Unix(int64(block.Time), 0)) + err = b.updateBlockTimestampFunc(height, time.Unix(int64(block.Time), 0)) if err != nil { - log.Error("Can not update blocktimstamp into DB ", "err", err) + log.Error("Can not update blockTimestamp into DB ", "err", err) break } - height, err = b.getEarliestNoBlocktimestampHeightFunc() + height, err = b.getEarliestNoBlockTimestampHeightFunc() if err != nil { log.Error("Can not get latest record without block timestamp", "err", err) break @@ -73,7 +73,6 @@ func (b *BlocktimestampFetcher) Start() { }() } -func (b *BlocktimestampFetcher) Stop() { - log.Info("BlocktimestampFetcher Stop") - b.ctx.Done() +func (b *BlockTimestampFetcher) Stop() { + log.Info("BlockTimestampFetcher Stop") } diff --git a/bridge-history-api/cross_msg/cross_msg_fetcher.go b/bridge-history-api/cross_msg/cross_msg_fetcher.go index c5ce828f5..098c7e3c0 100644 --- a/bridge-history-api/cross_msg/cross_msg_fetcher.go +++ b/bridge-history-api/cross_msg/cross_msg_fetcher.go @@ -27,13 +27,12 @@ type CrossMsgFetcher struct { reorgHandling ReorgHandling addressList []common.Address cachedHeaders []*types.Header - mu *sync.Mutex + mu sync.Mutex reorgStartCh chan struct{} reorgEndCh chan struct{} } func NewCrossMsgFetcher(ctx context.Context, config *config.LayerConfig, db db.OrmFactory, client *ethclient.Client, worker *FetchEventWorker, addressList []common.Address, reorg ReorgHandling) (*CrossMsgFetcher, error) { - newMU := &sync.Mutex{} crossMsgFetcher := &CrossMsgFetcher{ ctx: ctx, config: config, @@ -41,7 +40,6 @@ func NewCrossMsgFetcher(ctx context.Context, config *config.LayerConfig, db db.O client: client, worker: worker, reorgHandling: reorg, - mu: newMU, addressList: addressList, cachedHeaders: make([]*types.Header, 0), reorgStartCh: make(chan struct{}), @@ -97,7 +95,6 @@ func (c *CrossMsgFetcher) Start() { } func (c *CrossMsgFetcher) Stop() { - c.db.Close() log.Info("CrossMsgFetcher Stop") } @@ -106,7 +103,7 @@ func (c *CrossMsgFetcher) forwardFetchAndSaveMissingEvents(confirmation uint64) // if we fetch to the latest block, shall not exceed cachedHeaders var number uint64 var err error - if len(c.cachedHeaders) != 0 && confirmation <= 0 { + if len(c.cachedHeaders) != 0 && confirmation == 0 { number = c.cachedHeaders[len(c.cachedHeaders)-1].Number.Uint64() - 1 } else { number, err = utils.GetSafeBlockNumber(c.ctx, c.client, confirmation) @@ -119,22 +116,22 @@ func (c *CrossMsgFetcher) forwardFetchAndSaveMissingEvents(confirmation uint64) log.Error(fmt.Sprintf("%s: invalid get/fetch function", c.worker.Name)) return } - processed_height, err := c.worker.G(c.db) + processedHeight, err := c.worker.G(c.db) if err != nil { log.Error(fmt.Sprintf("%s: can not get latest processed block height", c.worker.Name)) } - log.Info(fmt.Sprintf("%s: ", c.worker.Name), "height", processed_height) - if processed_height <= 0 || processed_height < int64(c.config.StartHeight) { - processed_height = int64(c.config.StartHeight) + log.Info(fmt.Sprintf("%s: ", c.worker.Name), "height", processedHeight) + if processedHeight <= 0 || processedHeight < int64(c.config.StartHeight) { + processedHeight = int64(c.config.StartHeight) } else { - processed_height += 1 + processedHeight += 1 } - for n := processed_height; n <= int64(number); n += FETCH_LIMIT { - iter_end := n + FETCH_LIMIT - 1 - if iter_end > int64(number) { - iter_end = int64(number) + for from := processedHeight; from <= int64(number); from += fetchLimit { + to := from + fetchLimit - 1 + if to > int64(number) { + to = int64(number) } - err := c.worker.F(c.ctx, c.client, c.db, n, iter_end, c.addressList) + err := c.worker.F(c.ctx, c.client, c.db, from, to, c.addressList) if err != nil { log.Error(fmt.Sprintf("%s: failed!", c.worker.Name), "err", err) break diff --git a/bridge-history-api/cross_msg/fetch_missing_event.go b/bridge-history-api/cross_msg/fetch_missing_event.go index df89a6568..0ec20a5f3 100644 --- a/bridge-history-api/cross_msg/fetch_missing_event.go +++ b/bridge-history-api/cross_msg/fetch_missing_event.go @@ -6,21 +6,19 @@ import ( geth "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/log" "github.com/jmoiron/sqlx" backendabi "bridge-history-api/abi" "bridge-history-api/db" - "bridge-history-api/db/orm" "bridge-history-api/utils" ) // Todo : read from config var ( // the number of blocks fetch per round - FETCH_LIMIT = int64(3000) + fetchLimit = int64(3000) ) // FetchAndSave is a function type that fetches events from blockchain and saves them to database @@ -36,11 +34,6 @@ type FetchEventWorker struct { Name string } -type msgHashWrapper struct { - msgHash common.Hash - txHash common.Hash -} - func GetLatestL1ProcessedHeight(db db.OrmFactory) (int64, error) { crossHeight, err := db.GetLatestL1ProcessedHeight() if err != nil { @@ -70,15 +63,22 @@ func GetLatestL2ProcessedHeight(db db.OrmFactory) (int64, error) { log.Error("failed to get L2 relayed message processed height", "err", err) return 0, err } - if crossHeight > relayedHeight { - return crossHeight, nil - } else { - return relayedHeight, nil + l2SentHeight, err := db.GetLatestSentMsgHeightOnL2() + if err != nil { + log.Error("failed to get L2 sent message processed height", "err", err) + return 0, err } + maxHeight := crossHeight + if maxHeight < relayedHeight { + maxHeight = relayedHeight + } + if maxHeight < l2SentHeight { + maxHeight = l2SentHeight + } + return maxHeight, nil } func L1FetchAndSaveEvents(ctx context.Context, client *ethclient.Client, database db.OrmFactory, from int64, to int64, addrList []common.Address) error { - query := geth.FilterQuery{ FromBlock: big.NewInt(from), // inclusive ToBlock: big.NewInt(to), // inclusive @@ -99,7 +99,7 @@ func L1FetchAndSaveEvents(ctx context.Context, client *ethclient.Client, databas log.Warn("Failed to get l1 event logs", "err", err) return err } - depositL1CrossMsgs, msgHashes, relayedMsg, err := parseBackendL1EventLogs(logs) + depositL1CrossMsgs, msgHashes, relayedMsg, err := utils.ParseBackendL1EventLogs(logs) if err != nil { log.Error("l1FetchAndSaveEvents: Failed to parse cross msg event logs", "err", err) return err @@ -157,7 +157,7 @@ func L2FetchAndSaveEvents(ctx context.Context, client *ethclient.Client, databas log.Warn("Failed to get l2 event logs", "err", err) return err } - depositL2CrossMsgs, msgHashes, relayedMsg, err := parseBackendL2EventLogs(logs) + depositL2CrossMsgs, msgHashes, relayedMsg, l2sentMsgs, err := utils.ParseBackendL2EventLogs(logs) if err != nil { log.Error("l2FetchAndSaveEvents: Failed to parse cross msg event logs", "err", err) return err @@ -178,11 +178,19 @@ func L2FetchAndSaveEvents(ctx context.Context, client *ethclient.Client, databas dbTx.Rollback() log.Crit("l2FetchAndSaveEvents: Failed to insert relayed message event logs", "err", err) } + err = updateL2CrossMsgMsgHash(ctx, dbTx, database, msgHashes) if err != nil { dbTx.Rollback() log.Crit("l2FetchAndSaveEvents: Failed to update msgHash in L2 cross msg", "err", err) } + + err = database.BatchInsertL2SentMsgDBTx(dbTx, l2sentMsgs) + if err != nil { + dbTx.Rollback() + log.Crit("l2FetchAndSaveEvents: Failed to insert l2 sent message", "err", err) + } + err = dbTx.Commit() if err != nil { // if we can not insert into DB, there must something wrong, need a on-call member handle the dababase manually @@ -194,234 +202,61 @@ func L2FetchAndSaveEvents(ctx context.Context, client *ethclient.Client, databas return nil } -func parseBackendL1EventLogs(logs []types.Log) ([]*orm.CrossMsg, []msgHashWrapper, []*orm.RelayedMsg, error) { - // Need use contract abi to parse event Log - // Can only be tested after we have our contracts set up - - var l1CrossMsg []*orm.CrossMsg - var relayedMsgs []*orm.RelayedMsg - var msgHashes []msgHashWrapper - for _, vlog := range logs { - switch vlog.Topics[0] { - case backendabi.L1DepositETHSig: - event := backendabi.DepositETH{} - err := utils.UnpackLog(backendabi.L1ETHGatewayABI, &event, "DepositETH", vlog) - if err != nil { - log.Warn("Failed to unpack DepositETH event", "err", err) - return l1CrossMsg, msgHashes, relayedMsgs, err - } - l1CrossMsg = append(l1CrossMsg, &orm.CrossMsg{ - Height: vlog.BlockNumber, - Sender: event.From.String(), - Target: event.To.String(), - Amount: event.Amount.String(), - Asset: int(orm.ETH), - Layer1Hash: vlog.TxHash.Hex(), - }) - case backendabi.L1DepositERC20Sig: - event := backendabi.ERC20MessageEvent{} - err := utils.UnpackLog(backendabi.L1StandardERC20GatewayABI, &event, "DepositERC20", vlog) - if err != nil { - log.Warn("Failed to unpack DepositERC20 event", "err", err) - return l1CrossMsg, msgHashes, relayedMsgs, err - } - l1CrossMsg = append(l1CrossMsg, &orm.CrossMsg{ - Height: vlog.BlockNumber, - Sender: event.From.String(), - Target: event.To.String(), - Amount: event.Amount.String(), - Asset: int(orm.ERC20), - Layer1Hash: vlog.TxHash.Hex(), - Layer1Token: event.L1Token.Hex(), - Layer2Token: event.L2Token.Hex(), - }) - case backendabi.L1DepositERC721Sig: - event := backendabi.ERC721MessageEvent{} - err := utils.UnpackLog(backendabi.L1ERC721GatewayABI, &event, "DepositERC721", vlog) - if err != nil { - log.Warn("Failed to unpack DepositERC721 event", "err", err) - return l1CrossMsg, msgHashes, relayedMsgs, err - } - l1CrossMsg = append(l1CrossMsg, &orm.CrossMsg{ - Height: vlog.BlockNumber, - Sender: event.From.String(), - Target: event.To.String(), - Asset: int(orm.ERC721), - Layer1Hash: vlog.TxHash.Hex(), - Layer1Token: event.L1Token.Hex(), - Layer2Token: event.L2Token.Hex(), - TokenID: event.TokenID.Uint64(), - }) - case backendabi.L1DepositERC1155Sig: - event := backendabi.ERC1155MessageEvent{} - err := utils.UnpackLog(backendabi.L1ERC1155GatewayABI, &event, "DepositERC1155", vlog) - if err != nil { - log.Warn("Failed to unpack DepositERC1155 event", "err", err) - return l1CrossMsg, msgHashes, relayedMsgs, err - } - l1CrossMsg = append(l1CrossMsg, &orm.CrossMsg{ - Height: vlog.BlockNumber, - Sender: event.From.String(), - Target: event.To.String(), - Asset: int(orm.ERC1155), - Layer1Hash: vlog.TxHash.Hex(), - Layer1Token: event.L1Token.Hex(), - Layer2Token: event.L2Token.Hex(), - TokenID: event.TokenID.Uint64(), - Amount: event.Amount.String(), - }) - case backendabi.L1SentMessageEventSignature: - event := backendabi.L1SentMessageEvent{} - err := utils.UnpackLog(backendabi.L1ScrollMessengerABI, &event, "SentMessage", vlog) - if err != nil { - log.Warn("Failed to unpack SentMessage event", "err", err) - return l1CrossMsg, msgHashes, relayedMsgs, err - } - msgHash := utils.ComputeMessageHash(event.Sender, event.Target, event.Value, event.MessageNonce, event.Message) - msgHashes = append(msgHashes, msgHashWrapper{ - msgHash: msgHash, - txHash: vlog.TxHash}) - case backendabi.L1RelayedMessageEventSignature: - event := backendabi.L1RelayedMessageEvent{} - err := utils.UnpackLog(backendabi.L1ScrollMessengerABI, &event, "RelayedMessage", vlog) - if err != nil { - log.Warn("Failed to unpack RelayedMessage event", "err", err) - return l1CrossMsg, msgHashes, relayedMsgs, err - } - relayedMsgs = append(relayedMsgs, &orm.RelayedMsg{ - MsgHash: event.MessageHash.String(), - Height: vlog.BlockNumber, - Layer1Hash: vlog.TxHash.Hex(), - }) - - } - +func FetchAndSaveBatchIndex(ctx context.Context, client *ethclient.Client, database db.OrmFactory, from int64, to int64, scrollChainAddr common.Address) error { + query := geth.FilterQuery{ + FromBlock: big.NewInt(from), // inclusive + ToBlock: big.NewInt(to), // inclusive + Addresses: []common.Address{scrollChainAddr}, + Topics: make([][]common.Hash, 1), } - return l1CrossMsg, msgHashes, relayedMsgs, nil + query.Topics[0] = make([]common.Hash, 1) + query.Topics[0][0] = backendabi.L1CommitBatchEventSignature + logs, err := client.FilterLogs(ctx, query) + if err != nil { + log.Warn("Failed to get batch commit event logs", "err", err) + return err + } + rollupBatches, err := utils.ParseBatchInfoFromScrollChain(ctx, client, logs) + if err != nil { + log.Error("FetchAndSaveBatchIndex: Failed to parse batch commit msg event logs", "err", err) + return err + } + dbTx, err := database.Beginx() + if err != nil { + log.Error("FetchAndSaveBatchIndex: Failed to begin db transaction", "err", err) + return err + } + err = database.BatchInsertRollupBatchDBTx(dbTx, rollupBatches) + if err != nil { + dbTx.Rollback() + log.Crit("FetchAndSaveBatchIndex: Failed to insert batch commit msg event logs", "err", err) + } + err = dbTx.Commit() + if err != nil { + // if we can not insert into DB, there must something wrong, need a on-call member handle the dababase manually + dbTx.Rollback() + log.Error("FetchAndSaveBatchIndex: Failed to commit db transaction", "err", err) + return err + } + return nil } -func parseBackendL2EventLogs(logs []types.Log) ([]*orm.CrossMsg, []msgHashWrapper, []*orm.RelayedMsg, error) { - // Need use contract abi to parse event Log - // Can only be tested after we have our contracts set up - - var l2CrossMsg []*orm.CrossMsg - var relayedMsgs []*orm.RelayedMsg - var msgHashes []msgHashWrapper - for _, vlog := range logs { - switch vlog.Topics[0] { - case backendabi.L2WithdrawETHSig: - event := backendabi.DepositETH{} - err := utils.UnpackLog(backendabi.L2ETHGatewayABI, &event, "WithdrawETH", vlog) - if err != nil { - log.Warn("Failed to unpack WithdrawETH event", "err", err) - return l2CrossMsg, msgHashes, relayedMsgs, err - } - l2CrossMsg = append(l2CrossMsg, &orm.CrossMsg{ - Height: vlog.BlockNumber, - Sender: event.From.String(), - Target: event.To.String(), - Amount: event.Amount.String(), - Asset: int(orm.ETH), - Layer2Hash: vlog.TxHash.Hex(), - }) - case backendabi.L2WithdrawERC20Sig: - event := backendabi.ERC20MessageEvent{} - err := utils.UnpackLog(backendabi.L2StandardERC20GatewayABI, &event, "WithdrawERC20", vlog) - if err != nil { - log.Warn("Failed to unpack WithdrawERC20 event", "err", err) - return l2CrossMsg, msgHashes, relayedMsgs, err - } - l2CrossMsg = append(l2CrossMsg, &orm.CrossMsg{ - Height: vlog.BlockNumber, - Sender: event.From.String(), - Target: event.To.String(), - Amount: event.Amount.String(), - Asset: int(orm.ERC20), - Layer2Hash: vlog.TxHash.Hex(), - Layer1Token: event.L1Token.Hex(), - Layer2Token: event.L2Token.Hex(), - }) - case backendabi.L2WithdrawERC721Sig: - event := backendabi.ERC721MessageEvent{} - err := utils.UnpackLog(backendabi.L2ERC721GatewayABI, &event, "WithdrawERC721", vlog) - if err != nil { - log.Warn("Failed to unpack WithdrawERC721 event", "err", err) - return l2CrossMsg, msgHashes, relayedMsgs, err - } - l2CrossMsg = append(l2CrossMsg, &orm.CrossMsg{ - Height: vlog.BlockNumber, - Sender: event.From.String(), - Target: event.To.String(), - Asset: int(orm.ERC721), - Layer2Hash: vlog.TxHash.Hex(), - Layer1Token: event.L1Token.Hex(), - Layer2Token: event.L2Token.Hex(), - TokenID: event.TokenID.Uint64(), - }) - case backendabi.L2WithdrawERC1155Sig: - event := backendabi.ERC1155MessageEvent{} - err := utils.UnpackLog(backendabi.L2ERC1155GatewayABI, &event, "WithdrawERC1155", vlog) - if err != nil { - log.Warn("Failed to unpack WithdrawERC1155 event", "err", err) - return l2CrossMsg, msgHashes, relayedMsgs, err - } - l2CrossMsg = append(l2CrossMsg, &orm.CrossMsg{ - Height: vlog.BlockNumber, - Sender: event.From.String(), - Target: event.To.String(), - Asset: int(orm.ERC1155), - Layer2Hash: vlog.TxHash.Hex(), - Layer1Token: event.L1Token.Hex(), - Layer2Token: event.L2Token.Hex(), - TokenID: event.TokenID.Uint64(), - Amount: event.Amount.String(), - }) - case backendabi.L2SentMessageEventSignature: - event := backendabi.L2SentMessageEvent{} - err := utils.UnpackLog(backendabi.L2ScrollMessengerABI, &event, "SentMessage", vlog) - if err != nil { - log.Warn("Failed to unpack SentMessage event", "err", err) - return l2CrossMsg, msgHashes, relayedMsgs, err - } - msgHash := utils.ComputeMessageHash(event.Sender, event.Target, event.Value, event.MessageNonce, event.Message) - msgHashes = append(msgHashes, msgHashWrapper{ - msgHash: msgHash, - txHash: vlog.TxHash}) - case backendabi.L2RelayedMessageEventSignature: - event := backendabi.L2RelayedMessageEvent{} - err := utils.UnpackLog(backendabi.L2ScrollMessengerABI, &event, "RelayedMessage", vlog) - if err != nil { - log.Warn("Failed to unpack RelayedMessage event", "err", err) - return l2CrossMsg, msgHashes, relayedMsgs, err - } - relayedMsgs = append(relayedMsgs, &orm.RelayedMsg{ - MsgHash: event.MessageHash.String(), - Height: vlog.BlockNumber, - Layer2Hash: vlog.TxHash.Hex(), - }) - - } - - } - return l2CrossMsg, msgHashes, relayedMsgs, nil -} - -func updateL1CrossMsgMsgHash(ctx context.Context, dbTx *sqlx.Tx, database db.OrmFactory, msgHashes []msgHashWrapper) error { +func updateL1CrossMsgMsgHash(ctx context.Context, dbTx *sqlx.Tx, database db.OrmFactory, msgHashes []utils.MsgHashWrapper) error { for _, msgHash := range msgHashes { - err := database.UpdateL1CrossMsgHashDBTx(ctx, dbTx, msgHash.txHash, msgHash.msgHash) + err := database.UpdateL1CrossMsgHashDBTx(ctx, dbTx, msgHash.TxHash, msgHash.MsgHash) if err != nil { - log.Error("updateL1CrossMsgMsgHash: can not update layer1 cross msg MsgHash", "layer1 hash", msgHash.txHash, "err", err) + log.Error("updateL1CrossMsgMsgHash: can not update layer1 cross msg MsgHash", "layer1 hash", msgHash.TxHash, "err", err) continue } } return nil } -func updateL2CrossMsgMsgHash(ctx context.Context, dbTx *sqlx.Tx, database db.OrmFactory, msgHashes []msgHashWrapper) error { +func updateL2CrossMsgMsgHash(ctx context.Context, dbTx *sqlx.Tx, database db.OrmFactory, msgHashes []utils.MsgHashWrapper) error { for _, msgHash := range msgHashes { - err := database.UpdateL2CrossMsgHashDBTx(ctx, dbTx, msgHash.txHash, msgHash.msgHash) + err := database.UpdateL2CrossMsgHashDBTx(ctx, dbTx, msgHash.TxHash, msgHash.MsgHash) if err != nil { - log.Error("updateL2CrossMsgMsgHash: can not update layer2 cross msg MsgHash", "layer2 hash", msgHash.txHash, "err", err) + log.Error("updateL2CrossMsgMsgHash: can not update layer2 cross msg MsgHash", "layer2 hash", msgHash.TxHash, "err", err) continue } } diff --git a/bridge-history-api/cross_msg/message_proof/msg_proof_updater.go b/bridge-history-api/cross_msg/message_proof/msg_proof_updater.go new file mode 100644 index 000000000..458931d94 --- /dev/null +++ b/bridge-history-api/cross_msg/message_proof/msg_proof_updater.go @@ -0,0 +1,241 @@ +package message_proof + +import ( + "context" + "database/sql" + "errors" + "fmt" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" + + "bridge-history-api/db" + "bridge-history-api/db/orm" +) + +type MsgProofUpdater struct { + ctx context.Context + db db.OrmFactory + withdrawTrie *WithdrawTrie +} + +func NewMsgProofUpdater(ctx context.Context, confirmations uint64, startBlock uint64, db db.OrmFactory) *MsgProofUpdater { + return &MsgProofUpdater{ + ctx: ctx, + db: db, + withdrawTrie: NewWithdrawTrie(), + } +} + +func (m *MsgProofUpdater) Start() { + log.Info("MsgProofUpdater Start") + m.initialize(m.ctx) + go func() { + tick := time.NewTicker(10 * time.Second) + for { + select { + case <-m.ctx.Done(): + tick.Stop() + return + case <-tick.C: + latestBatch, err := m.db.GetLatestRollupBatch() + if err != nil { + log.Warn("MsgProofUpdater: Can not get latest RollupBatch: ", "err", err) + continue + } + if latestBatch == nil { + continue + } + latestBatchIndexWithProof, err := m.db.GetLatestL2SentMsgBatchIndex() + if err != nil { + log.Error("MsgProofUpdater: Can not get latest L2SentMsgBatchIndex: ", "err", err) + continue + } + var start uint64 + if latestBatchIndexWithProof < 0 { + start = 1 + } else { + start = uint64(latestBatchIndexWithProof) + 1 + } + for i := start; i <= latestBatch.BatchIndex; i++ { + batch, err := m.db.GetRollupBatchByIndex(i) + if err != nil { + log.Error("MsgProofUpdater: Can not get RollupBatch: ", "err", err, "index", i) + break + } + // get all l2 messages in this batch + msgs, proofs, err := m.appendL2Messages(batch.StartBlockNumber, batch.EndBlockNumber) + if err != nil { + log.Error("MsgProofUpdater: can not append l2messages", "startBlockNumber", batch.StartBlockNumber, "endBlockNumber", batch.EndBlockNumber, "err", err) + break + } + err = m.updateMsgProof(msgs, proofs, batch.BatchIndex) + if err != nil { + log.Error("MsgProofUpdater: can not update msg proof", "err", err) + break + } + } + + } + } + }() + +} + +func (m *MsgProofUpdater) Stop() { + log.Info("MsgProofUpdater Stop") +} + +func (m *MsgProofUpdater) initialize(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + default: + err := m.initializeWithdrawTrie() + if err != nil { + log.Error("can not initialize withdraw trie", "err", err) + // give it some time to retry + time.Sleep(10 * time.Second) + continue + } + return + } + } +} + +func (m *MsgProofUpdater) initializeWithdrawTrie() error { + var batch *orm.RollupBatch + firstMsg, err := m.db.GetL2SentMessageByNonce(0) + if err != nil && !errors.Is(err, sql.ErrNoRows) { + return fmt.Errorf("failed to get first l2 message: %v", err) + } + // no l2 message + // TO DO: check if we realy dont have l2 sent message with nonce 0 + if firstMsg == nil { + log.Info("No first l2sentmsg in db") + return nil + } + + // if no batch, return and wait for next try round + batch, err = m.db.GetLatestRollupBatch() + if err != nil { + return fmt.Errorf("failed to get latest batch: %v", err) + } + if batch == nil { + return fmt.Errorf("no batch found") + } + + var batches []*orm.RollupBatch + batchIndex := batch.BatchIndex + for { + var msg *orm.L2SentMsg + msg, err = m.db.GetLatestL2SentMsgLEHeight(batch.EndBlockNumber) + if err != nil { + log.Warn("failed to get l2 sent message less than height", "endBlocknum", batch.EndBlockNumber, "err", err) + } + if msg != nil && msg.MsgProof != "" { + log.Info("Found latest l2 sent msg with proof: ", "msg_proof", msg.MsgProof, "height", msg.Height, "msg_hash", msg.MsgHash) + // initialize withdrawTrie + proofBytes := common.Hex2Bytes(msg.MsgProof) + m.withdrawTrie.Initialize(msg.Nonce, common.HexToHash(msg.MsgHash), proofBytes) + break + } + + // append unprocessed batch + batches = append(batches, batch) + + if batchIndex == 1 { + // otherwise overflow + // and batchIndex 0 is not in DB + // To Do: check if we dont have batch with index 0 in future + break + } + // iterate for next batch + batchIndex-- + + batch, err = m.db.GetRollupBatchByIndex(batchIndex) + if err != nil { + return fmt.Errorf("failed to get block batch %v: %v", batchIndex, err) + } + } + + log.Info("Build withdraw trie with pending messages") + for i := len(batches) - 1; i >= 0; i-- { + b := batches[i] + msgs, proofs, err := m.appendL2Messages(b.StartBlockNumber, b.EndBlockNumber) + if err != nil { + return err + } + + err = m.updateMsgProof(msgs, proofs, b.BatchIndex) + if err != nil { + return err + } + } + log.Info("Build withdraw trie finished") + + return nil +} + +func (m *MsgProofUpdater) updateMsgProof(msgs []*orm.L2SentMsg, proofs [][]byte, batchIndex uint64) error { + if len(msgs) == 0 { + return nil + } + // this should not happend, but double checked + if len(msgs) != len(proofs) { + return fmt.Errorf("illegal state: len(msgs) != len(proofs)") + } + dbTx, err := m.db.Beginx() + if err != nil { + return err + } + + for i, msg := range msgs { + proofHex := common.Bytes2Hex(proofs[i]) + log.Debug("updateMsgProof", "msgHash", msg.MsgHash, "batchIndex", batchIndex, "proof", proofHex) + if dbTxErr := m.db.UpdateL2MessageProofInDBTx(m.ctx, dbTx, msg.MsgHash, proofHex, batchIndex); dbTxErr != nil { + if err := dbTx.Rollback(); err != nil { + log.Error("dbTx.Rollback()", "err", err) + } + return dbTxErr + } + } + + if dbTxErr := dbTx.Commit(); dbTxErr != nil { + if err := dbTx.Rollback(); err != nil { + log.Error("dbTx.Rollback()", "err", err) + } + return dbTxErr + } + + return nil +} + +// appendL2Messages will append all messages between firstBlock and lastBlock (both inclusive) to withdrawTrie and compute corresponding merkle proof of each message. +func (m *MsgProofUpdater) appendL2Messages(firstBlock, lastBlock uint64) ([]*orm.L2SentMsg, [][]byte, error) { + var msgProofs [][]byte + messages, err := m.db.GetL2SentMsgMsgHashByHeightRange(firstBlock, lastBlock) + if err != nil { + log.Error("GetL2SentMsgMsgHashByHeightRange failed", "error", err, "firstBlock", firstBlock, "lastBlock", lastBlock) + return messages, msgProofs, err + } + if len(messages) == 0 { + return messages, msgProofs, nil + } + + // double check whether nonce is matched + if messages[0].Nonce != m.withdrawTrie.NextMessageNonce { + log.Error("L2 message nonce mismatch", "expected", m.withdrawTrie.NextMessageNonce, "found", messages[0].Nonce) + return messages, msgProofs, fmt.Errorf("l2 message nonce mismatch, expected: %v, found: %v", m.withdrawTrie.NextMessageNonce, messages[0].Nonce) + } + + var hashes []common.Hash + for _, msg := range messages { + hashes = append(hashes, common.HexToHash(msg.MsgHash)) + } + msgProofs = m.withdrawTrie.AppendMessages(hashes) + + return messages, msgProofs, nil +} diff --git a/bridge-history-api/cross_msg/withdraw_trie.go b/bridge-history-api/cross_msg/message_proof/withdraw_trie.go similarity index 99% rename from bridge-history-api/cross_msg/withdraw_trie.go rename to bridge-history-api/cross_msg/message_proof/withdraw_trie.go index d09fdb7e2..331a1a8b5 100644 --- a/bridge-history-api/cross_msg/withdraw_trie.go +++ b/bridge-history-api/cross_msg/message_proof/withdraw_trie.go @@ -1,4 +1,4 @@ -package cross_msg +package message_proof import ( "github.com/ethereum/go-ethereum/common" @@ -42,7 +42,6 @@ func NewWithdrawTrie() *WithdrawTrie { func (w *WithdrawTrie) Initialize(currentMessageNonce uint64, msgHash common.Hash, proofBytes []byte) { proof := DecodeBytesToMerkleProof(proofBytes) branches := RecoverBranchFromProof(proof, currentMessageNonce, msgHash) - w.height = len(proof) w.branches = branches w.NextMessageNonce = currentMessageNonce + 1 diff --git a/bridge-history-api/cross_msg/withdraw_trie_test.go b/bridge-history-api/cross_msg/message_proof/withdraw_trie_test.go similarity index 73% rename from bridge-history-api/cross_msg/withdraw_trie_test.go rename to bridge-history-api/cross_msg/message_proof/withdraw_trie_test.go index 845d41d13..e003a9cea 100644 --- a/bridge-history-api/cross_msg/withdraw_trie_test.go +++ b/bridge-history-api/cross_msg/message_proof/withdraw_trie_test.go @@ -1,4 +1,4 @@ -package cross_msg_test +package message_proof import ( "math/big" @@ -7,7 +7,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/stretchr/testify/assert" - "bridge-history-api/cross_msg" "bridge-history-api/utils" ) @@ -19,29 +18,29 @@ func TestUpdateBranchWithNewMessage(t *testing.T) { zeroes[i] = utils.Keccak2(zeroes[i-1], zeroes[i-1]) } - cross_msg.UpdateBranchWithNewMessage(zeroes, branches, 0, common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000001")) + UpdateBranchWithNewMessage(zeroes, branches, 0, common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000001")) if branches[0] != common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000001") { t.Fatalf("Invalid root, want %s, got %s", "0x0000000000000000000000000000000000000000000000000000000000000001", branches[0].Hex()) } - cross_msg.UpdateBranchWithNewMessage(zeroes, branches, 1, common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000002")) + UpdateBranchWithNewMessage(zeroes, branches, 1, common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000002")) if branches[1] != common.HexToHash("0xe90b7bceb6e7df5418fb78d8ee546e97c83a08bbccc01a0644d599ccd2a7c2e0") { t.Fatalf("Invalid root, want %s, got %s", "0xe90b7bceb6e7df5418fb78d8ee546e97c83a08bbccc01a0644d599ccd2a7c2e0", branches[1].Hex()) } - cross_msg.UpdateBranchWithNewMessage(zeroes, branches, 2, common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000003")) + UpdateBranchWithNewMessage(zeroes, branches, 2, common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000003")) if branches[2] != common.HexToHash("0x222ff5e0b5877792c2bc1670e2ccd0c2c97cd7bb1672a57d598db05092d3d72c") { t.Fatalf("Invalid root, want %s, got %s", "0x222ff5e0b5877792c2bc1670e2ccd0c2c97cd7bb1672a57d598db05092d3d72c", branches[2].Hex()) } - cross_msg.UpdateBranchWithNewMessage(zeroes, branches, 3, common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000004")) + UpdateBranchWithNewMessage(zeroes, branches, 3, common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000004")) if branches[2] != common.HexToHash("0xa9bb8c3f1f12e9aa903a50c47f314b57610a3ab32f2d463293f58836def38d36") { t.Fatalf("Invalid root, want %s, got %s", "0xa9bb8c3f1f12e9aa903a50c47f314b57610a3ab32f2d463293f58836def38d36", branches[2].Hex()) } } func TestDecodeEncodeMerkleProof(t *testing.T) { - proof := cross_msg.DecodeBytesToMerkleProof(common.Hex2Bytes("2ebffc1a6671c51e30777a680904b103992630ec995b6e6ff76a04d5259d49012ebffc1a6671c51e30777a680904b103992630ec995b6e6ff76a04d5259d49022ebffc1a6671c51e30777a680904b103992630ec995b6e6ff76a04d5259d49032ebffc1a6671c51e30777a680904b103992630ec995b6e6ff76a04d5259d4904")) + proof := DecodeBytesToMerkleProof(common.Hex2Bytes("2ebffc1a6671c51e30777a680904b103992630ec995b6e6ff76a04d5259d49012ebffc1a6671c51e30777a680904b103992630ec995b6e6ff76a04d5259d49022ebffc1a6671c51e30777a680904b103992630ec995b6e6ff76a04d5259d49032ebffc1a6671c51e30777a680904b103992630ec995b6e6ff76a04d5259d4904")) if len(proof) != 4 { t.Fatalf("proof length mismatch, want %d, got %d", 4, len(proof)) } @@ -58,7 +57,7 @@ func TestDecodeEncodeMerkleProof(t *testing.T) { t.Fatalf("proof[3] mismatch, want %s, got %s", "0x2ebffc1a6671c51e30777a680904b103992630ec995b6e6ff76a04d5259d4904", proof[0].Hex()) } - bytes := cross_msg.EncodeMerkleProofToBytes(proof) + bytes := EncodeMerkleProofToBytes(proof) if common.Bytes2Hex(bytes) != "2ebffc1a6671c51e30777a680904b103992630ec995b6e6ff76a04d5259d49012ebffc1a6671c51e30777a680904b103992630ec995b6e6ff76a04d5259d49022ebffc1a6671c51e30777a680904b103992630ec995b6e6ff76a04d5259d49032ebffc1a6671c51e30777a680904b103992630ec995b6e6ff76a04d5259d4904" { t.Fatalf("wrong encoded bytes") } @@ -72,32 +71,32 @@ func TestRecoverBranchFromProof(t *testing.T) { zeroes[i] = utils.Keccak2(zeroes[i-1], zeroes[i-1]) } - proof := cross_msg.UpdateBranchWithNewMessage(zeroes, branches, 0, common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000001")) - tmpBranches := cross_msg.RecoverBranchFromProof(proof, 0, common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000001")) + proof := UpdateBranchWithNewMessage(zeroes, branches, 0, common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000001")) + tmpBranches := RecoverBranchFromProof(proof, 0, common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000001")) for i := 0; i < 64; i++ { if tmpBranches[i] != branches[i] { t.Fatalf("Invalid branch, want %s, got %s", branches[i].Hex(), tmpBranches[i].Hex()) } } - proof = cross_msg.UpdateBranchWithNewMessage(zeroes, branches, 1, common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000002")) - tmpBranches = cross_msg.RecoverBranchFromProof(proof, 1, common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000002")) + proof = UpdateBranchWithNewMessage(zeroes, branches, 1, common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000002")) + tmpBranches = RecoverBranchFromProof(proof, 1, common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000002")) for i := 0; i < 64; i++ { if tmpBranches[i] != branches[i] { t.Fatalf("Invalid branch, want %s, got %s", branches[i].Hex(), tmpBranches[i].Hex()) } } - proof = cross_msg.UpdateBranchWithNewMessage(zeroes, branches, 2, common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000003")) - tmpBranches = cross_msg.RecoverBranchFromProof(proof, 2, common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000003")) + proof = UpdateBranchWithNewMessage(zeroes, branches, 2, common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000003")) + tmpBranches = RecoverBranchFromProof(proof, 2, common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000003")) for i := 0; i < 64; i++ { if tmpBranches[i] != branches[i] { t.Fatalf("Invalid branch, want %s, got %s", branches[i].Hex(), tmpBranches[i].Hex()) } } - proof = cross_msg.UpdateBranchWithNewMessage(zeroes, branches, 3, common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000004")) - tmpBranches = cross_msg.RecoverBranchFromProof(proof, 3, common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000004")) + proof = UpdateBranchWithNewMessage(zeroes, branches, 3, common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000004")) + tmpBranches = RecoverBranchFromProof(proof, 3, common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000004")) for i := 0; i < 64; i++ { if tmpBranches[i] != branches[i] { t.Fatalf("Invalid branch, want %s, got %s", branches[i].Hex(), tmpBranches[i].Hex()) @@ -107,7 +106,7 @@ func TestRecoverBranchFromProof(t *testing.T) { func TestWithdrawTrieOneByOne(t *testing.T) { for initial := 0; initial < 128; initial++ { - withdrawTrie := cross_msg.NewWithdrawTrie() + withdrawTrie := NewWithdrawTrie() var hashes []common.Hash for i := 0; i < initial; i++ { hash := common.BigToHash(big.NewInt(int64(i + 1))) @@ -126,7 +125,7 @@ func TestWithdrawTrieOneByOne(t *testing.T) { }) assert.Equal(t, withdrawTrie.NextMessageNonce, uint64(i+1)) assert.Equal(t, expectedRoot.String(), withdrawTrie.MessageRoot().String()) - proof := cross_msg.DecodeBytesToMerkleProof(proofBytes[0]) + proof := DecodeBytesToMerkleProof(proofBytes[0]) verifiedRoot := verifyMerkleProof(uint64(i), hash, proof) assert.Equal(t, expectedRoot.String(), verifiedRoot.String()) } @@ -153,7 +152,7 @@ func TestWithdrawTrieMultiple(t *testing.T) { } for finish := initial; finish < 100; finish++ { - withdrawTrie := cross_msg.NewWithdrawTrie() + withdrawTrie := NewWithdrawTrie() withdrawTrie.AppendMessages(hashes) var newHashes []common.Hash @@ -167,7 +166,7 @@ func TestWithdrawTrieMultiple(t *testing.T) { for i := initial; i <= finish; i++ { hash := common.BigToHash(big.NewInt(int64(i + 1))) - proof := cross_msg.DecodeBytesToMerkleProof(proofBytes[i-initial]) + proof := DecodeBytesToMerkleProof(proofBytes[i-initial]) verifiedRoot := verifyMerkleProof(uint64(i), hash, proof) assert.Equal(t, expectedRoots[finish].String(), verifiedRoot.String()) } diff --git a/bridge-history-api/cross_msg/reorg_handle.go b/bridge-history-api/cross_msg/reorg_handle.go index 8acb55430..6ed912dc1 100644 --- a/bridge-history-api/cross_msg/reorg_handle.go +++ b/bridge-history-api/cross_msg/reorg_handle.go @@ -94,6 +94,11 @@ func L2ReorgHandling(ctx context.Context, reorgHeight int64, db db.OrmFactory) e dbTx.Rollback() log.Crit("delete l2 relayed hash from height", "height", reorgHeight, "err", err) } + err = db.DeleteL2SentMsgAfterHeightDBTx(dbTx, reorgHeight) + if err != nil { + dbTx.Rollback() + log.Crit("delete l2 sent msg from height", "height", reorgHeight, "err", err) + } err = dbTx.Commit() if err != nil { dbTx.Rollback() diff --git a/bridge-history-api/db/migrate/migrations/00003_l2_sent_msg.sql b/bridge-history-api/db/migrate/migrations/00003_l2_sent_msg.sql new file mode 100644 index 000000000..41174e177 --- /dev/null +++ b/bridge-history-api/db/migrate/migrations/00003_l2_sent_msg.sql @@ -0,0 +1,57 @@ +-- +goose Up +-- +goose StatementBegin +create table l2_sent_msg +( + id BIGSERIAL PRIMARY KEY, + sender VARCHAR NOT NULL, + target VARCHAR NOT NULL, + value VARCHAR NOT NULL, + msg_hash VARCHAR NOT NULL, + height BIGINT NOT NULL, + nonce BIGINT NOT NULL, + batch_index BIGINT NOT NULL DEFAULT 0, + msg_proof TEXT NOT NULL DEFAULT '', + msg_data TEXT NOT NULL DEFAULT '', + is_deleted BOOLEAN NOT NULL DEFAULT FALSE, + created_at TIMESTAMP(0) NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP(0) NOT NULL DEFAULT CURRENT_TIMESTAMP, + deleted_at TIMESTAMP(0) DEFAULT NULL +); + +comment +on column l2_sent_msg.is_deleted is 'NotDeleted, Deleted'; + +CREATE OR REPLACE FUNCTION update_timestamp() +RETURNS TRIGGER AS $$ +BEGIN + NEW.updated_at = CURRENT_TIMESTAMP; + RETURN NEW; +END; +$$ language 'plpgsql'; + +CREATE TRIGGER update_timestamp BEFORE UPDATE +ON l2_sent_msg FOR EACH ROW EXECUTE PROCEDURE +update_timestamp(); + +CREATE OR REPLACE FUNCTION deleted_at_trigger() +RETURNS TRIGGER AS $$ +BEGIN + IF NEW.is_deleted AND OLD.is_deleted != NEW.is_deleted THEN + UPDATE l2_sent_msg SET deleted_at = NOW() WHERE id = NEW.id; + END IF; + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER deleted_at_trigger +AFTER UPDATE ON l2_sent_msg +FOR EACH ROW +EXECUTE FUNCTION deleted_at_trigger(); + + +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +drop table if exists l2_sent_msg; +-- +goose StatementEnd \ No newline at end of file diff --git a/bridge-history-api/db/migrate/migrations/00004_rollup_batch.sql b/bridge-history-api/db/migrate/migrations/00004_rollup_batch.sql new file mode 100644 index 000000000..301f68c54 --- /dev/null +++ b/bridge-history-api/db/migrate/migrations/00004_rollup_batch.sql @@ -0,0 +1,49 @@ +-- +goose Up +-- +goose StatementBegin +create table rollup_batch +( + id BIGSERIAL PRIMARY KEY, + batch_index BIGINT NOT NULL, + commit_height BIGINT NOT NULL, + start_block_number BIGINT NOT NULL, + end_block_number BIGINT NOT NULL, + batch_hash VARCHAR NOT NULL, + is_deleted BOOLEAN NOT NULL DEFAULT FALSE, + created_at TIMESTAMP(0) NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP(0) NOT NULL DEFAULT CURRENT_TIMESTAMP, + deleted_at TIMESTAMP(0) DEFAULT NULL +); + +CREATE OR REPLACE FUNCTION update_timestamp() +RETURNS TRIGGER AS $$ +BEGIN + NEW.updated_at = CURRENT_TIMESTAMP; + RETURN NEW; +END; +$$ language 'plpgsql'; + +CREATE TRIGGER update_timestamp BEFORE UPDATE +ON rollup_batch FOR EACH ROW EXECUTE PROCEDURE +update_timestamp(); + +CREATE OR REPLACE FUNCTION deleted_at_trigger() +RETURNS TRIGGER AS $$ +BEGIN + IF NEW.is_deleted AND OLD.is_deleted != NEW.is_deleted THEN + UPDATE rollup_batch SET deleted_at = NOW() WHERE id = NEW.id; + END IF; + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER deleted_at_trigger +AFTER UPDATE ON rollup_batch +FOR EACH ROW +EXECUTE FUNCTION deleted_at_trigger(); + +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +drop table if exists rollup_batch; +-- +goose StatementEnd \ No newline at end of file diff --git a/bridge-history-api/db/orm/batch.go b/bridge-history-api/db/orm/batch.go new file mode 100644 index 000000000..a37d7aea3 --- /dev/null +++ b/bridge-history-api/db/orm/batch.go @@ -0,0 +1,79 @@ +package orm + +import ( + "database/sql" + "fmt" + + "github.com/ethereum/go-ethereum/log" + "github.com/jmoiron/sqlx" +) + +type rollupBatchOrm struct { + db *sqlx.DB +} + +type RollupBatch struct { + ID uint64 `json:"id" db:"id"` + BatchIndex uint64 `json:"batch_index" db:"batch_index"` + BatchHash string `json:"batch_hash" db:"batch_hash"` + CommitHeight uint64 `json:"commit_height" db:"commit_height"` + StartBlockNumber uint64 `json:"start_block_number" db:"start_block_number"` + EndBlockNumber uint64 `json:"end_block_number" db:"end_block_number"` +} + +// NewRollupBatchOrm create an NewRollupBatchOrm instance +func NewRollupBatchOrm(db *sqlx.DB) RollupBatchOrm { + return &rollupBatchOrm{db: db} +} + +func (b *rollupBatchOrm) BatchInsertRollupBatchDBTx(dbTx *sqlx.Tx, batches []*RollupBatch) error { + if len(batches) == 0 { + return nil + } + var err error + batchMaps := make([]map[string]interface{}, len(batches)) + for i, batch := range batches { + batchMaps[i] = map[string]interface{}{ + "commit_height": batch.CommitHeight, + "batch_index": batch.BatchIndex, + "batch_hash": batch.BatchHash, + "start_block_number": batch.StartBlockNumber, + "end_block_number": batch.EndBlockNumber, + } + var exists bool + err = dbTx.QueryRow(`SELECT EXISTS(SELECT 1 FROM rollup_batch WHERE batch_index = $1 AND NOT is_deleted)`, batch.BatchIndex).Scan(&exists) + if err != nil { + return err + } + if exists { + return fmt.Errorf("BatchInsertRollupBatchDBTx: batch index %v already exists at height %v", batch.BatchIndex, batch.CommitHeight) + } + } + _, err = dbTx.NamedExec(`insert into rollup_batch(commit_height, batch_index, batch_hash, start_block_number, end_block_number) values(:commit_height, :batch_index, :batch_hash, :start_block_number, :end_block_number);`, batchMaps) + if err != nil { + log.Error("BatchInsertRollupBatchDBTx: failed to insert batch event msgs", "err", err) + return err + } + return nil +} + +func (b *rollupBatchOrm) GetLatestRollupBatch() (*RollupBatch, error) { + result := &RollupBatch{} + row := b.db.QueryRowx(`SELECT id, batch_index, commit_height, batch_hash, start_block_number, end_block_number FROM rollup_batch ORDER BY batch_index DESC LIMIT 1;`) + if err := row.StructScan(result); err != nil { + if err == sql.ErrNoRows { + return nil, nil + } + return nil, err + } + return result, nil +} + +func (b *rollupBatchOrm) GetRollupBatchByIndex(index uint64) (*RollupBatch, error) { + result := &RollupBatch{} + row := b.db.QueryRowx(`SELECT id, batch_index, batch_hash, commit_height, start_block_number, end_block_number FROM rollup_batch WHERE batch_index = $1;`, index) + if err := row.StructScan(result); err != nil { + return nil, err + } + return result, nil +} diff --git a/bridge-history-api/db/orm/interface.go b/bridge-history-api/db/orm/interface.go index 1df21a89c..63b9f35b8 100644 --- a/bridge-history-api/db/orm/interface.go +++ b/bridge-history-api/db/orm/interface.go @@ -72,27 +72,27 @@ type L1CrossMsgOrm interface { GetL1CrossMsgByHash(l1Hash common.Hash) (*CrossMsg, error) GetL1CrossMsgsByAddress(sender common.Address) ([]*CrossMsg, error) BatchInsertL1CrossMsgDBTx(dbTx *sqlx.Tx, messages []*CrossMsg) error - // UpdateL1CrossMsgHash invoked when SentMessage event is received + // UpdateL1CrossMsgHashDBTx invoked when SentMessage event is received UpdateL1CrossMsgHashDBTx(ctx context.Context, dbTx *sqlx.Tx, l1Hash, msgHash common.Hash) error UpdateL1CrossMsgHash(ctx context.Context, l1Hash, msgHash common.Hash) error GetLatestL1ProcessedHeight() (int64, error) DeleteL1CrossMsgAfterHeightDBTx(dbTx *sqlx.Tx, height int64) error - UpdateL1Blocktimestamp(height uint64, timestamp time.Time) error - GetL1EarliestNoBlocktimestampHeight() (uint64, error) + UpdateL1BlockTimestamp(height uint64, timestamp time.Time) error + GetL1EarliestNoBlockTimestampHeight() (uint64, error) } -// L2CrossMsgOrm provides operations on l2_cross_message table +// L2CrossMsgOrm provides operations on cross_message table type L2CrossMsgOrm interface { GetL2CrossMsgByHash(l2Hash common.Hash) (*CrossMsg, error) GetL2CrossMsgByAddress(sender common.Address) ([]*CrossMsg, error) BatchInsertL2CrossMsgDBTx(dbTx *sqlx.Tx, messages []*CrossMsg) error - // UpdateL2CrossMsgHash invoked when SentMessage event is received + // UpdateL2CrossMsgHashDBTx invoked when SentMessage event is received UpdateL2CrossMsgHashDBTx(ctx context.Context, dbTx *sqlx.Tx, l2Hash, msgHash common.Hash) error UpdateL2CrossMsgHash(ctx context.Context, l2Hash, msgHash common.Hash) error GetLatestL2ProcessedHeight() (int64, error) DeleteL2CrossMsgFromHeightDBTx(dbTx *sqlx.Tx, height int64) error - UpdateL2Blocktimestamp(height uint64, timestamp time.Time) error - GetL2EarliestNoBlocktimestampHeight() (uint64, error) + UpdateL2BlockTimestamp(height uint64, timestamp time.Time) error + GetL2EarliestNoBlockTimestampHeight() (uint64, error) } type RelayedMsgOrm interface { @@ -103,3 +103,21 @@ type RelayedMsgOrm interface { DeleteL1RelayedHashAfterHeightDBTx(dbTx *sqlx.Tx, height int64) error DeleteL2RelayedHashAfterHeightDBTx(dbTx *sqlx.Tx, height int64) error } + +type L2SentMsgOrm interface { + BatchInsertL2SentMsgDBTx(dbTx *sqlx.Tx, messages []*L2SentMsg) error + GetL2SentMsgByHash(l2Hash string) (*L2SentMsg, error) + GetLatestSentMsgHeightOnL2() (int64, error) + GetL2SentMessageByNonce(nonce uint64) (*L2SentMsg, error) + GetLatestL2SentMsgLEHeight(endBlockNumber uint64) (*L2SentMsg, error) + GetL2SentMsgMsgHashByHeightRange(startHeight, endHeight uint64) ([]*L2SentMsg, error) + UpdateL2MessageProofInDBTx(ctx context.Context, dbTx *sqlx.Tx, msgHash string, proof string, batch_index uint64) error + GetLatestL2SentMsgBatchIndex() (int64, error) + DeleteL2SentMsgAfterHeightDBTx(dbTx *sqlx.Tx, height int64) error +} + +type RollupBatchOrm interface { + GetLatestRollupBatch() (*RollupBatch, error) + GetRollupBatchByIndex(index uint64) (*RollupBatch, error) + BatchInsertRollupBatchDBTx(dbTx *sqlx.Tx, messages []*RollupBatch) error +} diff --git a/bridge-history-api/db/orm/l1_cross_msg.go b/bridge-history-api/db/orm/l1_cross_msg.go index 09a2caee3..1a74fb7f3 100644 --- a/bridge-history-api/db/orm/l1_cross_msg.go +++ b/bridge-history-api/db/orm/l1_cross_msg.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "errors" + "fmt" "time" "github.com/ethereum/go-ethereum/common" @@ -71,14 +72,22 @@ func (l *l1CrossMsgOrm) BatchInsertL1CrossMsgDBTx(dbTx *sqlx.Tx, messages []*Cro "token_id": msg.TokenID, "msg_type": Layer1Msg, } - - _, err = dbTx.NamedExec(`insert into cross_message(height, sender, target, asset, layer1_hash, layer1_token, layer2_token, token_id, amount, msg_type) select :height, :sender, :target, :asset, :layer1_hash, :layer1_token, :layer2_token, :token_id, :amount, :msg_type WHERE NOT EXISTS (SELECT 1 FROM cross_message WHERE layer1_hash = :layer1_hash AND NOT is_deleted);`, messageMaps[i]) + var exists bool + err = dbTx.QueryRow(`SELECT EXISTS(SELECT 1 FROM cross_message WHERE layer1_hash = $1 AND NOT is_deleted)`, msg.Layer1Hash).Scan(&exists) if err != nil { - log.Error("BatchInsertL1CrossMsgDBTx: failed to insert l1 cross msgs", "l1hashes", msg.Layer1Hash, "heights", msg.Height, "err", err) - break + return err + } + if exists { + return fmt.Errorf("BatchInsertL1CrossMsgDBTx: l1 cross msg layer1Hash %v already exists at height %v", msg.Layer1Hash, msg.Height) } } - return err + _, err = dbTx.NamedExec(`insert into cross_message(height, sender, target, asset, layer1_hash, layer1_token, layer2_token, token_id, amount, msg_type) values(:height, :sender, :target, :asset, :layer1_hash, :layer1_token, :layer2_token, :token_id, :amount, :msg_type);`, messageMaps) + if err != nil { + log.Error("BatchInsertL1CrossMsgDBTx: failed to insert l1 cross msgs", "err", err) + return err + } + + return nil } // UpdateL1CrossMsgHashDBTx update l1 cross msg hash in db, no need to check msg_type since layer1_hash wont be empty if its layer1 msg @@ -120,14 +129,14 @@ func (l *l1CrossMsgOrm) DeleteL1CrossMsgAfterHeightDBTx(dbTx *sqlx.Tx, height in return nil } -func (l *l1CrossMsgOrm) UpdateL1Blocktimestamp(height uint64, timestamp time.Time) error { +func (l *l1CrossMsgOrm) UpdateL1BlockTimestamp(height uint64, timestamp time.Time) error { if _, err := l.db.Exec(`UPDATE cross_message SET block_timestamp = $1 where height = $2 AND msg_type = $3 AND NOT is_deleted`, timestamp, height, Layer1Msg); err != nil { return err } return nil } -func (l *l1CrossMsgOrm) GetL1EarliestNoBlocktimestampHeight() (uint64, error) { +func (l *l1CrossMsgOrm) GetL1EarliestNoBlockTimestampHeight() (uint64, error) { row := l.db.QueryRowx(`SELECT height FROM cross_message WHERE block_timestamp IS NULL AND msg_type = $1 AND NOT is_deleted ORDER BY height ASC LIMIT 1;`, Layer1Msg) var result uint64 if err := row.Scan(&result); err != nil { diff --git a/bridge-history-api/db/orm/l2_cross_msg.go b/bridge-history-api/db/orm/l2_cross_msg.go index 1e92f4366..f6a41959d 100644 --- a/bridge-history-api/db/orm/l2_cross_msg.go +++ b/bridge-history-api/db/orm/l2_cross_msg.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "errors" + "fmt" "time" "github.com/ethereum/go-ethereum/common" @@ -32,7 +33,7 @@ func (l *l2CrossMsgOrm) GetL2CrossMsgByHash(l2Hash common.Hash) (*CrossMsg, erro return result, nil } -// GetL2CrossMsgsByAddress returns all layer2 cross messages under given address +// GetL2CrossMsgByAddress returns all layer2 cross messages under given address // Warning: return empty slice if no data found func (l *l2CrossMsgOrm) GetL2CrossMsgByAddress(sender common.Address) ([]*CrossMsg, error) { var results []*CrossMsg @@ -84,14 +85,21 @@ func (l *l2CrossMsgOrm) BatchInsertL2CrossMsgDBTx(dbTx *sqlx.Tx, messages []*Cro "amount": msg.Amount, "msg_type": Layer2Msg, } - - _, err = dbTx.NamedExec(`insert into cross_message(height, sender, target, asset, layer2_hash, layer1_token, layer2_token, token_id, amount, msg_type) select :height, :sender, :target, :asset, :layer2_hash, :layer1_token, :layer2_token, :token_id, :amount, :msg_type WHERE NOT EXISTS (SELECT 1 FROM cross_message WHERE layer2_hash = :layer2_hash AND NOT is_deleted);`, messageMaps[i]) + var exists bool + err = dbTx.QueryRow(`SELECT EXISTS(SELECT 1 FROM cross_message WHERE layer2_hash = $1 AND NOT is_deleted)`, msg.Layer2Hash).Scan(&exists) if err != nil { - log.Error("BatchInsertL2CrossMsgDBTx: failed to insert l2 cross msgs", "layer2hash", msg.Layer2Hash, "heights", msg.Height, "err", err) - break + return err + } + if exists { + return fmt.Errorf("BatchInsertL2CrossMsgDBTx: l2 cross msg layer2Hash %v already exists at height %v", msg.Layer2Hash, msg.Height) } } - return err + _, err = dbTx.NamedExec(`insert into cross_message(height, sender, target, asset, layer2_hash, layer1_token, layer2_token, token_id, amount, msg_type) values(:height, :sender, :target, :asset, :layer2_hash, :layer1_token, :layer2_token, :token_id, :amount, :msg_type);`, messageMaps) + if err != nil { + log.Error("BatchInsertL2CrossMsgDBTx: failed to insert l2 cross msgs", "err", err) + return err + } + return nil } func (l *l2CrossMsgOrm) UpdateL2CrossMsgHashDBTx(ctx context.Context, dbTx *sqlx.Tx, l2Hash, msgHash common.Hash) error { @@ -123,14 +131,14 @@ func (l *l2CrossMsgOrm) GetLatestL2ProcessedHeight() (int64, error) { return 0, nil } -func (l *l2CrossMsgOrm) UpdateL2Blocktimestamp(height uint64, timestamp time.Time) error { +func (l *l2CrossMsgOrm) UpdateL2BlockTimestamp(height uint64, timestamp time.Time) error { if _, err := l.db.Exec(`UPDATE cross_message SET block_timestamp = $1 where height = $2 AND msg_type = $3 AND NOT is_deleted`, timestamp, height, Layer2Msg); err != nil { return err } return nil } -func (l *l2CrossMsgOrm) GetL2EarliestNoBlocktimestampHeight() (uint64, error) { +func (l *l2CrossMsgOrm) GetL2EarliestNoBlockTimestampHeight() (uint64, error) { row := l.db.QueryRowx(`SELECT height FROM cross_message WHERE block_timestamp IS NULL AND msg_type = $1 AND NOT is_deleted ORDER BY height ASC LIMIT 1;`, Layer2Msg) var result uint64 if err := row.Scan(&result); err != nil { diff --git a/bridge-history-api/db/orm/l2_sent_msg.go b/bridge-history-api/db/orm/l2_sent_msg.go new file mode 100644 index 000000000..7bf34cdc5 --- /dev/null +++ b/bridge-history-api/db/orm/l2_sent_msg.go @@ -0,0 +1,159 @@ +package orm + +import ( + "context" + "database/sql" + "fmt" + "time" + + "github.com/ethereum/go-ethereum/log" + "github.com/jmoiron/sqlx" +) + +type L2SentMsg struct { + ID uint64 `json:"id" db:"id"` + MsgHash string `json:"msg_hash" db:"msg_hash"` + Sender string `json:"sender" db:"sender"` + Target string `json:"target" db:"target"` + Value string `json:"value" db:"value"` + Height uint64 `json:"height" db:"height"` + Nonce uint64 `json:"nonce" db:"nonce"` + BatchIndex uint64 `json:"batch_index" db:"batch_index"` + MsgProof string `json:"msg_proof" db:"msg_proof"` + MsgData string `json:"msg_data" db:"msg_data"` + IsDeleted bool `json:"is_deleted" db:"is_deleted"` + CreatedAt *time.Time `json:"created_at" db:"created_at"` + UpdatedAt *time.Time `json:"updated_at" db:"updated_at"` + DeletedAt *time.Time `json:"deleted_at" db:"deleted_at"` +} + +type l2SentMsgOrm struct { + db *sqlx.DB +} + +// NewL2SentMsgOrm create an NewRollupBatchOrm instance +func NewL2SentMsgOrm(db *sqlx.DB) L2SentMsgOrm { + return &l2SentMsgOrm{db: db} +} + +func (l *l2SentMsgOrm) GetL2SentMsgByHash(msgHash string) (*L2SentMsg, error) { + result := &L2SentMsg{} + row := l.db.QueryRowx(`SELECT * FROM l2_sent_msg WHERE msg_hash = $1 AND NOT is_deleted;`, msgHash) + if err := row.StructScan(result); err != nil { + return nil, err + } + return result, nil +} + +func (l *l2SentMsgOrm) BatchInsertL2SentMsgDBTx(dbTx *sqlx.Tx, messages []*L2SentMsg) error { + if len(messages) == 0 { + return nil + } + var err error + messageMaps := make([]map[string]interface{}, len(messages)) + for i, msg := range messages { + messageMaps[i] = map[string]interface{}{ + "sender": msg.Sender, + "target": msg.Target, + "value": msg.Value, + "msg_hash": msg.MsgHash, + "height": msg.Height, + "nonce": msg.Nonce, + "batch_index": msg.BatchIndex, + "msg_proof": msg.MsgProof, + "msg_data": msg.MsgData, + } + var exists bool + err = dbTx.QueryRow(`SELECT EXISTS(SELECT 1 FROM l2_sent_msg WHERE (msg_hash = $1 OR nonce = $2) AND NOT is_deleted)`, msg.MsgHash, msg.Nonce).Scan(&exists) + if err != nil { + return err + } + if exists { + return fmt.Errorf("BatchInsertL2SentMsgDBTx: l2 sent msg_hash %v already exists at height %v", msg.MsgHash, msg.Height) + } + } + _, err = dbTx.NamedExec(`insert into l2_sent_msg(sender, target, value, msg_hash, height, nonce, batch_index, msg_proof, msg_data) values(:sender, :target, :value, :msg_hash, :height, :nonce, :batch_index, :msg_proof, :msg_data);`, messageMaps) + if err != nil { + log.Error("BatchInsertL2SentMsgDBTx: failed to insert l2 sent msgs", "msg_Hash", "err", err) + return err + } + return err +} + +func (l *l2SentMsgOrm) GetLatestSentMsgHeightOnL2() (int64, error) { + row := l.db.QueryRow(`SELECT height FROM l2_sent_msg WHERE NOT is_deleted ORDER BY nonce DESC LIMIT 1;`) + var result sql.NullInt64 + if err := row.Scan(&result); err != nil { + if err == sql.ErrNoRows || !result.Valid { + return -1, nil + } + return 0, err + } + if result.Valid { + return result.Int64, nil + } + return 0, nil +} + +func (l *l2SentMsgOrm) UpdateL2MessageProofInDBTx(ctx context.Context, dbTx *sqlx.Tx, msgHash string, proof string, batch_index uint64) error { + if _, err := dbTx.ExecContext(ctx, l.db.Rebind("update l2_sent_msg set msg_proof = ?, batch_index = ? where msg_hash = ? AND NOT is_deleted;"), proof, batch_index, msgHash); err != nil { + return err + } + return nil +} + +func (l *l2SentMsgOrm) GetLatestL2SentMsgBatchIndex() (int64, error) { + row := l.db.QueryRow(`SELECT batch_index FROM l2_sent_msg WHERE msg_proof != '' AND NOT is_deleted ORDER BY batch_index DESC LIMIT 1;`) + var result sql.NullInt64 + if err := row.Scan(&result); err != nil { + if err == sql.ErrNoRows || !result.Valid { + return -1, nil + } + return -1, err + } + if result.Valid { + return result.Int64, nil + } + return -1, nil +} + +func (l *l2SentMsgOrm) GetL2SentMsgMsgHashByHeightRange(startHeight, endHeight uint64) ([]*L2SentMsg, error) { + var results []*L2SentMsg + rows, err := l.db.Queryx(`SELECT * FROM l2_sent_msg WHERE height >= $1 AND height <= $2 AND NOT is_deleted ORDER BY nonce ASC;`, startHeight, endHeight) + if err != nil { + return nil, err + } + for rows.Next() { + msg := &L2SentMsg{} + if err = rows.StructScan(msg); err != nil { + break + } + results = append(results, msg) + } + return results, err +} + +func (l *l2SentMsgOrm) GetL2SentMessageByNonce(nonce uint64) (*L2SentMsg, error) { + result := &L2SentMsg{} + row := l.db.QueryRowx(`SELECT * FROM l2_sent_msg WHERE nonce = $1 AND NOT is_deleted;`, nonce) + err := row.StructScan(result) + if err != nil { + return nil, err + } + return result, nil +} + +func (l *l2SentMsgOrm) GetLatestL2SentMsgLEHeight(endBlockNumber uint64) (*L2SentMsg, error) { + result := &L2SentMsg{} + row := l.db.QueryRowx(`select * from l2_sent_msg where height <= $1 AND NOT is_deleted order by nonce desc limit 1`, endBlockNumber) + err := row.StructScan(result) + if err != nil { + return nil, err + } + return result, nil +} + +func (l *l2SentMsgOrm) DeleteL2SentMsgAfterHeightDBTx(dbTx *sqlx.Tx, height int64) error { + _, err := dbTx.Exec(`UPDATE l2_sent_msg SET is_deleted = true WHERE height > $1;`, height) + return err +} diff --git a/bridge-history-api/db/orm/relayed_msg.go b/bridge-history-api/db/orm/relayed_msg.go index 388ba100e..8ae5ccb1f 100644 --- a/bridge-history-api/db/orm/relayed_msg.go +++ b/bridge-history-api/db/orm/relayed_msg.go @@ -3,7 +3,6 @@ package orm import ( "database/sql" "errors" - "strings" "github.com/ethereum/go-ethereum/log" "github.com/jmoiron/sqlx" @@ -31,14 +30,13 @@ func (l *relayedMsgOrm) BatchInsertRelayedMsgDBTx(dbTx *sqlx.Tx, messages []*Rel "layer1_hash": msg.Layer1Hash, "layer2_hash": msg.Layer2Hash, } - - _, err = dbTx.NamedExec(`insert into relayed_msg(msg_hash, height, layer1_hash, layer2_hash) values(:msg_hash, :height, :layer1_hash, :layer2_hash);`, messageMaps[i]) - if err != nil && !strings.Contains(err.Error(), "pq: duplicate key value violates unique constraint \"relayed_msg_hash_uindex") { - log.Error("BatchInsertRelayedMsgDBTx: failed to insert l1 cross msgs", "msg_Hashe", msg.MsgHash, "height", msg.Height, "err", err) - break - } } - return err + _, err = dbTx.NamedExec(`insert into relayed_msg(msg_hash, height, layer1_hash, layer2_hash) values(:msg_hash, :height, :layer1_hash, :layer2_hash);`, messageMaps) + if err != nil { + log.Error("BatchInsertRelayedMsgDBTx: failed to insert l1 cross msgs", "msg_Hashe", "err", err) + return err + } + return nil } func (l *relayedMsgOrm) GetRelayedMsgByHash(msg_hash string) (*RelayedMsg, error) { diff --git a/bridge-history-api/db/orm_factory.go b/bridge-history-api/db/orm_factory.go index c534beff4..e1282bdeb 100644 --- a/bridge-history-api/db/orm_factory.go +++ b/bridge-history-api/db/orm_factory.go @@ -16,6 +16,8 @@ type OrmFactory interface { orm.L1CrossMsgOrm orm.L2CrossMsgOrm orm.RelayedMsgOrm + orm.L2SentMsgOrm + orm.RollupBatchOrm GetTotalCrossMsgCountByAddress(sender string) (uint64, error) GetCrossMsgsByAddressWithOffset(sender string, offset int64, limit int64) ([]*orm.CrossMsg, error) GetDB() *sqlx.DB @@ -27,6 +29,8 @@ type ormFactory struct { orm.L1CrossMsgOrm orm.L2CrossMsgOrm orm.RelayedMsgOrm + orm.L2SentMsgOrm + orm.RollupBatchOrm *sqlx.DB } @@ -45,10 +49,12 @@ func NewOrmFactory(cfg *config.Config) (OrmFactory, error) { } return &ormFactory{ - L1CrossMsgOrm: orm.NewL1CrossMsgOrm(db), - L2CrossMsgOrm: orm.NewL2CrossMsgOrm(db), - RelayedMsgOrm: orm.NewRelayedMsgOrm(db), - DB: db, + L1CrossMsgOrm: orm.NewL1CrossMsgOrm(db), + L2CrossMsgOrm: orm.NewL2CrossMsgOrm(db), + RelayedMsgOrm: orm.NewRelayedMsgOrm(db), + L2SentMsgOrm: orm.NewL2SentMsgOrm(db), + RollupBatchOrm: orm.NewRollupBatchOrm(db), + DB: db, }, nil } diff --git a/bridge-history-api/service/service.go b/bridge-history-api/service/service.go index 9fffc07e2..4f81ddf56 100644 --- a/bridge-history-api/service/service.go +++ b/bridge-history-api/service/service.go @@ -1,6 +1,7 @@ package service import ( + "strconv" "time" "github.com/ethereum/go-ethereum/common" @@ -19,15 +20,27 @@ type Finalized struct { BlockTimestamp *time.Time `json:"blockTimestamp"` // uselesss } +type UserClaimInfo struct { + From string `json:"from"` + To string `json:"to"` + Value string `json:"value"` + Nonce string `json:"nonce"` + BatchHash string `json:"batch_hash"` + Message string `json:"message"` + Proof string `json:"proof"` + BatchIndex string `json:"batch_index"` +} + type TxHistoryInfo struct { - Hash string `json:"hash"` - Amount string `json:"amount"` - To string `json:"to"` // useless - IsL1 bool `json:"isL1"` - BlockNumber uint64 `json:"blockNumber"` - BlockTimestamp *time.Time `json:"blockTimestamp"` // useless - FinalizeTx *Finalized `json:"finalizeTx"` - CreatedAt *time.Time `json:"createdTime"` + Hash string `json:"hash"` + Amount string `json:"amount"` + To string `json:"to"` // useless + IsL1 bool `json:"isL1"` + BlockNumber uint64 `json:"blockNumber"` + BlockTimestamp *time.Time `json:"blockTimestamp"` // useless + FinalizeTx *Finalized `json:"finalizeTx"` + ClaimInfo *UserClaimInfo `json:"claimInfo"` + CreatedAt *time.Time `json:"createdTime"` } // HistoryService example service. @@ -47,6 +60,30 @@ type historyBackend struct { db db.OrmFactory } +func GetCrossTxClaimInfo(msgHash string, db db.OrmFactory) *UserClaimInfo { + l2sentMsg, err := db.GetL2SentMsgByHash(msgHash) + if err != nil { + log.Debug("GetCrossTxClaimInfo failed", "error", err) + return &UserClaimInfo{} + } + batch, err := db.GetRollupBatchByIndex(l2sentMsg.BatchIndex) + if err != nil { + log.Debug("GetCrossTxClaimInfo failed", "error", err) + return &UserClaimInfo{} + } + return &UserClaimInfo{ + From: l2sentMsg.Sender, + To: l2sentMsg.Target, + Value: l2sentMsg.Value, + Nonce: strconv.FormatUint(l2sentMsg.Nonce, 10), + Message: l2sentMsg.MsgData, + Proof: l2sentMsg.MsgProof, + BatchHash: batch.BatchHash, + BatchIndex: strconv.FormatUint(l2sentMsg.BatchIndex, 10), + } + +} + func updateCrossTxHash(msgHash string, txInfo *TxHistoryInfo, db db.OrmFactory) { relayed, err := db.GetRelayedMsgByHash(msgHash) if err != nil { @@ -76,6 +113,7 @@ func (h *historyBackend) GetTxsByAddress(address common.Address, offset int64, l return txHistories, 0, err } result, err := h.db.GetCrossMsgsByAddressWithOffset(address.String(), offset, limit) + if err != nil { return nil, 0, err } @@ -91,6 +129,7 @@ func (h *historyBackend) GetTxsByAddress(address common.Address, offset int64, l FinalizeTx: &Finalized{ Hash: "", }, + ClaimInfo: GetCrossTxClaimInfo(msg.MsgHash, h.db), } updateCrossTxHash(msg.MsgHash, txHistory, h.db) txHistories = append(txHistories, txHistory) @@ -138,6 +177,7 @@ func (h *historyBackend) GetTxsByHashes(hashes []string) ([]*TxHistoryInfo, erro FinalizeTx: &Finalized{ Hash: "", }, + ClaimInfo: GetCrossTxClaimInfo(l2result.MsgHash, h.db), } updateCrossTxHash(l2result.MsgHash, txHistory, h.db) txHistories = append(txHistories, txHistory) diff --git a/bridge-history-api/utils/parse_event.go b/bridge-history-api/utils/parse_event.go new file mode 100644 index 000000000..8bd3a1523 --- /dev/null +++ b/bridge-history-api/utils/parse_event.go @@ -0,0 +1,305 @@ +package utils + +import ( + "context" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/ethereum/go-ethereum/log" + + backendabi "bridge-history-api/abi" + "bridge-history-api/db/orm" +) + +type MsgHashWrapper struct { + MsgHash common.Hash + TxHash common.Hash +} + +type CachedParsedTxCalldata struct { + CallDataIndex uint64 + BatchIndices []uint64 + StartBlocks []uint64 + EndBlocks []uint64 +} + +func ParseBackendL1EventLogs(logs []types.Log) ([]*orm.CrossMsg, []MsgHashWrapper, []*orm.RelayedMsg, error) { + // Need use contract abi to parse event Log + // Can only be tested after we have our contracts set up + + var l1CrossMsg []*orm.CrossMsg + var relayedMsgs []*orm.RelayedMsg + var msgHashes []MsgHashWrapper + for _, vlog := range logs { + switch vlog.Topics[0] { + case backendabi.L1DepositETHSig: + event := backendabi.DepositETH{} + err := UnpackLog(backendabi.L1ETHGatewayABI, &event, "DepositETH", vlog) + if err != nil { + log.Warn("Failed to unpack DepositETH event", "err", err) + return l1CrossMsg, msgHashes, relayedMsgs, err + } + l1CrossMsg = append(l1CrossMsg, &orm.CrossMsg{ + Height: vlog.BlockNumber, + Sender: event.From.String(), + Target: event.To.String(), + Amount: event.Amount.String(), + Asset: int(orm.ETH), + Layer1Hash: vlog.TxHash.Hex(), + }) + case backendabi.L1DepositERC20Sig: + event := backendabi.ERC20MessageEvent{} + err := UnpackLog(backendabi.L1StandardERC20GatewayABI, &event, "DepositERC20", vlog) + if err != nil { + log.Warn("Failed to unpack DepositERC20 event", "err", err) + return l1CrossMsg, msgHashes, relayedMsgs, err + } + l1CrossMsg = append(l1CrossMsg, &orm.CrossMsg{ + Height: vlog.BlockNumber, + Sender: event.From.String(), + Target: event.To.String(), + Amount: event.Amount.String(), + Asset: int(orm.ERC20), + Layer1Hash: vlog.TxHash.Hex(), + Layer1Token: event.L1Token.Hex(), + Layer2Token: event.L2Token.Hex(), + }) + case backendabi.L1DepositERC721Sig: + event := backendabi.ERC721MessageEvent{} + err := UnpackLog(backendabi.L1ERC721GatewayABI, &event, "DepositERC721", vlog) + if err != nil { + log.Warn("Failed to unpack DepositERC721 event", "err", err) + return l1CrossMsg, msgHashes, relayedMsgs, err + } + l1CrossMsg = append(l1CrossMsg, &orm.CrossMsg{ + Height: vlog.BlockNumber, + Sender: event.From.String(), + Target: event.To.String(), + Asset: int(orm.ERC721), + Layer1Hash: vlog.TxHash.Hex(), + Layer1Token: event.L1Token.Hex(), + Layer2Token: event.L2Token.Hex(), + TokenID: event.TokenID.Uint64(), + }) + case backendabi.L1DepositERC1155Sig: + event := backendabi.ERC1155MessageEvent{} + err := UnpackLog(backendabi.L1ERC1155GatewayABI, &event, "DepositERC1155", vlog) + if err != nil { + log.Warn("Failed to unpack DepositERC1155 event", "err", err) + return l1CrossMsg, msgHashes, relayedMsgs, err + } + l1CrossMsg = append(l1CrossMsg, &orm.CrossMsg{ + Height: vlog.BlockNumber, + Sender: event.From.String(), + Target: event.To.String(), + Asset: int(orm.ERC1155), + Layer1Hash: vlog.TxHash.Hex(), + Layer1Token: event.L1Token.Hex(), + Layer2Token: event.L2Token.Hex(), + TokenID: event.TokenID.Uint64(), + Amount: event.Amount.String(), + }) + case backendabi.L1SentMessageEventSignature: + event := backendabi.L1SentMessageEvent{} + err := UnpackLog(backendabi.L1ScrollMessengerABI, &event, "SentMessage", vlog) + if err != nil { + log.Warn("Failed to unpack SentMessage event", "err", err) + return l1CrossMsg, msgHashes, relayedMsgs, err + } + msgHash := ComputeMessageHash(event.Sender, event.Target, event.Value, event.MessageNonce, event.Message) + msgHashes = append(msgHashes, MsgHashWrapper{ + MsgHash: msgHash, + TxHash: vlog.TxHash}) + case backendabi.L1RelayedMessageEventSignature: + event := backendabi.L1RelayedMessageEvent{} + err := UnpackLog(backendabi.L1ScrollMessengerABI, &event, "RelayedMessage", vlog) + if err != nil { + log.Warn("Failed to unpack RelayedMessage event", "err", err) + return l1CrossMsg, msgHashes, relayedMsgs, err + } + relayedMsgs = append(relayedMsgs, &orm.RelayedMsg{ + MsgHash: event.MessageHash.String(), + Height: vlog.BlockNumber, + Layer1Hash: vlog.TxHash.Hex(), + }) + + } + + } + return l1CrossMsg, msgHashes, relayedMsgs, nil +} + +func ParseBackendL2EventLogs(logs []types.Log) ([]*orm.CrossMsg, []MsgHashWrapper, []*orm.RelayedMsg, []*orm.L2SentMsg, error) { + // Need use contract abi to parse event Log + // Can only be tested after we have our contracts set up + + var l2CrossMsg []*orm.CrossMsg + // this is use to confirm finalized l1 msg + var relayedMsgs []*orm.RelayedMsg + var l2SentMsg []*orm.L2SentMsg + var msgHashes []MsgHashWrapper + for _, vlog := range logs { + switch vlog.Topics[0] { + case backendabi.L2WithdrawETHSig: + event := backendabi.DepositETH{} + err := UnpackLog(backendabi.L2ETHGatewayABI, &event, "WithdrawETH", vlog) + if err != nil { + log.Warn("Failed to unpack WithdrawETH event", "err", err) + return l2CrossMsg, msgHashes, relayedMsgs, l2SentMsg, err + } + l2CrossMsg = append(l2CrossMsg, &orm.CrossMsg{ + Height: vlog.BlockNumber, + Sender: event.From.String(), + Target: event.To.String(), + Amount: event.Amount.String(), + Asset: int(orm.ETH), + Layer2Hash: vlog.TxHash.Hex(), + }) + case backendabi.L2WithdrawERC20Sig: + event := backendabi.ERC20MessageEvent{} + err := UnpackLog(backendabi.L2StandardERC20GatewayABI, &event, "WithdrawERC20", vlog) + if err != nil { + log.Warn("Failed to unpack WithdrawERC20 event", "err", err) + return l2CrossMsg, msgHashes, relayedMsgs, l2SentMsg, err + } + l2CrossMsg = append(l2CrossMsg, &orm.CrossMsg{ + Height: vlog.BlockNumber, + Sender: event.From.String(), + Target: event.To.String(), + Amount: event.Amount.String(), + Asset: int(orm.ERC20), + Layer2Hash: vlog.TxHash.Hex(), + Layer1Token: event.L1Token.Hex(), + Layer2Token: event.L2Token.Hex(), + }) + case backendabi.L2WithdrawERC721Sig: + event := backendabi.ERC721MessageEvent{} + err := UnpackLog(backendabi.L2ERC721GatewayABI, &event, "WithdrawERC721", vlog) + if err != nil { + log.Warn("Failed to unpack WithdrawERC721 event", "err", err) + return l2CrossMsg, msgHashes, relayedMsgs, l2SentMsg, err + } + l2CrossMsg = append(l2CrossMsg, &orm.CrossMsg{ + Height: vlog.BlockNumber, + Sender: event.From.String(), + Target: event.To.String(), + Asset: int(orm.ERC721), + Layer2Hash: vlog.TxHash.Hex(), + Layer1Token: event.L1Token.Hex(), + Layer2Token: event.L2Token.Hex(), + TokenID: event.TokenID.Uint64(), + }) + case backendabi.L2WithdrawERC1155Sig: + event := backendabi.ERC1155MessageEvent{} + err := UnpackLog(backendabi.L2ERC1155GatewayABI, &event, "WithdrawERC1155", vlog) + if err != nil { + log.Warn("Failed to unpack WithdrawERC1155 event", "err", err) + return l2CrossMsg, msgHashes, relayedMsgs, l2SentMsg, err + } + l2CrossMsg = append(l2CrossMsg, &orm.CrossMsg{ + Height: vlog.BlockNumber, + Sender: event.From.String(), + Target: event.To.String(), + Asset: int(orm.ERC1155), + Layer2Hash: vlog.TxHash.Hex(), + Layer1Token: event.L1Token.Hex(), + Layer2Token: event.L2Token.Hex(), + TokenID: event.TokenID.Uint64(), + Amount: event.Amount.String(), + }) + case backendabi.L2SentMessageEventSignature: + event := backendabi.L2SentMessageEvent{} + err := UnpackLog(backendabi.L2ScrollMessengerABI, &event, "SentMessage", vlog) + if err != nil { + log.Warn("Failed to unpack SentMessage event", "err", err) + return l2CrossMsg, msgHashes, relayedMsgs, l2SentMsg, err + } + msgHash := ComputeMessageHash(event.Sender, event.Target, event.Value, event.MessageNonce, event.Message) + msgHashes = append(msgHashes, MsgHashWrapper{ + MsgHash: msgHash, + TxHash: vlog.TxHash}) + l2SentMsg = append(l2SentMsg, &orm.L2SentMsg{ + Sender: event.Sender.Hex(), + Target: event.Target.Hex(), + Value: event.Value.String(), + MsgHash: msgHash.Hex(), + Height: vlog.BlockNumber, + Nonce: event.MessageNonce.Uint64(), + MsgData: hexutil.Encode(event.Message), + }) + case backendabi.L2RelayedMessageEventSignature: + event := backendabi.L2RelayedMessageEvent{} + err := UnpackLog(backendabi.L2ScrollMessengerABI, &event, "RelayedMessage", vlog) + if err != nil { + log.Warn("Failed to unpack RelayedMessage event", "err", err) + return l2CrossMsg, msgHashes, relayedMsgs, l2SentMsg, err + } + relayedMsgs = append(relayedMsgs, &orm.RelayedMsg{ + MsgHash: event.MessageHash.String(), + Height: vlog.BlockNumber, + Layer2Hash: vlog.TxHash.Hex(), + }) + + } + } + return l2CrossMsg, msgHashes, relayedMsgs, l2SentMsg, nil +} + +func ParseBatchInfoFromScrollChain(ctx context.Context, client *ethclient.Client, logs []types.Log) ([]*orm.RollupBatch, error) { + var rollupBatches []*orm.RollupBatch + cache := make(map[string]CachedParsedTxCalldata) + for _, vlog := range logs { + switch vlog.Topics[0] { + case backendabi.L1CommitBatchEventSignature: + event := backendabi.L1CommitBatchEvent{} + err := UnpackLog(backendabi.ScrollChainABI, &event, "CommitBatch", vlog) + if err != nil { + log.Warn("Failed to unpack CommitBatch event", "err", err) + return rollupBatches, err + } + if _, ok := cache[vlog.TxHash.Hex()]; ok { + c := cache[vlog.TxHash.Hex()] + c.CallDataIndex++ + rollupBatches = append(rollupBatches, &orm.RollupBatch{ + CommitHeight: vlog.BlockNumber, + BatchIndex: c.BatchIndices[c.CallDataIndex], + BatchHash: event.BatchHash.Hex(), + StartBlockNumber: c.StartBlocks[c.CallDataIndex], + EndBlockNumber: c.EndBlocks[c.CallDataIndex], + }) + cache[vlog.TxHash.Hex()] = c + continue + } + + commitTx, isPending, err := client.TransactionByHash(ctx, vlog.TxHash) + if err != nil || isPending { + log.Warn("Failed to get commit Batch tx receipt or the tx is still pending", "err", err) + return rollupBatches, err + } + indices, startBlocks, endBlocks, err := GetBatchRangeFromCalldataV1(commitTx.Data()) + if err != nil { + log.Warn("Failed to get batch range from calldata", "hash", commitTx.Hash().Hex(), "height", vlog.BlockNumber) + return rollupBatches, err + } + cache[vlog.TxHash.Hex()] = CachedParsedTxCalldata{ + CallDataIndex: 0, + BatchIndices: indices, + StartBlocks: startBlocks, + EndBlocks: endBlocks, + } + rollupBatches = append(rollupBatches, &orm.RollupBatch{ + CommitHeight: vlog.BlockNumber, + BatchIndex: indices[0], + BatchHash: event.BatchHash.Hex(), + StartBlockNumber: startBlocks[0], + EndBlockNumber: endBlocks[0], + }) + + default: + continue + } + } + return rollupBatches, nil +} diff --git a/common/version/version.go b/common/version/version.go index 2eb98bd0c..88d961ef4 100644 --- a/common/version/version.go +++ b/common/version/version.go @@ -5,7 +5,7 @@ import ( "runtime/debug" ) -var tag = "v4.0.0" +var tag = "v4.0.1" var commit = func() string { if info, ok := debug.ReadBuildInfo(); ok {