Compare commits

...

6 Commits

Author SHA1 Message Date
Péter Garamvölgyi
bcee2a85be fix batch-proposer 2023-08-25 12:13:53 +08:00
Péter Garamvölgyi
85c27ba91e make row consumption optional (for real) 2023-08-24 19:27:59 +08:00
Péter Garamvölgyi
8e021d58ff fix chunk-proposer db limit 2023-08-24 10:38:59 +08:00
Péter Garamvölgyi
f594555053 make ccc result optional 2023-08-24 10:02:04 +08:00
Péter Garamvölgyi
7488a98c83 allow loop with 0 period 2023-08-23 20:56:08 +08:00
Péter Garamvölgyi
d0d24e8075 propose chunk immediately 2023-08-23 20:09:58 +08:00
8 changed files with 41 additions and 24 deletions

View File

@@ -105,9 +105,9 @@ func action(ctx *cli.Context) error {
l2watcher.TryFetchRunningMissingBlocks(number)
})
go utils.Loop(subCtx, 2*time.Second, chunkProposer.TryProposeChunk)
go utils.Loop(subCtx, 0*time.Second, chunkProposer.TryProposeChunk)
go utils.Loop(subCtx, 10*time.Second, batchProposer.TryProposeBatch)
go utils.Loop(subCtx, 0*time.Second, batchProposer.TryProposeBatch)
go utils.Loop(subCtx, 2*time.Second, l2relayer.ProcessPendingBatches)

View File

@@ -3,7 +3,6 @@ package watcher
import (
"context"
"fmt"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
@@ -222,7 +221,7 @@ func (p *BatchProposer) proposeBatchChunks() ([]*orm.Chunk, error) {
}
}
log.Debug("breaking limit condition in batching",
log.Info("breaking limit condition in batching",
"currentTotalChunks", totalChunks,
"maxChunkNumPerBatch", p.maxChunkNumPerBatch,
"currentL1CommitCalldataSize", totalL1CommitCalldataSize,
@@ -237,13 +236,8 @@ func (p *BatchProposer) proposeBatchChunks() ([]*orm.Chunk, error) {
}
}
currentTimeSec := uint64(time.Now().Unix())
if dbChunks[0].StartBlockTime+p.batchTimeoutSec < currentTimeSec {
log.Warn("first block timeout",
"start block number", dbChunks[0].StartBlockNumber,
"first block timestamp", dbChunks[0].StartBlockTime,
"chunk outdated time threshold", currentTimeSec,
)
if len(dbChunks) == 10 {
log.Info("Collected 10 chunks, creating batch")
p.batchFirstBlockTimeoutReached.Inc()
p.totalL1CommitGas.Set(float64(totalL1CommitGas))
p.totalL1CommitCalldataSize.Set(float64(totalL1CommitCalldataSize))

View File

@@ -2,7 +2,6 @@ package watcher
import (
"context"
"errors"
"fmt"
"time"
@@ -24,7 +23,8 @@ type chunkRowConsumption map[string]uint64
// add accumulates row consumption per sub-circuit
func (crc *chunkRowConsumption) add(rowConsumption *gethTypes.RowConsumption) error {
if rowConsumption == nil {
return errors.New("rowConsumption is <nil>")
// return errors.New("rowConsumption is <nil>")
return nil
}
for _, subCircuit := range *rowConsumption {
(*crc)[subCircuit.Name] += subCircuit.RowNumber
@@ -254,7 +254,7 @@ func (p *ChunkProposer) proposeChunk() (*types.Chunk, error) {
}
}
log.Debug("breaking limit condition in chunking",
log.Info("breaking limit condition in chunking",
"totalL2TxNum", totalL2TxNum,
"maxL2TxNumPerChunk", p.maxL2TxNumPerChunk,
"currentL1CommitCalldataSize", totalL1CommitCalldataSize,
@@ -276,6 +276,16 @@ func (p *ChunkProposer) proposeChunk() (*types.Chunk, error) {
chunk.Blocks = append(chunk.Blocks, block)
}
log.Debug("did not break limit condition in chunking",
"numBlocks", len(chunk.Blocks),
"totalL2TxNum", totalL2TxNum,
"maxL2TxNumPerChunk", p.maxL2TxNumPerChunk,
"currentL1CommitCalldataSize", totalL1CommitCalldataSize,
"maxL1CommitGasPerChunk", p.maxL1CommitGasPerChunk,
"maxL1CommitCalldataSizePerChunk", p.maxL1CommitCalldataSizePerChunk,
"chunkRowConsumption", crc,
"p.maxRowConsumptionPerChunk", p.maxRowConsumptionPerChunk)
currentTimeSec := uint64(time.Now().Unix())
if blocks[0].Header.Time+p.chunkTimeoutSec < currentTimeSec {
log.Warn("first block timeout",

View File

@@ -162,7 +162,7 @@ func (w *L2WatcherClient) getAndStoreBlockTraces(ctx context.Context, from, to u
return fmt.Errorf("failed to GetBlockByNumberOrHash: %v. number: %v", err, number)
}
if block.RowConsumption == nil {
return fmt.Errorf("fetched block does not contain RowConsumption. number: %v", number)
// return fmt.Errorf("fetched block does not contain RowConsumption. number: %v", number)
}
log.Info("retrieved block", "height", block.Header().Number, "hash", block.Header().Hash().String())

View File

@@ -93,6 +93,7 @@ func (o *Chunk) GetUnbatchedChunks(ctx context.Context) ([]*Chunk, error) {
db = db.Model(&Chunk{})
db = db.Where("batch_hash IS NULL")
db = db.Order("index asc")
db = db.Limit(10) // we allow at most 10 chunks per batch
var chunks []*Chunk
if err := db.Find(&chunks).Error; err != nil {

View File

@@ -72,6 +72,7 @@ func (o *L2Block) GetUnchunkedBlocks(ctx context.Context) ([]*types.WrappedBlock
db = db.Select("header, transactions, withdraw_root, row_consumption")
db = db.Where("chunk_hash IS NULL")
db = db.Order("number ASC")
db = db.Limit(100)
var l2Blocks []L2Block
if err := db.Find(&l2Blocks).Error; err != nil {

View File

@@ -34,14 +34,25 @@ func LoopWithContext(ctx context.Context, period time.Duration, f func(ctx conte
// Loop Run the f func periodically.
func Loop(ctx context.Context, period time.Duration, f func()) {
tick := time.NewTicker(period)
defer tick.Stop()
for ; ; <-tick.C {
select {
case <-ctx.Done():
return
default:
f()
if period == 0*time.Second {
for {
select {
case <-ctx.Done():
return
default:
f()
}
}
} else {
tick := time.NewTicker(period)
defer tick.Stop()
for ; ; <-tick.C {
select {
case <-ctx.Done():
return
default:
f()
}
}
}
}

View File

@@ -14,7 +14,7 @@ create table l2_block
tx_num INTEGER NOT NULL,
gas_used BIGINT NOT NULL,
block_timestamp NUMERIC NOT NULL,
row_consumption TEXT NOT NULL,
row_consumption TEXT,
-- chunk
chunk_hash VARCHAR DEFAULT NULL,