// File: scroll/rollup/internal/controller/watcher/chunk_proposer.go
// (325 lines, 12 KiB, Go)

package watcher
import (
"context"
"errors"
"fmt"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
gethTypes "github.com/scroll-tech/go-ethereum/core/types"
"github.com/scroll-tech/go-ethereum/log"
"gorm.io/gorm"
"scroll-tech/common/types"
"scroll-tech/rollup/internal/config"
"scroll-tech/rollup/internal/orm"
)
// chunkRowConsumption maps a sub-circuit name to the row count accumulated
// for that sub-circuit (map: sub-circuit name => sub-circuit row count).
type chunkRowConsumption map[string]uint64
// add accumulates the row counts of rowConsumption into crc, keyed by
// sub-circuit name.
//
// It returns an error when rowConsumption is nil. A nil (zero-value) receiver
// map is initialized on first use so that the accumulation below cannot panic
// with "assignment to entry in nil map".
func (crc *chunkRowConsumption) add(rowConsumption *gethTypes.RowConsumption) error {
	if rowConsumption == nil {
		return errors.New("rowConsumption is <nil>")
	}
	if *crc == nil {
		*crc = make(chunkRowConsumption)
	}
	for _, subCircuit := range *rowConsumption {
		(*crc)[subCircuit.Name] += subCircuit.RowNumber
	}
	return nil
}
// max reports the largest row count recorded for any single sub-circuit.
// An empty or nil map yields 0.
func (crc *chunkRowConsumption) max() uint64 {
	highest := uint64(0)
	for _, rows := range *crc {
		if rows <= highest {
			continue
		}
		highest = rows
	}
	return highest
}
// ChunkProposer proposes chunks based on available unchunked blocks.
//
// It reads pending L2 blocks through l2BlockOrm, groups consecutive blocks
// into a chunk subject to the configured per-chunk limits, and stores the
// resulting chunk through chunkOrm inside a database transaction.
type ChunkProposer struct {
// ctx is passed to every ORM call issued by the proposer.
ctx context.Context
// db is used to open the transaction that inserts a chunk and tags its blocks.
db *gorm.DB
// chunkOrm and l2BlockOrm are the ORM accessors created from db.
chunkOrm *orm.Chunk
l2BlockOrm *orm.L2Block
// Per-chunk limits checked by proposeChunk while accumulating blocks.
maxBlockNumPerChunk uint64
maxTxNumPerChunk uint64
maxL1CommitGasPerChunk uint64
maxL1CommitCalldataSizePerChunk uint64
maxRowConsumptionPerChunk uint64
// chunkTimeoutSec is added to the first block's timestamp; once that sum is
// older than the current time the pending blocks are proposed as a chunk.
chunkTimeoutSec uint64
// gasCostIncreaseMultiplier scales the estimated L1 commit gas before it is
// compared against maxL1CommitGasPerChunk.
gasCostIncreaseMultiplier float64
// Counters incremented by TryProposeChunk / updateChunkInfoInDB.
chunkProposerCircleTotal prometheus.Counter
proposeChunkFailureTotal prometheus.Counter
proposeChunkUpdateInfoTotal prometheus.Counter
proposeChunkUpdateInfoFailureTotal prometheus.Counter
// Gauges set by proposeChunk whenever it returns a chunk.
chunkTxNum prometheus.Gauge
chunkEstimateL1CommitGas prometheus.Gauge
totalL1CommitCalldataSize prometheus.Gauge
totalTxGasUsed prometheus.Gauge
maxTxConsumption prometheus.Gauge
chunkBlocksNum prometheus.Gauge
// Counters incremented by proposeChunk after all fetched blocks fit within
// the limits: the first when a chunk is returned, the second when nothing
// is proposed.
chunkFirstBlockTimeoutReached prometheus.Counter
chunkBlocksProposeNotEnoughTotal prometheus.Counter
}
// NewChunkProposer creates a new ChunkProposer instance.
//
// ctx is stored and passed to every ORM call, cfg supplies the per-chunk
// limits, db backs both ORM accessors and the insert transaction, and reg is
// the Prometheus registerer handed to promauto for all proposer metrics.
//
// NOTE(review): promauto panics when a collector cannot be registered (for
// example on duplicate names), so this constructor is presumably meant to be
// called once per registerer — confirm against callers.
func NewChunkProposer(ctx context.Context, cfg *config.ChunkProposerConfig, db *gorm.DB, reg prometheus.Registerer) *ChunkProposer {
	// Log every configured limit, including the block-count limit, so the
	// effective chunking configuration can be read from a single line.
	log.Debug("new chunk proposer",
		"maxBlockNumPerChunk", cfg.MaxBlockNumPerChunk,
		"maxTxNumPerChunk", cfg.MaxTxNumPerChunk,
		"maxL1CommitGasPerChunk", cfg.MaxL1CommitGasPerChunk,
		"maxL1CommitCalldataSizePerChunk", cfg.MaxL1CommitCalldataSizePerChunk,
		"maxRowConsumptionPerChunk", cfg.MaxRowConsumptionPerChunk,
		"chunkTimeoutSec", cfg.ChunkTimeoutSec,
		"gasCostIncreaseMultiplier", cfg.GasCostIncreaseMultiplier)

	// One factory bound to reg is enough for all metrics below.
	factory := promauto.With(reg)

	return &ChunkProposer{
		ctx:                             ctx,
		db:                              db,
		chunkOrm:                        orm.NewChunk(db),
		l2BlockOrm:                      orm.NewL2Block(db),
		maxBlockNumPerChunk:             cfg.MaxBlockNumPerChunk,
		maxTxNumPerChunk:                cfg.MaxTxNumPerChunk,
		maxL1CommitGasPerChunk:          cfg.MaxL1CommitGasPerChunk,
		maxL1CommitCalldataSizePerChunk: cfg.MaxL1CommitCalldataSizePerChunk,
		maxRowConsumptionPerChunk:       cfg.MaxRowConsumptionPerChunk,
		chunkTimeoutSec:                 cfg.ChunkTimeoutSec,
		gasCostIncreaseMultiplier:       cfg.GasCostIncreaseMultiplier,

		chunkProposerCircleTotal: factory.NewCounter(prometheus.CounterOpts{
			Name: "rollup_propose_chunk_circle_total",
			Help: "Total number of propose chunk total.",
		}),
		proposeChunkFailureTotal: factory.NewCounter(prometheus.CounterOpts{
			Name: "rollup_propose_chunk_failure_circle_total",
			Help: "Total number of propose chunk failure total.",
		}),
		proposeChunkUpdateInfoTotal: factory.NewCounter(prometheus.CounterOpts{
			Name: "rollup_propose_chunk_update_info_total",
			Help: "Total number of propose chunk update info total.",
		}),
		proposeChunkUpdateInfoFailureTotal: factory.NewCounter(prometheus.CounterOpts{
			Name: "rollup_propose_chunk_update_info_failure_total",
			Help: "Total number of propose chunk update info failure total.",
		}),
		chunkTxNum: factory.NewGauge(prometheus.GaugeOpts{
			Name: "rollup_propose_chunk_tx_num",
			Help: "The chunk tx num",
		}),
		chunkEstimateL1CommitGas: factory.NewGauge(prometheus.GaugeOpts{
			Name: "rollup_propose_chunk_estimate_l1_commit_gas",
			Help: "The chunk estimate l1 commit gas",
		}),
		totalL1CommitCalldataSize: factory.NewGauge(prometheus.GaugeOpts{
			Name: "rollup_propose_chunk_total_l1_commit_call_data_size",
			Help: "The total l1 commit call data size",
		}),
		totalTxGasUsed: factory.NewGauge(prometheus.GaugeOpts{
			Name: "rollup_propose_chunk_total_tx_gas_used",
			Help: "The total tx gas used",
		}),
		maxTxConsumption: factory.NewGauge(prometheus.GaugeOpts{
			Name: "rollup_propose_chunk_max_tx_consumption",
			Help: "The max tx consumption",
		}),
		chunkBlocksNum: factory.NewGauge(prometheus.GaugeOpts{
			Name: "rollup_propose_chunk_chunk_block_number",
			Help: "The number of blocks in the chunk",
		}),
		chunkFirstBlockTimeoutReached: factory.NewCounter(prometheus.CounterOpts{
			Name: "rollup_propose_chunk_first_block_timeout_reached_total",
			Help: "Total times of chunk's first block timeout reached",
		}),
		chunkBlocksProposeNotEnoughTotal: factory.NewCounter(prometheus.CounterOpts{
			Name: "rollup_propose_chunk_blocks_propose_not_enough_total",
			Help: "Total number of chunk block propose not enough",
		}),
	}
}
// TryProposeChunk runs one proposal round: it asks proposeChunk for a
// candidate chunk and, on success, hands it to updateChunkInfoInDB.
// Failures of either step are counted in the matching metric and logged;
// nothing is returned to the caller.
func (p *ChunkProposer) TryProposeChunk() {
	p.chunkProposerCircleTotal.Inc()

	candidate, proposeErr := p.proposeChunk()
	if proposeErr != nil {
		p.proposeChunkFailureTotal.Inc()
		log.Error("propose new chunk failed", "err", proposeErr)
		return
	}

	updateErr := p.updateChunkInfoInDB(candidate)
	if updateErr == nil {
		return
	}
	p.proposeChunkUpdateInfoFailureTotal.Inc()
	log.Error("update chunk info in orm failed", "err", updateErr)
}
// updateChunkInfoInDB stores chunk and tags the L2 blocks it covers with the
// stored chunk's hash, both inside a single database transaction.
//
// A nil chunk means there is nothing to propose; it is a no-op that returns
// nil without touching the update counter. Any error returned by either ORM
// call is returned from the transaction callback and then to the caller.
func (p *ChunkProposer) updateChunkInfoInDB(chunk *types.Chunk) error {
	if chunk == nil {
		return nil
	}

	p.proposeChunkUpdateInfoTotal.Inc()
	return p.db.Transaction(func(dbTX *gorm.DB) error {
		dbChunk, err := p.chunkOrm.InsertChunk(p.ctx, chunk, dbTX)
		if err != nil {
			log.Warn("ChunkProposer.InsertChunk failed", "chunk hash", chunk.Hash, "err", err)
			return err
		}
		if err := p.l2BlockOrm.UpdateChunkHashInRange(p.ctx, dbChunk.StartBlockNumber, dbChunk.EndBlockNumber, dbChunk.Hash, dbTX); err != nil {
			// Report the hash and block range that were actually passed to the
			// update, instead of placeholder zeros.
			log.Error("failed to update chunk_hash for l2_blocks",
				"chunk hash", dbChunk.Hash,
				"start block", dbChunk.StartBlockNumber,
				"end block", dbChunk.EndBlockNumber,
				"err", err)
			return err
		}
		return nil
	})
}
// proposeChunk assembles the next chunk from the blocks that are not yet part
// of any chunk.
//
// At most maxBlockNumPerChunk blocks are fetched starting at the unchunked
// block height and added one by one. As soon as adding a block would break a
// per-chunk limit (tx count, L1 commit calldata size, over-estimated L1 commit
// gas, or the maximum sub-circuit row consumption), the blocks gathered before
// it are returned as the chunk.
//
// When every fetched block fits, the chunk is returned only if its first block
// is older than chunkTimeoutSec or maxBlockNumPerChunk blocks were collected;
// otherwise (nil, nil) is returned, meaning there is nothing to propose yet.
// (nil, nil) is also returned when there are no pending blocks.
//
// An error is returned when a database read fails, when a block has no row
// consumption, or when the very first block alone breaks a limit.
func (p *ChunkProposer) proposeChunk() (*types.Chunk, error) {
	unchunkedBlockHeight, err := p.chunkOrm.GetUnchunkedBlockHeight(p.ctx)
	if err != nil {
		return nil, err
	}

	// select at most p.maxBlockNumPerChunk blocks
	blocks, err := p.l2BlockOrm.GetL2WrappedBlocksGEHeight(p.ctx, unchunkedBlockHeight, int(p.maxBlockNumPerChunk))
	if err != nil {
		return nil, err
	}

	if len(blocks) == 0 {
		return nil, nil
	}

	var chunk types.Chunk
	var totalTxGasUsed uint64
	var totalTxNum uint64
	var totalL1CommitCalldataSize uint64
	var totalL1CommitGas uint64
	crc := chunkRowConsumption{}

	for i, block := range blocks {
		// Totals of the chunk without the current block. They describe the
		// chunk that is returned if the current block is rejected below.
		lastTotalTxNum := totalTxNum
		lastTotalL1CommitGas := totalL1CommitGas
		lastCrcMax := crc.max()
		lastTotalL1CommitCalldataSize := totalL1CommitCalldataSize
		lastTotalTxGasUsed := totalTxGasUsed

		totalTxGasUsed += block.Header.GasUsed
		totalTxNum += uint64(len(block.Transactions))
		totalL1CommitCalldataSize += block.EstimateL1CommitCalldataSize()

		// Add the block tentatively before estimating, so that the commit gas
		// estimate covers the candidate block like the other totals do.
		// Estimating first would check the limit one block late and would
		// never see the first block at all.
		chunk.Blocks = append(chunk.Blocks, block)
		totalL1CommitGas = chunk.EstimateL1CommitGas()
		totalOverEstimateL1CommitGas := uint64(p.gasCostIncreaseMultiplier * float64(totalL1CommitGas))

		if err := crc.add(block.RowConsumption); err != nil {
			return nil, fmt.Errorf("chunk-proposer failed to update chunk row consumption: %w", err)
		}
		crcMax := crc.max()

		if totalTxNum > p.maxTxNumPerChunk ||
			totalL1CommitCalldataSize > p.maxL1CommitCalldataSizePerChunk ||
			totalOverEstimateL1CommitGas > p.maxL1CommitGasPerChunk ||
			crcMax > p.maxRowConsumptionPerChunk {
			// Check if the first block breaks hard limits.
			// If so, it indicates there are bugs in sequencer, manual fix is needed.
			if i == 0 {
				if totalTxNum > p.maxTxNumPerChunk {
					return nil, fmt.Errorf(
						"the first block exceeds l2 tx number limit; block number: %v, number of transactions: %v, max transaction number limit: %v",
						block.Header.Number,
						totalTxNum,
						p.maxTxNumPerChunk,
					)
				}

				if totalOverEstimateL1CommitGas > p.maxL1CommitGasPerChunk {
					return nil, fmt.Errorf(
						"the first block exceeds l1 commit gas limit; block number: %v, commit gas: %v, max commit gas limit: %v",
						block.Header.Number,
						totalL1CommitGas,
						p.maxL1CommitGasPerChunk,
					)
				}

				if totalL1CommitCalldataSize > p.maxL1CommitCalldataSizePerChunk {
					return nil, fmt.Errorf(
						"the first block exceeds l1 commit calldata size limit; block number: %v, calldata size: %v, max calldata size limit: %v",
						block.Header.Number,
						totalL1CommitCalldataSize,
						p.maxL1CommitCalldataSizePerChunk,
					)
				}

				if crcMax > p.maxRowConsumptionPerChunk {
					return nil, fmt.Errorf(
						"the first block exceeds row consumption limit; block number: %v, row consumption: %v, max: %v, limit: %v",
						block.Header.Number,
						crc,
						crcMax,
						p.maxRowConsumptionPerChunk,
					)
				}
			}

			// The candidate block does not fit: drop it again so the returned
			// chunk only holds the blocks accepted in earlier iterations.
			chunk.Blocks = chunk.Blocks[:len(chunk.Blocks)-1]

			log.Debug("breaking limit condition in chunking",
				"totalTxNum", totalTxNum,
				"maxTxNumPerChunk", p.maxTxNumPerChunk,
				"currentL1CommitCalldataSize", totalL1CommitCalldataSize,
				"maxL1CommitCalldataSizePerChunk", p.maxL1CommitCalldataSizePerChunk,
				"currentOverEstimateL1CommitGas", totalOverEstimateL1CommitGas,
				"maxL1CommitGasPerChunk", p.maxL1CommitGasPerChunk,
				"chunkRowConsumptionMax", crcMax,
				"chunkRowConsumption", crc,
				"p.maxRowConsumptionPerChunk", p.maxRowConsumptionPerChunk)

			p.chunkTxNum.Set(float64(lastTotalTxNum))
			p.chunkEstimateL1CommitGas.Set(float64(lastTotalL1CommitGas))
			p.totalL1CommitCalldataSize.Set(float64(lastTotalL1CommitCalldataSize))
			p.maxTxConsumption.Set(float64(lastCrcMax))
			p.totalTxGasUsed.Set(float64(lastTotalTxGasUsed))
			p.chunkBlocksNum.Set(float64(len(chunk.Blocks)))
			return &chunk, nil
		}
	}

	// All fetched blocks fit. Only propose them if the first block has waited
	// long enough or the chunk already holds the maximum number of blocks.
	currentTimeSec := uint64(time.Now().Unix())
	firstBlockTimedOut := chunk.Blocks[0].Header.Time+p.chunkTimeoutSec < currentTimeSec
	reachedMaxBlockNum := uint64(len(chunk.Blocks)) == p.maxBlockNumPerChunk

	if !firstBlockTimedOut && !reachedMaxBlockNum {
		log.Debug("pending blocks do not reach one of the constraints or contain a timeout block")
		p.chunkBlocksProposeNotEnoughTotal.Inc()
		return nil, nil
	}

	if firstBlockTimedOut {
		log.Warn("first block timeout",
			"block number", chunk.Blocks[0].Header.Number,
			"block timestamp", chunk.Blocks[0].Header.Time,
			"current time", currentTimeSec,
		)
		// Count only real timeouts; reaching the block-count limit is not one.
		p.chunkFirstBlockTimeoutReached.Inc()
	} else {
		log.Info("reached maximum number of blocks in chunk",
			"start block number", chunk.Blocks[0].Header.Number,
			"block count", len(chunk.Blocks),
		)
	}

	p.chunkTxNum.Set(float64(totalTxNum))
	p.chunkEstimateL1CommitGas.Set(float64(totalL1CommitGas))
	p.totalL1CommitCalldataSize.Set(float64(totalL1CommitCalldataSize))
	p.maxTxConsumption.Set(float64(crc.max()))
	p.totalTxGasUsed.Set(float64(totalTxGasUsed))
	p.chunkBlocksNum.Set(float64(len(chunk.Blocks)))
	return &chunk, nil
}