Files
scroll/rollup/internal/controller/watcher/batch_proposer.go
colin 4233ad928c feat(rollup-relayer): support Validium (#1693)
Co-authored-by: Péter Garamvölgyi <peter@scroll.io>
2025-07-09 15:02:54 +08:00

376 lines
15 KiB
Go

package watcher
import (
"context"
"fmt"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/scroll-tech/da-codec/encoding"
"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/log"
"github.com/scroll-tech/go-ethereum/params"
"gorm.io/gorm"
"scroll-tech/common/types"
"scroll-tech/rollup/internal/config"
"scroll-tech/rollup/internal/orm"
"scroll-tech/rollup/internal/utils"
)
// BatchProposer assembles batches out of chunks that have not been assigned
// to a batch yet and persists them through the batch/chunk ORMs.
type BatchProposer struct {
	// Runtime handles shared by all DB operations.
	ctx context.Context
	db  *gorm.DB

	// ORM accessors. l2BlockOrm is swapped by SetReplayDB when the proposer
	// tool supplies a separate l2_block data source.
	batchOrm   *orm.Batch
	chunkOrm   *orm.Chunk
	l2BlockOrm *orm.L2Block

	cfg *config.BatchProposerConfig

	// replayMode is turned on by SetReplayDB; validiumMode is forwarded to
	// utils.CalculateBatchMetrics.
	replayMode, validiumMode bool

	// minCodecVersion is the lowest codec version proposeBatch accepts.
	minCodecVersion encoding.CodecVersion
	// chainCfg is used to resolve the hardfork name of each chunk.
	chainCfg *params.ChainConfig

	// Proposal round and DB-update counters.
	batchProposerCircleTotal           prometheus.Counter
	proposeBatchFailureTotal           prometheus.Counter
	proposeBatchUpdateInfoTotal        prometheus.Counter
	proposeBatchUpdateInfoFailureTotal prometheus.Counter

	// Size of the batch most recently measured.
	totalL1CommitBlobSize, batchChunksNum prometheus.Gauge

	// Outcome counters of the final proposal decision.
	batchFirstBlockTimeoutReached, batchChunksProposeNotEnoughTotal prometheus.Counter

	batchEstimateBlobSizeTime prometheus.Gauge

	// compressedDataCompatibilityBreachTotal counts how often the batch
	// proposer stopped early because of a compressed data compatibility breach.
	compressedDataCompatibilityBreachTotal prometheus.Counter

	// Height of the last block of the latest proposed batch, and the gas
	// accumulated over all proposed batches.
	batchProposeBlockHeight prometheus.Gauge
	batchProposeThroughput  prometheus.Counter
}
// NewBatchProposer creates a new BatchProposer instance.
//
// The batch, chunk and L2-block ORM handles are all built on db. Replay mode
// starts disabled and is only enabled through SetReplayDB. Every Prometheus
// collector is created through promauto against reg.
func NewBatchProposer(ctx context.Context, cfg *config.BatchProposerConfig, minCodecVersion encoding.CodecVersion, chainCfg *params.ChainConfig, db *gorm.DB, validiumMode bool, reg prometheus.Registerer) *BatchProposer {
	log.Info("new batch proposer", "batchTimeoutSec", cfg.BatchTimeoutSec, "maxBlobSize", maxBlobSize, "maxUncompressedBatchBytesSize", cfg.MaxUncompressedBatchBytesSize)

	// A single factory bound to reg is enough for all collectors below.
	metricsFactory := promauto.With(reg)

	return &BatchProposer{
		ctx:             ctx,
		db:              db,
		batchOrm:        orm.NewBatch(db),
		chunkOrm:        orm.NewChunk(db),
		l2BlockOrm:      orm.NewL2Block(db),
		cfg:             cfg,
		replayMode:      false, // default is false, set to true when using proposer tool
		validiumMode:    validiumMode,
		minCodecVersion: minCodecVersion,
		chainCfg:        chainCfg,

		batchProposerCircleTotal: metricsFactory.NewCounter(prometheus.CounterOpts{
			Name: "rollup_propose_batch_circle_total",
			Help: "Total number of propose batch total.",
		}),
		// The Help text used to be a copy of the circle counter's text,
		// which mislabelled this failure counter.
		proposeBatchFailureTotal: metricsFactory.NewCounter(prometheus.CounterOpts{
			Name: "rollup_propose_batch_failure_circle_total",
			Help: "Total number of propose batch failures.",
		}),
		proposeBatchUpdateInfoTotal: metricsFactory.NewCounter(prometheus.CounterOpts{
			Name: "rollup_propose_batch_update_info_total",
			Help: "Total number of propose batch update info total.",
		}),
		proposeBatchUpdateInfoFailureTotal: metricsFactory.NewCounter(prometheus.CounterOpts{
			Name: "rollup_propose_batch_update_info_failure_total",
			Help: "Total number of propose batch update info failure total.",
		}),
		compressedDataCompatibilityBreachTotal: metricsFactory.NewCounter(prometheus.CounterOpts{
			Name: "rollup_propose_batch_due_to_compressed_data_compatibility_breach_total",
			Help: "Total number of propose batch due to compressed data compatibility breach.",
		}),
		totalL1CommitBlobSize: metricsFactory.NewGauge(prometheus.GaugeOpts{
			Name: "rollup_propose_batch_total_l1_commit_blob_size",
			Help: "The total l1 commit blob size",
		}),
		batchChunksNum: metricsFactory.NewGauge(prometheus.GaugeOpts{
			Name: "rollup_propose_batch_chunks_number",
			Help: "The number of chunks in the batch",
		}),
		batchFirstBlockTimeoutReached: metricsFactory.NewCounter(prometheus.CounterOpts{
			Name: "rollup_propose_batch_first_block_timeout_reached_total",
			Help: "Total times of batch's first block timeout reached",
		}),
		batchChunksProposeNotEnoughTotal: metricsFactory.NewCounter(prometheus.CounterOpts{
			Name: "rollup_propose_batch_chunks_propose_not_enough_total",
			Help: "Total number of batch chunk propose not enough",
		}),
		batchEstimateBlobSizeTime: metricsFactory.NewGauge(prometheus.GaugeOpts{
			Name: "rollup_propose_batch_estimate_blob_size_time",
			Help: "Time taken to estimate blob size for the chunk.",
		}),
		batchProposeBlockHeight: metricsFactory.NewGauge(prometheus.GaugeOpts{
			Name: "rollup_batch_propose_block_height",
			Help: "The block height of the latest proposed batch",
		}),
		batchProposeThroughput: metricsFactory.NewCounter(prometheus.CounterOpts{
			Name: "rollup_batch_propose_throughput",
			Help: "The total gas used in proposed batches",
		}),
	}
}
// SetReplayDB points the proposer's l2_block reads at replayDB and turns on
// replay mode. It exists for the proposer tool only.
//
// It is not thread-safe: call it after constructing the BatchProposer and
// before any proposing starts.
func (p *BatchProposer) SetReplayDB(replayDB *gorm.DB) {
	replayBlocks := orm.NewL2Block(replayDB)
	p.replayMode, p.l2BlockOrm = true, replayBlocks
}
// TryProposeBatch runs one batch proposal round. Every call bumps the round
// counter; a failing round additionally bumps the failure counter and is
// logged. The error is not returned to the caller.
func (p *BatchProposer) TryProposeBatch() {
	p.batchProposerCircleTotal.Inc()

	err := p.proposeBatch()
	if err == nil {
		return
	}

	p.proposeBatchFailureTotal.Inc()
	log.Error("proposeBatchChunks failed", "err", err)
}
// updateDBBatchInfo finalizes a proposed batch and persists it.
//
// The batch is first checked with encoding.CheckBatchCompressedDataCompatibility.
// While the check fails and more than one chunk is left, the trailing chunk is
// dropped and the check is repeated. If a breach was observed at all, the
// batch metrics are recomputed from the (possibly truncated) batch.
//
// The batch is then inserted and its chunks are tagged with the batch hash in
// a single DB transaction. In replay mode the batch is additionally marked as
// types.RollupCommitted within the same transaction.
//
// It returns an error when the compatibility check, the metrics recalculation
// or the DB transaction fails.
func (p *BatchProposer) updateDBBatchInfo(batch *encoding.Batch, codecVersion encoding.CodecVersion, metrics *utils.BatchMetrics) error {
	compatibilityBreachOccurred := false

	for {
		compatible, err := encoding.CheckBatchCompressedDataCompatibility(batch, codecVersion)
		if err != nil {
			log.Error("Failed to check batch compressed data compatibility", "batch index", batch.Index, "codecVersion", codecVersion, "err", err)
			return err
		}

		if compatible {
			break
		}

		compatibilityBreachOccurred = true

		if len(batch.Chunks) == 1 {
			log.Warn("Disable compression: cannot truncate batch with only 1 chunk for compatibility", "start block number", batch.Chunks[0].Blocks[0].Header.Number.Uint64(),
				"end block number", batch.Chunks[0].Blocks[len(batch.Chunks[0].Blocks)-1].Header.Number.Uint64())
			break
		}

		removedChunk := batch.Chunks[len(batch.Chunks)-1]
		batch.Chunks = batch.Chunks[:len(batch.Chunks)-1]
		batch.PostL1MessageQueueHash = batch.Chunks[len(batch.Chunks)-1].PostL1MessageQueueHash

		// Keep batch.Blocks in step with batch.Chunks, the same way proposeBatch
		// does when it drops a chunk; otherwise the batch would still carry the
		// removed chunk's blocks. The length guard covers callers that did not
		// populate batch.Blocks.
		if n := len(removedChunk.Blocks); len(batch.Blocks) >= n {
			batch.Blocks = batch.Blocks[:len(batch.Blocks)-n]
		}

		log.Info("Batch not compatible with compressed data, removing last chunk", "batch index", batch.Index, "truncated chunk length", len(batch.Chunks))
	}

	if compatibilityBreachOccurred {
		p.compressedDataCompatibilityBreachTotal.Inc()

		// recalculate batch metrics after truncation
		var calcErr error
		metrics, calcErr = utils.CalculateBatchMetrics(batch, codecVersion, p.validiumMode)
		if calcErr != nil {
			return fmt.Errorf("failed to calculate batch metrics, batch index: %v, error: %w", batch.Index, calcErr)
		}

		p.recordTimerBatchMetrics(metrics)
		p.recordAllBatchMetrics(metrics)
	}

	// Publish the height of the last block in the batch, if there is one.
	if len(batch.Chunks) > 0 && len(batch.Chunks[len(batch.Chunks)-1].Blocks) > 0 {
		lastChunk := batch.Chunks[len(batch.Chunks)-1]
		lastBlock := lastChunk.Blocks[len(lastChunk.Blocks)-1]
		p.batchProposeBlockHeight.Set(float64(lastBlock.Header.Number.Uint64()))
	}

	var totalGasUsed uint64
	for _, chunk := range batch.Chunks {
		totalGasUsed += chunk.TotalGasUsed()
	}
	p.batchProposeThroughput.Add(float64(totalGasUsed))

	p.proposeBatchUpdateInfoTotal.Inc()
	err := p.db.Transaction(func(dbTX *gorm.DB) error {
		dbBatch, dbErr := p.batchOrm.InsertBatch(p.ctx, batch, codecVersion, *metrics, dbTX)
		if dbErr != nil {
			log.Warn("BatchProposer.updateDBBatchInfo insert batch failure", "index", batch.Index, "parent hash", batch.ParentBatchHash.Hex(), "codec version", codecVersion, "error", dbErr)
			return dbErr
		}
		if dbErr = p.chunkOrm.UpdateBatchHashInRange(p.ctx, dbBatch.StartChunkIndex, dbBatch.EndChunkIndex, dbBatch.Hash, dbTX); dbErr != nil {
			log.Warn("BatchProposer.UpdateBatchHashInRange update the chunk's batch hash failure", "hash", dbBatch.Hash, "error", dbErr)
			return dbErr
		}
		if p.replayMode {
			// If replayMode is true, meaning the batch was proposed by the proposer tool,
			// set batch status to types.RollupCommitted and assign a unique commit tx hash to enable new bundle proposals.
			if dbErr = p.batchOrm.UpdateCommitTxHashAndRollupStatus(p.ctx, dbBatch.Hash, dbBatch.Hash, types.RollupCommitted, dbTX); dbErr != nil {
				log.Warn("BatchProposer.UpdateCommitTxHashAndRollupStatus update the batch's commit tx hash failure", "hash", dbBatch.Hash, "error", dbErr)
				return dbErr
			}
		}
		return nil
	})
	if err != nil {
		p.proposeBatchUpdateInfoFailureTotal.Inc()
		log.Error("update batch info in db failed", "err", err)
		// Propagate the failure instead of reporting success to the caller.
		return fmt.Errorf("update batch info in db, batch index: %v: %w", batch.Index, err)
	}
	return nil
}
// proposeBatch builds the next batch from the pending unbatched chunks and,
// when a proposal condition is met, persists it through updateDBBatchInfo.
//
// Flow:
//  1. Load the first unbatched chunk and resolve its codec; codecs older than
//     p.minCodecVersion are rejected.
//  2. Fetch up to min(codec limit, configured limit) chunks and cut the list
//     at the first chunk that belongs to a different hardfork.
//  3. Add chunks one by one. As soon as the blob size or the uncompressed
//     size limit is exceeded, drop the chunk that broke the limit and propose
//     what was collected so far. If the very first chunk breaks the limit an
//     error is returned.
//  4. Otherwise propose only if the first block timed out or the chunk count
//     reached the maximum; if neither holds, nothing is written.
//
// It returns nil when there is nothing to propose.
func (p *BatchProposer) proposeBatch() error {
	firstUnbatchedChunkIndex, err := p.batchOrm.GetFirstUnbatchedChunkIndex(p.ctx)
	if err != nil {
		return err
	}

	firstUnbatchedChunk, err := p.chunkOrm.GetChunkByIndex(p.ctx, firstUnbatchedChunkIndex)
	if err != nil || firstUnbatchedChunk == nil {
		// With a nil chunk and a nil error this returns nil, i.e. the round
		// ends without proposing anything.
		return err
	}

	codec, err := encoding.CodecFromVersion(encoding.CodecVersion(firstUnbatchedChunk.CodecVersion))
	if err != nil {
		return fmt.Errorf("failed to retrieve codec for block number %v and time %v: %w", firstUnbatchedChunk.StartBlockNumber, firstUnbatchedChunk.StartBlockTime, err)
	}
	if codec == nil {
		// Kept separate from the branch above: wrapping a nil error with %w
		// would render as "%!w(<nil>)".
		return fmt.Errorf("failed to retrieve codec for block number %v and time %v: codec is nil", firstUnbatchedChunk.StartBlockNumber, firstUnbatchedChunk.StartBlockTime)
	}

	if codec.Version() < p.minCodecVersion {
		return fmt.Errorf("unsupported codec version: %v, expected at least %v", codec.Version(), p.minCodecVersion)
	}

	// always take the minimum of the configured max chunks per batch and the codec's max chunks per batch
	maxChunksThisBatch := min(codec.MaxNumChunksPerBatch(), p.cfg.MaxChunksPerBatch)

	// select at most maxChunksThisBatch chunks
	dbChunks, err := p.chunkOrm.GetChunksGEIndex(p.ctx, firstUnbatchedChunkIndex, maxChunksThisBatch)
	if err != nil {
		return err
	}

	if len(dbChunks) == 0 {
		return nil
	}

	// Ensure all chunks in the same batch use the same hardfork name
	// If a different hardfork name is found, truncate the chunks slice at that point
	hardforkName := encoding.GetHardforkName(p.chainCfg, dbChunks[0].StartBlockNumber, dbChunks[0].StartBlockTime)
	for i := 1; i < len(dbChunks); i++ {
		currentHardfork := encoding.GetHardforkName(p.chainCfg, dbChunks[i].StartBlockNumber, dbChunks[i].StartBlockTime)
		if currentHardfork != hardforkName {
			dbChunks = dbChunks[:i]
			maxChunksThisBatch = len(dbChunks) // update maxChunksThisBatch to trigger batching, because these chunks are the last chunks before the hardfork
			break
		}
	}

	daChunks, err := p.getDAChunks(dbChunks)
	if err != nil {
		return err
	}

	dbParentBatch, err := p.batchOrm.GetLatestBatch(p.ctx)
	if err != nil {
		return err
	}
	if dbParentBatch == nil {
		// Defensive: the parent is dereferenced right below, so fail with an
		// error rather than a nil-pointer panic if no batch is returned.
		return fmt.Errorf("no parent batch found, first unbatched chunk index: %v", firstUnbatchedChunkIndex)
	}

	var batch encoding.Batch
	batch.Index = dbParentBatch.Index + 1
	batch.ParentBatchHash = common.HexToHash(dbParentBatch.Hash)
	batch.TotalL1MessagePoppedBefore = firstUnbatchedChunk.TotalL1MessagesPoppedBefore // set for compatibility within relayer
	batch.PrevL1MessageQueueHash = common.HexToHash(firstUnbatchedChunk.PrevL1MessageQueueHash)

	for i, chunk := range daChunks {
		batch.Chunks = append(batch.Chunks, chunk)
		batch.Blocks = append(batch.Blocks, chunk.Blocks...)
		batch.PostL1MessageQueueHash = common.HexToHash(dbChunks[i].PostL1MessageQueueHash)

		metrics, calcErr := utils.CalculateBatchMetrics(&batch, codec.Version(), p.validiumMode)
		if calcErr != nil {
			return fmt.Errorf("failed to calculate batch metrics: %w", calcErr)
		}

		p.recordTimerBatchMetrics(metrics)

		if metrics.L1CommitBlobSize > maxBlobSize || metrics.L1CommitUncompressedBatchBytesSize > p.cfg.MaxUncompressedBatchBytesSize {
			if i == 0 {
				// The first chunk exceeds hard limits, which indicates a bug in the chunk-proposer, manual fix is needed.
				return fmt.Errorf("the first chunk exceeds limits; start block number: %v, end block number: %v, limits: %+v, maxChunkNum: %v, maxBlobSize: %v, maxUncompressedBatchBytesSize: %v",
					dbChunks[0].StartBlockNumber, dbChunks[0].EndBlockNumber, metrics, maxChunksThisBatch, maxBlobSize, p.cfg.MaxUncompressedBatchBytesSize)
			}

			log.Debug("breaking limit condition in batching",
				"l1CommitBlobSize", metrics.L1CommitBlobSize,
				"maxBlobSize", maxBlobSize,
				"L1CommitUncompressedBatchBytesSize", metrics.L1CommitUncompressedBatchBytesSize,
				"maxUncompressedBatchBytesSize", p.cfg.MaxUncompressedBatchBytesSize)

			// Undo the chunk that pushed the batch over the limit: remove it
			// from Chunks and Blocks and restore the previous queue hash.
			lastChunk := batch.Chunks[len(batch.Chunks)-1]
			batch.Chunks = batch.Chunks[:len(batch.Chunks)-1]
			batch.PostL1MessageQueueHash = common.HexToHash(dbChunks[i-1].PostL1MessageQueueHash)
			batch.Blocks = batch.Blocks[:len(batch.Blocks)-len(lastChunk.Blocks)]

			metrics, err = utils.CalculateBatchMetrics(&batch, codec.Version(), p.validiumMode)
			if err != nil {
				return fmt.Errorf("failed to calculate batch metrics: %w", err)
			}

			p.recordAllBatchMetrics(metrics)
			return p.updateDBBatchInfo(&batch, codec.Version(), metrics)
		}
	}

	metrics, calcErr := utils.CalculateBatchMetrics(&batch, codec.Version(), p.validiumMode)
	if calcErr != nil {
		return fmt.Errorf("failed to calculate batch metrics: %w", calcErr)
	}

	currentTimeSec := uint64(time.Now().Unix())
	timeoutReached := metrics.FirstBlockTimestamp+p.cfg.BatchTimeoutSec < currentTimeSec
	maxChunksReached := metrics.NumChunks == uint64(maxChunksThisBatch)
	if timeoutReached || maxChunksReached {
		log.Info("reached maximum number of chunks in batch or first block timeout",
			"chunk count", metrics.NumChunks,
			"start block number", dbChunks[0].StartBlockNumber,
			"start block timestamp", dbChunks[0].StartBlockTime,
			"current time", currentTimeSec)

		// Count a timeout only when the timeout actually fired, so the
		// counter is not inflated by batches that merely reached the chunk
		// limit.
		if timeoutReached {
			p.batchFirstBlockTimeoutReached.Inc()
		}
		p.recordAllBatchMetrics(metrics)

		return p.updateDBBatchInfo(&batch, codec.Version(), metrics)
	}

	log.Debug("pending chunks do not reach one of the constraints or contain a timeout block")
	p.recordTimerBatchMetrics(metrics)
	p.batchChunksProposeNotEnoughTotal.Inc()
	return nil
}
// getDAChunks converts DB chunk records into encoding chunks, in the same
// order. For every record it loads the L2 blocks in
// [StartBlockNumber, EndBlockNumber] through p.l2BlockOrm and copies the
// prev/post L1 message queue hashes.
//
// The first block-fetch failure is logged and returned together with a nil
// slice.
func (p *BatchProposer) getDAChunks(dbChunks []*orm.Chunk) ([]*encoding.Chunk, error) {
	daChunks := make([]*encoding.Chunk, 0, len(dbChunks))

	for _, dbChunk := range dbChunks {
		firstBlock, lastBlock := dbChunk.StartBlockNumber, dbChunk.EndBlockNumber

		chunkBlocks, fetchErr := p.l2BlockOrm.GetL2BlocksInRange(p.ctx, firstBlock, lastBlock)
		if fetchErr != nil {
			log.Error("Failed to fetch blocks", "start number", firstBlock, "end number", lastBlock, "error", fetchErr)
			return nil, fetchErr
		}

		daChunk := new(encoding.Chunk)
		daChunk.Blocks = chunkBlocks
		daChunk.PrevL1MessageQueueHash = common.HexToHash(dbChunk.PrevL1MessageQueueHash)
		daChunk.PostL1MessageQueueHash = common.HexToHash(dbChunk.PostL1MessageQueueHash)

		daChunks = append(daChunks, daChunk)
	}

	return daChunks, nil
}
// recordAllBatchMetrics publishes the chunk count, the L1 commit blob size
// and the blob-size estimation time of metrics to their gauges.
func (p *BatchProposer) recordAllBatchMetrics(metrics *utils.BatchMetrics) {
	var (
		chunkCount   = float64(metrics.NumChunks)
		blobSize     = float64(metrics.L1CommitBlobSize)
		estimateTime = float64(metrics.EstimateBlobSizeTime)
	)

	p.batchChunksNum.Set(chunkCount)
	p.totalL1CommitBlobSize.Set(blobSize)
	p.batchEstimateBlobSizeTime.Set(estimateTime)
}
// recordTimerBatchMetrics publishes only the blob-size estimation time of
// metrics to its gauge.
func (p *BatchProposer) recordTimerBatchMetrics(metrics *utils.BatchMetrics) {
	estimateTime := float64(metrics.EstimateBlobSizeTime)
	p.batchEstimateBlobSizeTime.Set(estimateTime)
}