make sure that all batches committed in the same tx are part of the same bundle

This commit is contained in:
jonastheis
2025-02-27 10:49:10 +08:00
parent 121ce09c80
commit ed394a6369
4 changed files with 183 additions and 56 deletions

View File

@@ -133,7 +133,7 @@ func (p *BundleProposer) proposeBundle() error {
// select at most maxBlocksThisChunk blocks
maxBatchesThisBundle := p.maxBatchNumPerBundle
batches, err := p.batchOrm.GetBatchesGEIndexGECodecVersion(p.ctx, firstUnbundledBatchIndex, p.minCodecVersion, int(maxBatchesThisBundle))
batches, err := p.batchOrm.GetCommittedBatchesGEIndexGECodecVersion(p.ctx, firstUnbundledBatchIndex, p.minCodecVersion, int(maxBatchesThisBundle))
if err != nil {
return err
}
@@ -167,6 +167,11 @@ func (p *BundleProposer) proposeBundle() error {
}
for i := 1; i < len(batches); i++ {
// Make sure that all batches have been committed.
if len(batches[i].CommitTxHash) == 0 {
return fmt.Errorf("commit tx hash is empty for batch %v %s", batches[0].Index, batches[0].Hash)
}
chunk, err := p.chunkOrm.GetChunkByIndex(p.ctx, batches[i].StartChunkIndex)
if err != nil {
return err
@@ -181,6 +186,12 @@ func (p *BundleProposer) proposeBundle() error {
if uint64(len(batches)) == maxBatchesThisBundle {
log.Info("reached maximum number of batches per bundle", "batch count", len(batches), "start batch index", batches[0].Index, "end batch index", batches[len(batches)-1].Index)
batches, err = p.allBatchesCommittedInSameTXIncluded(batches)
if err != nil {
return fmt.Errorf("failed to include all batches committed in the same tx: %w", err)
}
p.bundleFirstBlockTimeoutReached.Inc()
p.bundleBatchesNum.Set(float64(len(batches)))
return p.updateDBBundleInfo(batches, codecVersion)
@@ -189,6 +200,12 @@ func (p *BundleProposer) proposeBundle() error {
currentTimeSec := uint64(time.Now().Unix())
if firstChunk.StartBlockTime+p.bundleTimeoutSec < currentTimeSec {
log.Info("first block timeout", "batch count", len(batches), "start block number", firstChunk.StartBlockNumber, "start block timestamp", firstChunk.StartBlockTime, "current time", currentTimeSec)
batches, err = p.allBatchesCommittedInSameTXIncluded(batches)
if err != nil {
return fmt.Errorf("failed to include all batches committed in the same tx: %w", err)
}
p.bundleFirstBlockTimeoutReached.Inc()
p.bundleBatchesNum.Set(float64(len(batches)))
return p.updateDBBundleInfo(batches, codecVersion)
@@ -198,3 +215,43 @@ func (p *BundleProposer) proposeBundle() error {
p.bundleBatchesProposeNotEnoughTotal.Inc()
return nil
}
// allBatchesCommittedInSameTXIncluded makes sure that all batches that were committed in the same tx are included in the bundle.
// If the last batch of the input batches was committed in the same tx as other batches but has not the highest index amongst those,
// we need to remove all batches with the same commit tx hash.
// As a result, all batches with the same commit tx hash will always be included in a single bundle.
func (p *BundleProposer) allBatchesCommittedInSameTXIncluded(batches []*orm.Batch) ([]*orm.Batch, error) {
lastBatch := batches[len(batches)-1]
fields := map[string]interface{}{
"commit_tx_hash = ?": lastBatch.CommitTxHash,
}
// get all batches with the same commit tx hash as lastBatch
batchesWithSameCommitTX, err := p.batchOrm.GetBatches(p.ctx, fields, nil, 0)
if err != nil {
return nil, fmt.Errorf("failed to get batches with the same commit tx hash: %w", err)
}
// get the batch with the highest index amongst the batches with the same commit tx hash as lastBatch
lastBatchWithSameCommitTX := batchesWithSameCommitTX[len(batchesWithSameCommitTX)-1]
// check if lastBatchWithSameCommitTX is included in the input batches -> if not, we need to remove all batches with the same commit tx hash
batchIncluded := lastBatch.Index == lastBatchWithSameCommitTX.Index
if !batchIncluded {
// we need to remove all batches with the same commit tx hash
for i := 0; i < len(batches); i++ {
if batches[i].CommitTxHash != lastBatchWithSameCommitTX.CommitTxHash {
continue
}
batches = batches[:i]
break
}
}
if len(batches) == 0 {
return nil, fmt.Errorf("no batches anymore after cleaning up batches with the same commit tx hash %s", lastBatch.CommitTxHash)
}
return batches, nil
}

View File

@@ -165,13 +165,14 @@ func (o *Batch) GetFirstUnbatchedChunkIndex(ctx context.Context) (uint64, error)
return latestBatch.EndChunkIndex + 1, nil
}
// GetBatchesGEIndexGECodecVersion retrieves batches that have a batch index greater than or equal to the given index and codec version.
// GetCommittedBatchesGEIndexGECodecVersion retrieves batches that have been committed (commit_tx_hash is set) and have a batch index greater than or equal to the given index and codec version.
// The returned batches are sorted in ascending order by their index.
func (o *Batch) GetBatchesGEIndexGECodecVersion(ctx context.Context, index uint64, codecv encoding.CodecVersion, limit int) ([]*Batch, error) {
func (o *Batch) GetCommittedBatchesGEIndexGECodecVersion(ctx context.Context, index uint64, codecv encoding.CodecVersion, limit int) ([]*Batch, error) {
db := o.db.WithContext(ctx)
db = db.Model(&Batch{})
db = db.Where("index >= ?", index)
db = db.Where("codec_version >= ?", codecv)
db = db.Where("commit_tx_hash IS NOT NULL") // only include committed batches
db = db.Order("index ASC")
if limit > 0 {
@@ -180,7 +181,7 @@ func (o *Batch) GetBatchesGEIndexGECodecVersion(ctx context.Context, index uint6
var batches []*Batch
if err := db.Find(&batches).Error; err != nil {
return nil, fmt.Errorf("Batch.GetBatchesGEIndexGECodecVersion error: %w", err)
return nil, fmt.Errorf("Batch.GetCommittedBatchesGEIndexGECodecVersion error: %w", err)
}
return batches, nil
}

View File

@@ -327,23 +327,23 @@ func TestBatchOrm(t *testing.T) {
assert.Equal(t, "finalizeTxHash", updatedBatch.FinalizeTxHash)
assert.Equal(t, types.RollupFinalizeFailed, types.RollupStatus(updatedBatch.RollupStatus))
batches, err := batchOrm.GetBatchesGEIndexGECodecVersion(context.Background(), 0, codecVersion, 0)
batches, err := batchOrm.GetCommittedBatchesGEIndexGECodecVersion(context.Background(), 0, codecVersion, 0)
assert.NoError(t, err)
assert.Equal(t, 2, len(batches))
assert.Equal(t, batchHash1, batches[0].Hash)
assert.Equal(t, batchHash2, batches[1].Hash)
batches, err = batchOrm.GetBatchesGEIndexGECodecVersion(context.Background(), 0, codecVersion, 1)
batches, err = batchOrm.GetCommittedBatchesGEIndexGECodecVersion(context.Background(), 0, codecVersion, 1)
assert.NoError(t, err)
assert.Equal(t, 1, len(batches))
assert.Equal(t, batchHash1, batches[0].Hash)
batches, err = batchOrm.GetBatchesGEIndexGECodecVersion(context.Background(), 1, codecVersion, 0)
batches, err = batchOrm.GetCommittedBatchesGEIndexGECodecVersion(context.Background(), 1, codecVersion, 0)
assert.NoError(t, err)
assert.Equal(t, 1, len(batches))
assert.Equal(t, batchHash2, batches[0].Hash)
batches, err = batchOrm.GetBatchesGEIndexGECodecVersion(context.Background(), 0, codecVersion+1, 0)
batches, err = batchOrm.GetCommittedBatchesGEIndexGECodecVersion(context.Background(), 0, codecVersion+1, 0)
assert.NoError(t, err)
assert.Equal(t, 0, len(batches))
@@ -356,7 +356,7 @@ func TestBatchOrm(t *testing.T) {
err = batchOrm.UpdateFinalizeTxHashAndRollupStatusByBundleHash(context.Background(), "test hash", "tx hash", types.RollupCommitFailed)
assert.NoError(t, err)
batches, err = batchOrm.GetBatchesGEIndexGECodecVersion(context.Background(), 0, codecVersion, 0)
batches, err = batchOrm.GetCommittedBatchesGEIndexGECodecVersion(context.Background(), 0, codecVersion, 0)
assert.NoError(t, err)
assert.Equal(t, 2, len(batches))
assert.Equal(t, batchHash1, batches[0].Hash)

View File

@@ -14,6 +14,7 @@ import (
"github.com/scroll-tech/go-ethereum/params"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gorm.io/gorm"
"scroll-tech/common/database"
"scroll-tech/common/types"
@@ -295,7 +296,7 @@ func testCommitBatchAndFinalizeBundleCodecV7(t *testing.T) {
}, encoding.CodecV7, chainConfig, db, nil)
bup := watcher.NewBundleProposer(context.Background(), &config.BundleProposerConfig{
MaxBatchNumPerBundle: 1000000,
MaxBatchNumPerBundle: 2,
BundleTimeoutSec: 300,
}, encoding.CodecV7, chainConfig, db, nil)
@@ -303,33 +304,56 @@ func testCommitBatchAndFinalizeBundleCodecV7(t *testing.T) {
batchOrm := orm.NewBatch(db)
bundleOrm := orm.NewBundle(db)
fmt.Println("insert first 5 blocks ------------------------")
err = l2BlockOrm.InsertL2Blocks(context.Background(), blocks[:5])
require.NoError(t, err)
batch1ExpectedLastL1MessageQueueHash, err := encoding.MessageQueueV2ApplyL1MessagesFromBlocks(common.Hash{}, blocks[:5])
require.NoError(t, err)
var batch1ExpectedLastL1MessageQueueHash common.Hash
{
fmt.Println("insert first 5 blocks ------------------------")
err = l2BlockOrm.InsertL2Blocks(context.Background(), blocks[:5])
require.NoError(t, err)
batch1ExpectedLastL1MessageQueueHash, err = encoding.MessageQueueV2ApplyL1MessagesFromBlocks(common.Hash{}, blocks[:5])
require.NoError(t, err)
cp.TryProposeChunk()
bap.TryProposeBatch()
cp.TryProposeChunk()
bap.TryProposeBatch()
}
fmt.Println("insert last 5 blocks ------------------------")
err = l2BlockOrm.InsertL2Blocks(context.Background(), blocks[5:])
require.NoError(t, err)
batch2ExpectedLastL1MessageQueueHash, err := encoding.MessageQueueV2ApplyL1MessagesFromBlocks(batch1ExpectedLastL1MessageQueueHash, blocks[5:])
require.NoError(t, err)
var batch2ExpectedLastL1MessageQueueHash common.Hash
{
fmt.Println("insert next 3 blocks ------------------------")
err = l2BlockOrm.InsertL2Blocks(context.Background(), blocks[5:8])
for _, block := range blocks[5:8] {
fmt.Println("insert[5:8] block number: ", block.Header.Number, block.Header.Hash())
}
require.NoError(t, err)
batch2ExpectedLastL1MessageQueueHash, err = encoding.MessageQueueV2ApplyL1MessagesFromBlocks(batch1ExpectedLastL1MessageQueueHash, blocks[5:8])
require.NoError(t, err)
cp.TryProposeChunk()
bap.TryProposeBatch()
cp.TryProposeChunk()
bap.TryProposeBatch()
}
bup.TryProposeBundle() // The proposed bundle contains two batches when codec version is codecv3.
var batch3ExpectedLastL1MessageQueueHash common.Hash
{
fmt.Println("insert last 2 blocks ------------------------")
err = l2BlockOrm.InsertL2Blocks(context.Background(), blocks[8:])
for _, block := range blocks[8:] {
fmt.Println("insert[:8] block number: ", block.Header.Number, block.Header.Hash())
}
require.NoError(t, err)
batch3ExpectedLastL1MessageQueueHash, err = encoding.MessageQueueV2ApplyL1MessagesFromBlocks(batch2ExpectedLastL1MessageQueueHash, blocks[8:])
require.NoError(t, err)
cp.TryProposeChunk()
bap.TryProposeBatch()
}
var batches []*orm.Batch
// make sure that batches are created as expected
require.Eventually(t, func() bool {
batches, getErr := batchOrm.GetBatches(context.Background(), map[string]interface{}{}, nil, 0)
if getErr != nil {
batches, err = batchOrm.GetBatches(context.Background(), map[string]interface{}{}, nil, 0)
if err != nil {
return false
}
if len(batches) != 3 {
if len(batches) != 4 {
return false
}
@@ -340,46 +364,91 @@ func testCommitBatchAndFinalizeBundleCodecV7(t *testing.T) {
require.Equal(t, batch1ExpectedLastL1MessageQueueHash, common.HexToHash(batches[1].PostL1MessageQueueHash))
require.Equal(t, batch1ExpectedLastL1MessageQueueHash, common.HexToHash(batches[2].PrevL1MessageQueueHash))
require.Equal(t, batch2ExpectedLastL1MessageQueueHash, common.HexToHash(batches[2].PostL1MessageQueueHash))
require.Equal(t, batch2ExpectedLastL1MessageQueueHash, common.HexToHash(batches[3].PrevL1MessageQueueHash))
require.Equal(t, batch3ExpectedLastL1MessageQueueHash, common.HexToHash(batches[3].PostL1MessageQueueHash))
return true
}, 30*time.Second, time.Second)
// simulate proof generation -> all batches and bundle are verified
// Nothing should happen since no batch is committed yet.
{
batchProof := &message.OpenVMBatchProof{}
batches, err := batchOrm.GetBatches(context.Background(), map[string]interface{}{}, nil, 0)
require.NoError(t, err)
batches = batches[1:]
for _, batch := range batches {
err = batchOrm.UpdateProofByHash(context.Background(), batch.Hash, batchProof, 100)
require.NoError(t, err)
err = batchOrm.UpdateProvingStatus(context.Background(), batch.Hash, types.ProvingTaskVerified)
require.NoError(t, err)
}
bundleProof := &message.OpenVMBundleProof{}
bup.TryProposeBundle()
bundles, err := bundleOrm.GetBundles(context.Background(), map[string]interface{}{}, nil, 0)
require.NoError(t, err)
for _, bundle := range bundles {
err = bundleOrm.UpdateProofAndProvingStatusByHash(context.Background(), bundle.Hash, bundleProof, types.ProvingTaskVerified, 100)
require.NoError(t, err)
}
require.Len(t, bundles, 0)
}
//return
// TODO: assert that batches have been submitted together in a single transaction after contract ABI is updated
//for _, batch := range batches {
// fmt.Println("batch hash: ", batch.Hash, batch.Index, batch.RollupStatus)
// //if types.RollupCommitted != types.RollupStatus(batch.RollupStatus) {
// // return false
// //}
//}
//l2Relayer.ProcessPendingBatches()
// simulate batches 2 and 3 being submitted together in a single transaction
err = db.Transaction(func(dbTX *gorm.DB) error {
if err = batchOrm.UpdateCommitTxHashAndRollupStatus(context.Background(), batches[1].Hash, "0xdefdef", types.RollupCommitted, dbTX); err != nil {
return fmt.Errorf("UpdateCommitTxHashAndRollupStatus failed for batch %d: %s, err %v", batches[1].Index, batches[1].Hash, err)
}
for _, batch := range batches[2:] {
if err = batchOrm.UpdateCommitTxHashAndRollupStatus(context.Background(), batch.Hash, "0xabcabc", types.RollupCommitted, dbTX); err != nil {
return fmt.Errorf("UpdateCommitTxHashAndRollupStatus failed for batch %d: %s, err %v", batch.Index, batch.Hash, err)
}
}
return nil
})
require.NoError(t, err)
// We only allow bundles up to 2 batches. We should have 2 bundles:
// 1. batch 1 -> because it was committed by itself and the next set of batches could not fit the bundle
// 2. batch 2 and 3 -> because they were committed together in a single transaction
{
// need to propose 2 times to get 2 bundles with all batches
bup.TryProposeBundle()
bup.TryProposeBundle()
bundles, err := bundleOrm.GetBundles(context.Background(), map[string]interface{}{}, nil, 0)
require.NoError(t, err)
require.Len(t, bundles, 2)
require.Equal(t, bundles[0].StartBatchIndex, batches[1].Index)
require.Equal(t, bundles[0].EndBatchIndex, batches[1].Index)
require.Equal(t, bundles[0].StartBatchHash, batches[1].Hash)
require.Equal(t, bundles[0].EndBatchHash, batches[1].Hash)
require.Equal(t, bundles[1].StartBatchIndex, batches[2].Index)
require.Equal(t, bundles[1].EndBatchIndex, batches[3].Index)
require.Equal(t, bundles[1].StartBatchHash, batches[2].Hash)
require.Equal(t, bundles[1].EndBatchHash, batches[3].Hash)
}
return
// simulate proof generation -> all batches and bundle are verified
//{
// batchProof := &message.OpenVMBatchProof{}
// batches, err := batchOrm.GetBatches(context.Background(), map[string]interface{}{}, nil, 0)
// require.NoError(t, err)
// batches = batches[1:]
// for _, batch := range batches {
// err = batchOrm.UpdateProofByHash(context.Background(), batch.Hash, batchProof, 100)
// require.NoError(t, err)
// err = batchOrm.UpdateProvingStatus(context.Background(), batch.Hash, types.ProvingTaskVerified)
// require.NoError(t, err)
// }
//
// bundleProof := &message.OpenVMBundleProof{}
// bundles, err := bundleOrm.GetBundles(context.Background(), map[string]interface{}{}, nil, 0)
// require.NoError(t, err)
// for _, bundle := range bundles {
// err = bundleOrm.UpdateProofAndProvingStatusByHash(context.Background(), bundle.Hash, bundleProof, types.ProvingTaskVerified, 100)
// require.NoError(t, err)
// }
//}
// TODO: assert that batches have been submitted together in a single transaction after contract ABI is updated
//l2Relayer.ProcessPendingBatches()
//l2Relayer.ProcessPendingBundles()
//assert.Eventually(t, func() bool {
// l2Relayer.ProcessPendingBundles()
//
// batches, err := batchOrm.GetBatches(context.Background(), map[string]interface{}{}, nil, 0)
// batches, err = batchOrm.GetBatches(context.Background(), map[string]interface{}{}, nil, 0)
// assert.NoError(t, err)
// assert.Len(t, batches, 3)
// batches = batches[1:]
@@ -415,5 +484,5 @@ func testCommitBatchAndFinalizeBundleCodecV7(t *testing.T) {
// }
//
// return true
//}, 30*time.Second, time.Second)
//}, 10*time.Second, time.Second)
}