From 8191bb5711b1ddd6d37a208cea261e6357d7a0f7 Mon Sep 17 00:00:00 2001
From: Manu NALEPA
Date: Tue, 16 Dec 2025 17:27:32 +0100
Subject: [PATCH] Construct data column sidecars from the execution layer in parallel and add metrics (#16115)

**What type of PR is this?**

Optimisation

**What does this PR do? Why is it needed?**

While constructing data column sidecars from the execution layer is very cheap compared to reconstructing data column sidecars from other data column sidecars, it is still worthwhile to run this construction in parallel.

(**Reminder:** Using `getBlobsV2`, all the cell proofs are present, but only 64 (out of 128) cells are present. Recomputing the missing cells is cheap, while reconstructing the missing proofs is expensive.)

This PR:
- adds some metrics
- ensures the construction is done in parallel

**Other notes for review**

Please review commit by commit.

The red vertical lines in the screenshots below mark the boundary between before and after this pull request.

[screenshot]

The last commit turns the bottom-right histogram into a summary, since it no longer makes sense to use a histogram for these values.

Please check "hide whitespace" so this PR is easier to review:

[screenshot]

Updated metrics: now, for every **non-missed slot** whose block has **at least one commitment**, we log either:

```
[2025-12-10 10:02:12.93] DEBUG sync: Constructed data column sidecars from the execution client count=118 indices=0-5,7-16,18-27,29-35,37-46,48-49,51-82,84-100,102-106,108-125,127 iteration=0 proposerIndex=855082 root=0xf8f44e7d4cbc209b2ff2796c07fcf91e85ab45eebe145c4372017a18b25bf290 slot=1928961 type=BeaconBlock
```

or

```
[2025-12-10 10:02:25.69] DEBUG sync: No data column sidecars constructed from the execution client iteration=2 proposerIndex=1093657 root=0x64c2f6c31e369cd45f2edaf5524b64f4869e8148cd29fb84b5b8866be529eea3 slot=1928962 type=DataColumnSidecar
```

[screenshot]

[screenshot]

**Acknowledgements**

- [x] I have read [CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description to this PR with sufficient context for reviewers to understand this PR.
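For reviewers unfamiliar with the pattern, the sketch below shows the fan-out shape now used by `ComputeCellsAndProofsFromFlat` and `ComputeCellsAndProofsFromStructured`: one goroutine per blob via `errgroup.Group`, with each goroutine writing only to its own index of a pre-sized slice, so no mutex is needed and blob order is preserved. This is an illustration, not the code in the diff: `computeExtendedCells` is a hypothetical stand-in for `kzg.ComputeCells`, and Go 1.22+ per-iteration loop variables are assumed (as in the actual change).

```go
// Minimal sketch of the per-blob fan-out pattern used in this PR.
// computeExtendedCells is a hypothetical stand-in for the real KZG cell
// computation; it only illustrates the shape of the per-blob work.
package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

func computeExtendedCells(blob []byte) ([][]byte, error) {
	cells := make([][]byte, 128)
	for i := range cells {
		cells[i] = blob // placeholder: the real code derives 128 cells per blob
	}
	return cells, nil
}

func computeCellsParallel(blobs [][]byte) ([][][]byte, error) {
	var wg errgroup.Group

	// Pre-size the result slice so each goroutine writes only to its own index.
	cellsPerBlob := make([][][]byte, len(blobs))

	for i, blob := range blobs {
		// Go 1.22+ gives each iteration its own i and blob, so capturing them
		// in the closure is safe.
		wg.Go(func() error {
			cells, err := computeExtendedCells(blob)
			if err != nil {
				return fmt.Errorf("compute cells for blob %d: %w", i, err)
			}
			cellsPerBlob[i] = cells
			return nil
		})
	}

	// Wait returns the first non-nil error from any goroutine, if any.
	if err := wg.Wait(); err != nil {
		return nil, err
	}
	return cellsPerBlob, nil
}

func main() {
	blobs := [][]byte{{0x01}, {0x02}, {0x03}}
	cells, err := computeCellsParallel(blobs)
	if err != nil {
		panic(err)
	}
	fmt.Printf("computed cells for %d blobs\n", len(cells))
}
```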
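The metric additions all share the same defer-based timing shape; the sketch below shows that pattern with a hypothetical `exampleLatency` summary. The real metrics are declared in the touched `metrics.go` files; this block is illustrative only.

```go
// Minimal sketch of the defer-based latency instrumentation added throughout
// this PR. exampleLatency is a hypothetical metric, not one from the diff.
package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

var exampleLatency = promauto.NewSummary(prometheus.SummaryOpts{
	Name: "example_operation_latency_milliseconds",
	Help: "Latency of the example operation in milliseconds.",
})

func doOperation() {
	start := time.Now()
	// Record the elapsed time when the function returns, on every exit path.
	defer func() {
		exampleLatency.Observe(float64(time.Since(start).Milliseconds()))
	}()

	time.Sleep(5 * time.Millisecond) // stand-in for the real work
}

func main() {
	doOperation()
}
```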
--- beacon-chain/core/peerdas/metrics.go | 22 +++- beacon-chain/core/peerdas/reconstruction.go | 120 +++++++++++------- beacon-chain/db/filesystem/data_column.go | 18 +++ beacon-chain/db/filesystem/metrics.go | 17 ++- beacon-chain/execution/engine_client.go | 7 + beacon-chain/execution/metrics.go | 7 + beacon-chain/sync/metrics.go | 14 ++ beacon-chain/sync/subscriber_beacon_blocks.go | 45 ++++--- changelog/manu_reconstruct-metrics.md | 7 + 9 files changed, 185 insertions(+), 72 deletions(-) create mode 100644 changelog/manu_reconstruct-metrics.md diff --git a/beacon-chain/core/peerdas/metrics.go b/beacon-chain/core/peerdas/metrics.go index caa92ebc56..3c20d51157 100644 --- a/beacon-chain/core/peerdas/metrics.go +++ b/beacon-chain/core/peerdas/metrics.go @@ -5,10 +5,20 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" ) -var dataColumnComputationTime = promauto.NewHistogram( - prometheus.HistogramOpts{ - Name: "beacon_data_column_sidecar_computation_milliseconds", - Help: "Captures the time taken to compute data column sidecars from blobs.", - Buckets: []float64{25, 50, 100, 250, 500, 750, 1000}, - }, +var ( + dataColumnComputationTime = promauto.NewHistogram( + prometheus.HistogramOpts{ + Name: "beacon_data_column_sidecar_computation_milliseconds", + Help: "Captures the time taken to compute data column sidecars from blobs.", + Buckets: []float64{25, 50, 100, 250, 500, 750, 1000}, + }, + ) + + cellsAndProofsFromStructuredComputationTime = promauto.NewHistogram( + prometheus.HistogramOpts{ + Name: "cells_and_proofs_from_structured_computation_milliseconds", + Help: "Captures the time taken to compute cells and proofs from structured computation.", + Buckets: []float64{10, 20, 30, 40, 50, 100, 200}, + }, + ) ) diff --git a/beacon-chain/core/peerdas/reconstruction.go b/beacon-chain/core/peerdas/reconstruction.go index 371418e7c9..bb50a0fa7a 100644 --- a/beacon-chain/core/peerdas/reconstruction.go +++ b/beacon-chain/core/peerdas/reconstruction.go @@ -3,6 +3,7 @@ package peerdas import ( "sort" "sync" + "time" "github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain/kzg" fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams" @@ -296,32 +297,42 @@ func ComputeCellsAndProofsFromFlat(blobs [][]byte, cellProofs [][]byte) ([][]kzg return nil, nil, ErrBlobsCellsProofsMismatch } - cellsPerBlob := make([][]kzg.Cell, 0, blobCount) - proofsPerBlob := make([][]kzg.Proof, 0, blobCount) + var wg errgroup.Group + + cellsPerBlob := make([][]kzg.Cell, blobCount) + proofsPerBlob := make([][]kzg.Proof, blobCount) + for i, blob := range blobs { - var kzgBlob kzg.Blob - if copy(kzgBlob[:], blob) != len(kzgBlob) { - return nil, nil, errors.New("wrong blob size - should never happen") - } - - // Compute the extended cells from the (non-extended) blob. - cells, err := kzg.ComputeCells(&kzgBlob) - if err != nil { - return nil, nil, errors.Wrap(err, "compute cells") - } - - var proofs []kzg.Proof - for idx := uint64(i) * numberOfColumns; idx < (uint64(i)+1)*numberOfColumns; idx++ { - var kzgProof kzg.Proof - if copy(kzgProof[:], cellProofs[idx]) != len(kzgProof) { - return nil, nil, errors.New("wrong KZG proof size - should never happen") + wg.Go(func() error { + var kzgBlob kzg.Blob + if copy(kzgBlob[:], blob) != len(kzgBlob) { + return errors.New("wrong blob size - should never happen") } - proofs = append(proofs, kzgProof) - } + // Compute the extended cells from the (non-extended) blob. 
+ cells, err := kzg.ComputeCells(&kzgBlob) + if err != nil { + return errors.Wrap(err, "compute cells") + } - cellsPerBlob = append(cellsPerBlob, cells) - proofsPerBlob = append(proofsPerBlob, proofs) + proofs := make([]kzg.Proof, 0, numberOfColumns) + for idx := uint64(i) * numberOfColumns; idx < (uint64(i)+1)*numberOfColumns; idx++ { + var kzgProof kzg.Proof + if copy(kzgProof[:], cellProofs[idx]) != len(kzgProof) { + return errors.New("wrong KZG proof size - should never happen") + } + + proofs = append(proofs, kzgProof) + } + + cellsPerBlob[i] = cells + proofsPerBlob[i] = proofs + return nil + }) + } + + if err := wg.Wait(); err != nil { + return nil, nil, err } return cellsPerBlob, proofsPerBlob, nil @@ -329,40 +340,55 @@ func ComputeCellsAndProofsFromFlat(blobs [][]byte, cellProofs [][]byte) ([][]kzg // ComputeCellsAndProofsFromStructured computes the cells and proofs from blobs and cell proofs. func ComputeCellsAndProofsFromStructured(blobsAndProofs []*pb.BlobAndProofV2) ([][]kzg.Cell, [][]kzg.Proof, error) { - cellsPerBlob := make([][]kzg.Cell, 0, len(blobsAndProofs)) - proofsPerBlob := make([][]kzg.Proof, 0, len(blobsAndProofs)) - for _, blobAndProof := range blobsAndProofs { + start := time.Now() + defer func() { + cellsAndProofsFromStructuredComputationTime.Observe(float64(time.Since(start).Milliseconds())) + }() + + var wg errgroup.Group + + cellsPerBlob := make([][]kzg.Cell, len(blobsAndProofs)) + proofsPerBlob := make([][]kzg.Proof, len(blobsAndProofs)) + + for i, blobAndProof := range blobsAndProofs { if blobAndProof == nil { return nil, nil, ErrNilBlobAndProof } - var kzgBlob kzg.Blob - if copy(kzgBlob[:], blobAndProof.Blob) != len(kzgBlob) { - return nil, nil, errors.New("wrong blob size - should never happen") - } - - // Compute the extended cells from the (non-extended) blob. - cells, err := kzg.ComputeCells(&kzgBlob) - if err != nil { - return nil, nil, errors.Wrap(err, "compute cells") - } - - kzgProofs := make([]kzg.Proof, 0, fieldparams.NumberOfColumns) - for _, kzgProofBytes := range blobAndProof.KzgProofs { - if len(kzgProofBytes) != kzg.BytesPerProof { - return nil, nil, errors.New("wrong KZG proof size - should never happen") + wg.Go(func() error { + var kzgBlob kzg.Blob + if copy(kzgBlob[:], blobAndProof.Blob) != len(kzgBlob) { + return errors.New("wrong blob size - should never happen") } - var kzgProof kzg.Proof - if copy(kzgProof[:], kzgProofBytes) != len(kzgProof) { - return nil, nil, errors.New("wrong copied KZG proof size - should never happen") + // Compute the extended cells from the (non-extended) blob. 
+ cells, err := kzg.ComputeCells(&kzgBlob) + if err != nil { + return errors.Wrap(err, "compute cells") } - kzgProofs = append(kzgProofs, kzgProof) - } + kzgProofs := make([]kzg.Proof, 0, fieldparams.NumberOfColumns) + for _, kzgProofBytes := range blobAndProof.KzgProofs { + if len(kzgProofBytes) != kzg.BytesPerProof { + return errors.New("wrong KZG proof size - should never happen") + } - cellsPerBlob = append(cellsPerBlob, cells) - proofsPerBlob = append(proofsPerBlob, kzgProofs) + var kzgProof kzg.Proof + if copy(kzgProof[:], kzgProofBytes) != len(kzgProof) { + return errors.New("wrong copied KZG proof size - should never happen") + } + + kzgProofs = append(kzgProofs, kzgProof) + } + + cellsPerBlob[i] = cells + proofsPerBlob[i] = kzgProofs + return nil + }) + } + + if err := wg.Wait(); err != nil { + return nil, nil, err } return cellsPerBlob, proofsPerBlob, nil diff --git a/beacon-chain/db/filesystem/data_column.go b/beacon-chain/db/filesystem/data_column.go index 221146c556..20e2246f21 100644 --- a/beacon-chain/db/filesystem/data_column.go +++ b/beacon-chain/db/filesystem/data_column.go @@ -515,6 +515,11 @@ func (dcs *DataColumnStorage) Clear() error { // prune clean the cache, the filesystem and mutexes. func (dcs *DataColumnStorage) prune() { + startTime := time.Now() + defer func() { + dataColumnPruneLatency.Observe(float64(time.Since(startTime).Milliseconds())) + }() + highestStoredEpoch := dcs.cache.HighestEpoch() // Check if we need to prune. @@ -622,6 +627,9 @@ func (dcs *DataColumnStorage) saveDataColumnSidecarsExistingFile(filePath string // Create the SSZ encoded data column sidecars. var sszEncodedDataColumnSidecars []byte + // Initialize the count of the saved SSZ encoded data column sidecar. + storedCount := uint8(0) + for { dataColumnSidecars := pullChan(inputDataColumnSidecars) if len(dataColumnSidecars) == 0 { @@ -668,6 +676,9 @@ func (dcs *DataColumnStorage) saveDataColumnSidecarsExistingFile(filePath string return errors.Wrap(err, "set index") } + // Increment the count of the saved SSZ encoded data column sidecar. + storedCount++ + // Append the SSZ encoded data column sidecar to the SSZ encoded data column sidecars. sszEncodedDataColumnSidecars = append(sszEncodedDataColumnSidecars, sszEncodedDataColumnSidecar...) 
} @@ -692,9 +703,12 @@ func (dcs *DataColumnStorage) saveDataColumnSidecarsExistingFile(filePath string return errWrongBytesWritten } + syncStart := time.Now() if err := file.Sync(); err != nil { return errors.Wrap(err, "sync") } + dataColumnFileSyncLatency.Observe(float64(time.Since(syncStart).Milliseconds())) + dataColumnBatchStoreCount.Observe(float64(storedCount)) return nil } @@ -808,10 +822,14 @@ func (dcs *DataColumnStorage) saveDataColumnSidecarsNewFile(filePath string, inp return errWrongBytesWritten } + syncStart := time.Now() if err := file.Sync(); err != nil { return errors.Wrap(err, "sync") } + dataColumnFileSyncLatency.Observe(float64(time.Since(syncStart).Milliseconds())) + dataColumnBatchStoreCount.Observe(float64(storedCount)) + return nil } diff --git a/beacon-chain/db/filesystem/metrics.go b/beacon-chain/db/filesystem/metrics.go index 495e13836f..03f039349d 100644 --- a/beacon-chain/db/filesystem/metrics.go +++ b/beacon-chain/db/filesystem/metrics.go @@ -36,16 +36,15 @@ var ( }) // Data columns - dataColumnBuckets = []float64{3, 5, 7, 9, 11, 13} dataColumnSaveLatency = promauto.NewHistogram(prometheus.HistogramOpts{ Name: "data_column_storage_save_latency", Help: "Latency of DataColumnSidecar storage save operations in milliseconds", - Buckets: dataColumnBuckets, + Buckets: []float64{10, 20, 30, 50, 100, 200, 500}, }) dataColumnFetchLatency = promauto.NewHistogram(prometheus.HistogramOpts{ Name: "data_column_storage_get_latency", Help: "Latency of DataColumnSidecar storage get operations in milliseconds", - Buckets: dataColumnBuckets, + Buckets: []float64{3, 5, 7, 9, 11, 13}, }) dataColumnPrunedCounter = promauto.NewCounter(prometheus.CounterOpts{ Name: "data_column_pruned", @@ -59,4 +58,16 @@ var ( Name: "data_column_disk_count", Help: "Approximate number of data columns in storage", }) + dataColumnFileSyncLatency = promauto.NewSummary(prometheus.SummaryOpts{ + Name: "data_column_file_sync_latency", + Help: "Latency of sync operations when saving data columns in milliseconds", + }) + dataColumnBatchStoreCount = promauto.NewSummary(prometheus.SummaryOpts{ + Name: "data_column_batch_store_count", + Help: "Number of data columns stored in a batch", + }) + dataColumnPruneLatency = promauto.NewSummary(prometheus.SummaryOpts{ + Name: "data_column_prune_latency", + Help: "Latency of data column prune operations in milliseconds", + }) ) diff --git a/beacon-chain/execution/engine_client.go b/beacon-chain/execution/engine_client.go index 43702a9596..fa63871ad0 100644 --- a/beacon-chain/execution/engine_client.go +++ b/beacon-chain/execution/engine_client.go @@ -532,12 +532,19 @@ func (s *Service) GetBlobsV2(ctx context.Context, versionedHashes []common.Hash) ctx, span := trace.StartSpan(ctx, "powchain.engine-api-client.GetBlobsV2") defer span.End() + start := time.Now() + if !s.capabilityCache.has(GetBlobsV2) { return nil, errors.New(fmt.Sprintf("%s is not supported", GetBlobsV2)) } result := make([]*pb.BlobAndProofV2, len(versionedHashes)) err := s.rpcClient.CallContext(ctx, &result, GetBlobsV2, versionedHashes) + + if len(result) != 0 { + getBlobsV2Latency.Observe(float64(time.Since(start).Milliseconds())) + } + return result, handleRPCError(err) } diff --git a/beacon-chain/execution/metrics.go b/beacon-chain/execution/metrics.go index 1fc2027b4f..b1ec43527f 100644 --- a/beacon-chain/execution/metrics.go +++ b/beacon-chain/execution/metrics.go @@ -27,6 +27,13 @@ var ( Buckets: []float64{25, 50, 100, 200, 500, 1000, 2000, 4000}, }, ) + getBlobsV2Latency = promauto.NewHistogram( 
+ prometheus.HistogramOpts{ + Name: "get_blobs_v2_latency_milliseconds", + Help: "Captures RPC latency for getBlobsV2 in milliseconds", + Buckets: []float64{25, 50, 100, 200, 500, 1000, 2000, 4000}, + }, + ) errParseCount = promauto.NewCounter(prometheus.CounterOpts{ Name: "execution_parse_error_count", Help: "The number of errors that occurred while parsing execution payload", diff --git a/beacon-chain/sync/metrics.go b/beacon-chain/sync/metrics.go index 7acfab0f5d..e4b634910d 100644 --- a/beacon-chain/sync/metrics.go +++ b/beacon-chain/sync/metrics.go @@ -204,6 +204,13 @@ var ( }, ) + dataColumnsRecoveredFromELAttempts = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "data_columns_recovered_from_el_attempts", + Help: "Count the number of data columns recovery attempts from the execution layer.", + }, + ) + dataColumnsRecoveredFromELTotal = promauto.NewCounter( prometheus.CounterOpts{ Name: "data_columns_recovered_from_el_total", @@ -242,6 +249,13 @@ var ( Buckets: []float64{100, 250, 500, 750, 1000, 1500, 2000, 4000, 8000, 12000, 16000}, }, ) + + dataColumnSidecarsObtainedViaELCount = promauto.NewSummary( + prometheus.SummaryOpts{ + Name: "data_column_obtained_via_el_count", + Help: "Count the number of data column sidecars obtained via the execution layer.", + }, + ) ) func (s *Service) updateMetrics() { diff --git a/beacon-chain/sync/subscriber_beacon_blocks.go b/beacon-chain/sync/subscriber_beacon_blocks.go index 0425c5acdc..14798b2fc7 100644 --- a/beacon-chain/sync/subscriber_beacon_blocks.go +++ b/beacon-chain/sync/subscriber_beacon_blocks.go @@ -189,12 +189,30 @@ func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, so ctx, cancel := context.WithTimeout(ctx, secondsPerHalfSlot) defer cancel() + log := log.WithFields(logrus.Fields{ + "root": fmt.Sprintf("%#x", source.Root()), + "slot": source.Slot(), + "proposerIndex": source.ProposerIndex(), + "type": source.Type(), + }) + + var constructedSidecarCount uint64 for iteration := uint64(0); ; /*no stop condition*/ iteration++ { + log = log.WithField("iteration", iteration) + // Exit early if all sidecars to sample have been seen. if s.haveAllSidecarsBeenSeen(source.Slot(), source.ProposerIndex(), columnIndicesToSample) { + if iteration > 0 && constructedSidecarCount == 0 { + log.Debug("No data column sidecars constructed from the execution client") + } + return nil, nil } + if iteration == 0 { + dataColumnsRecoveredFromELAttempts.Inc() + } + // Try to reconstruct data column constructedSidecars from the execution client. constructedSidecars, err := s.cfg.executionReconstructor.ConstructDataColumnSidecars(ctx, source) if err != nil { @@ -202,8 +220,8 @@ func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, so } // No sidecars are retrieved from the EL, retry later - sidecarCount := uint64(len(constructedSidecars)) - if sidecarCount == 0 { + constructedSidecarCount = uint64(len(constructedSidecars)) + if constructedSidecarCount == 0 { if ctx.Err() != nil { return nil, ctx.Err() } @@ -212,9 +230,11 @@ func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, so continue } + dataColumnsRecoveredFromELTotal.Inc() + // Boundary check. 
- if sidecarCount != fieldparams.NumberOfColumns { - return nil, errors.Errorf("reconstruct data column sidecars returned %d sidecars, expected %d - should never happen", sidecarCount, fieldparams.NumberOfColumns) + if constructedSidecarCount != fieldparams.NumberOfColumns { + return nil, errors.Errorf("reconstruct data column sidecars returned %d sidecars, expected %d - should never happen", constructedSidecarCount, fieldparams.NumberOfColumns) } unseenIndices, err := s.broadcastAndReceiveUnseenDataColumnSidecars(ctx, source.Slot(), source.ProposerIndex(), columnIndicesToSample, constructedSidecars) @@ -222,19 +242,12 @@ func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, so return nil, errors.Wrap(err, "broadcast and receive unseen data column sidecars") } - if len(unseenIndices) > 0 { - dataColumnsRecoveredFromELTotal.Inc() + log.WithFields(logrus.Fields{ + "count": len(unseenIndices), + "indices": helpers.SortedPrettySliceFromMap(unseenIndices), + }).Debug("Constructed data column sidecars from the execution client") - log.WithFields(logrus.Fields{ - "root": fmt.Sprintf("%#x", source.Root()), - "slot": source.Slot(), - "proposerIndex": source.ProposerIndex(), - "iteration": iteration, - "type": source.Type(), - "count": len(unseenIndices), - "indices": helpers.SortedPrettySliceFromMap(unseenIndices), - }).Debug("Constructed data column sidecars from the execution client") - } + dataColumnSidecarsObtainedViaELCount.Observe(float64(len(unseenIndices))) return nil, nil } diff --git a/changelog/manu_reconstruct-metrics.md b/changelog/manu_reconstruct-metrics.md new file mode 100644 index 0000000000..d061e29284 --- /dev/null +++ b/changelog/manu_reconstruct-metrics.md @@ -0,0 +1,7 @@ +### Added +- prometheus histogram `cells_and_proofs_from_structured_computation_milliseconds` to track computation time for cells and proofs from structured blobs. +- prometheus histogram `get_blobs_v2_latency_milliseconds` to track RPC latency for `getBlobsV2` calls to the execution layer. + +### Changed +- Run `ComputeCellsAndProofsFromFlat` in parallel to improve performance when computing cells and proofs. +- Run `ComputeCellsAndProofsFromStructured` in parallel to improve performance when computing cells and proofs.