support first version of CodecV6 in relayer and add functionality to submit multiple batches in a single transaction

This commit is contained in:
jonastheis
2024-12-30 18:48:18 +08:00
parent 45b23edde9
commit 41606fe7d7
4 changed files with 222 additions and 4 deletions

View File

@@ -11,8 +11,8 @@ require (
github.com/holiman/uint256 v1.2.4
github.com/mitchellh/mapstructure v1.5.0
github.com/prometheus/client_golang v1.16.0
github.com/scroll-tech/da-codec v0.1.2
github.com/scroll-tech/go-ethereum v1.10.14-0.20241023093931-91c2f9c27f4d
github.com/scroll-tech/da-codec v0.1.3-0.20241227041406-286f2092d4cd
github.com/scroll-tech/go-ethereum v1.10.14-0.20241210104312-bdf64cfb39dc
github.com/smartystreets/goconvey v1.8.0
github.com/spf13/viper v1.19.0
github.com/stretchr/testify v1.9.0

View File

@@ -267,8 +267,11 @@ github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6g
github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ=
github.com/scroll-tech/da-codec v0.1.2 h1:QyJ+dQ4zWVVJwuqxNt4MiKyrymVc6rHe4YPtURkjiRc=
github.com/scroll-tech/da-codec v0.1.2/go.mod h1:odz1ck3umvYccCG03osaQBISAYGinZktZYbpk94fYRE=
github.com/scroll-tech/da-codec v0.1.3-0.20241227041406-286f2092d4cd h1:Yq3vc5e9VqXKBEGAqpptPhviXlydoL3R8xX8R8C6YvY=
github.com/scroll-tech/da-codec v0.1.3-0.20241227041406-286f2092d4cd/go.mod h1:XfQhUl3msmE6dpZEbR/LIwiMxywPQcUQsch9URgXDzs=
github.com/scroll-tech/go-ethereum v1.10.14-0.20241023093931-91c2f9c27f4d h1:vuv7fGKEDtoeetI6RkKt8RAByJsYZBWk9Vo6gShv65c=
github.com/scroll-tech/go-ethereum v1.10.14-0.20241023093931-91c2f9c27f4d/go.mod h1:PWEOTg6LeWlJAlFJauO0msSLXWnpHmE+mVh5txtfeRM=
github.com/scroll-tech/go-ethereum v1.10.14-0.20241210104312-bdf64cfb39dc/go.mod h1:xRDJvaNUe7lCU2fB+AqyS7gahar+dfJPrUJplfXF4dw=
github.com/scroll-tech/zktrie v0.8.4 h1:UagmnZ4Z3ITCk+aUq9NQZJNAwnWl4gSxsLb2Nl7IgRE=
github.com/scroll-tech/zktrie v0.8.4/go.mod h1:XvNo7vAk8yxNyTjBDj5WIiFzYW4bx/gJ78+NK6Zn6Uk=
github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=

View File

@@ -29,6 +29,18 @@ type SenderConfig struct {
TxType string `json:"tx_type"`
// The maximum number of pending blob-carrying transactions
MaxPendingBlobTxs int64 `json:"max_pending_blob_txs"`
// Config for batch submission
BatchSubmission *BatchSubmission `json:"batch_submission"`
}
type BatchSubmission struct {
// The minimum number of batches to submit in a single transaction.
MinBatches int `json:"min_batches"`
// The maximum number of batches to submit in a single transaction.
MaxBatches int `json:"max_batches"`
// The time in seconds after which a batch is considered stale and should be submitted ignoring the min batch count.
TimeoutSec int64 `json:"timeout"`
}
// ChainMonitor this config is used to get batch status from chain_monitor API.

View File

@@ -373,11 +373,156 @@ func (r *Layer2Relayer) ProcessGasPriceOracle() {
// ProcessPendingBatches processes the pending batches by sending commitBatch transactions to layer 1.
func (r *Layer2Relayer) ProcessPendingBatches() {
// get pending batches from database in ascending order by their index.
dbBatches, err := r.batchOrm.GetFailedAndPendingBatches(r.ctx, 5)
dbBatches, err := r.batchOrm.GetFailedAndPendingBatches(r.ctx, max(5, r.cfg.SenderConfig.BatchSubmission.MaxBatches))
if err != nil {
log.Error("Failed to fetch pending L2 batches", "err", err)
return
}
var batchesToSubmit []*dbBatchWithChunksAndParent
var forceSubmit bool
for i, dbBatch := range dbBatches {
if i == 0 && encoding.CodecVersion(dbBatch.CodecVersion) < encoding.CodecV6 {
// if the first batch is not >= V6 then we need to submit batches one by one
r.processPendingBatchesV4(dbBatches)
return
}
batchesToSubmitLen := len(batchesToSubmit)
var dbChunks []*orm.Chunk
var dbParentBatch *orm.Batch
// Verify batches compatibility
{
dbChunks, err = r.chunkOrm.GetChunksInRange(r.ctx, dbBatch.StartChunkIndex, dbBatch.EndChunkIndex)
if err != nil {
log.Error("failed to get chunks in range", "err", err)
return
}
// check codec version
for _, dbChunk := range dbChunks {
if dbBatch.CodecVersion != dbChunk.CodecVersion {
log.Error("batch codec version is different from chunk codec version", "batch index", dbBatch.Index, "chunk index", dbChunk.Index, "batch codec version", dbBatch.CodecVersion, "chunk codec version", dbChunk.CodecVersion)
return
}
}
if dbBatch.Index == 0 {
log.Error("invalid args: batch index is 0, should only happen in committing genesis batch")
return
}
dbParentBatch, err = r.batchOrm.GetBatchByIndex(r.ctx, dbBatch.Index-1)
if err != nil {
log.Error("failed to get parent batch header", "err", err)
return
}
if dbParentBatch.CodecVersion > dbBatch.CodecVersion {
log.Error("parent batch codec version is greater than current batch codec version", "index", dbBatch.Index, "hash", dbBatch.Hash, "parent codec version", dbParentBatch.CodecVersion, "current codec version", dbBatch.CodecVersion)
return
}
// make sure we commit batches of the same codec version together.
// If we encounter a batch with a different codec version, we stop here and will commit the batches we have so far.
// The next call of ProcessPendingBatches will then start with the batch with the different codec version.
if batchesToSubmitLen > 0 && batchesToSubmit[batchesToSubmitLen-1].Batch.CodecVersion != dbBatch.CodecVersion {
break
}
}
// if one of the batches is too old, we force submit all batches that we have so far in the next step
if !forceSubmit && time.Since(dbBatch.CreatedAt) > time.Duration(r.cfg.SenderConfig.BatchSubmission.TimeoutSec)*time.Second {
forceSubmit = true
}
if batchesToSubmitLen <= r.cfg.SenderConfig.BatchSubmission.MaxBatches {
batchesToSubmit = append(batchesToSubmit, &dbBatchWithChunksAndParent{
Batch: dbBatch,
Chunks: dbChunks,
ParentBatch: dbParentBatch,
})
}
}
if !forceSubmit && len(batchesToSubmit) < r.cfg.SenderConfig.BatchSubmission.MinBatches {
log.Debug("Not enough batches to submit", "count", len(batchesToSubmit), "minBatches", r.cfg.SenderConfig.BatchSubmission.MinBatches, "maxBatches", r.cfg.SenderConfig.BatchSubmission.MaxBatches)
return
}
if forceSubmit {
log.Info("Forcing submission of batches due to timeout", "batch index", batchesToSubmit[0].Batch.Index, "created at", batchesToSubmit[0].Batch.CreatedAt)
}
// We have at least 1 batch to commit
firstBatch := batchesToSubmit[0].Batch
lastBatch := batchesToSubmit[len(batchesToSubmit)-1].Batch
codecVersion := encoding.CodecVersion(firstBatch.CodecVersion)
switch codecVersion {
case encoding.CodecV6:
calldata, blob, err := r.constructCommitBatchPayloadCodecV6(batchesToSubmit)
if err != nil {
log.Error("failed to construct commitBatchWithBlobProof payload for V6", "codecVersion", codecVersion, "start index", firstBatch.Index, "end index", lastBatch.Index, "err", err)
return
}
default:
log.Error("unsupported codec version in ProcessPendingBatches", "codecVersion", codecVersion, "start index", firstBatch, "end index", lastBatch.Index)
return
}
//txHash, err := r.commitSender.SendTransaction(dbBatch.Hash, &r.cfg.RollupContractAddress, calldata, blob, 0)
//if err != nil {
// if errors.Is(err, sender.ErrTooManyPendingBlobTxs) {
// r.metrics.rollupL2RelayerProcessPendingBatchErrTooManyPendingBlobTxsTotal.Inc()
// log.Debug(
// "Skipped sending commitBatch tx to L1: too many pending blob txs",
// "maxPending", r.cfg.SenderConfig.MaxPendingBlobTxs,
// "err", err,
// )
// return
// }
// log.Error(
// "Failed to send commitBatch tx to layer1",
// "index", dbBatch.Index,
// "hash", dbBatch.Hash,
// "RollupContractAddress", r.cfg.RollupContractAddress,
// "err", err,
// "calldata", common.Bytes2Hex(calldata),
// )
// return
//}
//
//err = r.batchOrm.UpdateCommitTxHashAndRollupStatus(r.ctx, dbBatch.Hash, txHash.String(), types.RollupCommitting)
//if err != nil {
// log.Error("UpdateCommitTxHashAndRollupStatus failed", "hash", dbBatch.Hash, "index", dbBatch.Index, "err", err)
// return
//}
//
//var maxBlockHeight uint64
//var totalGasUsed uint64
//for _, dbChunk := range dbChunks {
// if dbChunk.EndBlockNumber > maxBlockHeight {
// maxBlockHeight = dbChunk.EndBlockNumber
// }
// totalGasUsed += dbChunk.TotalL2TxGas
//}
//r.metrics.rollupL2RelayerCommitBlockHeight.Set(float64(maxBlockHeight))
//r.metrics.rollupL2RelayerCommitThroughput.Add(float64(totalGasUsed))
//
//r.metrics.rollupL2RelayerProcessPendingBatchSuccessTotal.Inc()
//log.Info("Sent the commitBatch tx to layer1", "batch index", dbBatch.Index, "batch hash", dbBatch.Hash, "tx hash", txHash.String())
}
type dbBatchWithChunksAndParent struct {
Batch *orm.Batch
Chunks []*orm.Chunk
ParentBatch *orm.Batch
}
func (r *Layer2Relayer) processPendingBatchesV4(dbBatches []*orm.Batch) {
for _, dbBatch := range dbBatches {
r.metrics.rollupL2RelayerProcessPendingBatchTotal.Inc()
@@ -432,7 +577,7 @@ func (r *Layer2Relayer) ProcessPendingBatches() {
return
}
default:
log.Error("unsupported codec version", "codecVersion", codecVersion)
log.Error("unsupported codec version in processPendingBatchesV4", "codecVersion", codecVersion)
return
}
@@ -868,6 +1013,64 @@ func (r *Layer2Relayer) constructCommitBatchPayloadCodecV4(dbBatch *orm.Batch, d
return calldata, daBatch.Blob(), nil
}
func (r *Layer2Relayer) constructCommitBatchPayloadCodecV6(batchesToSubmit []*dbBatchWithChunksAndParent) ([]byte, []*kzg4844.Blob, error) {
blobs := make([]*kzg4844.Blob, len(batchesToSubmit))
version := encoding.CodecVersion(batchesToSubmit[0].Batch.CodecVersion)
var firstParentBatch *orm.Batch
// construct blobs
for _, b := range batchesToSubmit {
// double check that all batches have the same version
batchVersion := encoding.CodecVersion(b.Batch.CodecVersion)
if batchVersion != version {
return nil, nil, fmt.Errorf("codec version mismatch, expected: %d, got: %d for batches %d and %d", version, batchVersion, batchesToSubmit[0].Batch.Index, b.Batch.Index)
}
if firstParentBatch == nil {
firstParentBatch = b.ParentBatch
}
chunks := make([]*encoding.Chunk, len(b.Chunks))
for i, c := range b.Chunks {
blocks, err := r.l2BlockOrm.GetL2BlocksInRange(r.ctx, c.StartBlockNumber, c.EndBlockNumber)
if err != nil {
return nil, nil, fmt.Errorf("failed to get blocks in range for batch %d: %w", b.Batch.Index, err)
}
chunks[i] = &encoding.Chunk{Blocks: blocks}
}
encodingBatch := &encoding.Batch{
Index: b.Batch.Index,
TotalL1MessagePoppedBefore: b.Chunks[0].TotalL1MessagesPoppedBefore,
ParentBatchHash: common.HexToHash(b.ParentBatch.Hash),
Chunks: chunks,
}
codec, err := encoding.CodecFromVersion(version)
if err != nil {
return nil, nil, fmt.Errorf("failed to get codec from version %d, err: %w", b.Batch.CodecVersion, err)
}
daBatch, err := codec.NewDABatch(encodingBatch)
if err != nil {
return nil, nil, fmt.Errorf("failed to create DA batch: %w", err)
}
blobs = append(blobs, daBatch.Blob())
}
if firstParentBatch == nil {
return nil, nil, fmt.Errorf("firstParentBatch is nil")
}
// TODO: this needs to be updated once the contract interface is finalized
calldata, err := r.l1RollupABI.Pack("commitBatches", version, firstParentBatch.BatchHeader)
if err != nil {
return nil, nil, fmt.Errorf("failed to pack commitBatchWithBlobProof: %w", err)
}
return calldata, blobs, nil
}
func (r *Layer2Relayer) constructFinalizeBundlePayloadCodecV4(dbBatch *orm.Batch, aggProof *message.BundleProof) ([]byte, error) {
if aggProof != nil { // finalizeBundle with proof.
calldata, packErr := r.l1RollupABI.Pack(