Mirror of https://github.com/OffchainLabs/prysm.git (synced 2026-01-09 13:28:01 -05:00)

Compare commits: c6c9414d8b...blob-proce (15 commits)

SHA1s of the commits in this comparison:
edbf5d11d1, 1148d96313, daf5e9b286, 309a863358, 15b88ed7f2, 4b4855d65c, 03564ee93b, 96541c2c6c, 0bc55339a7, 2618852090, fb3e42c1b3, 0e2a9d0d82, 6573cccf1d, a89727f38c, 5e1fb15c40
@@ -49,6 +49,7 @@ go_library(
        "//beacon-chain/core/signing:go_default_library",
        "//beacon-chain/core/time:go_default_library",
        "//beacon-chain/core/transition:go_default_library",
+       "//beacon-chain/das:go_default_library",
        "//beacon-chain/db:go_default_library",
        "//beacon-chain/db/filters:go_default_library",
        "//beacon-chain/db/kv:go_default_library",
@@ -2,8 +2,10 @@ package kzg

import (
+	"fmt"
+	"strings"

	GoKZG "github.com/crate-crypto/go-kzg-4844"
	"github.com/pkg/errors"
	ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
)
@@ -31,6 +33,61 @@ func IsDataAvailable(commitments [][]byte, sidecars []*ethpb.BlobSidecar) error
	return kzgContext.VerifyBlobKZGProofBatch(blobs, cmts, proofs)
}

var ErrKzgProofFailed = errors.New("failed to prove commitment to BlobSidecar Blob data")

type KzgProofError struct {
	failed [][48]byte
}

func NewKzgProofError(failed [][48]byte) *KzgProofError {
	return &KzgProofError{failed: failed}
}

func (e *KzgProofError) Error() string {
	cmts := make([]string, len(e.failed))
	for i := range e.failed {
		cmts[i] = fmt.Sprintf("%#x", e.failed[i])
	}
	return fmt.Sprintf("%s: bad commitments=%s", ErrKzgProofFailed.Error(), strings.Join(cmts, ","))
}

func (e *KzgProofError) Failed() [][48]byte {
	return e.failed
}

func (e *KzgProofError) Unwrap() error {
	return ErrKzgProofFailed
}

// BisectBlobSidecarKzgProofs tries to batch prove the given sidecars against their own specified commitment.
// The caller is responsible for ensuring that the commitments match those specified by the block.
// If the batch fails, it will then try to verify the proofs one-by-one.
// If an error is returned, it will be a custom error of type KzgProofError that provides access
// to the list of commitments that failed.
func BisectBlobSidecarKzgProofs(sidecars []*ethpb.BlobSidecar) error {
	if len(sidecars) == 0 {
		return nil
	}
	blobs := make([]GoKZG.Blob, len(sidecars))
	cmts := make([]GoKZG.KZGCommitment, len(sidecars))
	proofs := make([]GoKZG.KZGProof, len(sidecars))
	for i, sidecar := range sidecars {
		blobs[i] = bytesToBlob(sidecar.Blob)
		cmts[i] = bytesToCommitment(sidecar.KzgCommitment)
		proofs[i] = bytesToKZGProof(sidecar.KzgProof)
	}
	if err := kzgContext.VerifyBlobKZGProofBatch(blobs, cmts, proofs); err == nil {
		return nil
	}
	failed := make([][48]byte, 0, len(blobs))
	for i := range blobs {
		if err := kzgContext.VerifyBlobKZGProof(blobs[i], cmts[i], proofs[i]); err != nil {
			failed = append(failed, cmts[i])
		}
	}
	return NewKzgProofError(failed)
}

func bytesToBlob(blob []byte) (ret GoKZG.Blob) {
	copy(ret[:], blob)
	return
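The function above encodes a batch-verify-then-bisect strategy: one cheap aggregate proof check on the happy path, and per-sidecar checks only when the batch fails. A minimal standalone sketch of the same pattern, with generic stand-ins rather than the go-kzg-4844 API:

```go
package main

import (
	"errors"
	"fmt"
)

// batchThenBisect demonstrates the strategy: one cheap batch check on the
// happy path, per-item checks only after a batch failure to find offenders.
// verifyBatch and verifyOne are generic stand-ins for the KZG context calls.
func batchThenBisect(items []int, verifyBatch func([]int) error, verifyOne func(int) error) ([]int, error) {
	if verifyBatch(items) == nil {
		return nil, nil // fast path: the whole batch proved in one call
	}
	var failed []int
	for _, it := range items {
		if err := verifyOne(it); err != nil {
			failed = append(failed, it)
		}
	}
	return failed, errors.New("batch verification failed")
}

func main() {
	bad := map[int]bool{3: true}
	verifyOne := func(i int) error {
		if bad[i] {
			return errors.New("bad proof")
		}
		return nil
	}
	verifyBatch := func(is []int) error {
		for _, i := range is {
			if err := verifyOne(i); err != nil {
				return err
			}
		}
		return nil
	}
	failed, err := batchThenBisect([]int{1, 2, 3, 4}, verifyBatch, verifyOne)
	fmt.Println(failed, err) // [3] batch verification failed
}
```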
@@ -13,6 +13,7 @@ import (
	"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers"
	coreTime "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/time"
	"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition"
+	"github.com/prysmaticlabs/prysm/v4/beacon-chain/das"
	"github.com/prysmaticlabs/prysm/v4/beacon-chain/db"
	forkchoicetypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/forkchoice/types"
	"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
@@ -162,7 +163,7 @@ func getStateVersionAndPayload(st state.BeaconState) (int, interfaces.ExecutionD
	return preStateVersion, preStateHeader, nil
}

-func (s *Service) onBlockBatch(ctx context.Context, blks []consensusblocks.ROBlock) error {
+func (s *Service) onBlockBatch(ctx context.Context, blks []consensusblocks.ROBlock, avs das.AvailabilityStore) error {
	ctx, span := trace.StartSpan(ctx, "blockChain.onBlockBatch")
	defer span.End()
@@ -265,7 +266,7 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []consensusblocks.ROBlo
			return err
		}
	}
-	if err := s.databaseDACheck(ctx, b); err != nil {
+	if err := avs.IsDataAvailable(ctx, s.CurrentSlot(), b); err != nil {
		return errors.Wrap(err, "could not validate blob data availability")
	}
	args := &forkchoicetypes.BlockAndCheckpoints{Block: b.Block(),
@@ -333,33 +334,6 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []consensusblocks.ROBlo
	return s.saveHeadNoDB(ctx, lastB, lastBR, preState, !isValidPayload)
}

func commitmentsToCheck(b consensusblocks.ROBlock, current primitives.Slot) [][]byte {
	if b.Version() < version.Deneb {
		return nil
	}
	// We are only required to check within MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS
	if !params.WithinDAPeriod(slots.ToEpoch(b.Block().Slot()), slots.ToEpoch(current)) {
		return nil
	}
	kzgCommitments, err := b.Block().Body().BlobKzgCommitments()
	if err != nil {
		return nil
	}
	return kzgCommitments
}

func (s *Service) databaseDACheck(ctx context.Context, b consensusblocks.ROBlock) error {
	commitments := commitmentsToCheck(b, s.CurrentSlot())
	if len(commitments) == 0 {
		return nil
	}
	sidecars, err := s.cfg.BeaconDB.BlobSidecarsByRoot(ctx, b.Root())
	if err != nil {
		return errors.Wrap(err, "could not get blob sidecars")
	}
	return kzg.IsDataAvailable(commitments, sidecars)
}

func (s *Service) updateEpochBoundaryCaches(ctx context.Context, st state.BeaconState) error {
	e := coreTime.CurrentEpoch(st)
	if err := helpers.UpdateCommitteeCache(ctx, st, e); err != nil {
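The commitmentsToCheck helper removed here (and reintroduced in beacon-chain/das below) gates the DA check on the blob retention window. A toy sketch of the window rule; the boundary convention and constant are illustrative, not the real params.WithinDAPeriod or MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS values:

```go
package main

import "fmt"

// withinDAPeriod mirrors the retention-window idea: a block's blobs only need
// a DA check while the block's epoch is within minEpochs of the current epoch.
func withinDAPeriod(blockEpoch, currentEpoch, minEpochs uint64) bool {
	return blockEpoch+minEpochs >= currentEpoch
}

func main() {
	const minEpochs = 4096 // illustrative window size
	fmt.Println(withinDAPeriod(0, minEpochs, minEpochs))   // true: at the edge of the window
	fmt.Println(withinDAPeriod(0, minEpochs+1, minEpochs)) // false: outside the window
}
```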
@@ -1,7 +1,6 @@
package blockchain

import (
	"bytes"
	"context"
	"fmt"
	"math/big"
@@ -17,6 +16,7 @@ import (
	"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/blocks"
	"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/signing"
	"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition"
+	"github.com/prysmaticlabs/prysm/v4/beacon-chain/das"
	"github.com/prysmaticlabs/prysm/v4/beacon-chain/db"
	testDB "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/testing"
	"github.com/prysmaticlabs/prysm/v4/beacon-chain/execution"
@@ -39,7 +39,6 @@ import (
	"github.com/prysmaticlabs/prysm/v4/testing/require"
	"github.com/prysmaticlabs/prysm/v4/testing/util"
	prysmTime "github.com/prysmaticlabs/prysm/v4/time"
	"github.com/prysmaticlabs/prysm/v4/time/slots"
	logTest "github.com/sirupsen/logrus/hooks/test"
)
@@ -68,7 +67,7 @@ func TestStore_OnBlockBatch(t *testing.T) {
		require.NoError(t, err)
		blks = append(blks, rwsb)
	}
-	err := service.onBlockBatch(ctx, blks)
+	err := service.onBlockBatch(ctx, blks, &das.MockAvailabilityStore{})
	require.NoError(t, err)
	jcp := service.CurrentJustifiedCheckpt()
	jroot := bytesutil.ToBytes32(jcp.Root)
@@ -98,7 +97,7 @@ func TestStore_OnBlockBatch_NotifyNewPayload(t *testing.T) {
		require.NoError(t, service.saveInitSyncBlock(ctx, rwsb.Root(), wsb))
		blks = append(blks, rwsb)
	}
-	require.NoError(t, service.onBlockBatch(ctx, blks))
+	require.NoError(t, service.onBlockBatch(ctx, blks, &das.MockAvailabilityStore{}))
}

func TestCachedPreState_CanGetFromStateSummary(t *testing.T) {
@@ -1945,7 +1944,7 @@ func TestNoViableHead_Reboot(t *testing.T) {
	rwsb, err := consensusblocks.NewROBlock(wsb)
	require.NoError(t, err)
	// We use onBlockBatch here because the valid chain is missing in forkchoice
-	require.NoError(t, service.onBlockBatch(ctx, []consensusblocks.ROBlock{rwsb}))
+	require.NoError(t, service.onBlockBatch(ctx, []consensusblocks.ROBlock{rwsb}, &das.MockAvailabilityStore{}))
	// Check that the head is now VALID and the node is not optimistic
	require.Equal(t, genesisRoot, service.ensureRootNotZeros(service.cfg.ForkChoiceStore.CachedHeadRoot()))
	headRoot, err = service.HeadRoot(ctx)
@@ -2047,71 +2046,3 @@ func driftGenesisTime(s *Service, slot, delay int64) {
	offset := slot*int64(params.BeaconConfig().SecondsPerSlot) - delay
	s.SetGenesisTime(time.Unix(time.Now().Unix()-offset, 0))
}

func Test_commitmentsToCheck(t *testing.T) {
	windowSlots, err := slots.EpochEnd(params.BeaconNetworkConfig().MinEpochsForBlobsSidecarsRequest)
	require.NoError(t, err)
	commits := [][]byte{
		bytesutil.PadTo([]byte("a"), 48),
		bytesutil.PadTo([]byte("b"), 48),
		bytesutil.PadTo([]byte("c"), 48),
		bytesutil.PadTo([]byte("d"), 48),
	}
	cases := []struct {
		name    string
		commits [][]byte
		block   func(*testing.T) consensusblocks.ROBlock
		slot    primitives.Slot
	}{
		{
			name: "pre deneb",
			block: func(t *testing.T) consensusblocks.ROBlock {
				bb := util.NewBeaconBlockBellatrix()
				sb, err := consensusblocks.NewSignedBeaconBlock(bb)
				require.NoError(t, err)
				rb, err := consensusblocks.NewROBlock(sb)
				require.NoError(t, err)
				return rb
			},
		},
		{
			name: "commitments within da",
			block: func(t *testing.T) consensusblocks.ROBlock {
				d := util.NewBeaconBlockDeneb()
				d.Block.Body.BlobKzgCommitments = commits
				d.Block.Slot = 100
				sb, err := consensusblocks.NewSignedBeaconBlock(d)
				require.NoError(t, err)
				rb, err := consensusblocks.NewROBlock(sb)
				require.NoError(t, err)
				return rb
			},
			commits: commits,
			slot:    100,
		},
		{
			name: "commitments outside da",
			block: func(t *testing.T) consensusblocks.ROBlock {
				d := util.NewBeaconBlockDeneb()
				// block is from slot 0, "current slot" is window size +1 (so outside the window)
				d.Block.Body.BlobKzgCommitments = commits
				sb, err := consensusblocks.NewSignedBeaconBlock(d)
				require.NoError(t, err)
				rb, err := consensusblocks.NewROBlock(sb)
				require.NoError(t, err)
				return rb
			},
			slot: windowSlots + 1,
		},
	}
	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			b := c.block(t)
			co := commitmentsToCheck(b, c.slot)
			require.Equal(t, len(c.commits), len(co))
			for i := 0; i < len(c.commits); i++ {
				require.Equal(t, true, bytes.Equal(c.commits[i], co[i]))
			}
		})
	}
}
@@ -12,6 +12,7 @@ import (
	"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers"
	coreTime "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/time"
	"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition"
+	"github.com/prysmaticlabs/prysm/v4/beacon-chain/das"
	"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
	"github.com/prysmaticlabs/prysm/v4/config/features"
	"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
@@ -34,7 +35,7 @@ var epochsSinceFinalitySaveHotStateDB = primitives.Epoch(100)

// BlockReceiver interface defines the methods of chain service for receiving and processing new blocks.
type BlockReceiver interface {
	ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte) error
-	ReceiveBlockBatch(ctx context.Context, blocks []blocks.ROBlock) error
+	ReceiveBlockBatch(ctx context.Context, blocks []blocks.ROBlock, avs das.AvailabilityStore) error
	HasBlock(ctx context.Context, root [32]byte) bool
	RecentBlockSlot(root [32]byte) (primitives.Slot, error)
	BlockBeingSynced([32]byte) bool
@@ -191,7 +192,7 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig
// ReceiveBlockBatch processes the whole block batch at once, assuming the block batch is linear, transitioning
// the state, performing batch verification of all collected signatures and then performing the appropriate
// actions for a block post-transition.
-func (s *Service) ReceiveBlockBatch(ctx context.Context, blocks []blocks.ROBlock) error {
+func (s *Service) ReceiveBlockBatch(ctx context.Context, blocks []blocks.ROBlock, avs das.AvailabilityStore) error {
	ctx, span := trace.StartSpan(ctx, "blockChain.ReceiveBlockBatch")
	defer span.End()
@@ -199,7 +200,7 @@ func (s *Service) ReceiveBlockBatch(ctx context.Context, blocks []blocks.ROBlock
	defer s.cfg.ForkChoiceStore.Unlock()

	// Apply state transition on the incoming newly received block batches, one by one.
-	if err := s.onBlockBatch(ctx, blocks); err != nil {
+	if err := s.onBlockBatch(ctx, blocks, avs); err != nil {
		err := errors.Wrap(err, "could not process block in batch")
		tracing.AnnotateError(span, err)
		return err
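A sketch of how a caller is expected to wire the new parameter, based only on the signatures introduced in this diff; processBatch, its package, and its arguments are hypothetical:

```go
package initialsync

import (
	"context"

	"github.com/pkg/errors"
	"github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain"
	"github.com/prysmaticlabs/prysm/v4/beacon-chain/das"
	"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
	"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
	ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
)

// processBatch is a hypothetical helper showing how the new parameter threads
// through: persist any sidecars seen so far, then hand the same store to
// ReceiveBlockBatch so its DA check can find them.
func processBatch(ctx context.Context, chain blockchain.BlockReceiver, avs das.AvailabilityStore,
	current primitives.Slot, batch []blocks.ROBlock, scs []*ethpb.BlobSidecar) error {
	if _, err := avs.Persist(ctx, current, scs...); err != nil {
		return errors.Wrap(err, "could not stash sidecars")
	}
	return chain.ReceiveBlockBatch(ctx, batch, avs)
}
```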
@@ -7,6 +7,7 @@ import (
	"time"

	blockchainTesting "github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain/testing"
+	"github.com/prysmaticlabs/prysm/v4/beacon-chain/das"
	"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/voluntaryexits"
	"github.com/prysmaticlabs/prysm/v4/config/params"
	"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
@@ -240,7 +241,7 @@ func TestService_ReceiveBlockBatch(t *testing.T) {
			require.NoError(t, err)
			rwsb, err := blocks.NewROBlock(wsb)
			require.NoError(t, err)
-			err = s.ReceiveBlockBatch(ctx, []blocks.ROBlock{rwsb})
+			err = s.ReceiveBlockBatch(ctx, []blocks.ROBlock{rwsb}, &das.MockAvailabilityStore{})
			if tt.wantedErr != "" {
				assert.ErrorContains(t, tt.wantedErr, err)
			} else {
@@ -17,6 +17,7 @@ go_library(
        "//beacon-chain/core/feed/operation:go_default_library",
        "//beacon-chain/core/feed/state:go_default_library",
        "//beacon-chain/core/helpers:go_default_library",
+       "//beacon-chain/das:go_default_library",
        "//beacon-chain/db:go_default_library",
        "//beacon-chain/forkchoice:go_default_library",
        "//beacon-chain/state:go_default_library",
@@ -16,6 +16,7 @@ import (
	opfeed "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/operation"
	statefeed "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/state"
	"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers"
+	"github.com/prysmaticlabs/prysm/v4/beacon-chain/das"
	"github.com/prysmaticlabs/prysm/v4/beacon-chain/db"
	"github.com/prysmaticlabs/prysm/v4/beacon-chain/forkchoice"
	"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
@@ -207,7 +208,7 @@ func (s *ChainService) ReceiveBlockInitialSync(ctx context.Context, block interf
}

// ReceiveBlockBatch processes blocks in batches from initial-sync.
-func (s *ChainService) ReceiveBlockBatch(ctx context.Context, blks []blocks.ROBlock) error {
+func (s *ChainService) ReceiveBlockBatch(ctx context.Context, blks []blocks.ROBlock, avs das.AvailabilityStore) error {
	if s.State == nil {
		return ErrNilState
	}
beacon-chain/das/BUILD.bazel (new file, 50 lines)
@@ -0,0 +1,50 @@
load("@prysm//tools/go:def.bzl", "go_library", "go_test")

go_library(
    name = "go_default_library",
    srcs = [
        "availability.go",
        "blocking.go",
        "cache.go",
        "error.go",
        "iface.go",
        "mock.go",
    ],
    importpath = "github.com/prysmaticlabs/prysm/v4/beacon-chain/das",
    visibility = ["//visibility:public"],
    deps = [
        "//beacon-chain/blockchain/kzg:go_default_library",
        "//beacon-chain/db:go_default_library",
        "//config/fieldparams:go_default_library",
        "//config/params:go_default_library",
        "//consensus-types/blocks:go_default_library",
        "//consensus-types/primitives:go_default_library",
        "//encoding/bytesutil:go_default_library",
        "//proto/prysm/v1alpha1:go_default_library",
        "//runtime/version:go_default_library",
        "//time/slots:go_default_library",
        "@com_github_pkg_errors//:go_default_library",
        "@com_github_sirupsen_logrus//:go_default_library",
    ],
)

go_test(
    name = "go_default_test",
    srcs = [
        "availability_test.go",
        "cache_test.go",
    ],
    embed = [":go_default_library"],
    deps = [
        "//config/fieldparams:go_default_library",
        "//config/params:go_default_library",
        "//consensus-types/blocks:go_default_library",
        "//consensus-types/primitives:go_default_library",
        "//encoding/bytesutil:go_default_library",
        "//proto/prysm/v1alpha1:go_default_library",
        "//testing/require:go_default_library",
        "//testing/util:go_default_library",
        "//time/slots:go_default_library",
        "@com_github_pkg_errors//:go_default_library",
    ],
)
beacon-chain/das/availability.go (new file, 158 lines)
@@ -0,0 +1,158 @@
package das

import (
	"context"

	errors "github.com/pkg/errors"
	"github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain/kzg"
	"github.com/prysmaticlabs/prysm/v4/beacon-chain/db"
	fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
	"github.com/prysmaticlabs/prysm/v4/config/params"
	"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
	"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
	ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
	"github.com/prysmaticlabs/prysm/v4/runtime/version"
	"github.com/prysmaticlabs/prysm/v4/time/slots"
	log "github.com/sirupsen/logrus"
)

type kzgBatch func([][]byte, []*ethpb.BlobSidecar) error

type LazilyPersistentStore struct {
	db        BlobsDB
	cache     *cache
	verifyKZG kzgBatch
}

var _ AvailabilityStore = &LazilyPersistentStore{}

func NewLazilyPersistentStore(db BlobsDB) *LazilyPersistentStore {
	return &LazilyPersistentStore{
		db:        db,
		cache:     newCache(),
		verifyKZG: kzg.IsDataAvailable,
	}
}

// Persist adds blobs to the working blob cache (in-memory or disk backed is an implementation
// detail). Blobs stored in this cache will be persisted for at least as long as the node is
// running. Once IsDataAvailable succeeds, all blobs referenced by the given block are guaranteed
// to be persisted for the remainder of the retention period.
func (s *LazilyPersistentStore) Persist(ctx context.Context, current primitives.Slot, sc ...*ethpb.BlobSidecar) ([]*ethpb.BlobSidecar, error) {
	if len(sc) == 0 {
		return nil, nil
	}
	if len(sc) == 1 {

	}
	var key cacheKey
	var entry *cacheEntry
	persisted := make([]*ethpb.BlobSidecar, 0, len(sc))
	for i := range sc {
		if !params.WithinDAPeriod(slots.ToEpoch(sc[i].Slot), slots.ToEpoch(current)) {
			continue
		}
		if sc[i].Index >= fieldparams.MaxBlobsPerBlock {
			log.WithField("block_root", sc[i].BlockRoot).WithField("index", sc[i].Index).Error("discarding BlobSidecar with index >= MaxBlobsPerBlock")
			continue
		}
		skey := keyFromSidecar(sc[i])
		if key != skey {
			key = skey
			entry = s.cache.ensure(key)
		}
		if entry.stash(sc[i]) {
			persisted = append(persisted, sc[i])
		}
	}
	return persisted, nil
}

// IsDataAvailable returns nil if all the commitments in the given block are persisted to the db and have been verified.
// - BlobSidecars already in the db are assumed to have been previously verified against the block.
// - BlobSidecars waiting for verification in the cache will be persisted to the db after verification.
// - When BlobSidecars are written to the db, their cache entries are cleared.
// - BlobSidecar cache entries with commitments that do not match the block will be evicted.
// - BlobSidecar cache entries with commitments that fail proof verification will be evicted.
func (s *LazilyPersistentStore) IsDataAvailable(ctx context.Context, current primitives.Slot, b blocks.ROBlock) error {
	blockCommitments := commitmentsToCheck(b, current)
	if len(blockCommitments) == 0 {
		return nil
	}

	key := keyFromBlock(b)
	entry := s.cache.ensure(key)
	// holding the lock over the course of the DA check simplifies everything
	entry.Lock()
	defer entry.Unlock()
	if err := s.daCheck(ctx, b.Root(), blockCommitments, entry); err != nil {
		return err
	}
	// If there is no error, DA has been successful, so we can clean up the cache.
	s.cache.delete(key)
	return nil
}

func (s *LazilyPersistentStore) daCheck(ctx context.Context, root [32]byte, blockCommitments [][]byte, entry *cacheEntry) error {
	sidecars, cacheErr := entry.filter(root, blockCommitments)
	if cacheErr == nil {
		if err := s.verifyKZG(blockCommitments, sidecars); err != nil {
			s.cache.delete(keyFromSidecar(sidecars[0]))
			return err
		}
		// We have all the committed sidecars in cache, and they all have valid proofs.
		// If flushing them to backing storage succeeds, then we can confirm DA.
		return s.db.SaveBlobSidecar(ctx, sidecars)
	}

	// Before returning the cache error, check if we have the data in the db.
	dbidx, err := s.persisted(ctx, root, entry)
	// persisted() accounts for db.ErrNotFound, so this is a real database error.
	if err != nil {
		return err
	}
	notInDb, err := dbidx.missing(blockCommitments)
	// This is a database integrity sanity check - it should never fail.
	if err != nil {
		return err
	}
	// All commitments were found in the db, due to a previous successful DA check.
	if len(notInDb) == 0 {
		return nil
	}

	return cacheErr
}

// persisted populates the db cache, which contains a mapping from Index->KzgCommitment for BlobSidecars previously verified
// (proof verification) and saved to the backend.
func (s *LazilyPersistentStore) persisted(ctx context.Context, root [32]byte, entry *cacheEntry) (dbidx, error) {
	if entry.dbidxInitialized() {
		return entry.dbidx(), nil
	}
	sidecars, err := s.db.BlobSidecarsByRoot(ctx, root)
	if err != nil {
		if errors.Is(err, db.ErrNotFound) {
			// No BlobSidecars, initialize with empty idx.
			return entry.ensureDbidx(), nil
		}
		return entry.dbidx(), err
	}
	// Ensure all sidecars found in the db are represented in the cache and return the cache value.
	return entry.ensureDbidx(sidecars...), nil
}

func commitmentsToCheck(b blocks.ROBlock, current primitives.Slot) [][]byte {
	if b.Version() < version.Deneb {
		return nil
	}
	// We are only required to check within MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS
	if !params.WithinDAPeriod(slots.ToEpoch(b.Block().Slot()), slots.ToEpoch(current)) {
		return nil
	}
	kzgCommitments, err := b.Block().Body().BlobKzgCommitments()
	if err != nil {
		return nil
	}
	return kzgCommitments
}
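Taken together, the intended call pattern for this store is persist-then-check. A condensed sketch using only the API defined in this file and iface.go; the helper and its arguments are placeholders, and the tests below exercise the same flow:

```go
package das_test

import (
	"context"

	"github.com/prysmaticlabs/prysm/v4/beacon-chain/das"
	"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
	"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
	ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
)

// checkDA shows the expected sequencing: stash whatever sidecars have been
// seen, then ask the same store whether the block's commitments are all
// covered (cache hits are proof-verified and flushed to the db; db hits are
// trusted from a prior successful check).
func checkDA(ctx context.Context, db das.BlobsDB, current primitives.Slot,
	scs []*ethpb.BlobSidecar, b blocks.ROBlock) error {
	avs := das.NewLazilyPersistentStore(db)
	if _, err := avs.Persist(ctx, current, scs...); err != nil {
		return err
	}
	return avs.IsDataAvailable(ctx, current, b)
}
```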
beacon-chain/das/availability_test.go (new file, 318 lines)
@@ -0,0 +1,318 @@
package das

import (
	"bytes"
	"context"
	"testing"

	errors "github.com/pkg/errors"
	"github.com/prysmaticlabs/prysm/v4/config/params"
	"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
	"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
	"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
	ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
	"github.com/prysmaticlabs/prysm/v4/testing/require"
	"github.com/prysmaticlabs/prysm/v4/testing/util"
	"github.com/prysmaticlabs/prysm/v4/time/slots"
)

func Test_commitmentsToCheck(t *testing.T) {
	windowSlots, err := slots.EpochEnd(params.BeaconNetworkConfig().MinEpochsForBlobsSidecarsRequest)
	require.NoError(t, err)
	commits := [][]byte{
		bytesutil.PadTo([]byte("a"), 48),
		bytesutil.PadTo([]byte("b"), 48),
		bytesutil.PadTo([]byte("c"), 48),
		bytesutil.PadTo([]byte("d"), 48),
	}
	cases := []struct {
		name    string
		commits [][]byte
		block   func(*testing.T) blocks.ROBlock
		slot    primitives.Slot
	}{
		{
			name: "pre deneb",
			block: func(t *testing.T) blocks.ROBlock {
				bb := util.NewBeaconBlockBellatrix()
				sb, err := blocks.NewSignedBeaconBlock(bb)
				require.NoError(t, err)
				rb, err := blocks.NewROBlock(sb)
				require.NoError(t, err)
				return rb
			},
		},
		{
			name: "commitments within da",
			block: func(t *testing.T) blocks.ROBlock {
				d := util.NewBeaconBlockDeneb()
				d.Block.Body.BlobKzgCommitments = commits
				d.Block.Slot = 100
				sb, err := blocks.NewSignedBeaconBlock(d)
				require.NoError(t, err)
				rb, err := blocks.NewROBlock(sb)
				require.NoError(t, err)
				return rb
			},
			commits: commits,
			slot:    100,
		},
		{
			name: "commitments outside da",
			block: func(t *testing.T) blocks.ROBlock {
				d := util.NewBeaconBlockDeneb()
				// block is from slot 0, "current slot" is window size +1 (so outside the window)
				d.Block.Body.BlobKzgCommitments = commits
				sb, err := blocks.NewSignedBeaconBlock(d)
				require.NoError(t, err)
				rb, err := blocks.NewROBlock(sb)
				require.NoError(t, err)
				return rb
			},
			slot: windowSlots + 1,
		},
	}
	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			b := c.block(t)
			co := commitmentsToCheck(b, c.slot)
			require.Equal(t, len(c.commits), len(co))
			for i := 0; i < len(c.commits); i++ {
				require.Equal(t, true, bytes.Equal(c.commits[i], co[i]))
			}
		})
	}
}

func daAlwaysSucceeds(_ [][]byte, _ []*ethpb.BlobSidecar) error {
	return nil
}

type mockDA struct {
	t    *testing.T
	cmts [][]byte
	scs  []*ethpb.BlobSidecar
	err  error
}

func (m *mockDA) expectedArguments(gotC [][]byte, gotSC []*ethpb.BlobSidecar) error {
	require.Equal(m.t, len(m.cmts), len(gotC))
	require.Equal(m.t, len(gotSC), len(m.scs))
	for i := range m.cmts {
		require.Equal(m.t, true, bytes.Equal(m.cmts[i], gotC[i]))
	}
	for i := range m.scs {
		require.Equal(m.t, m.scs[i], gotSC[i])
	}
	return m.err
}

func TestLazilyPersistent_Missing(t *testing.T) {
	ctx := context.Background()
	db := &mockBlobsDB{}
	as := NewLazilyPersistentStore(db)

	blk, scs := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 1, 3)

	expectedCommitments, err := blk.Block().Body().BlobKzgCommitments()
	require.NoError(t, err)
	vf := &mockDA{
		t:    t,
		cmts: expectedCommitments,
		scs:  scs,
	}
	as.verifyKZG = vf.expectedArguments

	// Only one commitment persisted, should return error with other indices
	_, err = as.Persist(ctx, 1, scs[2])
	require.NoError(t, err)
	err = as.IsDataAvailable(ctx, 1, blk)
	require.NotNil(t, err)
	missingErr := MissingIndicesError{}
	require.Equal(t, true, errors.As(err, &missingErr))
	require.DeepEqual(t, []uint64{0, 1}, missingErr.Missing())

	// All but one persisted, return missing idx
	_, err = as.Persist(ctx, 1, scs[0])
	require.NoError(t, err)
	err = as.IsDataAvailable(ctx, 1, blk)
	require.NotNil(t, err)
	require.Equal(t, true, errors.As(err, &missingErr))
	require.DeepEqual(t, []uint64{1}, missingErr.Missing())

	// All persisted, return nil
	_, err = as.Persist(ctx, 1, scs[1])
	require.NoError(t, err)

	require.NoError(t, as.IsDataAvailable(ctx, 1, blk))
}

func TestLazilyPersistent_Mismatch(t *testing.T) {
	ctx := context.Background()
	db := &mockBlobsDB{}
	as := NewLazilyPersistentStore(db)

	blk, scs := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 1, 3)

	vf := &mockDA{
		t:   t,
		err: errors.New("kzg check should not run"),
	}
	as.verifyKZG = vf.expectedArguments
	scs[0].KzgCommitment = bytesutil.PadTo([]byte("nope"), 48)

	// Only one commitment persisted, should return error with other indices
	_, err := as.Persist(ctx, 1, scs[0])
	require.NoError(t, err)
	err = as.IsDataAvailable(ctx, 1, blk)
	require.NotNil(t, err)
	mismatchErr := CommitmentMismatchError{}
	require.Equal(t, true, errors.As(err, &mismatchErr))
	require.DeepEqual(t, []uint64{0}, mismatchErr.Mismatch())

	// the next time we call the DA check, the mismatched commitment should be evicted
	err = as.IsDataAvailable(ctx, 1, blk)
	require.NotNil(t, err)
	missingErr := MissingIndicesError{}
	require.Equal(t, true, errors.As(err, &missingErr))
	require.DeepEqual(t, []uint64{0, 1, 2}, missingErr.Missing())
}

func TestPersisted(t *testing.T) {
	ctx := context.Background()

	blk, scs := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 1, 3)
	db := &mockBlobsDB{
		BlobSidecarsByRootCallback: func(ctx context.Context, beaconBlockRoot [32]byte, indices ...uint64) ([]*ethpb.BlobSidecar, error) {
			return scs, nil
		},
	}
	vf := &mockDA{
		t:   t,
		err: errors.New("kzg check should not run"),
	}
	as := NewLazilyPersistentStore(db)
	as.verifyKZG = vf.expectedArguments
	entry := &cacheEntry{}
	dbidx, err := as.persisted(ctx, blk.Root(), entry)
	require.NoError(t, err)
	require.Equal(t, false, dbidx[0] == nil)
	for i := range scs {
		require.Equal(t, *dbidx[i], bytesutil.ToBytes48(scs[i].KzgCommitment))
	}

	expectedCommitments, err := blk.Block().Body().BlobKzgCommitments()
	require.NoError(t, err)
	missing, err := dbidx.missing(expectedCommitments)
	require.NoError(t, err)
	require.Equal(t, 0, len(missing))

	// test that the caching is working by returning the wrong set of sidecars
	// and making sure that dbidx still thinks none are missing
	db = &mockBlobsDB{
		BlobSidecarsByRootCallback: func(ctx context.Context, beaconBlockRoot [32]byte, indices ...uint64) ([]*ethpb.BlobSidecar, error) {
			return scs[1:], nil
		},
	}
	as = NewLazilyPersistentStore(db)
	// note, using the same entry value
	dbidx, err = as.persisted(ctx, blk.Root(), entry)
	require.NoError(t, err)
	// same assertions should pass as when all sidecars returned by db
	missing, err = dbidx.missing(expectedCommitments)
	require.NoError(t, err)
	require.Equal(t, 0, len(missing))

	// do it again, but with a fresh cache entry - we should see a missing sidecar
	newEntry := &cacheEntry{}
	dbidx, err = as.persisted(ctx, blk.Root(), newEntry)
	require.NoError(t, err)
	missing, err = dbidx.missing(expectedCommitments)
	require.NoError(t, err)
	require.Equal(t, 1, len(missing))
	// only element in missing should be the zero index
	require.Equal(t, uint64(0), missing[0])
}

func TestLazilyPersistent_DBFallback(t *testing.T) {
	ctx := context.Background()

	blk, scs := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 1, 3)
	// Generate the same sidecar at index 0 so we can mess with its commitment
	_, scscp := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 1, 1)
	db := &mockBlobsDB{
		BlobSidecarsByRootCallback: func(ctx context.Context, beaconBlockRoot [32]byte, indices ...uint64) ([]*ethpb.BlobSidecar, error) {
			return scs, nil
		},
	}
	vf := &mockDA{
		t:   t,
		err: errors.New("kzg check should not run"),
	}
	as := NewLazilyPersistentStore(db)
	as.verifyKZG = vf.expectedArguments

	// Set up the mismatched commit, but we don't expect this to error because
	// the db contains the sidecars.
	scscp[0].BlockRoot = scs[0].BlockRoot
	scscp[0].KzgCommitment = bytesutil.PadTo([]byte("nope"), 48)
	_, err := as.Persist(ctx, 1, scscp[0])
	require.NoError(t, err)

	// This should pass since the db is giving us all the right sidecars
	require.NoError(t, as.IsDataAvailable(ctx, 1, blk))

	// now using an empty db, we should fail
	as.db = &mockBlobsDB{}

	// but we should have pruned, so we'll get a missing error, not mismatch
	err = as.IsDataAvailable(ctx, 1, blk)
	require.NotNil(t, err)
	missingErr := MissingIndicesError{}
	require.Equal(t, true, errors.As(err, &missingErr))
	require.DeepEqual(t, []uint64{0, 1, 2}, missingErr.Missing())

	// put the bad value back in the cache
	persisted, err := as.Persist(ctx, 1, scscp[0])
	require.NoError(t, err)
	require.Equal(t, 1, len(persisted))
	// now we'll get a mismatch error
	err = as.IsDataAvailable(ctx, 1, blk)
	require.NotNil(t, err)
	mismatchErr := CommitmentMismatchError{}
	require.Equal(t, true, errors.As(err, &mismatchErr))
	require.DeepEqual(t, []uint64{0}, mismatchErr.Mismatch())
}

func TestLazyPersistOnceCommitted(t *testing.T) {
	ctx := context.Background()
	_, scs := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 1, 6)
	as := NewLazilyPersistentStore(&mockBlobsDB{})
	// stashes as expected
	p, err := as.Persist(ctx, 1, scs...)
	require.NoError(t, err)
	require.Equal(t, 6, len(p))
	// ignores duplicates
	p, err = as.Persist(ctx, 1, scs...)
	require.NoError(t, err)
	require.Equal(t, 0, len(p))

	// ignores index out of bound
	scs[0].Index = 6
	p, err = as.Persist(ctx, 1, scs[0])
	require.NoError(t, err)
	require.Equal(t, 0, len(p))

	_, more := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 1, 4)
	// ignores sidecars before the retention period
	slotOOB, err := slots.EpochStart(params.BeaconNetworkConfig().MinEpochsForBlobsSidecarsRequest)
	require.NoError(t, err)
	p, err = as.Persist(ctx, 32+slotOOB, more[0])
	require.NoError(t, err)
	require.Equal(t, 0, len(p))

	// doesn't ignore new sidecars with a different block root
	p, err = as.Persist(ctx, 1, more...)
	require.NoError(t, err)
	require.Equal(t, 4, len(p))
}
beacon-chain/das/blocking.go (new file, 93 lines)
@@ -0,0 +1,93 @@
package das

import (
	"context"
	"sync"

	errors "github.com/pkg/errors"
	fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
	"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
	"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
	ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
)

// AsyncStore wraps an AvailabilityStore and blocks until IsDataAvailable is ready.
// If the context given to IsDataAvailable is cancelled, the result of IsDataAvailable will be ctx.Err().
type AsyncStore struct {
	notif *idxNotifiers
	s     AvailabilityStore
}

func (bs *AsyncStore) PersistOnceCommitted(ctx context.Context, current primitives.Slot, sc ...*ethpb.BlobSidecar) ([]*ethpb.BlobSidecar, error) {
	if len(sc) < 1 {
		return nil, nil
	}
	seen := bs.notif.ensure(keyFromSidecar(sc[0]))
	persisted, err := bs.s.Persist(ctx, current, sc...)
	if err != nil {
		return nil, err
	}
	for i := range persisted {
		seen <- persisted[i].Index
	}
	return persisted, nil
}

func (bs *AsyncStore) IsDataAvailable(ctx context.Context, current primitives.Slot, b blocks.ROBlock) error {
	key := keyFromBlock(b)
	for {
		err := bs.s.IsDataAvailable(ctx, current, b)
		if err == nil {
			return nil
		}
		mie := MissingIndicesError{}
		if !errors.As(err, &mie) {
			return err
		}
		waitFor := make(map[uint64]struct{})
		for _, m := range mie.Missing() {
			waitFor[m] = struct{}{}
		}
		if err := waitForIndices(ctx, bs.notif.ensure(key), waitFor); err != nil {
			return err
		}
		bs.notif.reset(key)
	}
}

func waitForIndices(ctx context.Context, idxSeen chan uint64, waitFor map[uint64]struct{}) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case idx := <-idxSeen:
			delete(waitFor, idx)
			if len(waitFor) == 0 {
				return nil
			}
			continue
		}
	}
}

type idxNotifiers struct {
	sync.RWMutex
	entries map[cacheKey]chan uint64
}

func (c *idxNotifiers) ensure(key cacheKey) chan uint64 {
	c.Lock()
	defer c.Unlock()
	e, ok := c.entries[key]
	if !ok {
		e = make(chan uint64, fieldparams.MaxBlobsPerBlock)
		c.entries[key] = e
	}
	return e
}

func (c *idxNotifiers) reset(key cacheKey) {
	c.Lock()
	defer c.Unlock()
	delete(c.entries, key)
}
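The channel discipline above (one notifier channel per block key, buffered to MaxBlobsPerBlock, reset after each retry round) boils down to a wait-for-set pattern. A runnable standalone sketch of that pattern:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitForIndices mirrors the loop above: block until every wanted index has
// been announced on the channel, or until the context is cancelled.
func waitForIndices(ctx context.Context, seen <-chan uint64, want map[uint64]struct{}) error {
	for len(want) > 0 {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case idx := <-seen:
			delete(want, idx)
		}
	}
	return nil
}

func main() {
	seen := make(chan uint64, 3)
	go func() {
		// Announce indices out of order, as gossip would.
		for _, i := range []uint64{2, 0, 1} {
			seen <- i
		}
	}()
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println(waitForIndices(ctx, seen, map[uint64]struct{}{0: {}, 1: {}, 2: {}})) // <nil>
}
```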
beacon-chain/das/cache.go (new file, 184 lines)
@@ -0,0 +1,184 @@
package das

import (
	"bytes"
	"fmt"
	"sync"

	errors "github.com/pkg/errors"
	fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
	"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
	"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
	"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
	ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
	log "github.com/sirupsen/logrus"
)

var (
	errDBCommitmentMismatch = errors.New("blob/block commitment mismatch")
)

// cacheKey includes the slot so that we can easily iterate through the cache and compare
// slots for eviction purposes. Whether the input is the block or the sidecar, we always have
// the root+slot when interacting with the cache, so it isn't an inconvenience to use both.
type cacheKey struct {
	slot primitives.Slot
	root [32]byte
}

type cache struct {
	sync.RWMutex
	entries map[cacheKey]*cacheEntry
}

func newCache() *cache {
	return &cache{entries: make(map[cacheKey]*cacheEntry)}
}

// keyFromSidecar is a convenience method for constructing a cacheKey from a BlobSidecar value.
func keyFromSidecar(sc *ethpb.BlobSidecar) cacheKey {
	return cacheKey{slot: sc.Slot, root: bytesutil.ToBytes32(sc.BlockRoot)}
}

// keyFromBlock is a convenience method for constructing a cacheKey from a ROBlock value.
func keyFromBlock(b blocks.ROBlock) cacheKey {
	return cacheKey{slot: b.Block().Slot(), root: b.Root()}
}

// ensure returns the entry for the given key, creating it if it isn't already present.
func (c *cache) ensure(key cacheKey) *cacheEntry {
	c.Lock()
	defer c.Unlock()
	e, ok := c.entries[key]
	if !ok {
		e = &cacheEntry{}
		c.entries[key] = e
	}
	return e
}

// delete removes the cache entry from the cache.
func (c *cache) delete(key cacheKey) {
	c.Lock()
	defer c.Unlock()
	delete(c.entries, key)
}

// dbidx is a compact representation of the set of BlobSidecars in the database for a given root,
// organized as a map from BlobSidecar.Index->BlobSidecar.KzgCommitment.
// This representation is convenient for comparison to a block's commitments.
type dbidx [fieldparams.MaxBlobsPerBlock]*[48]byte

// missing compares the set of BlobSidecars observed in the backing store to the set of commitments
// observed in a block - cmts is the BlobKzgCommitments field from a block.
func (idx dbidx) missing(cmts [][]byte) ([]uint64, error) {
	m := make([]uint64, 0, len(cmts))
	for i := range cmts {
		if idx[i] == nil {
			m = append(m, uint64(i))
			continue
		}
		c := *idx[i]
		if c != bytesutil.ToBytes48(cmts[i]) {
			return nil, errors.Wrapf(errDBCommitmentMismatch,
				"index=%d, db=%#x, block=%#x", i, c, cmts[i])
		}
	}
	return m, nil
}

// cacheEntry represents 2 different types of caches for a given block.
// scs is a fixed-length cache of BlobSidecars.
// dbx is a compact representation of BlobSidecars observed in the backing store.
// dbx assumes that all writes to the backing store go through the same cache.
type cacheEntry struct {
	sync.RWMutex
	scs    [fieldparams.MaxBlobsPerBlock]*ethpb.BlobSidecar
	dbx    dbidx
	dbRead bool
}

// stash adds an item to the in-memory cache of BlobSidecars.
// Only the first BlobSidecar of a given Index will be kept in the cache.
// The return value represents whether the given BlobSidecar was stashed.
// A false value means there was already a BlobSidecar with the given Index.
func (e *cacheEntry) stash(sc *ethpb.BlobSidecar) bool {
	if e.scs[sc.Index] == nil {
		e.scs[sc.Index] = sc
		return true
	}
	return false
}

func (e *cacheEntry) dbidx() dbidx {
	return e.dbx
}

func (e *cacheEntry) dbidxInitialized() bool {
	return e.dbRead
}

// filter evicts sidecars that are not committed to by the block and returns custom
// errors if the cache is missing any of the commitments, or if the commitments in
// the cache do not match those found in the block. If err is nil, then all expected
// commitments were found in the cache and the sidecar slice return value can be used
// to perform a DA check against the cached sidecars.
func (e *cacheEntry) filter(root [32]byte, blkCmts [][]byte) ([]*ethpb.BlobSidecar, error) {
	// Evict any blobs that are out of range.
	for i := len(blkCmts); i < fieldparams.MaxBlobsPerBlock; i++ {
		if e.scs[i] == nil {
			continue
		}
		log.WithField("block_root", root).
			WithField("index", i).
			WithField("cached_commitment", fmt.Sprintf("%#x", e.scs[i].KzgCommitment)).
			Warn("Evicting BlobSidecar with index > maximum blob commitment")
		e.scs[i] = nil
	}
	// Generate a MissingIndicesError for any missing indices.
	// Generate a CommitmentMismatchError for any mismatched commitments.
	missing := make([]uint64, 0, len(blkCmts))
	mismatch := make([]uint64, 0, len(blkCmts))
	for i := range blkCmts {
		if e.scs[i] == nil {
			missing = append(missing, uint64(i))
			continue
		}
		if !bytes.Equal(blkCmts[i], e.scs[i].KzgCommitment) {
			mismatch = append(mismatch, uint64(i))
			log.WithField("block_root", root).
				WithField("index", i).
				WithField("expected_commitment", fmt.Sprintf("%#x", blkCmts[i])).
				WithField("cached_commitment", fmt.Sprintf("%#x", e.scs[i].KzgCommitment)).
				Error("Evicting BlobSidecar with incorrect commitment")
			e.scs[i] = nil
			continue
		}
	}
	if len(mismatch) > 0 {
		return nil, NewCommitmentMismatchError(mismatch)
	}
	if len(missing) > 0 {
		return nil, NewMissingIndicesError(missing)
	}
	return e.scs[0:len(blkCmts)], nil
}

// ensureDbidx updates the db cache representation to include the given BlobSidecars.
func (e *cacheEntry) ensureDbidx(scs ...*ethpb.BlobSidecar) dbidx {
	if e.dbRead == false {
		e.dbRead = true
	}
	for i := range scs {
		if scs[i].Index >= fieldparams.MaxBlobsPerBlock {
			continue
		}
		// Don't overwrite.
		if e.dbx[scs[i].Index] != nil {
			continue
		}
		c := bytesutil.ToBytes48(scs[i].KzgCommitment)
		e.dbx[scs[i].Index] = &c
	}
	return e.dbx
}
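A compressed standalone sketch of the cacheEntry bookkeeping: a fixed-size, index-keyed array where the first write per index wins, and a read pass classifies indices as missing or mismatched against the block's commitment list. The constant and byte-slice commitments are illustrative:

```go
package main

import (
	"bytes"
	"fmt"
)

const maxBlobs = 6 // illustrative stand-in for MaxBlobsPerBlock

type entry struct {
	scs [maxBlobs][]byte // commitment per index; nil means not seen
}

// stash keeps only the first value seen for an index, like cacheEntry.stash.
func (e *entry) stash(idx int, cmt []byte) bool {
	if e.scs[idx] != nil {
		return false
	}
	e.scs[idx] = cmt
	return true
}

// classify compares the cache to the block's expected commitments, like filter.
func (e *entry) classify(want [][]byte) (missing, mismatch []int) {
	for i := range want {
		switch {
		case e.scs[i] == nil:
			missing = append(missing, i)
		case !bytes.Equal(e.scs[i], want[i]):
			mismatch = append(mismatch, i)
		}
	}
	return
}

func main() {
	e := &entry{}
	e.stash(0, []byte("a"))
	e.stash(2, []byte("x")) // wrong commitment for index 2
	missing, mismatch := e.classify([][]byte{[]byte("a"), []byte("b"), []byte("c")})
	fmt.Println(missing, mismatch) // [1] [2]
}
```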
beacon-chain/das/cache_test.go (new file, 122 lines)
@@ -0,0 +1,122 @@
package das

import (
	"testing"

	fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
	"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
	"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
	ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
	"github.com/prysmaticlabs/prysm/v4/testing/require"
)

func TestCacheEnsureDelete(t *testing.T) {
	c := newCache()
	require.Equal(t, 0, len(c.entries))
	root := bytesutil.ToBytes32([]byte("root"))
	slot := primitives.Slot(1234)
	k := cacheKey{root: root, slot: slot}
	entry := c.ensure(k)
	require.Equal(t, 1, len(c.entries))
	require.Equal(t, c.entries[k], entry)

	c.delete(k)
	require.Equal(t, 0, len(c.entries))
	var nilEntry *cacheEntry
	require.Equal(t, nilEntry, c.entries[k])
}

func TestNewEntry(t *testing.T) {
	entry := &cacheEntry{}
	require.Equal(t, false, entry.dbidxInitialized())
	entry.ensureDbidx()
	require.Equal(t, true, entry.dbidxInitialized())
}

func TestDbidxBounds(t *testing.T) {
	scs := generateMinimalBlobSidecars(2)
	entry := &cacheEntry{}
	entry.ensureDbidx(scs...)
	//require.Equal(t, 2, len(entry.dbidx()))
	for i := range scs {
		require.Equal(t, bytesutil.ToBytes48(scs[i].KzgCommitment), *entry.dbidx()[i])
	}

	var nilPtr *[48]byte
	// test that duplicate sidecars are ignored
	orig := entry.dbidx()
	copy(scs[0].KzgCommitment[0:4], []byte("derp"))
	edited := bytesutil.ToBytes48(scs[0].KzgCommitment)
	require.Equal(t, false, *entry.dbidx()[0] == edited)
	entry.ensureDbidx(scs[0])
	for i := 2; i < fieldparams.MaxBlobsPerBlock; i++ {
		require.Equal(t, entry.dbidx()[i], nilPtr)
	}
	require.Equal(t, entry.dbidx(), orig)

	// test that excess sidecars are discarded
	oob := generateMinimalBlobSidecars(fieldparams.MaxBlobsPerBlock + 1)
	entry = &cacheEntry{}
	entry.ensureDbidx(oob...)
	require.Equal(t, fieldparams.MaxBlobsPerBlock, len(entry.dbidx()))
}

func TestDbidxMissing(t *testing.T) {
	scs := generateMinimalBlobSidecars(6)
	missing := []uint64{0, 1, 2, 3, 4, 5}
	blockCommits := commitsForSidecars(scs)
	cases := []struct {
		name     string
		nMissing int
		err      error
	}{
		{
			name:     "all missing",
			nMissing: len(scs),
		},
		{
			name:     "none missing",
			nMissing: 0,
		},
		{
			name:     "2 missing",
			nMissing: 2,
		},
		{
			name:     "3 missing",
			nMissing: 3,
		},
	}
	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			l := len(scs)
			entry := &cacheEntry{}
			d := entry.ensureDbidx(scs[0 : l-c.nMissing]...)
			m, err := d.missing(blockCommits)
			if c.err == nil {
				require.NoError(t, err)
			}
			require.DeepEqual(t, m, missing[l-c.nMissing:])
			require.Equal(t, c.nMissing, len(m))
		})
	}
}

func commitsForSidecars(scs []*ethpb.BlobSidecar) [][]byte {
	m := make([][]byte, len(scs))
	for i := range scs {
		m[i] = scs[i].KzgCommitment
	}
	return m
}

func generateMinimalBlobSidecars(n int) []*ethpb.BlobSidecar {
	scs := make([]*ethpb.BlobSidecar, n)
	for i := 0; i < n; i++ {
		scs[i] = &ethpb.BlobSidecar{
			Index:         uint64(i),
			KzgCommitment: bytesutil.PadTo([]byte{byte(i)}, 48),
		}
	}
	return scs
}
beacon-chain/das/error.go (new file, 76 lines)
@@ -0,0 +1,76 @@
package das

import (
	"fmt"
	"strings"

	errors "github.com/pkg/errors"
)

var (
	errDAIncomplete  = errors.New("some BlobSidecars are not available at this time")
	errDAEquivocated = errors.New("cache contains BlobSidecars that do not match block commitments")
	errMixedRoots    = errors.New("BlobSidecars must all be for the same block")
)

// The following errors are exported so that gossip verification can use errors.Is to determine the correct pubsub.ValidationResult.
var (
	// ErrInvalidInclusionProof is returned when the inclusion proof check on the BlobSidecar fails.
	ErrInvalidInclusionProof = errors.New("BlobSidecar inclusion proof is invalid")
	// ErrInvalidBlockSignature is returned when the BlobSidecar.SignedBeaconBlockHeader signature cannot be verified against the block root.
	ErrInvalidBlockSignature = errors.New("SignedBeaconBlockHeader signature could not be verified")
	// ErrInvalidCommitment is returned when the kzg_commitment cannot be verified against the kzg_proof and blob.
	ErrInvalidCommitment = errors.New("BlobSidecar.kzg_commitment verification failed")
)

func NewMissingIndicesError(missing []uint64) MissingIndicesError {
	return MissingIndicesError{indices: missing}
}

type MissingIndicesError struct {
	indices []uint64
}

var _ error = MissingIndicesError{}

func (m MissingIndicesError) Error() string {
	is := make([]string, 0, len(m.indices))
	for i := range m.indices {
		is = append(is, fmt.Sprintf("%d", m.indices[i]))
	}
	return fmt.Sprintf("%s at indices %s", errDAIncomplete.Error(), strings.Join(is, ","))
}

func (m MissingIndicesError) Missing() []uint64 {
	return m.indices
}

func (m MissingIndicesError) Unwrap() error {
	return errDAIncomplete
}

func NewCommitmentMismatchError(mismatch []uint64) CommitmentMismatchError {
	return CommitmentMismatchError{mismatch: mismatch}
}

type CommitmentMismatchError struct {
	mismatch []uint64
}

var _ error = CommitmentMismatchError{}

func (m CommitmentMismatchError) Error() string {
	is := make([]string, 0, len(m.mismatch))
	for i := range m.mismatch {
		is = append(is, fmt.Sprintf("%d", m.mismatch[i]))
	}
	return fmt.Sprintf("%s at indices %s", errDAEquivocated.Error(), strings.Join(is, ","))
}

func (m CommitmentMismatchError) Mismatch() []uint64 {
	return m.mismatch
}

func (m CommitmentMismatchError) Unwrap() error {
	return errDAEquivocated
}
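Callers branch on these types with errors.As, which finds the concrete value through a pkg/errors Wrap chain. A self-contained sketch with a local copy of the error type (not the das package itself):

```go
package main

import (
	"fmt"

	"github.com/pkg/errors"
)

// MissingIndicesError is a local stand-in for the das type above, used only to
// show the errors.As pattern; pkg/errors v0.9+ forwards As to the standard library.
type MissingIndicesError struct{ indices []uint64 }

func (m MissingIndicesError) Error() string { return fmt.Sprintf("missing %v", m.indices) }

func main() {
	err := errors.Wrap(MissingIndicesError{indices: []uint64{0, 2}}, "da check")
	mie := MissingIndicesError{}
	if errors.As(err, &mie) {
		// Retry fetching just the missing indices, as blocking.go does above.
		fmt.Println("missing indices:", mie.indices) // [0 2]
	}
}
```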
beacon-chain/das/iface.go (new file, 22 lines)
@@ -0,0 +1,22 @@
package das

import (
	"context"

	"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
	"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
	ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
)

// BlobsDB specifies the persistence store methods needed by the AvailabilityStore.
type BlobsDB interface {
	BlobSidecarsByRoot(ctx context.Context, beaconBlockRoot [32]byte, indices ...uint64) ([]*ethpb.BlobSidecar, error)
	SaveBlobSidecar(ctx context.Context, sidecars []*ethpb.BlobSidecar) error
}

// AvailabilityStore describes a component that can verify and save sidecars for a given block, and confirm previously
// verified and saved sidecars.
type AvailabilityStore interface {
	IsDataAvailable(ctx context.Context, current primitives.Slot, b blocks.ROBlock) error
	Persist(ctx context.Context, current primitives.Slot, sc ...*ethpb.BlobSidecar) ([]*ethpb.BlobSidecar, error)
}
beacon-chain/das/mock.go (new file, 51 lines)
@@ -0,0 +1,51 @@
package das

import (
	"context"

	"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
	"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
	ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
)

type MockAvailabilityStore struct {
	VerifyAvailabilityCallback func(ctx context.Context, current primitives.Slot, b blocks.ROBlock) error
	PersistBlobsCallback       func(ctx context.Context, current primitives.Slot, sc ...*ethpb.BlobSidecar) ([]*ethpb.BlobSidecar, error)
}

var _ AvailabilityStore = &MockAvailabilityStore{}

func (m *MockAvailabilityStore) IsDataAvailable(ctx context.Context, current primitives.Slot, b blocks.ROBlock) error {
	if m.VerifyAvailabilityCallback != nil {
		return m.VerifyAvailabilityCallback(ctx, current, b)
	}
	return nil
}

func (m *MockAvailabilityStore) Persist(ctx context.Context, current primitives.Slot, sc ...*ethpb.BlobSidecar) ([]*ethpb.BlobSidecar, error) {
	if m.PersistBlobsCallback != nil {
		return m.PersistBlobsCallback(ctx, current, sc...)
	}
	return sc, nil
}

type mockBlobsDB struct {
	BlobSidecarsByRootCallback func(ctx context.Context, root [32]byte, indices ...uint64) ([]*ethpb.BlobSidecar, error)
	SaveBlobSidecarCallback    func(ctx context.Context, sidecars []*ethpb.BlobSidecar) error
}

var _ BlobsDB = &mockBlobsDB{}

func (b *mockBlobsDB) BlobSidecarsByRoot(ctx context.Context, root [32]byte, indices ...uint64) ([]*ethpb.BlobSidecar, error) {
	if b.BlobSidecarsByRootCallback != nil {
		return b.BlobSidecarsByRootCallback(ctx, root, indices...)
	}
	return nil, nil
}

func (b *mockBlobsDB) SaveBlobSidecar(ctx context.Context, sidecars []*ethpb.BlobSidecar) error {
	if b.SaveBlobSidecarCallback != nil {
		return b.SaveBlobSidecarCallback(ctx, sidecars)
	}
	return nil
}
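A hypothetical test showing how the mock is meant to be steered; the callback fields are the ones defined above, and the test itself is not part of this diff:

```go
package blockchain_test

import (
	"context"
	"testing"

	"github.com/pkg/errors"
	"github.com/prysmaticlabs/prysm/v4/beacon-chain/das"
	"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
	"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
)

// TestDAFailurePath forces IsDataAvailable to fail so a caller's error
// handling can be exercised without touching the real cache or db.
func TestDAFailurePath(t *testing.T) {
	avs := &das.MockAvailabilityStore{
		VerifyAvailabilityCallback: func(_ context.Context, _ primitives.Slot, _ blocks.ROBlock) error {
			return errors.New("da check failed")
		},
	}
	// The mock ignores its arguments, so a zero-value block is enough here.
	if err := avs.IsDataAvailable(context.Background(), 0, blocks.ROBlock{}); err == nil {
		t.Fatal("expected DA failure")
	}
}
```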
@@ -207,6 +207,37 @@ func (s *Store) BlobSidecarsByRoot(ctx context.Context, root [32]byte, indices .
	return filterForIndices(sc, indices...)
}

func (s *Store) BlobIndicesAvailable(ctx context.Context, root [32]byte) ([]uint64, error) {
	ctx, span := trace.StartSpan(ctx, "BeaconDB.BlobIndicesAvailable")
	defer span.End()

	var enc []byte
	if err := s.db.View(func(tx *bolt.Tx) error {
		c := tx.Bucket(blobsBucket).Cursor()
		for k, v := c.First(); k != nil; k, v = c.Next() {
			if bytes.HasSuffix(k, root[:]) {
				enc = v
				break
			}
		}
		return nil
	}); err != nil {
		return nil, err
	}
	if enc == nil {
		return nil, ErrNotFound
	}
	scs := &ethpb.BlobSidecars{}
	if err := decode(ctx, enc, scs); err != nil {
		return nil, err
	}
	idxs := make([]uint64, len(scs.Sidecars))
	for i := range scs.Sidecars {
		idxs[i] = scs.Sidecars[i].Index
	}
	return idxs, nil
}

func filterForIndices(sc *ethpb.BlobSidecars, indices ...uint64) ([]*ethpb.BlobSidecar, error) {
	if len(indices) == 0 {
		return sc.Sidecars, nil

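One way a caller might build on BlobIndicesAvailable is to diff the stored indices against the block's expected commitment count to decide what still needs fetching. A hypothetical helper sketch (missingIndices is not part of this change; it assumes errors.Is from the standard library):

// Hypothetical: report which of the expected blob indices are not yet stored.
func missingIndices(ctx context.Context, s *Store, root [32]byte, expected uint64) (map[uint64]bool, error) {
	missing := make(map[uint64]bool, expected)
	for i := uint64(0); i < expected; i++ {
		missing[i] = true
	}
	have, err := s.BlobIndicesAvailable(ctx, root)
	if err != nil {
		if errors.Is(err, ErrNotFound) {
			return missing, nil // nothing stored yet: every index is missing
		}
		return nil, err
	}
	for _, idx := range have {
		delete(missing, idx)
	}
	return missing, nil
}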
@@ -245,8 +245,12 @@ func New(cliCtx *cli.Context, opts ...Option) (*BeaconNode, error) {
		return nil, err
	}

	log.Debugln("Registering Blockchain Service")
	if err := beacon.registerBlockchainService(beacon.forkChoicer, synchronizer, beacon.initialSyncComplete); err != nil {
	bcOpts := []blockchain.Option{
		blockchain.WithForkChoiceStore(beacon.forkChoicer),
		blockchain.WithClockSynchronizer(synchronizer),
		blockchain.WithSyncComplete(beacon.initialSyncComplete),
	}
	if err := beacon.registerBlockchainService(bcOpts); err != nil {
		return nil, err
	}

@@ -607,7 +611,8 @@ func (b *BeaconNode) registerAttestationPool() error {
	return b.services.RegisterService(s)
}

func (b *BeaconNode) registerBlockchainService(fc forkchoice.ForkChoicer, gs *startup.ClockSynchronizer, syncComplete chan struct{}) error {
func (b *BeaconNode) registerBlockchainService(required []blockchain.Option) error {
	log.Debugln("Registering Blockchain Service")
	var web3Service *execution.Service
	if err := b.services.FetchService(&web3Service); err != nil {
		return err
@@ -618,10 +623,9 @@ func (b *BeaconNode) registerBlockchainService(fc forkchoice.ForkChoicer, gs *st
		return err
	}

	opts := append(b.serviceFlagOpts.blockchainFlagOpts, required...)
	// skipcq: CRT-D0001
	opts := append(
		b.serviceFlagOpts.blockchainFlagOpts,
		blockchain.WithForkChoiceStore(fc),
	opts = append(opts,
		blockchain.WithDatabase(b.db),
		blockchain.WithDepositCache(b.depositCache),
		blockchain.WithChainStartFetcher(web3Service),
@@ -637,8 +641,6 @@ func (b *BeaconNode) registerBlockchainService(fc forkchoice.ForkChoicer, gs *st
		blockchain.WithSlasherAttestationsFeed(b.slasherAttestationsFeed),
		blockchain.WithFinalizedStateAtStartUp(b.finalizedStateAtStartUp),
		blockchain.WithProposerIdsCache(b.proposerIdsCache),
		blockchain.WithClockSynchronizer(gs),
		blockchain.WithSyncComplete(syncComplete),
	)

	blockchainService, err := blockchain.NewService(b.ctx, opts...)
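For readers unfamiliar with the pattern this refactor leans on: the caller now builds the required options itself, merges them with flag-derived options, and the constructor applies each one in turn. A self-contained sketch of that functional-options shape (generic names, not Prysm code):

// Sketch only: the shape of the options pattern, with hypothetical names.
type Service struct {
	syncComplete chan struct{}
}

type Option func(*Service) error

func WithSyncComplete(c chan struct{}) Option {
	return func(s *Service) error {
		s.syncComplete = c
		return nil
	}
}

func NewService(opts ...Option) (*Service, error) {
	s := &Service{}
	for _, o := range opts {
		if err := o(s); err != nil { // options are applied in append order
			return nil, err
		}
	}
	return s, nil
}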
@@ -21,6 +21,7 @@ go_library(
    "//beacon-chain/core/feed/block:go_default_library",
    "//beacon-chain/core/feed/state:go_default_library",
    "//beacon-chain/core/transition:go_default_library",
    "//beacon-chain/das:go_default_library",
    "//beacon-chain/db:go_default_library",
    "//beacon-chain/p2p:go_default_library",
    "//beacon-chain/p2p/peers/scorers:go_default_library",
@@ -112,6 +113,7 @@ go_test(
    deps = [
        "//async/abool:go_default_library",
        "//beacon-chain/blockchain/testing:go_default_library",
        "//beacon-chain/das:go_default_library",
        "//beacon-chain/db:go_default_library",
        "//beacon-chain/db/testing:go_default_library",
        "//beacon-chain/p2p:go_default_library",

@@ -10,6 +10,7 @@ import (
	"github.com/paulbellamy/ratecounter"
	"github.com/pkg/errors"
	"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition"
	"github.com/prysmaticlabs/prysm/v4/beacon-chain/das"
	"github.com/prysmaticlabs/prysm/v4/beacon-chain/sync"
	"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
	"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
@@ -27,7 +28,7 @@ const (
type blockReceiverFn func(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte) error

// batchBlockReceiverFn defines batch receiving function.
type batchBlockReceiverFn func(ctx context.Context, blks []blocks.ROBlock) error
type batchBlockReceiverFn func(ctx context.Context, blks []blocks.ROBlock, avs das.AvailabilityStore) error

// Round Robin sync looks at the latest peer statuses and syncs up to the highest known epoch.
//
@@ -321,25 +322,15 @@ func (s *Service) processBatchedBlocks(ctx context.Context, genesis time.Time,
		errParentDoesNotExist, first.Block().ParentRoot(), first.Block().Slot())
	}
	s.logBatchSyncStatus(genesis, first, len(bwb))
	blobCount := 0
	avs := das.NewLazilyPersistentStore(s.cfg.DB)
	for _, bb := range bwb {
		if len(bb.Blobs) == 0 {
			continue
		_, err = avs.Persist(ctx, bb.Block.Block().Slot(), bb.Blobs...)
		if err != nil {
			return errors.Wrap(err, "error verifying or persisting BlobSidecars")
		}
		if err := s.cfg.DB.SaveBlobSidecar(ctx, bb.Blobs); err != nil {
			return errors.Wrapf(err, "failed to save blobs for block %#x", bb.Block.Root())
		}
		blobCount += len(bb.Blobs)
	}
	if blobCount > 0 {
		log.WithFields(logrus.Fields{
			"startSlot": bwb[0].Block.Block().Slot(),
			"endSlot":   bwb[len(bwb)-1].Block.Block().Slot(),
			"count":     blobCount,
		}).Info("Processed blob sidecars")
	}

	return bFunc(ctx, blocks.BlockWithVerifiedBlobsSlice(bwb).ROBlocks())
	return bFunc(ctx, blocks.BlockWithVerifiedBlobsSlice(bwb).ROBlocks(), avs)
}

// updatePeerScorerStats adjusts monitored metrics for a peer.
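The new batchBlockReceiverFn signature threads the availability store that persisted a batch's sidecars through to whoever imports the batch. A receiver matching the changed signature might look like this (a sketch mirroring the test callback further below, not code from this change):

// Hypothetical receiver: hand the batch and its availability store over together.
receiver := func(ctx context.Context, blks []blocks.ROBlock, avs das.AvailabilityStore) error {
	return s.cfg.Chain.ReceiveBlockBatch(ctx, blks, avs)
}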
@@ -8,6 +8,7 @@ import (
	"github.com/paulbellamy/ratecounter"
	"github.com/prysmaticlabs/prysm/v4/async/abool"
	mock "github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain/testing"
	"github.com/prysmaticlabs/prysm/v4/beacon-chain/das"
	dbtest "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/testing"
	p2pt "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing"
	"github.com/prysmaticlabs/prysm/v4/beacon-chain/startup"
@@ -464,15 +465,15 @@ func TestService_processBlockBatch(t *testing.T) {
		currBlockRoot = blk1Root
	}

	cbnormal := func(ctx context.Context, blks []blocks.ROBlock) error {
		assert.NoError(t, s.cfg.Chain.ReceiveBlockBatch(ctx, blks))
	cbnormal := func(ctx context.Context, blks []blocks.ROBlock, avs das.AvailabilityStore) error {
		assert.NoError(t, s.cfg.Chain.ReceiveBlockBatch(ctx, blks, avs))
		return nil
	}
	// Process block normally.
	err = s.processBatchedBlocks(ctx, genesis, batch, cbnormal)
	assert.NoError(t, err)

	cbnil := func(ctx context.Context, blocks []blocks.ROBlock) error {
	cbnil := func(ctx context.Context, blocks []blocks.ROBlock, avs das.AvailabilityStore) error {
		return nil
	}

@@ -13,6 +13,7 @@ import (
	"github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain"
	blockfeed "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/block"
	statefeed "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/state"
	"github.com/prysmaticlabs/prysm/v4/beacon-chain/das"
	"github.com/prysmaticlabs/prysm/v4/beacon-chain/db"
	"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p"
	"github.com/prysmaticlabs/prysm/v4/beacon-chain/startup"
@@ -41,6 +42,7 @@ type Config struct {
	BlockNotifier       blockfeed.Notifier
	ClockWaiter         startup.ClockWaiter
	InitialSyncComplete chan struct{}
	AVS                 das.AvailabilityStore
}

// Service service.
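A wiring sketch for the new AVS field (hypothetical: beaconDB and clockWaiter stand in for values the node already holds, and only the fields shown above are known from this change; das.NewLazilyPersistentStore is the constructor the diff itself uses in round robin sync):

// Hypothetical construction of the sync service config with a shared availability store.
cfg := &Config{
	ClockWaiter:         clockWaiter,
	InitialSyncComplete: make(chan struct{}),
	// The kv store satisfies das.BlobsDB, so it can back the lazily persistent store.
	AVS: das.NewLazilyPersistentStore(beaconDB),
}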