mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 15:37:56 -05:00
Blob channel (#12753)
* Add a new blob channel * fix mock * reset the channel * keep a map of channels * gazelle * do not overwrite map * remove pre-declaration
This commit is contained in:
@@ -16,6 +16,7 @@ import (
|
||||
forkchoicetypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/forkchoice/types"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
|
||||
"github.com/prysmaticlabs/prysm/v4/config/features"
|
||||
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
|
||||
"github.com/prysmaticlabs/prysm/v4/config/params"
|
||||
consensusblocks "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
|
||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
|
||||
@@ -518,11 +519,56 @@ func (s *Service) isDataAvailable(ctx context.Context, root [32]byte, signed int
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not get KZG commitments")
|
||||
}
|
||||
sidecars, err := s.cfg.BeaconDB.BlobSidecarsByRoot(ctx, root)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not get blob sidecars")
|
||||
existingBlobs := len(kzgCommitments)
|
||||
if existingBlobs == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Read first from db in case we have the blobs
|
||||
s.blobNotifier.Lock()
|
||||
var nc *blobNotifierChan
|
||||
var ok bool
|
||||
nc, ok = s.blobNotifier.chanForRoot[root]
|
||||
sidecars, err := s.cfg.BeaconDB.BlobSidecarsByRoot(ctx, root)
|
||||
if err == nil {
|
||||
if len(sidecars) >= existingBlobs {
|
||||
delete(s.blobNotifier.chanForRoot, root)
|
||||
s.blobNotifier.Unlock()
|
||||
return kzg.IsDataAvailable(kzgCommitments, sidecars)
|
||||
}
|
||||
}
|
||||
// Create the channel if it didn't exist already the index map will be
|
||||
// created later anyway
|
||||
if !ok {
|
||||
nc = &blobNotifierChan{channel: make(chan struct{}, fieldparams.MaxBlobsPerBlock)}
|
||||
s.blobNotifier.chanForRoot[root] = nc
|
||||
}
|
||||
// We have more commitments in the block than blobs in database
|
||||
// We sync the channel indices with the sidecars
|
||||
nc.indices = make(map[uint64]struct{})
|
||||
for _, sidecar := range sidecars {
|
||||
nc.indices[sidecar.Index] = struct{}{}
|
||||
}
|
||||
s.blobNotifier.Unlock()
|
||||
channelWrites := len(sidecars)
|
||||
for {
|
||||
select {
|
||||
case <-nc.channel:
|
||||
channelWrites++
|
||||
if channelWrites == existingBlobs {
|
||||
s.blobNotifier.Lock()
|
||||
delete(s.blobNotifier.chanForRoot, root)
|
||||
s.blobNotifier.Unlock()
|
||||
sidecars, err := s.cfg.BeaconDB.BlobSidecarsByRoot(ctx, root)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not get blob sidecars")
|
||||
}
|
||||
return kzg.IsDataAvailable(kzgCommitments, sidecars)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return errors.Wrap(err, "context deadline waiting for blob sidecars")
|
||||
}
|
||||
}
|
||||
return kzg.IsDataAvailable(kzgCommitments, sidecars)
|
||||
}
|
||||
|
||||
// lateBlockTasks is called 4 seconds into the slot and performs tasks
|
||||
|
||||
Reference in New Issue
Block a user