Limit columns requested instead of making block batches small

Kasey Kirkham
2025-11-20 18:15:52 -06:00
parent 1438a351d7
commit 8d6b3a9fb2
6 changed files with 65 additions and 16 deletions

View File

@@ -28,6 +28,11 @@ var (
errCommitmentValueMismatch = errors.Wrap(errInvalidDataColumnResponse, "sidecar commitments do not match block")
)
// columnRequestLimit tunes the number of column sidecars we try to download from a peer at once.
// The spec limit is 128 * 32, but connection errors are more likely when
// requesting that much at once.
const columnRequestLimit = 128 * 4
type columnBatch struct {
first primitives.Slot
last primitives.Slot
@@ -60,6 +65,28 @@ func (cs *columnBatch) needed() peerdas.ColumnIndices {
return ci
}
// neededSidecarCount returns the total number of sidecars still needed to complete the batch.
func (cs *columnBatch) neededSidecarCount() int {
count := 0
for _, v := range cs.toDownload {
count += v.remaining.Count()
}
return count
}
// neededSidecarsByColumn counts, for each column index the peer can serve, how many sidecars are still needed.
func (cs *columnBatch) neededSidecarsByColumn(peerHas peerdas.ColumnIndices) map[uint64]int {
need := make(map[uint64]int, len(peerHas))
for _, v := range cs.toDownload {
for idx := range v.remaining {
if peerHas.Has(idx) {
need[idx]++
}
}
}
return need
}
type columnSync struct {
*columnBatch
store *das.LazilyPersistentStoreColumn
@@ -105,6 +132,21 @@ func (cs *columnSync) columnsNeeded() peerdas.ColumnIndices {
}
func (cs *columnSync) request(reqCols []uint64) *ethpb.DataColumnSidecarsByRangeRequest {
// Trim reqCols so the total number of sidecars requested stays within columnRequestLimit.
reqCount := 0
peerHas := peerdas.NewColumnIndicesFromSlice(reqCols)
needed := cs.neededSidecarsByColumn(peerHas)
for i := range reqCols {
addSidecars := needed[reqCols[i]] // number of sidecars this column would add to the response
if reqCount+addSidecars > columnRequestLimit {
reqCols = reqCols[:i]
break
}
reqCount += addSidecars
}
if len(reqCols) == 0 {
return nil
}
return sync.DataColumnSidecarsByRangeRequest(reqCols, cs.first, cs.last)
}
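To make the trimming in request() concrete, here is a minimal, self-contained sketch of the same capping idea. The names capColumns, neededPerColumn, and limit are illustrative stand-ins, not the Prysm identifiers:

package main

import "fmt"

// limit mirrors columnRequestLimit above: cap the sidecars asked of one peer.
const limit = 128 * 4

// capColumns returns the longest prefix of cols whose cumulative sidecar
// count stays within limit, mirroring the loop in (*columnSync).request.
func capColumns(cols []uint64, neededPerColumn map[uint64]int) []uint64 {
	total := 0
	for i, c := range cols {
		if total+neededPerColumn[c] > limit {
			return cols[:i]
		}
		total += neededPerColumn[c]
	}
	return cols
}

func main() {
	cols := []uint64{0, 1, 2, 3}
	need := map[uint64]int{0: 200, 1: 200, 2: 200, 3: 200}
	// Only the first two columns fit: 200 + 200 = 400 <= 512; a third would exceed the limit.
	fmt.Println(capColumns(cols, need)) // [0 1]
}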

View File

@@ -35,7 +35,9 @@ func defaultNewWorker(p p2p.P2P) newWorker {
// minReqInterval is the minimum amount of time between requests,
// i.e. a value of 1s means we'll make ~1 req/sec per peer.
const minReqInterval = time.Second
const (
minReqInterval = time.Second
)
type p2pBatchWorkerPool struct {
maxBatches int
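For context on minReqInterval, a minimal sketch of how a per-peer minimum request interval can be enforced; lastReq and rateLimit are illustrative helpers under that assumption, not the pool's actual mechanism:

package main

import (
	"fmt"
	"time"
)

const minReqInterval = time.Second

func main() {
	lastReq := map[string]time.Time{}
	// rateLimit blocks until at least minReqInterval has passed since the
	// previous request to the given peer, then records the new request time.
	rateLimit := func(peer string) {
		if wait := minReqInterval - time.Since(lastReq[peer]); wait > 0 {
			time.Sleep(wait)
		}
		lastReq[peer] = time.Now()
	}
	start := time.Now()
	for i := 0; i < 3; i++ {
		rateLimit("peerA")
		fmt.Printf("request %d after %v\n", i, time.Since(start).Round(100*time.Millisecond))
	}
}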

View File

@@ -20,6 +20,8 @@ var (
errUnknownDomain = errors.Wrap(errInvalidBlocks, "runtime error looking up signing domain for fork")
errBatchSignatureFailed = errors.Wrap(errInvalidBlocks, "failed to verify block signature in batch")
errInvalidSignatureData = errors.Wrap(errInvalidBlocks, "could not verify signatures in block batch due to invalid signature data")
errEmptyVerificationSet = errors.New("no blocks to verify in batch")
)
// verifiedROBlocks represents a slice of blocks that have passed signature verification.
@@ -77,7 +79,11 @@ type verifier struct {
}
func (vr verifier) verify(blks []blocks.ROBlock) (verifiedROBlocks, error) {
var err error
if len(blks) == 0 {
// Returning an error here simplifies handling in the caller.
// errEmptyVerificationSet should not cause the peer to be downscored.
return nil, errEmptyVerificationSet
}
sigSet := bls.NewSet()
for i := range blks {
if i > 0 && blks[i-1].Root() != blks[i].Block().ParentRoot() {
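A quick sketch of how the new errEmptyVerificationSet sentinel is meant to be consumed: an empty batch is reported as an error so the caller has a single error path, but it can be matched with errors.Is so the peer is not downscored. verify here is a simplified stand-in for verifier.verify:

package main

import (
	"errors"
	"fmt"
)

var errEmptyVerificationSet = errors.New("no blocks to verify in batch")

// verify mirrors the shape of verifier.verify: an empty input is an error, but a benign one.
func verify(blks []string) ([]string, error) {
	if len(blks) == 0 {
		return nil, errEmptyVerificationSet
	}
	return blks, nil
}

func main() {
	_, err := verify(nil)
	if errors.Is(err, errEmptyVerificationSet) {
		fmt.Println("empty batch: retry later, do not downscore the peer")
	}
}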

View File

@@ -89,13 +89,15 @@ func (w *p2pWorker) run(ctx context.Context) {
}
log.WithFields(b.logFields()).WithField("backfillWorker", w.id).Trace("Backfill worker received batch")
switch b.state {
case batchSequenced:
b = w.handleBlocks(ctx, b)
case batchSyncBlobs:
b = w.handleBlobs(ctx, b)
case batchSyncColumns:
b = w.handleColumns(ctx, b)
case batchSequenced:
b = w.handleBlocks(ctx, b)
case batchImportable:
// This state indicates the batch made it all the way to import and then failed,
// so we need to clear out the blocks and go all the way back to the start of the process.
b.blocks = nil
b = w.handleBlocks(ctx, b)
default:
@@ -141,13 +143,13 @@ func (w *p2pWorker) handleBlocks(ctx context.Context, b batch) batch {
}
dlt := time.Now()
blockDownloadMs.Observe(float64(dlt.Sub(start).Milliseconds()))
toVerify, err := blocks.NewROBlockSlice(results)
if err != nil {
log.WithError(err).WithFields(b.logFields()).Debug("Batch conversion to ROBlock failed")
return b.withRetryableError(err)
}
vb, err := w.cfg.verifier.verify(toVerify)
verified, err := w.cfg.verifier.verify(toVerify)
blockVerifyMs.Observe(float64(time.Since(dlt).Milliseconds()))
if err != nil {
if shouldDownscore(err) {
@@ -156,27 +158,28 @@ func (w *p2pWorker) handleBlocks(ctx context.Context, b batch) batch {
log.WithError(err).WithFields(b.logFields()).Debug("Batch validation failed")
return b.withRetryableError(err)
}
// This is a hack to get the rough size of the batch. It helps us approximate the amount of memory needed
// to hold batches and the relative sizes between batches, but it is inaccurate as a measure of the actual
// bytes downloaded from peers, mainly because the p2p messages are snappy compressed.
bdl := 0
for i := range vb {
bdl += vb[i].SizeSSZ()
for i := range verified {
bdl += verified[i].SizeSSZ()
}
blockDownloadBytesApprox.Add(float64(bdl))
log.WithFields(b.logFields()).WithField("dlbytes", bdl).Debug("Backfill batch block bytes downloaded")
b.blocks = vb
b.blocks = verified
blobRetentionStart, err := sync.BlobRPCMinValidSlot(current)
if err != nil {
return b.withRetryableError(errors.Wrap(err, "configuration issue, could not compute minimum blob retention slot"))
}
bscfg := &blobSyncConfig{retentionStart: blobRetentionStart, nbv: w.cfg.newVB, store: w.cfg.blobStore}
bs, err := newBlobSync(current, vb, bscfg)
bs, err := newBlobSync(current, verified, bscfg)
if err != nil {
return b.withRetryableError(err)
}
cs, err := newColumnSync(ctx, b, vb, current, w.p2p, vb, w.cfg)
cs, err := newColumnSync(ctx, b, verified, current, w.p2p, verified, w.cfg)
if err != nil {
return b.withRetryableError(err)
}
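To illustrate the size approximation above: summing SizeSSZ over the verified blocks gives a rough in-memory footprint for the batch, not the wire bytes (those are smaller after snappy compression). sizedBlock is a stand-in for blocks.ROBlock:

package main

import "fmt"

// sizedBlock stands in for blocks.ROBlock; only SizeSSZ matters here.
type sizedBlock struct{ ssz []byte }

func (b sizedBlock) SizeSSZ() int { return len(b.ssz) }

func main() {
	verified := []sizedBlock{{make([]byte, 1<<10)}, {make([]byte, 2<<10)}}
	bdl := 0
	for i := range verified {
		bdl += verified[i].SizeSSZ()
	}
	fmt.Println("approx batch bytes:", bdl) // 3072
}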

View File

@@ -5,7 +5,6 @@ import (
"fmt"
"io"
"slices"
"sort"
"github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/helpers"
@@ -817,9 +816,6 @@ func downscorePeer(p2p p2p.P2P, peerID peer.ID, reason string, fields ...logrus.
}
func DataColumnSidecarsByRangeRequest(columns []uint64, start, end primitives.Slot) *ethpb.DataColumnSidecarsByRangeRequest {
sort.Slice(columns, func(i, j int) bool {
return columns[i] < columns[j]
})
return &ethpb.DataColumnSidecarsByRangeRequest{
StartSlot: start,
Count: uint64(end-start) + 1,

View File

@@ -22,7 +22,7 @@ var (
Usage: "Number of blocks per backfill batch. " +
"A larger number will request more blocks at once from peers, but also consume more system memory to " +
"hold batches in memory during processing. This has a multiplicative effect with " + backfillWorkerCountName + ".",
Value: 4,
Value: 32,
}
// BackfillWorkerCount allows users to tune the number of concurrent backfill batches to download, to maximize
// network utilization at the cost of higher memory.
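A rough back-of-the-envelope for the new default, following the multiplicative relationship the usage text describes; the worker count of 2 is an arbitrary example, not the flag's default:

package main

import "fmt"

func main() {
	const blocksPerBatch = 32 // new default in this commit (was 4)
	const workers = 2         // hypothetical backfill worker count
	// Peak blocks held across in-flight batches scales with the product.
	fmt.Println("blocks in flight ≈", blocksPerBatch*workers) // 64
}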