Compare commits

...

15 Commits

Author SHA1 Message Date
Kasey Kirkham
edbf5d11d1 refactor to match updated design 2023-11-02 13:32:55 -05:00
Kasey Kirkham
1148d96313 WIP: changes for 3531 2023-11-01 13:13:11 -05:00
Kasey Kirkham
daf5e9b286 get rid of double referencing in error types 2023-10-30 09:15:51 -05:00
Kasey Kirkham
309a863358 errors.As wants a ref 2023-10-27 17:32:09 -05:00
Kasey Kirkham
15b88ed7f2 drop the cache if the kzg commitment fails 2023-10-27 17:22:16 -05:00
Kasey Kirkham
4b4855d65c return custom missing/mismatch errors; unit tests 2023-10-27 17:10:07 -05:00
Kasey Kirkham
03564ee93b more tests 2023-10-26 19:35:01 -05:00
Kasey Kirkham
96541c2c6c gazelle and test fixes 2023-10-26 18:54:11 -05:00
Kasey Kirkham
0bc55339a7 remove partial saving/checking 2023-10-26 16:44:22 -05:00
Kasey Kirkham
2618852090 lint 2023-10-26 11:07:37 -05:00
Kasey Kirkham
fb3e42c1b3 moving things around and comments 2023-10-26 10:24:03 -05:00
Kasey Kirkham
0e2a9d0d82 simplify 2023-10-26 10:24:03 -05:00
Kasey Kirkham
6573cccf1d significant rewrite for db cache blocking variant 2023-10-26 10:24:03 -05:00
Kasey Kirkham
a89727f38c same cache usable for init and reg sync
the goal isn't for the 2 flavors of sync to share the same cache,
because they have different needs, but for them to follow the same
procedure and share as much code as possible.
2023-10-26 10:24:03 -05:00
Kasey Kirkham
5e1fb15c40 DA check before blob saving for init-sync 2023-10-26 10:24:03 -05:00
23 changed files with 1204 additions and 134 deletions

View File

