Files
scroll/rollup/internal/controller/watcher/l1_watcher.go
2023-10-10 17:25:39 +08:00

323 lines
10 KiB
Go

package watcher
import (
"context"
"errors"
"math/big"
"github.com/prometheus/client_golang/prometheus"
geth "github.com/scroll-tech/go-ethereum"
"github.com/scroll-tech/go-ethereum/accounts/abi"
"github.com/scroll-tech/go-ethereum/common"
gethTypes "github.com/scroll-tech/go-ethereum/core/types"
"github.com/scroll-tech/go-ethereum/crypto"
"github.com/scroll-tech/go-ethereum/ethclient"
"github.com/scroll-tech/go-ethereum/log"
"github.com/scroll-tech/go-ethereum/rpc"
"gorm.io/gorm"
"scroll-tech/common/types"
bridgeAbi "scroll-tech/rollup/abi"
"scroll-tech/rollup/internal/orm"
"scroll-tech/rollup/internal/utils"
)
type rollupEvent struct {
batchHash common.Hash
txHash common.Hash
status types.RollupStatus
}
// L1WatcherClient will listen for smart contract events from Eth L1.
type L1WatcherClient struct {
ctx context.Context
client *ethclient.Client
l1MessageOrm *orm.L1Message
l1BlockOrm *orm.L1Block
batchOrm *orm.Batch
// The number of new blocks to wait for a block to be confirmed
confirmations rpc.BlockNumber
messageQueueAddress common.Address
messageQueueABI *abi.ABI
scrollChainAddress common.Address
scrollChainABI *abi.ABI
// The height of the block that the watcher has retrieved event logs
processedMsgHeight uint64
// The height of the block that the watcher has retrieved header rlp
processedBlockHeight uint64
metrics *l1WatcherMetrics
}
// NewL1WatcherClient returns a new instance of L1WatcherClient.
func NewL1WatcherClient(ctx context.Context, client *ethclient.Client, startHeight uint64, confirmations rpc.BlockNumber, messageQueueAddress, scrollChainAddress common.Address, db *gorm.DB, reg prometheus.Registerer) *L1WatcherClient {
l1MessageOrm := orm.NewL1Message(db)
savedHeight, err := l1MessageOrm.GetLayer1LatestWatchedHeight()
if err != nil {
log.Warn("Failed to fetch height from db", "err", err)
savedHeight = 0
}
if savedHeight < int64(startHeight) {
savedHeight = int64(startHeight)
}
l1BlockOrm := orm.NewL1Block(db)
savedL1BlockHeight, err := l1BlockOrm.GetLatestL1BlockHeight(ctx)
if err != nil {
log.Warn("Failed to fetch latest L1 block height from db", "err", err)
savedL1BlockHeight = 0
}
if savedL1BlockHeight < startHeight {
savedL1BlockHeight = startHeight
}
return &L1WatcherClient{
ctx: ctx,
client: client,
l1MessageOrm: l1MessageOrm,
l1BlockOrm: l1BlockOrm,
batchOrm: orm.NewBatch(db),
confirmations: confirmations,
messageQueueAddress: messageQueueAddress,
messageQueueABI: bridgeAbi.L1MessageQueueABI,
scrollChainAddress: scrollChainAddress,
scrollChainABI: bridgeAbi.ScrollChainABI,
processedMsgHeight: uint64(savedHeight),
processedBlockHeight: savedL1BlockHeight,
metrics: initL1WatcherMetrics(reg),
}
}
// ProcessedBlockHeight get processedBlockHeight
// Currently only use for unit test
func (w *L1WatcherClient) ProcessedBlockHeight() uint64 {
return w.processedBlockHeight
}
// Confirmations get confirmations
// Currently only use for unit test
func (w *L1WatcherClient) Confirmations() rpc.BlockNumber {
return w.confirmations
}
// SetConfirmations set the confirmations for L1WatcherClient
// Currently only use for unit test
func (w *L1WatcherClient) SetConfirmations(confirmations rpc.BlockNumber) {
w.confirmations = confirmations
}
// FetchBlockHeader pull latest L1 blocks and save in DB
func (w *L1WatcherClient) FetchBlockHeader(blockHeight uint64) error {
w.metrics.l1WatcherFetchBlockHeaderTotal.Inc()
var block *gethTypes.Header
block, err := w.client.HeaderByNumber(w.ctx, big.NewInt(int64(blockHeight)))
if err != nil {
log.Warn("Failed to get block", "height", blockHeight, "err", err)
return err
}
if block == nil {
log.Warn("Received nil block", "height", blockHeight)
return errors.New("received nil block")
}
var baseFee uint64
if block.BaseFee != nil {
baseFee = block.BaseFee.Uint64()
}
l1Block := orm.L1Block{
Number: blockHeight,
Hash: block.Hash().String(),
BaseFee: baseFee,
GasOracleStatus: int16(types.GasOraclePending),
}
err = w.l1BlockOrm.InsertL1Blocks(w.ctx, []orm.L1Block{l1Block})
if err != nil {
log.Warn("Failed to insert L1 block to db", "blockHeight", blockHeight, "err", err)
return err
}
// update processed height
w.processedBlockHeight = blockHeight
w.metrics.l1WatcherFetchBlockHeaderProcessedBlockHeight.Set(float64(w.processedBlockHeight))
return nil
}
// FetchContractEvent pull latest event logs from given contract address and save in DB
func (w *L1WatcherClient) FetchContractEvent() error {
defer func() {
log.Info("l1 watcher fetchContractEvent", "w.processedMsgHeight", w.processedMsgHeight)
}()
blockHeight, err := utils.GetLatestConfirmedBlockNumber(w.ctx, w.client, w.confirmations)
if err != nil {
log.Error("failed to get block number", "err", err)
return err
}
fromBlock := int64(w.processedMsgHeight) + 1
toBlock := int64(blockHeight)
for from := fromBlock; from <= toBlock; from += contractEventsBlocksFetchLimit {
w.metrics.l1WatcherFetchContractEventTotal.Inc()
to := from + contractEventsBlocksFetchLimit - 1
if to > toBlock {
to = toBlock
}
// warning: uint int conversion...
query := geth.FilterQuery{
FromBlock: big.NewInt(from), // inclusive
ToBlock: big.NewInt(to), // inclusive
Addresses: []common.Address{
w.scrollChainAddress,
w.messageQueueAddress,
},
Topics: make([][]common.Hash, 1),
}
query.Topics[0] = make([]common.Hash, 3)
query.Topics[0][0] = bridgeAbi.L1QueueTransactionEventSignature
query.Topics[0][1] = bridgeAbi.L1CommitBatchEventSignature
query.Topics[0][2] = bridgeAbi.L1FinalizeBatchEventSignature
logs, err := w.client.FilterLogs(w.ctx, query)
if err != nil {
log.Warn("Failed to get event logs", "err", err)
return err
}
if len(logs) == 0 {
w.processedMsgHeight = uint64(to)
w.metrics.l1WatcherFetchContractEventProcessedBlockHeight.Set(float64(to))
continue
}
log.Info("Received new L1 events", "fromBlock", from, "toBlock", to, "cnt", len(logs))
sentMessageEvents, rollupEvents, err := w.parseBridgeEventLogs(logs)
if err != nil {
log.Error("Failed to parse emitted events log", "err", err)
return err
}
sentMessageCount := int64(len(sentMessageEvents))
rollupEventCount := int64(len(rollupEvents))
w.metrics.l1WatcherFetchContractEventSentEventsTotal.Add(float64(sentMessageCount))
w.metrics.l1WatcherFetchContractEventRollupEventsTotal.Add(float64(rollupEventCount))
log.Info("L1 events types", "SentMessageCount", sentMessageCount, "RollupEventCount", rollupEventCount)
// use rollup event to update rollup results db status
var batchHashes []string
for _, event := range rollupEvents {
batchHashes = append(batchHashes, event.batchHash.String())
}
statuses, err := w.batchOrm.GetRollupStatusByHashList(w.ctx, batchHashes)
if err != nil {
log.Error("Failed to GetRollupStatusByHashList", "err", err)
return err
}
if len(statuses) != len(batchHashes) {
log.Error("RollupStatus.Length mismatch with batchHashes.Length", "RollupStatus.Length", len(statuses), "batchHashes.Length", len(batchHashes))
return nil
}
for index, event := range rollupEvents {
batchHash := event.batchHash.String()
status := statuses[index]
// only update when db status is before event status
if event.status > status {
if event.status == types.RollupFinalized {
err = w.batchOrm.UpdateFinalizeTxHashAndRollupStatus(w.ctx, batchHash, event.txHash.String(), event.status)
} else if event.status == types.RollupCommitted {
err = w.batchOrm.UpdateCommitTxHashAndRollupStatus(w.ctx, batchHash, event.txHash.String(), event.status)
}
if err != nil {
log.Error("Failed to update Rollup/Finalize TxHash and Status", "err", err)
return err
}
}
}
if err = w.l1MessageOrm.SaveL1Messages(w.ctx, sentMessageEvents); err != nil {
return err
}
w.processedMsgHeight = uint64(to)
w.metrics.l1WatcherFetchContractEventSuccessTotal.Inc()
w.metrics.l1WatcherFetchContractEventProcessedBlockHeight.Set(float64(w.processedMsgHeight))
}
return nil
}
func (w *L1WatcherClient) parseBridgeEventLogs(logs []gethTypes.Log) ([]*orm.L1Message, []rollupEvent, error) {
// Need use contract abi to parse event Log
// Can only be tested after we have our contracts set up
var l1Messages []*orm.L1Message
var rollupEvents []rollupEvent
for _, vLog := range logs {
switch vLog.Topics[0] {
case bridgeAbi.L1QueueTransactionEventSignature:
event := bridgeAbi.L1QueueTransactionEvent{}
err := utils.UnpackLog(w.messageQueueABI, &event, "QueueTransaction", vLog)
if err != nil {
log.Warn("Failed to unpack layer1 QueueTransaction event", "err", err)
return l1Messages, rollupEvents, err
}
msgHash := common.BytesToHash(crypto.Keccak256(event.Data))
l1Messages = append(l1Messages, &orm.L1Message{
QueueIndex: event.QueueIndex,
MsgHash: msgHash.String(),
Height: vLog.BlockNumber,
Sender: event.Sender.String(),
Value: event.Value.String(),
Target: event.Target.String(),
Calldata: common.Bytes2Hex(event.Data),
GasLimit: event.GasLimit.Uint64(),
Layer1Hash: vLog.TxHash.Hex(),
})
case bridgeAbi.L1CommitBatchEventSignature:
event := bridgeAbi.L1CommitBatchEvent{}
err := utils.UnpackLog(w.scrollChainABI, &event, "CommitBatch", vLog)
if err != nil {
log.Warn("Failed to unpack layer1 CommitBatch event", "err", err)
return l1Messages, rollupEvents, err
}
rollupEvents = append(rollupEvents, rollupEvent{
batchHash: event.BatchHash,
txHash: vLog.TxHash,
status: types.RollupCommitted,
})
case bridgeAbi.L1FinalizeBatchEventSignature:
event := bridgeAbi.L1FinalizeBatchEvent{}
err := utils.UnpackLog(w.scrollChainABI, &event, "FinalizeBatch", vLog)
if err != nil {
log.Warn("Failed to unpack layer1 FinalizeBatch event", "err", err)
return l1Messages, rollupEvents, err
}
rollupEvents = append(rollupEvents, rollupEvent{
batchHash: event.BatchHash,
txHash: vLog.TxHash,
status: types.RollupFinalized,
})
default:
log.Error("Unknown event", "topic", vLog.Topics[0], "txHash", vLog.TxHash)
}
}
return l1Messages, rollupEvents, nil
}