mirror of
https://github.com/scroll-tech/scroll.git
synced 2026-01-15 00:48:01 -05:00
address comments
This commit is contained in:
@@ -2,8 +2,10 @@ package fetcher
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math/big"
|
||||
"time"
|
||||
|
||||
"github.com/scroll-tech/go-ethereum/common"
|
||||
"github.com/scroll-tech/go-ethereum/ethclient"
|
||||
"github.com/scroll-tech/go-ethereum/log"
|
||||
"gorm.io/gorm"
|
||||
@@ -20,7 +22,7 @@ type L1MessageFetcher struct {
|
||||
client *ethclient.Client
|
||||
|
||||
syncInfo *SyncInfo
|
||||
l1ScanHeight uint64
|
||||
l1SyncHeight uint64
|
||||
|
||||
eventUpdateLogic *logic.EventUpdateLogic
|
||||
l1FetcherLogic *logic.L1FetcherLogic
|
||||
@@ -47,37 +49,46 @@ func (c *L1MessageFetcher) Start() {
|
||||
log.Crit("L1MessageFetcher start failed", "err", dbErr)
|
||||
}
|
||||
|
||||
c.l1ScanHeight = messageSyncedHeight
|
||||
if batchSyncedHeight > c.l1ScanHeight {
|
||||
c.l1ScanHeight = batchSyncedHeight
|
||||
c.l1SyncHeight = messageSyncedHeight
|
||||
if batchSyncedHeight > c.l1SyncHeight {
|
||||
c.l1SyncHeight = batchSyncedHeight
|
||||
}
|
||||
if c.cfg.StartHeight > c.l1ScanHeight {
|
||||
c.l1ScanHeight = c.cfg.StartHeight - 1
|
||||
if c.cfg.StartHeight > c.l1SyncHeight {
|
||||
c.l1SyncHeight = c.cfg.StartHeight - 1
|
||||
}
|
||||
|
||||
log.Info("Start L1 message fetcher", "message synced height", messageSyncedHeight, "batch synced height", batchSyncedHeight, "config start height", c.cfg.StartHeight)
|
||||
// Sync from an older block to prevent reorg during restart.
|
||||
if c.l1SyncHeight < logic.L1ReorgSafeDepth {
|
||||
c.l1SyncHeight = 0
|
||||
} else {
|
||||
c.l1SyncHeight -= logic.L1ReorgSafeDepth
|
||||
}
|
||||
|
||||
log.Info("Start L1 message fetcher", "message synced height", messageSyncedHeight, "batch synced height", batchSyncedHeight, "config start height", c.cfg.StartHeight, "sync start height", c.l1SyncHeight+1)
|
||||
|
||||
tick := time.NewTicker(time.Duration(c.cfg.BlockTime) * time.Second)
|
||||
go func() {
|
||||
var lastSyncBlockHash common.Hash
|
||||
for {
|
||||
select {
|
||||
case <-c.ctx.Done():
|
||||
tick.Stop()
|
||||
return
|
||||
case <-tick.C:
|
||||
c.fetchAndSaveEvents(c.cfg.Confirmation)
|
||||
lastSyncBlockHash = c.fetchAndSaveEvents(c.cfg.Confirmation, lastSyncBlockHash)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (c *L1MessageFetcher) fetchAndSaveEvents(confirmation uint64) {
|
||||
startHeight := c.l1ScanHeight + 1
|
||||
func (c *L1MessageFetcher) fetchAndSaveEvents(confirmation uint64, lastSyncBlockHash common.Hash) common.Hash {
|
||||
startHeight := c.l1SyncHeight + 1
|
||||
endHeight, rpcErr := utils.GetBlockNumber(c.ctx, c.client, confirmation)
|
||||
if rpcErr != nil {
|
||||
log.Error("failed to get L1 safe block number", "err", rpcErr)
|
||||
return
|
||||
log.Error("failed to get L1 block number", "confirmation", confirmation, "err", rpcErr)
|
||||
return lastSyncBlockHash
|
||||
}
|
||||
|
||||
log.Info("fetch and save missing L1 events", "start height", startHeight, "end height", endHeight)
|
||||
|
||||
for from := startHeight; from <= endHeight; from += c.cfg.FetchLimit {
|
||||
@@ -86,41 +97,49 @@ func (c *L1MessageFetcher) fetchAndSaveEvents(confirmation uint64) {
|
||||
to = endHeight
|
||||
}
|
||||
|
||||
if endHeight-to <= logic.L1ReorgSafeDepth {
|
||||
isReorg, resyncHeight, handleErr := c.l1ReorgHandlingLogic.HandleL1Reorg(c.ctx)
|
||||
if c.l1SyncHeight+logic.L1ReorgSafeDepth > endHeight && lastSyncBlockHash != emptyHash {
|
||||
isReorg, resyncHeight, handleErr := c.l1ReorgHandlingLogic.HandleL1Reorg(c.ctx, c.l1SyncHeight, lastSyncBlockHash)
|
||||
if handleErr != nil {
|
||||
log.Error("failed to Handle L1 Reorg", "err", handleErr)
|
||||
return
|
||||
return lastSyncBlockHash
|
||||
}
|
||||
|
||||
if isReorg {
|
||||
c.l1ScanHeight = resyncHeight
|
||||
log.Warn("L1 reorg happened, exit and re-enter fetchAndSaveEvents", "restart height", c.l1ScanHeight)
|
||||
return
|
||||
c.l1SyncHeight = resyncHeight
|
||||
log.Warn("L1 reorg happened, exit and re-enter fetchAndSaveEvents", "restart height", c.l1SyncHeight)
|
||||
return lastSyncBlockHash
|
||||
}
|
||||
}
|
||||
|
||||
fetcherResult, fetcherErr := c.l1FetcherLogic.L1Fetcher(c.ctx, from, to)
|
||||
if fetcherErr != nil {
|
||||
log.Error("failed to fetch L1 events", "from", from, "to", to, "err", fetcherErr)
|
||||
return
|
||||
return lastSyncBlockHash
|
||||
}
|
||||
|
||||
if insertUpdateErr := c.eventUpdateLogic.L1InsertOrUpdate(c.ctx, fetcherResult); insertUpdateErr != nil {
|
||||
log.Error("failed to save L1 events", "from", from, "to", to, "err", insertUpdateErr)
|
||||
return
|
||||
return lastSyncBlockHash
|
||||
}
|
||||
c.l1ScanHeight = to
|
||||
|
||||
l2ScannedHeight := c.syncInfo.GetL2ScanHeight()
|
||||
lastBlockHeader, rpcErr := c.client.HeaderByNumber(c.ctx, new(big.Int).SetUint64(to))
|
||||
if rpcErr != nil {
|
||||
log.Error("failed to get header by number", "block number", to)
|
||||
return lastSyncBlockHash
|
||||
}
|
||||
c.l1SyncHeight = to
|
||||
lastSyncBlockHash = lastBlockHeader.Hash()
|
||||
|
||||
l2ScannedHeight := c.syncInfo.GetL2SyncHeight()
|
||||
if l2ScannedHeight == 0 {
|
||||
log.Error("L2 fetcher has not successfully synced at least one round yet")
|
||||
return
|
||||
return lastSyncBlockHash
|
||||
}
|
||||
|
||||
if updateErr := c.eventUpdateLogic.UpdateL1BatchIndexAndStatus(c.ctx, l2ScannedHeight); updateErr != nil {
|
||||
log.Error("failed to update L1 batch index and status", "from", from, "to", to, "err", updateErr)
|
||||
return
|
||||
return lastSyncBlockHash
|
||||
}
|
||||
}
|
||||
return lastSyncBlockHash
|
||||
}
|
||||
|
||||
@@ -17,6 +17,8 @@ import (
|
||||
"scroll-tech/bridge-history-api/internal/utils"
|
||||
)
|
||||
|
||||
var emptyHash common.Hash
|
||||
|
||||
// L2MessageFetcher fetches cross message events from L2 and saves them to database.
|
||||
type L2MessageFetcher struct {
|
||||
ctx context.Context
|
||||
@@ -52,29 +54,38 @@ func (c *L2MessageFetcher) Start() {
|
||||
return
|
||||
}
|
||||
|
||||
c.syncInfo.SetL2ScanHeight(l2SentMessageSyncedHeight)
|
||||
log.Info("Start L2 message fetcher", "message synced height", l2SentMessageSyncedHeight)
|
||||
l2SyncHeight := l2SentMessageSyncedHeight
|
||||
// Sync from an older block to prevent reorg during restart.
|
||||
if l2SyncHeight < logic.L2ReorgSafeDepth {
|
||||
l2SyncHeight = 0
|
||||
} else {
|
||||
l2SyncHeight -= logic.L2ReorgSafeDepth
|
||||
}
|
||||
|
||||
c.syncInfo.SetL2SyncHeight(l2SyncHeight)
|
||||
log.Info("Start L2 message fetcher", "message synced height", l2SentMessageSyncedHeight, "sync start height", l2SyncHeight+1)
|
||||
|
||||
tick := time.NewTicker(time.Duration(c.cfg.BlockTime) * time.Second)
|
||||
go func() {
|
||||
var lastSyncBlockHash common.Hash
|
||||
for {
|
||||
select {
|
||||
case <-c.ctx.Done():
|
||||
tick.Stop()
|
||||
return
|
||||
case <-tick.C:
|
||||
c.fetchAndSaveEvents(c.cfg.Confirmation)
|
||||
c.fetchAndSaveEvents(c.cfg.Confirmation, lastSyncBlockHash)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (c *L2MessageFetcher) fetchAndSaveEvents(confirmation uint64) {
|
||||
startHeight := c.syncInfo.GetL2ScanHeight() + 1
|
||||
func (c *L2MessageFetcher) fetchAndSaveEvents(confirmation uint64, lastSyncBlockHash common.Hash) common.Hash {
|
||||
startHeight := c.syncInfo.GetL2SyncHeight() + 1
|
||||
endHeight, rpcErr := utils.GetBlockNumber(c.ctx, c.client, confirmation)
|
||||
if rpcErr != nil {
|
||||
log.Error("failed to get L1 safe block number", "err", rpcErr)
|
||||
return
|
||||
log.Error("failed to get L2 block number", "confirmation", confirmation, "err", rpcErr)
|
||||
return lastSyncBlockHash
|
||||
}
|
||||
log.Info("fetch and save missing L2 events", "start height", startHeight, "end height", endHeight)
|
||||
|
||||
@@ -84,38 +95,45 @@ func (c *L2MessageFetcher) fetchAndSaveEvents(confirmation uint64) {
|
||||
to = endHeight
|
||||
}
|
||||
|
||||
if endHeight-to <= logic.L2ReorgSafeDepth {
|
||||
isReorg, resyncHeight, handleErr := c.l2ReorgHandlingLogic.HandleL2Reorg(c.ctx)
|
||||
if c.syncInfo.GetL2SyncHeight()+logic.L2ReorgSafeDepth > endHeight && lastSyncBlockHash != emptyHash {
|
||||
isReorg, resyncHeight, handleErr := c.l2ReorgHandlingLogic.HandleL2Reorg(c.ctx, c.syncInfo.GetL2SyncHeight(), lastSyncBlockHash)
|
||||
if handleErr != nil {
|
||||
log.Error("failed to Handle L2 Reorg", "err", handleErr)
|
||||
return
|
||||
return lastSyncBlockHash
|
||||
}
|
||||
|
||||
if isReorg {
|
||||
c.syncInfo.SetL2ScanHeight(resyncHeight)
|
||||
c.syncInfo.SetL2SyncHeight(resyncHeight)
|
||||
log.Warn("L2 reorg happened, exit and re-enter fetchAndSaveEvents", "restart height", resyncHeight)
|
||||
return
|
||||
return lastSyncBlockHash
|
||||
}
|
||||
}
|
||||
|
||||
l2FilterResult, fetchErr := c.l2FetcherLogic.L2Fetcher(c.ctx, from, to)
|
||||
if fetchErr != nil {
|
||||
log.Error("failed to fetch L2 events", "from", from, "to", to, "err", fetchErr)
|
||||
return
|
||||
return lastSyncBlockHash
|
||||
}
|
||||
|
||||
if updateWithdrawErr := c.updateL2WithdrawMessageProofs(c.ctx, l2FilterResult.WithdrawMessages, to); updateWithdrawErr != nil {
|
||||
log.Error("failed to update L2 withdraw message", "from", from, "to", to, "err", updateWithdrawErr)
|
||||
return
|
||||
return lastSyncBlockHash
|
||||
}
|
||||
|
||||
if insertUpdateErr := c.eventUpdateLogic.L2InsertOrUpdate(c.ctx, l2FilterResult); insertUpdateErr != nil {
|
||||
log.Error("failed to save L2 events", "from", from, "to", to, "err", insertUpdateErr)
|
||||
return
|
||||
return lastSyncBlockHash
|
||||
}
|
||||
|
||||
c.syncInfo.SetL2ScanHeight(to)
|
||||
lastBlockHeader, rpcErr := c.client.HeaderByNumber(c.ctx, new(big.Int).SetUint64(to))
|
||||
if rpcErr != nil {
|
||||
log.Error("failed to get header by number", "block number", to)
|
||||
return lastSyncBlockHash
|
||||
}
|
||||
c.syncInfo.SetL2SyncHeight(to)
|
||||
lastSyncBlockHash = lastBlockHeader.Hash()
|
||||
}
|
||||
return lastSyncBlockHash
|
||||
}
|
||||
|
||||
func (c *L2MessageFetcher) updateL2WithdrawMessageProofs(ctx context.Context, l2WithdrawMessages []*orm.CrossMessage, endBlock uint64) error {
|
||||
|
||||
@@ -4,15 +4,15 @@ import "sync/atomic"
|
||||
|
||||
// SyncInfo is a struct that stores synchronization information shared between L1 fetcher and L2 fetcher.
|
||||
type SyncInfo struct {
|
||||
l2ScanHeight uint64
|
||||
l2SyncHeight uint64
|
||||
}
|
||||
|
||||
// SetL2ScanHeight is a method that sets the value of l2ScanHeight in SyncInfo.
|
||||
func (s *SyncInfo) SetL2ScanHeight(height uint64) {
|
||||
atomic.StoreUint64(&s.l2ScanHeight, height)
|
||||
// SetL2SyncHeight is a method that sets the value of l2SyncHeight in SyncInfo.
|
||||
func (s *SyncInfo) SetL2SyncHeight(height uint64) {
|
||||
atomic.StoreUint64(&s.l2SyncHeight, height)
|
||||
}
|
||||
|
||||
// GetL2ScanHeight is a method that retrieves the value of l2ScanHeight in SyncInfo.
|
||||
func (s *SyncInfo) GetL2ScanHeight() uint64 {
|
||||
return atomic.LoadUint64(&s.l2ScanHeight)
|
||||
// GetL2SyncHeight is a method that retrieves the value of l2SyncHeight in SyncInfo.
|
||||
func (s *SyncInfo) GetL2SyncHeight() uint64 {
|
||||
return atomic.LoadUint64(&s.l2SyncHeight)
|
||||
}
|
||||
|
||||
@@ -116,7 +116,6 @@ func (e *L1EventParser) ParseL1CrossChainEventLogs(logs []types.Log, blockTimest
|
||||
}
|
||||
l1DepositMessages = append(l1DepositMessages, &orm.CrossMessage{
|
||||
L1BlockNumber: vlog.BlockNumber,
|
||||
L1BlockHash: vlog.BlockHash.String(),
|
||||
Sender: event.Sender.String(),
|
||||
Receiver: event.Target.String(),
|
||||
TokenType: int(orm.TokenTypeETH),
|
||||
@@ -137,7 +136,6 @@ func (e *L1EventParser) ParseL1CrossChainEventLogs(logs []types.Log, blockTimest
|
||||
l1RelayedMessages = append(l1RelayedMessages, &orm.CrossMessage{
|
||||
MessageHash: event.MessageHash.String(),
|
||||
L1BlockNumber: vlog.BlockNumber,
|
||||
L1BlockHash: vlog.BlockHash.String(),
|
||||
L1TxHash: vlog.TxHash.String(),
|
||||
TxStatus: int(orm.TxStatusTypeRelayed),
|
||||
MessageType: int(orm.MessageTypeL2SentMessage),
|
||||
@@ -151,7 +149,6 @@ func (e *L1EventParser) ParseL1CrossChainEventLogs(logs []types.Log, blockTimest
|
||||
l1RelayedMessages = append(l1RelayedMessages, &orm.CrossMessage{
|
||||
MessageHash: event.MessageHash.String(),
|
||||
L1BlockNumber: vlog.BlockNumber,
|
||||
L1BlockHash: vlog.BlockHash.String(),
|
||||
L1TxHash: vlog.TxHash.String(),
|
||||
TxStatus: int(orm.TxStatusTypeFailedRelayed),
|
||||
MessageType: int(orm.MessageTypeL2SentMessage),
|
||||
@@ -189,7 +186,6 @@ func (e *L1EventParser) ParseL1BatchEventLogs(ctx context.Context, logs []types.
|
||||
StartBlockNumber: startBlock,
|
||||
EndBlockNumber: endBlock,
|
||||
L1BlockNumber: vlog.BlockNumber,
|
||||
L1BlockHash: vlog.BlockHash.String(),
|
||||
})
|
||||
case backendabi.L1RevertBatchEventSig:
|
||||
event := backendabi.L1RevertBatchEvent{}
|
||||
@@ -202,7 +198,6 @@ func (e *L1EventParser) ParseL1BatchEventLogs(ctx context.Context, logs []types.
|
||||
BatchIndex: event.BatchIndex.Uint64(),
|
||||
BatchHash: event.BatchHash.String(),
|
||||
L1BlockNumber: vlog.BlockNumber,
|
||||
L1BlockHash: vlog.BlockHash.String(),
|
||||
})
|
||||
case backendabi.L1FinalizeBatchEventSig:
|
||||
event := backendabi.L1FinalizeBatchEvent{}
|
||||
@@ -215,7 +210,6 @@ func (e *L1EventParser) ParseL1BatchEventLogs(ctx context.Context, logs []types.
|
||||
BatchIndex: event.BatchIndex.Uint64(),
|
||||
BatchHash: event.BatchHash.String(),
|
||||
L1BlockNumber: vlog.BlockNumber,
|
||||
L1BlockHash: vlog.BlockHash.String(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -126,7 +126,6 @@ func (f *L1FetcherLogic) gatewayRouterFailedTxs(ctx context.Context, from, to ui
|
||||
Sender: sender.String(),
|
||||
Receiver: (*tx.To()).String(),
|
||||
L1BlockNumber: receipt.BlockNumber.Uint64(),
|
||||
L1BlockHash: receipt.BlockHash.String(),
|
||||
BlockTimestamp: block.Time(),
|
||||
TxStatus: int(orm.TxStatusTypeSentFailed),
|
||||
})
|
||||
|
||||
@@ -33,8 +33,8 @@ func NewL1ReorgHandlingLogic(db *gorm.DB, client *ethclient.Client) *L1ReorgHand
|
||||
}
|
||||
|
||||
// HandleL1Reorg performs L1 reorg handling by detecting reorgs and updating sync height.
|
||||
func (l *L1ReorgHandlingLogic) HandleL1Reorg(ctx context.Context) (bool, uint64, error) {
|
||||
reorgDetected, reorgDetectedHeight, err := l.detectReorg(ctx)
|
||||
func (l *L1ReorgHandlingLogic) HandleL1Reorg(ctx context.Context, blockNumber uint64, blockHash common.Hash) (bool, uint64, error) {
|
||||
reorgDetected, err := l.detectReorg(ctx, blockNumber, blockHash)
|
||||
if err != nil {
|
||||
log.Error("failed to detect reorg", "err", err)
|
||||
return false, 0, err
|
||||
@@ -42,8 +42,8 @@ func (l *L1ReorgHandlingLogic) HandleL1Reorg(ctx context.Context) (bool, uint64,
|
||||
|
||||
if reorgDetected {
|
||||
resyncHeight := uint64(1)
|
||||
if reorgDetectedHeight > L1ReorgSafeDepth {
|
||||
resyncHeight = reorgDetectedHeight - L1ReorgSafeDepth
|
||||
if blockNumber > L1ReorgSafeDepth {
|
||||
resyncHeight = blockNumber - L1ReorgSafeDepth
|
||||
}
|
||||
return true, resyncHeight - 1, nil
|
||||
}
|
||||
@@ -51,49 +51,22 @@ func (l *L1ReorgHandlingLogic) HandleL1Reorg(ctx context.Context) (bool, uint64,
|
||||
return false, 0, nil
|
||||
}
|
||||
|
||||
func (l *L1ReorgHandlingLogic) detectReorg(ctx context.Context) (bool, uint64, error) {
|
||||
batchBlockNumber, batchBlockHash, err := l.batchEventOrm.GetMaxL1BlockNumberAndHash(ctx)
|
||||
func (l *L1ReorgHandlingLogic) detectReorg(ctx context.Context, blockNumber uint64, blockHash common.Hash) (bool, error) {
|
||||
currentHeader, err := l.client.HeaderByNumber(ctx, big.NewInt(0).SetUint64(blockNumber))
|
||||
if err != nil {
|
||||
log.Error("failed to get max L1 block number and hash in batch event orm", "err", err)
|
||||
return false, 0, err
|
||||
log.Error("failed to get header by number", "height", blockNumber, "err", err)
|
||||
return false, err
|
||||
}
|
||||
|
||||
messageBlockNumber, messageBlockHash, err := l.crossMessageOrm.GetMaxL1BlockNumberAndHash(ctx)
|
||||
if err != nil {
|
||||
log.Error("failed to get max L1 block number and hash in cross message orm", "err", err)
|
||||
return false, 0, err
|
||||
if currentHeader == nil {
|
||||
log.Warn("cannot fetch remote block header", "height", blockNumber, "last block hash", blockHash.String())
|
||||
return true, nil
|
||||
}
|
||||
|
||||
var localBlockNumber uint64
|
||||
var localBlockHash common.Hash
|
||||
if batchBlockNumber > messageBlockNumber {
|
||||
localBlockNumber = batchBlockNumber
|
||||
localBlockHash = batchBlockHash
|
||||
} else {
|
||||
localBlockNumber = messageBlockNumber
|
||||
localBlockHash = messageBlockHash
|
||||
if blockHash != currentHeader.Hash() {
|
||||
log.Warn("block hash mismatch, reorg happened", "height", blockNumber, "last block hash", blockHash.String(), "current block hash", currentHeader.Hash().String())
|
||||
return true, nil
|
||||
}
|
||||
|
||||
if localBlockNumber == 0 {
|
||||
log.Warn("no local info of latest block number and hash")
|
||||
return false, 0, nil
|
||||
}
|
||||
|
||||
remoteHeader, err := l.client.HeaderByNumber(ctx, big.NewInt(0).SetUint64(localBlockNumber))
|
||||
if err != nil {
|
||||
log.Error("failed to get header by number", "height", localBlockNumber, "err", err)
|
||||
return false, 0, err
|
||||
}
|
||||
|
||||
if remoteHeader == nil {
|
||||
log.Warn("cannot fetch remote block header", "blockNumber", localBlockNumber, "local hash", localBlockHash.String())
|
||||
return true, localBlockNumber, nil
|
||||
}
|
||||
|
||||
if localBlockHash != remoteHeader.Hash() {
|
||||
log.Warn("block hash mismatch, reorg happened", "height", localBlockNumber, "local hash", localBlockHash.String(), "remote hash", remoteHeader.Hash().String())
|
||||
return true, localBlockNumber, nil
|
||||
}
|
||||
|
||||
return false, 0, nil
|
||||
return false, nil
|
||||
}
|
||||
|
||||
@@ -132,7 +132,6 @@ func (e *L2EventParser) ParseL2EventLogs(logs []types.Log, blockTimestampsMap ma
|
||||
TxStatus: int(orm.TxStatusTypeSent),
|
||||
BlockTimestamp: blockTimestampsMap[vlog.BlockNumber],
|
||||
L2BlockNumber: vlog.BlockNumber,
|
||||
L2BlockHash: vlog.BlockHash.String(),
|
||||
})
|
||||
case backendabi.L2RelayedMessageEventSig:
|
||||
event := backendabi.L2RelayedMessageEvent{}
|
||||
@@ -144,7 +143,6 @@ func (e *L2EventParser) ParseL2EventLogs(logs []types.Log, blockTimestampsMap ma
|
||||
l2RelayedMessages = append(l2RelayedMessages, &orm.CrossMessage{
|
||||
MessageHash: event.MessageHash.String(),
|
||||
L2BlockNumber: vlog.BlockNumber,
|
||||
L2BlockHash: vlog.BlockHash.String(),
|
||||
L2TxHash: vlog.TxHash.String(),
|
||||
TxStatus: int(orm.TxStatusTypeRelayed),
|
||||
MessageType: int(orm.MessageTypeL1SentMessage),
|
||||
@@ -159,7 +157,6 @@ func (e *L2EventParser) ParseL2EventLogs(logs []types.Log, blockTimestampsMap ma
|
||||
l2RelayedMessages = append(l2RelayedMessages, &orm.CrossMessage{
|
||||
MessageHash: event.MessageHash.String(),
|
||||
L2BlockNumber: vlog.BlockNumber,
|
||||
L2BlockHash: vlog.BlockHash.String(),
|
||||
L2TxHash: vlog.TxHash.String(),
|
||||
TxStatus: int(orm.TxStatusTypeFailedRelayed),
|
||||
MessageType: int(orm.MessageTypeL1SentMessage),
|
||||
|
||||
@@ -116,7 +116,6 @@ func (f *L2FetcherLogic) gatewayRouterFailedTxs(ctx context.Context, from, to ui
|
||||
Sender: sender.String(),
|
||||
Receiver: (*tx.To()).String(),
|
||||
L2BlockNumber: receipt.BlockNumber.Uint64(),
|
||||
L2BlockHash: receipt.BlockHash.String(),
|
||||
BlockTimestamp: block.Time(),
|
||||
TxStatus: int(orm.TxStatusTypeSentFailed),
|
||||
})
|
||||
@@ -137,7 +136,6 @@ func (f *L2FetcherLogic) gatewayRouterFailedTxs(ctx context.Context, from, to ui
|
||||
L2TxHash: tx.Hash().String(),
|
||||
TxStatus: int(orm.TxStatusTypeRelayedTxReverted),
|
||||
L2BlockNumber: receipt.BlockNumber.Uint64(),
|
||||
L2BlockHash: receipt.BlockHash.String(),
|
||||
MessageType: int(orm.MessageTypeL1SentMessage),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"math/big"
|
||||
|
||||
"github.com/scroll-tech/go-ethereum/common"
|
||||
"github.com/scroll-tech/go-ethereum/ethclient"
|
||||
"github.com/scroll-tech/go-ethereum/log"
|
||||
"gorm.io/gorm"
|
||||
@@ -30,8 +31,8 @@ func NewL2ReorgHandlingLogic(db *gorm.DB, client *ethclient.Client) *L2ReorgHand
|
||||
}
|
||||
|
||||
// HandleL2Reorg performs L2 reorg handling by detecting reorgs and updating sync height.
|
||||
func (l *L2ReorgHandlingLogic) HandleL2Reorg(ctx context.Context) (bool, uint64, error) {
|
||||
l2ReorgDetected, l2ReorgDetectedHeight, err := l.detectL2Reorg(ctx)
|
||||
func (l *L2ReorgHandlingLogic) HandleL2Reorg(ctx context.Context, blockNumber uint64, blockHash common.Hash) (bool, uint64, error) {
|
||||
l2ReorgDetected, err := l.detectL2Reorg(ctx, blockNumber, blockHash)
|
||||
if err != nil {
|
||||
log.Error("failed to detect L2 reorg", "err", err)
|
||||
return false, 0, err
|
||||
@@ -39,8 +40,8 @@ func (l *L2ReorgHandlingLogic) HandleL2Reorg(ctx context.Context) (bool, uint64,
|
||||
|
||||
if l2ReorgDetected {
|
||||
startHeight := uint64(1)
|
||||
if l2ReorgDetectedHeight > L2ReorgSafeDepth {
|
||||
startHeight = l2ReorgDetectedHeight - L2ReorgSafeDepth
|
||||
if blockNumber > L2ReorgSafeDepth {
|
||||
startHeight = blockNumber - L2ReorgSafeDepth
|
||||
}
|
||||
return true, startHeight - 1, nil
|
||||
}
|
||||
@@ -48,33 +49,22 @@ func (l *L2ReorgHandlingLogic) HandleL2Reorg(ctx context.Context) (bool, uint64,
|
||||
return false, 0, nil
|
||||
}
|
||||
|
||||
func (l *L2ReorgHandlingLogic) detectL2Reorg(ctx context.Context) (bool, uint64, error) {
|
||||
localBlockNumber, localBlockHash, err := l.crossMessageOrm.GetMaxL2BlockNumberAndHash(ctx)
|
||||
func (l *L2ReorgHandlingLogic) detectL2Reorg(ctx context.Context, blockNumber uint64, blockHash common.Hash) (bool, error) {
|
||||
currentHeader, err := l.client.HeaderByNumber(ctx, big.NewInt(0).SetUint64(blockNumber))
|
||||
if err != nil {
|
||||
log.Error("failed to get max L1 block number and hash in cross message orm", "err", err)
|
||||
return false, 0, err
|
||||
log.Error("failed to get header by number", "height", blockNumber, "err", err)
|
||||
return false, err
|
||||
}
|
||||
|
||||
if localBlockNumber == 0 {
|
||||
log.Warn("no local info of latest block number and hash")
|
||||
return false, 0, nil
|
||||
if currentHeader == nil {
|
||||
log.Warn("cannot fetch remote block header", "height", blockNumber, "last block hash", blockHash.String())
|
||||
return true, nil
|
||||
}
|
||||
|
||||
remoteHeader, err := l.client.HeaderByNumber(ctx, big.NewInt(0).SetUint64(localBlockNumber))
|
||||
if err != nil {
|
||||
log.Error("failed to get header by number", "height", localBlockNumber, "err", err)
|
||||
return false, 0, err
|
||||
if blockHash != currentHeader.Hash() {
|
||||
log.Warn("block hash mismatch, reorg happened", "height", blockNumber, "last block hash", blockHash.String(), "current block hash", currentHeader.Hash().String())
|
||||
return true, nil
|
||||
}
|
||||
|
||||
if remoteHeader == nil {
|
||||
log.Warn("cannot fetch remote block header", "blockNumber", localBlockNumber, "local hash", localBlockHash.String())
|
||||
return true, localBlockNumber, nil
|
||||
}
|
||||
|
||||
if localBlockHash != remoteHeader.Hash() {
|
||||
log.Warn("block hash mismatch, reorg happened", "height", localBlockNumber, "local hash", localBlockHash.String(), "remote hash", remoteHeader.Hash().String())
|
||||
return true, localBlockNumber, nil
|
||||
}
|
||||
|
||||
return false, 0, nil
|
||||
return false, nil
|
||||
}
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/scroll-tech/go-ethereum/common"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
@@ -35,7 +34,6 @@ type BatchEvent struct {
|
||||
|
||||
ID uint64 `json:"id" gorm:"column:id;primary_key"`
|
||||
L1BlockNumber uint64 `json:"l1_block_number" gorm:"column:l1_block_number"`
|
||||
L1BlockHash string `json:"l1_block_hash" gorm:"column:l1_block_hash"`
|
||||
BatchStatus int `json:"batch_status" gorm:"column:batch_status"`
|
||||
BatchIndex uint64 `json:"batch_index" gorm:"column:batch_index"`
|
||||
BatchHash string `json:"batch_hash" gorm:"column:batch_hash"`
|
||||
@@ -57,22 +55,6 @@ func NewBatchEvent(db *gorm.DB) *BatchEvent {
|
||||
return &BatchEvent{db: db}
|
||||
}
|
||||
|
||||
// GetMaxL1BlockNumberAndHash retrieves the maximum L1 block number and its corresponding block hash.
|
||||
// for reorg detection.
|
||||
func (c *BatchEvent) GetMaxL1BlockNumberAndHash(ctx context.Context) (uint64, common.Hash, error) {
|
||||
var batch BatchEvent
|
||||
db := c.db.WithContext(ctx)
|
||||
db = db.Model(&BatchEvent{})
|
||||
db = db.Order("l1_block_number desc")
|
||||
if err := db.First(&batch).Error; err != nil {
|
||||
if err == gorm.ErrRecordNotFound {
|
||||
return 0, common.Hash{}, nil
|
||||
}
|
||||
return 0, common.Hash{}, fmt.Errorf("failed to get the maximum L1 block number and hash: %w", err)
|
||||
}
|
||||
return batch.L1BlockNumber, common.HexToHash(batch.L1BlockHash), nil
|
||||
}
|
||||
|
||||
// GetBatchEventSyncedHeightInDB returns the maximum l1_block_number from the batch_event table.
|
||||
func (c *BatchEvent) GetBatchEventSyncedHeightInDB(ctx context.Context) (uint64, error) {
|
||||
var batch BatchEvent
|
||||
|
||||
@@ -102,8 +102,6 @@ type CrossMessage struct {
|
||||
L2TxHash string `json:"l2_tx_hash" gorm:"column:l2_tx_hash"`
|
||||
L1BlockNumber uint64 `json:"l1_block_number" gorm:"column:l1_block_number"`
|
||||
L2BlockNumber uint64 `json:"l2_block_number" gorm:"column:l2_block_number"`
|
||||
L1BlockHash string `json:"l1_block_hash" gorm:"column:l1_block_hash"`
|
||||
L2BlockHash string `json:"l2_block_hash" gorm:"column:l2_block_hash"`
|
||||
L1TokenAddress string `json:"l1_token_address" gorm:"column:l1_token_address"`
|
||||
L2TokenAddress string `json:"l2_token_address" gorm:"column:l2_token_address"`
|
||||
TokenIDs string `json:"token_ids" gorm:"column:token_ids"`
|
||||
@@ -131,38 +129,6 @@ func NewCrossMessage(db *gorm.DB) *CrossMessage {
|
||||
return &CrossMessage{db: db}
|
||||
}
|
||||
|
||||
// GetMaxL1BlockNumberAndHash retrieves the maximum L1 block number and its corresponding block hash.
|
||||
// for reorg detection.
|
||||
func (c *CrossMessage) GetMaxL1BlockNumberAndHash(ctx context.Context) (uint64, common.Hash, error) {
|
||||
var message CrossMessage
|
||||
db := c.db.WithContext(ctx)
|
||||
db = db.Model(&CrossMessage{})
|
||||
db = db.Order("l1_block_number desc")
|
||||
if err := db.First(&message).Error; err != nil {
|
||||
if err == gorm.ErrRecordNotFound {
|
||||
return 0, common.Hash{}, nil
|
||||
}
|
||||
return 0, common.Hash{}, fmt.Errorf("failed to get the maximum L1 block number and hash, error: %w", err)
|
||||
}
|
||||
return message.L1BlockNumber, common.HexToHash(message.L1BlockHash), nil
|
||||
}
|
||||
|
||||
// GetMaxL2BlockNumberAndHash retrieves the maximum L2 block number and its corresponding block hash.
|
||||
// for reorg detection.
|
||||
func (c *CrossMessage) GetMaxL2BlockNumberAndHash(ctx context.Context) (uint64, common.Hash, error) {
|
||||
var message CrossMessage
|
||||
db := c.db.WithContext(ctx)
|
||||
db = db.Model(&CrossMessage{})
|
||||
db = db.Order("l2_block_number desc")
|
||||
if err := db.First(&message).Error; err != nil {
|
||||
if err == gorm.ErrRecordNotFound {
|
||||
return 0, common.Hash{}, nil
|
||||
}
|
||||
return 0, common.Hash{}, fmt.Errorf("failed to get the maximum L2 block number and hash, error: %w", err)
|
||||
}
|
||||
return message.L2BlockNumber, common.HexToHash(message.L2BlockHash), nil
|
||||
}
|
||||
|
||||
// GetMessageSyncedHeightInDB returns the latest synced cross message height from the database for a given message type.
|
||||
func (c *CrossMessage) GetMessageSyncedHeightInDB(ctx context.Context, messageType MessageType) (uint64, error) {
|
||||
var message CrossMessage
|
||||
@@ -326,7 +292,7 @@ func (c *CrossMessage) InsertOrUpdateL1Messages(ctx context.Context, messages []
|
||||
// 'tx_status' column is not explicitly assigned during the update to prevent a later status from being overwritten back to "sent".
|
||||
db = db.Clauses(clause.OnConflict{
|
||||
Columns: []clause.Column{{Name: "message_hash"}},
|
||||
DoUpdates: clause.AssignmentColumns([]string{"sender", "receiver", "token_type", "l1_block_number", "l1_block_hash", "l1_tx_hash", "l1_token_address", "l2_token_address", "token_ids", "token_amounts", "message_type", "block_timestamp", "message_nonce"}),
|
||||
DoUpdates: clause.AssignmentColumns([]string{"sender", "receiver", "token_type", "l1_block_number", "l1_tx_hash", "l1_token_address", "l2_token_address", "token_ids", "token_amounts", "message_type", "block_timestamp", "message_nonce"}),
|
||||
})
|
||||
if err := db.Create(messages).Error; err != nil {
|
||||
return fmt.Errorf("failed to insert message, error: %w", err)
|
||||
@@ -348,7 +314,7 @@ func (c *CrossMessage) InsertOrUpdateL2Messages(ctx context.Context, messages []
|
||||
// 'tx_status' column is not explicitly assigned during the update to prevent a later status from being overwritten back to "sent".
|
||||
db = db.Clauses(clause.OnConflict{
|
||||
Columns: []clause.Column{{Name: "message_hash"}},
|
||||
DoUpdates: clause.AssignmentColumns([]string{"sender", "receiver", "token_type", "l2_block_number", "l2_block_hash", "l2_tx_hash", "l1_token_address", "l2_token_address", "token_ids", "token_amounts", "message_type", "block_timestamp", "message_from", "message_to", "message_value", "message_data", "merkle_proof", "message_nonce"}),
|
||||
DoUpdates: clause.AssignmentColumns([]string{"sender", "receiver", "token_type", "l2_block_number", "l2_tx_hash", "l1_token_address", "l2_token_address", "token_ids", "token_amounts", "message_type", "block_timestamp", "message_from", "message_to", "message_value", "message_data", "merkle_proof", "message_nonce"}),
|
||||
})
|
||||
if err := db.Create(messages).Error; err != nil {
|
||||
return fmt.Errorf("failed to insert message, error: %w", err)
|
||||
@@ -417,7 +383,7 @@ func (c *CrossMessage) InsertOrUpdateL2RelayedMessagesOfL1Deposits(ctx context.C
|
||||
db = db.Model(&CrossMessage{})
|
||||
db = db.Clauses(clause.OnConflict{
|
||||
Columns: []clause.Column{{Name: "message_hash"}},
|
||||
DoUpdates: clause.AssignmentColumns([]string{"message_type", "l2_block_number", "l2_block_hash", "l2_tx_hash", "tx_status"}),
|
||||
DoUpdates: clause.AssignmentColumns([]string{"message_type", "l2_block_number", "l2_tx_hash", "tx_status"}),
|
||||
Where: clause.Where{Exprs: []clause.Expression{clause.Neq{Column: "cross_message.tx_status", Value: TxStatusTypeRelayed}}},
|
||||
})
|
||||
if err := db.Create(uniqueL2RelayedMessages).Error; err != nil {
|
||||
@@ -464,7 +430,7 @@ func (c *CrossMessage) InsertOrUpdateL1RelayedMessagesOfL2Withdrawals(ctx contex
|
||||
db = db.Model(&CrossMessage{})
|
||||
db = db.Clauses(clause.OnConflict{
|
||||
Columns: []clause.Column{{Name: "message_hash"}},
|
||||
DoUpdates: clause.AssignmentColumns([]string{"message_type", "l1_block_number", "l1_block_hash", "l1_tx_hash", "tx_status"}),
|
||||
DoUpdates: clause.AssignmentColumns([]string{"message_type", "l1_block_number", "l1_tx_hash", "tx_status"}),
|
||||
})
|
||||
if err := db.Create(uniqueL1RelayedMessages).Error; err != nil {
|
||||
return fmt.Errorf("failed to update L1 relayed message of L2 withdrawal, error: %w", err)
|
||||
|
||||
@@ -15,8 +15,6 @@ CREATE TABLE cross_message
|
||||
l2_tx_hash VARCHAR DEFAULT NULL,
|
||||
l1_block_number BIGINT DEFAULT NULL,
|
||||
l2_block_number BIGINT DEFAULT NULL,
|
||||
l1_block_hash VARCHAR DEFAULT NULL, -- for reorg detection
|
||||
l2_block_hash VARCHAR DEFAULT NULL, -- for reorg detection
|
||||
l1_token_address VARCHAR DEFAULT NULL,
|
||||
l2_token_address VARCHAR DEFAULT NULL,
|
||||
token_ids VARCHAR DEFAULT NULL,
|
||||
|
||||
@@ -4,7 +4,6 @@ CREATE TABLE batch_event
|
||||
(
|
||||
id BIGSERIAL PRIMARY KEY,
|
||||
l1_block_number BIGINT NOT NULL,
|
||||
l1_block_hash VARCHAR NOT NULL, -- for reorg detection
|
||||
batch_status SMALLINT NOT NULL,
|
||||
batch_index BIGINT NOT NULL,
|
||||
batch_hash VARCHAR NOT NULL,
|
||||
@@ -19,7 +18,6 @@ CREATE TABLE batch_event
|
||||
CREATE INDEX IF NOT EXISTS idx_be_l1_block_number ON batch_event (l1_block_number) WHERE deleted_at IS NULL;
|
||||
CREATE INDEX IF NOT EXISTS idx_be_batch_index ON batch_event (batch_index) WHERE deleted_at IS NULL;
|
||||
CREATE INDEX IF NOT EXISTS idx_be_batch_index_batch_hash ON batch_event (batch_index, batch_hash) WHERE deleted_at IS NULL;
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS unique_idx_be_batch_index_batch_hash_l1_block_hash ON batch_event (batch_index, batch_hash, l1_block_hash) WHERE deleted_at IS NULL;
|
||||
CREATE INDEX IF NOT EXISTS idx_be_end_block_number_update_status_batch_index ON batch_event (end_block_number, update_status, batch_index) WHERE deleted_at IS NULL;
|
||||
|
||||
-- +goose StatementEnd
|
||||
|
||||
Reference in New Issue
Block a user