@@ -49,6 +49,7 @@ go_library(
"//beacon-chain/core/signing:go_default_library",
"//beacon-chain/core/time:go_default_library",
"//beacon-chain/core/transition:go_default_library",
"//beacon-chain/das:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/db/filters:go_default_library",
"//beacon-chain/db/kv:go_default_library",

View File

@@ -2,8 +2,10 @@ package kzg
import (
"fmt"
"strings"
GoKZG "github.com/crate-crypto/go-kzg-4844"
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
)
@@ -31,6 +33,61 @@ func IsDataAvailable(commitments [][]byte, sidecars []*ethpb.BlobSidecar) error
return kzgContext.VerifyBlobKZGProofBatch(blobs, cmts, proofs)
}
var ErrKzgProofFailed = errors.New("failed to prove commitment to BlobSidecar Blob data")
type KzgProofError struct {
failed [][48]byte
}
func NewKzgProofError(failed [][48]byte) *KzgProofError {
return &KzgProofError{failed: failed}
}
func (e *KzgProofError) Error() string {
cmts := make([]string, len(e.failed))
for i := range e.failed {
cmts[i] = fmt.Sprintf("%#x", e.failed[i])
}
return fmt.Sprintf("%s: bad commitments=%s", ErrKzgProofFailed.Error(), strings.Join(cmts, ","))
}
func (e *KzgProofError) Failed() [][48]byte {
return e.failed
}
func (e *KzgProofError) Unwrap() error {
return ErrKzgProofFailed
}
// BisectBlobSidecarKzgProofs tries to batch prove the given sidecars against their own specified commitment.
// The caller is responsible for ensuring that the commitments match those specified by the block.
// If the batch fails, it will then try to verify the proofs one-by-one.
// If an error is returned, it will be a custom error of type KzgProofError that provides access
// to the list of commitments that failed.
func BisectBlobSidecarKzgProofs(sidecars []*ethpb.BlobSidecar) error {
if len(sidecars) == 0 {
return nil
}
blobs := make([]GoKZG.Blob, len(sidecars))
cmts := make([]GoKZG.KZGCommitment, len(sidecars))
proofs := make([]GoKZG.KZGProof, len(sidecars))
for i, sidecar := range sidecars {
blobs[i] = bytesToBlob(sidecar.Blob)
cmts[i] = bytesToCommitment(sidecar.KzgCommitment)
proofs[i] = bytesToKZGProof(sidecar.KzgProof)
}
if err := kzgContext.VerifyBlobKZGProofBatch(blobs, cmts, proofs); err == nil {
return nil
}
failed := make([][48]byte, 0, len(blobs))
for i := range blobs {
if err := kzgContext.VerifyBlobKZGProof(blobs[i], cmts[i], proofs[i]); err != nil {
failed = append(failed, cmts[i])
}
}
return NewKzgProofError(failed)
}
func bytesToBlob(blob []byte) (ret GoKZG.Blob) {
copy(ret[:], blob)
return

View File

@@ -13,6 +13,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers"
coreTime "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/time"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db"
forkchoicetypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/forkchoice/types"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
@@ -162,7 +163,7 @@ func getStateVersionAndPayload(st state.BeaconState) (int, interfaces.ExecutionD
return preStateVersion, preStateHeader, nil
}
func (s *Service) onBlockBatch(ctx context.Context, blks []consensusblocks.ROBlock) error {
func (s *Service) onBlockBatch(ctx context.Context, blks []consensusblocks.ROBlock, avs das.AvailabilityStore) error {
ctx, span := trace.StartSpan(ctx, "blockChain.onBlockBatch")
defer span.End()
@@ -265,7 +266,7 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []consensusblocks.ROBlo
return err
}
}
if err := s.databaseDACheck(ctx, b); err != nil {
if err := avs.IsDataAvailable(ctx, s.CurrentSlot(), b); err != nil {
return errors.Wrap(err, "could not validate blob data availability")
}
args := &forkchoicetypes.BlockAndCheckpoints{Block: b.Block(),
@@ -333,33 +334,6 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []consensusblocks.ROBlo
return s.saveHeadNoDB(ctx, lastB, lastBR, preState, !isValidPayload)
}
func commitmentsToCheck(b consensusblocks.ROBlock, current primitives.Slot) [][]byte {
if b.Version() < version.Deneb {
return nil
}
// We are only required to check within MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS
if !params.WithinDAPeriod(slots.ToEpoch(b.Block().Slot()), slots.ToEpoch(current)) {
return nil
}
kzgCommitments, err := b.Block().Body().BlobKzgCommitments()
if err != nil {
return nil
}
return kzgCommitments
}
func (s *Service) databaseDACheck(ctx context.Context, b consensusblocks.ROBlock) error {
commitments := commitmentsToCheck(b, s.CurrentSlot())
if len(commitments) == 0 {
return nil
}
sidecars, err := s.cfg.BeaconDB.BlobSidecarsByRoot(ctx, b.Root())
if err != nil {
return errors.Wrap(err, "could not get blob sidecars")
}
return kzg.IsDataAvailable(commitments, sidecars)
}
func (s *Service) updateEpochBoundaryCaches(ctx context.Context, st state.BeaconState) error {
e := coreTime.CurrentEpoch(st)
if err := helpers.UpdateCommitteeCache(ctx, st, e); err != nil {

View File

@@ -1,7 +1,6 @@
package blockchain
import (
"bytes"
"context"
"fmt"
"math/big"
@@ -17,6 +16,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/signing"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db"
testDB "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/testing"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/execution"
@@ -39,7 +39,6 @@ import (
"github.com/prysmaticlabs/prysm/v4/testing/require"
"github.com/prysmaticlabs/prysm/v4/testing/util"
prysmTime "github.com/prysmaticlabs/prysm/v4/time"
"github.com/prysmaticlabs/prysm/v4/time/slots"
logTest "github.com/sirupsen/logrus/hooks/test"
)
@@ -68,7 +67,7 @@ func TestStore_OnBlockBatch(t *testing.T) {
require.NoError(t, err)
blks = append(blks, rwsb)
}
err := service.onBlockBatch(ctx, blks)
err := service.onBlockBatch(ctx, blks, &das.MockAvailabilityStore{})
require.NoError(t, err)
jcp := service.CurrentJustifiedCheckpt()
jroot := bytesutil.ToBytes32(jcp.Root)
@@ -98,7 +97,7 @@ func TestStore_OnBlockBatch_NotifyNewPayload(t *testing.T) {
require.NoError(t, service.saveInitSyncBlock(ctx, rwsb.Root(), wsb))
blks = append(blks, rwsb)
}
require.NoError(t, service.onBlockBatch(ctx, blks))
require.NoError(t, service.onBlockBatch(ctx, blks, &das.MockAvailabilityStore{}))
}
func TestCachedPreState_CanGetFromStateSummary(t *testing.T) {
@@ -1945,7 +1944,7 @@ func TestNoViableHead_Reboot(t *testing.T) {
rwsb, err := consensusblocks.NewROBlock(wsb)
require.NoError(t, err)
// We use onBlockBatch here because the valid chain is missing in forkchoice
require.NoError(t, service.onBlockBatch(ctx, []consensusblocks.ROBlock{rwsb}))
require.NoError(t, service.onBlockBatch(ctx, []consensusblocks.ROBlock{rwsb}, &das.MockAvailabilityStore{}))
// Check that the head is now VALID and the node is not optimistic
require.Equal(t, genesisRoot, service.ensureRootNotZeros(service.cfg.ForkChoiceStore.CachedHeadRoot()))
headRoot, err = service.HeadRoot(ctx)
@@ -2047,71 +2046,3 @@ func driftGenesisTime(s *Service, slot, delay int64) {
offset := slot*int64(params.BeaconConfig().SecondsPerSlot) - delay
s.SetGenesisTime(time.Unix(time.Now().Unix()-offset, 0))
}
func Test_commitmentsToCheck(t *testing.T) {
windowSlots, err := slots.EpochEnd(params.BeaconNetworkConfig().MinEpochsForBlobsSidecarsRequest)
require.NoError(t, err)
commits := [][]byte{
bytesutil.PadTo([]byte("a"), 48),
bytesutil.PadTo([]byte("b"), 48),
bytesutil.PadTo([]byte("c"), 48),
bytesutil.PadTo([]byte("d"), 48),
}
cases := []struct {
name string
commits [][]byte
block func(*testing.T) consensusblocks.ROBlock
slot primitives.Slot
}{
{
name: "pre deneb",
block: func(t *testing.T) consensusblocks.ROBlock {
bb := util.NewBeaconBlockBellatrix()
sb, err := consensusblocks.NewSignedBeaconBlock(bb)
require.NoError(t, err)
rb, err := consensusblocks.NewROBlock(sb)
require.NoError(t, err)
return rb
},
},
{
name: "commitments within da",
block: func(t *testing.T) consensusblocks.ROBlock {
d := util.NewBeaconBlockDeneb()
d.Block.Body.BlobKzgCommitments = commits
d.Block.Slot = 100
sb, err := consensusblocks.NewSignedBeaconBlock(d)
require.NoError(t, err)
rb, err := consensusblocks.NewROBlock(sb)
require.NoError(t, err)
return rb
},
commits: commits,
slot: 100,
},
{
name: "commitments outside da",
block: func(t *testing.T) consensusblocks.ROBlock {
d := util.NewBeaconBlockDeneb()
// block is from slot 0, "current slot" is window size +1 (so outside the window)
d.Block.Body.BlobKzgCommitments = commits
sb, err := consensusblocks.NewSignedBeaconBlock(d)
require.NoError(t, err)
rb, err := consensusblocks.NewROBlock(sb)
require.NoError(t, err)
return rb
},
slot: windowSlots + 1,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
b := c.block(t)
co := commitmentsToCheck(b, c.slot)
require.Equal(t, len(c.commits), len(co))
for i := 0; i < len(c.commits); i++ {
require.Equal(t, true, bytes.Equal(c.commits[i], co[i]))
}
})
}
}

View File

@@ -12,6 +12,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers"
coreTime "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/time"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v4/config/features"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
@@ -34,7 +35,7 @@ var epochsSinceFinalitySaveHotStateDB = primitives.Epoch(100)
// BlockReceiver interface defines the methods of chain service for receiving and processing new blocks.
type BlockReceiver interface {
ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte) error
ReceiveBlockBatch(ctx context.Context, blocks []blocks.ROBlock) error
ReceiveBlockBatch(ctx context.Context, blocks []blocks.ROBlock, avs das.AvailabilityStore) error
HasBlock(ctx context.Context, root [32]byte) bool
RecentBlockSlot(root [32]byte) (primitives.Slot, error)
BlockBeingSynced([32]byte) bool
@@ -191,7 +192,7 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig
// ReceiveBlockBatch processes the whole block batch at once, assuming the block batch is linear ,transitioning
// the state, performing batch verification of all collected signatures and then performing the appropriate
// actions for a block post-transition.
func (s *Service) ReceiveBlockBatch(ctx context.Context, blocks []blocks.ROBlock) error {
func (s *Service) ReceiveBlockBatch(ctx context.Context, blocks []blocks.ROBlock, avs das.AvailabilityStore) error {
ctx, span := trace.StartSpan(ctx, "blockChain.ReceiveBlockBatch")
defer span.End()
@@ -199,7 +200,7 @@ func (s *Service) ReceiveBlockBatch(ctx context.Context, blocks []blocks.ROBlock
defer s.cfg.ForkChoiceStore.Unlock()
// Apply state transition on the incoming newly received block batches, one by one.
if err := s.onBlockBatch(ctx, blocks); err != nil {
if err := s.onBlockBatch(ctx, blocks, avs); err != nil {
err := errors.Wrap(err, "could not process block in batch")
tracing.AnnotateError(span, err)
return err

View File

@@ -7,6 +7,7 @@ import (
"time"
blockchainTesting "github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/voluntaryexits"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
@@ -240,7 +241,7 @@ func TestService_ReceiveBlockBatch(t *testing.T) {
require.NoError(t, err)
rwsb, err := blocks.NewROBlock(wsb)
require.NoError(t, err)
err = s.ReceiveBlockBatch(ctx, []blocks.ROBlock{rwsb})
err = s.ReceiveBlockBatch(ctx, []blocks.ROBlock{rwsb}, &das.MockAvailabilityStore{})
if tt.wantedErr != "" {
assert.ErrorContains(t, tt.wantedErr, err)
} else {

View File

@@ -17,6 +17,7 @@ go_library(
"//beacon-chain/core/feed/operation:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/das:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/forkchoice:go_default_library",
"//beacon-chain/state:go_default_library",

View File

@@ -16,6 +16,7 @@ import (
opfeed "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/operation"
statefeed "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/forkchoice"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
@@ -207,7 +208,7 @@ func (s *ChainService) ReceiveBlockInitialSync(ctx context.Context, block interf
}
// ReceiveBlockBatch processes blocks in batches from initial-sync.
func (s *ChainService) ReceiveBlockBatch(ctx context.Context, blks []blocks.ROBlock) error {
func (s *ChainService) ReceiveBlockBatch(ctx context.Context, blks []blocks.ROBlock, avs das.AvailabilityStore) error {
if s.State == nil {
return ErrNilState
}

View File

@@ -0,0 +1,50 @@
load("@prysm//tools/go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"availability.go",
"blocking.go",
"cache.go",
"error.go",
"iface.go",
"mock.go",
],
importpath = "github.com/prysmaticlabs/prysm/v4/beacon-chain/das",
visibility = ["//visibility:public"],
deps = [
"//beacon-chain/blockchain/kzg:go_default_library",
"//beacon-chain/db:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/primitives:go_default_library",
"//encoding/bytesutil:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//runtime/version:go_default_library",
"//time/slots:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = [
"availability_test.go",
"cache_test.go",
],
embed = [":go_default_library"],
deps = [
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/primitives:go_default_library",
"//encoding/bytesutil:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//testing/require:go_default_library",
"//testing/util:go_default_library",
"//time/slots:go_default_library",
"@com_github_pkg_errors//:go_default_library",
],
)

View File

@@ -0,0 +1,158 @@
package das
import (
"context"
errors "github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain/kzg"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db"
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/runtime/version"
"github.com/prysmaticlabs/prysm/v4/time/slots"
log "github.com/sirupsen/logrus"
)
type kzgBatch func([][]byte, []*ethpb.BlobSidecar) error
type LazilyPersistentStore struct {
db BlobsDB
cache *cache
verifyKZG kzgBatch
}
var _ AvailabilityStore = &LazilyPersistentStore{}
func NewLazilyPersistentStore(db BlobsDB) *LazilyPersistentStore {
return &LazilyPersistentStore{
db: db,
cache: newCache(),
verifyKZG: kzg.IsDataAvailable,
}
}
// PersistOnceCommitted adds blobs to the working blob cache (in-memory or disk backed is an implementation
// detail). Blobs stored in this cache will be persisted for at least as long as the node is
// running. Once IsDataAvailable succeeds, all blobs referenced by the given block are guaranteed
// to be persisted for the remainder of the retention period.
func (s *LazilyPersistentStore) Persist(ctx context.Context, current primitives.Slot, sc ...*ethpb.BlobSidecar) ([]*ethpb.BlobSidecar, error) {
if len(sc) == 0 {
return nil, nil
}
if len(sc) == 1 {
}
var key cacheKey
var entry *cacheEntry
persisted := make([]*ethpb.BlobSidecar, 0, len(sc))
for i := range sc {
if !params.WithinDAPeriod(slots.ToEpoch(sc[i].Slot), slots.ToEpoch(current)) {
continue
}
if sc[i].Index >= fieldparams.MaxBlobsPerBlock {
log.WithField("block_root", sc[i].BlockRoot).WithField("index", sc[i].Index).Error("discarding BlobSidecar with index >= MaxBlobsPerBlock")
continue
}
skey := keyFromSidecar(sc[i])
if key != skey {
key = skey
entry = s.cache.ensure(key)
}
if entry.stash(sc[i]) {
persisted = append(persisted, sc[i])
}
}
return persisted, nil
}
// IsDataAvailable returns nil if all the commitments in the given block are persisted to the db and have been verified.
// - BlobSidecars already in the db are assumed to have been previously verified against the block.
// - BlobSidecars waiting for verification in the cache will be persisted to the db after verification.
// - When BlobSidecars are written to the db, their cache entries are cleared.
// - BlobSidecar cache entries with commitments that do not match the block will be evicted.
// - BlobSidecar cachee entries with commitments that fail proof verification will be evicted.
func (s *LazilyPersistentStore) IsDataAvailable(ctx context.Context, current primitives.Slot, b blocks.ROBlock) error {
blockCommitments := commitmentsToCheck(b, current)
if len(blockCommitments) == 0 {
return nil
}
key := keyFromBlock(b)
entry := s.cache.ensure(key)
// holding the lock over the course of the DA check simplifies everything
entry.Lock()
defer entry.Unlock()
if err := s.daCheck(ctx, b.Root(), blockCommitments, entry); err != nil {
return err
}
// If there is no error, DA has been successful, so we can clean up the cache.
s.cache.delete(key)
return nil
}
func (s *LazilyPersistentStore) daCheck(ctx context.Context, root [32]byte, blockCommitments [][]byte, entry *cacheEntry) error {
sidecars, cacheErr := entry.filter(root, blockCommitments)
if cacheErr == nil {
if err := s.verifyKZG(blockCommitments, sidecars); err != nil {
s.cache.delete(keyFromSidecar(sidecars[0]))
return err
}
// We have all the committed sidecars in cache, and they all have valid proofs.
// If flushing them to backing storage succeeds, then we can confirm DA.
return s.db.SaveBlobSidecar(ctx, sidecars)
}
// Before returning the cache error, check if we have the data in the db.
dbidx, err := s.persisted(ctx, root, entry)
// persisted() accounts for db.ErrNotFound, so this is a real database error.
if err != nil {
return err
}
notInDb, err := dbidx.missing(blockCommitments)
// This is a database integrity sanity check - it should never fail.
if err != nil {
return err
}
// All commitments were found in the db, due to a previous successful DA check.
if len(notInDb) == 0 {
return nil
}
return cacheErr
}
// persisted populate the db cache, which contains a mapping from Index->KzgCommitment for BlobSidecars previously verified
// (proof verification) and saved to the backend.
func (s *LazilyPersistentStore) persisted(ctx context.Context, root [32]byte, entry *cacheEntry) (dbidx, error) {
if entry.dbidxInitialized() {
return entry.dbidx(), nil
}
sidecars, err := s.db.BlobSidecarsByRoot(ctx, root)
if err != nil {
if errors.Is(err, db.ErrNotFound) {
// No BlobSidecars, initialize with empty idx.
return entry.ensureDbidx(), nil
}
return entry.dbidx(), err
}
// Ensure all sidecars found in the db are represented in the cache and return the cache value.
return entry.ensureDbidx(sidecars...), nil
}
func commitmentsToCheck(b blocks.ROBlock, current primitives.Slot) [][]byte {
if b.Version() < version.Deneb {
return nil
}
// We are only required to check within MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS
if !params.WithinDAPeriod(slots.ToEpoch(b.Block().Slot()), slots.ToEpoch(current)) {
return nil
}
kzgCommitments, err := b.Block().Body().BlobKzgCommitments()
if err != nil {
return nil
}
return kzgCommitments
}

View File

@@ -0,0 +1,318 @@
package das
import (
"bytes"
"context"
"testing"
errors "github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/testing/require"
"github.com/prysmaticlabs/prysm/v4/testing/util"
"github.com/prysmaticlabs/prysm/v4/time/slots"
)
func Test_commitmentsToCheck(t *testing.T) {
windowSlots, err := slots.EpochEnd(params.BeaconNetworkConfig().MinEpochsForBlobsSidecarsRequest)
require.NoError(t, err)
commits := [][]byte{
bytesutil.PadTo([]byte("a"), 48),
bytesutil.PadTo([]byte("b"), 48),
bytesutil.PadTo([]byte("c"), 48),
bytesutil.PadTo([]byte("d"), 48),
}
cases := []struct {
name string
commits [][]byte
block func(*testing.T) blocks.ROBlock
slot primitives.Slot
}{
{
name: "pre deneb",
block: func(t *testing.T) blocks.ROBlock {
bb := util.NewBeaconBlockBellatrix()
sb, err := blocks.NewSignedBeaconBlock(bb)
require.NoError(t, err)
rb, err := blocks.NewROBlock(sb)
require.NoError(t, err)
return rb
},
},
{
name: "commitments within da",
block: func(t *testing.T) blocks.ROBlock {
d := util.NewBeaconBlockDeneb()
d.Block.Body.BlobKzgCommitments = commits
d.Block.Slot = 100
sb, err := blocks.NewSignedBeaconBlock(d)
require.NoError(t, err)
rb, err := blocks.NewROBlock(sb)
require.NoError(t, err)
return rb
},
commits: commits,
slot: 100,
},
{
name: "commitments outside da",
block: func(t *testing.T) blocks.ROBlock {
d := util.NewBeaconBlockDeneb()
// block is from slot 0, "current slot" is window size +1 (so outside the window)
d.Block.Body.BlobKzgCommitments = commits
sb, err := blocks.NewSignedBeaconBlock(d)
require.NoError(t, err)
rb, err := blocks.NewROBlock(sb)
require.NoError(t, err)
return rb
},
slot: windowSlots + 1,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
b := c.block(t)
co := commitmentsToCheck(b, c.slot)
require.Equal(t, len(c.commits), len(co))
for i := 0; i < len(c.commits); i++ {
require.Equal(t, true, bytes.Equal(c.commits[i], co[i]))
}
})
}
}
func daAlwaysSucceeds(_ [][]byte, _ []*ethpb.BlobSidecar) error {
return nil
}
type mockDA struct {
t *testing.T
cmts [][]byte
scs []*ethpb.BlobSidecar
err error
}
func (m *mockDA) expectedArguments(gotC [][]byte, gotSC []*ethpb.BlobSidecar) error {
require.Equal(m.t, len(m.cmts), len(gotC))
require.Equal(m.t, len(gotSC), len(m.scs))
for i := range m.cmts {
require.Equal(m.t, true, bytes.Equal(m.cmts[i], gotC[i]))
}
for i := range m.scs {
require.Equal(m.t, m.scs[i], gotSC[i])
}
return m.err
}
func TestLazilyPersistent_Missing(t *testing.T) {
ctx := context.Background()
db := &mockBlobsDB{}
as := NewLazilyPersistentStore(db)
blk, scs := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 1, 3)
expectedCommitments, err := blk.Block().Body().BlobKzgCommitments()
require.NoError(t, err)
vf := &mockDA{
t: t,
cmts: expectedCommitments,
scs: scs,
}
as.verifyKZG = vf.expectedArguments
// Only one commitment persisted, should return error with other indices
_, err = as.Persist(ctx, 1, scs[2])
require.NoError(t, err)
err = as.IsDataAvailable(ctx, 1, blk)
require.NotNil(t, err)
missingErr := MissingIndicesError{}
require.Equal(t, true, errors.As(err, &missingErr))
require.DeepEqual(t, []uint64{0, 1}, missingErr.Missing())
// All but one persisted, return missing idx
_, err = as.Persist(ctx, 1, scs[0])
require.NoError(t, err)
err = as.IsDataAvailable(ctx, 1, blk)
require.NotNil(t, err)
require.Equal(t, true, errors.As(err, &missingErr))
require.DeepEqual(t, []uint64{1}, missingErr.Missing())
// All persisted, return nil
_, err = as.Persist(ctx, 1, scs[1])
require.NoError(t, err)
require.NoError(t, as.IsDataAvailable(ctx, 1, blk))
}
func TestLazilyPersistent_Mismatch(t *testing.T) {
ctx := context.Background()
db := &mockBlobsDB{}
as := NewLazilyPersistentStore(db)
blk, scs := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 1, 3)
vf := &mockDA{
t: t,
err: errors.New("kzg check should not run"),
}
as.verifyKZG = vf.expectedArguments
scs[0].KzgCommitment = bytesutil.PadTo([]byte("nope"), 48)
// Only one commitment persisted, should return error with other indices
_, err := as.Persist(ctx, 1, scs[0])
require.NoError(t, err)
err = as.IsDataAvailable(ctx, 1, blk)
require.NotNil(t, err)
mismatchErr := CommitmentMismatchError{}
require.Equal(t, true, errors.As(err, &mismatchErr))
require.DeepEqual(t, []uint64{0}, mismatchErr.Mismatch())
// the next time we call the DA check, the mismatched commitment should be evicted
err = as.IsDataAvailable(ctx, 1, blk)
require.NotNil(t, err)
missingErr := MissingIndicesError{}
require.Equal(t, true, errors.As(err, &missingErr))
require.DeepEqual(t, []uint64{0, 1, 2}, missingErr.Missing())
}
func TestPersisted(t *testing.T) {
ctx := context.Background()
blk, scs := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 1, 3)
db := &mockBlobsDB{
BlobSidecarsByRootCallback: func(ctx context.Context, beaconBlockRoot [32]byte, indices ...uint64) ([]*ethpb.BlobSidecar, error) {
return scs, nil
},
}
vf := &mockDA{
t: t,
err: errors.New("kzg check should not run"),
}
as := NewLazilyPersistentStore(db)
as.verifyKZG = vf.expectedArguments
entry := &cacheEntry{}
dbidx, err := as.persisted(ctx, blk.Root(), entry)
require.NoError(t, err)
require.Equal(t, false, dbidx[0] == nil)
for i := range scs {
require.Equal(t, *dbidx[i], bytesutil.ToBytes48(scs[i].KzgCommitment))
}
expectedCommitments, err := blk.Block().Body().BlobKzgCommitments()
require.NoError(t, err)
missing, err := dbidx.missing(expectedCommitments)
require.NoError(t, err)
require.Equal(t, 0, len(missing))
// test that the caching is working by returning the wrong set of sidecars
// and making sure that dbidx still thinks none are missing
db = &mockBlobsDB{
BlobSidecarsByRootCallback: func(ctx context.Context, beaconBlockRoot [32]byte, indices ...uint64) ([]*ethpb.BlobSidecar, error) {
return scs[1:], nil
},
}
as = NewLazilyPersistentStore(db)
// note, using the same entry value
dbidx, err = as.persisted(ctx, blk.Root(), entry)
require.NoError(t, err)
// same assertions should pass as when all sidecars returned by db
missing, err = dbidx.missing(expectedCommitments)
require.NoError(t, err)
require.Equal(t, 0, len(missing))
// do it again, but with a fresh cache entry - we should see a missing sidecar
newEntry := &cacheEntry{}
dbidx, err = as.persisted(ctx, blk.Root(), newEntry)
require.NoError(t, err)
missing, err = dbidx.missing(expectedCommitments)
require.NoError(t, err)
require.Equal(t, 1, len(missing))
// only element in missing should be the zero index
require.Equal(t, uint64(0), missing[0])
}
func TestLazilyPersistent_DBFallback(t *testing.T) {
ctx := context.Background()
blk, scs := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 1, 3)
// Generate the same sidecars index 0 so we can mess with its commitment
_, scscp := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 1, 1)
db := &mockBlobsDB{
BlobSidecarsByRootCallback: func(ctx context.Context, beaconBlockRoot [32]byte, indices ...uint64) ([]*ethpb.BlobSidecar, error) {
return scs, nil
},
}
vf := &mockDA{
t: t,
err: errors.New("kzg check should not run"),
}
as := NewLazilyPersistentStore(db)
as.verifyKZG = vf.expectedArguments
// Set up the mismatched commit, but we don't expect this to error because
// the db contains the sidecars.
scscp[0].BlockRoot = scs[0].BlockRoot
scscp[0].KzgCommitment = bytesutil.PadTo([]byte("nope"), 48)
_, err := as.Persist(ctx, 1, scscp[0])
require.NoError(t, err)
// This should pass since the db is giving us all the right sidecars
require.NoError(t, as.IsDataAvailable(ctx, 1, blk))
// now using an empty db, we should fail
as.db = &mockBlobsDB{}
// but we should have pruned, so we'll get a missing error, not mismatch
err = as.IsDataAvailable(ctx, 1, blk)
require.NotNil(t, err)
missingErr := MissingIndicesError{}
require.Equal(t, true, errors.As(err, &missingErr))
require.DeepEqual(t, []uint64{0, 1, 2}, missingErr.Missing())
// put the bad value back in the cache
persisted, err := as.Persist(ctx, 1, scscp[0])
require.NoError(t, err)
require.Equal(t, 1, len(persisted))
// now we'll get a mismatch error
err = as.IsDataAvailable(ctx, 1, blk)
require.NotNil(t, err)
mismatchErr := CommitmentMismatchError{}
require.Equal(t, true, errors.As(err, &mismatchErr))
require.DeepEqual(t, []uint64{0}, mismatchErr.Mismatch())
}
func TestLazyPersistOnceCommitted(t *testing.T) {
ctx := context.Background()
_, scs := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 1, 6)
as := NewLazilyPersistentStore(&mockBlobsDB{})
// stashes as expected
p, err := as.Persist(ctx, 1, scs...)
require.NoError(t, err)
require.Equal(t, 6, len(p))
// ignores duplicates
p, err = as.Persist(ctx, 1, scs...)
require.NoError(t, err)
require.Equal(t, 0, len(p))
// ignores index out of bound
scs[0].Index = 6
p, err = as.Persist(ctx, 1, scs[0])
require.NoError(t, err)
require.Equal(t, 0, len(p))
_, more := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 1, 4)
// ignores sidecars before the retention period
slotOOB, err := slots.EpochStart(params.BeaconNetworkConfig().MinEpochsForBlobsSidecarsRequest)
require.NoError(t, err)
p, err = as.Persist(ctx, 32+slotOOB, more[0])
require.NoError(t, err)
require.Equal(t, 0, len(p))
// doesn't ignore new sidecars with a different block root
p, err = as.Persist(ctx, 1, more...)
require.NoError(t, err)
require.Equal(t, 4, len(p))
}

View File

@@ -0,0 +1,93 @@
package das
import (
"context"
"sync"
errors "github.com/pkg/errors"
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
)
// AsyncStore wraps NewCachingDBVerifiedStore and blocks until IsDataAvailable is ready.
// If the context given to IsDataAvailable is cancelled, the result of IsDataAvailable will be ctx.Error().
type AsyncStore struct {
notif *idxNotifiers
s AvailabilityStore
}
func (bs *AsyncStore) PersistOnceCommitted(ctx context.Context, current primitives.Slot, sc ...*ethpb.BlobSidecar) ([]*ethpb.BlobSidecar, error) {
if len(sc) < 1 {
return nil, nil
}
seen := bs.notif.ensure(keyFromSidecar(sc[0]))
persisted, err := bs.s.Persist(ctx, current, sc...)
if err != nil {
return nil, err
}
for i := range persisted {
seen <- persisted[i].Index
}
return persisted, nil
}
func (bs *AsyncStore) IsDataAvailable(ctx context.Context, current primitives.Slot, b blocks.ROBlock) error {
key := keyFromBlock(b)
for {
err := bs.s.IsDataAvailable(ctx, current, b)
if err == nil {
return nil
}
mie := MissingIndicesError{}
if !errors.As(err, &mie) {
return err
}
waitFor := make(map[uint64]struct{})
for _, m := range mie.Missing() {
waitFor[m] = struct{}{}
}
if err := waitForIndices(ctx, bs.notif.ensure(key), waitFor); err != nil {
return err
}
bs.notif.reset(key)
}
}
func waitForIndices(ctx context.Context, idxSeen chan uint64, waitFor map[uint64]struct{}) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case idx := <-idxSeen:
delete(waitFor, idx)
if len(waitFor) == 0 {
return nil
}
continue
}
}
}
type idxNotifiers struct {
sync.RWMutex
entries map[cacheKey]chan uint64
}
func (c *idxNotifiers) ensure(key cacheKey) chan uint64 {
c.Lock()
defer c.Unlock()
e, ok := c.entries[key]
if !ok {
e = make(chan uint64, fieldparams.MaxBlobsPerBlock)
c.entries[key] = e
}
return e
}
func (c *idxNotifiers) reset(key cacheKey) {
c.Lock()
defer c.Unlock()
delete(c.entries, key)
}

184
beacon-chain/das/cache.go Normal file
View File

@@ -0,0 +1,184 @@
package das
import (
"bytes"
"fmt"
"sync"
errors "github.com/pkg/errors"
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
log "github.com/sirupsen/logrus"
)
var (
errDBCommitmentMismatch = errors.New("blob/block commitment mismatch")
)
// cacheKey includes the slot so that we can easily iterate through the cache and compare
// slots for eviction purposes. Whether the input is the block or the sidecar, we always have
// the root+slot when interacting with the cache, so it isn't an inconvenience to use both.
type cacheKey struct {
slot primitives.Slot
root [32]byte
}
type cache struct {
sync.RWMutex
entries map[cacheKey]*cacheEntry
}
func newCache() *cache {
return &cache{entries: make(map[cacheKey]*cacheEntry)}
}
// keyFromSidecar is a convenience method for constructing a cacheKey from a BlobSidecar value.
func keyFromSidecar(sc *ethpb.BlobSidecar) cacheKey {
return cacheKey{slot: sc.Slot, root: bytesutil.ToBytes32(sc.BlockRoot)}
}
// keyFromBlock is a convenience method for constructing a cacheKey from a ROBlock value.
func keyFromBlock(b blocks.ROBlock) cacheKey {
return cacheKey{slot: b.Block().Slot(), root: b.Root()}
}
// ensure returns the entry for the given key, creating it if it isn't already present.
func (c *cache) ensure(key cacheKey) *cacheEntry {
c.Lock()
defer c.Unlock()
e, ok := c.entries[key]
if !ok {
e = &cacheEntry{}
c.entries[key] = e
}
return e
}
// delete removes the cache entry from the cache.
func (c *cache) delete(key cacheKey) {
c.Lock()
defer c.Unlock()
delete(c.entries, key)
}
// dbidx is a compact representation of the set of BlobSidecars in the database for a given root,
// organized as a map from BlobSidecar.Index->BlobSidecar.KzgCommitment.
// This representation is convenient for comparison to a block's commitments.
type dbidx [fieldparams.MaxBlobsPerBlock]*[48]byte
// missing compares the set of BlobSidecars observed in the backing store to the set of commitments
// observed in a block - cmts is the BlobKzgCommitments field from a block.
func (idx dbidx) missing(cmts [][]byte) ([]uint64, error) {
m := make([]uint64, 0, len(cmts))
for i := range cmts {
if idx[i] == nil {
m = append(m, uint64(i))
continue
}
c := *idx[i]
if c != bytesutil.ToBytes48(cmts[i]) {
return nil, errors.Wrapf(errDBCommitmentMismatch,
"index=%d, db=%#x, block=%#x", i, c, cmts[i])
}
}
return m, nil
}
// cacheEntry represents 2 different types of caches for a given block.
// scs is a fixed-length cache of BlobSidecars.
// dbx is a compact representation of BlobSidecars observed in the backing store.
// dbx assumes that all writes to the backing store go through the same cache.
type cacheEntry struct {
sync.RWMutex
scs [fieldparams.MaxBlobsPerBlock]*ethpb.BlobSidecar
dbx dbidx
dbRead bool
}
// stash adds an item to the in-memory cache of BlobSidecars.
// Only the first BlobSidecar of a given Index will be kept in the cache.
// The return value represents whether the given BlobSidecar was stashed.
// A false value means there was already a BlobSidecar with the given Index.
func (e *cacheEntry) stash(sc *ethpb.BlobSidecar) bool {
if e.scs[sc.Index] == nil {
e.scs[sc.Index] = sc
return true
}
return false
}
func (e *cacheEntry) dbidx() dbidx {
return e.dbx
}
func (e *cacheEntry) dbidxInitialized() bool {
return e.dbRead
}
// filter evicts sidecars that are not commited to by the block and returns custom
// errors if the cache is missing any of the commitments, or if the commitments in
// the cache do not match those found in the block. If err is nil, then all expected
// commitments were found in the cache and the sidecar slice return value can be used
// to perform a DA check against the cached sidecars.
func (e *cacheEntry) filter(root [32]byte, blkCmts [][]byte) ([]*ethpb.BlobSidecar, error) {
// Evict any blobs that are out of range.
for i := len(blkCmts); i < fieldparams.MaxBlobsPerBlock; i++ {
if e.scs[i] == nil {
continue
}
log.WithField("block_root", root).
WithField("index", i).
WithField("cached_commitment", fmt.Sprintf("%#x", e.scs[i].KzgCommitment)).
Warn("Evicting BlobSidecar with index > maximum blob commitment")
e.scs[i] = nil
}
// Generate a MissingIndicesError for any missing indices.
// Generate a CommitmentMismatchError for any mismatched commitments.
missing := make([]uint64, 0, len(blkCmts))
mismatch := make([]uint64, 0, len(blkCmts))
for i := range blkCmts {
if e.scs[i] == nil {
missing = append(missing, uint64(i))
continue
}
if !bytes.Equal(blkCmts[i], e.scs[i].KzgCommitment) {
mismatch = append(mismatch, uint64(i))
log.WithField("block_root", root).
WithField("index", i).
WithField("expected_commitment", fmt.Sprintf("%#x", blkCmts[i])).
WithField("cached_commitment", fmt.Sprintf("%#x", e.scs[i].KzgCommitment)).
Error("Evicting BlobSidecar with incorrect commitment")
e.scs[i] = nil
continue
}
}
if len(mismatch) > 0 {
return nil, NewCommitmentMismatchError(mismatch)
}
if len(missing) > 0 {
return nil, NewMissingIndicesError(missing)
}
return e.scs[0:len(blkCmts)], nil
}
// ensureDbidx updates the db cache representation to include the given BlobSidecars.
func (e *cacheEntry) ensureDbidx(scs ...*ethpb.BlobSidecar) dbidx {
if e.dbRead == false {
e.dbRead = true
}
for i := range scs {
if scs[i].Index >= fieldparams.MaxBlobsPerBlock {
continue
}
// Don't overwrite.
if e.dbx[scs[i].Index] != nil {
continue
}
c := bytesutil.ToBytes48(scs[i].KzgCommitment)
e.dbx[scs[i].Index] = &c
}
return e.dbx
}

View File

@@ -0,0 +1,122 @@
package das
import (
"testing"
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/testing/require"
)
func TestCacheEnsureDelete(t *testing.T) {
c := newCache()
require.Equal(t, 0, len(c.entries))
root := bytesutil.ToBytes32([]byte("root"))
slot := primitives.Slot(1234)
k := cacheKey{root: root, slot: slot}
entry := c.ensure(k)
require.Equal(t, 1, len(c.entries))
require.Equal(t, c.entries[k], entry)
c.delete(k)
require.Equal(t, 0, len(c.entries))
var nilEntry *cacheEntry
require.Equal(t, nilEntry, c.entries[k])
}
func TestNewEntry(t *testing.T) {
entry := &cacheEntry{}
require.Equal(t, false, entry.dbidxInitialized())
entry.ensureDbidx()
require.Equal(t, true, entry.dbidxInitialized())
}
func TestDbidxBounds(t *testing.T) {
scs := generateMinimalBlobSidecars(2)
entry := &cacheEntry{}
entry.ensureDbidx(scs...)
//require.Equal(t, 2, len(entry.dbidx()))
for i := range scs {
require.Equal(t, bytesutil.ToBytes48(scs[i].KzgCommitment), *entry.dbidx()[i])
}
var nilPtr *[48]byte
// test that duplicate sidecars are ignored
orig := entry.dbidx()
copy(scs[0].KzgCommitment[0:4], []byte("derp"))
edited := bytesutil.ToBytes48(scs[0].KzgCommitment)
require.Equal(t, false, *entry.dbidx()[0] == edited)
entry.ensureDbidx(scs[0])
for i := 2; i < fieldparams.MaxBlobsPerBlock; i++ {
require.Equal(t, entry.dbidx()[i], nilPtr)
}
require.Equal(t, entry.dbidx(), orig)
// test that excess sidecars are discarded
oob := generateMinimalBlobSidecars(fieldparams.MaxBlobsPerBlock + 1)
entry = &cacheEntry{}
entry.ensureDbidx(oob...)
require.Equal(t, fieldparams.MaxBlobsPerBlock, len(entry.dbidx()))
}
func TestDbidxMissing(t *testing.T) {
scs := generateMinimalBlobSidecars(6)
missing := []uint64{0, 1, 2, 3, 4, 5}
blockCommits := commitsForSidecars(scs)
cases := []struct {
name string
nMissing int
err error
}{
{
name: "all missing",
nMissing: len(scs),
},
{
name: "none missing",
nMissing: 0,
},
{
name: "2 missing",
nMissing: 2,
},
{
name: "3 missing",
nMissing: 3,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
l := len(scs)
entry := &cacheEntry{}
d := entry.ensureDbidx(scs[0 : l-c.nMissing]...)
m, err := d.missing(blockCommits)
if c.err == nil {
require.NoError(t, err)
}
require.DeepEqual(t, m, missing[l-c.nMissing:])
require.Equal(t, c.nMissing, len(m))
})
}
}
func commitsForSidecars(scs []*ethpb.BlobSidecar) [][]byte {
m := make([][]byte, len(scs))
for i := range scs {
m[i] = scs[i].KzgCommitment
}
return m
}
func generateMinimalBlobSidecars(n int) []*ethpb.BlobSidecar {
scs := make([]*ethpb.BlobSidecar, n)
for i := 0; i < n; i++ {
scs[i] = &ethpb.BlobSidecar{
Index: uint64(i),
KzgCommitment: bytesutil.PadTo([]byte{byte(i)}, 48),
}
}
return scs
}

76
beacon-chain/das/error.go Normal file
View File

@@ -0,0 +1,76 @@
package das
import (
"fmt"
"strings"
errors "github.com/pkg/errors"
)
var (
errDAIncomplete = errors.New("some BlobSidecars are not available at this time")
errDAEquivocated = errors.New("cache contains BlobSidecars that do not match block commitments")
errMixedRoots = errors.New("BlobSidecars must all be for the same block")
)
// The following errors are exported so that gossip verification can use errors.Is to determine the correct pubsub.ValidationResult.
var (
// ErrInvalidInclusionProof is returned when the inclusion proof check on the BlobSidecar fails.
ErrInvalidInclusionProof = errors.New("BlobSidecar inclusion proof is invalid")
// ErrInvalidBlockSignature is returned when the BlobSidecar.SignedBeaconBlockHeader signature cannot be verified against the block root.
ErrInvalidBlockSignature = errors.New("SignedBeaconBlockHeader signature could not verified")
// ErrInvalidCommitment is returned when the kzg_commitment cannot be verified against the kzg_proof and blob.
ErrInvalidCommitment = errors.New("BlobSidecar.kzg_commitment verification failed")
)
func NewMissingIndicesError(missing []uint64) MissingIndicesError {
return MissingIndicesError{indices: missing}
}
type MissingIndicesError struct {
indices []uint64
}
var _ error = MissingIndicesError{}
func (m MissingIndicesError) Error() string {
is := make([]string, 0, len(m.indices))
for i := range m.indices {
is = append(is, fmt.Sprintf("%d", m.indices[i]))
}
return fmt.Sprintf("%s at indices %s", errDAIncomplete.Error(), strings.Join(is, ","))
}
func (m MissingIndicesError) Missing() []uint64 {
return m.indices
}
func (m MissingIndicesError) Unwrap() error {
return errDAIncomplete
}
func NewCommitmentMismatchError(mismatch []uint64) CommitmentMismatchError {
return CommitmentMismatchError{mismatch: mismatch}
}
type CommitmentMismatchError struct {
mismatch []uint64
}
var _ error = CommitmentMismatchError{}
func (m CommitmentMismatchError) Error() string {
is := make([]string, 0, len(m.mismatch))
for i := range m.mismatch {
is = append(is, fmt.Sprintf("%d", m.mismatch[i]))
}
return fmt.Sprintf("%s at indices %s", errDAEquivocated.Error(), strings.Join(is, ","))
}
func (m CommitmentMismatchError) Mismatch() []uint64 {
return m.mismatch
}
func (m CommitmentMismatchError) Unwrap() error {
return errDAEquivocated
}

22
beacon-chain/das/iface.go Normal file
View File

@@ -0,0 +1,22 @@
package das
import (
"context"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
)
// BlobsDB specifies the persistence store methods needed by the AvailabilityStore.
type BlobsDB interface {
BlobSidecarsByRoot(ctx context.Context, beaconBlockRoot [32]byte, indices ...uint64) ([]*ethpb.BlobSidecar, error)
SaveBlobSidecar(ctx context.Context, sidecars []*ethpb.BlobSidecar) error
}
// AvailabilityStore describes a component that can verify and save sidecars for a given block, and confirm previously
// verified and saved sidecars.
type AvailabilityStore interface {
IsDataAvailable(ctx context.Context, current primitives.Slot, b blocks.ROBlock) error
Persist(ctx context.Context, current primitives.Slot, sc ...*ethpb.BlobSidecar) ([]*ethpb.BlobSidecar, error)
}

51
beacon-chain/das/mock.go Normal file
View File

@@ -0,0 +1,51 @@
package das
import (
"context"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
)
type MockAvailabilityStore struct {
VerifyAvailabilityCallback func(ctx context.Context, current primitives.Slot, b blocks.ROBlock) error
PersistBlobsCallback func(ctx context.Context, current primitives.Slot, sc ...*ethpb.BlobSidecar) ([]*ethpb.BlobSidecar, error)
}
var _ AvailabilityStore = &MockAvailabilityStore{}
func (m *MockAvailabilityStore) IsDataAvailable(ctx context.Context, current primitives.Slot, b blocks.ROBlock) error {
if m.VerifyAvailabilityCallback != nil {
return m.VerifyAvailabilityCallback(ctx, current, b)
}
return nil
}
func (m *MockAvailabilityStore) Persist(ctx context.Context, current primitives.Slot, sc ...*ethpb.BlobSidecar) ([]*ethpb.BlobSidecar, error) {
if m.PersistBlobsCallback != nil {
return m.PersistBlobsCallback(ctx, current, sc...)
}
return sc, nil
}
type mockBlobsDB struct {
BlobSidecarsByRootCallback func(ctx context.Context, root [32]byte, indices ...uint64) ([]*ethpb.BlobSidecar, error)
SaveBlobSidecarCallback func(ctx context.Context, sidecars []*ethpb.BlobSidecar) error
}
var _ BlobsDB = &mockBlobsDB{}
func (b *mockBlobsDB) BlobSidecarsByRoot(ctx context.Context, root [32]byte, indices ...uint64) ([]*ethpb.BlobSidecar, error) {
if b.BlobSidecarsByRootCallback != nil {
return b.BlobSidecarsByRootCallback(ctx, root, indices...)
}
return nil, nil
}
func (b *mockBlobsDB) SaveBlobSidecar(ctx context.Context, sidecars []*ethpb.BlobSidecar) error {
if b.SaveBlobSidecarCallback != nil {
return b.SaveBlobSidecarCallback(ctx, sidecars)
}
return nil
}

View File

@@ -207,6 +207,37 @@ func (s *Store) BlobSidecarsByRoot(ctx context.Context, root [32]byte, indices .
return filterForIndices(sc, indices...)
}
func (s *Store) BlobIndicesAvailable(ctx context.Context, root [32]byte) ([]uint64, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.BlobIndicesAvailable")
defer span.End()
var enc []byte
if err := s.db.View(func(tx *bolt.Tx) error {
c := tx.Bucket(blobsBucket).Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
if bytes.HasSuffix(k, root[:]) {
enc = v
break
}
}
return nil
}); err != nil {
return nil, err
}
if enc == nil {
return nil, ErrNotFound
}
scs := &ethpb.BlobSidecars{}
if err := decode(ctx, enc, scs); err != nil {
return nil, err
}
idxs := make([]uint64, len(scs.Sidecars))
for i := range scs.Sidecars {
idxs[i] = scs.Sidecars[i].Index
}
return idxs, nil
}
func filterForIndices(sc *ethpb.BlobSidecars, indices ...uint64) ([]*ethpb.BlobSidecar, error) {
if len(indices) == 0 {
return sc.Sidecars, nil

View File

@@ -245,8 +245,12 @@ func New(cliCtx *cli.Context, opts ...Option) (*BeaconNode, error) {
return nil, err
}
log.Debugln("Registering Blockchain Service")
if err := beacon.registerBlockchainService(beacon.forkChoicer, synchronizer, beacon.initialSyncComplete); err != nil {
bcOpts := []blockchain.Option{
blockchain.WithForkChoiceStore(beacon.forkChoicer),
blockchain.WithClockSynchronizer(synchronizer),
blockchain.WithSyncComplete(beacon.initialSyncComplete),
}
if err := beacon.registerBlockchainService(bcOpts); err != nil {
return nil, err
}
@@ -607,7 +611,8 @@ func (b *BeaconNode) registerAttestationPool() error {
return b.services.RegisterService(s)
}
func (b *BeaconNode) registerBlockchainService(fc forkchoice.ForkChoicer, gs *startup.ClockSynchronizer, syncComplete chan struct{}) error {
func (b *BeaconNode) registerBlockchainService(required []blockchain.Option) error {
log.Debugln("Registering Blockchain Service")
var web3Service *execution.Service
if err := b.services.FetchService(&web3Service); err != nil {
return err
@@ -618,10 +623,9 @@ func (b *BeaconNode) registerBlockchainService(fc forkchoice.ForkChoicer, gs *st
return err
}
opts := append(b.serviceFlagOpts.blockchainFlagOpts, required...)
// skipcq: CRT-D0001
opts := append(
b.serviceFlagOpts.blockchainFlagOpts,
blockchain.WithForkChoiceStore(fc),
opts = append(opts,
blockchain.WithDatabase(b.db),
blockchain.WithDepositCache(b.depositCache),
blockchain.WithChainStartFetcher(web3Service),
@@ -637,8 +641,6 @@ func (b *BeaconNode) registerBlockchainService(fc forkchoice.ForkChoicer, gs *st
blockchain.WithSlasherAttestationsFeed(b.slasherAttestationsFeed),
blockchain.WithFinalizedStateAtStartUp(b.finalizedStateAtStartUp),
blockchain.WithProposerIdsCache(b.proposerIdsCache),
blockchain.WithClockSynchronizer(gs),
blockchain.WithSyncComplete(syncComplete),
)
blockchainService, err := blockchain.NewService(b.ctx, opts...)

View File

@@ -21,6 +21,7 @@ go_library(
"//beacon-chain/core/feed/block:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/transition:go_default_library",
"//beacon-chain/das:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/p2p/peers/scorers:go_default_library",
@@ -112,6 +113,7 @@ go_test(
deps = [
"//async/abool:go_default_library",
"//beacon-chain/blockchain/testing:go_default_library",
"//beacon-chain/das:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/db/testing:go_default_library",
"//beacon-chain/p2p:go_default_library",

View File

@@ -10,6 +10,7 @@ import (
"github.com/paulbellamy/ratecounter"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
@@ -27,7 +28,7 @@ const (
type blockReceiverFn func(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte) error
// batchBlockReceiverFn defines batch receiving function.
type batchBlockReceiverFn func(ctx context.Context, blks []blocks.ROBlock) error
type batchBlockReceiverFn func(ctx context.Context, blks []blocks.ROBlock, avs das.AvailabilityStore) error
// Round Robin sync looks at the latest peer statuses and syncs up to the highest known epoch.
//
@@ -321,25 +322,15 @@ func (s *Service) processBatchedBlocks(ctx context.Context, genesis time.Time,
errParentDoesNotExist, first.Block().ParentRoot(), first.Block().Slot())
}
s.logBatchSyncStatus(genesis, first, len(bwb))
blobCount := 0
avs := das.NewLazilyPersistentStore(s.cfg.DB)
for _, bb := range bwb {
if len(bb.Blobs) == 0 {
continue
_, err = avs.Persist(ctx, bb.Block.Block().Slot(), bb.Blobs...)
if err != nil {
return errors.Wrap(err, "error verifying or persisting BlobSidecars")
}
if err := s.cfg.DB.SaveBlobSidecar(ctx, bb.Blobs); err != nil {
return errors.Wrapf(err, "failed to save blobs for block %#x", bb.Block.Root())
}
blobCount += len(bb.Blobs)
}
if blobCount > 0 {
log.WithFields(logrus.Fields{
"startSlot": bwb[0].Block.Block().Slot(),
"endSlot": bwb[len(bwb)-1].Block.Block().Slot(),
"count": blobCount,
}).Info("Processed blob sidecars")
}
return bFunc(ctx, blocks.BlockWithVerifiedBlobsSlice(bwb).ROBlocks())
return bFunc(ctx, blocks.BlockWithVerifiedBlobsSlice(bwb).ROBlocks(), avs)
}
// updatePeerScorerStats adjusts monitored metrics for a peer.

View File

@@ -8,6 +8,7 @@ import (
"github.com/paulbellamy/ratecounter"
"github.com/prysmaticlabs/prysm/v4/async/abool"
mock "github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/das"
dbtest "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/testing"
p2pt "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/startup"
@@ -464,15 +465,15 @@ func TestService_processBlockBatch(t *testing.T) {
currBlockRoot = blk1Root
}
cbnormal := func(ctx context.Context, blks []blocks.ROBlock) error {
assert.NoError(t, s.cfg.Chain.ReceiveBlockBatch(ctx, blks))
cbnormal := func(ctx context.Context, blks []blocks.ROBlock, avs das.AvailabilityStore) error {
assert.NoError(t, s.cfg.Chain.ReceiveBlockBatch(ctx, blks, avs))
return nil
}
// Process block normally.
err = s.processBatchedBlocks(ctx, genesis, batch, cbnormal)
assert.NoError(t, err)
cbnil := func(ctx context.Context, blocks []blocks.ROBlock) error {
cbnil := func(ctx context.Context, blocks []blocks.ROBlock, avs das.AvailabilityStore) error {
return nil
}

View File

@@ -13,6 +13,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain"
blockfeed "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/block"
statefeed "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/startup"
@@ -41,6 +42,7 @@ type Config struct {
BlockNotifier blockfeed.Notifier
ClockWaiter startup.ClockWaiter
InitialSyncComplete chan struct{}
AVS das.AvailabilityStore
}
// Service service.