mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 23:48:06 -05:00
Simplify DA check to avoid blob/block timing inconsistencies (#12882)
* collect unique idxs and avoid races * fix init * remove unused type (lint error) --------- Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com> Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
@@ -16,7 +16,6 @@ import (
|
||||
forkchoicetypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/forkchoice/types"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
|
||||
"github.com/prysmaticlabs/prysm/v4/config/features"
|
||||
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
|
||||
"github.com/prysmaticlabs/prysm/v4/config/params"
|
||||
consensusblocks "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
|
||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
|
||||
@@ -552,21 +551,16 @@ func (s *Service) isDataAvailable(ctx context.Context, root [32]byte, signed int
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not get KZG commitments")
|
||||
}
|
||||
existingBlobs := len(kzgCommitments)
|
||||
if existingBlobs == 0 {
|
||||
expected := len(kzgCommitments)
|
||||
if expected == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Read first from db in case we have the blobs
|
||||
s.blobNotifier.Lock()
|
||||
var nc *blobNotifierChan
|
||||
var ok bool
|
||||
nc, ok = s.blobNotifier.chanForRoot[root]
|
||||
sidecars, err := s.cfg.BeaconDB.BlobSidecarsByRoot(ctx, root)
|
||||
if err == nil {
|
||||
if len(sidecars) >= existingBlobs {
|
||||
delete(s.blobNotifier.chanForRoot, root)
|
||||
s.blobNotifier.Unlock()
|
||||
if len(sidecars) >= expected {
|
||||
s.blobNotifiers.delete(root)
|
||||
if err := kzg.IsDataAvailable(kzgCommitments, sidecars); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -574,38 +568,29 @@ func (s *Service) isDataAvailable(ctx context.Context, root [32]byte, signed int
|
||||
return nil
|
||||
}
|
||||
}
|
||||
// Create the channel if it didn't exist already the index map will be
|
||||
// created later anyway
|
||||
if !ok {
|
||||
nc = &blobNotifierChan{channel: make(chan struct{}, fieldparams.MaxBlobsPerBlock)}
|
||||
s.blobNotifier.chanForRoot[root] = nc
|
||||
|
||||
found := map[uint64]struct{}{}
|
||||
for _, sc := range sidecars {
|
||||
found[sc.Index] = struct{}{}
|
||||
}
|
||||
// We have more commitments in the block than blobs in database
|
||||
// We sync the channel indices with the sidecars
|
||||
nc.indices = make(map[uint64]struct{})
|
||||
for _, sidecar := range sidecars {
|
||||
nc.indices[sidecar.Index] = struct{}{}
|
||||
}
|
||||
s.blobNotifier.Unlock()
|
||||
channelWrites := len(sidecars)
|
||||
nc := s.blobNotifiers.forRoot(root)
|
||||
for {
|
||||
select {
|
||||
case <-nc.channel:
|
||||
channelWrites++
|
||||
if channelWrites == existingBlobs {
|
||||
s.blobNotifier.Lock()
|
||||
delete(s.blobNotifier.chanForRoot, root)
|
||||
s.blobNotifier.Unlock()
|
||||
sidecars, err := s.cfg.BeaconDB.BlobSidecarsByRoot(ctx, root)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not get blob sidecars")
|
||||
}
|
||||
if err := kzg.IsDataAvailable(kzgCommitments, sidecars); err != nil {
|
||||
return err
|
||||
}
|
||||
logBlobSidecar(sidecars, t)
|
||||
return nil
|
||||
case idx := <-nc:
|
||||
found[idx] = struct{}{}
|
||||
if len(found) != expected {
|
||||
continue
|
||||
}
|
||||
s.blobNotifiers.delete(root)
|
||||
sidecars, err := s.cfg.BeaconDB.BlobSidecarsByRoot(ctx, root)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not get blob sidecars")
|
||||
}
|
||||
if err := kzg.IsDataAvailable(kzgCommitments, sidecars); err != nil {
|
||||
return err
|
||||
}
|
||||
logBlobSidecar(sidecars, t)
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
return errors.Wrap(ctx.Err(), "context deadline waiting for blob sidecars")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user