feat(chunk&batch proposers): check compressed data validity (#1431)

Co-authored-by: colinlyguo <colinlyguo@users.noreply.github.com>
Author: colin (committed via GitHub)
Date: 2024-07-14 05:07:54 +08:00
Parent: f14053ed0c
Commit: bfb3c7d2b4
8 changed files with 124 additions and 104 deletions


@@ -5,7 +5,7 @@ import (
"runtime/debug"
)
-var tag = "v4.4.28"
+var tag = "v4.4.29"
var commit = func() string {
if info, ok := debug.ReadBuildInfo(); ok {


@@ -511,6 +511,8 @@ github.com/scroll-tech/da-codec v0.0.0-20240515170552-2e5286688fce/go.mod h1:1wW
github.com/scroll-tech/da-codec v0.0.0-20240605080813-32bfc9fccde7/go.mod h1:1wWYii0OPwd5kw+xrz0PFgS420xNadrNF1x/ELJT+TM=
github.com/scroll-tech/da-codec v0.0.0-20240708144942-a554fe417a39 h1:1gg+HPuirILVVVugZczq7ZQtPdCC/a38pwaFBqFtt1o=
github.com/scroll-tech/da-codec v0.0.0-20240708144942-a554fe417a39/go.mod h1:D6XEESeNVJkQJlv3eK+FyR+ufPkgVQbJzERylQi53Bs=
+github.com/scroll-tech/da-codec v0.0.0-20240711065717-10bc0bed2be3 h1:QDiMz4RVjNigz4sHh1ieMe4nl/ca+MrXxyaVf0IKePk=
+github.com/scroll-tech/da-codec v0.0.0-20240711065717-10bc0bed2be3/go.mod h1:D6XEESeNVJkQJlv3eK+FyR+ufPkgVQbJzERylQi53Bs=
github.com/scroll-tech/go-ethereum v1.10.14-0.20221202061207-804e7edc23ba/go.mod h1:jurIpDQ0hqtp9//xxeWzr8X9KMP/+TYn+vz3K1wZrv0=
github.com/scroll-tech/go-ethereum v1.10.14-0.20221213034543-78c1f57fcfea/go.mod h1:jurIpDQ0hqtp9//xxeWzr8X9KMP/+TYn+vz3K1wZrv0=
github.com/scroll-tech/go-ethereum v1.10.14-0.20221221073256-5ca70bf3a257/go.mod h1:jurIpDQ0hqtp9//xxeWzr8X9KMP/+TYn+vz3K1wZrv0=


@@ -10,7 +10,7 @@ require (
github.com/go-resty/resty/v2 v2.7.0
github.com/holiman/uint256 v1.2.4
github.com/prometheus/client_golang v1.16.0
-github.com/scroll-tech/da-codec v0.0.0-20240710085339-459342e9a542
+github.com/scroll-tech/da-codec v0.0.0-20240712125636-d7e76c3f54b5
github.com/scroll-tech/go-ethereum v1.10.14-0.20240607130425-e2becce6a1a4
github.com/smartystreets/goconvey v1.8.0
github.com/stretchr/testify v1.9.0
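
Both manifests pin da-codec forward to the revisions that ship the new compressed-data compatibility API (pseudo-versions ending in 10bc0bed2be3 and d7e76c3f54b5 above). As an aside, a pseudo-version bump like this is normally produced with the Go toolchain rather than by hand-editing go.mod and go.sum; assuming the target commit hash, something like:

go get github.com/scroll-tech/da-codec@d7e76c3f54b5
go mod tidy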


@@ -236,8 +236,8 @@ github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik=
github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
-github.com/scroll-tech/da-codec v0.0.0-20240710085339-459342e9a542 h1:if1EeBvoNf7eLzGJYpTt0aJ6Jvw5n274TUDD+9uj9sA=
-github.com/scroll-tech/da-codec v0.0.0-20240710085339-459342e9a542/go.mod h1:D6XEESeNVJkQJlv3eK+FyR+ufPkgVQbJzERylQi53Bs=
+github.com/scroll-tech/da-codec v0.0.0-20240712125636-d7e76c3f54b5 h1:mdgFgYSKbB7JbUPEvqKdXxXlzc3uRwD+dlNA4GsFSoo=
+github.com/scroll-tech/da-codec v0.0.0-20240712125636-d7e76c3f54b5/go.mod h1:D6XEESeNVJkQJlv3eK+FyR+ufPkgVQbJzERylQi53Bs=
github.com/scroll-tech/go-ethereum v1.10.14-0.20240607130425-e2becce6a1a4 h1:gheWXra3HdZsz6q+w4LrXy8ybHOO6/t6Kb/V64bR5wE=
github.com/scroll-tech/go-ethereum v1.10.14-0.20240607130425-e2becce6a1a4/go.mod h1:byf/mZ8jLYUCnUePTicjJWn+RvKdxDn7buS6glTnMwQ=
github.com/scroll-tech/zktrie v0.8.4 h1:UagmnZ4Z3ITCk+aUq9NQZJNAwnWl4gSxsLb2Nl7IgRE=


@@ -154,9 +154,49 @@ func (p *BatchProposer) TryProposeBatch() {
}
}
-func (p *BatchProposer) updateDBBatchInfo(batch *encoding.Batch, codecVersion encoding.CodecVersion, metrics utils.BatchMetrics) error {
+func (p *BatchProposer) updateDBBatchInfo(batch *encoding.Batch, codecVersion encoding.CodecVersion, metrics *utils.BatchMetrics) error {
+compatibilityBreachOccurred := false
+for {
+compatible, err := utils.CheckBatchCompressedDataCompatibility(batch, codecVersion)
+if err != nil {
+log.Error("Failed to check batch compressed data compatibility", "batch index", batch.Index, "codecVersion", codecVersion, "err", err)
+return err
+}
+if compatible {
+break
+}
+if len(batch.Chunks) == 1 {
+log.Error("Cannot truncate batch with only 1 chunk for compatibility", "start block number", batch.Chunks[0].Blocks[0].Header.Number.Uint64(),
+"end block number", batch.Chunks[0].Blocks[len(batch.Chunks[0].Blocks)-1].Header.Number.Uint64())
+return errors.New("cannot truncate batch with only 1 chunk for compatibility")
+}
+compatibilityBreachOccurred = true
+batch.Chunks = batch.Chunks[:len(batch.Chunks)-1]
+log.Info("Batch not compatible with compressed data, removing last chunk", "batch index", batch.Index, "truncated chunk length", len(batch.Chunks))
+}
+if compatibilityBreachOccurred {
+p.compressedDataCompatibilityBreachTotal.Inc()
+// recalculate batch metrics after truncation
+var calcErr error
+metrics, calcErr = utils.CalculateBatchMetrics(batch, codecVersion)
+if calcErr != nil {
+return fmt.Errorf("failed to calculate batch metrics, batch index: %v, error: %w", batch.Index, calcErr)
+}
+p.recordTimerBatchMetrics(metrics)
+p.recordAllBatchMetrics(metrics)
+}
p.proposeBatchUpdateInfoTotal.Inc()
err := p.db.Transaction(func(dbTX *gorm.DB) error {
-dbBatch, dbErr := p.batchOrm.InsertBatch(p.ctx, batch, codecVersion, metrics, dbTX)
+dbBatch, dbErr := p.batchOrm.InsertBatch(p.ctx, batch, codecVersion, *metrics, dbTX)
if dbErr != nil {
log.Warn("BatchProposer.updateBatchInfoInDB insert batch failure", "index", batch.Index, "parent hash", batch.ParentBatchHash.Hex(), "error", dbErr)
return dbErr
@@ -239,19 +279,6 @@ func (p *BatchProposer) proposeBatch() error {
for i, chunk := range daChunks {
batch.Chunks = append(batch.Chunks, chunk)
metrics, calcErr := utils.CalculateBatchMetrics(&batch, codecVersion)
-var compressErr *encoding.CompressedDataCompatibilityError
-if errors.As(calcErr, &compressErr) {
-if i == 0 {
-// The first chunk fails compressed data compatibility check, manual fix is needed.
-return fmt.Errorf("the first chunk fails compressed data compatibility check; start block number: %v, end block number: %v", dbChunks[0].StartBlockNumber, dbChunks[0].EndBlockNumber)
-}
-log.Warn("breaking limit condition in proposing a new batch due to a compressed data compatibility breach", "start chunk index", dbChunks[0].Index, "end chunk index", dbChunks[len(dbChunks)-1].Index)
-batch.Chunks = batch.Chunks[:len(batch.Chunks)-1]
-p.compressedDataCompatibilityBreachTotal.Inc()
-return p.updateDBBatchInfo(&batch, codecVersion, *metrics)
-}
if calcErr != nil {
return fmt.Errorf("failed to calculate batch metrics: %w", calcErr)
}
@@ -286,7 +313,7 @@ func (p *BatchProposer) proposeBatch() error {
}
p.recordAllBatchMetrics(metrics)
-return p.updateDBBatchInfo(&batch, codecVersion, *metrics)
+return p.updateDBBatchInfo(&batch, codecVersion, metrics)
}
}
@@ -304,7 +331,7 @@ func (p *BatchProposer) proposeBatch() error {
p.batchFirstBlockTimeoutReached.Inc()
p.recordAllBatchMetrics(metrics)
-return p.updateDBBatchInfo(&batch, codecVersion, *metrics)
+return p.updateDBBatchInfo(&batch, codecVersion, metrics)
}
log.Debug("pending chunks do not reach one of the constraints or contain a timeout block")
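
The heart of the change is the check-and-truncate loop in updateDBBatchInfo: instead of sniffing for CompressedDataCompatibilityError while computing metrics, the proposer now asks the codec explicitly whether the compressed payload is valid and drops trailing chunks until it is, failing hard only when a single-chunk batch is still incompatible. A minimal self-contained sketch of that pattern (truncateUntilCompatible and the toy check are illustrative names, not from the codebase):

package main

import (
	"errors"
	"fmt"
)

// truncateUntilCompatible mirrors the loop in updateDBBatchInfo: bail out on a
// check error, stop once the check passes, refuse to shrink below one element,
// and report whether any truncation happened (so metrics can be recalculated).
func truncateUntilCompatible[T any](items []T, check func([]T) (bool, error)) ([]T, bool, error) {
	truncated := false
	for {
		compatible, err := check(items)
		if err != nil {
			return nil, false, err
		}
		if compatible {
			return items, truncated, nil
		}
		if len(items) == 1 {
			return nil, false, errors.New("cannot truncate further for compatibility")
		}
		items = items[:len(items)-1]
		truncated = true
	}
}

func main() {
	// Toy check: pretend the payload compresses acceptably at <= 2 chunks.
	chunks := []string{"c0", "c1", "c2", "c3"}
	kept, truncated, err := truncateUntilCompatible(chunks, func(s []string) (bool, error) {
		return len(s) <= 2, nil
	})
	fmt.Println(kept, truncated, err) // [c0 c1] true <nil>
}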


@@ -168,14 +168,52 @@ func (p *ChunkProposer) TryProposeChunk() {
}
}
-func (p *ChunkProposer) updateDBChunkInfo(chunk *encoding.Chunk, codecVersion encoding.CodecVersion, metrics utils.ChunkMetrics) error {
+func (p *ChunkProposer) updateDBChunkInfo(chunk *encoding.Chunk, codecVersion encoding.CodecVersion, metrics *utils.ChunkMetrics) error {
if chunk == nil {
return nil
}
+compatibilityBreachOccurred := false
+for {
+compatible, err := utils.CheckChunkCompressedDataCompatibility(chunk, codecVersion)
+if err != nil {
+log.Error("Failed to check chunk compressed data compatibility", "start block number", chunk.Blocks[0].Header.Number, "codecVersion", codecVersion, "err", err)
+return err
+}
+if compatible {
+break
+}
+if len(chunk.Blocks) == 1 {
+log.Error("Cannot truncate chunk with only 1 block for compatibility", "block number", chunk.Blocks[0].Header.Number)
+return errors.New("cannot truncate chunk with only 1 block for compatibility")
+}
+compatibilityBreachOccurred = true
+chunk.Blocks = chunk.Blocks[:len(chunk.Blocks)-1]
+log.Info("Chunk not compatible with compressed data, removing last block", "start block number", chunk.Blocks[0].Header.Number, "truncated block length", len(chunk.Blocks))
+}
+if compatibilityBreachOccurred {
+p.compressedDataCompatibilityBreachTotal.Inc()
+// recalculate chunk metrics after truncation
+var calcErr error
+metrics, calcErr = utils.CalculateChunkMetrics(chunk, codecVersion)
+if calcErr != nil {
+return fmt.Errorf("failed to calculate chunk metrics, start block number: %v, error: %w", chunk.Blocks[0].Header.Number, calcErr)
+}
+p.recordTimerChunkMetrics(metrics)
+p.recordAllChunkMetrics(metrics)
+}
p.proposeChunkUpdateInfoTotal.Inc()
err := p.db.Transaction(func(dbTX *gorm.DB) error {
-dbChunk, err := p.chunkOrm.InsertChunk(p.ctx, chunk, codecVersion, metrics, dbTX)
+dbChunk, err := p.chunkOrm.InsertChunk(p.ctx, chunk, codecVersion, *metrics, dbTX)
if err != nil {
log.Warn("ChunkProposer.InsertChunk failed", "err", err)
return err
@@ -234,7 +272,7 @@ func (p *ChunkProposer) proposeChunk() error {
return fmt.Errorf("failed to calculate chunk metrics: %w", calcErr)
}
p.recordTimerChunkMetrics(metrics)
-return p.updateDBChunkInfo(&chunk, codecVersion, *metrics)
+return p.updateDBChunkInfo(&chunk, codecVersion, metrics)
}
var chunk encoding.Chunk
@@ -242,18 +280,6 @@ func (p *ChunkProposer) proposeChunk() error {
chunk.Blocks = append(chunk.Blocks, block)
metrics, calcErr := utils.CalculateChunkMetrics(&chunk, codecVersion)
-var compressErr *encoding.CompressedDataCompatibilityError
-if errors.As(calcErr, &compressErr) {
-if i == 0 {
-// The first block fails compressed data compatibility check, manual fix is needed.
-return fmt.Errorf("the first block fails compressed data compatibility check; block number: %v", block.Header.Number)
-}
-log.Warn("breaking limit condition in proposing a new chunk due to a compressed data compatibility breach", "start block number", chunk.Blocks[0].Header.Number, "block count", len(chunk.Blocks))
-chunk.Blocks = chunk.Blocks[:len(chunk.Blocks)-1]
-p.compressedDataCompatibilityBreachTotal.Inc()
-return p.updateDBChunkInfo(&chunk, codecVersion, *metrics)
-}
if calcErr != nil {
return fmt.Errorf("failed to calculate chunk metrics: %w", calcErr)
}
@@ -296,7 +322,7 @@ func (p *ChunkProposer) proposeChunk() error {
}
p.recordAllChunkMetrics(metrics)
-return p.updateDBChunkInfo(&chunk, codecVersion, *metrics)
+return p.updateDBChunkInfo(&chunk, codecVersion, metrics)
}
}
@@ -316,7 +342,7 @@ func (p *ChunkProposer) proposeChunk() error {
p.chunkFirstBlockTimeoutReached.Inc()
p.recordAllChunkMetrics(metrics)
-return p.updateDBChunkInfo(&chunk, codecVersion, *metrics)
+return p.updateDBChunkInfo(&chunk, codecVersion, metrics)
}
log.Debug("pending blocks do not reach one of the constraints or contain a timeout block")
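
The chunk proposer gets the same treatment at block granularity. Note also why both update functions changed from taking a metrics value to a pointer (metrics *utils.ChunkMetrics): if truncation occurs, the function replaces the now-stale metrics with freshly calculated ones before dereferencing them for the ORM insert. A small sketch of that reassignment pattern (the Metrics type and helpers here are illustrative, not from the codebase):

package main

import "fmt"

type Metrics struct{ NumBlocks int }

func recalcMetrics(blocks []int) (*Metrics, error) {
	return &Metrics{NumBlocks: len(blocks)}, nil
}

// updateInfo takes a pointer so it can locally swap in recalculated metrics
// after truncation; the dereferenced value is what ultimately gets persisted,
// just as *metrics is handed to InsertChunk in the real code.
func updateInfo(metrics *Metrics, blocks []int, truncated bool) error {
	if truncated {
		var err error
		metrics, err = recalcMetrics(blocks)
		if err != nil {
			return err
		}
	}
	fmt.Println("persisting metrics for", metrics.NumBlocks, "blocks")
	return nil
}

func main() {
	stale := &Metrics{NumBlocks: 5}
	_ = updateInfo(stale, []int{1, 2, 3}, true) // persisting metrics for 3 blocks
}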


@@ -1,7 +1,6 @@
package utils
import (
-"errors"
"fmt"
"time"
@@ -9,6 +8,7 @@ import (
"github.com/scroll-tech/da-codec/encoding/codecv0"
"github.com/scroll-tech/da-codec/encoding/codecv1"
"github.com/scroll-tech/da-codec/encoding/codecv2"
+"github.com/scroll-tech/da-codec/encoding/codecv3"
"github.com/scroll-tech/go-ethereum/common"
)
@@ -92,12 +92,7 @@ func CalculateChunkMetrics(chunk *encoding.Chunk, codecVersion encoding.CodecVer
metrics.L1CommitUncompressedBatchBytesSize, metrics.L1CommitBlobSize, err = codecv2.EstimateChunkL1CommitBatchSizeAndBlobSize(chunk)
metrics.EstimateBlobSizeTime = time.Since(start)
if err != nil {
-var compressErr *encoding.CompressedDataCompatibilityError
-if errors.As(err, &compressErr) {
-return nil, err
-} else {
-return nil, fmt.Errorf("failed to estimate codecv2 chunk L1 commit batch size and blob size: %w", err)
-}
+return nil, fmt.Errorf("failed to estimate codecv2 chunk L1 commit batch size and blob size: %w", err)
}
return metrics, nil
default:
@@ -105,6 +100,34 @@ func CalculateChunkMetrics(chunk *encoding.Chunk, codecVersion encoding.CodecVer
}
}
+// CheckChunkCompressedDataCompatibility checks compressed data compatibility of a batch built by a single chunk.
+func CheckChunkCompressedDataCompatibility(chunk *encoding.Chunk, codecVersion encoding.CodecVersion) (bool, error) {
+switch codecVersion {
+case encoding.CodecV0, encoding.CodecV1:
+return true, nil
+case encoding.CodecV2:
+return codecv2.CheckChunkCompressedDataCompatibility(chunk)
+case encoding.CodecV3:
+return codecv3.CheckChunkCompressedDataCompatibility(chunk)
+default:
+return false, fmt.Errorf("unsupported codec version: %v", codecVersion)
+}
+}
+// CheckBatchCompressedDataCompatibility checks compressed data compatibility of a batch.
+func CheckBatchCompressedDataCompatibility(batch *encoding.Batch, codecVersion encoding.CodecVersion) (bool, error) {
+switch codecVersion {
+case encoding.CodecV0, encoding.CodecV1:
+return true, nil
+case encoding.CodecV2:
+return codecv2.CheckBatchCompressedDataCompatibility(batch)
+case encoding.CodecV3:
+return codecv3.CheckBatchCompressedDataCompatibility(batch)
+default:
+return false, fmt.Errorf("unsupported codec version: %v", codecVersion)
+}
+}
// BatchMetrics indicates the metrics for proposing a batch.
type BatchMetrics struct {
// common metrics
@@ -177,12 +200,7 @@ func CalculateBatchMetrics(batch *encoding.Batch, codecVersion encoding.CodecVer
metrics.L1CommitUncompressedBatchBytesSize, metrics.L1CommitBlobSize, err = codecv2.EstimateBatchL1CommitBatchSizeAndBlobSize(batch)
metrics.EstimateBlobSizeTime = time.Since(start)
if err != nil {
-var compressErr *encoding.CompressedDataCompatibilityError
-if errors.As(err, &compressErr) {
-return nil, err
-} else {
-return nil, fmt.Errorf("failed to estimate codecv2 batch L1 commit batch size and blob size: %w", err)
-}
+return nil, fmt.Errorf("failed to estimate codecv2 batch L1 commit batch size and blob size: %w", err)
}
return metrics, nil
default:
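
The two new helpers funnel every call site through a single per-codec dispatch: CodecV0 and CodecV1 are reported compatible unconditionally (presumably because those codecs do not compress the payload), CodecV2 and CodecV3 delegate to the corresponding da-codec check, and unknown versions are an error. A standalone sketch of the dispatch shape (the local CodecVersion type and stub check are illustrative):

package main

import "fmt"

type CodecVersion int

const (
	CodecV0 CodecVersion = iota
	CodecV1
	CodecV2
	CodecV3
)

// checkCompatibility mirrors CheckChunk/CheckBatchCompressedDataCompatibility:
// pre-compression codecs pass trivially, newer codecs defer to the codec
// package, and anything unrecognized fails loudly.
func checkCompatibility(v CodecVersion, codecCheck func() (bool, error)) (bool, error) {
	switch v {
	case CodecV0, CodecV1:
		return true, nil
	case CodecV2, CodecV3:
		return codecCheck()
	default:
		return false, fmt.Errorf("unsupported codec version: %v", v)
	}
}

func main() {
	ok, err := checkCompatibility(CodecV1, nil)
	fmt.Println(ok, err) // true <nil>
	ok, err = checkCompatibility(CodecV3, func() (bool, error) { return false, nil })
	fmt.Println(ok, err) // false <nil>
}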


@@ -1,53 +0,0 @@
-package utils
-import (
-"errors"
-"math/big"
-"testing"
-"github.com/agiledragon/gomonkey/v2"
-"github.com/scroll-tech/da-codec/encoding"
-"github.com/scroll-tech/da-codec/encoding/codecv2"
-"github.com/scroll-tech/go-ethereum/common"
-"github.com/scroll-tech/go-ethereum/core/types"
-"github.com/stretchr/testify/assert"
-)
-// regression test
-func TestCompressedDataCompatibilityErrorCatching(t *testing.T) {
-block := &encoding.Block{
-Header: &types.Header{
-Number: big.NewInt(0),
-},
-RowConsumption: &types.RowConsumption{},
-}
-chunk := &encoding.Chunk{
-Blocks: []*encoding.Block{block},
-}
-batch := &encoding.Batch{
-Index: 0,
-TotalL1MessagePoppedBefore: 0,
-ParentBatchHash: common.Hash{},
-Chunks: []*encoding.Chunk{chunk},
-}
-patchGuard1 := gomonkey.ApplyFunc(codecv2.EstimateChunkL1CommitBatchSizeAndBlobSize, func(b *encoding.Chunk) (uint64, uint64, error) {
-return 0, 0, &encoding.CompressedDataCompatibilityError{Err: errors.New("test-error-1")}
-})
-defer patchGuard1.Reset()
-var compressErr *encoding.CompressedDataCompatibilityError
-_, err := CalculateChunkMetrics(chunk, encoding.CodecV2)
-assert.Error(t, err)
-assert.ErrorAs(t, err, &compressErr)
-patchGuard2 := gomonkey.ApplyFunc(codecv2.EstimateBatchL1CommitBatchSizeAndBlobSize, func(b *encoding.Batch) (uint64, uint64, error) {
-return 0, 0, &encoding.CompressedDataCompatibilityError{Err: errors.New("test-error-2")}
-})
-defer patchGuard2.Reset()
-_, err = CalculateBatchMetrics(batch, encoding.CodecV2)
-assert.Error(t, err)
-assert.ErrorAs(t, err, &compressErr)
-}
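
The deleted regression test only existed to pin the errors.As pass-through that this commit removes from CalculateChunkMetrics and CalculateBatchMetrics. If equivalent coverage were wanted for the new code path, a test in the same gomonkey style (which requires running with inlining disabled, e.g. -gcflags=all=-l) could stub the codec-level check directly. A hypothetical sketch, not part of this commit:

package utils

import (
	"testing"

	"github.com/agiledragon/gomonkey/v2"
	"github.com/scroll-tech/da-codec/encoding"
	"github.com/scroll-tech/da-codec/encoding/codecv2"
	"github.com/stretchr/testify/assert"
)

// Hypothetical replacement test: verify that the new dispatcher forwards the
// codecv2 result for CodecV2 instead of relying on error-type matching.
func TestCheckChunkCompressedDataCompatibilityForwarding(t *testing.T) {
	patch := gomonkey.ApplyFunc(codecv2.CheckChunkCompressedDataCompatibility,
		func(c *encoding.Chunk) (bool, error) { return false, nil })
	defer patch.Reset()

	compatible, err := CheckChunkCompressedDataCompatibility(&encoding.Chunk{}, encoding.CodecV2)
	assert.NoError(t, err)
	assert.False(t, compatible)
}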