Compare commits

...

14 Commits

Author SHA1 Message Date
Preston Van Loon
a1a81d1720 Update bazel-lib to include https://github.com/aspect-build/bazel-lib/pull/768 (#13675)
(cherry picked from commit 87b127365f)
2024-03-08 09:13:01 -06:00
Nishant Das
3d1d5863f5 Check Unrealized Justification Balances In Spectests (#13710)
* add them

* Ensure activation epoch does not overflow

* add them all in

* better check for overflow

* fix tests

* fix tests

---------

Co-authored-by: Potuz <potuz@prysmaticlabs.com>
(cherry picked from commit 2616de1eb1)
2024-03-08 09:11:00 -06:00
Potuz
89e21733e9 Fix UJ (#13688)
* Fix UJ

* gate slashed

* don't filter slashed for active balance

* don't overflow

* fix tests

* fix tests

(cherry picked from commit b0a2115a26)
2024-03-06 15:14:45 -06:00
Potuz
6ec456d836 pass justified=finalized in Prater (#13695)
* pass justified=finalized in Prater

* fix gazelle mess

(cherry picked from commit 102518e106)
2024-03-06 13:34:12 -06:00
Potuz
04a5b2e3c2 Do not check parent weight on early FCU (#13683)
When a late block arrives and the beacon is proposing the next block, we
perform several checks to allow for the next block to reorg the incoming
late block.

Among those checks, we check that the parent block has been heavily
attested (currently 160% of the committee size).

We perform this check in these circumstances:
- When the late block arrives
- At 10 seconds into the slot
- At 0 seconds into the next slot (at proposing time)

The problem is that for blocks that arrive between 4 seconds and 10
seconds, the parent block will not have yet this expected weight since
attestations from the current committee were not imported yet, and thus
Prysm will send an FCU with payload attributes anyway at this time.

What happens is that Prysm keeps the EL building different blocks based
on different parents at the same time, when later in the next slot it
calls to propose, it will reorg the late block anyway and the EL would
have been computing a second payload uselessly.

This PR enables this check only when calling `ShouldOverrideFCU` after
10 seconds into the slot which we do only after having imported the
current attestations. We may want to actually remove this check entirely
from `ShouldOverrideFCU` and only keep it in `ProposerHead`.

Shout out to Anthithesis for reporting an issue that led to this
discoverly.

(cherry picked from commit b6ce6c2eba)
2024-03-05 09:12:13 -06:00
kasey
147f48c5c5 exit blob fetching for cp block if outside retention (#13686)
* exit blob fetching for cp block if outside retention

* regression test

---------

Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
(cherry picked from commit b3caaa9acc)
2024-03-04 21:15:54 -06:00
kasey
fdc4ea5bc2 tests for origin blob fetching (#13667)
* tests for origin blob fetching

* Update beacon-chain/sync/initial-sync/service.go

Co-authored-by: Preston Van Loon <pvanloon@offchainlabs.com>

---------

Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
Co-authored-by: Preston Van Loon <pvanloon@offchainlabs.com>
(cherry picked from commit 4c3dbae3c0)
2024-03-04 21:15:45 -06:00
Potuz
536b469d28 Fix failed reorg log (#13679)
(cherry picked from commit 2e2ef4a179)
2024-03-01 10:28:25 -06:00
kasey
66f55556a7 download checkpoint sync origin blobs in init-sync (#13665)
Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
(cherry picked from commit 0132c1b17d)
2024-03-01 10:25:26 -06:00
kasey
4083fe0d1d blob save fsync feature flag (#13652)
Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
(cherry picked from commit 70e1b11aeb)
2024-03-01 10:25:09 -06:00
Nishant Das
6cb6e47339 Use Max Request Limit in Initial Sync (#13641)
* use max limit

* manu's review

(cherry picked from commit e6a6365bdd)
2024-03-01 10:24:54 -06:00
Nishant Das
e006ba155a fix race (#13680)
(cherry picked from commit 68b78dd520)
2024-03-01 10:24:33 -06:00
kasey
b7b017f5b6 avoid part path collisions with mem addr entropy (#13648)
* avoid part path collisions with mem addr entropy

* Regression test

---------

Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
Co-authored-by: Preston Van Loon <pvanloon@offchainlabs.com>
(cherry picked from commit 4c66e4d060)
2024-02-21 17:15:09 -06:00
Preston Van Loon
0c897045a2 blob save: add better data checking for empty blob issues (#13647)
(cherry picked from commit daad29d0de)
2024-02-21 16:48:06 -06:00
32 changed files with 562 additions and 94 deletions

View File

@@ -113,6 +113,13 @@ http_archive(
url = "https://github.com/GoogleContainerTools/distroless/archive/9dc924b9fe812eec2fa0061824dcad39eb09d0d6.tar.gz", # 2024-01-24
)
http_archive(
name = "aspect_bazel_lib",
sha256 = "f5ea76682b209cc0bd90d0f5a3b26d2f7a6a2885f0c5f615e72913f4805dbb0d",
strip_prefix = "bazel-lib-2.5.0",
url = "https://github.com/aspect-build/bazel-lib/releases/download/v2.5.0/bazel-lib-v2.5.0.tar.gz",
)
load("@aspect_bazel_lib//lib:repositories.bzl", "aspect_bazel_lib_dependencies", "aspect_bazel_lib_register_toolchains")
aspect_bazel_lib_dependencies()

View File

@@ -307,16 +307,16 @@ func (s *Service) updateEpochBoundaryCaches(ctx context.Context, st state.Beacon
if err := helpers.UpdateProposerIndicesInCache(ctx, st, e); err != nil {
return errors.Wrap(err, "could not update proposer index cache")
}
go func() {
go func(ep primitives.Epoch) {
// Use a custom deadline here, since this method runs asynchronously.
// We ignore the parent method's context and instead create a new one
// with a custom deadline, therefore using the background context instead.
slotCtx, cancel := context.WithTimeout(context.Background(), slotDeadline)
defer cancel()
if err := helpers.UpdateCommitteeCache(slotCtx, st, e+1); err != nil {
if err := helpers.UpdateCommitteeCache(slotCtx, st, ep+1); err != nil {
log.WithError(err).Warn("Could not update committee cache")
}
}()
}(e)
// The latest block header is from the previous epoch
r, err := st.LatestBlockHeader().HashTreeRoot()
if err != nil {

View File

@@ -2114,7 +2114,7 @@ func TestMissingIndices(t *testing.T) {
for _, c := range cases {
bm, bs := filesystem.NewEphemeralBlobStorageWithMocker(t)
t.Run(c.name, func(t *testing.T) {
require.NoError(t, bm.CreateFakeIndices(c.root, c.present))
require.NoError(t, bm.CreateFakeIndices(c.root, c.present...))
missing, err := missingIndices(bs, c.root, c.expected)
if c.err != nil {
require.ErrorIs(t, err, c.err)

View File

@@ -289,10 +289,18 @@ func (s *Service) StartFromSavedState(saved state.BeaconState) error {
fRoot := s.ensureRootNotZeros(bytesutil.ToBytes32(finalized.Root))
s.cfg.ForkChoiceStore.Lock()
defer s.cfg.ForkChoiceStore.Unlock()
if err := s.cfg.ForkChoiceStore.UpdateJustifiedCheckpoint(s.ctx, &forkchoicetypes.Checkpoint{Epoch: justified.Epoch,
Root: bytesutil.ToBytes32(justified.Root)}); err != nil {
return errors.Wrap(err, "could not update forkchoice's justified checkpoint")
if params.BeaconConfig().ConfigName != params.PraterName {
if err := s.cfg.ForkChoiceStore.UpdateJustifiedCheckpoint(s.ctx, &forkchoicetypes.Checkpoint{Epoch: justified.Epoch,
Root: bytesutil.ToBytes32(justified.Root)}); err != nil {
return errors.Wrap(err, "could not update forkchoice's justified checkpoint")
}
} else {
if err := s.cfg.ForkChoiceStore.UpdateJustifiedCheckpoint(s.ctx, &forkchoicetypes.Checkpoint{Epoch: finalized.Epoch,
Root: bytesutil.ToBytes32(finalized.Root)}); err != nil {
return errors.Wrap(err, "could not update forkchoice's justified checkpoint")
}
}
if err := s.cfg.ForkChoiceStore.UpdateFinalizedCheckpoint(&forkchoicetypes.Checkpoint{Epoch: finalized.Epoch,
Root: bytesutil.ToBytes32(finalized.Root)}); err != nil {
return errors.Wrap(err, "could not update forkchoice's finalized checkpoint")

View File

@@ -21,7 +21,10 @@ import (
)
var (
errIndexOutOfBounds = errors.New("blob index in file name >= MaxBlobsPerBlock")
errIndexOutOfBounds = errors.New("blob index in file name >= MaxBlobsPerBlock")
errEmptyBlobWritten = errors.New("zero bytes written to disk when saving blob sidecar")
errSidecarEmptySSZData = errors.New("sidecar marshalled to an empty ssz byte slice")
errNoBasePath = errors.New("BlobStorage base path not specified in init")
)
const (
@@ -34,14 +37,26 @@ const (
// BlobStorageOption is a functional option for configuring a BlobStorage.
type BlobStorageOption func(*BlobStorage) error
// WithBasePath is a required option that sets the base path of blob storage.
func WithBasePath(base string) BlobStorageOption {
return func(b *BlobStorage) error {
b.base = base
return nil
}
}
// WithBlobRetentionEpochs is an option that changes the number of epochs blobs will be persisted.
func WithBlobRetentionEpochs(e primitives.Epoch) BlobStorageOption {
return func(b *BlobStorage) error {
pruner, err := newBlobPruner(b.fs, e)
if err != nil {
return err
}
b.pruner = pruner
b.retentionEpochs = e
return nil
}
}
// WithSaveFsync is an option that causes Save to call fsync before renaming part files for improved durability.
func WithSaveFsync(fsync bool) BlobStorageOption {
return func(b *BlobStorage) error {
b.fsync = fsync
return nil
}
}
@@ -49,30 +64,36 @@ func WithBlobRetentionEpochs(e primitives.Epoch) BlobStorageOption {
// NewBlobStorage creates a new instance of the BlobStorage object. Note that the implementation of BlobStorage may
// attempt to hold a file lock to guarantee exclusive control of the blob storage directory, so this should only be
// initialized once per beacon node.
func NewBlobStorage(base string, opts ...BlobStorageOption) (*BlobStorage, error) {
base = path.Clean(base)
if err := file.MkdirAll(base); err != nil {
return nil, fmt.Errorf("failed to create blob storage at %s: %w", base, err)
}
fs := afero.NewBasePathFs(afero.NewOsFs(), base)
b := &BlobStorage{
fs: fs,
}
func NewBlobStorage(opts ...BlobStorageOption) (*BlobStorage, error) {
b := &BlobStorage{}
for _, o := range opts {
if err := o(b); err != nil {
return nil, fmt.Errorf("failed to create blob storage at %s: %w", base, err)
return nil, errors.Wrap(err, "failed to create blob storage")
}
}
if b.pruner == nil {
log.Warn("Initializing blob filesystem storage with pruning disabled")
if b.base == "" {
return nil, errNoBasePath
}
b.base = path.Clean(b.base)
if err := file.MkdirAll(b.base); err != nil {
return nil, errors.Wrapf(err, "failed to create blob storage at %s", b.base)
}
b.fs = afero.NewBasePathFs(afero.NewOsFs(), b.base)
pruner, err := newBlobPruner(b.fs, b.retentionEpochs)
if err != nil {
return nil, err
}
b.pruner = pruner
return b, nil
}
// BlobStorage is the concrete implementation of the filesystem backend for saving and retrieving BlobSidecars.
type BlobStorage struct {
fs afero.Fs
pruner *blobPruner
base string
retentionEpochs primitives.Epoch
fsync bool
fs afero.Fs
pruner *blobPruner
}
// WarmCache runs the prune routine with an expiration of slot of 0, so nothing will be pruned, but the pruner's cache
@@ -111,11 +132,14 @@ func (bs *BlobStorage) Save(sidecar blocks.VerifiedROBlob) error {
sidecarData, err := sidecar.MarshalSSZ()
if err != nil {
return errors.Wrap(err, "failed to serialize sidecar data")
} else if len(sidecarData) == 0 {
return errSidecarEmptySSZData
}
if err := bs.fs.MkdirAll(fname.dir(), directoryPermissions); err != nil {
return err
}
partPath := fname.partPath()
partPath := fname.partPath(fmt.Sprintf("%p", sidecarData))
partialMoved := false
// Ensure the partial file is deleted.
@@ -138,7 +162,7 @@ func (bs *BlobStorage) Save(sidecar blocks.VerifiedROBlob) error {
return errors.Wrap(err, "failed to create partial file")
}
_, err = partialFile.Write(sidecarData)
n, err := partialFile.Write(sidecarData)
if err != nil {
closeErr := partialFile.Close()
if closeErr != nil {
@@ -146,11 +170,24 @@ func (bs *BlobStorage) Save(sidecar blocks.VerifiedROBlob) error {
}
return errors.Wrap(err, "failed to write to partial file")
}
err = partialFile.Close()
if err != nil {
if bs.fsync {
if err := partialFile.Sync(); err != nil {
return err
}
}
if err := partialFile.Close(); err != nil {
return err
}
if n != len(sidecarData) {
return fmt.Errorf("failed to write the full bytes of sidecarData, wrote only %d of %d bytes", n, len(sidecarData))
}
if n == 0 {
return errEmptyBlobWritten
}
// Atomically rename the partial file to its final name.
err = bs.fs.Rename(partPath, sszPath)
if err != nil {
@@ -257,16 +294,12 @@ func (p blobNamer) dir() string {
return rootString(p.root)
}
func (p blobNamer) fname(ext string) string {
return path.Join(p.dir(), fmt.Sprintf("%d.%s", p.index, ext))
}
func (p blobNamer) partPath() string {
return p.fname(partExt)
func (p blobNamer) partPath(entropy string) string {
return path.Join(p.dir(), fmt.Sprintf("%s-%d.%s", entropy, p.index, partExt))
}
func (p blobNamer) path() string {
return p.fname(sszExt)
return path.Join(p.dir(), fmt.Sprintf("%d.%s", p.index, sszExt))
}
func rootString(root [32]byte) string {

View File

@@ -4,6 +4,7 @@ import (
"bytes"
"os"
"path"
"sync"
"testing"
"time"
@@ -101,6 +102,30 @@ func TestBlobStorage_SaveBlobData(t *testing.T) {
_, err = b.Get(blob.BlockRoot(), blob.Index)
require.ErrorIs(t, err, os.ErrNotExist)
})
t.Run("race conditions", func(t *testing.T) {
// There was a bug where saving the same blob in multiple go routines would cause a partial blob
// to be empty. This test ensures that several routines can safely save the same blob at the
// same time. This isn't ideal behavior from the caller, but should be handled safely anyway.
// See https://github.com/prysmaticlabs/prysm/pull/13648
b, err := NewBlobStorage(WithBasePath(t.TempDir()))
require.NoError(t, err)
blob := testSidecars[0]
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
require.NoError(t, b.Save(blob))
}()
}
wg.Wait()
res, err := b.Get(blob.BlockRoot(), blob.Index)
require.NoError(t, err)
require.DeepSSZEqual(t, blob, res)
})
}
// pollUntil polls a condition function until it returns true or a timeout is reached.
@@ -243,6 +268,8 @@ func BenchmarkPruning(b *testing.B) {
}
func TestNewBlobStorage(t *testing.T) {
_, err := NewBlobStorage(path.Join(t.TempDir(), "good"))
_, err := NewBlobStorage()
require.ErrorIs(t, err, errNoBasePath)
_, err = NewBlobStorage(WithBasePath(path.Join(t.TempDir(), "good")))
require.NoError(t, err)
}

View File

@@ -37,7 +37,7 @@ type BlobMocker struct {
// CreateFakeIndices creates empty blob sidecar files at the expected path for the given
// root and indices to influence the result of Indices().
func (bm *BlobMocker) CreateFakeIndices(root [32]byte, indices []uint64) error {
func (bm *BlobMocker) CreateFakeIndices(root [32]byte, indices ...uint64) error {
for i := range indices {
n := blobNamer{root: root, index: indices[i]}
if err := bm.fs.MkdirAll(n.dir(), directoryPermissions); err != nil {

View File

@@ -82,6 +82,15 @@ func (f *ForkChoice) ShouldOverrideFCU() (override bool) {
return
}
// Return early if we are checking before 10 seconds into the slot
secs, err := slots.SecondsSinceSlotStart(head.slot, f.store.genesisTime, uint64(time.Now().Unix()))
if err != nil {
log.WithError(err).Error("could not check current slot")
return true
}
if secs < ProcessAttestationsThreshold {
return true
}
// Only orphan a block if the parent LMD vote is strong
if parent.weight*100 < f.store.committeeWeight*params.BeaconConfig().ReorgParentWeightThreshold {
return

View File

@@ -85,11 +85,19 @@ func TestForkChoice_ShouldOverrideFCU(t *testing.T) {
require.Equal(t, false, f.ShouldOverrideFCU())
f.store.headNode.parent = saved
})
t.Run("parent is weak", func(t *testing.T) {
t.Run("parent is weak early call", func(t *testing.T) {
saved := f.store.headNode.parent.weight
f.store.headNode.parent.weight = 0
require.Equal(t, true, f.ShouldOverrideFCU())
f.store.headNode.parent.weight = saved
})
t.Run("parent is weak late call", func(t *testing.T) {
saved := f.store.headNode.parent.weight
driftGenesisTime(f, 2, 11)
f.store.headNode.parent.weight = 0
require.Equal(t, false, f.ShouldOverrideFCU())
f.store.headNode.parent.weight = saved
driftGenesisTime(f, 2, orphanLateBlockFirstThreshold+1)
})
t.Run("Head is strong", func(t *testing.T) {
f.store.headNode.weight = f.store.committeeWeight

View File

@@ -118,6 +118,7 @@ type BeaconNode struct {
BackfillOpts []backfill.ServiceOption
initialSyncComplete chan struct{}
BlobStorage *filesystem.BlobStorage
BlobStorageOptions []filesystem.BlobStorageOption
blobRetentionEpochs primitives.Epoch
verifyInitWaiter *verification.InitializerWaiter
syncChecker *initialsync.SyncChecker
@@ -209,6 +210,16 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco
return nil, err
}
// Allow tests to set it as an opt.
if beacon.BlobStorage == nil {
beacon.BlobStorageOptions = append(beacon.BlobStorageOptions, filesystem.WithSaveFsync(features.Get().BlobSaveFsync))
blobs, err := filesystem.NewBlobStorage(beacon.BlobStorageOptions...)
if err != nil {
return nil, err
}
beacon.BlobStorage = blobs
}
log.Debugln("Starting DB")
if err := beacon.startDB(cliCtx, depositAddress); err != nil {
return nil, err

View File

@@ -43,6 +43,15 @@ func WithBlobStorage(bs *filesystem.BlobStorage) Option {
}
}
// WithBlobStorageOptions appends 1 or more filesystem.BlobStorageOption on the beacon node,
// to be used when initializing blob storage.
func WithBlobStorageOptions(opt ...filesystem.BlobStorageOption) Option {
return func(bn *BeaconNode) error {
bn.BlobStorageOptions = append(bn.BlobStorageOptions, opt...)
return nil
}
}
// WithBlobRetentionEpochs sets the blobRetentionEpochs value, used in kv store initialization.
func WithBlobRetentionEpochs(e primitives.Epoch) Option {
return func(bn *BeaconNode) error {

View File

@@ -115,13 +115,7 @@ func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (
return vs.constructGenericBeaconBlock(sBlk, bundleCache.get(req.Slot))
}
func (vs *Server) handleFailedReorgAttempt(ctx context.Context, slot primitives.Slot, parentRoot, headRoot [32]byte) (state.BeaconState, error) {
blockchain.LateBlockAttemptedReorgCount.Inc()
log.WithFields(logrus.Fields{
"slot": slot,
"parentRoot": fmt.Sprintf("%#x", parentRoot),
"headRoot": fmt.Sprintf("%#x", headRoot),
}).Warn("late block attempted reorg failed")
func (vs *Server) handleSuccesfulReorgAttempt(ctx context.Context, slot primitives.Slot, parentRoot, headRoot [32]byte) (state.BeaconState, error) {
// Try to get the state from the NSC
head := transition.NextSlotState(parentRoot[:], slot)
if head != nil {
@@ -135,7 +129,16 @@ func (vs *Server) handleFailedReorgAttempt(ctx context.Context, slot primitives.
return head, nil
}
func (vs *Server) getHeadNoFailedReorg(ctx context.Context, slot primitives.Slot, parentRoot [32]byte) (state.BeaconState, error) {
func logFailedReorgAttempt(slot primitives.Slot, oldHeadRoot, headRoot [32]byte) {
blockchain.LateBlockAttemptedReorgCount.Inc()
log.WithFields(logrus.Fields{
"slot": slot,
"oldHeadRoot": fmt.Sprintf("%#x", oldHeadRoot),
"headRoot": fmt.Sprintf("%#x", headRoot),
}).Warn("late block attempted reorg failed")
}
func (vs *Server) getHeadNoReorg(ctx context.Context, slot primitives.Slot, parentRoot [32]byte) (state.BeaconState, error) {
// Try to get the state from the NSC
head := transition.NextSlotState(parentRoot[:], slot)
if head != nil {
@@ -148,11 +151,14 @@ func (vs *Server) getHeadNoFailedReorg(ctx context.Context, slot primitives.Slot
return head, nil
}
func (vs *Server) getParentStateFromReorgData(ctx context.Context, slot primitives.Slot, parentRoot, headRoot [32]byte) (head state.BeaconState, err error) {
func (vs *Server) getParentStateFromReorgData(ctx context.Context, slot primitives.Slot, oldHeadRoot, parentRoot, headRoot [32]byte) (head state.BeaconState, err error) {
if parentRoot != headRoot {
head, err = vs.handleFailedReorgAttempt(ctx, slot, parentRoot, headRoot)
head, err = vs.handleSuccesfulReorgAttempt(ctx, slot, parentRoot, headRoot)
} else {
head, err = vs.getHeadNoFailedReorg(ctx, slot, parentRoot)
if oldHeadRoot != headRoot {
logFailedReorgAttempt(slot, oldHeadRoot, headRoot)
}
head, err = vs.getHeadNoReorg(ctx, slot, parentRoot)
}
if err != nil {
return nil, err
@@ -169,10 +175,11 @@ func (vs *Server) getParentStateFromReorgData(ctx context.Context, slot primitiv
func (vs *Server) getParentState(ctx context.Context, slot primitives.Slot) (state.BeaconState, [32]byte, error) {
// process attestations and update head in forkchoice
oldHeadRoot := vs.ForkchoiceFetcher.CachedHeadRoot()
vs.ForkchoiceFetcher.UpdateHead(ctx, vs.TimeFetcher.CurrentSlot())
headRoot := vs.ForkchoiceFetcher.CachedHeadRoot()
parentRoot := vs.ForkchoiceFetcher.GetProposerHead()
head, err := vs.getParentStateFromReorgData(ctx, slot, parentRoot, headRoot)
head, err := vs.getParentStateFromReorgData(ctx, slot, oldHeadRoot, parentRoot, headRoot)
return head, parentRoot, err
}

View File

@@ -2874,8 +2874,8 @@ func TestProposer_GetParentHeadState(t *testing.T) {
Eth1BlockFetcher: &mockExecution.Chain{},
StateGen: stategen.New(db, doublylinkedtree.New()),
}
t.Run("failed reorg", func(tt *testing.T) {
head, err := proposerServer.getParentStateFromReorgData(ctx, 1, parentRoot, headRoot)
t.Run("successful reorg", func(tt *testing.T) {
head, err := proposerServer.getParentStateFromReorgData(ctx, 1, parentRoot, parentRoot, headRoot)
require.NoError(t, err)
st := parentState.Copy()
st, err = transition.ProcessSlots(ctx, st, st.Slot()+1)
@@ -2892,7 +2892,7 @@ func TestProposer_GetParentHeadState(t *testing.T) {
t.Run("no reorg", func(tt *testing.T) {
require.NoError(t, transition.UpdateNextSlotCache(ctx, headRoot[:], headState))
head, err := proposerServer.getParentStateFromReorgData(ctx, 1, headRoot, headRoot)
head, err := proposerServer.getParentStateFromReorgData(ctx, 1, headRoot, headRoot, headRoot)
require.NoError(t, err)
st := headState.Copy()
st, err = transition.ProcessSlots(ctx, st, st.Slot()+1)
@@ -2906,4 +2906,23 @@ func TestProposer_GetParentHeadState(t *testing.T) {
require.Equal(t, [32]byte(str), [32]byte(headStr))
require.NotEqual(t, [32]byte(str), [32]byte(genesisStr))
})
t.Run("failed reorg", func(tt *testing.T) {
hook := logTest.NewGlobal()
require.NoError(t, transition.UpdateNextSlotCache(ctx, headRoot[:], headState))
head, err := proposerServer.getParentStateFromReorgData(ctx, 1, parentRoot, headRoot, headRoot)
require.NoError(t, err)
st := headState.Copy()
st, err = transition.ProcessSlots(ctx, st, st.Slot()+1)
require.NoError(t, err)
str, err := st.StateRootAtIndex(0)
require.NoError(t, err)
headStr, err := head.StateRootAtIndex(0)
require.NoError(t, err)
genesisStr, err := parentState.StateRootAtIndex(0)
require.NoError(t, err)
require.Equal(t, [32]byte(str), [32]byte(headStr))
require.NotEqual(t, [32]byte(str), [32]byte(genesisStr))
require.LogsContain(t, hook, "late block attempted reorg failed")
})
}

View File

@@ -19,7 +19,7 @@ func TestState_UnrealizedCheckpointBalances(t *testing.T) {
balances[i] = params.BeaconConfig().MaxEffectiveBalance
}
base := &ethpb.BeaconStateAltair{
Slot: 2,
Slot: 66,
RandaoMixes: make([][]byte, params.BeaconConfig().EpochsPerHistoricalVector),
Validators: validators,
@@ -35,8 +35,8 @@ func TestState_UnrealizedCheckpointBalances(t *testing.T) {
active, previous, current, err := state.UnrealizedCheckpointBalances()
require.NoError(t, err)
require.Equal(t, allActive, active)
require.Equal(t, uint64(0), current)
require.Equal(t, uint64(0), previous)
require.Equal(t, params.BeaconConfig().EffectiveBalanceIncrement, current)
require.Equal(t, params.BeaconConfig().EffectiveBalanceIncrement, previous)
// Add some votes in the last two epochs:
base.CurrentEpochParticipation[0] = 0xFF
@@ -57,8 +57,8 @@ func TestState_UnrealizedCheckpointBalances(t *testing.T) {
require.NoError(t, err)
active, previous, current, err = state.UnrealizedCheckpointBalances()
require.NoError(t, err)
require.Equal(t, allActive-params.BeaconConfig().MaxEffectiveBalance, active)
require.Equal(t, uint64(0), current)
require.Equal(t, allActive, active)
require.Equal(t, params.BeaconConfig().EffectiveBalanceIncrement, current)
require.Equal(t, params.BeaconConfig().MaxEffectiveBalance, previous)
}

View File

@@ -24,25 +24,44 @@ func UnrealizedCheckpointBalances(cp, pp []byte, validators []*ethpb.Validator,
var err error
for i, v := range validators {
active := v.ActivationEpoch <= currentEpoch && currentEpoch < v.ExitEpoch
if active && !v.Slashed {
activeCurrent := v.ActivationEpoch <= currentEpoch && currentEpoch < v.ExitEpoch
if activeCurrent {
activeBalance, err = math.Add64(activeBalance, v.EffectiveBalance)
if err != nil {
return 0, 0, 0, err
}
if ((cp[i] >> targetIdx) & 1) == 1 {
currentTarget, err = math.Add64(currentTarget, v.EffectiveBalance)
if err != nil {
return 0, 0, 0, err
}
}
if v.Slashed {
continue
}
if activeCurrent && ((cp[i]>>targetIdx)&1) == 1 {
currentTarget, err = math.Add64(currentTarget, v.EffectiveBalance)
if err != nil {
return 0, 0, 0, err
}
if ((pp[i] >> targetIdx) & 1) == 1 {
prevTarget, err = math.Add64(prevTarget, v.EffectiveBalance)
if err != nil {
return 0, 0, 0, err
}
}
activePrevious := v.ActivationEpoch < currentEpoch && currentEpoch <= v.ExitEpoch
if activePrevious && ((pp[i]>>targetIdx)&1) == 1 {
prevTarget, err = math.Add64(prevTarget, v.EffectiveBalance)
if err != nil {
return 0, 0, 0, err
}
}
}
activeBalance, prevTarget, currentTarget = ensureLowerBound(activeBalance, prevTarget, currentTarget)
return activeBalance, prevTarget, currentTarget, nil
}
func ensureLowerBound(activeCurrEpoch, prevTargetAttested, currTargetAttested uint64) (uint64, uint64, uint64) {
ebi := params.BeaconConfig().EffectiveBalanceIncrement
if ebi > activeCurrEpoch {
activeCurrEpoch = ebi
}
if ebi > prevTargetAttested {
prevTargetAttested = ebi
}
if ebi > currTargetAttested {
currTargetAttested = ebi
}
return activeCurrEpoch, prevTargetAttested, currTargetAttested
}

View File

@@ -28,8 +28,8 @@ func TestState_UnrealizedCheckpointBalances(t *testing.T) {
active, previous, current, err := UnrealizedCheckpointBalances(cp, pp, validators, 0)
require.NoError(tt, err)
require.Equal(tt, expectedActive, active)
require.Equal(tt, uint64(0), current)
require.Equal(tt, uint64(0), previous)
require.Equal(tt, params.BeaconConfig().EffectiveBalanceIncrement, current)
require.Equal(tt, params.BeaconConfig().EffectiveBalanceIncrement, previous)
})
t.Run("bad votes in last two epochs", func(tt *testing.T) {
@@ -38,8 +38,8 @@ func TestState_UnrealizedCheckpointBalances(t *testing.T) {
active, previous, current, err := UnrealizedCheckpointBalances(cp, pp, validators, 1)
require.NoError(tt, err)
require.Equal(tt, expectedActive, active)
require.Equal(tt, uint64(0), current)
require.Equal(tt, uint64(0), previous)
require.Equal(tt, params.BeaconConfig().EffectiveBalanceIncrement, current)
require.Equal(tt, params.BeaconConfig().EffectiveBalanceIncrement, previous)
})
t.Run("two votes in last epoch", func(tt *testing.T) {
@@ -49,7 +49,7 @@ func TestState_UnrealizedCheckpointBalances(t *testing.T) {
require.NoError(tt, err)
require.Equal(tt, expectedActive, active)
require.Equal(tt, 2*params.BeaconConfig().MaxEffectiveBalance, current)
require.Equal(tt, uint64(0), previous)
require.Equal(tt, params.BeaconConfig().EffectiveBalanceIncrement, previous)
})
t.Run("two votes in previous epoch", func(tt *testing.T) {
@@ -58,7 +58,7 @@ func TestState_UnrealizedCheckpointBalances(t *testing.T) {
active, previous, current, err := UnrealizedCheckpointBalances(cp, pp, validators, 1)
require.NoError(tt, err)
require.Equal(tt, expectedActive, active)
require.Equal(tt, uint64(0), current)
require.Equal(tt, params.BeaconConfig().EffectiveBalanceIncrement, current)
require.Equal(tt, 2*params.BeaconConfig().MaxEffectiveBalance, previous)
})
@@ -78,7 +78,6 @@ func TestState_UnrealizedCheckpointBalances(t *testing.T) {
validators[1].Slashed = true
active, previous, current, err := UnrealizedCheckpointBalances(cp, pp, validators, 1)
require.NoError(tt, err)
expectedActive -= params.BeaconConfig().MaxEffectiveBalance
require.Equal(tt, expectedActive, active)
require.Equal(tt, params.BeaconConfig().MaxEffectiveBalance-params.BeaconConfig().MinDepositAmount, current)
require.Equal(tt, 2*params.BeaconConfig().MaxEffectiveBalance, previous)

View File

@@ -45,6 +45,7 @@ go_library(
"//math:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//runtime:go_default_library",
"//runtime/version:go_default_library",
"//time:go_default_library",
"//time/slots:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
@@ -76,6 +77,7 @@ go_test(
"//beacon-chain/das:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/db/filesystem:go_default_library",
"//beacon-chain/db/kv:go_default_library",
"//beacon-chain/db/testing:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",

View File

@@ -126,8 +126,9 @@ type fetchRequestResponse struct {
// newBlocksFetcher creates ready to use fetcher.
func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetcher {
blocksPerPeriod := flags.Get().BlockBatchLimit
allowedBlocksBurst := flags.Get().BlockBatchLimitBurstFactor * flags.Get().BlockBatchLimit
blockBatchLimit := maxBatchLimit()
blocksPerPeriod := blockBatchLimit
allowedBlocksBurst := flags.Get().BlockBatchLimitBurstFactor * blockBatchLimit
// Allow fetcher to go almost to the full burst capacity (less a single batch).
rateLimiter := leakybucket.NewCollector(
float64(blocksPerPeriod), int64(allowedBlocksBurst-blocksPerPeriod),
@@ -159,6 +160,27 @@ func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetc
}
}
// This specifies the block batch limit the initial sync fetcher will use. In the event the user has provided
// and excessive number, this is automatically lowered.
func maxBatchLimit() int {
currLimit := flags.Get().BlockBatchLimit
maxLimit := params.BeaconConfig().MaxRequestBlocks
if params.DenebEnabled() {
maxLimit = params.BeaconConfig().MaxRequestBlocksDeneb
}
castedMaxLimit, err := math.Int(maxLimit)
if err != nil {
// Should be impossible to hit this case.
log.WithError(err).Error("Unable to calculate the max batch limit")
return currLimit
}
if currLimit > castedMaxLimit {
log.Warnf("Specified batch size exceeds the block limit of the network, lowering from %d to %d", currLimit, maxLimit)
currLimit = castedMaxLimit
}
return currLimit
}
// start boots up the fetcher, which starts listening for incoming fetch requests.
func (f *blocksFetcher) start() error {
select {

View File

@@ -3,6 +3,7 @@ package initialsync
import (
"context"
"fmt"
"math"
"math/rand"
"sort"
"sync"
@@ -1142,3 +1143,26 @@ func TestVerifyAndPopulateBlobs(t *testing.T) {
// We delete each entry we've seen, so if we see all expected commits, the map should be empty at the end.
require.Equal(t, 0, len(expectedCommits))
}
func TestBatchLimit(t *testing.T) {
params.SetupTestConfigCleanup(t)
testCfg := params.BeaconConfig().Copy()
testCfg.DenebForkEpoch = math.MaxUint64
params.OverrideBeaconConfig(testCfg)
resetFlags := flags.Get()
flags.Init(&flags.GlobalFlags{
BlockBatchLimit: 640,
BlockBatchLimitBurstFactor: 10,
})
defer func() {
flags.Init(resetFlags)
}()
assert.Equal(t, 640, maxBatchLimit())
testCfg.DenebForkEpoch = 100000
params.OverrideBeaconConfig(testCfg)
assert.Equal(t, params.BeaconConfig().MaxRequestBlocksDeneb, uint64(maxBatchLimit()))
}

View File

@@ -5,22 +5,31 @@ package initialsync
import (
"context"
"fmt"
"time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/paulbellamy/ratecounter"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/async/abool"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain"
blockfeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/block"
statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
p2ptypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/startup"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/crypto/rand"
eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/runtime"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
prysmTime "github.com/prysmaticlabs/prysm/v5/time"
"github.com/prysmaticlabs/prysm/v5/time/slots"
"github.com/sirupsen/logrus"
@@ -58,6 +67,7 @@ type Service struct {
clock *startup.Clock
verifierWaiter *verification.InitializerWaiter
newBlobVerifier verification.NewBlobVerifier
ctxMap sync.ContextByteVersions
}
// Option is a functional option for the initial-sync Service.
@@ -124,6 +134,13 @@ func (s *Service) Start() {
}
s.clock = clock
log.Info("Received state initialized event")
ctxMap, err := sync.ContextByteVersionsForValRoot(clock.GenesisValidatorsRoot())
if err != nil {
log.WithField("genesisValidatorRoot", clock.GenesisValidatorsRoot()).
WithError(err).Error("unable to initialize context version map using genesis validator")
return
}
s.ctxMap = ctxMap
v, err := s.verifierWaiter.WaitForInitializer(s.ctx)
if err != nil {
@@ -162,7 +179,15 @@ func (s *Service) Start() {
s.markSynced()
return
}
s.waitForMinimumPeers()
peers, err := s.waitForMinimumPeers()
if err != nil {
log.WithError(err).Error("Error waiting for minimum number of peers")
return
}
if err := s.fetchOriginBlobs(peers); err != nil {
log.WithError(err).Error("Failed to fetch missing blobs for checkpoint origin")
return
}
if err := s.roundRobinSync(gt); err != nil {
if errors.Is(s.ctx.Err(), context.Canceled) {
return
@@ -215,7 +240,10 @@ func (s *Service) Resync() error {
defer func() { s.synced.Set() }() // Reset it at the end of the method.
genesis := time.Unix(int64(headState.GenesisTime()), 0) // lint:ignore uintcast -- Genesis time will not exceed int64 in your lifetime.
s.waitForMinimumPeers()
_, err = s.waitForMinimumPeers()
if err != nil {
return err
}
if err = s.roundRobinSync(genesis); err != nil {
log = log.WithError(err)
}
@@ -223,16 +251,19 @@ func (s *Service) Resync() error {
return nil
}
func (s *Service) waitForMinimumPeers() {
func (s *Service) waitForMinimumPeers() ([]peer.ID, error) {
required := params.BeaconConfig().MaxPeersToSync
if flags.Get().MinimumSyncPeers < required {
required = flags.Get().MinimumSyncPeers
}
for {
if s.ctx.Err() != nil {
return nil, s.ctx.Err()
}
cp := s.cfg.Chain.FinalizedCheckpt()
_, peers := s.cfg.P2P.Peers().BestNonFinalized(flags.Get().MinimumSyncPeers, cp.Epoch)
if len(peers) >= required {
break
return peers, nil
}
log.WithFields(logrus.Fields{
"suitable": len(peers),
@@ -247,3 +278,87 @@ func (s *Service) markSynced() {
s.synced.Set()
close(s.cfg.InitialSyncComplete)
}
func missingBlobRequest(blk blocks.ROBlock, store *filesystem.BlobStorage) (p2ptypes.BlobSidecarsByRootReq, error) {
r := blk.Root()
if blk.Version() < version.Deneb {
return nil, nil
}
cmts, err := blk.Block().Body().BlobKzgCommitments()
if err != nil {
log.WithField("root", r).Error("Error reading commitments from checkpoint sync origin block")
return nil, err
}
if len(cmts) == 0 {
return nil, nil
}
onDisk, err := store.Indices(r)
if err != nil {
return nil, errors.Wrapf(err, "error checking existing blobs for checkpoint sync block root %#x", r)
}
req := make(p2ptypes.BlobSidecarsByRootReq, 0, len(cmts))
for i := range cmts {
if onDisk[i] {
continue
}
req = append(req, &eth.BlobIdentifier{BlockRoot: r[:], Index: uint64(i)})
}
return req, nil
}
func (s *Service) fetchOriginBlobs(pids []peer.ID) error {
r, err := s.cfg.DB.OriginCheckpointBlockRoot(s.ctx)
if errors.Is(err, db.ErrNotFoundOriginBlockRoot) {
return nil
}
blk, err := s.cfg.DB.Block(s.ctx, r)
if err != nil {
log.WithField("root", fmt.Sprintf("%#x", r)).Error("Block for checkpoint sync origin root not found in db")
return err
}
if !params.WithinDAPeriod(slots.ToEpoch(blk.Block().Slot()), slots.ToEpoch(s.clock.CurrentSlot())) {
return nil
}
rob, err := blocks.NewROBlockWithRoot(blk, r)
if err != nil {
return err
}
req, err := missingBlobRequest(rob, s.cfg.BlobStorage)
if err != nil {
return err
}
if len(req) == 0 {
log.WithField("root", fmt.Sprintf("%#x", r)).Debug("All blobs for checkpoint block are present")
return nil
}
shufflePeers(pids)
for i := range pids {
sidecars, err := sync.SendBlobSidecarByRoot(s.ctx, s.clock, s.cfg.P2P, pids[i], s.ctxMap, &req)
if err != nil {
continue
}
if len(sidecars) != len(req) {
continue
}
bv := newBlobBatchVerifier(s.newBlobVerifier)
avs := das.NewLazilyPersistentStore(s.cfg.BlobStorage, bv)
current := s.clock.CurrentSlot()
if err := avs.Persist(current, sidecars...); err != nil {
return err
}
if err := avs.IsDataAvailable(s.ctx, current, rob); err != nil {
log.WithField("root", fmt.Sprintf("%#x", r)).WithField("peerID", pids[i]).Warn("Blobs from peer for origin block were unusable")
continue
}
log.WithField("nBlobs", len(sidecars)).WithField("root", fmt.Sprintf("%#x", r)).Info("Successfully downloaded blobs for checkpoint sync block")
return nil
}
return fmt.Errorf("no connected peer able to provide blobs for checkpoint sync block %#x", r)
}
func shufflePeers(pids []peer.ID) {
rg := rand.NewGenerator()
rg.Shuffle(len(pids), func(i, j int) {
pids[i], pids[j] = pids[j], pids[i]
})
}

View File

@@ -6,16 +6,19 @@ import (
"testing"
"time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/paulbellamy/ratecounter"
"github.com/prysmaticlabs/prysm/v5/async/abool"
mock "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/kv"
dbtest "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/testing"
p2pt "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/testing"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/startup"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/testing/assert"
@@ -420,3 +423,91 @@ func TestService_Synced(t *testing.T) {
s.synced.Set()
assert.Equal(t, true, s.Synced())
}
func TestMissingBlobRequest(t *testing.T) {
cases := []struct {
name string
setup func(t *testing.T) (blocks.ROBlock, *filesystem.BlobStorage)
nReq int
err error
}{
{
name: "pre-deneb",
setup: func(t *testing.T) (blocks.ROBlock, *filesystem.BlobStorage) {
cb, err := blocks.NewSignedBeaconBlock(util.NewBeaconBlockCapella())
require.NoError(t, err)
rob, err := blocks.NewROBlockWithRoot(cb, [32]byte{})
require.NoError(t, err)
return rob, nil
},
nReq: 0,
},
{
name: "deneb zero commitments",
setup: func(t *testing.T) (blocks.ROBlock, *filesystem.BlobStorage) {
bk, _ := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 0, 0)
return bk, nil
},
nReq: 0,
},
{
name: "2 commitments, all missing",
setup: func(t *testing.T) (blocks.ROBlock, *filesystem.BlobStorage) {
bk, _ := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 0, 2)
fs := filesystem.NewEphemeralBlobStorage(t)
return bk, fs
},
nReq: 2,
},
{
name: "2 commitments, 1 missing",
setup: func(t *testing.T) (blocks.ROBlock, *filesystem.BlobStorage) {
bk, _ := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 0, 2)
bm, fs := filesystem.NewEphemeralBlobStorageWithMocker(t)
require.NoError(t, bm.CreateFakeIndices(bk.Root(), 1))
return bk, fs
},
nReq: 1,
},
{
name: "2 commitments, 0 missing",
setup: func(t *testing.T) (blocks.ROBlock, *filesystem.BlobStorage) {
bk, _ := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 0, 2)
bm, fs := filesystem.NewEphemeralBlobStorageWithMocker(t)
require.NoError(t, bm.CreateFakeIndices(bk.Root(), 0, 1))
return bk, fs
},
nReq: 0,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
blk, store := c.setup(t)
req, err := missingBlobRequest(blk, store)
require.NoError(t, err)
require.Equal(t, c.nReq, len(req))
})
}
}
func TestOriginOutsideRetention(t *testing.T) {
ctx := context.Background()
bdb := dbtest.SetupDB(t)
genesis := time.Unix(0, 0)
secsPerEpoch := params.BeaconConfig().SecondsPerSlot * uint64(params.BeaconConfig().SlotsPerEpoch)
retentionSeconds := time.Second * time.Duration(uint64(params.BeaconConfig().MinEpochsForBlobsSidecarsRequest+1)*secsPerEpoch)
outsideRetention := genesis.Add(retentionSeconds)
now := func() time.Time {
return outsideRetention
}
clock := startup.NewClock(genesis, [32]byte{}, startup.WithNower(now))
s := &Service{ctx: ctx, cfg: &Config{DB: bdb}, clock: clock}
blk, _ := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 0, 1)
require.NoError(t, bdb.SaveBlock(ctx, blk))
concreteDB, ok := bdb.(*kv.Store)
require.Equal(t, true, ok)
require.NoError(t, concreteDB.SaveOriginCheckpointBlockRoot(ctx, blk.Root()))
// This would break due to missing service dependencies, but will return nil fast due to being outside retention.
require.Equal(t, false, params.WithinDAPeriod(slots.ToEpoch(blk.Block().Slot()), slots.ToEpoch(clock.CurrentSlot())))
require.NoError(t, s.fetchOriginBlobs([]peer.ID{}))
}

View File

@@ -35,11 +35,10 @@ func BeaconNodeOptions(c *cli.Context) ([]node.Option, error) {
if err != nil {
return nil, err
}
bs, err := filesystem.NewBlobStorage(blobStoragePath(c), filesystem.WithBlobRetentionEpochs(e))
if err != nil {
return nil, err
}
return []node.Option{node.WithBlobStorage(bs), node.WithBlobRetentionEpochs(e)}, nil
opts := []node.Option{node.WithBlobStorageOptions(
filesystem.WithBlobRetentionEpochs(e), filesystem.WithBasePath(blobStoragePath(c)),
)}
return opts, nil
}
func blobStoragePath(c *cli.Context) string {

View File

@@ -69,7 +69,8 @@ type Flags struct {
EnableEIP4881 bool // EnableEIP4881 specifies whether to use the deposit tree from EIP4881
PrepareAllPayloads bool // PrepareAllPayloads informs the engine to prepare a block on every slot.
// BlobSaveFsync requires blob saving to block on fsync to ensure blobs are durably persisted before passing DA.
BlobSaveFsync bool
// KeystoreImportDebounceInterval specifies the time duration the validator waits to reload new keys if they have
// changed on disk. This feature is for advanced use cases only.
KeystoreImportDebounceInterval time.Duration
@@ -245,6 +246,11 @@ func ConfigureBeaconChain(ctx *cli.Context) error {
logEnabled(EnableLightClient)
cfg.EnableLightClient = true
}
if ctx.IsSet(BlobSaveFsync.Name) {
logEnabled(BlobSaveFsync)
cfg.BlobSaveFsync = true
}
cfg.AggregateIntervals = [3]time.Duration{aggregateFirstInterval.Value, aggregateSecondInterval.Value, aggregateThirdInterval.Value}
Init(cfg)
return nil

View File

@@ -149,12 +149,16 @@ var (
Name: "disable-resource-manager",
Usage: "Disables running the libp2p resource manager.",
}
// DisableRegistrationCache a flag for disabling the validator registration cache and use db instead.
DisableRegistrationCache = &cli.BoolFlag{
Name: "disable-registration-cache",
Usage: "Temporary flag for disabling the validator registration cache instead of using the DB. Note: registrations do not clear on restart while using the DB.",
}
// BlobSaveFsync enforces durable filesystem writes for use cases where blob availability is critical.
BlobSaveFsync = &cli.BoolFlag{
Name: "blob-save-fsync",
Usage: "Forces new blob files to be fysnc'd before continuing, ensuring durable blob writes.",
}
)
// devModeFlags holds list of flags that are set when development mode is on.
@@ -209,6 +213,7 @@ var BeaconChainFlags = append(deprecatedBeaconFlags, append(deprecatedFlags, []c
disableResourceManager,
DisableRegistrationCache,
EnableLightClient,
BlobSaveFsync,
}...)...)
// E2EBeaconChainFlags contains a list of the beacon chain feature flags to be tested in E2E.

View File

@@ -35,6 +35,11 @@ func processJustificationAndFinalizationPrecomputeWrapper(t *testing.T, st state
require.NoError(t, err)
_, bp, err = altair.ProcessEpochParticipation(ctx, st, bp, vp)
require.NoError(t, err)
activeBal, targetPrevious, targetCurrent, err := st.UnrealizedCheckpointBalances()
require.NoError(t, err)
require.Equal(t, bp.ActiveCurrentEpoch, activeBal)
require.Equal(t, bp.CurrentEpochTargetAttested, targetCurrent)
require.Equal(t, bp.PrevEpochTargetAttested, targetPrevious)
st, err = precompute.ProcessJustificationAndFinalizationPreCompute(st, bp)
require.NoError(t, err, "Could not process justification")

View File

@@ -76,6 +76,13 @@ func runPrecomputeRewardsAndPenaltiesTest(t *testing.T, testFolderPath string) {
require.NoError(t, err)
vp, bp, err = altair.ProcessEpochParticipation(ctx, preBeaconState, bp, vp)
require.NoError(t, err)
activeBal, targetPrevious, targetCurrent, err := preBeaconState.UnrealizedCheckpointBalances()
require.NoError(t, err)
require.Equal(t, bp.ActiveCurrentEpoch, activeBal)
require.Equal(t, bp.CurrentEpochTargetAttested, targetCurrent)
require.Equal(t, bp.PrevEpochTargetAttested, targetPrevious)
deltas, err := altair.AttestationsDelta(preBeaconState, bp, vp)
require.NoError(t, err)

View File

@@ -35,6 +35,11 @@ func processJustificationAndFinalizationPrecomputeWrapper(t *testing.T, st state
require.NoError(t, err)
_, bp, err = altair.ProcessEpochParticipation(ctx, st, bp, vp)
require.NoError(t, err)
activeBal, targetPrevious, targetCurrent, err := st.UnrealizedCheckpointBalances()
require.NoError(t, err)
require.Equal(t, bp.ActiveCurrentEpoch, activeBal)
require.Equal(t, bp.CurrentEpochTargetAttested, targetCurrent)
require.Equal(t, bp.PrevEpochTargetAttested, targetPrevious)
st, err = precompute.ProcessJustificationAndFinalizationPreCompute(st, bp)
require.NoError(t, err, "Could not process justification")

View File

@@ -80,6 +80,13 @@ func runPrecomputeRewardsAndPenaltiesTest(t *testing.T, testFolderPath string) {
require.NoError(t, err)
vp, bp, err = altair.ProcessEpochParticipation(ctx, preBeaconState, bp, vp)
require.NoError(t, err)
activeBal, targetPrevious, targetCurrent, err := preBeaconState.UnrealizedCheckpointBalances()
require.NoError(t, err)
require.Equal(t, bp.ActiveCurrentEpoch, activeBal)
require.Equal(t, bp.CurrentEpochTargetAttested, targetCurrent)
require.Equal(t, bp.PrevEpochTargetAttested, targetPrevious)
deltas, err := altair.AttestationsDelta(preBeaconState, bp, vp)
require.NoError(t, err)

View File

@@ -35,6 +35,11 @@ func processJustificationAndFinalizationPrecomputeWrapper(t *testing.T, st state
require.NoError(t, err)
_, bp, err = altair.ProcessEpochParticipation(ctx, st, bp, vp)
require.NoError(t, err)
activeBal, targetPrevious, targetCurrent, err := st.UnrealizedCheckpointBalances()
require.NoError(t, err)
require.Equal(t, bp.ActiveCurrentEpoch, activeBal)
require.Equal(t, bp.CurrentEpochTargetAttested, targetCurrent)
require.Equal(t, bp.PrevEpochTargetAttested, targetPrevious)
st, err = precompute.ProcessJustificationAndFinalizationPreCompute(st, bp)
require.NoError(t, err, "Could not process justification")

View File

@@ -80,6 +80,13 @@ func runPrecomputeRewardsAndPenaltiesTest(t *testing.T, testFolderPath string) {
require.NoError(t, err)
vp, bp, err = altair.ProcessEpochParticipation(ctx, preBeaconState, bp, vp)
require.NoError(t, err)
activeBal, targetPrevious, targetCurrent, err := preBeaconState.UnrealizedCheckpointBalances()
require.NoError(t, err)
require.Equal(t, bp.ActiveCurrentEpoch, activeBal)
require.Equal(t, bp.CurrentEpochTargetAttested, targetCurrent)
require.Equal(t, bp.PrevEpochTargetAttested, targetPrevious)
deltas, err := altair.AttestationsDelta(preBeaconState, bp, vp)
require.NoError(t, err)

View File

@@ -32,7 +32,11 @@ func processJustificationAndFinalizationPrecomputeWrapper(t *testing.T, st state
require.NoError(t, err)
_, bp, err = altair.ProcessEpochParticipation(ctx, st, bp, vp)
require.NoError(t, err)
activeBal, targetPrevious, targetCurrent, err := st.UnrealizedCheckpointBalances()
require.NoError(t, err)
require.Equal(t, bp.ActiveCurrentEpoch, activeBal)
require.Equal(t, bp.CurrentEpochTargetAttested, targetCurrent)
require.Equal(t, bp.PrevEpochTargetAttested, targetPrevious)
st, err = precompute.ProcessJustificationAndFinalizationPreCompute(st, bp)
require.NoError(t, err, "Could not process justification")

View File

@@ -70,8 +70,16 @@ func runPrecomputeRewardsAndPenaltiesTest(t *testing.T, testFolderPath string) {
vp, bp, err := altair.InitializePrecomputeValidators(ctx, preBeaconState)
require.NoError(t, err)
vp, bp, err = altair.ProcessEpochParticipation(ctx, preBeaconState, bp, vp)
require.NoError(t, err)
activeBal, targetPrevious, targetCurrent, err := preBeaconState.UnrealizedCheckpointBalances()
require.NoError(t, err)
require.Equal(t, bp.ActiveCurrentEpoch, activeBal)
require.Equal(t, bp.CurrentEpochTargetAttested, targetCurrent)
require.Equal(t, bp.PrevEpochTargetAttested, targetPrevious)
deltas, err := altair.AttestationsDelta(preBeaconState, bp, vp)
require.NoError(t, err)