package relayer

import (
	"context"
	"errors"
	"fmt"
	"math"
	"math/big"
	"sort"
	"strings"
	"time"

	"github.com/go-resty/resty/v2"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/scroll-tech/da-codec/encoding"
	"gorm.io/gorm"

	"github.com/scroll-tech/go-ethereum/accounts/abi"
	"github.com/scroll-tech/go-ethereum/common"
	"github.com/scroll-tech/go-ethereum/crypto"
	"github.com/scroll-tech/go-ethereum/crypto/kzg4844"
	"github.com/scroll-tech/go-ethereum/ethclient"
	"github.com/scroll-tech/go-ethereum/log"
	"github.com/scroll-tech/go-ethereum/params"

	"scroll-tech/common/types"
	"scroll-tech/common/types/message"
	"scroll-tech/common/utils"

	bridgeAbi "scroll-tech/rollup/abi"
	"scroll-tech/rollup/internal/config"
	"scroll-tech/rollup/internal/controller/sender"
	"scroll-tech/rollup/internal/orm"
	rutils "scroll-tech/rollup/internal/utils"
)

// RelaxType enumerates the relaxation functions we support when
// turning a baseline fee into a “target” fee.
type RelaxType int

const (
	// NoRelaxation means “don’t touch the baseline” (i.e. fallback/default).
	// It is the zero value of RelaxType.
	NoRelaxation RelaxType = iota
	// Exponential selects the exponential relaxation function.
	Exponential
	// Sigmoid selects the sigmoid relaxation function.
	Sigmoid
)

// secondsPerBlock is a block interval expressed in seconds.
// NOTE(review): presumably the L1 slot time used to convert block counts into
// durations; it is not referenced in the visible part of the file — confirm.
const secondsPerBlock = 12

// BaselineType enumerates the baseline types we support when
// turning a baseline fee into a “target” fee.
type BaselineType int

const (
	// PctMin means “take the minimum of the last N blocks’ fees, then
	// take the PCT of that”. It is the zero value of BaselineType.
	PctMin BaselineType = iota
	// EWMA means “take the exponentially‐weighted moving average of
	// the last N blocks’ fees”.
	EWMA
)

