PeerDAS: Run reconstruction in parallel. (#14236)

* PeerDAS: Run reconstruction in parallel.

* `isDataAvailableDataColumns` --> `isDataColumnsAvailable`

* `isDataColumnsAvailable`: Return `nil` as soon as half of the columns are received.

* Make deepsource happy.
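
Background for the third bullet: with PeerDAS, the column set is erasure-extended to twice the original data, so a node holding any half of the columns can reconstruct the remainder locally, and availability can be declared at the halfway mark. A minimal sketch of that predicate, assuming 128 columns in total; `canSelfReconstruct` below is a hypothetical stand-in, not Prysm's `peerdas.CanSelfReconstruct`:

```go
package main

import "fmt"

// numberOfColumns mirrors the spec's NUMBER_OF_COLUMNS; 128 is assumed here.
const numberOfColumns uint64 = 128

// canSelfReconstruct is a hypothetical stand-in for peerdas.CanSelfReconstruct:
// the erasure-coded extension doubles the column set, so holding any half of
// the columns is enough to recover the rest.
func canSelfReconstruct(retrievedCount uint64) bool {
	// Integer division of (n+1) by 2 rounds up for odd column counts.
	return retrievedCount >= (numberOfColumns+1)/2
}

func main() {
	fmt.Println(canSelfReconstruct(63)) // false: one column short of half
	fmt.Println(canSelfReconstruct(64)) // true: half of 128 suffices
}
```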
Author: Manu NALEPA
Date:   2024-07-18 13:42:07 +02:00
Parent: 72bc23b0ec
Commit: 16754d67ff
3 changed files with 75 additions and 30 deletions


@@ -519,19 +519,23 @@ func missingDataColumns(bs *filesystem.BlobStorage, root [32]byte, expected map[
 	if len(expected) == 0 {
 		return nil, nil
 	}
 
+	if len(expected) > int(params.BeaconConfig().NumberOfColumns) {
+		return nil, errMaxDataColumnsExceeded
+	}
+
 	indices, err := bs.ColumnIndices(root)
 	if err != nil {
 		return nil, err
 	}
 
 	missing := make(map[uint64]bool, len(expected))
 	for col := range expected {
 		if !indices[col] {
 			missing[col] = true
 		}
 	}
 
 	return missing, nil
 }
@@ -542,7 +546,7 @@ func missingDataColumns(bs *filesystem.BlobStorage, root [32]byte, expected map[
 // closed, the context hits cancellation/timeout, or notifications have been received for all the missing sidecars.
 func (s *Service) isDataAvailable(ctx context.Context, root [32]byte, signed interfaces.ReadOnlySignedBeaconBlock) error {
 	if coreTime.PeerDASIsActive(signed.Block().Slot()) {
-		return s.isDataAvailableDataColumns(ctx, root, signed)
+		return s.isDataColumnsAvailable(ctx, root, signed)
 	}
 
 	if signed.Version() < version.Deneb {
 		return nil
@@ -620,7 +624,7 @@ func (s *Service) isDataAvailable(ctx context.Context, root [32]byte, signed int
 	}
 }
 
-func (s *Service) isDataAvailableDataColumns(ctx context.Context, root [32]byte, signed interfaces.ReadOnlySignedBeaconBlock) error {
+func (s *Service) isDataColumnsAvailable(ctx context.Context, root [32]byte, signed interfaces.ReadOnlySignedBeaconBlock) error {
 	if signed.Version() < version.Deneb {
 		return nil
 	}
@@ -633,14 +637,17 @@ func (s *Service) isDataAvailableDataColumns(ctx context.Context, root [32]byte,
 	if !params.WithinDAPeriod(slots.ToEpoch(block.Slot()), slots.ToEpoch(s.CurrentSlot())) {
 		return nil
 	}
 
 	body := block.Body()
 	if body == nil {
 		return errors.New("invalid nil beacon block body")
 	}
 
 	kzgCommitments, err := body.BlobKzgCommitments()
 	if err != nil {
-		return errors.Wrap(err, "could not get KZG commitments")
+		return errors.Wrap(err, "blob KZG commitments")
 	}
 
 	// If the block has no commitments there is nothing to wait for.
 	if len(kzgCommitments) == 0 {
 		return nil
@@ -648,9 +655,10 @@ func (s *Service) isDataAvailableDataColumns(ctx context.Context, root [32]byte,
 	colMap, err := peerdas.CustodyColumns(s.cfg.P2P.NodeID(), peerdas.CustodySubnetCount())
 	if err != nil {
-		return err
+		return errors.Wrap(err, "custody columns")
 	}
 
-	// Expected is the number of custodied data columns a node is expected to have.
+	// Expected is the number of custody data columns a node is expected to have.
 	expected := len(colMap)
 	if expected == 0 {
 		return nil
@@ -661,6 +669,20 @@ func (s *Service) isDataAvailableDataColumns(ctx context.Context, root [32]byte,
 	subscription := s.blobStorage.DataColumnFeed.Subscribe(rootIndexChan)
 	defer subscription.Unsubscribe()
 
+	// Get the count of data columns we already have in the store.
+	retrievedDataColumns, err := s.blobStorage.ColumnIndices(root)
+	if err != nil {
+		return errors.Wrap(err, "column indices")
+	}
+
+	retrievedDataColumnsCount := uint64(len(retrievedDataColumns))
+
+	// As soon as we have more than half of the data columns, we can reconstruct the missing ones.
+	// We don't need to wait for the rest of the data columns to declare the block as available.
+	if peerdas.CanSelfReconstruct(retrievedDataColumnsCount) {
+		return nil
+	}
+
 	// Get a map of data column indices that are not currently available.
 	missing, err := missingDataColumns(s.blobStorage, root, colMap)
 	if err != nil {
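
A side note on ordering in the hunk above: the feed subscription is taken before the store is read, presumably so that a column persisted between the two steps is never missed; it shows up either in the `ColumnIndices` snapshot or on the channel. A self-contained sketch of that pattern; `tinyFeed` is a hypothetical stand-in for the blob storage's `DataColumnFeed`:

```go
package main

import (
	"fmt"
	"sync"
)

// tinyFeed is a stand-in for an event feed: subscribers receive every
// index published after they subscribe.
type tinyFeed struct {
	mu   sync.Mutex
	subs []chan uint64
}

func (f *tinyFeed) Subscribe() <-chan uint64 {
	f.mu.Lock()
	defer f.mu.Unlock()
	ch := make(chan uint64, 128)
	f.subs = append(f.subs, ch)
	return ch
}

func (f *tinyFeed) Send(idx uint64) {
	f.mu.Lock()
	defer f.mu.Unlock()
	for _, ch := range f.subs {
		ch <- idx
	}
}

func main() {
	feed := &tinyFeed{}
	stored := map[uint64]bool{3: true} // columns already on disk

	ch := feed.Subscribe() // 1) subscribe first...
	count := len(stored)   // 2) ...then snapshot the store

	// A column landing between the two steps is still observed: it is
	// either in the snapshot or delivered on the channel, never dropped.
	feed.Send(7)
	fmt.Println("snapshot:", count, "notified:", <-ch)
}
```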
@@ -691,6 +713,7 @@ func (s *Service) isDataAvailableDataColumns(ctx context.Context, root [32]byte,
 		})
 		defer nst.Stop()
 	}
 
 	for {
 		select {
 		case rootIndex := <-rootIndexChan:
@@ -699,6 +722,17 @@ func (s *Service) isDataAvailableDataColumns(ctx context.Context, root [32]byte,
 				continue
 			}
 
+			// This is a data column we are expecting.
+			if _, ok := missing[rootIndex.Index]; ok {
+				retrievedDataColumnsCount++
+			}
+
+			// As soon as we have more than half of the data columns, we can reconstruct the missing ones.
+			// We don't need to wait for the rest of the data columns to declare the block as available.
+			if peerdas.CanSelfReconstruct(retrievedDataColumnsCount) {
+				return nil
+			}
+
 			// Remove the index from the missing map.
 			delete(missing, rootIndex.Index)
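
The loop above, distilled: each notification for a still-missing index bumps the count, and the wait ends early once any half of the columns is in hand or the context expires. A runnable sketch under the same assumptions as before (128 columns, hypothetical threshold check in place of `peerdas.CanSelfReconstruct`):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

const numberOfColumns uint64 = 128 // assumed NUMBER_OF_COLUMNS

// waitForColumns mirrors the shape of the loop: count expected indices as
// they arrive, returning early once any half of the columns is available.
func waitForColumns(ctx context.Context, notifications <-chan uint64, missing map[uint64]bool, retrieved uint64) error {
	for {
		select {
		case idx := <-notifications:
			if missing[idx] {
				retrieved++
				delete(missing, idx)
			}
			// Early exit: half the columns is enough to reconstruct the rest.
			if retrieved >= (numberOfColumns+1)/2 || len(missing) == 0 {
				return nil
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	notifications := make(chan uint64, 4)
	notifications <- 1
	notifications <- 2

	missing := map[uint64]bool{1: true, 2: true}
	fmt.Println(waitForColumns(ctx, notifications, missing, 0)) // <nil>
}
```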
@@ -796,7 +830,7 @@ func (s *Service) waitForSync() error {
 	}
 }
 
-func (s *Service) handleInvalidExecutionError(ctx context.Context, err error, blockRoot [32]byte, parentRoot [32]byte) error {
+func (s *Service) handleInvalidExecutionError(ctx context.Context, err error, blockRoot, parentRoot [32]byte) error {
 	if IsInvalidBlock(err) && InvalidBlockLVH(err) != [32]byte{} {
 		return s.pruneInvalidBlock(ctx, blockRoot, parentRoot, InvalidBlockLVH(err))
 	}


@@ -149,6 +149,7 @@ go_library(
"@com_github_trailofbits_go_mutexasserts//:go_default_library",
"@io_opencensus_go//trace:go_default_library",
"@org_golang_google_protobuf//proto:go_default_library",
"@org_golang_x_sync//errgroup:go_default_library",
],
)


@@ -8,6 +8,7 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
kzg "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/kzg"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
@@ -26,6 +27,8 @@ func recoverCellsAndProofs(
 	columnsCount int,
 	blockRoot [fieldparams.RootLength]byte,
 ) ([]kzg.CellsAndProofs, error) {
+	var wg errgroup.Group
+
 	if len(dataColumnSideCars) == 0 {
 		return nil, errors.New("no data column sidecars")
 	}
@@ -40,40 +43,47 @@ func recoverCellsAndProofs(
 		}
 	}
 
-	// Recover cells and compute proofs.
-	recoveredCellsAndProofs := make([]kzg.CellsAndProofs, 0, blobCount)
+	// Recover cells and compute proofs in parallel.
+	recoveredCellsAndProofs := make([]kzg.CellsAndProofs, blobCount)
+
 	for blobIndex := 0; blobIndex < blobCount; blobIndex++ {
-		start := time.Now()
+		bIndex := blobIndex
+		wg.Go(func() error {
+			start := time.Now()
 
-		cellsIndices := make([]uint64, 0, columnsCount)
-		cells := make([]kzg.Cell, 0, columnsCount)
+			cellsIndices := make([]uint64, 0, columnsCount)
+			cells := make([]kzg.Cell, 0, columnsCount)
 
-		for _, sidecar := range dataColumnSideCars {
-			// Build the cell indices.
-			cellsIndices = append(cellsIndices, sidecar.ColumnIndex)
+			for _, sidecar := range dataColumnSideCars {
+				// Build the cell indices.
+				cellsIndices = append(cellsIndices, sidecar.ColumnIndex)
 
-			// Get the cell.
-			column := sidecar.DataColumn
-			cell := column[blobIndex]
-			cells = append(cells, kzg.Cell(cell))
-		}
+				// Get the cell.
+				column := sidecar.DataColumn
+				cell := column[bIndex]
+				cells = append(cells, kzg.Cell(cell))
+			}
 
-		// Recover the cells and proofs for the corresponding blob
-		cellsAndProofs, err := kzg.RecoverCellsAndKZGProofs(cellsIndices, cells)
-		if err != nil {
-			return nil, errors.Wrapf(err, "recover cells and KZG proofs for blob %d", blobIndex)
-		}
+			// Recover the cells and proofs for the corresponding blob
+			cellsAndProofs, err := kzg.RecoverCellsAndKZGProofs(cellsIndices, cells)
+			if err != nil {
+				return errors.Wrapf(err, "recover cells and KZG proofs for blob %d", bIndex)
+			}
 
-		recoveredCellsAndProofs = append(recoveredCellsAndProofs, cellsAndProofs)
+			recoveredCellsAndProofs[bIndex] = cellsAndProofs
 
-		log.WithFields(logrus.Fields{
-			"elapsed": time.Since(start),
-			"index":   blobIndex,
-			"root":    fmt.Sprintf("%x", blockRoot),
-		}).Debug("Recovered cells and proofs")
+			log.WithFields(logrus.Fields{
+				"elapsed": time.Since(start),
+				"index":   bIndex,
+				"root":    fmt.Sprintf("%x", blockRoot),
+			}).Debug("Recovered cells and proofs")
+
+			return nil
+		})
 	}
+
+	if err := wg.Wait(); err != nil {
+		return nil, err
+	}
+
 	return recoveredCellsAndProofs, nil
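
The concurrency shape introduced above, in isolation: one errgroup goroutine per blob, each writing only its own pre-sized slot of the results slice, with `Wait` surfacing the first error. A self-contained sketch; `recoverOne` is a hypothetical stand-in for `kzg.RecoverCellsAndKZGProofs`:

```go
package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

// recoverOne is a stand-in for the per-blob recovery work
// (kzg.RecoverCellsAndKZGProofs in the real code).
func recoverOne(index int) (string, error) {
	return fmt.Sprintf("blob-%d", index), nil
}

func main() {
	const blobCount = 4
	var wg errgroup.Group

	// Pre-size the results slice: each goroutine writes only results[i],
	// so no mutex is needed and blob order is preserved regardless of
	// goroutine completion order.
	results := make([]string, blobCount)
	for i := 0; i < blobCount; i++ {
		i := i // capture the loop variable, like bIndex := blobIndex above
		wg.Go(func() error {
			out, err := recoverOne(i)
			if err != nil {
				return fmt.Errorf("recover blob %d: %w", i, err)
			}
			results[i] = out
			return nil
		})
	}
	if err := wg.Wait(); err != nil {
		panic(err)
	}
	fmt.Println(results)
}
```

Replacing `append` with indexed writes is what makes the parallel version safe: concurrent appends to a shared slice race on the length and backing array, while writes to disjoint indices of a pre-sized slice do not.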