Files
scroll/rollup/internal/controller/watcher/l2_watcher.go
Zhang Zhuo 6bee33036f feat: the CLOAK privacy solution (#1737)
Co-authored-by: Ho <fan@scroll.io>
Co-authored-by: Rohit Narurkar <rohit.narurkar@proton.me>
Co-authored-by: Péter Garamvölgyi <peter@scroll.io>
2025-11-14 15:00:37 +01:00

169 lines
5.3 KiB
Go

package watcher
import (
"context"
"fmt"
"math/big"
"github.com/prometheus/client_golang/prometheus"
"github.com/scroll-tech/da-codec/encoding"
"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/core/types"
"github.com/scroll-tech/go-ethereum/ethclient"
"github.com/scroll-tech/go-ethereum/event"
"github.com/scroll-tech/go-ethereum/log"
"github.com/scroll-tech/go-ethereum/params"
"github.com/scroll-tech/go-ethereum/rpc"
"gorm.io/gorm"
"scroll-tech/rollup/internal/orm"
)
// L2WatcherClient provide APIs which support others to subscribe to various event from l2geth
type L2WatcherClient struct {
ctx context.Context
event.Feed
*ethclient.Client
rpcCli *rpc.Client
l2BlockOrm *orm.L2Block
confirmations rpc.BlockNumber
messageQueueAddress common.Address
withdrawTrieRootSlot common.Hash
validiumMode bool
metrics *l2WatcherMetrics
chainCfg *params.ChainConfig
}
// NewL2WatcherClient take a l2geth instance to generate a l2watcherclient instance
func NewL2WatcherClient(ctx context.Context, client *rpc.Client, confirmations rpc.BlockNumber, messageQueueAddress common.Address, withdrawTrieRootSlot common.Hash, chainCfg *params.ChainConfig, db *gorm.DB, validiumMode bool, reg prometheus.Registerer) *L2WatcherClient {
return &L2WatcherClient{
ctx: ctx,
Client: ethclient.NewClient(client),
rpcCli: client,
l2BlockOrm: orm.NewL2Block(db),
confirmations: confirmations,
messageQueueAddress: messageQueueAddress,
withdrawTrieRootSlot: withdrawTrieRootSlot,
validiumMode: validiumMode,
metrics: initL2WatcherMetrics(reg),
chainCfg: chainCfg,
}
}
const blocksFetchLimit = uint64(10)
// TryFetchRunningMissingBlocks attempts to fetch and store block traces for any missing blocks.
func (w *L2WatcherClient) TryFetchRunningMissingBlocks(blockHeight uint64) error {
w.metrics.fetchRunningMissingBlocksTotal.Inc()
heightInDB, err := w.l2BlockOrm.GetL2BlocksLatestHeight(w.ctx)
if err != nil {
log.Error("failed to GetL2BlocksLatestHeight", "err", err)
return fmt.Errorf("failed to GetL2BlocksLatestHeight: %w", err)
}
// Fetch and store block traces for missing blocks
for from := heightInDB + 1; from <= blockHeight; from += blocksFetchLimit {
to := from + blocksFetchLimit - 1
if to > blockHeight {
to = blockHeight
}
if err = w.GetAndStoreBlocks(w.ctx, from, to); err != nil {
log.Error("fail to getAndStoreBlockTraces", "from", from, "to", to, "err", err)
return fmt.Errorf("fail to getAndStoreBlockTraces: %w", err)
}
w.metrics.fetchRunningMissingBlocksHeight.Set(float64(to))
w.metrics.rollupL2BlocksFetchedGap.Set(float64(blockHeight - to))
}
return nil
}
func (w *L2WatcherClient) GetAndStoreBlocks(ctx context.Context, from, to uint64) error {
var blocks []*encoding.Block
for number := from; number <= to; number++ {
log.Debug("retrieving block", "height", number)
block, err := w.BlockByNumber(ctx, new(big.Int).SetUint64(number))
if err != nil {
return fmt.Errorf("failed to BlockByNumber: %v. number: %v", err, number)
}
blockTxs := block.Transactions()
var count int
for _, tx := range blockTxs {
if tx.IsL1MessageTx() {
count++
}
}
log.Info("retrieved block", "height", block.Header().Number, "hash", block.Header().Hash().String(), "L1 message count", count)
// use original (encrypted) L1 message txs in validium mode
if w.validiumMode {
var txs []*types.Transaction
if count > 0 {
log.Info("Fetching encrypted messages in validium mode")
err = w.rpcCli.CallContext(ctx, &txs, "scroll_getL1MessagesInBlock", block.Hash(), "synced")
if err != nil {
return fmt.Errorf("failed to get L1 messages: %v, block hash: %v", err, block.Hash().Hex())
}
}
// sanity check
if len(txs) != count {
return fmt.Errorf("L1 message count mismatch: expected %d, got %d", count, len(txs))
}
for ii := 0; ii < count; ii++ {
// sanity check
if blockTxs[ii].AsL1MessageTx().QueueIndex != txs[ii].AsL1MessageTx().QueueIndex {
return fmt.Errorf("L1 message queue index mismatch at index %d: expected %d, got %d", ii, blockTxs[ii].AsL1MessageTx().QueueIndex, txs[ii].AsL1MessageTx().QueueIndex)
}
log.Info("Replacing L1 message tx in validium mode", "index", ii, "queueIndex", txs[ii].AsL1MessageTx().QueueIndex, "decryptedTxHash", blockTxs[ii].Hash().Hex(), "originalTxHash", txs[ii].Hash().Hex())
blockTxs[ii] = txs[ii]
}
}
withdrawRoot, err3 := w.StorageAt(ctx, w.messageQueueAddress, w.withdrawTrieRootSlot, big.NewInt(int64(number)))
if err3 != nil {
return fmt.Errorf("failed to get withdrawRoot: %v. number: %v", err3, number)
}
blocks = append(blocks, &encoding.Block{
Header: block.Header(),
Transactions: encoding.TxsToTxsData(blockTxs),
WithdrawRoot: common.BytesToHash(withdrawRoot),
})
}
if len(blocks) > 0 {
for _, block := range blocks {
codec := encoding.CodecFromConfig(w.chainCfg, block.Header.Number, block.Header.Time)
if codec == nil {
return fmt.Errorf("failed to retrieve codec for block number %v and time %v", block.Header.Number, block.Header.Time)
}
w.metrics.rollupL2WatcherSyncThroughput.Add(float64(block.Header.GasUsed))
}
if err := w.l2BlockOrm.InsertL2Blocks(w.ctx, blocks); err != nil {
return fmt.Errorf("failed to batch insert BlockTraces: %v", err)
}
}
return nil
}