// Layer2Relayer is responsible for:
// i. committing and finalizing L2 blocks on L1.
// ii. updating L2 gas price oracle contract on L1.
type Layer2Relayer struct { ctx context.Context l2Client *ethclient.Client db *gorm.DB bundleOrm *orm.Bundle batchOrm *orm.Batch chunkOrm *orm.Chunk l2BlockOrm *orm.L2Block l1BlockOrm *orm.L1Block cfg *config.RelayerConfig commitSender *sender.Sender finalizeSender *sender.Sender l1RollupABI *abi.ABI validiumABI *abi.ABI l2GasOracleABI *abi.ABI // Used to get batch status from chain_monitor api. chainMonitorClient *resty.Client metrics *l2RelayerMetrics chainCfg *params.ChainConfig lastFetchedBlock uint64 // highest block number ever pulled feeHistory []*big.Int // sliding window of blob fees batchStrategy StrategyParams } // StrategyParams holds the per‐window fee‐submission rules. type StrategyParams struct { BaselineType BaselineType // "pct_min" or "ewma" BaselineParam float64 // percentile (0–1) or α for EWMA Gamma float64 // relaxation γ Beta float64 // relaxation β RelaxType RelaxType // Exponential or Sigmoid } // bestParams maps your 2h/5h/12h windows to their best rules. // Timeouts are in seconds, 2, 5 and 12 hours (and same + 20 mins to account for // time to create batch currently roughly, as time is measured from block creation) var bestParams = map[uint64]StrategyParams{ 7200: {BaselineType: PctMin, BaselineParam: 0.10, Gamma: 0.4, Beta: 8, RelaxType: Exponential}, 8400: {BaselineType: PctMin, BaselineParam: 0.10, Gamma: 0.4, Beta: 8, RelaxType: Exponential}, 18000: {BaselineType: PctMin, BaselineParam: 0.30, Gamma: 0.6, Beta: 20, RelaxType: Sigmoid}, 19200: {BaselineType: PctMin, BaselineParam: 0.30, Gamma: 0.6, Beta: 20, RelaxType: Sigmoid}, 42800: {BaselineType: PctMin, BaselineParam: 0.50, Gamma: 0.5, Beta: 20, RelaxType: Sigmoid}, 44400: {BaselineType: PctMin, BaselineParam: 0.50, Gamma: 0.5, Beta: 20, RelaxType: Sigmoid}, } // NewLayer2Relayer will return a new instance of Layer2RelayerClient func NewLayer2Relayer(ctx context.Context, l2Client *ethclient.Client, db *gorm.DB, cfg *config.RelayerConfig, chainCfg *params.ChainConfig, serviceType 
ServiceType, reg prometheus.Registerer) (*Layer2Relayer, error) { var commitSender, finalizeSender *sender.Sender switch serviceType { case ServiceTypeL2RollupRelayer: commitSenderAddr, err := addrFromSignerConfig(cfg.CommitSenderSignerConfig) if err != nil { return nil, fmt.Errorf("failed to parse addr from commit sender config, err: %v", err) } finalizeSenderAddr, err := addrFromSignerConfig(cfg.FinalizeSenderSignerConfig) if err != nil { return nil, fmt.Errorf("failed to parse addr from finalize sender config, err: %v", err) } if commitSenderAddr == finalizeSenderAddr { return nil, fmt.Errorf("commit and finalize sender addresses must be different. Got: Commit=%s, Finalize=%s", commitSenderAddr.Hex(), finalizeSenderAddr.Hex()) } commitSender, err = sender.NewSender(ctx, cfg.SenderConfig, cfg.CommitSenderSignerConfig, "l2_relayer", "commit_sender", types.SenderTypeCommitBatch, db, reg) if err != nil { return nil, fmt.Errorf("new commit sender failed, err: %w", err) } finalizeSender, err = sender.NewSender(ctx, cfg.SenderConfig, cfg.FinalizeSenderSignerConfig, "l2_relayer", "finalize_sender", types.SenderTypeFinalizeBatch, db, reg) if err != nil { return nil, fmt.Errorf("new finalize sender failed, err: %w", err) } // Ensure test features aren't enabled on the ethereum mainnet. 
if commitSender.GetChainID().Cmp(big.NewInt(1)) == 0 && cfg.EnableTestEnvBypassFeatures { return nil, errors.New("cannot enable test env features in mainnet") } default: return nil, fmt.Errorf("invalid service type for l2_relayer: %v", serviceType) } strategy, ok := bestParams[uint64(cfg.BatchSubmission.TimeoutSec)] if !ok { return nil, fmt.Errorf("invalid timeout for batch submission: %v", cfg.BatchSubmission.TimeoutSec) } layer2Relayer := &Layer2Relayer{ ctx: ctx, db: db, bundleOrm: orm.NewBundle(db), batchOrm: orm.NewBatch(db), l1BlockOrm: orm.NewL1Block(db), l2BlockOrm: orm.NewL2Block(db), chunkOrm: orm.NewChunk(db), l2Client: l2Client, commitSender: commitSender, finalizeSender: finalizeSender, l1RollupABI: bridgeAbi.ScrollChainABI, validiumABI: bridgeAbi.ValidiumABI, l2GasOracleABI: bridgeAbi.L2GasPriceOracleABI, batchStrategy: strategy, cfg: cfg, chainCfg: chainCfg, } // chain_monitor client if serviceType == ServiceTypeL2RollupRelayer && cfg.ChainMonitor.Enabled { layer2Relayer.chainMonitorClient = resty.New() layer2Relayer.chainMonitorClient.SetRetryCount(cfg.ChainMonitor.TryTimes) layer2Relayer.chainMonitorClient.SetTimeout(time.Duration(cfg.ChainMonitor.TimeOut) * time.Second) } // Initialize genesis before we do anything else if err := layer2Relayer.initializeGenesis(); err != nil { return nil, fmt.Errorf("failed to initialize and commit genesis batch, err: %v", err) } layer2Relayer.metrics = initL2RelayerMetrics(reg) switch serviceType { case ServiceTypeL2RollupRelayer: go layer2Relayer.handleL2RollupRelayerConfirmLoop(ctx) default: return nil, fmt.Errorf("invalid service type for l2_relayer: %v", serviceType) } return layer2Relayer, nil } func (r *Layer2Relayer) initializeGenesis() error { if count, err := r.batchOrm.GetBatchCount(r.ctx); err != nil { return fmt.Errorf("failed to get batch count: %v", err) } else if count > 0 { log.Info("genesis already imported", "batch count", count) return nil } genesis, err := r.l2Client.HeaderByNumber(r.ctx, 
big.NewInt(0)) if err != nil { return fmt.Errorf("failed to retrieve L2 genesis header: %v", err) } log.Info("retrieved L2 genesis header", "hash", genesis.Hash().String()) chunk := &encoding.Chunk{Blocks: []*encoding.Block{{Header: genesis}}} err = r.db.Transaction(func(dbTX *gorm.DB) error { if err = r.l2BlockOrm.InsertL2Blocks(r.ctx, chunk.Blocks, dbTX); err != nil { return fmt.Errorf("failed to insert genesis block: %v", err) } var dbChunk *orm.Chunk dbChunk, err = r.chunkOrm.InsertChunk(r.ctx, chunk, encoding.CodecV0, rutils.ChunkMetrics{}, dbTX) if err != nil { return fmt.Errorf("failed to insert chunk: %v", err) } if err = r.chunkOrm.UpdateProvingStatus(r.ctx, dbChunk.Hash, types.ProvingTaskVerified, dbTX); err != nil { return fmt.Errorf("failed to update genesis chunk proving status: %v", err) } batch := &encoding.Batch{ Index: 0, TotalL1MessagePoppedBefore: 0, ParentBatchHash: common.Hash{}, Chunks: []*encoding.Chunk{chunk}, Blocks: chunk.Blocks, } var dbBatch *orm.Batch dbBatch, err = r.batchOrm.InsertBatch(r.ctx, batch, encoding.CodecV0, rutils.BatchMetrics{ValidiumMode: r.cfg.ValidiumMode}, dbTX) if err != nil { return fmt.Errorf("failed to insert batch: %v", err) } if err = r.chunkOrm.UpdateBatchHashInRange(r.ctx, 0, 0, dbBatch.Hash, dbTX); err != nil { return fmt.Errorf("failed to update batch hash for chunks: %v", err) } if err = r.batchOrm.UpdateProvingStatus(r.ctx, dbBatch.Hash, types.ProvingTaskVerified, dbTX); err != nil { return fmt.Errorf("failed to update genesis batch proving status: %v", err) } if err = r.batchOrm.UpdateRollupStatus(r.ctx, dbBatch.Hash, types.RollupFinalized, dbTX); err != nil { return fmt.Errorf("failed to update genesis batch rollup status: %v", err) } // commit genesis batch on L1 // note: we do this inside the DB transaction so that we can revert all DB changes if this step fails return r.commitGenesisBatch(dbBatch.Hash, dbBatch.BatchHeader, common.HexToHash(dbBatch.StateRoot)) }) if err != nil { return 
fmt.Errorf("update genesis transaction failed: %v", err) } log.Info("successfully imported genesis chunk and batch") return nil } func (r *Layer2Relayer) commitGenesisBatch(batchHash string, batchHeader []byte, stateRoot common.Hash) error { var calldata []byte var packErr error if r.cfg.ValidiumMode { // validium mode: only pass batchHeader calldata, packErr = r.validiumABI.Pack("importGenesisBatch", batchHeader) if packErr != nil { return fmt.Errorf("failed to pack validium importGenesisBatch with batch header: %v. error: %v", common.Bytes2Hex(batchHeader), packErr) } log.Info("Validium importGenesis", "calldata", common.Bytes2Hex(calldata)) } else { // rollup mode: pass batchHeader and stateRoot // Check state root is not zero if stateRoot == (common.Hash{}) { return fmt.Errorf("state root is zero") } calldata, packErr = r.l1RollupABI.Pack("importGenesisBatch", batchHeader, stateRoot) if packErr != nil { return fmt.Errorf("failed to pack rollup importGenesisBatch with batch header: %v and state root: %v. 
error: %v", common.Bytes2Hex(batchHeader), stateRoot, packErr) } log.Info("Rollup importGenesis", "calldata", common.Bytes2Hex(calldata), "stateRoot", stateRoot) } // submit genesis batch to L1 rollup contract txHash, _, err := r.commitSender.SendTransaction(batchHash, &r.cfg.RollupContractAddress, calldata, nil) if err != nil { return fmt.Errorf("failed to send import genesis batch tx to L1, error: %v", err) } log.Info("importGenesisBatch transaction sent", "contract", r.cfg.RollupContractAddress, "txHash", txHash, "batchHash", batchHash, "validium", r.cfg.ValidiumMode) // wait for confirmation // we assume that no other transactions are sent before initializeGenesis completes ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() for { select { // print progress case <-ticker.C: log.Info("Waiting for confirmation") // timeout case <-time.After(5 * time.Minute): return fmt.Errorf("import genesis timeout after 5 minutes, original txHash: %v", txHash.String()) // handle confirmation case confirmation := <-r.commitSender.ConfirmChan(): if confirmation.ContextID != batchHash { return fmt.Errorf("unexpected import genesis confirmation id, expected: %v, got: %v", batchHash, confirmation.ContextID) } if !confirmation.IsSuccessful { return errors.New("import genesis batch tx failed") } log.Info("Successfully committed genesis batch on L1", "txHash", confirmation.TxHash.String(), "validium", r.cfg.ValidiumMode) return nil } } } // ProcessPendingBatches processes the pending batches by sending commitBatch transactions to layer 1. 
// Pending batches are submitted if one of the following conditions is met: // - the first batch is too old -> forceSubmit // - backlogCount > r.cfg.BatchSubmission.BacklogMax -> forceSubmit // - we have at least minBatches AND price hits a desired target price func (r *Layer2Relayer) ProcessPendingBatches() { // First, get the backlog count to determine batch submission strategy backlogCount, err := r.batchOrm.GetFailedAndPendingBatchesCount(r.ctx) if err != nil { log.Error("Failed to fetch pending L2 batches count", "err", err) return } r.metrics.rollupL2RelayerBacklogCounts.Set(float64(backlogCount)) // Get effective batch limits based on validium mode and backlog size. minBatches, maxBatches := r.getEffectiveBatchLimits(backlogCount) // get pending batches from database in ascending order by their index. dbBatches, err := r.batchOrm.GetFailedAndPendingBatches(r.ctx, maxBatches) if err != nil { log.Error("Failed to fetch pending L2 batches", "err", err) return } // nothing to do if we don't have any pending batches if len(dbBatches) == 0 { return } var forceSubmit bool startChunk, err := r.chunkOrm.GetChunkByIndex(r.ctx, dbBatches[0].StartChunkIndex) if err != nil { log.Error("failed to get first chunk", "err", err, "batch index", dbBatches[0].Index, "chunk index", dbBatches[0].StartChunkIndex) return } oldestBlockTimestamp := time.Unix(int64(startChunk.StartBlockTime), 0) // if the batch with the oldest index is too old, we force submit all batches that we have so far in the next step if r.cfg.BatchSubmission.TimeoutSec > 0 && time.Since(oldestBlockTimestamp) > time.Duration(r.cfg.BatchSubmission.TimeoutSec)*time.Second { forceSubmit = true } // force submit if backlog is too big if backlogCount > r.cfg.BatchSubmission.BacklogMax { forceSubmit = true } if !forceSubmit { // check if we should skip submitting the batch based on the fee target skip, err := r.skipSubmitByFee(oldestBlockTimestamp, r.metrics) // return if not hitting target price if skip { 
log.Debug("Skipping batch submission", "first batch index", dbBatches[0].Index, "backlog count", backlogCount, "reason", err) return } if err != nil { log.Warn("Failed to check if we should skip batch submission, fallback to immediate submission", "err", err) } } var batchesToSubmit []*dbBatchWithChunks for i, dbBatch := range dbBatches { var dbChunks []*orm.Chunk var dbParentBatch *orm.Batch // Verify batches compatibility dbChunks, err = r.chunkOrm.GetChunksInRange(r.ctx, dbBatch.StartChunkIndex, dbBatch.EndChunkIndex) if err != nil { log.Error("failed to get chunks in range", "err", err) return } // check codec version for _, dbChunk := range dbChunks { if dbBatch.CodecVersion != dbChunk.CodecVersion { log.Error("batch codec version is different from chunk codec version", "batch index", dbBatch.Index, "chunk index", dbChunk.Index, "batch codec version", dbBatch.CodecVersion, "chunk codec version", dbChunk.CodecVersion) return } } if dbBatch.Index == 0 { log.Error("invalid args: batch index is 0, should only happen in committing genesis batch") return } // get parent batch if i == 0 { dbParentBatch, err = r.batchOrm.GetBatchByIndex(r.ctx, dbBatch.Index-1) if err != nil { log.Error("failed to get parent batch header", "err", err) return } } else { dbParentBatch = dbBatches[i-1] } // make sure batch index is continuous if dbParentBatch.Index != dbBatch.Index-1 { log.Error("parent batch index is not equal to current batch index - 1", "index", dbBatch.Index, "parent index", dbParentBatch.Index) return } if dbParentBatch.CodecVersion > dbBatch.CodecVersion { log.Error("parent batch codec version is greater than current batch codec version", "index", dbBatch.Index, "hash", dbBatch.Hash, "parent codec version", dbParentBatch.CodecVersion, "current codec version", dbBatch.CodecVersion) return } // make sure we commit batches of the same codec version together. 
// If we encounter a batch with a different codec version, we stop here and will commit the batches we have so far. // The next call of ProcessPendingBatches will then start with the batch with the different codec version. batchesToSubmitLen := len(batchesToSubmit) if batchesToSubmitLen > 0 && batchesToSubmit[batchesToSubmitLen-1].Batch.CodecVersion != dbBatch.CodecVersion { forceSubmit = true break } if batchesToSubmitLen < maxBatches { batchesToSubmit = append(batchesToSubmit, &dbBatchWithChunks{ Batch: dbBatch, Chunks: dbChunks, }) } if len(batchesToSubmit) >= maxBatches { break } } // we only submit batches if we have a timeout or if we have enough batches to submit if !forceSubmit && len(batchesToSubmit) < minBatches { log.Debug("Not enough batches to submit", "count", len(batchesToSubmit), "minBatches", minBatches, "maxBatches", maxBatches) return } if forceSubmit { log.Info("Forcing submission of batches due to timeout", "batch index", batchesToSubmit[0].Batch.Index, "first block created at", oldestBlockTimestamp) } // We have at least 1 batch to commit firstBatch := batchesToSubmit[0].Batch lastBatch := batchesToSubmit[len(batchesToSubmit)-1].Batch var calldata []byte var blobs []*kzg4844.Blob var maxBlockHeight uint64 var totalGasUsed uint64 codecVersion := encoding.CodecVersion(firstBatch.CodecVersion) switch codecVersion { case encoding.CodecV7, encoding.CodecV8, encoding.CodecV9, encoding.CodecV10: if r.cfg.ValidiumMode { if len(batchesToSubmit) != 1 { log.Error("validium mode only supports committing one batch at a time", "codecVersion", codecVersion, "start index", firstBatch.Index, "end index", lastBatch.Index, "batches count", len(batchesToSubmit)) return } calldata, maxBlockHeight, totalGasUsed, err = r.constructCommitBatchPayloadValidium(batchesToSubmit[0]) if err != nil { log.Error("failed to construct validium payload", "codecVersion", codecVersion, "index", batchesToSubmit[0].Batch.Index, "err", err) return } } else { calldata, blobs, 
maxBlockHeight, totalGasUsed, err = r.constructCommitBatchPayloadCodecV7(batchesToSubmit, firstBatch, lastBatch) if err != nil { log.Error("failed to construct normal payload", "codecVersion", codecVersion, "start index", firstBatch.Index, "end index", lastBatch.Index, "err", err) return } if err = r.sanityChecksCommitBatchCodecV7CalldataAndBlobs(calldata, blobs); err != nil { log.Error("Sanity check failed for calldata and blobs", "codecVersion", codecVersion, "start index", firstBatch.Index, "end index", lastBatch.Index, "err", err) return } } default: log.Error("unsupported codec version in ProcessPendingBatches", "codecVersion", codecVersion, "start index", firstBatch, "end index", lastBatch.Index) return } txHash, blobBaseFee, err := r.commitSender.SendTransaction(r.contextIDFromBatches(codecVersion, batchesToSubmit), &r.cfg.RollupContractAddress, calldata, blobs) if err != nil { if errors.Is(err, sender.ErrTooManyPendingBlobTxs) { r.metrics.rollupL2RelayerProcessPendingBatchErrTooManyPendingBlobTxsTotal.Inc() log.Debug( "Skipped sending commitBatch tx to L1: too many pending blob txs", "maxPending", r.cfg.SenderConfig.MaxPendingBlobTxs, "err", err, ) return } log.Error( "Failed to send commitBatch tx to layer1", "start index", firstBatch.Index, "start hash", firstBatch.Hash, "end index", lastBatch.Index, "end hash", lastBatch.Hash, "RollupContractAddress", r.cfg.RollupContractAddress, "err", err, "calldata", common.Bytes2Hex(calldata), ) return } if err = r.db.Transaction(func(dbTX *gorm.DB) error { for _, dbBatch := range batchesToSubmit { if err = r.batchOrm.UpdateCommitTxHashAndRollupStatus(r.ctx, dbBatch.Batch.Hash, txHash.String(), types.RollupCommitting, dbTX); err != nil { return fmt.Errorf("UpdateCommitTxHashAndRollupStatus failed for batch %d: %s, err %v", dbBatch.Batch.Index, dbBatch.Batch.Hash, err) } } return nil }); err != nil { log.Error("failed to update status for batches to RollupCommitting", "err", err) } 
r.metrics.rollupL2RelayerCommitBlockHeight.Set(float64(maxBlockHeight)) r.metrics.rollupL2RelayerCommitThroughput.Add(float64(totalGasUsed)) r.metrics.rollupL2RelayerProcessPendingBatchSuccessTotal.Add(float64(len(batchesToSubmit))) r.metrics.rollupL2RelayerProcessBatchesPerTxCount.Set(float64(len(batchesToSubmit))) r.metrics.rollupL2RelayerCommitLatency.Set(time.Since(oldestBlockTimestamp).Seconds()) r.metrics.rollupL2RelayerCommitPrice.Set(float64(blobBaseFee)) log.Info("Sent the commitBatches tx to layer1", "batches count", len(batchesToSubmit), "start index", firstBatch.Index, "start hash", firstBatch.Hash, "end index", lastBatch.Index, "end hash", lastBatch.Hash, "tx hash", txHash.String()) } // getEffectiveBatchLimits returns the effective min and max batch limits based on whether validium mode is enabled // and the current backlog size. // When backlogCount >= backlog_max: submit min_batches for fast inclusion at slightly higher price. // When backlogCount < backlog_max: submit max_batches for better cost amortization. 
func (r *Layer2Relayer) getEffectiveBatchLimits(backlogCount int64) (int, int) { if r.cfg.ValidiumMode { return 1, 1 // minBatches=1, maxBatches=1 } // If backlog is at or above max, prioritize fast inclusion by submitting min_batches if backlogCount >= r.cfg.BatchSubmission.BacklogMax { return r.cfg.BatchSubmission.MinBatches, r.cfg.BatchSubmission.MinBatches } // Otherwise, prioritize cost efficiency by trying to submit max_batches return r.cfg.BatchSubmission.MaxBatches, r.cfg.BatchSubmission.MaxBatches } func (r *Layer2Relayer) contextIDFromBatches(codecVersion encoding.CodecVersion, batches []*dbBatchWithChunks) string { contextIDs := []string{fmt.Sprintf("v%d", codecVersion)} for _, batch := range batches { contextIDs = append(contextIDs, batch.Batch.Hash) } return strings.Join(contextIDs, "-") } func (r *Layer2Relayer) batchHashesFromContextID(contextID string) []string { parts := strings.SplitN(contextID, "-", 2) if len(parts) == 2 && strings.HasPrefix(parts[0], "v") { return strings.Split(parts[1], "-") } return []string{contextID} } type dbBatchWithChunks struct { Batch *orm.Batch Chunks []*orm.Chunk } // ProcessPendingBundles submits proof to layer 1 rollup contract func (r *Layer2Relayer) ProcessPendingBundles() { r.metrics.rollupL2RelayerProcessPendingBundlesTotal.Inc() bundle, err := r.bundleOrm.GetFirstPendingBundle(r.ctx) if bundle == nil && err == nil { return } if err != nil { log.Error("Failed to fetch first pending L2 bundle", "err", err) return } status := types.ProvingStatus(bundle.ProvingStatus) switch status { case types.ProvingTaskUnassigned, types.ProvingTaskAssigned: if r.cfg.EnableTestEnvBypassFeatures && utils.NowUTC().Sub(bundle.CreatedAt) > time.Duration(r.cfg.FinalizeBundleWithoutProofTimeoutSec)*time.Second { // check if last batch is finalized, because in fake finalize bundle mode, the contract does not verify if the previous bundle or batch is finalized. 
if bundle.StartBatchIndex == 0 { log.Error("invalid args: start batch index of bundle is 0", "bundle index", bundle.Index, "start batch index", bundle.StartBatchIndex, "end batch index", bundle.EndBatchIndex) return } lastBatch, err := r.batchOrm.GetBatchByIndex(r.ctx, bundle.StartBatchIndex-1) if err != nil { log.Error("failed to get last batch", "batch index", bundle.StartBatchIndex-1, "err", err) return } if types.RollupStatus(lastBatch.RollupStatus) != types.RollupFinalized { log.Error("previous bundle or batch is not finalized", "batch index", lastBatch.Index, "batch hash", lastBatch.Hash, "rollup status", types.RollupStatus(lastBatch.RollupStatus)) return } if err := r.finalizeBundle(bundle, false); err != nil { log.Error("failed to finalize timeout bundle without proof", "bundle index", bundle.Index, "start batch index", bundle.StartBatchIndex, "end batch index", bundle.EndBatchIndex, "err", err) return } } case types.ProvingTaskVerified: log.Info("Start to roll up zk proof", "index", bundle.Index, "bundle hash", bundle.Hash) r.metrics.rollupL2RelayerProcessPendingBundlesFinalizedTotal.Inc() if err := r.finalizeBundle(bundle, true); err != nil { log.Error("failed to finalize bundle with proof", "bundle index", bundle.Index, "start batch index", bundle.StartBatchIndex, "end batch index", bundle.EndBatchIndex, "err", err) return } case types.ProvingTaskFailed: // We were unable to prove this bundle. There are two possibilities: // (a) Prover bug. In this case, we should fix and redeploy the prover. // In the meantime, we continue to commit batches to L1 as well as // proposing and proving chunks, batches and bundles. // (b) Unprovable bundle, e.g. proof overflow. In this case we need to // stop the ledger, fix the limit, revert all the violating blocks, // chunks, batches, bundles and all subsequent ones, and resume, // i.e. this case requires manual resolution. 
log.Error("bundle proving failed", "bundle index", bundle.Index, "bundle hash", bundle.Hash, "proved at", bundle.ProvedAt, "proof time sec", bundle.ProofTimeSec) default: log.Error("encounter unreachable case in ProcessPendingBundles", "proving status", status) } } func (r *Layer2Relayer) finalizeBundle(bundle *orm.Bundle, withProof bool) error { // Check if current bundle codec version is not less than the preceding one if bundle.StartBatchIndex > 0 { prevBatch, err := r.batchOrm.GetBatchByIndex(r.ctx, bundle.StartBatchIndex-1) if err != nil { log.Error("failed to get previous batch", "current bundle index", bundle.Index, "start batch index", bundle.StartBatchIndex, "error", err) return err } if bundle.CodecVersion < prevBatch.CodecVersion { log.Error("current bundle codec version is less than the preceding batch", "current bundle index", bundle.Index, "current codec version", bundle.CodecVersion, "prev batch index", prevBatch.Index, "prev codec version", prevBatch.CodecVersion) return errors.New("current bundle codec version cannot be less than the preceding batch") } } // Check batch status before sending `finalizeBundle` tx. 
for batchIndex := bundle.StartBatchIndex; batchIndex <= bundle.EndBatchIndex; batchIndex++ { tmpBatch, getErr := r.batchOrm.GetBatchByIndex(r.ctx, batchIndex) if getErr != nil { log.Error("failed to get batch by index", "batch index", batchIndex, "error", getErr) return getErr } // check codec version if tmpBatch.CodecVersion != bundle.CodecVersion { log.Error("bundle codec version is different from batch codec version", "bundle index", bundle.Index, "batch index", tmpBatch.Index, "bundle codec version", bundle.CodecVersion, "batch codec version", tmpBatch.CodecVersion) return errors.New("bundle codec version is different from batch codec version") } if r.cfg.ChainMonitor.Enabled { batchStatus, getErr := r.getBatchStatusByIndex(tmpBatch) if getErr != nil { r.metrics.rollupL2ChainMonitorLatestFailedCall.Inc() log.Error("failed to get batch status, please check chain_monitor api server", "batch_index", tmpBatch.Index, "err", getErr) return getErr } if !batchStatus { r.metrics.rollupL2ChainMonitorLatestFailedBatchStatus.Inc() log.Error("the batch status is false, stop finalize batch and check the reason", "batch_index", tmpBatch.Index) return errors.New("the batch status is false") } } } dbBatch, err := r.batchOrm.GetBatchByIndex(r.ctx, bundle.EndBatchIndex) if err != nil { log.Error("failed to get batch by index", "batch index", bundle.EndBatchIndex, "error", err) return err } firstChunk, err := r.chunkOrm.GetChunkByIndex(r.ctx, dbBatch.StartChunkIndex) if err != nil || firstChunk == nil { log.Error("failed to get first chunk of batch", "chunk index", dbBatch.StartChunkIndex, "error", err) return fmt.Errorf("failed to get first chunk of batch: %w", err) } endChunk, err := r.chunkOrm.GetChunkByIndex(r.ctx, dbBatch.EndChunkIndex) if err != nil || endChunk == nil { log.Error("failed to get end chunk of batch", "chunk index", dbBatch.EndChunkIndex, "error", err) return fmt.Errorf("failed to get end chunk of batch: %w", err) } var aggProof *message.OpenVMBundleProof if 
withProof { aggProof, err = r.bundleOrm.GetVerifiedProofByHash(r.ctx, bundle.Hash) if err != nil { return fmt.Errorf("failed to get verified proof by bundle index: %d, err: %w", bundle.Index, err) } if err = aggProof.SanityCheck(); err != nil { return fmt.Errorf("failed to check agg_proof sanity, index: %d, err: %w", bundle.Index, err) } } var calldata []byte switch encoding.CodecVersion(bundle.CodecVersion) { case encoding.CodecV7, encoding.CodecV8, encoding.CodecV9, encoding.CodecV10: if r.cfg.ValidiumMode { calldata, err = r.constructFinalizeBundlePayloadValidium(dbBatch, endChunk, aggProof) if err != nil { return fmt.Errorf("failed to construct validium finalizeBundle payload, codec version: %v, bundle index: %v, last batch index: %v, err: %w", dbBatch.CodecVersion, bundle.Index, dbBatch.Index, err) } } else { calldata, err = r.constructFinalizeBundlePayloadCodecV7(dbBatch, endChunk, aggProof) if err != nil { return fmt.Errorf("failed to construct normal finalizeBundle payload, codec version: %v, bundle index: %v, last batch index: %v, err: %w", dbBatch.CodecVersion, bundle.Index, dbBatch.Index, err) } } default: return fmt.Errorf("unsupported codec version in finalizeBundle, bundle index: %v, version: %d", bundle.Index, bundle.CodecVersion) } txHash, _, err := r.finalizeSender.SendTransaction("finalizeBundle-"+bundle.Hash, &r.cfg.RollupContractAddress, calldata, nil) if err != nil { log.Error("finalizeBundle in layer1 failed", "with proof", withProof, "index", bundle.Index, "start batch index", bundle.StartBatchIndex, "end batch index", bundle.EndBatchIndex, "RollupContractAddress", r.cfg.RollupContractAddress, "err", err, "calldata", common.Bytes2Hex(calldata)) return err } log.Info("finalizeBundle in layer1", "with proof", withProof, "index", bundle.Index, "start batch index", bundle.StartBatchIndex, "end batch index", bundle.EndBatchIndex, "tx hash", txHash.String()) // Updating rollup status in database. 
err = r.db.Transaction(func(dbTX *gorm.DB) error { if err = r.batchOrm.UpdateFinalizeTxHashAndRollupStatusByBundleHash(r.ctx, bundle.Hash, txHash.String(), types.RollupFinalizing, dbTX); err != nil { log.Warn("UpdateFinalizeTxHashAndRollupStatusByBundleHash failed", "index", bundle.Index, "bundle hash", bundle.Hash, "tx hash", txHash.String(), "err", err) return err } if err = r.bundleOrm.UpdateFinalizeTxHashAndRollupStatus(r.ctx, bundle.Hash, txHash.String(), types.RollupFinalizing, dbTX); err != nil { log.Warn("UpdateFinalizeTxHashAndRollupStatus failed", "index", bundle.Index, "bundle hash", bundle.Hash, "tx hash", txHash.String(), "err", err) return err } return nil }) if err != nil { log.Warn("failed to update rollup status of bundle and batches", "err", err) return err } // Updating the proving status when finalizing without proof, thus the coordinator could omit this task. // it isn't a necessary step, so don't put in a transaction with UpdateFinalizeTxHashAndRollupStatus if !withProof { txErr := r.db.Transaction(func(dbTX *gorm.DB) error { if updateErr := r.bundleOrm.UpdateProvingStatus(r.ctx, bundle.Hash, types.ProvingTaskVerified, dbTX); updateErr != nil { return updateErr } if updateErr := r.batchOrm.UpdateProvingStatusByBundleHash(r.ctx, bundle.Hash, types.ProvingTaskVerified, dbTX); updateErr != nil { return updateErr } for batchIndex := bundle.StartBatchIndex; batchIndex <= bundle.EndBatchIndex; batchIndex++ { tmpBatch, getErr := r.batchOrm.GetBatchByIndex(r.ctx, batchIndex) if getErr != nil { return getErr } if updateErr := r.chunkOrm.UpdateProvingStatusByBatchHash(r.ctx, tmpBatch.Hash, types.ProvingTaskVerified, dbTX); updateErr != nil { return updateErr } } return nil }) if txErr != nil { log.Error("Updating chunk and batch proving status when finalizing without proof failure", "bundleHash", bundle.Hash, "err", txErr) } } r.metrics.rollupL2RelayerProcessPendingBundlesFinalizedSuccessTotal.Inc() return nil } // batchStatusResponse the response 
schema type batchStatusResponse struct { ErrCode int `json:"errcode"` ErrMsg string `json:"errmsg"` Data bool `json:"data"` } func (r *Layer2Relayer) getBatchStatusByIndex(batch *orm.Batch) (bool, error) { chunks, getChunkErr := r.chunkOrm.GetChunksInRange(r.ctx, batch.StartChunkIndex, batch.EndChunkIndex) if getChunkErr != nil { log.Error("Layer2Relayer.getBatchStatusByIndex get chunks range failed", "startChunkIndex", batch.StartChunkIndex, "endChunkIndex", batch.EndChunkIndex, "err", getChunkErr) return false, getChunkErr } if len(chunks) == 0 { log.Error("Layer2Relayer.getBatchStatusByIndex get empty chunks", "startChunkIndex", batch.StartChunkIndex, "endChunkIndex", batch.EndChunkIndex) return false, fmt.Errorf("startChunksIndex:%d endChunkIndex:%d get empty chunks", batch.StartChunkIndex, batch.EndChunkIndex) } sort.Slice(chunks, func(i, j int) bool { return chunks[i].StartBlockNumber < chunks[j].StartBlockNumber }) startBlockNum := chunks[0].StartBlockNumber endBlockNum := chunks[len(chunks)-1].EndBlockNumber var response batchStatusResponse resp, err := r.chainMonitorClient.R(). SetQueryParams(map[string]string{ "batch_index": fmt.Sprintf("%d", batch.Index), "start_block_number": fmt.Sprintf("%d", startBlockNum), "end_block_number": fmt.Sprintf("%d", endBlockNum), }). SetResult(&response). 
Get(fmt.Sprintf("%s/v1/batch_status", r.cfg.ChainMonitor.BaseURL)) if err != nil { return false, err } if resp.IsError() { return false, resp.Error().(error) } if response.ErrCode != 0 { return false, fmt.Errorf("failed to get batch status, errCode: %d, errMsg: %s", response.ErrCode, response.ErrMsg) } return response.Data, nil } func (r *Layer2Relayer) handleConfirmation(cfm *sender.Confirmation) { switch cfm.SenderType { case types.SenderTypeCommitBatch: var status types.RollupStatus if cfm.IsSuccessful { status = types.RollupCommitted r.metrics.rollupL2BatchesCommittedConfirmedTotal.Inc() } else { status = types.RollupCommitFailed r.metrics.rollupL2BatchesCommittedConfirmedFailedTotal.Inc() log.Warn("CommitBatchTxType transaction confirmed but failed in layer1", "confirmation", cfm) } batchHashes := r.batchHashesFromContextID(cfm.ContextID) if err := r.db.Transaction(func(dbTX *gorm.DB) error { for _, batchHash := range batchHashes { if err := r.batchOrm.UpdateCommitTxHashAndRollupStatus(r.ctx, batchHash, cfm.TxHash.String(), status, dbTX); err != nil { return fmt.Errorf("UpdateCommitTxHashAndRollupStatus failed, batchHash: %s, txHash: %s, status: %s, err: %w", batchHash, cfm.TxHash.String(), status, err) } } return nil }); err != nil { log.Warn("failed to update confirmation status for batches", "confirmation", cfm, "err", err) } case types.SenderTypeFinalizeBatch: if strings.HasPrefix(cfm.ContextID, "finalizeBundle-") { bundleHash := strings.TrimPrefix(cfm.ContextID, "finalizeBundle-") var status types.RollupStatus if cfm.IsSuccessful { status = types.RollupFinalized r.metrics.rollupL2BundlesFinalizedConfirmedTotal.Inc() } else { status = types.RollupFinalizeFailed r.metrics.rollupL2BundlesFinalizedConfirmedFailedTotal.Inc() log.Warn("FinalizeBundleTxType transaction confirmed but failed in layer1", "confirmation", cfm) } err := r.db.Transaction(func(dbTX *gorm.DB) error { if err := r.batchOrm.UpdateFinalizeTxHashAndRollupStatusByBundleHash(r.ctx, bundleHash, 
cfm.TxHash.String(), status, dbTX); err != nil { log.Warn("UpdateFinalizeTxHashAndRollupStatusByBundleHash failed", "confirmation", cfm, "err", err) return err } if err := r.bundleOrm.UpdateFinalizeTxHashAndRollupStatus(r.ctx, bundleHash, cfm.TxHash.String(), status, dbTX); err != nil { log.Warn("UpdateFinalizeTxHashAndRollupStatus failed", "confirmation", cfm, "err", err) return err } return nil }) if err != nil { log.Warn("failed to update rollup status of bundle and batches", "err", err) } return } var status types.RollupStatus if cfm.IsSuccessful { status = types.RollupFinalized r.metrics.rollupL2BatchesFinalizedConfirmedTotal.Inc() } else { status = types.RollupFinalizeFailed r.metrics.rollupL2BatchesFinalizedConfirmedFailedTotal.Inc() log.Warn("FinalizeBatchTxType transaction confirmed but failed in layer1", "confirmation", cfm) } err := r.batchOrm.UpdateFinalizeTxHashAndRollupStatus(r.ctx, cfm.ContextID, cfm.TxHash.String(), status) if err != nil { log.Warn("UpdateFinalizeTxHashAndRollupStatus failed", "confirmation", cfm, "err", err) } default: log.Warn("Unknown transaction type", "confirmation", cfm) } log.Info("Transaction confirmed in layer1", "confirmation", cfm) } func (r *Layer2Relayer) handleL2RollupRelayerConfirmLoop(ctx context.Context) { for { select { case <-ctx.Done(): return case cfm := <-r.commitSender.ConfirmChan(): r.handleConfirmation(cfm) case cfm := <-r.finalizeSender.ConfirmChan(): r.handleConfirmation(cfm) } } } func (r *Layer2Relayer) constructCommitBatchPayloadCodecV7(batchesToSubmit []*dbBatchWithChunks, firstBatch, lastBatch *orm.Batch) ([]byte, []*kzg4844.Blob, uint64, uint64, error) { var maxBlockHeight uint64 var totalGasUsed uint64 blobs := make([]*kzg4844.Blob, 0, len(batchesToSubmit)) version := encoding.CodecVersion(batchesToSubmit[0].Batch.CodecVersion) // construct blobs for _, b := range batchesToSubmit { // double check that all batches have the same version batchVersion := encoding.CodecVersion(b.Batch.CodecVersion) if 
batchVersion != version { return nil, nil, 0, 0, fmt.Errorf("codec version mismatch, expected: %d, got: %d for batches %d and %d", version, batchVersion, batchesToSubmit[0].Batch.Index, b.Batch.Index) } var batchBlocks []*encoding.Block for _, c := range b.Chunks { blocks, err := r.l2BlockOrm.GetL2BlocksInRange(r.ctx, c.StartBlockNumber, c.EndBlockNumber) if err != nil { return nil, nil, 0, 0, fmt.Errorf("failed to get blocks in range for batch %d: %w", b.Batch.Index, err) } batchBlocks = append(batchBlocks, blocks...) if c.EndBlockNumber > maxBlockHeight { maxBlockHeight = c.EndBlockNumber } totalGasUsed += c.TotalL2TxGas } encodingBatch := &encoding.Batch{ Index: b.Batch.Index, ParentBatchHash: common.HexToHash(b.Batch.ParentBatchHash), PrevL1MessageQueueHash: common.HexToHash(b.Batch.PrevL1MessageQueueHash), PostL1MessageQueueHash: common.HexToHash(b.Batch.PostL1MessageQueueHash), Blocks: batchBlocks, } codec, err := encoding.CodecFromVersion(version) if err != nil { return nil, nil, 0, 0, fmt.Errorf("failed to get codec from version %d, err: %w", b.Batch.CodecVersion, err) } daBatch, err := codec.NewDABatch(encodingBatch) if err != nil { return nil, nil, 0, 0, fmt.Errorf("failed to create DA batch: %w", err) } blobs = append(blobs, daBatch.Blob()) } calldata, err := r.l1RollupABI.Pack("commitBatches", version, common.HexToHash(firstBatch.ParentBatchHash), common.HexToHash(lastBatch.Hash)) if err != nil { return nil, nil, 0, 0, fmt.Errorf("failed to pack commitBatches: %w", err) } return calldata, blobs, maxBlockHeight, totalGasUsed, nil } func (r *Layer2Relayer) constructCommitBatchPayloadValidium(batch *dbBatchWithChunks) ([]byte, uint64, uint64, error) { // Check state root is not zero stateRoot := common.HexToHash(batch.Batch.StateRoot) if stateRoot == (common.Hash{}) { return nil, 0, 0, fmt.Errorf("batch %d state root is zero", batch.Batch.Index) } // Check parent batch hash is not zero parentBatchHash := common.HexToHash(batch.Batch.ParentBatchHash) if 
parentBatchHash == (common.Hash{}) { return nil, 0, 0, fmt.Errorf("batch %d parent batch hash is zero", batch.Batch.Index) } // Calculate metrics var maxBlockHeight uint64 var totalGasUsed uint64 for _, c := range batch.Chunks { if c.EndBlockNumber > maxBlockHeight { maxBlockHeight = c.EndBlockNumber } totalGasUsed += c.TotalL2TxGas } // Get the commitment from the batch data: for validium mode, we use the last L2 block hash as the commitment to the off-chain data // Get the last chunk from the batch to find the end block hash // TODO: This is a temporary solution, we might use a larger commitment in the future if len(batch.Chunks) == 0 { return nil, 0, 0, fmt.Errorf("last batch has no chunks") } lastChunk := batch.Chunks[len(batch.Chunks)-1] commitment := common.HexToHash(lastChunk.EndBlockHash) var version uint8 if encoding.CodecVersion(batch.Batch.CodecVersion) == encoding.CodecV8 || encoding.CodecVersion(batch.Batch.CodecVersion) == encoding.CodecV9 || encoding.CodecVersion(batch.Batch.CodecVersion) == encoding.CodecV10 { // Validium version line starts with v1, // but rollup-relayer behavior follows v8. 
version = 1 } else { return nil, 0, 0, fmt.Errorf("unexpected codec version %d for validium mode", batch.Batch.CodecVersion) } calldata, err := r.validiumABI.Pack("commitBatch", version, common.HexToHash(batch.Batch.ParentBatchHash), common.HexToHash(batch.Batch.StateRoot), common.HexToHash(batch.Batch.WithdrawRoot), commitment[:]) if err != nil { return nil, 0, 0, fmt.Errorf("failed to pack commitBatch: %w", err) } log.Info("Validium commitBatch", "maxBlockHeight", maxBlockHeight, "commitment", commitment.Hex()) return calldata, maxBlockHeight, totalGasUsed, nil } func (r *Layer2Relayer) constructFinalizeBundlePayloadCodecV7(dbBatch *orm.Batch, endChunk *orm.Chunk, aggProof *message.OpenVMBundleProof) ([]byte, error) { // Check state root is not zero stateRoot := common.HexToHash(dbBatch.StateRoot) if stateRoot == (common.Hash{}) { return nil, fmt.Errorf("batch %d state root is zero", dbBatch.Index) } if aggProof != nil { // finalizeBundle with proof. calldata, packErr := r.l1RollupABI.Pack( "finalizeBundlePostEuclidV2", dbBatch.BatchHeader, new(big.Int).SetUint64(endChunk.TotalL1MessagesPoppedBefore+endChunk.TotalL1MessagesPoppedInChunk), common.HexToHash(dbBatch.StateRoot), common.HexToHash(dbBatch.WithdrawRoot), aggProof.Proof(), ) if packErr != nil { return nil, fmt.Errorf("failed to pack finalizeBundlePostEuclidV2 with proof: %w", packErr) } return calldata, nil } log.Info("Packing finalizeBundlePostEuclidV2NoProof", "batchHeaderLength", len(dbBatch.BatchHeader), "codecVersion", dbBatch.CodecVersion, "totalL1Messages", endChunk.TotalL1MessagesPoppedBefore+endChunk.TotalL1MessagesPoppedInChunk, "stateRoot", dbBatch.StateRoot, "withdrawRoot", dbBatch.WithdrawRoot) // finalizeBundle without proof. 
calldata, packErr := r.l1RollupABI.Pack( "finalizeBundlePostEuclidV2NoProof", dbBatch.BatchHeader, new(big.Int).SetUint64(endChunk.TotalL1MessagesPoppedBefore+endChunk.TotalL1MessagesPoppedInChunk), common.HexToHash(dbBatch.StateRoot), common.HexToHash(dbBatch.WithdrawRoot), ) if packErr != nil { return nil, fmt.Errorf("failed to pack finalizeBundlePostEuclidV2NoProof: %w", packErr) } return calldata, nil } func (r *Layer2Relayer) constructFinalizeBundlePayloadValidium(dbBatch *orm.Batch, endChunk *orm.Chunk, aggProof *message.OpenVMBundleProof) ([]byte, error) { log.Info("Packing validium finalizeBundle", "batchHeaderLength", len(dbBatch.BatchHeader), "codecVersion", dbBatch.CodecVersion, "totalL1Messages", endChunk.TotalL1MessagesPoppedBefore+endChunk.TotalL1MessagesPoppedInChunk, "stateRoot", dbBatch.StateRoot, "withdrawRoot", dbBatch.WithdrawRoot, "withProof", aggProof != nil) var proof []byte if aggProof != nil { proof = aggProof.Proof() } calldata, packErr := r.validiumABI.Pack( "finalizeBundle", dbBatch.BatchHeader, new(big.Int).SetUint64(endChunk.TotalL1MessagesPoppedBefore+endChunk.TotalL1MessagesPoppedInChunk), proof, ) if packErr != nil { return nil, fmt.Errorf("failed to pack validium finalizeBundle: %w", packErr) } return calldata, nil } // StopSenders stops the senders of the rollup-relayer to prevent querying the removed pending_transaction table in unit tests. // for unit test func (r *Layer2Relayer) StopSenders() { if r.commitSender != nil { r.commitSender.Stop() } if r.finalizeSender != nil { r.finalizeSender.Stop() } } // fetchBlobFeeHistory returns the last WindowSec seconds of blob‐fee samples, // by reading L1Block table’s BlobBaseFee column. 
func (r *Layer2Relayer) fetchBlobFeeHistory(windowSec uint64) ([]*big.Int, error) { latest, err := r.l1BlockOrm.GetLatestL1BlockHeight(r.ctx) if err != nil { return nil, fmt.Errorf("GetLatestL1BlockHeight: %w", err) } // bootstrap on first call if r.lastFetchedBlock == 0 { // start window r.lastFetchedBlock = latest - windowSec/secondsPerBlock } from := r.lastFetchedBlock + 1 //if new blocks if from <= latest { raw, err := r.l1BlockOrm.GetBlobFeesInRange(r.ctx, from, latest) if err != nil { return nil, fmt.Errorf("GetBlobFeesInRange: %w", err) } // append them for _, v := range raw { r.feeHistory = append(r.feeHistory, new(big.Int).SetUint64(v)) r.lastFetchedBlock++ } } maxLen := int(windowSec / secondsPerBlock) if len(r.feeHistory) > maxLen { r.feeHistory = r.feeHistory[len(r.feeHistory)-maxLen:] } return r.feeHistory, nil } // calculateTargetPrice applies pct_min/ewma + relaxation to get a BigInt target func calculateTargetPrice(windowSec uint64, strategy StrategyParams, firstTime time.Time, history []*big.Int) *big.Int { var baseline float64 // baseline in Gwei (converting to float, small loss of precision) n := len(history) if n == 0 { return big.NewInt(0) } switch strategy.BaselineType { case PctMin: // make a copy, sort by big.Int.Cmp, then pick the percentile element sorted := make([]*big.Int, n) copy(sorted, history) sort.Slice(sorted, func(i, j int) bool { return sorted[i].Cmp(sorted[j]) < 0 }) idx := int(strategy.BaselineParam * float64(n-1)) if idx < 0 { idx = 0 } baseline, _ = new(big.Float). Quo(new(big.Float).SetInt(sorted[idx]), big.NewFloat(1e9)). Float64() case EWMA: one := big.NewFloat(1) alpha := big.NewFloat(strategy.BaselineParam) oneMinusAlpha := new(big.Float).Sub(one, alpha) // start from first history point ewma := new(big.Float). Quo(new(big.Float).SetInt(history[0]), big.NewFloat(1e9)) for i := 1; i < n; i++ { curr := new(big.Float). 
Quo(new(big.Float).SetInt(history[i]), big.NewFloat(1e9)) term1 := new(big.Float).Mul(alpha, curr) term2 := new(big.Float).Mul(oneMinusAlpha, ewma) ewma = new(big.Float).Add(term1, term2) } baseline, _ = ewma.Float64() default: // fallback to last element baseline, _ = new(big.Float). Quo(new(big.Float).SetInt(history[n-1]), big.NewFloat(1e9)). Float64() } // now baseline holds our baseline in float64 Gwei // relaxation age := time.Since(firstTime).Seconds() frac := age / float64(windowSec) var adjusted float64 switch strategy.RelaxType { case Exponential: adjusted = baseline * (1 + strategy.Gamma*math.Exp(strategy.Beta*(frac-1))) case Sigmoid: adjusted = baseline * (1 + strategy.Gamma/(1+math.Exp(-strategy.Beta*(frac-0.5)))) default: adjusted = baseline } // back to wei f := new(big.Float).Mul(big.NewFloat(adjusted), big.NewFloat(1e9)) out, _ := f.Int(nil) return out } // skipSubmitByFee returns (true, nil) when submission should be skipped right now // because the blob‐fee is above target and the timeout window hasn’t yet elapsed. 
// Otherwise returns (false, err) func (r *Layer2Relayer) skipSubmitByFee(oldest time.Time, metrics *l2RelayerMetrics) (bool, error) { windowSec := uint64(r.cfg.BatchSubmission.TimeoutSec) hist, err := r.fetchBlobFeeHistory(windowSec) if err != nil || len(hist) == 0 { return false, fmt.Errorf( "blob-fee history unavailable or empty: %w (history_length=%d)", err, len(hist), ) } // calculate target & get current (in wei) target := calculateTargetPrice(windowSec, r.batchStrategy, oldest, hist) current := hist[len(hist)-1] // apply absolute tolerance offset to target tolerance := new(big.Int).SetUint64(r.cfg.BatchSubmission.BlobFeeTolerance) threshold := new(big.Int).Add(target, tolerance) currentFloat, _ := current.Float64() targetFloat, _ := target.Float64() metrics.rollupL2RelayerCurrentBlobPrice.Set(currentFloat) metrics.rollupL2RelayerTargetBlobPrice.Set(targetFloat) // if current fee > threshold (target + tolerance) and still inside the timeout window, skip if current.Cmp(threshold) > 0 && time.Since(oldest) < time.Duration(windowSec)*time.Second { return true, fmt.Errorf( "blob-fee above threshold & window not yet passed; current=%s target=%s threshold=%s tolerance=%s age=%s", current.String(), target.String(), threshold.String(), tolerance.String(), time.Since(oldest), ) } // otherwise proceed with submission return false, nil } func addrFromSignerConfig(config *config.SignerConfig) (common.Address, error) { switch config.SignerType { case sender.PrivateKeySignerType: privKey, err := crypto.ToECDSA(common.FromHex(config.PrivateKeySignerConfig.PrivateKey)) if err != nil { return common.Address{}, fmt.Errorf("parse sender private key failed: %w", err) } return crypto.PubkeyToAddress(privKey.PublicKey), nil case sender.RemoteSignerType: if config.RemoteSignerConfig.SignerAddress == "" { return common.Address{}, fmt.Errorf("signer address is empty") } return common.HexToAddress(config.RemoteSignerConfig.SignerAddress), nil default: return common.Address{}, 
fmt.Errorf("failed to determine signer address, unknown signer type: %v", config.SignerType) } }