Mirror of https://github.com/OffchainLabs/prysm.git, synced 2026-01-09 13:28:01 -05:00

Compare commits — 14 commits
c6c9414d8b...v5.0.1
| Author | SHA1 | Date |
|---|---|---|
| | a1a81d1720 | |
| | 3d1d5863f5 | |
| | 89e21733e9 | |
| | 6ec456d836 | |
| | 04a5b2e3c2 | |
| | 147f48c5c5 | |
| | fdc4ea5bc2 | |
| | 536b469d28 | |
| | 66f55556a7 | |
| | 4083fe0d1d | |
| | 6cb6e47339 | |
| | e006ba155a | |
| | b7b017f5b6 | |
| | 0c897045a2 | |
@@ -113,6 +113,13 @@ http_archive(
     url = "https://github.com/GoogleContainerTools/distroless/archive/9dc924b9fe812eec2fa0061824dcad39eb09d0d6.tar.gz", # 2024-01-24
 )
 
+http_archive(
+    name = "aspect_bazel_lib",
+    sha256 = "f5ea76682b209cc0bd90d0f5a3b26d2f7a6a2885f0c5f615e72913f4805dbb0d",
+    strip_prefix = "bazel-lib-2.5.0",
+    url = "https://github.com/aspect-build/bazel-lib/releases/download/v2.5.0/bazel-lib-v2.5.0.tar.gz",
+)
+
+load("@aspect_bazel_lib//lib:repositories.bzl", "aspect_bazel_lib_dependencies", "aspect_bazel_lib_register_toolchains")
+
+aspect_bazel_lib_dependencies()
@@ -307,16 +307,16 @@ func (s *Service) updateEpochBoundaryCaches(ctx context.Context, st state.Beacon
 	if err := helpers.UpdateProposerIndicesInCache(ctx, st, e); err != nil {
 		return errors.Wrap(err, "could not update proposer index cache")
 	}
-	go func() {
+	go func(ep primitives.Epoch) {
 		// Use a custom deadline here, since this method runs asynchronously.
 		// We ignore the parent method's context and instead create a new one
 		// with a custom deadline, therefore using the background context instead.
 		slotCtx, cancel := context.WithTimeout(context.Background(), slotDeadline)
 		defer cancel()
-		if err := helpers.UpdateCommitteeCache(slotCtx, st, e+1); err != nil {
+		if err := helpers.UpdateCommitteeCache(slotCtx, st, ep+1); err != nil {
 			log.WithError(err).Warn("Could not update committee cache")
 		}
-	}()
+	}(e)
 	// The latest block header is from the previous epoch
 	r, err := st.LatestBlockHeader().HashTreeRoot()
 	if err != nil {
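Editor's note on the hunk above: passing the epoch into the goroutine gives it its own copy instead of a variable captured by reference. A minimal, self-contained illustration of the difference (not code from this repository; pre-Go 1.22 loop-variable semantics assumed):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() { // captures i by reference; pre-Go 1.22 all goroutines may see the final value
			defer wg.Done()
			fmt.Println("captured:", i)
		}()
	}
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(n int) { // n is a private copy, like go func(ep primitives.Epoch){ ... }(e)
			defer wg.Done()
			fmt.Println("passed:", n)
		}(i)
	}
	wg.Wait()
}
```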
@@ -2114,7 +2114,7 @@ func TestMissingIndices(t *testing.T) {
 	for _, c := range cases {
 		bm, bs := filesystem.NewEphemeralBlobStorageWithMocker(t)
 		t.Run(c.name, func(t *testing.T) {
-			require.NoError(t, bm.CreateFakeIndices(c.root, c.present))
+			require.NoError(t, bm.CreateFakeIndices(c.root, c.present...))
 			missing, err := missingIndices(bs, c.root, c.expected)
 			if c.err != nil {
 				require.ErrorIs(t, err, c.err)
@@ -289,10 +289,18 @@ func (s *Service) StartFromSavedState(saved state.BeaconState) error {
 	fRoot := s.ensureRootNotZeros(bytesutil.ToBytes32(finalized.Root))
 	s.cfg.ForkChoiceStore.Lock()
 	defer s.cfg.ForkChoiceStore.Unlock()
-	if err := s.cfg.ForkChoiceStore.UpdateJustifiedCheckpoint(s.ctx, &forkchoicetypes.Checkpoint{Epoch: justified.Epoch,
-		Root: bytesutil.ToBytes32(justified.Root)}); err != nil {
-		return errors.Wrap(err, "could not update forkchoice's justified checkpoint")
+	if params.BeaconConfig().ConfigName != params.PraterName {
+		if err := s.cfg.ForkChoiceStore.UpdateJustifiedCheckpoint(s.ctx, &forkchoicetypes.Checkpoint{Epoch: justified.Epoch,
+			Root: bytesutil.ToBytes32(justified.Root)}); err != nil {
+			return errors.Wrap(err, "could not update forkchoice's justified checkpoint")
+		}
+	} else {
+		if err := s.cfg.ForkChoiceStore.UpdateJustifiedCheckpoint(s.ctx, &forkchoicetypes.Checkpoint{Epoch: finalized.Epoch,
+			Root: bytesutil.ToBytes32(finalized.Root)}); err != nil {
+			return errors.Wrap(err, "could not update forkchoice's justified checkpoint")
+		}
 	}
 
 	if err := s.cfg.ForkChoiceStore.UpdateFinalizedCheckpoint(&forkchoicetypes.Checkpoint{Epoch: finalized.Epoch,
 		Root: bytesutil.ToBytes32(finalized.Root)}); err != nil {
 		return errors.Wrap(err, "could not update forkchoice's finalized checkpoint")
@@ -21,7 +21,10 @@ import (
 )
 
 var (
-	errIndexOutOfBounds = errors.New("blob index in file name >= MaxBlobsPerBlock")
+	errIndexOutOfBounds    = errors.New("blob index in file name >= MaxBlobsPerBlock")
+	errEmptyBlobWritten    = errors.New("zero bytes written to disk when saving blob sidecar")
+	errSidecarEmptySSZData = errors.New("sidecar marshalled to an empty ssz byte slice")
+	errNoBasePath          = errors.New("BlobStorage base path not specified in init")
 )
 
 const (
@@ -34,14 +37,26 @@ const (
 // BlobStorageOption is a functional option for configuring a BlobStorage.
 type BlobStorageOption func(*BlobStorage) error
 
+// WithBasePath is a required option that sets the base path of blob storage.
+func WithBasePath(base string) BlobStorageOption {
+	return func(b *BlobStorage) error {
+		b.base = base
+		return nil
+	}
+}
+
 // WithBlobRetentionEpochs is an option that changes the number of epochs blobs will be persisted.
 func WithBlobRetentionEpochs(e primitives.Epoch) BlobStorageOption {
 	return func(b *BlobStorage) error {
-		pruner, err := newBlobPruner(b.fs, e)
-		if err != nil {
-			return err
-		}
-		b.pruner = pruner
+		b.retentionEpochs = e
 		return nil
 	}
 }
+
+// WithSaveFsync is an option that causes Save to call fsync before renaming part files for improved durability.
+func WithSaveFsync(fsync bool) BlobStorageOption {
+	return func(b *BlobStorage) error {
+		b.fsync = fsync
+		return nil
+	}
+}
@@ -49,30 +64,36 @@ func WithBlobRetentionEpochs(e primitives.Epoch) BlobStorageOption {
 // NewBlobStorage creates a new instance of the BlobStorage object. Note that the implementation of BlobStorage may
 // attempt to hold a file lock to guarantee exclusive control of the blob storage directory, so this should only be
 // initialized once per beacon node.
-func NewBlobStorage(base string, opts ...BlobStorageOption) (*BlobStorage, error) {
-	base = path.Clean(base)
-	if err := file.MkdirAll(base); err != nil {
-		return nil, fmt.Errorf("failed to create blob storage at %s: %w", base, err)
-	}
-	fs := afero.NewBasePathFs(afero.NewOsFs(), base)
-	b := &BlobStorage{
-		fs: fs,
-	}
+func NewBlobStorage(opts ...BlobStorageOption) (*BlobStorage, error) {
+	b := &BlobStorage{}
 	for _, o := range opts {
 		if err := o(b); err != nil {
-			return nil, fmt.Errorf("failed to create blob storage at %s: %w", base, err)
+			return nil, errors.Wrap(err, "failed to create blob storage")
 		}
 	}
-	if b.pruner == nil {
-		log.Warn("Initializing blob filesystem storage with pruning disabled")
+	if b.base == "" {
+		return nil, errNoBasePath
 	}
+	b.base = path.Clean(b.base)
+	if err := file.MkdirAll(b.base); err != nil {
+		return nil, errors.Wrapf(err, "failed to create blob storage at %s", b.base)
+	}
+	b.fs = afero.NewBasePathFs(afero.NewOsFs(), b.base)
+	pruner, err := newBlobPruner(b.fs, b.retentionEpochs)
+	if err != nil {
+		return nil, err
+	}
+	b.pruner = pruner
 	return b, nil
 }
 
 // BlobStorage is the concrete implementation of the filesystem backend for saving and retrieving BlobSidecars.
 type BlobStorage struct {
-	fs     afero.Fs
-	pruner *blobPruner
+	base            string
+	retentionEpochs primitives.Epoch
+	fsync           bool
+	fs              afero.Fs
+	pruner          *blobPruner
 }
 
 // WarmCache runs the prune routine with an expiration of slot of 0, so nothing will be pruned, but the pruner's cache
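Taken together, the two hunks above replace the positional `base` argument with functional options. A hypothetical wiring sketch based only on the options shown above (illustrative path and values, not code from the diff):

```go
// blobstorage_example.go — illustrative sketch, not part of the change set.
package main

import (
	"log"

	"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
)

func main() {
	bs, err := filesystem.NewBlobStorage(
		filesystem.WithBasePath("/var/lib/prysm/blobs"), // required; omitting it returns errNoBasePath
		filesystem.WithBlobRetentionEpochs(4096),        // example retention window
		filesystem.WithSaveFsync(true),                  // mirrors the BlobSaveFsync feature flag
	)
	if err != nil {
		log.Fatal(err)
	}
	_ = bs
}
```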
@@ -111,11 +132,14 @@ func (bs *BlobStorage) Save(sidecar blocks.VerifiedROBlob) error {
 	sidecarData, err := sidecar.MarshalSSZ()
 	if err != nil {
 		return errors.Wrap(err, "failed to serialize sidecar data")
+	} else if len(sidecarData) == 0 {
+		return errSidecarEmptySSZData
 	}
 
 	if err := bs.fs.MkdirAll(fname.dir(), directoryPermissions); err != nil {
 		return err
 	}
-	partPath := fname.partPath()
+	partPath := fname.partPath(fmt.Sprintf("%p", sidecarData))
 
 	partialMoved := false
 	// Ensure the partial file is deleted.
@@ -138,7 +162,7 @@ func (bs *BlobStorage) Save(sidecar blocks.VerifiedROBlob) error {
 		return errors.Wrap(err, "failed to create partial file")
 	}
 
-	_, err = partialFile.Write(sidecarData)
+	n, err := partialFile.Write(sidecarData)
 	if err != nil {
 		closeErr := partialFile.Close()
 		if closeErr != nil {
@@ -146,11 +170,24 @@ func (bs *BlobStorage) Save(sidecar blocks.VerifiedROBlob) error {
 		}
 		return errors.Wrap(err, "failed to write to partial file")
 	}
-	err = partialFile.Close()
-	if err != nil {
+	if bs.fsync {
+		if err := partialFile.Sync(); err != nil {
+			return err
+		}
+	}
+
+	if err := partialFile.Close(); err != nil {
 		return err
 	}
+
+	if n != len(sidecarData) {
+		return fmt.Errorf("failed to write the full bytes of sidecarData, wrote only %d of %d bytes", n, len(sidecarData))
+	}
+
+	if n == 0 {
+		return errEmptyBlobWritten
+	}
+
 	// Atomically rename the partial file to its final name.
 	err = bs.fs.Rename(partPath, sszPath)
 	if err != nil {
@@ -257,16 +294,12 @@ func (p blobNamer) dir() string {
 	return rootString(p.root)
 }
 
-func (p blobNamer) fname(ext string) string {
-	return path.Join(p.dir(), fmt.Sprintf("%d.%s", p.index, ext))
-}
-
-func (p blobNamer) partPath() string {
-	return p.fname(partExt)
+func (p blobNamer) partPath(entropy string) string {
+	return path.Join(p.dir(), fmt.Sprintf("%s-%d.%s", entropy, p.index, partExt))
 }
 
 func (p blobNamer) path() string {
-	return p.fname(sszExt)
+	return path.Join(p.dir(), fmt.Sprintf("%d.%s", p.index, sszExt))
 }
 
 func rootString(root [32]byte) string {
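This part-file rename is where the concurrency fix lands: each Save call derives per-call entropy from the address of its freshly allocated SSZ buffer, so concurrent saves of the same sidecar write distinct `<entropy>-<index>.part` files and only converge on the atomic rename to the same final `<index>.ssz`. A tiny standalone illustration of the entropy source (assumption: distinct live allocations print distinct addresses, which is what `%p` relies on here):

```go
package main

import "fmt"

func main() {
	a := make([]byte, 8)
	b := make([]byte, 8)
	// Two live allocations print different addresses, e.g. 0xc000014060 vs 0xc000014068,
	// giving concurrent writers distinct part-file names like "0xc000014060-2.part".
	fmt.Printf("%p\n%p\n", a, b)
}
```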
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"os"
 	"path"
+	"sync"
 	"testing"
 	"time"
@@ -101,6 +102,30 @@ func TestBlobStorage_SaveBlobData(t *testing.T) {
 		_, err = b.Get(blob.BlockRoot(), blob.Index)
 		require.ErrorIs(t, err, os.ErrNotExist)
 	})
+
+	t.Run("race conditions", func(t *testing.T) {
+		// There was a bug where saving the same blob in multiple go routines would cause a partial blob
+		// to be empty. This test ensures that several routines can safely save the same blob at the
+		// same time. This isn't ideal behavior from the caller, but should be handled safely anyway.
+		// See https://github.com/prysmaticlabs/prysm/pull/13648
+		b, err := NewBlobStorage(WithBasePath(t.TempDir()))
+		require.NoError(t, err)
+		blob := testSidecars[0]
+
+		var wg sync.WaitGroup
+		for i := 0; i < 100; i++ {
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				require.NoError(t, b.Save(blob))
+			}()
+		}
+
+		wg.Wait()
+		res, err := b.Get(blob.BlockRoot(), blob.Index)
+		require.NoError(t, err)
+		require.DeepSSZEqual(t, blob, res)
+	})
 }
 
 // pollUntil polls a condition function until it returns true or a timeout is reached.
@@ -243,6 +268,8 @@ func BenchmarkPruning(b *testing.B) {
 }
 
 func TestNewBlobStorage(t *testing.T) {
-	_, err := NewBlobStorage(path.Join(t.TempDir(), "good"))
+	_, err := NewBlobStorage()
+	require.ErrorIs(t, err, errNoBasePath)
+	_, err = NewBlobStorage(WithBasePath(path.Join(t.TempDir(), "good")))
 	require.NoError(t, err)
 }
@@ -37,7 +37,7 @@ type BlobMocker struct {
 
 // CreateFakeIndices creates empty blob sidecar files at the expected path for the given
 // root and indices to influence the result of Indices().
-func (bm *BlobMocker) CreateFakeIndices(root [32]byte, indices []uint64) error {
+func (bm *BlobMocker) CreateFakeIndices(root [32]byte, indices ...uint64) error {
 	for i := range indices {
 		n := blobNamer{root: root, index: indices[i]}
 		if err := bm.fs.MkdirAll(n.dir(), directoryPermissions); err != nil {
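With the variadic signature, call sites can pass indices inline, `bm.CreateFakeIndices(root, 0, 1)`, or spread an existing slice, `bm.CreateFakeIndices(c.root, c.present...)` — exactly the adjustment made in the TestMissingIndices hunk earlier in this change set.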
@@ -82,6 +82,15 @@ func (f *ForkChoice) ShouldOverrideFCU() (override bool) {
 		return
 	}
 
+	// Return early if we are checking before 10 seconds into the slot
+	secs, err := slots.SecondsSinceSlotStart(head.slot, f.store.genesisTime, uint64(time.Now().Unix()))
+	if err != nil {
+		log.WithError(err).Error("could not check current slot")
+		return true
+	}
+	if secs < ProcessAttestationsThreshold {
+		return true
+	}
 	// Only orphan a block if the parent LMD vote is strong
 	if parent.weight*100 < f.store.committeeWeight*params.BeaconConfig().ReorgParentWeightThreshold {
 		return
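An editor's reading of the guard above: early in the slot (before ProcessAttestationsThreshold seconds — 10, per the comment) ShouldOverrideFCU keeps returning true without consulting vote weights, while later in the slot the weak-parent check below is allowed to decline the override. This is what the new "parent is weak late call" case in the test hunk below asserts by drifting the genesis time to 11 seconds into the slot.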
@@ -85,11 +85,19 @@ func TestForkChoice_ShouldOverrideFCU(t *testing.T) {
 		require.Equal(t, false, f.ShouldOverrideFCU())
 		f.store.headNode.parent = saved
 	})
-	t.Run("parent is weak", func(t *testing.T) {
+	t.Run("parent is weak early call", func(t *testing.T) {
 		saved := f.store.headNode.parent.weight
 		f.store.headNode.parent.weight = 0
 		require.Equal(t, true, f.ShouldOverrideFCU())
 		f.store.headNode.parent.weight = saved
 	})
+	t.Run("parent is weak late call", func(t *testing.T) {
+		saved := f.store.headNode.parent.weight
+		driftGenesisTime(f, 2, 11)
+		f.store.headNode.parent.weight = 0
+		require.Equal(t, false, f.ShouldOverrideFCU())
+		f.store.headNode.parent.weight = saved
+		driftGenesisTime(f, 2, orphanLateBlockFirstThreshold+1)
+	})
 	t.Run("Head is strong", func(t *testing.T) {
 		f.store.headNode.weight = f.store.committeeWeight
@@ -118,6 +118,7 @@ type BeaconNode struct {
 	BackfillOpts        []backfill.ServiceOption
 	initialSyncComplete chan struct{}
 	BlobStorage         *filesystem.BlobStorage
+	BlobStorageOptions  []filesystem.BlobStorageOption
 	blobRetentionEpochs primitives.Epoch
 	verifyInitWaiter    *verification.InitializerWaiter
 	syncChecker         *initialsync.SyncChecker
@@ -209,6 +210,16 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco
 		return nil, err
 	}
 
+	// Allow tests to set it as an opt.
+	if beacon.BlobStorage == nil {
+		beacon.BlobStorageOptions = append(beacon.BlobStorageOptions, filesystem.WithSaveFsync(features.Get().BlobSaveFsync))
+		blobs, err := filesystem.NewBlobStorage(beacon.BlobStorageOptions...)
+		if err != nil {
+			return nil, err
+		}
+		beacon.BlobStorage = blobs
+	}
+
 	log.Debugln("Starting DB")
 	if err := beacon.startDB(cliCtx, depositAddress); err != nil {
 		return nil, err
@@ -43,6 +43,15 @@ func WithBlobStorage(bs *filesystem.BlobStorage) Option {
 	}
 }
 
+// WithBlobStorageOptions appends 1 or more filesystem.BlobStorageOption on the beacon node,
+// to be used when initializing blob storage.
+func WithBlobStorageOptions(opt ...filesystem.BlobStorageOption) Option {
+	return func(bn *BeaconNode) error {
+		bn.BlobStorageOptions = append(bn.BlobStorageOptions, opt...)
+		return nil
+	}
+}
+
 // WithBlobRetentionEpochs sets the blobRetentionEpochs value, used in kv store initialization.
 func WithBlobRetentionEpochs(e primitives.Epoch) Option {
 	return func(bn *BeaconNode) error {
@@ -115,13 +115,7 @@ func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (
 	return vs.constructGenericBeaconBlock(sBlk, bundleCache.get(req.Slot))
 }
 
-func (vs *Server) handleFailedReorgAttempt(ctx context.Context, slot primitives.Slot, parentRoot, headRoot [32]byte) (state.BeaconState, error) {
-	blockchain.LateBlockAttemptedReorgCount.Inc()
-	log.WithFields(logrus.Fields{
-		"slot":       slot,
-		"parentRoot": fmt.Sprintf("%#x", parentRoot),
-		"headRoot":   fmt.Sprintf("%#x", headRoot),
-	}).Warn("late block attempted reorg failed")
+func (vs *Server) handleSuccesfulReorgAttempt(ctx context.Context, slot primitives.Slot, parentRoot, headRoot [32]byte) (state.BeaconState, error) {
 	// Try to get the state from the NSC
 	head := transition.NextSlotState(parentRoot[:], slot)
 	if head != nil {
@@ -135,7 +129,16 @@ func (vs *Server) handleFailedReorgAttempt(ctx context.Context, slot primitives.
 	return head, nil
 }
 
-func (vs *Server) getHeadNoFailedReorg(ctx context.Context, slot primitives.Slot, parentRoot [32]byte) (state.BeaconState, error) {
+func logFailedReorgAttempt(slot primitives.Slot, oldHeadRoot, headRoot [32]byte) {
+	blockchain.LateBlockAttemptedReorgCount.Inc()
+	log.WithFields(logrus.Fields{
+		"slot":        slot,
+		"oldHeadRoot": fmt.Sprintf("%#x", oldHeadRoot),
+		"headRoot":    fmt.Sprintf("%#x", headRoot),
+	}).Warn("late block attempted reorg failed")
+}
+
+func (vs *Server) getHeadNoReorg(ctx context.Context, slot primitives.Slot, parentRoot [32]byte) (state.BeaconState, error) {
 	// Try to get the state from the NSC
 	head := transition.NextSlotState(parentRoot[:], slot)
 	if head != nil {
@@ -148,11 +151,14 @@ func (vs *Server) getHeadNoFailedReorg(ctx context.Context, slot primitives.Slot
 	return head, nil
 }
 
-func (vs *Server) getParentStateFromReorgData(ctx context.Context, slot primitives.Slot, parentRoot, headRoot [32]byte) (head state.BeaconState, err error) {
+func (vs *Server) getParentStateFromReorgData(ctx context.Context, slot primitives.Slot, oldHeadRoot, parentRoot, headRoot [32]byte) (head state.BeaconState, err error) {
 	if parentRoot != headRoot {
-		head, err = vs.handleFailedReorgAttempt(ctx, slot, parentRoot, headRoot)
+		head, err = vs.handleSuccesfulReorgAttempt(ctx, slot, parentRoot, headRoot)
 	} else {
-		head, err = vs.getHeadNoFailedReorg(ctx, slot, parentRoot)
+		if oldHeadRoot != headRoot {
+			logFailedReorgAttempt(slot, oldHeadRoot, headRoot)
+		}
+		head, err = vs.getHeadNoReorg(ctx, slot, parentRoot)
 	}
 	if err != nil {
 		return nil, err
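An editor's summary of the renamed paths: when parentRoot (the proposer head) differs from headRoot, forkchoice has decided the proposer should build on the late block's parent, i.e. the attempted reorg is going ahead — hence handleSuccesfulReorgAttempt. When they match, no reorg is carried out; a pre-update oldHeadRoot that differs from the final headRoot identifies an abandoned reorg attempt, which is now only logged (logFailedReorgAttempt) before taking the ordinary getHeadNoReorg path.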
@@ -169,10 +175,11 @@ func (vs *Server) getParentStateFromReorgData(ctx context.Context, slot primitiv
 
 func (vs *Server) getParentState(ctx context.Context, slot primitives.Slot) (state.BeaconState, [32]byte, error) {
 	// process attestations and update head in forkchoice
+	oldHeadRoot := vs.ForkchoiceFetcher.CachedHeadRoot()
 	vs.ForkchoiceFetcher.UpdateHead(ctx, vs.TimeFetcher.CurrentSlot())
 	headRoot := vs.ForkchoiceFetcher.CachedHeadRoot()
 	parentRoot := vs.ForkchoiceFetcher.GetProposerHead()
-	head, err := vs.getParentStateFromReorgData(ctx, slot, parentRoot, headRoot)
+	head, err := vs.getParentStateFromReorgData(ctx, slot, oldHeadRoot, parentRoot, headRoot)
 	return head, parentRoot, err
 }
@@ -2874,8 +2874,8 @@ func TestProposer_GetParentHeadState(t *testing.T) {
 		Eth1BlockFetcher: &mockExecution.Chain{},
 		StateGen:         stategen.New(db, doublylinkedtree.New()),
 	}
-	t.Run("failed reorg", func(tt *testing.T) {
-		head, err := proposerServer.getParentStateFromReorgData(ctx, 1, parentRoot, headRoot)
+	t.Run("successful reorg", func(tt *testing.T) {
+		head, err := proposerServer.getParentStateFromReorgData(ctx, 1, parentRoot, parentRoot, headRoot)
 		require.NoError(t, err)
 		st := parentState.Copy()
 		st, err = transition.ProcessSlots(ctx, st, st.Slot()+1)
@@ -2892,7 +2892,7 @@ func TestProposer_GetParentHeadState(t *testing.T) {
 
 	t.Run("no reorg", func(tt *testing.T) {
 		require.NoError(t, transition.UpdateNextSlotCache(ctx, headRoot[:], headState))
-		head, err := proposerServer.getParentStateFromReorgData(ctx, 1, headRoot, headRoot)
+		head, err := proposerServer.getParentStateFromReorgData(ctx, 1, headRoot, headRoot, headRoot)
 		require.NoError(t, err)
 		st := headState.Copy()
 		st, err = transition.ProcessSlots(ctx, st, st.Slot()+1)
@@ -2906,4 +2906,23 @@ func TestProposer_GetParentHeadState(t *testing.T) {
 		require.Equal(t, [32]byte(str), [32]byte(headStr))
 		require.NotEqual(t, [32]byte(str), [32]byte(genesisStr))
 	})
+
+	t.Run("failed reorg", func(tt *testing.T) {
+		hook := logTest.NewGlobal()
+		require.NoError(t, transition.UpdateNextSlotCache(ctx, headRoot[:], headState))
+		head, err := proposerServer.getParentStateFromReorgData(ctx, 1, parentRoot, headRoot, headRoot)
+		require.NoError(t, err)
+		st := headState.Copy()
+		st, err = transition.ProcessSlots(ctx, st, st.Slot()+1)
+		require.NoError(t, err)
+		str, err := st.StateRootAtIndex(0)
+		require.NoError(t, err)
+		headStr, err := head.StateRootAtIndex(0)
+		require.NoError(t, err)
+		genesisStr, err := parentState.StateRootAtIndex(0)
+		require.NoError(t, err)
+		require.Equal(t, [32]byte(str), [32]byte(headStr))
+		require.NotEqual(t, [32]byte(str), [32]byte(genesisStr))
+		require.LogsContain(t, hook, "late block attempted reorg failed")
+	})
 }
@@ -19,7 +19,7 @@ func TestState_UnrealizedCheckpointBalances(t *testing.T) {
 		balances[i] = params.BeaconConfig().MaxEffectiveBalance
 	}
 	base := &ethpb.BeaconStateAltair{
-		Slot:        2,
+		Slot:        66,
 		RandaoMixes: make([][]byte, params.BeaconConfig().EpochsPerHistoricalVector),
 		Validators:  validators,
@@ -35,8 +35,8 @@ func TestState_UnrealizedCheckpointBalances(t *testing.T) {
 	active, previous, current, err := state.UnrealizedCheckpointBalances()
 	require.NoError(t, err)
 	require.Equal(t, allActive, active)
-	require.Equal(t, uint64(0), current)
-	require.Equal(t, uint64(0), previous)
+	require.Equal(t, params.BeaconConfig().EffectiveBalanceIncrement, current)
+	require.Equal(t, params.BeaconConfig().EffectiveBalanceIncrement, previous)
 
 	// Add some votes in the last two epochs:
 	base.CurrentEpochParticipation[0] = 0xFF
@@ -57,8 +57,8 @@ func TestState_UnrealizedCheckpointBalances(t *testing.T) {
 	require.NoError(t, err)
 	active, previous, current, err = state.UnrealizedCheckpointBalances()
 	require.NoError(t, err)
-	require.Equal(t, allActive-params.BeaconConfig().MaxEffectiveBalance, active)
-	require.Equal(t, uint64(0), current)
+	require.Equal(t, allActive, active)
+	require.Equal(t, params.BeaconConfig().EffectiveBalanceIncrement, current)
 	require.Equal(t, params.BeaconConfig().MaxEffectiveBalance, previous)
 }
@@ -24,25 +24,44 @@ func UnrealizedCheckpointBalances(cp, pp []byte, validators []*ethpb.Validator,
 	var err error
 	for i, v := range validators {
-		active := v.ActivationEpoch <= currentEpoch && currentEpoch < v.ExitEpoch
-		if active && !v.Slashed {
+		activeCurrent := v.ActivationEpoch <= currentEpoch && currentEpoch < v.ExitEpoch
+		if activeCurrent {
 			activeBalance, err = math.Add64(activeBalance, v.EffectiveBalance)
 			if err != nil {
 				return 0, 0, 0, err
 			}
-			if ((cp[i] >> targetIdx) & 1) == 1 {
-				currentTarget, err = math.Add64(currentTarget, v.EffectiveBalance)
-				if err != nil {
-					return 0, 0, 0, err
-				}
-			}
-			if ((pp[i] >> targetIdx) & 1) == 1 {
-				prevTarget, err = math.Add64(prevTarget, v.EffectiveBalance)
-				if err != nil {
-					return 0, 0, 0, err
-				}
-			}
 		}
+		if v.Slashed {
+			continue
+		}
+		if activeCurrent && ((cp[i]>>targetIdx)&1) == 1 {
+			currentTarget, err = math.Add64(currentTarget, v.EffectiveBalance)
+			if err != nil {
+				return 0, 0, 0, err
+			}
+		}
+		activePrevious := v.ActivationEpoch < currentEpoch && currentEpoch <= v.ExitEpoch
+		if activePrevious && ((pp[i]>>targetIdx)&1) == 1 {
+			prevTarget, err = math.Add64(prevTarget, v.EffectiveBalance)
+			if err != nil {
+				return 0, 0, 0, err
+			}
+		}
 	}
+	activeBalance, prevTarget, currentTarget = ensureLowerBound(activeBalance, prevTarget, currentTarget)
 	return activeBalance, prevTarget, currentTarget, nil
 }
+
+func ensureLowerBound(activeCurrEpoch, prevTargetAttested, currTargetAttested uint64) (uint64, uint64, uint64) {
+	ebi := params.BeaconConfig().EffectiveBalanceIncrement
+	if ebi > activeCurrEpoch {
+		activeCurrEpoch = ebi
+	}
+	if ebi > prevTargetAttested {
+		prevTargetAttested = ebi
+	}
+	if ebi > currTargetAttested {
+		currTargetAttested = ebi
+	}
+	return activeCurrEpoch, prevTargetAttested, currTargetAttested
+}
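Two details worth spelling out (an editor's reading, not text from the PR). First, activity is now evaluated per epoch: for integer epochs, "active in epoch currentEpoch−1" — ActivationEpoch <= currentEpoch−1 && currentEpoch−1 < ExitEpoch — is algebraically equivalent to the ActivationEpoch < currentEpoch && currentEpoch <= ExitEpoch condition used for activePrevious. Second, ensureLowerBound clamps all three sums to at least EffectiveBalanceIncrement, so the degenerate all-zero case never reaches downstream justification arithmetic; the test expectations in the surrounding hunks change from uint64(0) to exactly this increment for the same reason.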
@@ -28,8 +28,8 @@ func TestState_UnrealizedCheckpointBalances(t *testing.T) {
 		active, previous, current, err := UnrealizedCheckpointBalances(cp, pp, validators, 0)
 		require.NoError(tt, err)
 		require.Equal(tt, expectedActive, active)
-		require.Equal(tt, uint64(0), current)
-		require.Equal(tt, uint64(0), previous)
+		require.Equal(tt, params.BeaconConfig().EffectiveBalanceIncrement, current)
+		require.Equal(tt, params.BeaconConfig().EffectiveBalanceIncrement, previous)
 	})
 
 	t.Run("bad votes in last two epochs", func(tt *testing.T) {
@@ -38,8 +38,8 @@ func TestState_UnrealizedCheckpointBalances(t *testing.T) {
 		active, previous, current, err := UnrealizedCheckpointBalances(cp, pp, validators, 1)
 		require.NoError(tt, err)
 		require.Equal(tt, expectedActive, active)
-		require.Equal(tt, uint64(0), current)
-		require.Equal(tt, uint64(0), previous)
+		require.Equal(tt, params.BeaconConfig().EffectiveBalanceIncrement, current)
+		require.Equal(tt, params.BeaconConfig().EffectiveBalanceIncrement, previous)
 	})
 
 	t.Run("two votes in last epoch", func(tt *testing.T) {
@@ -49,7 +49,7 @@ func TestState_UnrealizedCheckpointBalances(t *testing.T) {
 		require.NoError(tt, err)
 		require.Equal(tt, expectedActive, active)
 		require.Equal(tt, 2*params.BeaconConfig().MaxEffectiveBalance, current)
-		require.Equal(tt, uint64(0), previous)
+		require.Equal(tt, params.BeaconConfig().EffectiveBalanceIncrement, previous)
 	})
 
 	t.Run("two votes in previous epoch", func(tt *testing.T) {
@@ -58,7 +58,7 @@ func TestState_UnrealizedCheckpointBalances(t *testing.T) {
 		active, previous, current, err := UnrealizedCheckpointBalances(cp, pp, validators, 1)
 		require.NoError(tt, err)
 		require.Equal(tt, expectedActive, active)
-		require.Equal(tt, uint64(0), current)
+		require.Equal(tt, params.BeaconConfig().EffectiveBalanceIncrement, current)
 		require.Equal(tt, 2*params.BeaconConfig().MaxEffectiveBalance, previous)
 	})
@@ -78,7 +78,6 @@ func TestState_UnrealizedCheckpointBalances(t *testing.T) {
 		validators[1].Slashed = true
 		active, previous, current, err := UnrealizedCheckpointBalances(cp, pp, validators, 1)
 		require.NoError(tt, err)
-		expectedActive -= params.BeaconConfig().MaxEffectiveBalance
 		require.Equal(tt, expectedActive, active)
 		require.Equal(tt, params.BeaconConfig().MaxEffectiveBalance-params.BeaconConfig().MinDepositAmount, current)
 		require.Equal(tt, 2*params.BeaconConfig().MaxEffectiveBalance, previous)
@@ -45,6 +45,7 @@ go_library(
         "//math:go_default_library",
         "//proto/prysm/v1alpha1:go_default_library",
         "//runtime:go_default_library",
+        "//runtime/version:go_default_library",
        "//time:go_default_library",
         "//time/slots:go_default_library",
         "@com_github_libp2p_go_libp2p//core/peer:go_default_library",
@@ -76,6 +77,7 @@ go_test(
         "//beacon-chain/das:go_default_library",
         "//beacon-chain/db:go_default_library",
         "//beacon-chain/db/filesystem:go_default_library",
+        "//beacon-chain/db/kv:go_default_library",
        "//beacon-chain/db/testing:go_default_library",
         "//beacon-chain/p2p:go_default_library",
         "//beacon-chain/p2p/peers:go_default_library",
@@ -126,8 +126,9 @@ type fetchRequestResponse struct {
 
 // newBlocksFetcher creates ready to use fetcher.
 func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetcher {
-	blocksPerPeriod := flags.Get().BlockBatchLimit
-	allowedBlocksBurst := flags.Get().BlockBatchLimitBurstFactor * flags.Get().BlockBatchLimit
+	blockBatchLimit := maxBatchLimit()
+	blocksPerPeriod := blockBatchLimit
+	allowedBlocksBurst := flags.Get().BlockBatchLimitBurstFactor * blockBatchLimit
 	// Allow fetcher to go almost to the full burst capacity (less a single batch).
 	rateLimiter := leakybucket.NewCollector(
 		float64(blocksPerPeriod), int64(allowedBlocksBurst-blocksPerPeriod),
@@ -159,6 +160,27 @@ func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetc
 	}
 }
 
+// This specifies the block batch limit the initial sync fetcher will use. In the event the user has provided
+// an excessive number, this is automatically lowered.
+func maxBatchLimit() int {
+	currLimit := flags.Get().BlockBatchLimit
+	maxLimit := params.BeaconConfig().MaxRequestBlocks
+	if params.DenebEnabled() {
+		maxLimit = params.BeaconConfig().MaxRequestBlocksDeneb
+	}
+	castedMaxLimit, err := math.Int(maxLimit)
+	if err != nil {
+		// Should be impossible to hit this case.
+		log.WithError(err).Error("Unable to calculate the max batch limit")
+		return currLimit
+	}
+	if currLimit > castedMaxLimit {
+		log.Warnf("Specified batch size exceeds the block limit of the network, lowering from %d to %d", currLimit, maxLimit)
+		currLimit = castedMaxLimit
+	}
+	return currLimit
+}
+
 // start boots up the fetcher, which starts listening for incoming fetch requests.
 func (f *blocksFetcher) start() error {
 	select {
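For a sense of the numbers (values from the consensus p2p spec, not stated in this diff): MAX_REQUEST_BLOCKS is 1024 pre-Deneb while MAX_REQUEST_BLOCKS_DENEB is 128, so the 640 batch limit used by TestBatchLimit below passes through unchanged while Deneb is disabled and is lowered to the Deneb cap once the fork epoch is set.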
@@ -3,6 +3,7 @@ package initialsync
 
 import (
 	"context"
 	"fmt"
+	"math"
 	"math/rand"
 	"sort"
 	"sync"
@@ -1142,3 +1143,26 @@ func TestVerifyAndPopulateBlobs(t *testing.T) {
 	// We delete each entry we've seen, so if we see all expected commits, the map should be empty at the end.
 	require.Equal(t, 0, len(expectedCommits))
 }
+
+func TestBatchLimit(t *testing.T) {
+	params.SetupTestConfigCleanup(t)
+	testCfg := params.BeaconConfig().Copy()
+	testCfg.DenebForkEpoch = math.MaxUint64
+	params.OverrideBeaconConfig(testCfg)
+
+	resetFlags := flags.Get()
+	flags.Init(&flags.GlobalFlags{
+		BlockBatchLimit:            640,
+		BlockBatchLimitBurstFactor: 10,
+	})
+	defer func() {
+		flags.Init(resetFlags)
+	}()
+
+	assert.Equal(t, 640, maxBatchLimit())
+
+	testCfg.DenebForkEpoch = 100000
+	params.OverrideBeaconConfig(testCfg)
+
+	assert.Equal(t, params.BeaconConfig().MaxRequestBlocksDeneb, uint64(maxBatchLimit()))
+}
@@ -5,22 +5,31 @@ package initialsync
 
 import (
 	"context"
 	"fmt"
 	"time"
 
 	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/paulbellamy/ratecounter"
 	"github.com/pkg/errors"
 	"github.com/prysmaticlabs/prysm/v5/async/abool"
 	"github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain"
 	blockfeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/block"
 	statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state"
 	"github.com/prysmaticlabs/prysm/v5/beacon-chain/das"
 	"github.com/prysmaticlabs/prysm/v5/beacon-chain/db"
 	"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
 	"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
 	p2ptypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
 	"github.com/prysmaticlabs/prysm/v5/beacon-chain/startup"
 	"github.com/prysmaticlabs/prysm/v5/beacon-chain/sync"
 	"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
 	"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
 	"github.com/prysmaticlabs/prysm/v5/config/params"
 	"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
 	"github.com/prysmaticlabs/prysm/v5/crypto/rand"
 	eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
 	"github.com/prysmaticlabs/prysm/v5/runtime"
 	"github.com/prysmaticlabs/prysm/v5/runtime/version"
 	prysmTime "github.com/prysmaticlabs/prysm/v5/time"
 	"github.com/prysmaticlabs/prysm/v5/time/slots"
 	"github.com/sirupsen/logrus"
@@ -58,6 +67,7 @@ type Service struct {
 	clock           *startup.Clock
 	verifierWaiter  *verification.InitializerWaiter
 	newBlobVerifier verification.NewBlobVerifier
+	ctxMap          sync.ContextByteVersions
 }
 
 // Option is a functional option for the initial-sync Service.
@@ -124,6 +134,13 @@ func (s *Service) Start() {
 	}
 	s.clock = clock
 	log.Info("Received state initialized event")
+	ctxMap, err := sync.ContextByteVersionsForValRoot(clock.GenesisValidatorsRoot())
+	if err != nil {
+		log.WithField("genesisValidatorRoot", clock.GenesisValidatorsRoot()).
+			WithError(err).Error("unable to initialize context version map using genesis validator")
+		return
+	}
+	s.ctxMap = ctxMap
 
 	v, err := s.verifierWaiter.WaitForInitializer(s.ctx)
 	if err != nil {
@@ -162,7 +179,15 @@ func (s *Service) Start() {
 		s.markSynced()
 		return
 	}
-	s.waitForMinimumPeers()
+	peers, err := s.waitForMinimumPeers()
+	if err != nil {
+		log.WithError(err).Error("Error waiting for minimum number of peers")
+		return
+	}
+	if err := s.fetchOriginBlobs(peers); err != nil {
+		log.WithError(err).Error("Failed to fetch missing blobs for checkpoint origin")
+		return
+	}
 	if err := s.roundRobinSync(gt); err != nil {
 		if errors.Is(s.ctx.Err(), context.Canceled) {
 			return
@@ -215,7 +240,10 @@ func (s *Service) Resync() error {
 	defer func() { s.synced.Set() }() // Reset it at the end of the method.
 	genesis := time.Unix(int64(headState.GenesisTime()), 0) // lint:ignore uintcast -- Genesis time will not exceed int64 in your lifetime.
 
-	s.waitForMinimumPeers()
+	_, err = s.waitForMinimumPeers()
+	if err != nil {
+		return err
+	}
 	if err = s.roundRobinSync(genesis); err != nil {
 		log = log.WithError(err)
 	}
@@ -223,16 +251,19 @@ func (s *Service) Resync() error {
 	return nil
 }
 
-func (s *Service) waitForMinimumPeers() {
+func (s *Service) waitForMinimumPeers() ([]peer.ID, error) {
 	required := params.BeaconConfig().MaxPeersToSync
 	if flags.Get().MinimumSyncPeers < required {
 		required = flags.Get().MinimumSyncPeers
 	}
 	for {
+		if s.ctx.Err() != nil {
+			return nil, s.ctx.Err()
+		}
 		cp := s.cfg.Chain.FinalizedCheckpt()
 		_, peers := s.cfg.P2P.Peers().BestNonFinalized(flags.Get().MinimumSyncPeers, cp.Epoch)
 		if len(peers) >= required {
-			break
+			return peers, nil
 		}
 		log.WithFields(logrus.Fields{
 			"suitable": len(peers),
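Returning the suitable peer set (rather than just blocking) lets Start hand the same peers straight to fetchOriginBlobs below instead of re-querying the peer store, and the new context check turns an otherwise unbounded wait into one that aborts cleanly on shutdown.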
@@ -247,3 +278,87 @@ func (s *Service) markSynced() {
 	s.synced.Set()
 	close(s.cfg.InitialSyncComplete)
 }
+
+func missingBlobRequest(blk blocks.ROBlock, store *filesystem.BlobStorage) (p2ptypes.BlobSidecarsByRootReq, error) {
+	r := blk.Root()
+	if blk.Version() < version.Deneb {
+		return nil, nil
+	}
+	cmts, err := blk.Block().Body().BlobKzgCommitments()
+	if err != nil {
+		log.WithField("root", r).Error("Error reading commitments from checkpoint sync origin block")
+		return nil, err
+	}
+	if len(cmts) == 0 {
+		return nil, nil
+	}
+	onDisk, err := store.Indices(r)
+	if err != nil {
+		return nil, errors.Wrapf(err, "error checking existing blobs for checkpoint sync block root %#x", r)
+	}
+	req := make(p2ptypes.BlobSidecarsByRootReq, 0, len(cmts))
+	for i := range cmts {
+		if onDisk[i] {
+			continue
+		}
+		req = append(req, &eth.BlobIdentifier{BlockRoot: r[:], Index: uint64(i)})
+	}
+	return req, nil
+}
+
+func (s *Service) fetchOriginBlobs(pids []peer.ID) error {
+	r, err := s.cfg.DB.OriginCheckpointBlockRoot(s.ctx)
+	if errors.Is(err, db.ErrNotFoundOriginBlockRoot) {
+		return nil
+	}
+	blk, err := s.cfg.DB.Block(s.ctx, r)
+	if err != nil {
+		log.WithField("root", fmt.Sprintf("%#x", r)).Error("Block for checkpoint sync origin root not found in db")
+		return err
+	}
+	if !params.WithinDAPeriod(slots.ToEpoch(blk.Block().Slot()), slots.ToEpoch(s.clock.CurrentSlot())) {
+		return nil
+	}
+	rob, err := blocks.NewROBlockWithRoot(blk, r)
+	if err != nil {
+		return err
+	}
+	req, err := missingBlobRequest(rob, s.cfg.BlobStorage)
+	if err != nil {
+		return err
+	}
+	if len(req) == 0 {
+		log.WithField("root", fmt.Sprintf("%#x", r)).Debug("All blobs for checkpoint block are present")
+		return nil
+	}
+	shufflePeers(pids)
+	for i := range pids {
+		sidecars, err := sync.SendBlobSidecarByRoot(s.ctx, s.clock, s.cfg.P2P, pids[i], s.ctxMap, &req)
+		if err != nil {
+			continue
+		}
+		if len(sidecars) != len(req) {
+			continue
+		}
+		bv := newBlobBatchVerifier(s.newBlobVerifier)
+		avs := das.NewLazilyPersistentStore(s.cfg.BlobStorage, bv)
+		current := s.clock.CurrentSlot()
+		if err := avs.Persist(current, sidecars...); err != nil {
+			return err
+		}
+		if err := avs.IsDataAvailable(s.ctx, current, rob); err != nil {
+			log.WithField("root", fmt.Sprintf("%#x", r)).WithField("peerID", pids[i]).Warn("Blobs from peer for origin block were unusable")
+			continue
+		}
+		log.WithField("nBlobs", len(sidecars)).WithField("root", fmt.Sprintf("%#x", r)).Info("Successfully downloaded blobs for checkpoint sync block")
+		return nil
+	}
+	return fmt.Errorf("no connected peer able to provide blobs for checkpoint sync block %#x", r)
+}
+
+func shufflePeers(pids []peer.ID) {
+	rg := rand.NewGenerator()
+	rg.Shuffle(len(pids), func(i, j int) {
+		pids[i], pids[j] = pids[j], pids[i]
+	})
+}
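Concretely: for an origin block with two commitments where only index 1 is already on disk, missingBlobRequest yields a single-element request for index 0; fetchOriginBlobs then tries shuffled peers until one returns the full set and the sidecars pass data-availability verification. The table-driven TestMissingBlobRequest below enumerates exactly these cases.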
@@ -6,16 +6,19 @@ import (
 	"testing"
 	"time"
 
+	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/paulbellamy/ratecounter"
 	"github.com/prysmaticlabs/prysm/v5/async/abool"
 	mock "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/testing"
+	"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
+	"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/kv"
 	dbtest "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/testing"
 	p2pt "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/testing"
 	"github.com/prysmaticlabs/prysm/v5/beacon-chain/startup"
 	"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
 	"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
 	"github.com/prysmaticlabs/prysm/v5/config/params"
 	"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
 	"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
 	eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
 	"github.com/prysmaticlabs/prysm/v5/testing/assert"
@@ -420,3 +423,91 @@ func TestService_Synced(t *testing.T) {
 	s.synced.Set()
 	assert.Equal(t, true, s.Synced())
 }
+
+func TestMissingBlobRequest(t *testing.T) {
+	cases := []struct {
+		name  string
+		setup func(t *testing.T) (blocks.ROBlock, *filesystem.BlobStorage)
+		nReq  int
+		err   error
+	}{
+		{
+			name: "pre-deneb",
+			setup: func(t *testing.T) (blocks.ROBlock, *filesystem.BlobStorage) {
+				cb, err := blocks.NewSignedBeaconBlock(util.NewBeaconBlockCapella())
+				require.NoError(t, err)
+				rob, err := blocks.NewROBlockWithRoot(cb, [32]byte{})
+				require.NoError(t, err)
+				return rob, nil
+			},
+			nReq: 0,
+		},
+		{
+			name: "deneb zero commitments",
+			setup: func(t *testing.T) (blocks.ROBlock, *filesystem.BlobStorage) {
+				bk, _ := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 0, 0)
+				return bk, nil
+			},
+			nReq: 0,
+		},
+		{
+			name: "2 commitments, all missing",
+			setup: func(t *testing.T) (blocks.ROBlock, *filesystem.BlobStorage) {
+				bk, _ := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 0, 2)
+				fs := filesystem.NewEphemeralBlobStorage(t)
+				return bk, fs
+			},
+			nReq: 2,
+		},
+		{
+			name: "2 commitments, 1 missing",
+			setup: func(t *testing.T) (blocks.ROBlock, *filesystem.BlobStorage) {
+				bk, _ := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 0, 2)
+				bm, fs := filesystem.NewEphemeralBlobStorageWithMocker(t)
+				require.NoError(t, bm.CreateFakeIndices(bk.Root(), 1))
+				return bk, fs
+			},
+			nReq: 1,
+		},
+		{
+			name: "2 commitments, 0 missing",
+			setup: func(t *testing.T) (blocks.ROBlock, *filesystem.BlobStorage) {
+				bk, _ := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 0, 2)
+				bm, fs := filesystem.NewEphemeralBlobStorageWithMocker(t)
+				require.NoError(t, bm.CreateFakeIndices(bk.Root(), 0, 1))
+				return bk, fs
+			},
+			nReq: 0,
+		},
+	}
+	for _, c := range cases {
+		t.Run(c.name, func(t *testing.T) {
+			blk, store := c.setup(t)
+			req, err := missingBlobRequest(blk, store)
+			require.NoError(t, err)
+			require.Equal(t, c.nReq, len(req))
+		})
+	}
+}
+
+func TestOriginOutsideRetention(t *testing.T) {
+	ctx := context.Background()
+	bdb := dbtest.SetupDB(t)
+	genesis := time.Unix(0, 0)
+	secsPerEpoch := params.BeaconConfig().SecondsPerSlot * uint64(params.BeaconConfig().SlotsPerEpoch)
+	retentionSeconds := time.Second * time.Duration(uint64(params.BeaconConfig().MinEpochsForBlobsSidecarsRequest+1)*secsPerEpoch)
+	outsideRetention := genesis.Add(retentionSeconds)
+	now := func() time.Time {
+		return outsideRetention
+	}
+	clock := startup.NewClock(genesis, [32]byte{}, startup.WithNower(now))
+	s := &Service{ctx: ctx, cfg: &Config{DB: bdb}, clock: clock}
+	blk, _ := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 0, 1)
+	require.NoError(t, bdb.SaveBlock(ctx, blk))
+	concreteDB, ok := bdb.(*kv.Store)
+	require.Equal(t, true, ok)
+	require.NoError(t, concreteDB.SaveOriginCheckpointBlockRoot(ctx, blk.Root()))
+	// This would break due to missing service dependencies, but will return nil fast due to being outside retention.
+	require.Equal(t, false, params.WithinDAPeriod(slots.ToEpoch(blk.Block().Slot()), slots.ToEpoch(clock.CurrentSlot())))
+	require.NoError(t, s.fetchOriginBlobs([]peer.ID{}))
+}
@@ -35,11 +35,10 @@ func BeaconNodeOptions(c *cli.Context) ([]node.Option, error) {
 	if err != nil {
 		return nil, err
 	}
-	bs, err := filesystem.NewBlobStorage(blobStoragePath(c), filesystem.WithBlobRetentionEpochs(e))
-	if err != nil {
-		return nil, err
-	}
-	return []node.Option{node.WithBlobStorage(bs), node.WithBlobRetentionEpochs(e)}, nil
+	opts := []node.Option{node.WithBlobStorageOptions(
+		filesystem.WithBlobRetentionEpochs(e), filesystem.WithBasePath(blobStoragePath(c)),
+	)}
+	return opts, nil
 }
 
 func blobStoragePath(c *cli.Context) string {
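The CLI layer thus no longer constructs BlobStorage itself; it only contributes options, and construction happens once inside node.New (see the "Allow tests to set it as an opt." hunk above). This is what lets tests inject a prebuilt storage via WithBlobStorage while production wiring flows through WithBlobStorageOptions.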
@@ -69,7 +69,8 @@ type Flags struct {
 	EnableEIP4881 bool // EnableEIP4881 specifies whether to use the deposit tree from EIP4881
 
 	PrepareAllPayloads bool // PrepareAllPayloads informs the engine to prepare a block on every slot.
-
+	// BlobSaveFsync requires blob saving to block on fsync to ensure blobs are durably persisted before passing DA.
+	BlobSaveFsync bool
 	// KeystoreImportDebounceInterval specifies the time duration the validator waits to reload new keys if they have
 	// changed on disk. This feature is for advanced use cases only.
 	KeystoreImportDebounceInterval time.Duration
@@ -245,6 +246,11 @@ func ConfigureBeaconChain(ctx *cli.Context) error {
 		logEnabled(EnableLightClient)
 		cfg.EnableLightClient = true
 	}
+	if ctx.IsSet(BlobSaveFsync.Name) {
+		logEnabled(BlobSaveFsync)
+		cfg.BlobSaveFsync = true
+	}
+
 	cfg.AggregateIntervals = [3]time.Duration{aggregateFirstInterval.Value, aggregateSecondInterval.Value, aggregateThirdInterval.Value}
 	Init(cfg)
 	return nil
@@ -149,12 +149,16 @@ var (
 		Name:  "disable-resource-manager",
 		Usage: "Disables running the libp2p resource manager.",
 	}
-
 	// DisableRegistrationCache a flag for disabling the validator registration cache and use db instead.
 	DisableRegistrationCache = &cli.BoolFlag{
 		Name:  "disable-registration-cache",
 		Usage: "Temporary flag for disabling the validator registration cache instead of using the DB. Note: registrations do not clear on restart while using the DB.",
 	}
+	// BlobSaveFsync enforces durable filesystem writes for use cases where blob availability is critical.
+	BlobSaveFsync = &cli.BoolFlag{
+		Name:  "blob-save-fsync",
+		Usage: "Forces new blob files to be fsync'd before continuing, ensuring durable blob writes.",
+	}
 )
 
 // devModeFlags holds list of flags that are set when development mode is on.
@@ -209,6 +213,7 @@ var BeaconChainFlags = append(deprecatedBeaconFlags, append(deprecatedFlags, []c
 	disableResourceManager,
 	DisableRegistrationCache,
 	EnableLightClient,
+	BlobSaveFsync,
 }...)...)
 
 // E2EBeaconChainFlags contains a list of the beacon chain feature flags to be tested in E2E.
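End to end, the new knob flows through the hunks above: the --blob-save-fsync CLI flag sets the BlobSaveFsync feature flag, node.New translates it into filesystem.WithSaveFsync, and BlobStorage.Save then calls partialFile.Sync() before the atomic rename of the part file.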
@@ -35,6 +35,11 @@ func processJustificationAndFinalizationPrecomputeWrapper(t *testing.T, st state
 	require.NoError(t, err)
 	_, bp, err = altair.ProcessEpochParticipation(ctx, st, bp, vp)
 	require.NoError(t, err)
+	activeBal, targetPrevious, targetCurrent, err := st.UnrealizedCheckpointBalances()
+	require.NoError(t, err)
+	require.Equal(t, bp.ActiveCurrentEpoch, activeBal)
+	require.Equal(t, bp.CurrentEpochTargetAttested, targetCurrent)
+	require.Equal(t, bp.PrevEpochTargetAttested, targetPrevious)
 
 	st, err = precompute.ProcessJustificationAndFinalizationPreCompute(st, bp)
 	require.NoError(t, err, "Could not process justification")
@@ -76,6 +76,13 @@ func runPrecomputeRewardsAndPenaltiesTest(t *testing.T, testFolderPath string) {
 	require.NoError(t, err)
 	vp, bp, err = altair.ProcessEpochParticipation(ctx, preBeaconState, bp, vp)
 	require.NoError(t, err)
+
+	activeBal, targetPrevious, targetCurrent, err := preBeaconState.UnrealizedCheckpointBalances()
+	require.NoError(t, err)
+	require.Equal(t, bp.ActiveCurrentEpoch, activeBal)
+	require.Equal(t, bp.CurrentEpochTargetAttested, targetCurrent)
+	require.Equal(t, bp.PrevEpochTargetAttested, targetPrevious)
+
 	deltas, err := altair.AttestationsDelta(preBeaconState, bp, vp)
 	require.NoError(t, err)
@@ -35,6 +35,11 @@ func processJustificationAndFinalizationPrecomputeWrapper(t *testing.T, st state
 	require.NoError(t, err)
 	_, bp, err = altair.ProcessEpochParticipation(ctx, st, bp, vp)
 	require.NoError(t, err)
+	activeBal, targetPrevious, targetCurrent, err := st.UnrealizedCheckpointBalances()
+	require.NoError(t, err)
+	require.Equal(t, bp.ActiveCurrentEpoch, activeBal)
+	require.Equal(t, bp.CurrentEpochTargetAttested, targetCurrent)
+	require.Equal(t, bp.PrevEpochTargetAttested, targetPrevious)
 
 	st, err = precompute.ProcessJustificationAndFinalizationPreCompute(st, bp)
 	require.NoError(t, err, "Could not process justification")
@@ -80,6 +80,13 @@ func runPrecomputeRewardsAndPenaltiesTest(t *testing.T, testFolderPath string) {
 	require.NoError(t, err)
 	vp, bp, err = altair.ProcessEpochParticipation(ctx, preBeaconState, bp, vp)
 	require.NoError(t, err)
+
+	activeBal, targetPrevious, targetCurrent, err := preBeaconState.UnrealizedCheckpointBalances()
+	require.NoError(t, err)
+	require.Equal(t, bp.ActiveCurrentEpoch, activeBal)
+	require.Equal(t, bp.CurrentEpochTargetAttested, targetCurrent)
+	require.Equal(t, bp.PrevEpochTargetAttested, targetPrevious)
+
 	deltas, err := altair.AttestationsDelta(preBeaconState, bp, vp)
 	require.NoError(t, err)
@@ -35,6 +35,11 @@ func processJustificationAndFinalizationPrecomputeWrapper(t *testing.T, st state
 	require.NoError(t, err)
 	_, bp, err = altair.ProcessEpochParticipation(ctx, st, bp, vp)
 	require.NoError(t, err)
+	activeBal, targetPrevious, targetCurrent, err := st.UnrealizedCheckpointBalances()
+	require.NoError(t, err)
+	require.Equal(t, bp.ActiveCurrentEpoch, activeBal)
+	require.Equal(t, bp.CurrentEpochTargetAttested, targetCurrent)
+	require.Equal(t, bp.PrevEpochTargetAttested, targetPrevious)
 
 	st, err = precompute.ProcessJustificationAndFinalizationPreCompute(st, bp)
 	require.NoError(t, err, "Could not process justification")
@@ -80,6 +80,13 @@ func runPrecomputeRewardsAndPenaltiesTest(t *testing.T, testFolderPath string) {
 	require.NoError(t, err)
 	vp, bp, err = altair.ProcessEpochParticipation(ctx, preBeaconState, bp, vp)
 	require.NoError(t, err)
+
+	activeBal, targetPrevious, targetCurrent, err := preBeaconState.UnrealizedCheckpointBalances()
+	require.NoError(t, err)
+	require.Equal(t, bp.ActiveCurrentEpoch, activeBal)
+	require.Equal(t, bp.CurrentEpochTargetAttested, targetCurrent)
+	require.Equal(t, bp.PrevEpochTargetAttested, targetPrevious)
+
 	deltas, err := altair.AttestationsDelta(preBeaconState, bp, vp)
 	require.NoError(t, err)
@@ -32,7 +32,11 @@ func processJustificationAndFinalizationPrecomputeWrapper(t *testing.T, st state
 	require.NoError(t, err)
 	_, bp, err = altair.ProcessEpochParticipation(ctx, st, bp, vp)
 	require.NoError(t, err)
+
+	activeBal, targetPrevious, targetCurrent, err := st.UnrealizedCheckpointBalances()
+	require.NoError(t, err)
+	require.Equal(t, bp.ActiveCurrentEpoch, activeBal)
+	require.Equal(t, bp.CurrentEpochTargetAttested, targetCurrent)
+	require.Equal(t, bp.PrevEpochTargetAttested, targetPrevious)
 	st, err = precompute.ProcessJustificationAndFinalizationPreCompute(st, bp)
 	require.NoError(t, err, "Could not process justification")
@@ -70,8 +70,16 @@ func runPrecomputeRewardsAndPenaltiesTest(t *testing.T, testFolderPath string) {
 
 	vp, bp, err := altair.InitializePrecomputeValidators(ctx, preBeaconState)
 	require.NoError(t, err)
+
 	vp, bp, err = altair.ProcessEpochParticipation(ctx, preBeaconState, bp, vp)
 	require.NoError(t, err)
+
+	activeBal, targetPrevious, targetCurrent, err := preBeaconState.UnrealizedCheckpointBalances()
+	require.NoError(t, err)
+	require.Equal(t, bp.ActiveCurrentEpoch, activeBal)
+	require.Equal(t, bp.CurrentEpochTargetAttested, targetCurrent)
+	require.Equal(t, bp.PrevEpochTargetAttested, targetPrevious)
+
 	deltas, err := altair.AttestationsDelta(preBeaconState, bp, vp)
 	require.NoError(t, err)