Compare commits

...

3 Commits

Author SHA1 Message Date
Kasey Kirkham
e5a2873376 rm unused lock 2023-10-17 15:37:33 -05:00
Kasey Kirkham
4d4753ebf3 init AvailabilityStore in blockchain test setup 2023-10-17 15:14:38 -05:00
Kasey Kirkham
d4fd310615 DA check before blob saving for init-sync 2023-10-17 14:23:24 -05:00
14 changed files with 281 additions and 110 deletions

View File

@@ -49,6 +49,7 @@ go_library(
"//beacon-chain/core/signing:go_default_library",
"//beacon-chain/core/time:go_default_library",
"//beacon-chain/core/transition:go_default_library",
"//beacon-chain/das:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/db/filters:go_default_library",
"//beacon-chain/db/kv:go_default_library",

View File

@@ -4,6 +4,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/async/event"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/cache"
statefeed "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/execution"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/forkchoice"
@@ -178,3 +179,10 @@ func WithSyncComplete(c chan struct{}) Option {
return nil
}
}
func WithAvailabilityStore(vs das.AvailabilityStore) Option {
return func(s *Service) error {
s.avs = vs
return nil
}
}

View File

@@ -265,7 +265,7 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []consensusblocks.ROBlo
return err
}
}
if err := s.databaseDACheck(ctx, b); err != nil {
if err := s.avs.VerifyAvailability(ctx, s.CurrentSlot(), b); err != nil {
return errors.Wrap(err, "could not validate blob data availability")
}
args := &forkchoicetypes.BlockAndCheckpoints{Block: b.Block(),
@@ -333,33 +333,6 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []consensusblocks.ROBlo
return s.saveHeadNoDB(ctx, lastB, lastBR, preState, !isValidPayload)
}
func commitmentsToCheck(b consensusblocks.ROBlock, current primitives.Slot) [][]byte {
if b.Version() < version.Deneb {
return nil
}
// We are only required to check within MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS
if !params.WithinDAPeriod(slots.ToEpoch(b.Block().Slot()), slots.ToEpoch(current)) {
return nil
}
kzgCommitments, err := b.Block().Body().BlobKzgCommitments()
if err != nil {
return nil
}
return kzgCommitments
}
func (s *Service) databaseDACheck(ctx context.Context, b consensusblocks.ROBlock) error {
commitments := commitmentsToCheck(b, s.CurrentSlot())
if len(commitments) == 0 {
return nil
}
sidecars, err := s.cfg.BeaconDB.BlobSidecarsByRoot(ctx, b.Root())
if err != nil {
return errors.Wrap(err, "could not get blob sidecars")
}
return kzg.IsDataAvailable(commitments, sidecars)
}
func (s *Service) updateEpochBoundaryCaches(ctx context.Context, st state.BeaconState) error {
e := coreTime.CurrentEpoch(st)
if err := helpers.UpdateCommitteeCache(ctx, st, e); err != nil {

View File

@@ -1,7 +1,6 @@
package blockchain
import (
"bytes"
"context"
"fmt"
"math/big"
@@ -39,7 +38,6 @@ import (
"github.com/prysmaticlabs/prysm/v4/testing/require"
"github.com/prysmaticlabs/prysm/v4/testing/util"
prysmTime "github.com/prysmaticlabs/prysm/v4/time"
"github.com/prysmaticlabs/prysm/v4/time/slots"
logTest "github.com/sirupsen/logrus/hooks/test"
)
@@ -2038,71 +2036,3 @@ func driftGenesisTime(s *Service, slot, delay int64) {
offset := slot*int64(params.BeaconConfig().SecondsPerSlot) - delay
s.SetGenesisTime(time.Unix(time.Now().Unix()-offset, 0))
}
func Test_commitmentsToCheck(t *testing.T) {
windowSlots, err := slots.EpochEnd(params.BeaconNetworkConfig().MinEpochsForBlobsSidecarsRequest)
require.NoError(t, err)
commits := [][]byte{
bytesutil.PadTo([]byte("a"), 48),
bytesutil.PadTo([]byte("b"), 48),
bytesutil.PadTo([]byte("c"), 48),
bytesutil.PadTo([]byte("d"), 48),
}
cases := []struct {
name string
commits [][]byte
block func(*testing.T) consensusblocks.ROBlock
slot primitives.Slot
}{
{
name: "pre deneb",
block: func(t *testing.T) consensusblocks.ROBlock {
bb := util.NewBeaconBlockBellatrix()
sb, err := consensusblocks.NewSignedBeaconBlock(bb)
require.NoError(t, err)
rb, err := consensusblocks.NewROBlock(sb)
require.NoError(t, err)
return rb
},
},
{
name: "commitments within da",
block: func(t *testing.T) consensusblocks.ROBlock {
d := util.NewBeaconBlockDeneb()
d.Block.Body.BlobKzgCommitments = commits
d.Block.Slot = 100
sb, err := consensusblocks.NewSignedBeaconBlock(d)
require.NoError(t, err)
rb, err := consensusblocks.NewROBlock(sb)
require.NoError(t, err)
return rb
},
commits: commits,
slot: 100,
},
{
name: "commitments outside da",
block: func(t *testing.T) consensusblocks.ROBlock {
d := util.NewBeaconBlockDeneb()
// block is from slot 0, "current slot" is window size +1 (so outside the window)
d.Block.Body.BlobKzgCommitments = commits
sb, err := consensusblocks.NewSignedBeaconBlock(d)
require.NoError(t, err)
rb, err := consensusblocks.NewROBlock(sb)
require.NoError(t, err)
return rb
},
slot: windowSlots + 1,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
b := c.block(t)
co := commitmentsToCheck(b, c.slot)
require.Equal(t, len(c.commits), len(co))
for i := 0; i < len(c.commits); i++ {
require.Equal(t, true, bytes.Equal(c.commits[i], co[i]))
}
})
}
}

View File

@@ -19,6 +19,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers"
coreTime "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/time"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/execution"
f "github.com/prysmaticlabs/prysm/v4/beacon-chain/forkchoice"
@@ -63,6 +64,7 @@ type Service struct {
syncComplete chan struct{}
blobNotifiers *blobNotifierMap
blockBeingSynced *currentlySyncingBlock
avs das.AvailabilityStore
}
// config options for the service.

View File

@@ -7,6 +7,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/async/event"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/cache/depositcache"
statefeed "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db"
testDB "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/testing"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/forkchoice"
@@ -78,6 +79,7 @@ type testServiceRequirements struct {
func minimalTestService(t *testing.T, opts ...Option) (*Service, *testServiceRequirements) {
ctx := context.Background()
beaconDB := testDB.SetupDB(t)
avs := das.NewCachingDBVerifiedStore(beaconDB)
fcs := doublylinkedtree.New()
sg := stategen.New(beaconDB, fcs)
notif := &mockBeaconNode{}
@@ -110,6 +112,7 @@ func minimalTestService(t *testing.T, opts ...Option) (*Service, *testServiceReq
WithAttestationService(req.attSrv),
WithBLSToExecPool(req.blsPool),
WithDepositCache(dc),
WithAvailabilityStore(avs),
}
// append the variadic opts so they override the defaults by being processed afterwards
opts = append(defOpts, opts...)

View File

@@ -0,0 +1,35 @@
load("@prysm//tools/go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["availability.go"],
importpath = "github.com/prysmaticlabs/prysm/v4/beacon-chain/das",
visibility = ["//visibility:public"],
deps = [
"//beacon-chain/blockchain/kzg:go_default_library",
"//cache/nonblocking:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/primitives:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//runtime/version:go_default_library",
"//time/slots:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["availability_test.go"],
embed = [":go_default_library"],
deps = [
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/primitives:go_default_library",
"//encoding/bytesutil:go_default_library",
"//testing/require:go_default_library",
"//testing/util:go_default_library",
"//time/slots:go_default_library",
],
)

View File

@@ -0,0 +1,127 @@
package das
import (
"bytes"
"context"
"fmt"
errors "github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain/kzg"
"github.com/prysmaticlabs/prysm/v4/cache/nonblocking"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/runtime/version"
"github.com/prysmaticlabs/prysm/v4/time/slots"
log "github.com/sirupsen/logrus"
)
var (
errCachedCommitmentMismatch = errors.New("previously verified commitments do not match those in block")
// concurrentBlockFetchers * blocks per request is used to size the LRU so that we can have one cache
// for many worker goroutines without them evicting each others results.
// TODO: figure out how to determine the max number of init sync workers.
concurrentBlockFetchers = 10
)
// AvailabilityStore describes a component that can verify and save sidecars for a given block, and confirm previously
// verified and saved sidecars.
type AvailabilityStore interface {
VerifyAvailability(ctx context.Context, current primitives.Slot, b blocks.ROBlock) error
SaveIfAvailable(ctx context.Context, current primitives.Slot, b blocks.BlockWithVerifiedBlobs) error
}
type CachingDBVerifiedStore struct {
db BlobsDB
cache *nonblocking.LRU[[32]byte, [][]byte]
}
var _ AvailabilityStore = &CachingDBVerifiedStore{}
func NewCachingDBVerifiedStore(db BlobsDB) *CachingDBVerifiedStore {
discardev := func([32]byte, [][]byte) {}
size := int(params.BeaconNetworkConfig().MaxRequestBlocks) * concurrentBlockFetchers
cache, err := nonblocking.NewLRU[[32]byte, [][]byte](size, discardev)
if err != nil {
panic(err)
}
return &CachingDBVerifiedStore{
db: db,
cache: cache,
}
}
func (s *CachingDBVerifiedStore) VerifyAvailability(ctx context.Context, current primitives.Slot, b blocks.ROBlock) error {
c := commitmentsToCheck(b, current)
if len(c) == 0 {
return nil
}
// get the previously verified commitments
vc, ok := s.cache.Get(b.Root())
if !ok {
return s.databaseDACheck(ctx, current, b.Root(), c)
}
// check commitments to make sure that they match
if len(c) != len(vc) {
return errors.Wrapf(errCachedCommitmentMismatch, "cache=%d, block=%d", len(vc), len(c))
}
for i := range vc {
// check each commitment in the block against value previously validated and saved.
if !bytes.Equal(vc[i], c[i]) {
return errors.Wrapf(errCachedCommitmentMismatch, "commitment %#x at index %d does not match cache %#x", c[i], i, vc[i])
}
}
// all commitments match
return nil
}
func (s *CachingDBVerifiedStore) SaveIfAvailable(ctx context.Context, current primitives.Slot, bwb blocks.BlockWithVerifiedBlobs) error {
b := bwb.Block
c := commitmentsToCheck(b, current)
if len(c) == 0 {
return nil
}
if err := kzg.IsDataAvailable(c, bwb.Blobs); err != nil {
return errors.Wrapf(err, "kzg.IsDataAvailable check failed for %#x", b.Root())
}
if err := s.db.SaveBlobSidecar(ctx, bwb.Blobs); err != nil {
return errors.Wrapf(err, "error while trying to save verified blob sidecars for root %#x", b.Root())
}
// cache the commitments that matched so that we can confirm them cheaply while they remain in cache.
s.cache.Add(b.Root(), c)
return nil
}
func (s *CachingDBVerifiedStore) databaseDACheck(ctx context.Context, current primitives.Slot, root [32]byte, cmts [][]byte) error {
log.WithField("root", fmt.Sprintf("%#x", root)).Warn("Falling back to database DA check")
sidecars, err := s.db.BlobSidecarsByRoot(ctx, root)
if err != nil {
return errors.Wrap(err, "could not get blob sidecars")
}
if err := kzg.IsDataAvailable(cmts, sidecars); err != nil {
return err
}
s.cache.Add(root, cmts)
return nil
}
type BlobsDB interface {
BlobSidecarsByRoot(ctx context.Context, beaconBlockRoot [32]byte, indices ...uint64) ([]*ethpb.BlobSidecar, error)
SaveBlobSidecar(ctx context.Context, sidecars []*ethpb.BlobSidecar) error
}
func commitmentsToCheck(b blocks.ROBlock, current primitives.Slot) [][]byte {
if b.Version() < version.Deneb {
return nil
}
// We are only required to check within MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS
if !params.WithinDAPeriod(slots.ToEpoch(b.Block().Slot()), slots.ToEpoch(current)) {
return nil
}
kzgCommitments, err := b.Block().Body().BlobKzgCommitments()
if err != nil {
return nil
}
return kzgCommitments
}

View File

@@ -0,0 +1,82 @@
package das
import (
"bytes"
"testing"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v4/testing/require"
"github.com/prysmaticlabs/prysm/v4/testing/util"
"github.com/prysmaticlabs/prysm/v4/time/slots"
)
func Test_commitmentsToCheck(t *testing.T) {
windowSlots, err := slots.EpochEnd(params.BeaconNetworkConfig().MinEpochsForBlobsSidecarsRequest)
require.NoError(t, err)
commits := [][]byte{
bytesutil.PadTo([]byte("a"), 48),
bytesutil.PadTo([]byte("b"), 48),
bytesutil.PadTo([]byte("c"), 48),
bytesutil.PadTo([]byte("d"), 48),
}
cases := []struct {
name string
commits [][]byte
block func(*testing.T) blocks.ROBlock
slot primitives.Slot
}{
{
name: "pre deneb",
block: func(t *testing.T) blocks.ROBlock {
bb := util.NewBeaconBlockBellatrix()
sb, err := blocks.NewSignedBeaconBlock(bb)
require.NoError(t, err)
rb, err := blocks.NewROBlock(sb)
require.NoError(t, err)
return rb
},
},
{
name: "commitments within da",
block: func(t *testing.T) blocks.ROBlock {
d := util.NewBeaconBlockDeneb()
d.Block.Body.BlobKzgCommitments = commits
d.Block.Slot = 100
sb, err := blocks.NewSignedBeaconBlock(d)
require.NoError(t, err)
rb, err := blocks.NewROBlock(sb)
require.NoError(t, err)
return rb
},
commits: commits,
slot: 100,
},
{
name: "commitments outside da",
block: func(t *testing.T) blocks.ROBlock {
d := util.NewBeaconBlockDeneb()
// block is from slot 0, "current slot" is window size +1 (so outside the window)
d.Block.Body.BlobKzgCommitments = commits
sb, err := blocks.NewSignedBeaconBlock(d)
require.NoError(t, err)
rb, err := blocks.NewROBlock(sb)
require.NoError(t, err)
return rb
},
slot: windowSlots + 1,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
b := c.block(t)
co := commitmentsToCheck(b, c.slot)
require.Equal(t, len(c.commits), len(co))
for i := 0; i < len(c.commits); i++ {
require.Equal(t, true, bytes.Equal(c.commits[i], co[i]))
}
})
}
}

View File

@@ -23,6 +23,7 @@ go_library(
"//beacon-chain/cache:go_default_library",
"//beacon-chain/cache/depositcache:go_default_library",
"//beacon-chain/cache/depositsnapshot:go_default_library",
"//beacon-chain/das:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/db/kv:go_default_library",
"//beacon-chain/db/slasherkv:go_default_library",

View File

@@ -24,6 +24,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/cache/depositcache"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/cache/depositsnapshot"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db/kv"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db/slasherkv"
@@ -244,13 +245,19 @@ func New(cliCtx *cli.Context, opts ...Option) (*BeaconNode, error) {
return nil, err
}
log.Debugln("Registering Blockchain Service")
if err := beacon.registerBlockchainService(beacon.forkChoicer, synchronizer, beacon.initialSyncComplete); err != nil {
avs := das.NewCachingDBVerifiedStore(beacon.db)
bcOpts := []blockchain.Option{
blockchain.WithForkChoiceStore(beacon.forkChoicer),
blockchain.WithClockSynchronizer(synchronizer),
blockchain.WithSyncComplete(beacon.initialSyncComplete),
blockchain.WithAvailabilityStore(avs),
}
if err := beacon.registerBlockchainService(bcOpts); err != nil {
return nil, err
}
log.Debugln("Registering Initial Sync Service")
if err := beacon.registerInitialSyncService(beacon.initialSyncComplete); err != nil {
if err := beacon.registerInitialSyncService(beacon.initialSyncComplete, avs); err != nil {
return nil, err
}
@@ -606,7 +613,8 @@ func (b *BeaconNode) registerAttestationPool() error {
return b.services.RegisterService(s)
}
func (b *BeaconNode) registerBlockchainService(fc forkchoice.ForkChoicer, gs *startup.ClockSynchronizer, syncComplete chan struct{}) error {
func (b *BeaconNode) registerBlockchainService(required []blockchain.Option) error {
log.Debugln("Registering Blockchain Service")
var web3Service *execution.Service
if err := b.services.FetchService(&web3Service); err != nil {
return err
@@ -617,10 +625,9 @@ func (b *BeaconNode) registerBlockchainService(fc forkchoice.ForkChoicer, gs *st
return err
}
opts := append(b.serviceFlagOpts.blockchainFlagOpts, required...)
// skipcq: CRT-D0001
opts := append(
b.serviceFlagOpts.blockchainFlagOpts,
blockchain.WithForkChoiceStore(fc),
opts = append(opts,
blockchain.WithDatabase(b.db),
blockchain.WithDepositCache(b.depositCache),
blockchain.WithChainStartFetcher(web3Service),
@@ -636,8 +643,6 @@ func (b *BeaconNode) registerBlockchainService(fc forkchoice.ForkChoicer, gs *st
blockchain.WithSlasherAttestationsFeed(b.slasherAttestationsFeed),
blockchain.WithFinalizedStateAtStartUp(b.finalizedStateAtStartUp),
blockchain.WithProposerIdsCache(b.proposerIdsCache),
blockchain.WithClockSynchronizer(gs),
blockchain.WithSyncComplete(syncComplete),
)
blockchainService, err := blockchain.NewService(b.ctx, opts...)
@@ -719,7 +724,7 @@ func (b *BeaconNode) registerSyncService(initialSyncComplete chan struct{}) erro
return b.services.RegisterService(rs)
}
func (b *BeaconNode) registerInitialSyncService(complete chan struct{}) error {
func (b *BeaconNode) registerInitialSyncService(complete chan struct{}, avs das.AvailabilityStore) error {
var chainService *blockchain.Service
if err := b.services.FetchService(&chainService); err != nil {
return err
@@ -733,6 +738,7 @@ func (b *BeaconNode) registerInitialSyncService(complete chan struct{}) error {
BlockNotifier: b,
ClockWaiter: b.clockWaiter,
InitialSyncComplete: complete,
AVS: avs,
})
return b.services.RegisterService(is)
}

View File

@@ -21,6 +21,7 @@ go_library(
"//beacon-chain/core/feed/block:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/transition:go_default_library",
"//beacon-chain/das:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/p2p/peers/scorers:go_default_library",

View File

@@ -326,8 +326,8 @@ func (s *Service) processBatchedBlocks(ctx context.Context, genesis time.Time,
if len(bb.Blobs) == 0 {
continue
}
if err := s.cfg.DB.SaveBlobSidecar(ctx, bb.Blobs); err != nil {
return errors.Wrapf(err, "failed to save blobs for block %#x", bb.Block.Root())
if err := s.cfg.AVS.SaveIfAvailable(ctx, s.clock.CurrentSlot(), bb); err != nil {
return errors.Wrapf(err, "failed to verify blob commitments and save to db for root %#x", bb.Block.Root())
}
blobCount += len(bb.Blobs)
}

View File

@@ -13,6 +13,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain"
blockfeed "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/block"
statefeed "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/startup"
@@ -41,6 +42,7 @@ type Config struct {
BlockNotifier blockfeed.Notifier
ClockWaiter startup.ClockWaiter
InitialSyncComplete chan struct{}
AVS das.AvailabilityStore
}
// Service service.