Compare commits


17 Commits

Author SHA1 Message Date
Preston Van Loon
a1a81d1720 Update bazel-lib to include https://github.com/aspect-build/bazel-lib/pull/768 (#13675)
(cherry picked from commit 87b127365f)
2024-03-08 09:13:01 -06:00
Nishant Das
3d1d5863f5 Check Unrealized Justification Balances In Spectests (#13710)
* add them

* Ensure activation epoch does not overflow

* add them all in

* better check for overflow

* fix tests

* fix tests

---------

Co-authored-by: Potuz <potuz@prysmaticlabs.com>
(cherry picked from commit 2616de1eb1)
2024-03-08 09:11:00 -06:00
Potuz
89e21733e9 Fix UJ (#13688)
* Fix UJ

* gate slashed

* don't filter slashed for active balance

* don't overflow

* fix tests

* fix tests

(cherry picked from commit b0a2115a26)
2024-03-06 15:14:45 -06:00
Potuz
6ec456d836 pass justified=finalized in Prater (#13695)
* pass justified=finalized in Prater

* fix gazelle mess

(cherry picked from commit 102518e106)
2024-03-06 13:34:12 -06:00
Potuz
04a5b2e3c2 Do not check parent weight on early FCU (#13683)
When a late block arrives and the beacon node is proposing the next block, we
perform several checks to allow for the next block to reorg the incoming
late block.

Among those checks, we check that the parent block has been heavily
attested (currently 160% of the committee size).

We perform this check in these circumstances:
- When the late block arrives
- At 10 seconds into the slot
- At 0 seconds into the next slot (at proposing time)

The problem is that for blocks that arrive between 4 and 10 seconds into
the slot, the parent block will not yet have this expected weight, since
attestations from the current committee have not been imported yet, and thus
Prysm sends an FCU with payload attributes anyway at this time.

As a result, Prysm keeps the EL building different blocks based on
different parents at the same time; later in the next slot, when it calls
to propose, it reorgs the late block anyway, and the EL has computed a
second payload uselessly.

This PR enables this check only when calling `ShouldOverrideFCU` after
10 seconds into the slot, which we do only after having imported the
current attestations. We may want to remove this check entirely from
`ShouldOverrideFCU` and only keep it in `ProposerHead`.

Shout out to Antithesis for reporting an issue that led to this
discovery.

(cherry picked from commit b6ce6c2eba)
2024-03-05 09:12:13 -06:00
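
A minimal standalone sketch of the timing gate this commit adds (hypothetical names: secondsPerSlot, processAttestationsThreshold, mayOverride; the real change to `ShouldOverrideFCU` is in the forkchoice diff further down this page):

package main

import (
	"fmt"
	"time"
)

// Hypothetical constants mirroring the behavior described above; the real
// values live in the forkchoice package.
const (
	secondsPerSlot               = 12
	processAttestationsThreshold = 10 // attestations are imported 10s into the slot
)

// secondsIntoSlot stands in for slots.SecondsSinceSlotStart.
func secondsIntoSlot(genesis, now time.Time) uint64 {
	return uint64(now.Sub(genesis).Seconds()) % secondsPerSlot
}

// mayOverride sketches the gated check: before the 10s mark the parent's
// weight is not yet meaningful (current-slot attestations are missing), so
// the weight check is skipped and the override is allowed to proceed.
func mayOverride(genesis, now time.Time, parentWeight, committeeWeight, thresholdPct uint64) bool {
	if secondsIntoSlot(genesis, now) < processAttestationsThreshold {
		return true // too early to trust the parent's weight; skip the check
	}
	// Past 10s: only orphan the late block if the parent's LMD vote is strong.
	return parentWeight*100 >= committeeWeight*thresholdPct
}

func main() {
	genesis := time.Now().Add(-5 * time.Second) // pretend we are 5s into slot 0
	fmt.Println(mayOverride(genesis, time.Now(), 0, 1000, 160)) // true: gate not yet applied
}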
kasey
147f48c5c5 exit blob fetching for cp block if outside retention (#13686)
* exit blob fetching for cp block if outside retention

* regression test

---------

Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
(cherry picked from commit b3caaa9acc)
2024-03-04 21:15:54 -06:00
kasey
fdc4ea5bc2 tests for origin blob fetching (#13667)
* tests for origin blob fetching

* Update beacon-chain/sync/initial-sync/service.go

Co-authored-by: Preston Van Loon <pvanloon@offchainlabs.com>

---------

Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
Co-authored-by: Preston Van Loon <pvanloon@offchainlabs.com>
(cherry picked from commit 4c3dbae3c0)
2024-03-04 21:15:45 -06:00
Potuz
536b469d28 Fix failed reorg log (#13679)
(cherry picked from commit 2e2ef4a179)
2024-03-01 10:28:25 -06:00
kasey
66f55556a7 download checkpoint sync origin blobs in init-sync (#13665)
Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
(cherry picked from commit 0132c1b17d)
2024-03-01 10:25:26 -06:00
kasey
4083fe0d1d blob save fsync feature flag (#13652)
Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
(cherry picked from commit 70e1b11aeb)
2024-03-01 10:25:09 -06:00
Nishant Das
6cb6e47339 Use Max Request Limit in Initial Sync (#13641)
* use max limit

* manu's review

(cherry picked from commit e6a6365bdd)
2024-03-01 10:24:54 -06:00
Nishant Das
e006ba155a fix race (#13680)
(cherry picked from commit 68b78dd520)
2024-03-01 10:24:33 -06:00
kasey
b7b017f5b6 avoid part path collisions with mem addr entropy (#13648)
* avoid part path collisions with mem addr entropy

* Regression test

---------

Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
Co-authored-by: Preston Van Loon <pvanloon@offchainlabs.com>
(cherry picked from commit 4c66e4d060)
2024-02-21 17:15:09 -06:00
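
The gist of that fix, as a standalone sketch (hypothetical helper, not repo code; the real change to blobNamer.partPath is in the filesystem diff below): fmt's %p verb on a slice formats the address of its backing array, which is unique per live allocation, so concurrent savers of the same blob write to distinct part files instead of truncating each other's before the atomic rename.

package main

import (
	"fmt"
	"path"
)

// partPath builds a per-writer temporary file name for an in-flight blob
// save. Two goroutines saving the same (root, index) hold different buffers,
// hence different addresses, hence different .part paths.
func partPath(dir string, index uint64, buf []byte) string {
	entropy := fmt.Sprintf("%p", buf) // address of the slice's backing array
	return path.Join(dir, fmt.Sprintf("%s-%d.part", entropy, index))
}

func main() {
	a := make([]byte, 8)
	b := make([]byte, 8)
	fmt.Println(partPath("0x01ab", 0, a)) // e.g. 0x01ab/0xc000012345-0.part
	fmt.Println(partPath("0x01ab", 0, b)) // different entropy: no collision
}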
Preston Van Loon
0c897045a2 blob save: add better data checking for empty blob issues (#13647)
(cherry picked from commit daad29d0de)
2024-02-21 16:48:06 -06:00
kasey
04dd60d1bf Fix pending block/blob zero peer edge case (#13625)
* pending broadcast err if missing blobs and 0 peers

* compute request first for len check

---------

Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
2024-02-17 00:29:23 +00:00
Potuz
92303b610d add mainnet fork epoch (#13601)
* add mainnet fork epoch

* add sha

* gazelle

* gazelle screwed up

* Update spec test to v1.4.0-beta.7

* Update spec test to v1.4.0-beta.7-hotfix

* Gazelle

* compensate for lower request limits post-deneb

---------

Co-authored-by: terence tsao <terence@prysmaticlabs.com>
Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
2024-02-16 22:08:06 +00:00
Radosław Kapka
41d97a2a27 Remove /eth/v1/beacon/blocks/{block_id} (#13628)
2024-02-16 15:01:09 +00:00
43 changed files with 671 additions and 279 deletions

View File

@@ -113,6 +113,13 @@ http_archive(
url = "https://github.com/GoogleContainerTools/distroless/archive/9dc924b9fe812eec2fa0061824dcad39eb09d0d6.tar.gz", # 2024-01-24
)
http_archive(
name = "aspect_bazel_lib",
sha256 = "f5ea76682b209cc0bd90d0f5a3b26d2f7a6a2885f0c5f615e72913f4805dbb0d",
strip_prefix = "bazel-lib-2.5.0",
url = "https://github.com/aspect-build/bazel-lib/releases/download/v2.5.0/bazel-lib-v2.5.0.tar.gz",
)
load("@aspect_bazel_lib//lib:repositories.bzl", "aspect_bazel_lib_dependencies", "aspect_bazel_lib_register_toolchains")
aspect_bazel_lib_dependencies()
@@ -234,7 +241,9 @@ filegroup(
url = "https://github.com/ethereum/EIPs/archive/5480440fe51742ed23342b68cf106cefd427e39d.tar.gz",
)
consensus_spec_version = "v1.4.0-beta.6"
consensus_spec_version = "v1.4.0-beta.7"
consensus_spec_test_version = "v1.4.0-beta.7-hotfix"
bls_test_version = "v0.1.1"
@@ -250,8 +259,8 @@ filegroup(
visibility = ["//visibility:public"],
)
""",
sha256 = "7dc467d7be97525c88a1d3683665c1354cc86297fd62009e7cf5000905b25652",
url = "https://github.com/ethereum/consensus-spec-tests/releases/download/%s/general.tar.gz" % consensus_spec_version,
sha256 = "c282c0f86f23f3d2e0f71f5975769a4077e62a7e3c7382a16bd26a7e589811a0",
url = "https://github.com/ethereum/consensus-spec-tests/releases/download/%s/general.tar.gz" % consensus_spec_test_version,
)
http_archive(
@@ -266,8 +275,8 @@ filegroup(
visibility = ["//visibility:public"],
)
""",
sha256 = "e163011254b6ce100205fb779ba660faedc9bc9f7bb4408c25746a7aa5e8d8bc",
url = "https://github.com/ethereum/consensus-spec-tests/releases/download/%s/minimal.tar.gz" % consensus_spec_version,
sha256 = "4649c35aa3b8eb0cfdc81bee7c05649f90ef36bede5b0513e1f2e8baf37d6033",
url = "https://github.com/ethereum/consensus-spec-tests/releases/download/%s/minimal.tar.gz" % consensus_spec_test_version,
)
http_archive(
@@ -282,8 +291,8 @@ filegroup(
visibility = ["//visibility:public"],
)
""",
sha256 = "b73c81b6386053a2141f6f43b457489668621c7013f740ed93edf9ac0e34f091",
url = "https://github.com/ethereum/consensus-spec-tests/releases/download/%s/mainnet.tar.gz" % consensus_spec_version,
sha256 = "c5a03f724f757456ffaabd2a899992a71d2baf45ee4db65ca3518f2b7ee928c8",
url = "https://github.com/ethereum/consensus-spec-tests/releases/download/%s/mainnet.tar.gz" % consensus_spec_test_version,
)
http_archive(
@@ -297,7 +306,7 @@ filegroup(
visibility = ["//visibility:public"],
)
""",
sha256 = "47726c527512d03ef3e706a8e7f8d5db6a5f2153351db0470dab780f6a87c4dd",
sha256 = "049c29267310e6b88280f4f834a75866c2f5b9036fa97acb9d9c6db8f64d9118",
strip_prefix = "consensus-specs-" + consensus_spec_version[1:],
url = "https://github.com/ethereum/consensus-specs/archive/refs/tags/%s.tar.gz" % consensus_spec_version,
)
@@ -328,9 +337,9 @@ filegroup(
visibility = ["//visibility:public"],
)
""",
sha256 = "2701e1e1a3ec10c673fe7dbdbbe6f02c8ae8c922aebbf6e720d8c72d5458aafe",
strip_prefix = "eth2-networks-7b4897888cebef23801540236f73123e21774954",
url = "https://github.com/eth-clients/eth2-networks/archive/7b4897888cebef23801540236f73123e21774954.tar.gz",
sha256 = "77e7e3ed65e33b7bb19d30131f4c2bb39e4dfeb188ab9ae84651c3cc7600131d",
strip_prefix = "eth2-networks-934c948e69205dcf2deb87e4ae6cc140c335f94d",
url = "https://github.com/eth-clients/eth2-networks/archive/934c948e69205dcf2deb87e4ae6cc140c335f94d.tar.gz",
)
http_archive(

View File

@@ -307,16 +307,16 @@ func (s *Service) updateEpochBoundaryCaches(ctx context.Context, st state.Beacon
if err := helpers.UpdateProposerIndicesInCache(ctx, st, e); err != nil {
return errors.Wrap(err, "could not update proposer index cache")
}
go func() {
go func(ep primitives.Epoch) {
// Use a custom deadline here, since this method runs asynchronously.
// We ignore the parent method's context and instead create a new one
// with a custom deadline, therefore using the background context instead.
slotCtx, cancel := context.WithTimeout(context.Background(), slotDeadline)
defer cancel()
if err := helpers.UpdateCommitteeCache(slotCtx, st, e+1); err != nil {
if err := helpers.UpdateCommitteeCache(slotCtx, st, ep+1); err != nil {
log.WithError(err).Warn("Could not update committee cache")
}
}()
}(e)
// The latest block header is from the previous epoch
r, err := st.LatestBlockHeader().HashTreeRoot()
if err != nil {

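The hunk above is the standard Go fix for sharing a variable with an asynchronously running goroutine: the closure previously read the outer `e`, which other code could mutate before (or while) the goroutine ran; passing it as an argument snapshots the value. A minimal illustration of the pattern (not repo code; with Go versions before 1.22 the loop variable below would otherwise be shared across iterations):

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	for e := 0; e < 3; e++ {
		wg.Add(1)
		// Passing e as an argument copies its current value into ep, so a
		// later mutation of e cannot race with the goroutine's read.
		go func(ep int) {
			defer wg.Done()
			fmt.Println("epoch", ep)
		}(e)
	}
	wg.Wait()
}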
View File

@@ -2114,7 +2114,7 @@ func TestMissingIndices(t *testing.T) {
for _, c := range cases {
bm, bs := filesystem.NewEphemeralBlobStorageWithMocker(t)
t.Run(c.name, func(t *testing.T) {
require.NoError(t, bm.CreateFakeIndices(c.root, c.present))
require.NoError(t, bm.CreateFakeIndices(c.root, c.present...))
missing, err := missingIndices(bs, c.root, c.expected)
if c.err != nil {
require.ErrorIs(t, err, c.err)

View File

@@ -289,10 +289,18 @@ func (s *Service) StartFromSavedState(saved state.BeaconState) error {
fRoot := s.ensureRootNotZeros(bytesutil.ToBytes32(finalized.Root))
s.cfg.ForkChoiceStore.Lock()
defer s.cfg.ForkChoiceStore.Unlock()
if err := s.cfg.ForkChoiceStore.UpdateJustifiedCheckpoint(s.ctx, &forkchoicetypes.Checkpoint{Epoch: justified.Epoch,
Root: bytesutil.ToBytes32(justified.Root)}); err != nil {
return errors.Wrap(err, "could not update forkchoice's justified checkpoint")
if params.BeaconConfig().ConfigName != params.PraterName {
if err := s.cfg.ForkChoiceStore.UpdateJustifiedCheckpoint(s.ctx, &forkchoicetypes.Checkpoint{Epoch: justified.Epoch,
Root: bytesutil.ToBytes32(justified.Root)}); err != nil {
return errors.Wrap(err, "could not update forkchoice's justified checkpoint")
}
} else {
if err := s.cfg.ForkChoiceStore.UpdateJustifiedCheckpoint(s.ctx, &forkchoicetypes.Checkpoint{Epoch: finalized.Epoch,
Root: bytesutil.ToBytes32(finalized.Root)}); err != nil {
return errors.Wrap(err, "could not update forkchoice's justified checkpoint")
}
}
if err := s.cfg.ForkChoiceStore.UpdateFinalizedCheckpoint(&forkchoicetypes.Checkpoint{Epoch: finalized.Epoch,
Root: bytesutil.ToBytes32(finalized.Root)}); err != nil {
return errors.Wrap(err, "could not update forkchoice's finalized checkpoint")

View File

@@ -21,7 +21,10 @@ import (
)
var (
errIndexOutOfBounds = errors.New("blob index in file name >= MaxBlobsPerBlock")
errIndexOutOfBounds = errors.New("blob index in file name >= MaxBlobsPerBlock")
errEmptyBlobWritten = errors.New("zero bytes written to disk when saving blob sidecar")
errSidecarEmptySSZData = errors.New("sidecar marshalled to an empty ssz byte slice")
errNoBasePath = errors.New("BlobStorage base path not specified in init")
)
const (
@@ -34,14 +37,26 @@ const (
// BlobStorageOption is a functional option for configuring a BlobStorage.
type BlobStorageOption func(*BlobStorage) error
// WithBasePath is a required option that sets the base path of blob storage.
func WithBasePath(base string) BlobStorageOption {
return func(b *BlobStorage) error {
b.base = base
return nil
}
}
// WithBlobRetentionEpochs is an option that changes the number of epochs blobs will be persisted.
func WithBlobRetentionEpochs(e primitives.Epoch) BlobStorageOption {
return func(b *BlobStorage) error {
pruner, err := newBlobPruner(b.fs, e)
if err != nil {
return err
}
b.pruner = pruner
b.retentionEpochs = e
return nil
}
}
// WithSaveFsync is an option that causes Save to call fsync before renaming part files for improved durability.
func WithSaveFsync(fsync bool) BlobStorageOption {
return func(b *BlobStorage) error {
b.fsync = fsync
return nil
}
}
@@ -49,30 +64,36 @@ func WithBlobRetentionEpochs(e primitives.Epoch) BlobStorageOption {
// NewBlobStorage creates a new instance of the BlobStorage object. Note that the implementation of BlobStorage may
// attempt to hold a file lock to guarantee exclusive control of the blob storage directory, so this should only be
// initialized once per beacon node.
func NewBlobStorage(base string, opts ...BlobStorageOption) (*BlobStorage, error) {
base = path.Clean(base)
if err := file.MkdirAll(base); err != nil {
return nil, fmt.Errorf("failed to create blob storage at %s: %w", base, err)
}
fs := afero.NewBasePathFs(afero.NewOsFs(), base)
b := &BlobStorage{
fs: fs,
}
func NewBlobStorage(opts ...BlobStorageOption) (*BlobStorage, error) {
b := &BlobStorage{}
for _, o := range opts {
if err := o(b); err != nil {
return nil, fmt.Errorf("failed to create blob storage at %s: %w", base, err)
return nil, errors.Wrap(err, "failed to create blob storage")
}
}
if b.pruner == nil {
log.Warn("Initializing blob filesystem storage with pruning disabled")
if b.base == "" {
return nil, errNoBasePath
}
b.base = path.Clean(b.base)
if err := file.MkdirAll(b.base); err != nil {
return nil, errors.Wrapf(err, "failed to create blob storage at %s", b.base)
}
b.fs = afero.NewBasePathFs(afero.NewOsFs(), b.base)
pruner, err := newBlobPruner(b.fs, b.retentionEpochs)
if err != nil {
return nil, err
}
b.pruner = pruner
return b, nil
}
// BlobStorage is the concrete implementation of the filesystem backend for saving and retrieving BlobSidecars.
type BlobStorage struct {
fs afero.Fs
pruner *blobPruner
base string
retentionEpochs primitives.Epoch
fsync bool
fs afero.Fs
pruner *blobPruner
}
// WarmCache runs the prune routine with an expiration of slot of 0, so nothing will be pruned, but the pruner's cache
@@ -111,11 +132,14 @@ func (bs *BlobStorage) Save(sidecar blocks.VerifiedROBlob) error {
sidecarData, err := sidecar.MarshalSSZ()
if err != nil {
return errors.Wrap(err, "failed to serialize sidecar data")
} else if len(sidecarData) == 0 {
return errSidecarEmptySSZData
}
if err := bs.fs.MkdirAll(fname.dir(), directoryPermissions); err != nil {
return err
}
partPath := fname.partPath()
partPath := fname.partPath(fmt.Sprintf("%p", sidecarData))
partialMoved := false
// Ensure the partial file is deleted.
@@ -138,7 +162,7 @@ func (bs *BlobStorage) Save(sidecar blocks.VerifiedROBlob) error {
return errors.Wrap(err, "failed to create partial file")
}
_, err = partialFile.Write(sidecarData)
n, err := partialFile.Write(sidecarData)
if err != nil {
closeErr := partialFile.Close()
if closeErr != nil {
@@ -146,11 +170,24 @@ func (bs *BlobStorage) Save(sidecar blocks.VerifiedROBlob) error {
}
return errors.Wrap(err, "failed to write to partial file")
}
err = partialFile.Close()
if err != nil {
if bs.fsync {
if err := partialFile.Sync(); err != nil {
return err
}
}
if err := partialFile.Close(); err != nil {
return err
}
if n != len(sidecarData) {
return fmt.Errorf("failed to write the full bytes of sidecarData, wrote only %d of %d bytes", n, len(sidecarData))
}
if n == 0 {
return errEmptyBlobWritten
}
// Atomically rename the partial file to its final name.
err = bs.fs.Rename(partPath, sszPath)
if err != nil {
@@ -257,16 +294,12 @@ func (p blobNamer) dir() string {
return rootString(p.root)
}
func (p blobNamer) fname(ext string) string {
return path.Join(p.dir(), fmt.Sprintf("%d.%s", p.index, ext))
}
func (p blobNamer) partPath() string {
return p.fname(partExt)
func (p blobNamer) partPath(entropy string) string {
return path.Join(p.dir(), fmt.Sprintf("%s-%d.%s", entropy, p.index, partExt))
}
func (p blobNamer) path() string {
return p.fname(sszExt)
return path.Join(p.dir(), fmt.Sprintf("%d.%s", p.index, sszExt))
}
func rootString(root [32]byte) string {

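Putting the new functional options together, construction now looks roughly like the sketch below (import path as used elsewhere on this page; the retention value is an arbitrary example). WithBasePath is required and replaces the old positional argument; omitting it yields errNoBasePath.

package example

import (
	"log"

	"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
)

func newStorage() *filesystem.BlobStorage {
	bs, err := filesystem.NewBlobStorage(
		filesystem.WithBasePath("/var/lib/prysm/blobs"), // required
		filesystem.WithBlobRetentionEpochs(4096),        // example retention window
		filesystem.WithSaveFsync(true),                  // fsync part files before rename
	)
	if err != nil {
		log.Fatal(err)
	}
	return bs
}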
View File

@@ -4,6 +4,7 @@ import (
"bytes"
"os"
"path"
"sync"
"testing"
"time"
@@ -101,6 +102,30 @@ func TestBlobStorage_SaveBlobData(t *testing.T) {
_, err = b.Get(blob.BlockRoot(), blob.Index)
require.ErrorIs(t, err, os.ErrNotExist)
})
t.Run("race conditions", func(t *testing.T) {
// There was a bug where saving the same blob in multiple goroutines would cause a partial blob
// to be empty. This test ensures that several routines can safely save the same blob at the
// same time. This isn't ideal behavior from the caller, but should be handled safely anyway.
// See https://github.com/prysmaticlabs/prysm/pull/13648
b, err := NewBlobStorage(WithBasePath(t.TempDir()))
require.NoError(t, err)
blob := testSidecars[0]
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
require.NoError(t, b.Save(blob))
}()
}
wg.Wait()
res, err := b.Get(blob.BlockRoot(), blob.Index)
require.NoError(t, err)
require.DeepSSZEqual(t, blob, res)
})
}
// pollUntil polls a condition function until it returns true or a timeout is reached.
@@ -243,6 +268,8 @@ func BenchmarkPruning(b *testing.B) {
}
func TestNewBlobStorage(t *testing.T) {
_, err := NewBlobStorage(path.Join(t.TempDir(), "good"))
_, err := NewBlobStorage()
require.ErrorIs(t, err, errNoBasePath)
_, err = NewBlobStorage(WithBasePath(path.Join(t.TempDir(), "good")))
require.NoError(t, err)
}

View File

@@ -37,7 +37,7 @@ type BlobMocker struct {
// CreateFakeIndices creates empty blob sidecar files at the expected path for the given
// root and indices to influence the result of Indices().
func (bm *BlobMocker) CreateFakeIndices(root [32]byte, indices []uint64) error {
func (bm *BlobMocker) CreateFakeIndices(root [32]byte, indices ...uint64) error {
for i := range indices {
n := blobNamer{root: root, index: indices[i]}
if err := bm.fs.MkdirAll(n.dir(), directoryPermissions); err != nil {

View File

@@ -82,6 +82,15 @@ func (f *ForkChoice) ShouldOverrideFCU() (override bool) {
return
}
// Return early if we are checking before 10 seconds into the slot
secs, err := slots.SecondsSinceSlotStart(head.slot, f.store.genesisTime, uint64(time.Now().Unix()))
if err != nil {
log.WithError(err).Error("could not check current slot")
return true
}
if secs < ProcessAttestationsThreshold {
return true
}
// Only orphan a block if the parent LMD vote is strong
if parent.weight*100 < f.store.committeeWeight*params.BeaconConfig().ReorgParentWeightThreshold {
return

View File

@@ -85,11 +85,19 @@ func TestForkChoice_ShouldOverrideFCU(t *testing.T) {
require.Equal(t, false, f.ShouldOverrideFCU())
f.store.headNode.parent = saved
})
t.Run("parent is weak", func(t *testing.T) {
t.Run("parent is weak early call", func(t *testing.T) {
saved := f.store.headNode.parent.weight
f.store.headNode.parent.weight = 0
require.Equal(t, true, f.ShouldOverrideFCU())
f.store.headNode.parent.weight = saved
})
t.Run("parent is weak late call", func(t *testing.T) {
saved := f.store.headNode.parent.weight
driftGenesisTime(f, 2, 11)
f.store.headNode.parent.weight = 0
require.Equal(t, false, f.ShouldOverrideFCU())
f.store.headNode.parent.weight = saved
driftGenesisTime(f, 2, orphanLateBlockFirstThreshold+1)
})
t.Run("Head is strong", func(t *testing.T) {
f.store.headNode.weight = f.store.committeeWeight

View File

@@ -118,6 +118,7 @@ type BeaconNode struct {
BackfillOpts []backfill.ServiceOption
initialSyncComplete chan struct{}
BlobStorage *filesystem.BlobStorage
BlobStorageOptions []filesystem.BlobStorageOption
blobRetentionEpochs primitives.Epoch
verifyInitWaiter *verification.InitializerWaiter
syncChecker *initialsync.SyncChecker
@@ -209,6 +210,16 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco
return nil, err
}
// Allow tests to set it as an opt.
if beacon.BlobStorage == nil {
beacon.BlobStorageOptions = append(beacon.BlobStorageOptions, filesystem.WithSaveFsync(features.Get().BlobSaveFsync))
blobs, err := filesystem.NewBlobStorage(beacon.BlobStorageOptions...)
if err != nil {
return nil, err
}
beacon.BlobStorage = blobs
}
log.Debugln("Starting DB")
if err := beacon.startDB(cliCtx, depositAddress); err != nil {
return nil, err

View File

@@ -43,6 +43,15 @@ func WithBlobStorage(bs *filesystem.BlobStorage) Option {
}
}
// WithBlobStorageOptions appends 1 or more filesystem.BlobStorageOption on the beacon node,
// to be used when initializing blob storage.
func WithBlobStorageOptions(opt ...filesystem.BlobStorageOption) Option {
return func(bn *BeaconNode) error {
bn.BlobStorageOptions = append(bn.BlobStorageOptions, opt...)
return nil
}
}
// WithBlobRetentionEpochs sets the blobRetentionEpochs value, used in kv store initialization.
func WithBlobRetentionEpochs(e primitives.Epoch) Option {
return func(bn *BeaconNode) error {

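For callers (e.g. tests), the new option threads blob storage configuration through the node constructor shown earlier. A sketch, assuming the beacon-chain/node import path and the urfave/cli v2 context Prysm uses:

package example

import (
	"context"

	"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
	"github.com/prysmaticlabs/prysm/v5/beacon-chain/node"
	"github.com/urfave/cli/v2"
)

// newNodeWithBlobOpts wires filesystem.BlobStorageOption values into node.New,
// per the signature in the node diff above.
func newNodeWithBlobOpts(cliCtx *cli.Context, cancel context.CancelFunc) (*node.BeaconNode, error) {
	return node.New(cliCtx, cancel,
		node.WithBlobStorageOptions(
			filesystem.WithBasePath("/var/lib/prysm/blobs"),
			filesystem.WithSaveFsync(true),
		),
	)
}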
View File

@@ -47,51 +47,6 @@ var (
type handled bool
// GetBlock retrieves block details for given block ID.
//
// DEPRECATED: please use GetBlockV2 instead
func (s *Server) GetBlock(w http.ResponseWriter, r *http.Request) {
ctx, span := trace.StartSpan(r.Context(), "beacon.GetBlock")
defer span.End()
blockId := mux.Vars(r)["block_id"]
if blockId == "" {
httputil.HandleError(w, "block_id is required in URL params", http.StatusBadRequest)
return
}
blk, err := s.Blocker.Block(ctx, []byte(blockId))
if !shared.WriteBlockFetchError(w, blk, err) {
return
}
if httputil.RespondWithSsz(r) {
s.getBlockSSZ(ctx, w, blk)
} else {
s.getBlock(ctx, w, blk)
}
}
// getBlock returns the JSON-serialized version of the beacon block for given block ID.
func (s *Server) getBlock(ctx context.Context, w http.ResponseWriter, blk interfaces.ReadOnlySignedBeaconBlock) {
v2Resp, err := s.getBlockPhase0(ctx, blk)
if err != nil {
httputil.HandleError(w, "Could not get block: "+err.Error(), http.StatusInternalServerError)
return
}
resp := &structs.GetBlockResponse{Data: v2Resp.Data}
httputil.WriteJson(w, resp)
}
// getBlockSSZ returns the SSZ-serialized version of the beacon block for given block ID.
func (s *Server) getBlockSSZ(ctx context.Context, w http.ResponseWriter, blk interfaces.ReadOnlySignedBeaconBlock) {
resp, err := s.getBlockPhase0SSZ(ctx, blk)
if err != nil {
httputil.HandleError(w, "Could not get block: "+err.Error(), http.StatusInternalServerError)
return
}
httputil.WriteSsz(w, resp, "beacon_block.ssz")
}
// GetBlockV2 retrieves block details for given block ID.
func (s *Server) GetBlockV2(w http.ResponseWriter, r *http.Request) {
ctx, span := trace.StartSpan(r.Context(), "beacon.GetBlockV2")

View File

@@ -82,56 +82,6 @@ func fillDBTestBlocks(ctx context.Context, t *testing.T, beaconDB db.Database) (
return genBlk, blkContainers
}
func TestGetBlock(t *testing.T) {
b := util.NewBeaconBlock()
b.Block.Slot = 123
sb, err := blocks.NewSignedBeaconBlock(b)
require.NoError(t, err)
s := &Server{
Blocker: &testutil.MockBlocker{BlockToReturn: sb},
}
request := httptest.NewRequest(http.MethodGet, "http://foo.example/eth/v1/beacon/blocks/{block_id}", nil)
request = mux.SetURLVars(request, map[string]string{"block_id": "head"})
writer := httptest.NewRecorder()
writer.Body = &bytes.Buffer{}
s.GetBlock(writer, request)
require.Equal(t, http.StatusOK, writer.Code)
resp := &structs.GetBlockResponse{}
require.NoError(t, json.Unmarshal(writer.Body.Bytes(), resp))
sbb := &structs.SignedBeaconBlock{Message: &structs.BeaconBlock{}}
require.NoError(t, json.Unmarshal(resp.Data.Message, sbb.Message))
sbb.Signature = resp.Data.Signature
genericBlk, err := sbb.ToGeneric()
require.NoError(t, err)
blk := genericBlk.GetPhase0()
require.NoError(t, err)
assert.DeepEqual(t, blk, b)
}
func TestGetBlockSSZ(t *testing.T) {
b := util.NewBeaconBlock()
b.Block.Slot = 123
sb, err := blocks.NewSignedBeaconBlock(b)
require.NoError(t, err)
s := &Server{
Blocker: &testutil.MockBlocker{BlockToReturn: sb},
}
request := httptest.NewRequest(http.MethodGet, "http://foo.example/eth/v1/beacon/blocks/{block_id}", nil)
request = mux.SetURLVars(request, map[string]string{"block_id": "head"})
request.Header.Set("Accept", api.OctetStreamMediaType)
writer := httptest.NewRecorder()
writer.Body = &bytes.Buffer{}
s.GetBlock(writer, request)
require.Equal(t, http.StatusOK, writer.Code)
sszExpected, err := b.MarshalSSZ()
require.NoError(t, err)
assert.DeepEqual(t, sszExpected, writer.Body.Bytes())
}
func TestGetBlockV2(t *testing.T) {
t.Run("phase0", func(t *testing.T) {
b := util.NewBeaconBlock()
@@ -147,7 +97,7 @@ func TestGetBlockV2(t *testing.T) {
Blocker: mockBlockFetcher,
}
request := httptest.NewRequest(http.MethodGet, "http://foo.example/eth/v1/beacon/blocks/{block_id}", nil)
request := httptest.NewRequest(http.MethodGet, "http://foo.example/eth/v2/beacon/blocks/{block_id}", nil)
request = mux.SetURLVars(request, map[string]string{"block_id": "head"})
writer := httptest.NewRecorder()
writer.Body = &bytes.Buffer{}
@@ -180,7 +130,7 @@ func TestGetBlockV2(t *testing.T) {
Blocker: mockBlockFetcher,
}
request := httptest.NewRequest(http.MethodGet, "http://foo.example/eth/v1/beacon/blocks/{block_id}", nil)
request := httptest.NewRequest(http.MethodGet, "http://foo.example/eth/v2/beacon/blocks/{block_id}", nil)
request = mux.SetURLVars(request, map[string]string{"block_id": "head"})
writer := httptest.NewRecorder()
writer.Body = &bytes.Buffer{}
@@ -214,7 +164,7 @@ func TestGetBlockV2(t *testing.T) {
Blocker: mockBlockFetcher,
}
request := httptest.NewRequest(http.MethodGet, "http://foo.example/eth/v1/beacon/blocks/{block_id}", nil)
request := httptest.NewRequest(http.MethodGet, "http://foo.example/eth/v2/beacon/blocks/{block_id}", nil)
request = mux.SetURLVars(request, map[string]string{"block_id": "head"})
writer := httptest.NewRecorder()
writer.Body = &bytes.Buffer{}
@@ -248,7 +198,7 @@ func TestGetBlockV2(t *testing.T) {
Blocker: mockBlockFetcher,
}
request := httptest.NewRequest(http.MethodGet, "http://foo.example/eth/v1/beacon/blocks/{block_id}", nil)
request := httptest.NewRequest(http.MethodGet, "http://foo.example/eth/v2/beacon/blocks/{block_id}", nil)
request = mux.SetURLVars(request, map[string]string{"block_id": "head"})
writer := httptest.NewRecorder()
writer.Body = &bytes.Buffer{}
@@ -282,7 +232,7 @@ func TestGetBlockV2(t *testing.T) {
Blocker: mockBlockFetcher,
}
request := httptest.NewRequest(http.MethodGet, "http://foo.example/eth/v1/beacon/blocks/{block_id}", nil)
request := httptest.NewRequest(http.MethodGet, "http://foo.example/eth/v2/beacon/blocks/{block_id}", nil)
request = mux.SetURLVars(request, map[string]string{"block_id": "head"})
writer := httptest.NewRecorder()
writer.Body = &bytes.Buffer{}
@@ -316,7 +266,7 @@ func TestGetBlockV2(t *testing.T) {
Blocker: mockBlockFetcher,
}
request := httptest.NewRequest(http.MethodGet, "http://foo.example/eth/v1/beacon/blocks/{block_id}", nil)
request := httptest.NewRequest(http.MethodGet, "http://foo.example/eth/v2/beacon/blocks/{block_id}", nil)
request = mux.SetURLVars(request, map[string]string{"block_id": "head"})
writer := httptest.NewRecorder()
writer.Body = &bytes.Buffer{}
@@ -343,7 +293,7 @@ func TestGetBlockV2(t *testing.T) {
Blocker: mockBlockFetcher,
}
request := httptest.NewRequest(http.MethodGet, "http://foo.example/eth/v1/beacon/blocks/{block_id}", nil)
request := httptest.NewRequest(http.MethodGet, "http://foo.example/eth/v2/beacon/blocks/{block_id}", nil)
request = mux.SetURLVars(request, map[string]string{"block_id": hexutil.Encode(r[:])})
writer := httptest.NewRecorder()
writer.Body = &bytes.Buffer{}
@@ -362,7 +312,7 @@ func TestGetBlockV2(t *testing.T) {
Blocker: mockBlockFetcher,
}
request := httptest.NewRequest(http.MethodGet, "http://foo.example/eth/v1/beacon/blocks/{block_id}", nil)
request := httptest.NewRequest(http.MethodGet, "http://foo.example/eth/v2/beacon/blocks/{block_id}", nil)
request = mux.SetURLVars(request, map[string]string{"block_id": hexutil.Encode(r[:])})
writer := httptest.NewRecorder()
writer.Body = &bytes.Buffer{}
@@ -387,7 +337,7 @@ func TestGetBlockSSZV2(t *testing.T) {
Blocker: &testutil.MockBlocker{BlockToReturn: sb},
}
request := httptest.NewRequest(http.MethodGet, "http://foo.example/eth/v1/beacon/blocks/{block_id}", nil)
request := httptest.NewRequest(http.MethodGet, "http://foo.example/eth/v2/beacon/blocks/{block_id}", nil)
request = mux.SetURLVars(request, map[string]string{"block_id": "head"})
request.Header.Set("Accept", api.OctetStreamMediaType)
writer := httptest.NewRecorder()
@@ -410,7 +360,7 @@ func TestGetBlockSSZV2(t *testing.T) {
Blocker: &testutil.MockBlocker{BlockToReturn: sb},
}
request := httptest.NewRequest(http.MethodGet, "http://foo.example/eth/v1/beacon/blocks/{block_id}", nil)
request := httptest.NewRequest(http.MethodGet, "http://foo.example/eth/v2/beacon/blocks/{block_id}", nil)
request = mux.SetURLVars(request, map[string]string{"block_id": "head"})
request.Header.Set("Accept", api.OctetStreamMediaType)
writer := httptest.NewRecorder()
@@ -433,7 +383,7 @@ func TestGetBlockSSZV2(t *testing.T) {
Blocker: &testutil.MockBlocker{BlockToReturn: sb},
}
request := httptest.NewRequest(http.MethodGet, "http://foo.example/eth/v1/beacon/blocks/{block_id}", nil)
request := httptest.NewRequest(http.MethodGet, "http://foo.example/eth/v2/beacon/blocks/{block_id}", nil)
request = mux.SetURLVars(request, map[string]string{"block_id": "head"})
request.Header.Set("Accept", api.OctetStreamMediaType)
writer := httptest.NewRecorder()
@@ -456,7 +406,7 @@ func TestGetBlockSSZV2(t *testing.T) {
Blocker: &testutil.MockBlocker{BlockToReturn: sb},
}
request := httptest.NewRequest(http.MethodGet, "http://foo.example/eth/v1/beacon/blocks/{block_id}", nil)
request := httptest.NewRequest(http.MethodGet, "http://foo.example/eth/v2/beacon/blocks/{block_id}", nil)
request = mux.SetURLVars(request, map[string]string{"block_id": "head"})
request.Header.Set("Accept", api.OctetStreamMediaType)
writer := httptest.NewRecorder()
@@ -479,7 +429,7 @@ func TestGetBlockSSZV2(t *testing.T) {
Blocker: &testutil.MockBlocker{BlockToReturn: sb},
}
request := httptest.NewRequest(http.MethodGet, "http://foo.example/eth/v1/beacon/blocks/{block_id}", nil)
request := httptest.NewRequest(http.MethodGet, "http://foo.example/eth/v2/beacon/blocks/{block_id}", nil)
request = mux.SetURLVars(request, map[string]string{"block_id": "head"})
request.Header.Set("Accept", api.OctetStreamMediaType)
writer := httptest.NewRecorder()
@@ -546,7 +496,7 @@ func TestGetBlockAttestations(t *testing.T) {
Blocker: mockBlockFetcher,
}
request := httptest.NewRequest(http.MethodGet, "http://foo.example/eth/v1/beacon/blocks/{block_id}/attestations", nil)
request := httptest.NewRequest(http.MethodGet, "http://foo.example/eth/v2/beacon/blocks/{block_id}/attestations", nil)
request = mux.SetURLVars(request, map[string]string{"block_id": "head"})
writer := httptest.NewRecorder()
writer.Body = &bytes.Buffer{}
@@ -580,7 +530,7 @@ func TestGetBlockAttestations(t *testing.T) {
Blocker: mockBlockFetcher,
}
request := httptest.NewRequest(http.MethodGet, "http://foo.example/eth/v1/beacon/blocks/{block_id}/attestations", nil)
request := httptest.NewRequest(http.MethodGet, "http://foo.example/eth/v2/beacon/blocks/{block_id}/attestations", nil)
request = mux.SetURLVars(request, map[string]string{"block_id": "head"})
writer := httptest.NewRecorder()
writer.Body = &bytes.Buffer{}
@@ -607,7 +557,7 @@ func TestGetBlockAttestations(t *testing.T) {
Blocker: mockBlockFetcher,
}
request := httptest.NewRequest(http.MethodGet, "http://foo.example/eth/v1/beacon/blocks/{block_id}/attestations", nil)
request := httptest.NewRequest(http.MethodGet, "http://foo.example/eth/v2/beacon/blocks/{block_id}/attestations", nil)
request = mux.SetURLVars(request, map[string]string{"block_id": "head"})
writer := httptest.NewRecorder()
writer.Body = &bytes.Buffer{}
@@ -626,7 +576,7 @@ func TestGetBlockAttestations(t *testing.T) {
Blocker: mockBlockFetcher,
}
request := httptest.NewRequest(http.MethodGet, "http://foo.example/eth/v1/beacon/blocks/{block_id}/attestations", nil)
request := httptest.NewRequest(http.MethodGet, "http://foo.example/eth/v2/beacon/blocks/{block_id}/attestations", nil)
request = mux.SetURLVars(request, map[string]string{"block_id": "head"})
writer := httptest.NewRecorder()
writer.Body = &bytes.Buffer{}

View File

@@ -115,13 +115,7 @@ func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (
return vs.constructGenericBeaconBlock(sBlk, bundleCache.get(req.Slot))
}
func (vs *Server) handleFailedReorgAttempt(ctx context.Context, slot primitives.Slot, parentRoot, headRoot [32]byte) (state.BeaconState, error) {
blockchain.LateBlockAttemptedReorgCount.Inc()
log.WithFields(logrus.Fields{
"slot": slot,
"parentRoot": fmt.Sprintf("%#x", parentRoot),
"headRoot": fmt.Sprintf("%#x", headRoot),
}).Warn("late block attempted reorg failed")
func (vs *Server) handleSuccesfulReorgAttempt(ctx context.Context, slot primitives.Slot, parentRoot, headRoot [32]byte) (state.BeaconState, error) {
// Try to get the state from the NSC
head := transition.NextSlotState(parentRoot[:], slot)
if head != nil {
@@ -135,7 +129,16 @@ func (vs *Server) handleFailedReorgAttempt(ctx context.Context, slot primitives.
return head, nil
}
func (vs *Server) getHeadNoFailedReorg(ctx context.Context, slot primitives.Slot, parentRoot [32]byte) (state.BeaconState, error) {
func logFailedReorgAttempt(slot primitives.Slot, oldHeadRoot, headRoot [32]byte) {
blockchain.LateBlockAttemptedReorgCount.Inc()
log.WithFields(logrus.Fields{
"slot": slot,
"oldHeadRoot": fmt.Sprintf("%#x", oldHeadRoot),
"headRoot": fmt.Sprintf("%#x", headRoot),
}).Warn("late block attempted reorg failed")
}
func (vs *Server) getHeadNoReorg(ctx context.Context, slot primitives.Slot, parentRoot [32]byte) (state.BeaconState, error) {
// Try to get the state from the NSC
head := transition.NextSlotState(parentRoot[:], slot)
if head != nil {
@@ -148,11 +151,14 @@ func (vs *Server) getHeadNoFailedReorg(ctx context.Context, slot primitives.Slot
return head, nil
}
func (vs *Server) getParentStateFromReorgData(ctx context.Context, slot primitives.Slot, parentRoot, headRoot [32]byte) (head state.BeaconState, err error) {
func (vs *Server) getParentStateFromReorgData(ctx context.Context, slot primitives.Slot, oldHeadRoot, parentRoot, headRoot [32]byte) (head state.BeaconState, err error) {
if parentRoot != headRoot {
head, err = vs.handleFailedReorgAttempt(ctx, slot, parentRoot, headRoot)
head, err = vs.handleSuccesfulReorgAttempt(ctx, slot, parentRoot, headRoot)
} else {
head, err = vs.getHeadNoFailedReorg(ctx, slot, parentRoot)
if oldHeadRoot != headRoot {
logFailedReorgAttempt(slot, oldHeadRoot, headRoot)
}
head, err = vs.getHeadNoReorg(ctx, slot, parentRoot)
}
if err != nil {
return nil, err
@@ -169,10 +175,11 @@ func (vs *Server) getParentStateFromReorgData(ctx context.Context, slot primitiv
func (vs *Server) getParentState(ctx context.Context, slot primitives.Slot) (state.BeaconState, [32]byte, error) {
// process attestations and update head in forkchoice
oldHeadRoot := vs.ForkchoiceFetcher.CachedHeadRoot()
vs.ForkchoiceFetcher.UpdateHead(ctx, vs.TimeFetcher.CurrentSlot())
headRoot := vs.ForkchoiceFetcher.CachedHeadRoot()
parentRoot := vs.ForkchoiceFetcher.GetProposerHead()
head, err := vs.getParentStateFromReorgData(ctx, slot, parentRoot, headRoot)
head, err := vs.getParentStateFromReorgData(ctx, slot, oldHeadRoot, parentRoot, headRoot)
return head, parentRoot, err
}

View File

@@ -2874,8 +2874,8 @@ func TestProposer_GetParentHeadState(t *testing.T) {
Eth1BlockFetcher: &mockExecution.Chain{},
StateGen: stategen.New(db, doublylinkedtree.New()),
}
t.Run("failed reorg", func(tt *testing.T) {
head, err := proposerServer.getParentStateFromReorgData(ctx, 1, parentRoot, headRoot)
t.Run("successful reorg", func(tt *testing.T) {
head, err := proposerServer.getParentStateFromReorgData(ctx, 1, parentRoot, parentRoot, headRoot)
require.NoError(t, err)
st := parentState.Copy()
st, err = transition.ProcessSlots(ctx, st, st.Slot()+1)
@@ -2892,7 +2892,7 @@ func TestProposer_GetParentHeadState(t *testing.T) {
t.Run("no reorg", func(tt *testing.T) {
require.NoError(t, transition.UpdateNextSlotCache(ctx, headRoot[:], headState))
head, err := proposerServer.getParentStateFromReorgData(ctx, 1, headRoot, headRoot)
head, err := proposerServer.getParentStateFromReorgData(ctx, 1, headRoot, headRoot, headRoot)
require.NoError(t, err)
st := headState.Copy()
st, err = transition.ProcessSlots(ctx, st, st.Slot()+1)
@@ -2906,4 +2906,23 @@ func TestProposer_GetParentHeadState(t *testing.T) {
require.Equal(t, [32]byte(str), [32]byte(headStr))
require.NotEqual(t, [32]byte(str), [32]byte(genesisStr))
})
t.Run("failed reorg", func(tt *testing.T) {
hook := logTest.NewGlobal()
require.NoError(t, transition.UpdateNextSlotCache(ctx, headRoot[:], headState))
head, err := proposerServer.getParentStateFromReorgData(ctx, 1, parentRoot, headRoot, headRoot)
require.NoError(t, err)
st := headState.Copy()
st, err = transition.ProcessSlots(ctx, st, st.Slot()+1)
require.NoError(t, err)
str, err := st.StateRootAtIndex(0)
require.NoError(t, err)
headStr, err := head.StateRootAtIndex(0)
require.NoError(t, err)
genesisStr, err := parentState.StateRootAtIndex(0)
require.NoError(t, err)
require.Equal(t, [32]byte(str), [32]byte(headStr))
require.NotEqual(t, [32]byte(str), [32]byte(genesisStr))
require.LogsContain(t, hook, "late block attempted reorg failed")
})
}

View File

@@ -248,7 +248,6 @@ func (s *Service) initializeBeaconServerRoutes(beaconServer *beacon.Server) {
s.cfg.Router.HandleFunc("/eth/v1/beacon/blinded_blocks", beaconServer.PublishBlindedBlock).Methods(http.MethodPost)
s.cfg.Router.HandleFunc("/eth/v2/beacon/blocks", beaconServer.PublishBlockV2).Methods(http.MethodPost)
s.cfg.Router.HandleFunc("/eth/v2/beacon/blinded_blocks", beaconServer.PublishBlindedBlockV2).Methods(http.MethodPost)
s.cfg.Router.HandleFunc("/eth/v1/beacon/blocks/{block_id}", beaconServer.GetBlock).Methods(http.MethodGet)
s.cfg.Router.HandleFunc("/eth/v2/beacon/blocks/{block_id}", beaconServer.GetBlockV2).Methods(http.MethodGet)
s.cfg.Router.HandleFunc("/eth/v1/beacon/blocks/{block_id}/attestations", beaconServer.GetBlockAttestations).Methods(http.MethodGet)
s.cfg.Router.HandleFunc("/eth/v1/beacon/blinded_blocks/{block_id}", beaconServer.GetBlindedBlock).Methods(http.MethodGet)

View File

@@ -87,7 +87,6 @@ func TestServer_InitializeRoutes(t *testing.T) {
"/eth/v2/beacon/blinded_blocks": {http.MethodPost},
"/eth/v1/beacon/blocks": {http.MethodPost},
"/eth/v2/beacon/blocks": {http.MethodPost},
"/eth/v1/beacon/blocks/{block_id}": {http.MethodGet}, //deprecated
"/eth/v2/beacon/blocks/{block_id}": {http.MethodGet},
"/eth/v1/beacon/blocks/{block_id}/root": {http.MethodGet},
"/eth/v1/beacon/blocks/{block_id}/attestations": {http.MethodGet},

View File

@@ -19,7 +19,7 @@ func TestState_UnrealizedCheckpointBalances(t *testing.T) {
balances[i] = params.BeaconConfig().MaxEffectiveBalance
}
base := &ethpb.BeaconStateAltair{
Slot: 2,
Slot: 66,
RandaoMixes: make([][]byte, params.BeaconConfig().EpochsPerHistoricalVector),
Validators: validators,
@@ -35,8 +35,8 @@ func TestState_UnrealizedCheckpointBalances(t *testing.T) {
active, previous, current, err := state.UnrealizedCheckpointBalances()
require.NoError(t, err)
require.Equal(t, allActive, active)
require.Equal(t, uint64(0), current)
require.Equal(t, uint64(0), previous)
require.Equal(t, params.BeaconConfig().EffectiveBalanceIncrement, current)
require.Equal(t, params.BeaconConfig().EffectiveBalanceIncrement, previous)
// Add some votes in the last two epochs:
base.CurrentEpochParticipation[0] = 0xFF
@@ -57,8 +57,8 @@ func TestState_UnrealizedCheckpointBalances(t *testing.T) {
require.NoError(t, err)
active, previous, current, err = state.UnrealizedCheckpointBalances()
require.NoError(t, err)
require.Equal(t, allActive-params.BeaconConfig().MaxEffectiveBalance, active)
require.Equal(t, uint64(0), current)
require.Equal(t, allActive, active)
require.Equal(t, params.BeaconConfig().EffectiveBalanceIncrement, current)
require.Equal(t, params.BeaconConfig().MaxEffectiveBalance, previous)
}

View File

@@ -24,25 +24,44 @@ func UnrealizedCheckpointBalances(cp, pp []byte, validators []*ethpb.Validator,
var err error
for i, v := range validators {
active := v.ActivationEpoch <= currentEpoch && currentEpoch < v.ExitEpoch
if active && !v.Slashed {
activeCurrent := v.ActivationEpoch <= currentEpoch && currentEpoch < v.ExitEpoch
if activeCurrent {
activeBalance, err = math.Add64(activeBalance, v.EffectiveBalance)
if err != nil {
return 0, 0, 0, err
}
if ((cp[i] >> targetIdx) & 1) == 1 {
currentTarget, err = math.Add64(currentTarget, v.EffectiveBalance)
if err != nil {
return 0, 0, 0, err
}
}
if v.Slashed {
continue
}
if activeCurrent && ((cp[i]>>targetIdx)&1) == 1 {
currentTarget, err = math.Add64(currentTarget, v.EffectiveBalance)
if err != nil {
return 0, 0, 0, err
}
if ((pp[i] >> targetIdx) & 1) == 1 {
prevTarget, err = math.Add64(prevTarget, v.EffectiveBalance)
if err != nil {
return 0, 0, 0, err
}
}
activePrevious := v.ActivationEpoch < currentEpoch && currentEpoch <= v.ExitEpoch
if activePrevious && ((pp[i]>>targetIdx)&1) == 1 {
prevTarget, err = math.Add64(prevTarget, v.EffectiveBalance)
if err != nil {
return 0, 0, 0, err
}
}
}
activeBalance, prevTarget, currentTarget = ensureLowerBound(activeBalance, prevTarget, currentTarget)
return activeBalance, prevTarget, currentTarget, nil
}
func ensureLowerBound(activeCurrEpoch, prevTargetAttested, currTargetAttested uint64) (uint64, uint64, uint64) {
ebi := params.BeaconConfig().EffectiveBalanceIncrement
if ebi > activeCurrEpoch {
activeCurrEpoch = ebi
}
if ebi > prevTargetAttested {
prevTargetAttested = ebi
}
if ebi > currTargetAttested {
currTargetAttested = ebi
}
return activeCurrEpoch, prevTargetAttested, currTargetAttested
}
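
A hedged note on why ensureLowerBound exists: downstream justification arithmetic divides by these totals, and the consensus spec's get_total_balance applies the same max(EFFECTIVE_BALANCE_INCREMENT, sum) floor, so a zero total must never propagate. A tiny standalone sketch of the floor (mainnet increment assumed):

package main

import "fmt"

const effectiveBalanceIncrement = 1_000_000_000 // 1 ETH in Gwei (mainnet config)

// lowerBound mirrors ensureLowerBound for a single total: it is floored at
// one effective balance increment so later divisions never see zero.
func lowerBound(total uint64) uint64 {
	if total < effectiveBalanceIncrement {
		return effectiveBalanceIncrement
	}
	return total
}

func main() {
	fmt.Println(lowerBound(0))              // 1000000000, never zero
	fmt.Println(lowerBound(32_000_000_000)) // unchanged
}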

View File

@@ -28,8 +28,8 @@ func TestState_UnrealizedCheckpointBalances(t *testing.T) {
active, previous, current, err := UnrealizedCheckpointBalances(cp, pp, validators, 0)
require.NoError(tt, err)
require.Equal(tt, expectedActive, active)
require.Equal(tt, uint64(0), current)
require.Equal(tt, uint64(0), previous)
require.Equal(tt, params.BeaconConfig().EffectiveBalanceIncrement, current)
require.Equal(tt, params.BeaconConfig().EffectiveBalanceIncrement, previous)
})
t.Run("bad votes in last two epochs", func(tt *testing.T) {
@@ -38,8 +38,8 @@ func TestState_UnrealizedCheckpointBalances(t *testing.T) {
active, previous, current, err := UnrealizedCheckpointBalances(cp, pp, validators, 1)
require.NoError(tt, err)
require.Equal(tt, expectedActive, active)
require.Equal(tt, uint64(0), current)
require.Equal(tt, uint64(0), previous)
require.Equal(tt, params.BeaconConfig().EffectiveBalanceIncrement, current)
require.Equal(tt, params.BeaconConfig().EffectiveBalanceIncrement, previous)
})
t.Run("two votes in last epoch", func(tt *testing.T) {
@@ -49,7 +49,7 @@ func TestState_UnrealizedCheckpointBalances(t *testing.T) {
require.NoError(tt, err)
require.Equal(tt, expectedActive, active)
require.Equal(tt, 2*params.BeaconConfig().MaxEffectiveBalance, current)
require.Equal(tt, uint64(0), previous)
require.Equal(tt, params.BeaconConfig().EffectiveBalanceIncrement, previous)
})
t.Run("two votes in previous epoch", func(tt *testing.T) {
@@ -58,7 +58,7 @@ func TestState_UnrealizedCheckpointBalances(t *testing.T) {
active, previous, current, err := UnrealizedCheckpointBalances(cp, pp, validators, 1)
require.NoError(tt, err)
require.Equal(tt, expectedActive, active)
require.Equal(tt, uint64(0), current)
require.Equal(tt, params.BeaconConfig().EffectiveBalanceIncrement, current)
require.Equal(tt, 2*params.BeaconConfig().MaxEffectiveBalance, previous)
})
@@ -78,7 +78,6 @@ func TestState_UnrealizedCheckpointBalances(t *testing.T) {
validators[1].Slashed = true
active, previous, current, err := UnrealizedCheckpointBalances(cp, pp, validators, 1)
require.NoError(tt, err)
expectedActive -= params.BeaconConfig().MaxEffectiveBalance
require.Equal(tt, expectedActive, active)
require.Equal(tt, params.BeaconConfig().MaxEffectiveBalance-params.BeaconConfig().MinDepositAmount, current)
require.Equal(tt, 2*params.BeaconConfig().MaxEffectiveBalance, previous)

View File

@@ -45,6 +45,7 @@ go_library(
"//math:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//runtime:go_default_library",
"//runtime/version:go_default_library",
"//time:go_default_library",
"//time/slots:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
@@ -76,6 +77,7 @@ go_test(
"//beacon-chain/das:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/db/filesystem:go_default_library",
"//beacon-chain/db/kv:go_default_library",
"//beacon-chain/db/testing:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",

View File

@@ -126,8 +126,9 @@ type fetchRequestResponse struct {
// newBlocksFetcher creates ready to use fetcher.
func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetcher {
blocksPerPeriod := flags.Get().BlockBatchLimit
allowedBlocksBurst := flags.Get().BlockBatchLimitBurstFactor * flags.Get().BlockBatchLimit
blockBatchLimit := maxBatchLimit()
blocksPerPeriod := blockBatchLimit
allowedBlocksBurst := flags.Get().BlockBatchLimitBurstFactor * blockBatchLimit
// Allow fetcher to go almost to the full burst capacity (less a single batch).
rateLimiter := leakybucket.NewCollector(
float64(blocksPerPeriod), int64(allowedBlocksBurst-blocksPerPeriod),
@@ -159,6 +160,27 @@ func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetc
}
}
// This specifies the block batch limit the initial sync fetcher will use. In the event the user has provided
// an excessive number, this is automatically lowered.
func maxBatchLimit() int {
currLimit := flags.Get().BlockBatchLimit
maxLimit := params.BeaconConfig().MaxRequestBlocks
if params.DenebEnabled() {
maxLimit = params.BeaconConfig().MaxRequestBlocksDeneb
}
castedMaxLimit, err := math.Int(maxLimit)
if err != nil {
// Should be impossible to hit this case.
log.WithError(err).Error("Unable to calculate the max batch limit")
return currLimit
}
if currLimit > castedMaxLimit {
log.Warnf("Specified batch size exceeds the block limit of the network, lowering from %d to %d", currLimit, maxLimit)
currLimit = castedMaxLimit
}
return currLimit
}
// start boots up the fetcher, which starts listening for incoming fetch requests.
func (f *blocksFetcher) start() error {
select {

View File

@@ -3,6 +3,7 @@ package initialsync
import (
"context"
"fmt"
"math"
"math/rand"
"sort"
"sync"
@@ -1142,3 +1143,26 @@ func TestVerifyAndPopulateBlobs(t *testing.T) {
// We delete each entry we've seen, so if we see all expected commits, the map should be empty at the end.
require.Equal(t, 0, len(expectedCommits))
}
func TestBatchLimit(t *testing.T) {
params.SetupTestConfigCleanup(t)
testCfg := params.BeaconConfig().Copy()
testCfg.DenebForkEpoch = math.MaxUint64
params.OverrideBeaconConfig(testCfg)
resetFlags := flags.Get()
flags.Init(&flags.GlobalFlags{
BlockBatchLimit: 640,
BlockBatchLimitBurstFactor: 10,
})
defer func() {
flags.Init(resetFlags)
}()
assert.Equal(t, 640, maxBatchLimit())
testCfg.DenebForkEpoch = 100000
params.OverrideBeaconConfig(testCfg)
assert.Equal(t, params.BeaconConfig().MaxRequestBlocksDeneb, uint64(maxBatchLimit()))
}

View File

@@ -5,22 +5,31 @@ package initialsync
import (
"context"
"fmt"
"time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/paulbellamy/ratecounter"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/async/abool"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain"
blockfeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/block"
statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
p2ptypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/startup"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/crypto/rand"
eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/runtime"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
prysmTime "github.com/prysmaticlabs/prysm/v5/time"
"github.com/prysmaticlabs/prysm/v5/time/slots"
"github.com/sirupsen/logrus"
@@ -58,6 +67,7 @@ type Service struct {
clock *startup.Clock
verifierWaiter *verification.InitializerWaiter
newBlobVerifier verification.NewBlobVerifier
ctxMap sync.ContextByteVersions
}
// Option is a functional option for the initial-sync Service.
@@ -124,6 +134,13 @@ func (s *Service) Start() {
}
s.clock = clock
log.Info("Received state initialized event")
ctxMap, err := sync.ContextByteVersionsForValRoot(clock.GenesisValidatorsRoot())
if err != nil {
log.WithField("genesisValidatorRoot", clock.GenesisValidatorsRoot()).
WithError(err).Error("unable to initialize context version map using genesis validator")
return
}
s.ctxMap = ctxMap
v, err := s.verifierWaiter.WaitForInitializer(s.ctx)
if err != nil {
@@ -162,7 +179,15 @@ func (s *Service) Start() {
s.markSynced()
return
}
s.waitForMinimumPeers()
peers, err := s.waitForMinimumPeers()
if err != nil {
log.WithError(err).Error("Error waiting for minimum number of peers")
return
}
if err := s.fetchOriginBlobs(peers); err != nil {
log.WithError(err).Error("Failed to fetch missing blobs for checkpoint origin")
return
}
if err := s.roundRobinSync(gt); err != nil {
if errors.Is(s.ctx.Err(), context.Canceled) {
return
@@ -215,7 +240,10 @@ func (s *Service) Resync() error {
defer func() { s.synced.Set() }() // Reset it at the end of the method.
genesis := time.Unix(int64(headState.GenesisTime()), 0) // lint:ignore uintcast -- Genesis time will not exceed int64 in your lifetime.
s.waitForMinimumPeers()
_, err = s.waitForMinimumPeers()
if err != nil {
return err
}
if err = s.roundRobinSync(genesis); err != nil {
log = log.WithError(err)
}
@@ -223,16 +251,19 @@ func (s *Service) Resync() error {
return nil
}
func (s *Service) waitForMinimumPeers() {
func (s *Service) waitForMinimumPeers() ([]peer.ID, error) {
required := params.BeaconConfig().MaxPeersToSync
if flags.Get().MinimumSyncPeers < required {
required = flags.Get().MinimumSyncPeers
}
for {
if s.ctx.Err() != nil {
return nil, s.ctx.Err()
}
cp := s.cfg.Chain.FinalizedCheckpt()
_, peers := s.cfg.P2P.Peers().BestNonFinalized(flags.Get().MinimumSyncPeers, cp.Epoch)
if len(peers) >= required {
break
return peers, nil
}
log.WithFields(logrus.Fields{
"suitable": len(peers),
@@ -247,3 +278,87 @@ func (s *Service) markSynced() {
s.synced.Set()
close(s.cfg.InitialSyncComplete)
}
func missingBlobRequest(blk blocks.ROBlock, store *filesystem.BlobStorage) (p2ptypes.BlobSidecarsByRootReq, error) {
r := blk.Root()
if blk.Version() < version.Deneb {
return nil, nil
}
cmts, err := blk.Block().Body().BlobKzgCommitments()
if err != nil {
log.WithField("root", r).Error("Error reading commitments from checkpoint sync origin block")
return nil, err
}
if len(cmts) == 0 {
return nil, nil
}
onDisk, err := store.Indices(r)
if err != nil {
return nil, errors.Wrapf(err, "error checking existing blobs for checkpoint sync block root %#x", r)
}
req := make(p2ptypes.BlobSidecarsByRootReq, 0, len(cmts))
for i := range cmts {
if onDisk[i] {
continue
}
req = append(req, &eth.BlobIdentifier{BlockRoot: r[:], Index: uint64(i)})
}
return req, nil
}
func (s *Service) fetchOriginBlobs(pids []peer.ID) error {
r, err := s.cfg.DB.OriginCheckpointBlockRoot(s.ctx)
if errors.Is(err, db.ErrNotFoundOriginBlockRoot) {
return nil
}
blk, err := s.cfg.DB.Block(s.ctx, r)
if err != nil {
log.WithField("root", fmt.Sprintf("%#x", r)).Error("Block for checkpoint sync origin root not found in db")
return err
}
if !params.WithinDAPeriod(slots.ToEpoch(blk.Block().Slot()), slots.ToEpoch(s.clock.CurrentSlot())) {
return nil
}
rob, err := blocks.NewROBlockWithRoot(blk, r)
if err != nil {
return err
}
req, err := missingBlobRequest(rob, s.cfg.BlobStorage)
if err != nil {
return err
}
if len(req) == 0 {
log.WithField("root", fmt.Sprintf("%#x", r)).Debug("All blobs for checkpoint block are present")
return nil
}
shufflePeers(pids)
for i := range pids {
sidecars, err := sync.SendBlobSidecarByRoot(s.ctx, s.clock, s.cfg.P2P, pids[i], s.ctxMap, &req)
if err != nil {
continue
}
if len(sidecars) != len(req) {
continue
}
bv := newBlobBatchVerifier(s.newBlobVerifier)
avs := das.NewLazilyPersistentStore(s.cfg.BlobStorage, bv)
current := s.clock.CurrentSlot()
if err := avs.Persist(current, sidecars...); err != nil {
return err
}
if err := avs.IsDataAvailable(s.ctx, current, rob); err != nil {
log.WithField("root", fmt.Sprintf("%#x", r)).WithField("peerID", pids[i]).Warn("Blobs from peer for origin block were unusable")
continue
}
log.WithField("nBlobs", len(sidecars)).WithField("root", fmt.Sprintf("%#x", r)).Info("Successfully downloaded blobs for checkpoint sync block")
return nil
}
return fmt.Errorf("no connected peer able to provide blobs for checkpoint sync block %#x", r)
}
func shufflePeers(pids []peer.ID) {
rg := rand.NewGenerator()
rg.Shuffle(len(pids), func(i, j int) {
pids[i], pids[j] = pids[j], pids[i]
})
}

View File

@@ -6,16 +6,19 @@ import (
"testing"
"time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/paulbellamy/ratecounter"
"github.com/prysmaticlabs/prysm/v5/async/abool"
mock "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/kv"
dbtest "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/testing"
p2pt "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/testing"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/startup"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/testing/assert"
@@ -420,3 +423,91 @@ func TestService_Synced(t *testing.T) {
s.synced.Set()
assert.Equal(t, true, s.Synced())
}
func TestMissingBlobRequest(t *testing.T) {
cases := []struct {
name string
setup func(t *testing.T) (blocks.ROBlock, *filesystem.BlobStorage)
nReq int
err error
}{
{
name: "pre-deneb",
setup: func(t *testing.T) (blocks.ROBlock, *filesystem.BlobStorage) {
cb, err := blocks.NewSignedBeaconBlock(util.NewBeaconBlockCapella())
require.NoError(t, err)
rob, err := blocks.NewROBlockWithRoot(cb, [32]byte{})
require.NoError(t, err)
return rob, nil
},
nReq: 0,
},
{
name: "deneb zero commitments",
setup: func(t *testing.T) (blocks.ROBlock, *filesystem.BlobStorage) {
bk, _ := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 0, 0)
return bk, nil
},
nReq: 0,
},
{
name: "2 commitments, all missing",
setup: func(t *testing.T) (blocks.ROBlock, *filesystem.BlobStorage) {
bk, _ := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 0, 2)
fs := filesystem.NewEphemeralBlobStorage(t)
return bk, fs
},
nReq: 2,
},
{
name: "2 commitments, 1 missing",
setup: func(t *testing.T) (blocks.ROBlock, *filesystem.BlobStorage) {
bk, _ := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 0, 2)
bm, fs := filesystem.NewEphemeralBlobStorageWithMocker(t)
require.NoError(t, bm.CreateFakeIndices(bk.Root(), 1))
return bk, fs
},
nReq: 1,
},
{
name: "2 commitments, 0 missing",
setup: func(t *testing.T) (blocks.ROBlock, *filesystem.BlobStorage) {
bk, _ := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 0, 2)
bm, fs := filesystem.NewEphemeralBlobStorageWithMocker(t)
require.NoError(t, bm.CreateFakeIndices(bk.Root(), 0, 1))
return bk, fs
},
nReq: 0,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
blk, store := c.setup(t)
req, err := missingBlobRequest(blk, store)
require.NoError(t, err)
require.Equal(t, c.nReq, len(req))
})
}
}
func TestOriginOutsideRetention(t *testing.T) {
ctx := context.Background()
bdb := dbtest.SetupDB(t)
genesis := time.Unix(0, 0)
secsPerEpoch := params.BeaconConfig().SecondsPerSlot * uint64(params.BeaconConfig().SlotsPerEpoch)
retentionSeconds := time.Second * time.Duration(uint64(params.BeaconConfig().MinEpochsForBlobsSidecarsRequest+1)*secsPerEpoch)
outsideRetention := genesis.Add(retentionSeconds)
now := func() time.Time {
return outsideRetention
}
clock := startup.NewClock(genesis, [32]byte{}, startup.WithNower(now))
s := &Service{ctx: ctx, cfg: &Config{DB: bdb}, clock: clock}
blk, _ := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 0, 1)
require.NoError(t, bdb.SaveBlock(ctx, blk))
concreteDB, ok := bdb.(*kv.Store)
require.Equal(t, true, ok)
require.NoError(t, concreteDB.SaveOriginCheckpointBlockRoot(ctx, blk.Root()))
// The call below would normally fail due to missing service dependencies, but it returns nil early because the origin block is outside the blob retention window.
require.Equal(t, false, params.WithinDAPeriod(slots.ToEpoch(blk.Block().Slot()), slots.ToEpoch(clock.CurrentSlot())))
require.NoError(t, s.fetchOriginBlobs([]peer.ID{}))
}


@@ -192,6 +192,8 @@ func (s *Service) hasPeer() bool {
return len(s.cfg.p2p.Peers().Connected()) > 0
}
var errNoPeersForPending = errors.New("no suitable peers to process pending block queue, delaying")
// processAndBroadcastBlock validates, processes, and broadcasts a block.
// Part of its job is to request missing blobs from peers when the block contains KZG commitments.
func (s *Service) processAndBroadcastBlock(ctx context.Context, b interfaces.ReadOnlySignedBeaconBlock, blkRoot [32]byte) error {
@@ -202,10 +204,17 @@ func (s *Service) processAndBroadcastBlock(ctx context.Context, b interfaces.Rea
}
}
peers := s.getBestPeers()
peerCount := len(peers)
if peerCount > 0 {
if err := s.requestPendingBlobs(ctx, b, blkRoot, peers[rand.NewGenerator().Int()%peerCount]); err != nil {
request, err := s.pendingBlobsRequestForBlock(blkRoot, b)
if err != nil {
return err
}
if len(request) > 0 {
peers := s.getBestPeers()
peerCount := len(peers)
if peerCount == 0 {
return errors.Wrapf(errNoPeersForPending, "block root=%#x", blkRoot)
}
if err := s.sendAndSaveBlobSidecars(ctx, request, peers[rand.NewGenerator().Int()%peerCount], b); err != nil {
return err
}
}
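Note the control-flow change above: the blob request is computed before any peer selection, so a block with no missing blobs never consults peer state, and errNoPeersForPending surfaces only when there are blobs to fetch but no suitable peer to serve them.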


@@ -440,9 +440,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerRateLimitOverflow(t *testing.T) {
if !validateBlocks {
return
}
// Use a step of 1 to be in line with our specs.
req.Step = 1
for i := req.StartSlot; i < req.StartSlot.Add(req.Count*req.Step); i += primitives.Slot(req.Step) {
for i := req.StartSlot; i < req.StartSlot.Add(req.Count); i += primitives.Slot(req.Step) {
if !success {
continue
}
@@ -471,20 +469,21 @@ func TestRPCBeaconBlocksByRange_RPCHandlerRateLimitOverflow(t *testing.T) {
p1.Connect(p2)
assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected")
capacity := int64(flags.Get().BlockBatchLimit * 3)
clock := startup.NewClock(time.Unix(0, 0), [32]byte{})
reqSize := params.MaxRequestBlock(slots.ToEpoch(clock.CurrentSlot()))
r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: clock}, availableBlocker: mockBlocker{avail: true}, rateLimiter: newRateLimiter(p1)}
pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1)
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, capacity, time.Second, false)
defaultBlockBurstFactor := 2 // TODO: can we update the default value set in TestMain to match flags?
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, int64(flags.Get().BlockBatchLimit*defaultBlockBurstFactor), time.Second, false)
req := &ethpb.BeaconBlocksByRangeRequest{
StartSlot: 100,
Step: 5,
Count: uint64(capacity),
Count: reqSize,
}
saveBlocks(req)
// This doesn't error because reqSize by default is 128, which is exactly the burst factor * batch limit
assert.NoError(t, sendRequest(p1, p2, r, req, true, true))
remainingCapacity := r.rateLimiter.limiterMap[topic].Remaining(p2.PeerID().String())
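Worked numbers, assuming the test suite's default BlockBatchLimit of 64 (an assumption; note the TODO above): the collector holds 64 × 2 = 128 tokens and, per the in-code comment, reqSize is also 128, so this request exactly drains the bucket without tripping the limiter. The variant below caps its collector at a single batch (64 tokens) while requesting MaxRequestBlock−1 blocks (127 under the same assumption), deliberately overflowing the bucket to exercise the rate-limit path.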
@@ -498,18 +497,17 @@ func TestRPCBeaconBlocksByRange_RPCHandlerRateLimitOverflow(t *testing.T) {
p1.Connect(p2)
assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected")
capacity := int64(flags.Get().BlockBatchLimit * 3)
clock := startup.NewClock(time.Unix(0, 0), [32]byte{})
reqSize := params.MaxRequestBlock(slots.ToEpoch(clock.CurrentSlot())) - 1
r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, availableBlocker: mockBlocker{avail: true}, rateLimiter: newRateLimiter(p1)}
pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1)
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, capacity, time.Second, false)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, int64(flags.Get().BlockBatchLimit), time.Second, false)
req := &ethpb.BeaconBlocksByRangeRequest{
StartSlot: 100,
Step: 5,
Count: uint64(capacity + 1),
Count: reqSize,
}
saveBlocks(req)
@@ -538,7 +536,6 @@ func TestRPCBeaconBlocksByRange_RPCHandlerRateLimitOverflow(t *testing.T) {
req := &ethpb.BeaconBlocksByRangeRequest{
StartSlot: 100,
Step: 1,
Count: uint64(flags.Get().BlockBatchLimit),
}
saveBlocks(req)


@@ -46,7 +46,6 @@ func (s *Service) sendRecentBeaconBlocksRequest(ctx context.Context, requests *t
}
return nil
})
for _, blk := range blks {
// Skip blocks before Deneb because they have no blobs.
if blk.Version() < version.Deneb {
@@ -56,11 +55,17 @@ func (s *Service) sendRecentBeaconBlocksRequest(ctx context.Context, requests *t
if err != nil {
return err
}
if err := s.requestPendingBlobs(ctx, blk, blkRoot, id); err != nil {
request, err := s.pendingBlobsRequestForBlock(blkRoot, blk)
if err != nil {
return err
}
if len(request) == 0 {
continue
}
if err := s.sendAndSaveBlobSidecars(ctx, request, id, blk); err != nil {
return err
}
}
return err
}
@@ -128,41 +133,13 @@ func (s *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{
return nil
}
// requestPendingBlobs handles the request for pending blobs based on the given beacon block.
func (s *Service) requestPendingBlobs(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, peerID peer.ID) error {
if block.Version() < version.Deneb {
return nil // Block before deneb has no blob.
}
commitments, err := block.Block().Body().BlobKzgCommitments()
if err != nil {
return err
}
if len(commitments) == 0 {
return nil // No operation if the block has no blob commitments.
}
contextByte, err := ContextByteVersionsForValRoot(s.cfg.chain.GenesisValidatorsRoot())
if err != nil {
return err
}
request, err := s.constructPendingBlobsRequest(ctx, blockRoot, len(commitments))
if err != nil {
return err
}
return s.sendAndSaveBlobSidecars(ctx, request, contextByte, peerID, block)
}
// sendAndSaveBlobSidecars sends the blob request and saves received sidecars.
func (s *Service) sendAndSaveBlobSidecars(ctx context.Context, request types.BlobSidecarsByRootReq, contextByte ContextByteVersions, peerID peer.ID, block interfaces.ReadOnlySignedBeaconBlock) error {
func (s *Service) sendAndSaveBlobSidecars(ctx context.Context, request types.BlobSidecarsByRootReq, peerID peer.ID, block interfaces.ReadOnlySignedBeaconBlock) error {
if len(request) == 0 {
return nil
}
sidecars, err := SendBlobSidecarByRoot(ctx, s.cfg.clock, s.cfg.p2p, peerID, contextByte, &request)
sidecars, err := SendBlobSidecarByRoot(ctx, s.cfg.clock, s.cfg.p2p, peerID, s.ctxMap, &request)
if err != nil {
return err
}
@@ -193,8 +170,25 @@ func (s *Service) sendAndSaveBlobSidecars(ctx context.Context, request types.Blo
return nil
}
func (s *Service) pendingBlobsRequestForBlock(root [32]byte, b interfaces.ReadOnlySignedBeaconBlock) (types.BlobSidecarsByRootReq, error) {
if b.Version() < version.Deneb {
return nil, nil // Block before deneb has no blob.
}
cc, err := b.Block().Body().BlobKzgCommitments()
if err != nil {
return nil, err
}
if len(cc) == 0 {
return nil, nil
}
return s.constructPendingBlobsRequest(root, len(cc))
}
// constructPendingBlobsRequest creates a request for BlobSidecars by root, considering blobs already in DB.
func (s *Service) constructPendingBlobsRequest(ctx context.Context, root [32]byte, commitments int) (types.BlobSidecarsByRootReq, error) {
func (s *Service) constructPendingBlobsRequest(root [32]byte, commitments int) (types.BlobSidecarsByRootReq, error) {
if commitments == 0 {
return nil, nil
}
stored, err := s.cfg.blobStorage.Indices(root)
if err != nil {
return nil, err
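The body of constructPendingBlobsRequest is truncated above, but its visible shape plus the expectations in TestConstructPendingBlobsRequest below suggest it simply diffs the commitment count against the indices already on disk. A hypothetical reconstruction of the remainder, assuming stored is the per-index presence lookup returned by Indices:

    // For each commitment index not yet stored, request the sidecar by root.
    req := make(types.BlobSidecarsByRootReq, 0, commitments)
    for i := uint64(0); i < uint64(commitments); i++ {
        if stored[i] { // sidecar already on disk, skip it
            continue
        }
        req = append(req, &eth.BlobIdentifier{BlockRoot: root[:], Index: i})
    }
    return req, nil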


@@ -31,7 +31,7 @@ import (
leakybucket "github.com/prysmaticlabs/prysm/v5/container/leaky-bucket"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
enginev1 "github.com/prysmaticlabs/prysm/v5/proto/engine/v1"
"github.com/prysmaticlabs/prysm/v5/proto/eth/v2"
eth "github.com/prysmaticlabs/prysm/v5/proto/eth/v2"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/testing/assert"
"github.com/prysmaticlabs/prysm/v5/testing/require"
@@ -370,12 +370,16 @@ func TestRequestPendingBlobs(t *testing.T) {
t.Run("old block should not fail", func(t *testing.T) {
b, err := blocks.NewSignedBeaconBlock(util.NewBeaconBlock())
require.NoError(t, err)
require.NoError(t, s.requestPendingBlobs(context.Background(), b, [32]byte{}, "test"))
request, err := s.pendingBlobsRequestForBlock([32]byte{}, b)
require.NoError(t, err)
require.NoError(t, s.sendAndSaveBlobSidecars(context.Background(), request, "test", b))
})
t.Run("empty commitment block should not fail", func(t *testing.T) {
b, err := blocks.NewSignedBeaconBlock(util.NewBeaconBlock())
require.NoError(t, err)
require.NoError(t, s.requestPendingBlobs(context.Background(), b, [32]byte{}, "test"))
request, err := s.pendingBlobsRequestForBlock([32]byte{}, b)
require.NoError(t, err)
require.NoError(t, s.sendAndSaveBlobSidecars(context.Background(), request, "test", b))
})
t.Run("unsupported protocol", func(t *testing.T) {
p1 := p2ptest.NewTestP2P(t)
@@ -406,7 +410,9 @@ func TestRequestPendingBlobs(t *testing.T) {
b.Block.Body.BlobKzgCommitments = make([][]byte, 1)
b1, err := blocks.NewSignedBeaconBlock(b)
require.NoError(t, err)
require.ErrorContains(t, "protocols not supported", s.requestPendingBlobs(context.Background(), b1, [32]byte{}, p2.PeerID()))
request, err := s.pendingBlobsRequestForBlock([32]byte{}, b1)
require.NoError(t, err)
require.ErrorContains(t, "protocols not supported", s.sendAndSaveBlobSidecars(context.Background(), request, p2.PeerID(), b1))
})
}
@@ -414,12 +420,11 @@ func TestConstructPendingBlobsRequest(t *testing.T) {
d := db.SetupDB(t)
bs := filesystem.NewEphemeralBlobStorage(t)
s := &Service{cfg: &config{beaconDB: d, blobStorage: bs}}
ctx := context.Background()
// No unknown indices.
root := [32]byte{1}
count := 3
actual, err := s.constructPendingBlobsRequest(ctx, root, count)
actual, err := s.constructPendingBlobsRequest(root, count)
require.NoError(t, err)
require.Equal(t, 3, len(actual))
for i, id := range actual {
@@ -449,7 +454,7 @@ func TestConstructPendingBlobsRequest(t *testing.T) {
expected := []*eth.BlobIdentifier{
{Index: 1, BlockRoot: root[:]},
}
actual, err = s.constructPendingBlobsRequest(ctx, root, count)
actual, err = s.constructPendingBlobsRequest(root, count)
require.NoError(t, err)
require.Equal(t, expected[0].Index, actual[0].Index)
require.DeepEqual(t, expected[0].BlockRoot, actual[0].BlockRoot)


@@ -157,6 +157,7 @@ type Service struct {
verifierWaiter *verification.InitializerWaiter
newBlobVerifier verification.NewBlobVerifier
availableBlocker coverage.AvailableBlocker
ctxMap ContextByteVersions
}
// NewService initializes new regular sync service.
@@ -295,6 +296,15 @@ func (s *Service) waitForChainStart() {
s.cfg.clock = clock
startTime := clock.GenesisTime()
log.WithField("starttime", startTime).Debug("Received state initialized event")
ctxMap, err := ContextByteVersionsForValRoot(clock.GenesisValidatorsRoot())
if err != nil {
log.WithError(err).WithField("genesis_validator_root", clock.GenesisValidatorsRoot()).
Error("sync service failed to initialize context version map")
return
}
s.ctxMap = ctxMap
// Register respective rpc handlers at state initialized event.
s.registerRPCHandlers()
// Wait for chainstart in separate routine.
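Building ctxMap once at the state-initialized event gives the sync service a ready mapping from req/resp context bytes to payload versions for the lifetime of the process; this is what let sendAndSaveBlobSidecars above drop its contextByte parameter and read s.ctxMap instead.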


@@ -35,11 +35,10 @@ func BeaconNodeOptions(c *cli.Context) ([]node.Option, error) {
if err != nil {
return nil, err
}
bs, err := filesystem.NewBlobStorage(blobStoragePath(c), filesystem.WithBlobRetentionEpochs(e))
if err != nil {
return nil, err
}
return []node.Option{node.WithBlobStorage(bs), node.WithBlobRetentionEpochs(e)}, nil
opts := []node.Option{node.WithBlobStorageOptions(
filesystem.WithBlobRetentionEpochs(e), filesystem.WithBasePath(blobStoragePath(c)),
)}
return opts, nil
}
func blobStoragePath(c *cli.Context) string {
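The rewrite stops constructing BlobStorage eagerly in the CLI layer and instead hands the node a set of functional options. For readers unfamiliar with the idiom, a minimal generic sketch (types and names hypothetical, not Prysm's):

    package storage

    // Storage stands in for a component built from options.
    type Storage struct {
        base      string
        retention uint64
    }

    // Option mutates a Storage under construction.
    type Option func(*Storage) error

    func WithBasePath(p string) Option {
        return func(s *Storage) error { s.base = p; return nil }
    }

    func WithRetentionEpochs(e uint64) Option {
        return func(s *Storage) error { s.retention = e; return nil }
    }

    // New applies each option in order, failing fast on the first error.
    func New(opts ...Option) (*Storage, error) {
        s := &Storage{}
        for _, o := range opts {
            if err := o(s); err != nil {
                return nil, err
            }
        }
        return s, nil
    }

Deferring construction this way lets the CLI contribute configuration without owning the storage lifecycle or its error handling.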


@@ -69,7 +69,8 @@ type Flags struct {
EnableEIP4881 bool // EnableEIP4881 specifies whether to use the deposit tree from EIP4881
PrepareAllPayloads bool // PrepareAllPayloads informs the engine to prepare a block on every slot.
// BlobSaveFsync requires blob saving to block on fsync, ensuring blobs are durably persisted before passing data-availability (DA) checks.
BlobSaveFsync bool
// KeystoreImportDebounceInterval specifies the time duration the validator waits to reload new keys if they have
// changed on disk. This feature is for advanced use cases only.
KeystoreImportDebounceInterval time.Duration
@@ -245,6 +246,11 @@ func ConfigureBeaconChain(ctx *cli.Context) error {
logEnabled(EnableLightClient)
cfg.EnableLightClient = true
}
if ctx.IsSet(BlobSaveFsync.Name) {
logEnabled(BlobSaveFsync)
cfg.BlobSaveFsync = true
}
cfg.AggregateIntervals = [3]time.Duration{aggregateFirstInterval.Value, aggregateSecondInterval.Value, aggregateThirdInterval.Value}
Init(cfg)
return nil


@@ -149,12 +149,16 @@ var (
Name: "disable-resource-manager",
Usage: "Disables running the libp2p resource manager.",
}
// DisableRegistrationCache is a flag for disabling the validator registration cache and using the DB instead.
DisableRegistrationCache = &cli.BoolFlag{
Name: "disable-registration-cache",
Usage: "Temporary flag for disabling the validator registration cache instead of using the DB. Note: registrations do not clear on restart while using the DB.",
}
// BlobSaveFsync enforces durable filesystem writes for use cases where blob availability is critical.
BlobSaveFsync = &cli.BoolFlag{
Name: "blob-save-fsync",
Usage: "Forces new blob files to be fysnc'd before continuing, ensuring durable blob writes.",
}
)
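Background for the flag: Close only hands data to the OS page cache, while fsync forces it to stable storage, so with the flag enabled a crash immediately after a blob write cannot silently lose it. A minimal sketch of a durable write-then-rename, illustrative only and not Prysm's implementation:

    package main

    import "os"

    // writeDurable writes data to path via a temp file, optionally fsyncing
    // before the rename so the bytes survive a crash or power loss.
    func writeDurable(path string, data []byte, fsync bool) error {
        tmp := path + ".part"
        f, err := os.Create(tmp)
        if err != nil {
            return err
        }
        if _, err := f.Write(data); err != nil {
            _ = f.Close()
            return err
        }
        if fsync {
            if err := f.Sync(); err != nil { // flush to stable storage
                _ = f.Close()
                return err
            }
        }
        if err := f.Close(); err != nil {
            return err
        }
        return os.Rename(tmp, path) // atomic replace on POSIX filesystems
    }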
// devModeFlags holds list of flags that are set when development mode is on.
@@ -209,6 +213,7 @@ var BeaconChainFlags = append(deprecatedBeaconFlags, append(deprecatedFlags, []c
disableResourceManager,
DisableRegistrationCache,
EnableLightClient,
BlobSaveFsync,
}...)...)
// E2EBeaconChainFlags contains a list of the beacon chain feature flags to be tested in E2E.


@@ -23,7 +23,10 @@ const (
mainnetAltairForkEpoch = 74240 // Oct 27, 2021, 10:56:23am UTC
// Bellatrix Fork Epoch for mainnet config.
mainnetBellatrixForkEpoch = 144896 // Sept 6, 2022, 11:34:47am UTC
mainnetDenebForkEpoch = math.MaxUint64
// Capella Fork Epoch for mainnet config.
mainnetCapellaForkEpoch = 194048 // April 12, 2023, 22:27:35 UTC
// Deneb Fork Epoch for mainnet config.
mainnetDenebForkEpoch = 269568 // March 13, 2024, 13:55:35 UTC
)
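Sanity-checking the new constant, assuming mainnet genesis at Dec 1, 2020, 12:00:23 UTC with 32 slots of 12 s per epoch: 269568 × 32 × 12 s = 103,514,112 s, i.e. 1198 days 1 h 55 m 12 s, which lands on March 13, 2024, 13:55:35 UTC, matching the comment.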
var mainnetNetworkConfig = &NetworkConfig{
@@ -202,9 +205,9 @@ var mainnetBeaconConfig = &BeaconChainConfig{
BellatrixForkVersion: []byte{2, 0, 0, 0},
BellatrixForkEpoch: mainnetBellatrixForkEpoch,
CapellaForkVersion: []byte{3, 0, 0, 0},
CapellaForkEpoch: 194048,
CapellaForkEpoch: mainnetCapellaForkEpoch,
DenebForkVersion: []byte{4, 0, 0, 0},
DenebForkEpoch: math.MaxUint64,
DenebForkEpoch: mainnetDenebForkEpoch,
// New values introduced in Altair hard fork 1.
// Participation flag indices.


@@ -90,6 +90,7 @@ func MinimalSpecConfig() *BeaconChainConfig {
minimalConfig.CapellaForkVersion = []byte{3, 0, 0, 1}
minimalConfig.CapellaForkEpoch = math.MaxUint64
minimalConfig.DenebForkVersion = []byte{4, 0, 0, 1}
minimalConfig.DenebForkEpoch = math.MaxUint64
minimalConfig.SyncCommitteeSize = 32
minimalConfig.InactivityScoreBias = 4


@@ -35,6 +35,11 @@ func processJustificationAndFinalizationPrecomputeWrapper(t *testing.T, st state
require.NoError(t, err)
_, bp, err = altair.ProcessEpochParticipation(ctx, st, bp, vp)
require.NoError(t, err)
activeBal, targetPrevious, targetCurrent, err := st.UnrealizedCheckpointBalances()
require.NoError(t, err)
require.Equal(t, bp.ActiveCurrentEpoch, activeBal)
require.Equal(t, bp.CurrentEpochTargetAttested, targetCurrent)
require.Equal(t, bp.PrevEpochTargetAttested, targetPrevious)
st, err = precompute.ProcessJustificationAndFinalizationPreCompute(st, bp)
require.NoError(t, err, "Could not process justification")


@@ -76,6 +76,13 @@ func runPrecomputeRewardsAndPenaltiesTest(t *testing.T, testFolderPath string) {
require.NoError(t, err)
vp, bp, err = altair.ProcessEpochParticipation(ctx, preBeaconState, bp, vp)
require.NoError(t, err)
activeBal, targetPrevious, targetCurrent, err := preBeaconState.UnrealizedCheckpointBalances()
require.NoError(t, err)
require.Equal(t, bp.ActiveCurrentEpoch, activeBal)
require.Equal(t, bp.CurrentEpochTargetAttested, targetCurrent)
require.Equal(t, bp.PrevEpochTargetAttested, targetPrevious)
deltas, err := altair.AttestationsDelta(preBeaconState, bp, vp)
require.NoError(t, err)


@@ -35,6 +35,11 @@ func processJustificationAndFinalizationPrecomputeWrapper(t *testing.T, st state
require.NoError(t, err)
_, bp, err = altair.ProcessEpochParticipation(ctx, st, bp, vp)
require.NoError(t, err)
activeBal, targetPrevious, targetCurrent, err := st.UnrealizedCheckpointBalances()
require.NoError(t, err)
require.Equal(t, bp.ActiveCurrentEpoch, activeBal)
require.Equal(t, bp.CurrentEpochTargetAttested, targetCurrent)
require.Equal(t, bp.PrevEpochTargetAttested, targetPrevious)
st, err = precompute.ProcessJustificationAndFinalizationPreCompute(st, bp)
require.NoError(t, err, "Could not process justification")


@@ -80,6 +80,13 @@ func runPrecomputeRewardsAndPenaltiesTest(t *testing.T, testFolderPath string) {
require.NoError(t, err)
vp, bp, err = altair.ProcessEpochParticipation(ctx, preBeaconState, bp, vp)
require.NoError(t, err)
activeBal, targetPrevious, targetCurrent, err := preBeaconState.UnrealizedCheckpointBalances()
require.NoError(t, err)
require.Equal(t, bp.ActiveCurrentEpoch, activeBal)
require.Equal(t, bp.CurrentEpochTargetAttested, targetCurrent)
require.Equal(t, bp.PrevEpochTargetAttested, targetPrevious)
deltas, err := altair.AttestationsDelta(preBeaconState, bp, vp)
require.NoError(t, err)


@@ -35,6 +35,11 @@ func processJustificationAndFinalizationPrecomputeWrapper(t *testing.T, st state
require.NoError(t, err)
_, bp, err = altair.ProcessEpochParticipation(ctx, st, bp, vp)
require.NoError(t, err)
activeBal, targetPrevious, targetCurrent, err := st.UnrealizedCheckpointBalances()
require.NoError(t, err)
require.Equal(t, bp.ActiveCurrentEpoch, activeBal)
require.Equal(t, bp.CurrentEpochTargetAttested, targetCurrent)
require.Equal(t, bp.PrevEpochTargetAttested, targetPrevious)
st, err = precompute.ProcessJustificationAndFinalizationPreCompute(st, bp)
require.NoError(t, err, "Could not process justification")


@@ -80,6 +80,13 @@ func runPrecomputeRewardsAndPenaltiesTest(t *testing.T, testFolderPath string) {
require.NoError(t, err)
vp, bp, err = altair.ProcessEpochParticipation(ctx, preBeaconState, bp, vp)
require.NoError(t, err)
activeBal, targetPrevious, targetCurrent, err := preBeaconState.UnrealizedCheckpointBalances()
require.NoError(t, err)
require.Equal(t, bp.ActiveCurrentEpoch, activeBal)
require.Equal(t, bp.CurrentEpochTargetAttested, targetCurrent)
require.Equal(t, bp.PrevEpochTargetAttested, targetPrevious)
deltas, err := altair.AttestationsDelta(preBeaconState, bp, vp)
require.NoError(t, err)


@@ -32,7 +32,11 @@ func processJustificationAndFinalizationPrecomputeWrapper(t *testing.T, st state
require.NoError(t, err)
_, bp, err = altair.ProcessEpochParticipation(ctx, st, bp, vp)
require.NoError(t, err)
activeBal, targetPrevious, targetCurrent, err := st.UnrealizedCheckpointBalances()
require.NoError(t, err)
require.Equal(t, bp.ActiveCurrentEpoch, activeBal)
require.Equal(t, bp.CurrentEpochTargetAttested, targetCurrent)
require.Equal(t, bp.PrevEpochTargetAttested, targetPrevious)
st, err = precompute.ProcessJustificationAndFinalizationPreCompute(st, bp)
require.NoError(t, err, "Could not process justification")


@@ -70,8 +70,16 @@ func runPrecomputeRewardsAndPenaltiesTest(t *testing.T, testFolderPath string) {
vp, bp, err := altair.InitializePrecomputeValidators(ctx, preBeaconState)
require.NoError(t, err)
vp, bp, err = altair.ProcessEpochParticipation(ctx, preBeaconState, bp, vp)
require.NoError(t, err)
activeBal, targetPrevious, targetCurrent, err := preBeaconState.UnrealizedCheckpointBalances()
require.NoError(t, err)
require.Equal(t, bp.ActiveCurrentEpoch, activeBal)
require.Equal(t, bp.CurrentEpochTargetAttested, targetCurrent)
require.Equal(t, bp.PrevEpochTargetAttested, targetPrevious)
deltas, err := altair.AttestationsDelta(preBeaconState, bp, vp)
require.NoError(t, err)