Compare commits

..

10 Commits

Author SHA1 Message Date
nisdas
aa1327701e fix it 2024-04-09 13:33:30 +08:00
Nishant Das
1a0c07deec Extend Broadcast Window For Attestations (#13858)
* fix it

* make check better
2024-04-08 04:49:20 +00:00
kasey
04f231a400 Initsync skip local blobs (#13827)
* wip - init-sync skip available blob req

* satisfy deep source

* gaz

* don't need to sort blobs; simplify blobRequest stack

* wip debug log to watch blob skip behavior

* unit tests for new blob req generator

* refactor to reduce blob req func count

* log when WaitForSummarizer fails

---------

Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
2024-04-05 19:09:43 +00:00
Manu NALEPA
be1bfcce63 P2P: Add QUIC support (#13786)
* (Unrelated) DoppelGanger: Improve message.

* `beacon-blocks-by-range`: Add `--network` option.

* `ensurePeerConnections`: Remove capital letter in error message.

* `MultiAddressBuilder{WithID}`: Refactor.

* `buildOptions`: Improve log.

* `NewService`: Bubbles up errors.

* `tcp` ==> `libp2ptcp`

* `multiAddressBuilderWithID`: Add the ability to build QUIC multiaddr

* `p2p Start`: Fix error message.

* `p2p`: Add QUIC support.

* Status: Implement `{Inbound,Outbound}Connected{TCP,QUIC}`.

* Logging: Display the number of TCP/QUIC connected peers.

* P2P: Implement `{Inbound,Outbound}ConnectedWithProtocol`.

* Hide QUIC protocol behind the `--enable-quic` feature flag.

* `e2e`: Add `--enable-quic` flag.

* Add `--enable-quic` in `devModeFlag`.

* `convertToMultiAddrs` ==> `retrieveMultiAddrsFromNode`.

* `convertToAddrInfo`: Ensure `len(infos) == 1`.
2024-04-04 12:21:35 +00:00
Manu NALEPA
8cf5d79852 Remove the Goerli/Prater support. (#13846) 2024-04-03 19:19:17 +00:00
redistay
f7912e7c20 chore: fix some comments (#13843)
Signed-off-by: redistay <wujunjing@outlook.com>
2024-04-02 22:19:15 +00:00
terence
caa8be5dd1 Beacon-api: broadcast blobs in the event of seen block (#13830)
* Beacon-api: broadcast blobs in the event of seen block

* Fix parameters

* Fix test

* Check forkchoice

* Ran go format

* Revert "Ran go format"

This reverts commit 091e77e81d6e2b9861fecc27c0bad1898033f9a3.

* James feedback

* Radek's feedback

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* Fix bad tests

---------

Co-authored-by: Radosław Kapka <rkapka@wp.pl>
2024-04-02 18:12:58 +00:00
cui
0c15a30a34 using slices.Index (#13836) 2024-04-02 16:30:05 +00:00
cui
7bce1c0714 using slices.IndexFunc (#13839) 2024-04-02 16:06:27 +00:00
Radosław Kapka
d1084cbe48 Send correct state root with finalization event (#13842) 2024-04-02 15:32:32 +00:00
56 changed files with 810 additions and 598 deletions

View File

@@ -342,22 +342,6 @@ filegroup(
url = "https://github.com/eth-clients/eth2-networks/archive/934c948e69205dcf2deb87e4ae6cc140c335f94d.tar.gz",
)
http_archive(
name = "goerli_testnet",
build_file_content = """
filegroup(
name = "configs",
srcs = [
"prater/config.yaml",
],
visibility = ["//visibility:public"],
)
""",
sha256 = "43fc0f55ddff7b511713e2de07aa22846a67432df997296fb4fc09cd8ed1dcdb",
strip_prefix = "goerli-6522ac6684693740cd4ddcc2a0662e03702aa4a1",
url = "https://github.com/eth-clients/goerli/archive/6522ac6684693740cd4ddcc2a0662e03702aa4a1.tar.gz",
)
http_archive(
name = "holesky_testnet",
build_file_content = """

View File

@@ -61,7 +61,7 @@ func TestService_headNextSyncCommitteeIndices(t *testing.T) {
indices, err := c.headNextSyncCommitteeIndices(context.Background(), 0, primitives.Slot(slot))
require.NoError(t, err)
// NextSyncCommittee should be be empty after `ProcessSyncCommitteeUpdates`. Validator should get indices.
// NextSyncCommittee should be empty after `ProcessSyncCommitteeUpdates`. Validator should get indices.
require.NotEqual(t, 0, len(indices))
}

View File

@@ -170,7 +170,7 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig
// Send finalized events and finalized deposits in the background
if newFinalized {
finalized := s.cfg.ForkChoiceStore.FinalizedCheckpoint()
go s.sendNewFinalizedEvent(blockCopy, postState)
go s.sendNewFinalizedEvent(ctx, postState)
depCtx, cancel := context.WithTimeout(context.Background(), depositDeadline)
go func() {
s.insertFinalizedDeposits(depCtx, finalized.Root)
@@ -443,7 +443,7 @@ func (s *Service) updateFinalizationOnBlock(ctx context.Context, preState, postS
// sendNewFinalizedEvent sends a new finalization checkpoint event over the
// event feed. It needs to be called on the background
func (s *Service) sendNewFinalizedEvent(signed interfaces.ReadOnlySignedBeaconBlock, postState state.BeaconState) {
func (s *Service) sendNewFinalizedEvent(ctx context.Context, postState state.BeaconState) {
isValidPayload := false
s.headLock.RLock()
if s.head != nil {
@@ -451,8 +451,17 @@ func (s *Service) sendNewFinalizedEvent(signed interfaces.ReadOnlySignedBeaconBl
}
s.headLock.RUnlock()
blk, err := s.cfg.BeaconDB.Block(ctx, bytesutil.ToBytes32(postState.FinalizedCheckpoint().Root))
if err != nil {
log.WithError(err).Error("Could not retrieve block for finalized checkpoint root. Finalized event will not be emitted")
return
}
if blk == nil || blk.IsNil() || blk.Block() == nil || blk.Block().IsNil() {
log.WithError(err).Error("Block retrieved for finalized checkpoint root is nil. Finalized event will not be emitted")
return
}
stateRoot := blk.Block().StateRoot()
// Send an event regarding the new finalized checkpoint over a common event feed.
stateRoot := signed.Block().StateRoot()
s.cfg.StateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.FinalizedCheckpoint,
Data: &ethpbv1.EventFinalizedCheckpoint{

View File

@@ -8,12 +8,14 @@ import (
blockchainTesting "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/cache"
statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/operations/voluntaryexits"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
ethpbv1 "github.com/prysmaticlabs/prysm/v5/proto/eth/v1"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/testing/assert"
"github.com/prysmaticlabs/prysm/v5/testing/require"
@@ -378,3 +380,38 @@ func TestHandleBlockBLSToExecutionChanges(t *testing.T) {
require.Equal(t, false, pool.ValidatorExists(idx))
})
}
func Test_sendNewFinalizedEvent(t *testing.T) {
s, _ := minimalTestService(t)
notifier := &blockchainTesting.MockStateNotifier{RecordEvents: true}
s.cfg.StateNotifier = notifier
finalizedSt, err := util.NewBeaconState()
require.NoError(t, err)
finalizedStRoot, err := finalizedSt.HashTreeRoot(s.ctx)
require.NoError(t, err)
b := util.NewBeaconBlock()
b.Block.StateRoot = finalizedStRoot[:]
sbb, err := blocks.NewSignedBeaconBlock(b)
require.NoError(t, err)
sbbRoot, err := sbb.Block().HashTreeRoot()
require.NoError(t, err)
require.NoError(t, s.cfg.BeaconDB.SaveBlock(s.ctx, sbb))
st, err := util.NewBeaconState()
require.NoError(t, err)
require.NoError(t, st.SetFinalizedCheckpoint(&ethpb.Checkpoint{
Epoch: 123,
Root: sbbRoot[:],
}))
s.sendNewFinalizedEvent(s.ctx, st)
require.Equal(t, 1, len(notifier.ReceivedEvents()))
e := notifier.ReceivedEvents()[0]
assert.Equal(t, statefeed.FinalizedCheckpoint, int(e.Type))
fc, ok := e.Data.(*ethpbv1.EventFinalizedCheckpoint)
require.Equal(t, true, ok, "event has wrong data type")
assert.Equal(t, primitives.Epoch(123), fc.Epoch)
assert.DeepEqual(t, sbbRoot[:], fc.Block)
assert.DeepEqual(t, finalizedStRoot[:], fc.State)
assert.Equal(t, false, fc.ExecutionOptimistic)
}

View File

@@ -94,6 +94,15 @@ func (s *LazilyPersistentStore) IsDataAvailable(ctx context.Context, current pri
entry := s.cache.ensure(key)
defer s.cache.delete(key)
root := b.Root()
sumz, err := s.store.WaitForSummarizer(ctx)
if err != nil {
log.WithField("root", fmt.Sprintf("%#x", b.Root())).
WithError(err).
Debug("Failed to receive BlobStorageSummarizer within IsDataAvailable")
} else {
entry.setDiskSummary(sumz.Summary(root))
}
// Verify we have all the expected sidecars, and fail fast if any are missing or inconsistent.
// We don't try to salvage problematic batches because this indicates a misbehaving peer and we'd rather
// ignore their response and decrease their peer score.

View File

@@ -4,6 +4,7 @@ import (
"bytes"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
@@ -59,7 +60,12 @@ func (c *cache) delete(key cacheKey) {
// cacheEntry holds a fixed-length cache of BlobSidecars.
type cacheEntry struct {
scs [fieldparams.MaxBlobsPerBlock]*blocks.ROBlob
scs [fieldparams.MaxBlobsPerBlock]*blocks.ROBlob
diskSummary filesystem.BlobStorageSummary
}
func (e *cacheEntry) setDiskSummary(sum filesystem.BlobStorageSummary) {
e.diskSummary = sum
}
// stash adds an item to the in-memory cache of BlobSidecars.
@@ -81,9 +87,17 @@ func (e *cacheEntry) stash(sc *blocks.ROBlob) error {
// the cache do not match those found in the block. If err is nil, then all expected
// commitments were found in the cache and the sidecar slice return value can be used
// to perform a DA check against the cached sidecars.
// filter only returns blobs that need to be checked. Blobs already available on disk will be excluded.
func (e *cacheEntry) filter(root [32]byte, kc safeCommitmentArray) ([]blocks.ROBlob, error) {
if e.diskSummary.AllAvailable(kc.count()) {
return nil, nil
}
scs := make([]blocks.ROBlob, kc.count())
for i := uint64(0); i < fieldparams.MaxBlobsPerBlock; i++ {
// We already have this blob, we don't need to write it or validate it.
if e.diskSummary.HasIndex(i) {
continue
}
if kc[i] == nil {
if e.scs[i] != nil {
return nil, errors.Wrapf(errCommitmentMismatch, "root=%#x, index=%#x, commitment=%#x, no block commitment", root, i, e.scs[i].KzgCommitment)

View File

@@ -5,9 +5,9 @@ go_library(
srcs = [
"blob.go",
"cache.go",
"ephemeral.go",
"log.go",
"metrics.go",
"mock.go",
"pruner.go",
],
importpath = "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem",

View File

@@ -121,7 +121,7 @@ var ErrBlobStorageSummarizerUnavailable = errors.New("BlobStorage not initialize
// BlobStorageSummarizer is not ready immediately on node startup because it needs to sample the blob filesystem to
// determine which blobs are available.
func (bs *BlobStorage) WaitForSummarizer(ctx context.Context) (BlobStorageSummarizer, error) {
if bs.pruner == nil {
if bs == nil || bs.pruner == nil {
return nil, ErrBlobStorageSummarizerUnavailable
}
return bs.pruner.waitForCache(ctx)

View File

@@ -12,7 +12,7 @@ import (
// improving test performance and simplifying cleanup.
func NewEphemeralBlobStorage(t testing.TB) *BlobStorage {
fs := afero.NewMemMapFs()
pruner, err := newBlobPruner(fs, params.BeaconConfig().MinEpochsForBlobsSidecarsRequest)
pruner, err := newBlobPruner(fs, params.BeaconConfig().MinEpochsForBlobsSidecarsRequest, withWarmedCache())
if err != nil {
t.Fatal("test setup issue", err)
}
@@ -23,7 +23,7 @@ func NewEphemeralBlobStorage(t testing.TB) *BlobStorage {
// in order to interact with it outside the parameters of the BlobStorage api.
func NewEphemeralBlobStorageWithFs(t testing.TB) (afero.Fs, *BlobStorage, error) {
fs := afero.NewMemMapFs()
pruner, err := newBlobPruner(fs, params.BeaconConfig().MinEpochsForBlobsSidecarsRequest)
pruner, err := newBlobPruner(fs, params.BeaconConfig().MinEpochsForBlobsSidecarsRequest, withWarmedCache())
if err != nil {
t.Fatal("test setup issue", err)
}
@@ -61,3 +61,15 @@ func NewEphemeralBlobStorageWithMocker(_ testing.TB) (*BlobMocker, *BlobStorage)
bs := &BlobStorage{fs: fs}
return &BlobMocker{fs: fs, bs: bs}, bs
}
func NewMockBlobStorageSummarizer(t *testing.T, set map[[32]byte][]int) BlobStorageSummarizer {
c := newBlobStorageCache()
for k, v := range set {
for i := range v {
if err := c.ensure(rootString(k), 0, uint64(v[i])); err != nil {
t.Fatal(err)
}
}
}
return c
}

View File

@@ -33,17 +33,32 @@ type blobPruner struct {
prunedBefore atomic.Uint64
windowSize primitives.Slot
cache *blobStorageCache
cacheWarmed chan struct{}
cacheReady chan struct{}
warmed bool
fs afero.Fs
}
func newBlobPruner(fs afero.Fs, retain primitives.Epoch) (*blobPruner, error) {
type prunerOpt func(*blobPruner) error
func withWarmedCache() prunerOpt {
return func(p *blobPruner) error {
return p.warmCache()
}
}
func newBlobPruner(fs afero.Fs, retain primitives.Epoch, opts ...prunerOpt) (*blobPruner, error) {
r, err := slots.EpochStart(retain + retentionBuffer)
if err != nil {
return nil, errors.Wrap(err, "could not set retentionSlots")
}
cw := make(chan struct{})
return &blobPruner{fs: fs, windowSize: r, cache: newBlobStorageCache(), cacheWarmed: cw}, nil
p := &blobPruner{fs: fs, windowSize: r, cache: newBlobStorageCache(), cacheReady: cw}
for _, o := range opts {
if err := o(p); err != nil {
return nil, err
}
}
return p, nil
}
// notify updates the pruner's view of root->blob mappings. This allows the pruner to build a cache
@@ -57,6 +72,8 @@ func (p *blobPruner) notify(root [32]byte, latest primitives.Slot, idx uint64) e
return nil
}
go func() {
p.Lock()
defer p.Unlock()
if err := p.prune(primitives.Slot(pruned)); err != nil {
log.WithError(err).Errorf("Failed to prune blobs from slot %d", latest)
}
@@ -74,16 +91,21 @@ func windowMin(latest, offset primitives.Slot) primitives.Slot {
}
func (p *blobPruner) warmCache() error {
p.Lock()
defer p.Unlock()
if err := p.prune(0); err != nil {
return err
}
close(p.cacheWarmed)
if !p.warmed {
p.warmed = true
close(p.cacheReady)
}
return nil
}
func (p *blobPruner) waitForCache(ctx context.Context) (*blobStorageCache, error) {
select {
case <-p.cacheWarmed:
case <-p.cacheReady:
return p.cache, nil
case <-ctx.Done():
return nil, ctx.Err()
@@ -94,8 +116,6 @@ func (p *blobPruner) waitForCache(ctx context.Context) (*blobStorageCache, error
// It deletes blobs older than currentEpoch - (retentionEpochs+bufferEpochs).
// This is so that we keep a slight buffer and blobs are deleted after n+2 epochs.
func (p *blobPruner) prune(pruneBefore primitives.Slot) error {
p.Lock()
defer p.Unlock()
start := time.Now()
totalPruned, totalErr := 0, 0
// Customize logging/metrics behavior for the initial cache warmup when slot=0.

View File

@@ -217,9 +217,9 @@ func Test_hasNetworkFlag(t *testing.T) {
want bool
}{
{
name: "Prater testnet",
networkName: features.PraterTestnet.Name,
networkValue: "prater",
name: "Holesky testnet",
networkName: features.HoleskyTestnet.Name,
networkValue: "holesky",
want: true,
},
{

View File

@@ -10,6 +10,7 @@ import (
"github.com/pkg/errors"
ssz "github.com/prysmaticlabs/fastssz"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/altair"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/crypto/hash"
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing"
@@ -137,11 +138,11 @@ func (s *Service) internalBroadcastAttestation(ctx context.Context, subnet uint6
// In the event our attestation is outdated and beyond the
// acceptable threshold, we exit early and do not broadcast it.
currSlot := slots.CurrentSlot(uint64(s.genesisTime.Unix()))
if att.Data.Slot+params.BeaconConfig().SlotsPerEpoch < currSlot {
if err := helpers.ValidateAttestationTime(att.Data.Slot, s.genesisTime, params.BeaconConfig().MaximumGossipClockDisparityDuration()); err != nil {
log.WithFields(logrus.Fields{
"attestationSlot": att.Data.Slot,
"currentSlot": currSlot,
}).Warning("Attestation is too old to broadcast, discarding it")
}).WithError(err).Warning("Attestation is too old to broadcast, discarding it")
return
}

View File

@@ -15,6 +15,7 @@ import (
"github.com/pkg/errors"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/v5/config/features"
"github.com/prysmaticlabs/prysm/v5/config/params"
ecdsaprysm "github.com/prysmaticlabs/prysm/v5/crypto/ecdsa"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
@@ -230,14 +231,18 @@ func (s *Service) createLocalNode(
localNode := enode.NewLocalNode(db, privKey)
ipEntry := enr.IP(ipAddr)
udpEntry := enr.UDP(udpPort)
tcpEntry := enr.TCP(tcpPort)
quicEntry := quicProtocol(quicPort)
localNode.Set(ipEntry)
udpEntry := enr.UDP(udpPort)
localNode.Set(udpEntry)
tcpEntry := enr.TCP(tcpPort)
localNode.Set(tcpEntry)
localNode.Set(quicEntry)
if features.Get().EnableQUIC {
quicEntry := quicProtocol(quicPort)
localNode.Set(quicEntry)
}
localNode.SetFallbackIP(ipAddr)
localNode.SetFallbackUDP(udpPort)
@@ -316,7 +321,7 @@ func (s *Service) filterPeer(node *enode.Node) bool {
return false
}
if len(multiAddrs) == 0 {
if peerData == nil || len(multiAddrs) == 0 {
return false
}
@@ -395,7 +400,7 @@ func PeersFromStringAddrs(addrs []string) ([]ma.Multiaddr, error) {
if err != nil {
return nil, errors.Wrapf(err, "Could not get enode from string")
}
nodeAddrs, err := convertToMultiAddrs(enodeAddr)
nodeAddrs, err := retrieveMultiAddrsFromNode(enodeAddr)
if err != nil {
return nil, errors.Wrapf(err, "Could not get multiaddr")
}
@@ -444,7 +449,7 @@ func convertToMultiAddr(nodes []*enode.Node) []ma.Multiaddr {
}
// Get up to two multiaddrs (TCP and QUIC) for each node.
nodeMultiAddrs, err := convertToMultiAddrs(node)
nodeMultiAddrs, err := retrieveMultiAddrsFromNode(node)
if err != nil {
log.WithError(err).Errorf("Could not convert to multiAddr node %s", node)
continue
@@ -457,7 +462,7 @@ func convertToMultiAddr(nodes []*enode.Node) []ma.Multiaddr {
}
func convertToAddrInfo(node *enode.Node) (*peer.AddrInfo, []ma.Multiaddr, error) {
multiAddrs, err := convertToMultiAddrs(node)
multiAddrs, err := retrieveMultiAddrsFromNode(node)
if err != nil {
return nil, nil, err
}
@@ -471,22 +476,18 @@ func convertToAddrInfo(node *enode.Node) (*peer.AddrInfo, []ma.Multiaddr, error)
return nil, nil, errors.Wrapf(err, "could not convert to peer info: %v", multiAddrs)
}
if len(infos) > 1 {
return nil, nil, errors.Errorf("infos contains %v elements, expected not more than 1", len(infos))
}
if len(infos) == 0 {
return nil, multiAddrs, nil
if len(infos) != 1 {
return nil, nil, errors.Errorf("infos contains %v elements, expected exactly 1", len(infos))
}
return &infos[0], multiAddrs, nil
}
// convertToMultiAddrs converts an enode.Node to a list of multiaddrs.
// retrieveMultiAddrsFromNode converts an enode.Node to a list of multiaddrs.
// If the node has a both a QUIC and a TCP port set in their ENR, then
// the multiaddr corresponding to the QUIC port is added first, followed
// by the multiaddr corresponding to the TCP port.
func convertToMultiAddrs(node *enode.Node) ([]ma.Multiaddr, error) {
func retrieveMultiAddrsFromNode(node *enode.Node) ([]ma.Multiaddr, error) {
multiaddrs := make([]ma.Multiaddr, 0, 2)
// Retrieve the node public key.
@@ -502,23 +503,25 @@ func convertToMultiAddrs(node *enode.Node) ([]ma.Multiaddr, error) {
return nil, errors.Wrap(err, "could not get peer id")
}
// If the QUIC entry is present in the ENR, build the corresponding multiaddress.
port, ok, err := getPort(node, quic)
if err != nil {
return nil, errors.Wrap(err, "could not get QUIC port")
}
if ok {
addr, err := multiAddressBuilderWithID(node.IP(), quic, port, id)
if features.Get().EnableQUIC {
// If the QUIC entry is present in the ENR, build the corresponding multiaddress.
port, ok, err := getPort(node, quic)
if err != nil {
return nil, errors.Wrap(err, "could not build QUIC address")
return nil, errors.Wrap(err, "could not get QUIC port")
}
multiaddrs = append(multiaddrs, addr)
if ok {
addr, err := multiAddressBuilderWithID(node.IP(), quic, port, id)
if err != nil {
return nil, errors.Wrap(err, "could not build QUIC address")
}
multiaddrs = append(multiaddrs, addr)
}
}
// If the TCP entry is present in the ENR, build the corresponding multiaddress.
port, ok, err = getPort(node, tcp)
port, ok, err := getPort(node, tcp)
if err != nil {
return nil, errors.Wrap(err, "could not get TCP port")
}

View File

@@ -104,7 +104,7 @@ func TestStartDiscv5_DifferentForkDigests(t *testing.T) {
for _, node := range nodes {
if s.filterPeer(node) {
nodeAddrs, err := convertToMultiAddrs(node)
nodeAddrs, err := retrieveMultiAddrsFromNode(node)
require.NoError(t, err)
addrs = append(addrs, nodeAddrs...)
}
@@ -195,7 +195,7 @@ func TestStartDiscv5_SameForkDigests_DifferentNextForkData(t *testing.T) {
for _, node := range nodes {
if s.filterPeer(node) {
nodeAddrs, err := convertToMultiAddrs(node)
nodeAddrs, err := retrieveMultiAddrsFromNode(node)
require.NoError(t, err)
addrs = append(addrs, nodeAddrs...)
}

View File

@@ -37,12 +37,6 @@ func MultiAddressBuilder(ip net.IP, tcpPort, quicPort uint) ([]ma.Multiaddr, err
return nil, errors.Wrap(err, "unable to determine IP type")
}
// Example: /ip4/1.2.3.4/udp/5678/quic-v1
multiAddrQUIC, err := ma.NewMultiaddr(fmt.Sprintf("/%s/%s/udp/%d/quic-v1", ipType, ip, quicPort))
if err != nil {
return nil, errors.Wrapf(err, "cannot produce QUIC multiaddr format from %s:%d", ip, tcpPort)
}
// Example: /ip4/1.2.3.4./tcp/5678
multiaddrStr := fmt.Sprintf("/%s/%s/tcp/%d", ipType, ip, tcpPort)
multiAddrTCP, err := ma.NewMultiaddr(multiaddrStr)
@@ -50,7 +44,19 @@ func MultiAddressBuilder(ip net.IP, tcpPort, quicPort uint) ([]ma.Multiaddr, err
return nil, errors.Wrapf(err, "cannot produce TCP multiaddr format from %s:%d", ip, tcpPort)
}
return []ma.Multiaddr{multiAddrTCP, multiAddrQUIC}, nil
multiaddrs := []ma.Multiaddr{multiAddrTCP}
if features.Get().EnableQUIC {
// Example: /ip4/1.2.3.4/udp/5678/quic-v1
multiAddrQUIC, err := ma.NewMultiaddr(fmt.Sprintf("/%s/%s/udp/%d/quic-v1", ipType, ip, quicPort))
if err != nil {
return nil, errors.Wrapf(err, "cannot produce QUIC multiaddr format from %s:%d", ip, tcpPort)
}
multiaddrs = append(multiaddrs, multiAddrQUIC)
}
return multiaddrs, nil
}
// buildOptions for the libp2p host.
@@ -80,14 +86,13 @@ func (s *Service) buildOptions(ip net.IP, priKey *ecdsa.PrivateKey) ([]libp2p.Op
return nil, errors.Wrapf(err, "cannot get ID from public key: %s", ifaceKey.GetPublic().Type().String())
}
log.WithField("peerId", id).Info("Running node with")
log.WithField("peerId", id).Info("Running node with id")
options := []libp2p.Option{
privKeyOption(priKey),
libp2p.ListenAddrs(multiaddrs...),
libp2p.UserAgent(version.BuildData()),
libp2p.ConnectionGater(s),
libp2p.Transport(libp2pquic.NewTransport),
libp2p.Transport(libp2ptcp.NewTCPTransport),
libp2p.DefaultMuxers,
libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport),
@@ -95,6 +100,10 @@ func (s *Service) buildOptions(ip net.IP, priKey *ecdsa.PrivateKey) ([]libp2p.Op
libp2p.Ping(false), // Disable Ping Service.
}
if features.Get().EnableQUIC {
options = append(options, libp2p.Transport(libp2pquic.NewTransport))
}
if cfg.EnableUPnP {
options = append(options, libp2p.NATPortMap()) // Allow to use UPnP
}

View File

@@ -89,7 +89,7 @@ func TestIPV6Support(t *testing.T) {
lNode := enode.NewLocalNode(db, key)
mockIPV6 := net.IP{0xff, 0x02, 0xAA, 0, 0x1F, 0, 0x2E, 0, 0, 0x36, 0x45, 0, 0, 0, 0, 0x02}
lNode.Set(enr.IP(mockIPV6))
mas, err := convertToMultiAddrs(lNode.Node())
mas, err := retrieveMultiAddrsFromNode(lNode.Node())
if err != nil {
t.Fatal(err)
}

View File

@@ -77,6 +77,13 @@ const (
MaxBackOffDuration = 5000
)
type InternetProtocol string
const (
TCP = "tcp"
QUIC = "quic"
)
// Status is the structure holding the peer status information.
type Status struct {
ctx context.Context
@@ -450,26 +457,13 @@ func (p *Status) InboundConnected() []peer.ID {
return peers
}
// InboundConnectedTCP returns the current batch of inbound peers that are connected using TCP.
func (p *Status) InboundConnectedTCP() []peer.ID {
// InboundConnectedWithProtocol returns the current batch of inbound peers that are connected with a given protocol.
func (p *Status) InboundConnectedWithProtocol(protocol InternetProtocol) []peer.ID {
p.store.RLock()
defer p.store.RUnlock()
peers := make([]peer.ID, 0)
for pid, peerData := range p.store.Peers() {
if peerData.ConnState == PeerConnected && peerData.Direction == network.DirInbound && strings.Contains(peerData.Address.String(), "tcp") {
peers = append(peers, pid)
}
}
return peers
}
// InboundConnectedTCP returns the current batch of inbound peers that are connected using QUIC.
func (p *Status) InboundConnectedQUIC() []peer.ID {
p.store.RLock()
defer p.store.RUnlock()
peers := make([]peer.ID, 0)
for pid, peerData := range p.store.Peers() {
if peerData.ConnState == PeerConnected && peerData.Direction == network.DirInbound && strings.Contains(peerData.Address.String(), "quic") {
if peerData.ConnState == PeerConnected && peerData.Direction == network.DirInbound && strings.Contains(peerData.Address.String(), string(protocol)) {
peers = append(peers, pid)
}
}
@@ -502,26 +496,13 @@ func (p *Status) OutboundConnected() []peer.ID {
return peers
}
// OutboundConnected returns the current batch of outbound peers that are connected using TCP.
func (p *Status) OutboundConnectedTCP() []peer.ID {
// OutboundConnectedWithProtocol returns the current batch of outbound peers that are connected with a given protocol.
func (p *Status) OutboundConnectedWithProtocol(protocol InternetProtocol) []peer.ID {
p.store.RLock()
defer p.store.RUnlock()
peers := make([]peer.ID, 0)
for pid, peerData := range p.store.Peers() {
if peerData.ConnState == PeerConnected && peerData.Direction == network.DirOutbound && strings.Contains(peerData.Address.String(), "tcp") {
peers = append(peers, pid)
}
}
return peers
}
// OutboundConnected returns the current batch of outbound peers that are connected using QUIC.
func (p *Status) OutboundConnectedQUIC() []peer.ID {
p.store.RLock()
defer p.store.RUnlock()
peers := make([]peer.ID, 0)
for pid, peerData := range p.store.Peers() {
if peerData.ConnState == PeerConnected && peerData.Direction == network.DirOutbound && strings.Contains(peerData.Address.String(), "quic") {
if peerData.ConnState == PeerConnected && peerData.Direction == network.DirOutbound && strings.Contains(peerData.Address.String(), string(protocol)) {
peers = append(peers, pid)
}
}

View File

@@ -1131,7 +1131,7 @@ func TestInboundConnected(t *testing.T) {
assert.Equal(t, inbound.String(), result[0].String())
}
func TestInboundConnectedTCP(t *testing.T) {
func TestInboundConnectedWithProtocol(t *testing.T) {
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &scorers.Config{
@@ -1141,42 +1141,55 @@ func TestInboundConnectedTCP(t *testing.T) {
},
})
addrTCP, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/33333")
require.NoError(t, err)
addrsTCP := []string{
"/ip4/127.0.0.1/tcp/33333",
"/ip4/127.0.0.2/tcp/44444",
}
addrQUIC, err := ma.NewMultiaddr("/ip4/192.168.1.3/udp/13000/quic-v1")
require.NoError(t, err)
addrsQUIC := []string{
"/ip4/192.168.1.3/udp/13000/quic-v1",
"/ip4/192.168.1.4/udp/14000/quic-v1",
"/ip4/192.168.1.5/udp/14000/quic-v1",
}
inboundTCP := createPeer(t, p, addrTCP, network.DirInbound, peers.PeerConnected)
createPeer(t, p, addrQUIC, network.DirInbound, peers.PeerConnected)
expectedTCP := make(map[string]bool, len(addrsTCP))
for _, addr := range addrsTCP {
multiaddr, err := ma.NewMultiaddr(addr)
require.NoError(t, err)
result := p.InboundConnectedTCP()
require.Equal(t, 1, len(result))
assert.Equal(t, inboundTCP.String(), result[0].String())
}
peer := createPeer(t, p, multiaddr, network.DirInbound, peers.PeerConnected)
expectedTCP[peer.String()] = true
}
func TestInboundConnectedQUIC(t *testing.T) {
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: 0,
},
},
})
expectedQUIC := make(map[string]bool, len(addrsQUIC))
for _, addr := range addrsQUIC {
multiaddr, err := ma.NewMultiaddr(addr)
require.NoError(t, err)
addrQUIC, err := ma.NewMultiaddr("/ip4/192.168.1.3/udp/13000/quic-v1")
require.NoError(t, err)
peer := createPeer(t, p, multiaddr, network.DirInbound, peers.PeerConnected)
expectedQUIC[peer.String()] = true
}
addrTCP, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/33333")
require.NoError(t, err)
// TCP
// ---
inboundQUIC := createPeer(t, p, addrQUIC, network.DirInbound, peers.PeerConnected)
createPeer(t, p, addrTCP, network.DirInbound, peers.PeerConnected)
actualTCP := p.InboundConnectedWithProtocol(peers.TCP)
require.Equal(t, len(expectedTCP), len(actualTCP))
result := p.InboundConnectedQUIC()
require.Equal(t, 1, len(result))
assert.Equal(t, inboundQUIC.String(), result[0].String())
for _, actualPeer := range actualTCP {
_, ok := expectedTCP[actualPeer.String()]
require.Equal(t, true, ok)
}
// QUIC
// ----
actualQUIC := p.InboundConnectedWithProtocol(peers.QUIC)
require.Equal(t, len(expectedQUIC), len(actualQUIC))
for _, actualPeer := range actualQUIC {
_, ok := expectedQUIC[actualPeer.String()]
require.Equal(t, true, ok)
}
}
func TestOutbound(t *testing.T) {
@@ -1218,7 +1231,7 @@ func TestOutboundConnected(t *testing.T) {
assert.Equal(t, inbound.String(), result[0].String())
}
func TestOutbondConnectedTCP(t *testing.T) {
func TestOutboundConnectedWithProtocol(t *testing.T) {
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &scorers.Config{
@@ -1228,42 +1241,55 @@ func TestOutbondConnectedTCP(t *testing.T) {
},
})
addrTCP, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/33333")
require.NoError(t, err)
addrsTCP := []string{
"/ip4/127.0.0.1/tcp/33333",
"/ip4/127.0.0.2/tcp/44444",
}
addrQUIC, err := ma.NewMultiaddr("/ip4/192.168.1.3/udp/13000/quic-v1")
require.NoError(t, err)
addrsQUIC := []string{
"/ip4/192.168.1.3/udp/13000/quic-v1",
"/ip4/192.168.1.4/udp/14000/quic-v1",
"/ip4/192.168.1.5/udp/14000/quic-v1",
}
outboundTCP := createPeer(t, p, addrTCP, network.DirOutbound, peers.PeerConnected)
createPeer(t, p, addrQUIC, network.DirOutbound, peers.PeerConnected)
expectedTCP := make(map[string]bool, len(addrsTCP))
for _, addr := range addrsTCP {
multiaddr, err := ma.NewMultiaddr(addr)
require.NoError(t, err)
result := p.OutboundConnectedTCP()
require.Equal(t, 1, len(result))
assert.Equal(t, outboundTCP.String(), result[0].String())
}
peer := createPeer(t, p, multiaddr, network.DirOutbound, peers.PeerConnected)
expectedTCP[peer.String()] = true
}
func TestOutboundConnectedQUIC(t *testing.T) {
p := peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: 0,
},
},
})
expectedQUIC := make(map[string]bool, len(addrsQUIC))
for _, addr := range addrsQUIC {
multiaddr, err := ma.NewMultiaddr(addr)
require.NoError(t, err)
addrQUIC, err := ma.NewMultiaddr("/ip4/192.168.1.3/udp/13000/quic-v1")
require.NoError(t, err)
peer := createPeer(t, p, multiaddr, network.DirOutbound, peers.PeerConnected)
expectedQUIC[peer.String()] = true
}
addrTCP, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/33333")
require.NoError(t, err)
// TCP
// ---
outboundQUIC := createPeer(t, p, addrQUIC, network.DirOutbound, peers.PeerConnected)
createPeer(t, p, addrTCP, network.DirOutbound, peers.PeerConnected)
actualTCP := p.OutboundConnectedWithProtocol(peers.TCP)
require.Equal(t, len(expectedTCP), len(actualTCP))
result := p.OutboundConnectedQUIC()
require.Equal(t, 1, len(result))
assert.Equal(t, outboundQUIC.String(), result[0].String())
for _, actualPeer := range actualTCP {
_, ok := expectedTCP[actualPeer.String()]
require.Equal(t, true, ok)
}
// QUIC
// ----
actualQUIC := p.OutboundConnectedWithProtocol(peers.QUIC)
require.Equal(t, len(expectedQUIC), len(actualQUIC))
for _, actualPeer := range actualQUIC {
_, ok := expectedQUIC[actualPeer.String()]
require.Equal(t, true, ok)
}
}
// addPeer is a helper to add a peer with a given connection state)

View File

@@ -24,6 +24,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers/scorers"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/v5/config/features"
"github.com/prysmaticlabs/prysm/v5/config/params"
leakybucket "github.com/prysmaticlabs/prysm/v5/container/leaky-bucket"
prysmnetwork "github.com/prysmaticlabs/prysm/v5/network"
@@ -235,19 +236,24 @@ func (s *Service) Start() {
async.RunEvery(s.ctx, time.Duration(params.BeaconConfig().RespTimeout)*time.Second, s.updateMetrics)
async.RunEvery(s.ctx, refreshRate, s.RefreshENR)
async.RunEvery(s.ctx, 1*time.Minute, func() {
inboundQUICCount := len(s.peers.InboundConnectedQUIC())
inboundTCPCount := len(s.peers.InboundConnectedTCP())
outboundQUICCount := len(s.peers.OutboundConnectedQUIC())
outboundTCPCount := len(s.peers.OutboundConnectedTCP())
inboundQUICCount := len(s.peers.InboundConnectedWithProtocol(peers.QUIC))
inboundTCPCount := len(s.peers.InboundConnectedWithProtocol(peers.TCP))
outboundQUICCount := len(s.peers.OutboundConnectedWithProtocol(peers.QUIC))
outboundTCPCount := len(s.peers.OutboundConnectedWithProtocol(peers.TCP))
total := inboundQUICCount + inboundTCPCount + outboundQUICCount + outboundTCPCount
log.WithFields(logrus.Fields{
"inboundQUIC": inboundQUICCount,
"inboundTCP": inboundTCPCount,
"outboundQUIC": outboundQUICCount,
"outboundTCP": outboundTCPCount,
"total": total,
}).Info("Connected peers")
fields := logrus.Fields{
"inboundTCP": inboundTCPCount,
"outboundTCP": outboundTCPCount,
"total": total,
}
if features.Get().EnableQUIC {
fields["inboundQUIC"] = inboundQUICCount
fields["outboundQUIC"] = outboundQUICCount
}
log.WithFields(fields).Info("Connected peers")
})
multiAddrs := s.host.Network().ListenAddresses()

View File

@@ -37,6 +37,7 @@ go_library(
"//beacon-chain/rpc/eth/helpers:go_default_library",
"//beacon-chain/rpc/eth/shared:go_default_library",
"//beacon-chain/rpc/lookup:go_default_library",
"//beacon-chain/rpc/prysm/v1alpha1/validator:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/state-native:go_default_library",
"//beacon-chain/state/stategen:go_default_library",
@@ -121,6 +122,7 @@ go_test(
"@com_github_gorilla_mux//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
"@com_github_stretchr_testify//mock:go_default_library",
"@org_uber_go_mock//gomock:go_default_library",
],

View File

@@ -21,6 +21,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filters"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/rpc/eth/helpers"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/rpc/eth/shared"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/rpc/prysm/v1alpha1/validator"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/config/params"
consensus_types "github.com/prysmaticlabs/prysm/v5/consensus-types"
@@ -32,6 +33,7 @@ import (
eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
"github.com/prysmaticlabs/prysm/v5/time/slots"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
@@ -42,7 +44,8 @@ const (
)
var (
errNilBlock = errors.New("nil block")
errNilBlock = errors.New("nil block")
errEquivocatedBlock = errors.New("block is equivocated")
)
type handled bool
@@ -1254,6 +1257,16 @@ func (s *Server) publishBlockSSZ(ctx context.Context, w http.ResponseWriter, r *
},
}
if err = s.validateBroadcast(ctx, r, genericBlock); err != nil {
if errors.Is(err, errEquivocatedBlock) {
b, err := blocks.NewSignedBeaconBlock(genericBlock)
if err != nil {
httputil.HandleError(w, err.Error(), http.StatusBadRequest)
return
}
if err := s.broadcastSeenBlockSidecars(ctx, b, genericBlock.GetDeneb().Blobs, genericBlock.GetDeneb().KzgProofs); err != nil {
log.WithError(err).Error("Failed to broadcast blob sidecars")
}
}
httputil.HandleError(w, err.Error(), http.StatusBadRequest)
return
}
@@ -1383,6 +1396,16 @@ func (s *Server) publishBlock(ctx context.Context, w http.ResponseWriter, r *htt
consensusBlock, err = denebBlockContents.ToGeneric()
if err == nil {
if err = s.validateBroadcast(ctx, r, consensusBlock); err != nil {
if errors.Is(err, errEquivocatedBlock) {
b, err := blocks.NewSignedBeaconBlock(consensusBlock)
if err != nil {
httputil.HandleError(w, err.Error(), http.StatusBadRequest)
return
}
if err := s.broadcastSeenBlockSidecars(ctx, b, consensusBlock.GetDeneb().Blobs, consensusBlock.GetDeneb().KzgProofs); err != nil {
log.WithError(err).Error("Failed to broadcast blob sidecars")
}
}
httputil.HandleError(w, err.Error(), http.StatusBadRequest)
return
}
@@ -1547,7 +1570,7 @@ func (s *Server) validateConsensus(ctx context.Context, blk interfaces.ReadOnlyS
func (s *Server) validateEquivocation(blk interfaces.ReadOnlyBeaconBlock) error {
if s.ForkchoiceFetcher.HighestReceivedBlockSlot() == blk.Slot() {
return fmt.Errorf("block for slot %d already exists in fork choice", blk.Slot())
return errors.Wrapf(errEquivocatedBlock, "block for slot %d already exists in fork choice", blk.Slot())
}
return nil
}
@@ -2072,3 +2095,37 @@ func (s *Server) GetDepositSnapshot(w http.ResponseWriter, r *http.Request) {
},
)
}
// Broadcast blob sidecars even if the block of the same slot has been imported.
// To ensure safety, we will only broadcast blob sidecars if the header references the same block that was previously seen.
// Otherwise, a proposer could get slashed through a different blob sidecar header reference.
func (s *Server) broadcastSeenBlockSidecars(
ctx context.Context,
b interfaces.SignedBeaconBlock,
blobs [][]byte,
kzgProofs [][]byte) error {
scs, err := validator.BuildBlobSidecars(b, blobs, kzgProofs)
if err != nil {
return err
}
for _, sc := range scs {
r, err := sc.SignedBlockHeader.Header.HashTreeRoot()
if err != nil {
log.WithError(err).Error("Failed to hash block header for blob sidecar")
continue
}
if !s.FinalizationFetcher.InForkchoice(r) {
log.WithField("root", fmt.Sprintf("%#x", r)).Debug("Block header not in forkchoice, skipping blob sidecar broadcast")
continue
}
if err := s.Broadcaster.BroadcastBlob(ctx, sc.Index, sc); err != nil {
log.WithError(err).Error("Failed to broadcast blob sidecar for index ", sc.Index)
}
log.WithFields(logrus.Fields{
"index": sc.Index,
"slot": sc.SignedBlockHeader.Header.Slot,
"kzgCommitment": fmt.Sprintf("%#x", sc.KzgCommitment),
}).Info("Broadcasted blob sidecar for already seen block")
}
return nil
}

View File

@@ -13,6 +13,8 @@ import (
"time"
"github.com/ethereum/go-ethereum/common/hexutil"
mockp2p "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/testing"
logTest "github.com/sirupsen/logrus/hooks/test"
"go.uber.org/mock/gomock"
"github.com/gorilla/mux"
@@ -2472,7 +2474,9 @@ func TestValidateEquivocation(t *testing.T) {
require.NoError(t, err)
blk.SetSlot(st.Slot())
assert.ErrorContains(t, "already exists", server.validateEquivocation(blk.Block()))
err = server.validateEquivocation(blk.Block())
assert.ErrorContains(t, "already exists", err)
require.ErrorIs(t, err, errEquivocatedBlock)
})
}
@@ -3630,3 +3634,27 @@ func TestGetDepositSnapshot(t *testing.T) {
assert.Equal(t, finalized, len(resp.Finalized))
})
}
func TestServer_broadcastBlobSidecars(t *testing.T) {
hook := logTest.NewGlobal()
blockToPropose := util.NewBeaconBlockContentsDeneb()
blockToPropose.Blobs = [][]byte{{0x01}, {0x02}, {0x03}}
blockToPropose.KzgProofs = [][]byte{{0x01}, {0x02}, {0x03}}
blockToPropose.Block.Block.Body.BlobKzgCommitments = [][]byte{bytesutil.PadTo([]byte("kc"), 48), bytesutil.PadTo([]byte("kc1"), 48), bytesutil.PadTo([]byte("kc2"), 48)}
d := &eth.GenericSignedBeaconBlock_Deneb{Deneb: blockToPropose}
b := &eth.GenericSignedBeaconBlock{Block: d}
server := &Server{
Broadcaster: &mockp2p.MockBroadcaster{},
FinalizationFetcher: &chainMock.ChainService{NotFinalized: true},
}
blk, err := blocks.NewSignedBeaconBlock(b.Block)
require.NoError(t, err)
require.NoError(t, server.broadcastSeenBlockSidecars(context.Background(), blk, b.GetDeneb().Blobs, b.GetDeneb().KzgProofs))
require.LogsDoNotContain(t, hook, "Broadcasted blob sidecar for already seen block")
server.FinalizationFetcher = &chainMock.ChainService{NotFinalized: false}
require.NoError(t, server.broadcastSeenBlockSidecars(context.Background(), blk, b.GetDeneb().Blobs, b.GetDeneb().KzgProofs))
require.LogsContain(t, hook, "Broadcasted blob sidecar for already seen block")
}

View File

@@ -341,7 +341,7 @@ func (vs *Server) handleUnblindedBlock(block interfaces.SignedBeaconBlock, req *
if dbBlockContents == nil {
return nil, nil
}
return buildBlobSidecars(block, dbBlockContents.Blobs, dbBlockContents.KzgProofs)
return BuildBlobSidecars(block, dbBlockContents.Blobs, dbBlockContents.KzgProofs)
}
// broadcastReceiveBlock broadcasts a block and handles its reception.

View File

@@ -56,8 +56,8 @@ func (c *blobsBundleCache) prune(minSlot primitives.Slot) {
}
}
// buildBlobSidecars given a block, builds the blob sidecars for the block.
func buildBlobSidecars(blk interfaces.SignedBeaconBlock, blobs [][]byte, kzgProofs [][]byte) ([]*ethpb.BlobSidecar, error) {
// BuildBlobSidecars given a block, builds the blob sidecars for the block.
func BuildBlobSidecars(blk interfaces.SignedBeaconBlock, blobs [][]byte, kzgProofs [][]byte) ([]*ethpb.BlobSidecar, error) {
if blk.Version() < version.Deneb {
return nil, nil // No blobs before deneb.
}

View File

@@ -51,7 +51,7 @@ func TestServer_buildBlobSidecars(t *testing.T) {
require.NoError(t, blk.SetBlobKzgCommitments(kzgCommitments))
proof, err := hexutil.Decode("0xb4021b0de10f743893d4f71e1bf830c019e832958efd6795baf2f83b8699a9eccc5dc99015d8d4d8ec370d0cc333c06a")
require.NoError(t, err)
scs, err := buildBlobSidecars(blk, [][]byte{
scs, err := BuildBlobSidecars(blk, [][]byte{
make([]byte, fieldparams.BlobLength), make([]byte, fieldparams.BlobLength),
}, [][]byte{
proof, proof,

View File

@@ -135,7 +135,7 @@ With 1_048_576 validators, we need 4096 * 2MB = 8GB
Storing both MIN and MAX spans for 1_048_576 validators takes 16GB.
Each chunk is stored snappy-compressed in the database.
If all validators attest ideally, a MIN SPAN chunk will contain only `2`s, and and MAX SPAN chunk will contain only `0`s.
If all validators attest ideally, a MIN SPAN chunk will contain only `2`s, and MAX SPAN chunk will contain only `0`s.
This will compress very well, and will let us store a lot of data in a small amount of space.
*/

View File

@@ -33,7 +33,6 @@ go_library(
"//beacon-chain/verification:go_default_library",
"//cmd/beacon-chain/flags:go_default_library",
"//config/params:go_default_library",
"//consensus-types:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//consensus-types/primitives:go_default_library",
@@ -83,10 +82,10 @@ go_test(
"//beacon-chain/p2p/types:go_default_library",
"//beacon-chain/startup:go_default_library",
"//beacon-chain/sync:go_default_library",
"//beacon-chain/sync/verify:go_default_library",
"//beacon-chain/verification:go_default_library",
"//cmd/beacon-chain/flags:go_default_library",
"//config/features:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/interfaces:go_default_library",

View File

@@ -11,6 +11,7 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
p2pTypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/startup"
@@ -18,7 +19,6 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/sync/verify"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v5/config/params"
consensus_types "github.com/prysmaticlabs/prysm/v5/consensus-types"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
blocks2 "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
@@ -27,6 +27,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/crypto/rand"
"github.com/prysmaticlabs/prysm/v5/math"
p2ppb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
"github.com/prysmaticlabs/prysm/v5/time/slots"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
@@ -76,6 +77,7 @@ type blocksFetcherConfig struct {
db db.ReadOnlyDatabase
peerFilterCapacityWeight float64
mode syncMode
bs filesystem.BlobStorageSummarizer
}
// blocksFetcher is a service to fetch chain data from peers.
@@ -91,6 +93,7 @@ type blocksFetcher struct {
ctxMap prysmsync.ContextByteVersions
p2p p2p.P2P
db db.ReadOnlyDatabase
bs filesystem.BlobStorageSummarizer
blocksPerPeriod uint64
rateLimiter *leakybucket.Collector
peerLocks map[peer.ID]*peerLock
@@ -149,6 +152,7 @@ func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetc
ctxMap: cfg.ctxMap,
p2p: cfg.p2p,
db: cfg.db,
bs: cfg.bs,
blocksPerPeriod: uint64(blocksPerPeriod),
rateLimiter: rateLimiter,
peerLocks: make(map[peer.ID]*peerLock),
@@ -236,7 +240,7 @@ func (f *blocksFetcher) loop() {
// Main loop.
for {
// Make sure there is are available peers before processing requests.
// Make sure there are available peers before processing requests.
if _, err := f.waitForMinimumPeers(f.ctx); err != nil {
log.Error(err)
}
@@ -372,22 +376,17 @@ func sortedBlockWithVerifiedBlobSlice(blocks []interfaces.ReadOnlySignedBeaconBl
return rb, nil
}
func blobRequest(bwb []blocks2.BlockWithROBlobs, blobWindowStart primitives.Slot) *p2ppb.BlobSidecarsByRangeRequest {
if len(bwb) == 0 {
return nil
}
lowest := lowestSlotNeedsBlob(blobWindowStart, bwb)
if lowest == nil {
return nil
}
highest := bwb[len(bwb)-1].Block.Block().Slot()
return &p2ppb.BlobSidecarsByRangeRequest{
StartSlot: *lowest,
Count: uint64(highest.SubSlot(*lowest)) + 1,
}
type commitmentCount struct {
slot primitives.Slot
root [32]byte
count int
}
func lowestSlotNeedsBlob(retentionStart primitives.Slot, bwb []blocks2.BlockWithROBlobs) *primitives.Slot {
type commitmentCountList []commitmentCount
// countCommitments makes a list of all blocks that have commitments that need to be satisfied.
// This gives us a representation to finish building the request that is lightweight and readable for testing.
func countCommitments(bwb []blocks2.BlockWithROBlobs, retentionStart primitives.Slot) commitmentCountList {
if len(bwb) == 0 {
return nil
}
@@ -397,8 +396,13 @@ func lowestSlotNeedsBlob(retentionStart primitives.Slot, bwb []blocks2.BlockWith
if bwb[len(bwb)-1].Block.Block().Slot() < retentionStart {
return nil
}
for _, b := range bwb {
fc := make([]commitmentCount, 0, len(bwb))
for i := range bwb {
b := bwb[i]
slot := b.Block.Block().Slot()
if b.Block.Version() < version.Deneb {
continue
}
if slot < retentionStart {
continue
}
@@ -406,67 +410,116 @@ func lowestSlotNeedsBlob(retentionStart primitives.Slot, bwb []blocks2.BlockWith
if err != nil || len(commits) == 0 {
continue
}
return &slot
fc = append(fc, commitmentCount{slot: slot, root: b.Block.Root(), count: len(commits)})
}
return fc
}
// func slotRangeForCommitmentCounts(cc []commitmentCount, bs filesystem.BlobStorageSummarizer) *blobRange {
func (cc commitmentCountList) blobRange(bs filesystem.BlobStorageSummarizer) *blobRange {
if len(cc) == 0 {
return nil
}
// If we don't have a blob summarizer, can't check local blobs, request blobs over complete range.
if bs == nil {
return &blobRange{low: cc[0].slot, high: cc[len(cc)-1].slot}
}
for i := range cc {
hci := cc[i]
// This list is always ordered by increasing slot, per req/resp validation rules.
// Skip through slots until we find one with missing blobs.
if bs.Summary(hci.root).AllAvailable(hci.count) {
continue
}
// The slow of the first missing blob is the lower bound.
// If we don't find an upper bound, we'll have a 1 slot request (same low/high).
needed := &blobRange{low: hci.slot, high: hci.slot}
// Iterate backward through the list to find the highest missing slot above the lower bound.
// Return the complete range as soon as we find it; if lower bound is already the last element,
// or if we never find an upper bound, we'll fall through to the bounds being equal after this loop.
for z := len(cc) - 1; z > i; z-- {
hcz := cc[z]
if bs.Summary(hcz.root).AllAvailable(hcz.count) {
continue
}
needed.high = hcz.slot
return needed
}
return needed
}
return nil
}
func sortBlobs(blobs []blocks.ROBlob) []blocks.ROBlob {
sort.Slice(blobs, func(i, j int) bool {
if blobs[i].Slot() == blobs[j].Slot() {
return blobs[i].Index < blobs[j].Index
}
return blobs[i].Slot() < blobs[j].Slot()
})
type blobRange struct {
low primitives.Slot
high primitives.Slot
}
return blobs
func (r *blobRange) Request() *p2ppb.BlobSidecarsByRangeRequest {
if r == nil {
return nil
}
return &p2ppb.BlobSidecarsByRangeRequest{
StartSlot: r.low,
Count: uint64(r.high.SubSlot(r.low)) + 1,
}
}
var errBlobVerification = errors.New("peer unable to serve aligned BlobSidecarsByRange and BeaconBlockSidecarsByRange responses")
var errMissingBlobsForBlockCommitments = errors.Wrap(errBlobVerification, "blobs unavailable for processing block with kzg commitments")
func verifyAndPopulateBlobs(bwb []blocks2.BlockWithROBlobs, blobs []blocks.ROBlob, blobWindowStart primitives.Slot) ([]blocks2.BlockWithROBlobs, error) {
// Assumes bwb has already been sorted by sortedBlockWithVerifiedBlobSlice.
blobs = sortBlobs(blobs)
blobi := 0
// Loop over all blocks, and each time a commitment is observed, advance the index into the blob slice.
// The assumption is that the blob slice contains a value for every commitment in the blocks it is based on,
// correctly ordered by slot and blob index.
for i, bb := range bwb {
block := bb.Block.Block()
if block.Slot() < blobWindowStart {
func verifyAndPopulateBlobs(bwb []blocks2.BlockWithROBlobs, blobs []blocks.ROBlob, req *p2ppb.BlobSidecarsByRangeRequest, bss filesystem.BlobStorageSummarizer) ([]blocks2.BlockWithROBlobs, error) {
blobsByRoot := make(map[[32]byte][]blocks.ROBlob)
for i := range blobs {
if blobs[i].Slot() < req.StartSlot {
continue
}
commits, err := block.Body().BlobKzgCommitments()
br := blobs[i].BlockRoot()
blobsByRoot[br] = append(blobsByRoot[br], blobs[i])
}
for i := range bwb {
bwi, err := populateBlock(bwb[i], blobsByRoot[bwb[i].Block.Root()], req, bss)
if err != nil {
if errors.Is(err, consensus_types.ErrUnsupportedField) {
log.
WithField("blockSlot", block.Slot()).
WithField("retentionStart", blobWindowStart).
Warn("block with slot within blob retention period has version which does not support commitments")
if errors.Is(err, errDidntPopulate) {
continue
}
return nil, err
return bwb, err
}
bb.Blobs = make([]blocks.ROBlob, len(commits))
for ci := range commits {
// There are more expected commitments in this block, but we've run out of blobs from the response
// (out-of-bound error guard).
if blobi == len(blobs) {
return nil, missingCommitError(bb.Block.Root(), bb.Block.Block().Slot(), commits[ci:])
}
bl := blobs[blobi]
if err := verify.BlobAlignsWithBlock(bl, bb.Block); err != nil {
return nil, err
}
bb.Blobs[ci] = bl
blobi += 1
}
bwb[i] = bb
bwb[i] = bwi
}
return bwb, nil
}
var errDidntPopulate = errors.New("skipping population of block")
func populateBlock(bw blocks2.BlockWithROBlobs, blobs []blocks.ROBlob, req *p2ppb.BlobSidecarsByRangeRequest, bss filesystem.BlobStorageSummarizer) (blocks2.BlockWithROBlobs, error) {
blk := bw.Block
if blk.Version() < version.Deneb || blk.Block().Slot() < req.StartSlot {
return bw, errDidntPopulate
}
commits, err := blk.Block().Body().BlobKzgCommitments()
if err != nil {
return bw, errDidntPopulate
}
if len(commits) == 0 {
return bw, errDidntPopulate
}
// Drop blobs on the floor if we already have them.
if bss != nil && bss.Summary(blk.Root()).AllAvailable(len(commits)) {
return bw, errDidntPopulate
}
if len(commits) != len(blobs) {
return bw, missingCommitError(blk.Root(), blk.Block().Slot(), commits)
}
for ci := range commits {
if err := verify.BlobAlignsWithBlock(blobs[ci], blk); err != nil {
return bw, err
}
}
bw.Blobs = blobs
return bw, nil
}
func missingCommitError(root [32]byte, slot primitives.Slot, missing [][]byte) error {
missStr := make([]string, 0, len(missing))
for k := range missing {
@@ -488,7 +541,7 @@ func (f *blocksFetcher) fetchBlobsFromPeer(ctx context.Context, bwb []blocks2.Bl
return nil, err
}
// Construct request message based on observed interval of blocks in need of blobs.
req := blobRequest(bwb, blobWindowStart)
req := countCommitments(bwb, blobWindowStart).blobRange(f.bs).Request()
if req == nil {
return bwb, nil
}
@@ -508,7 +561,7 @@ func (f *blocksFetcher) fetchBlobsFromPeer(ctx context.Context, bwb []blocks2.Bl
continue
}
f.p2p.Peers().Scorers().BlockProviderScorer().Touch(p)
robs, err := verifyAndPopulateBlobs(bwb, blobs, blobWindowStart)
robs, err := verifyAndPopulateBlobs(bwb, blobs, req, f.bs)
if err != nil {
log.WithField("peer", p).WithError(err).Debug("Invalid BeaconBlobsByRange response")
continue

View File

@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"math"
"math/rand"
"sort"
"sync"
"testing"
@@ -14,13 +13,14 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
mock "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
dbtest "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/testing"
p2pm "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
p2pt "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/testing"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/startup"
beaconsync "github.com/prysmaticlabs/prysm/v5/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/sync/verify"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
@@ -960,28 +960,7 @@ func TestTimeToWait(t *testing.T) {
}
}
func TestSortBlobs(t *testing.T) {
_, blobs := util.ExtendBlocksPlusBlobs(t, []blocks.ROBlock{}, 10)
shuffled := make([]blocks.ROBlob, len(blobs))
for i := range blobs {
shuffled[i] = blobs[i]
}
rand.Shuffle(len(shuffled), func(i, j int) {
shuffled[i], shuffled[j] = shuffled[j], shuffled[i]
})
sorted := sortBlobs(shuffled)
require.Equal(t, len(sorted), len(shuffled))
for i := range blobs {
expect := blobs[i]
actual := sorted[i]
require.Equal(t, expect.Slot(), actual.Slot())
require.Equal(t, expect.Index, actual.Index)
require.Equal(t, bytesutil.ToBytes48(expect.KzgCommitment), bytesutil.ToBytes48(actual.KzgCommitment))
require.Equal(t, expect.BlockRoot(), actual.BlockRoot())
}
}
func TestLowestSlotNeedsBlob(t *testing.T) {
func TestBlobRangeForBlocks(t *testing.T) {
blks, _ := util.ExtendBlocksPlusBlobs(t, []blocks.ROBlock{}, 10)
sbbs := make([]interfaces.ReadOnlySignedBeaconBlock, len(blks))
for i := range blks {
@@ -990,12 +969,12 @@ func TestLowestSlotNeedsBlob(t *testing.T) {
retentionStart := primitives.Slot(5)
bwb, err := sortedBlockWithVerifiedBlobSlice(sbbs)
require.NoError(t, err)
lowest := lowestSlotNeedsBlob(retentionStart, bwb)
require.Equal(t, retentionStart, *lowest)
bounds := countCommitments(bwb, retentionStart).blobRange(nil)
require.Equal(t, retentionStart, bounds.low)
higher := primitives.Slot(len(blks) + 1)
lowest = lowestSlotNeedsBlob(higher, bwb)
var nilSlot *primitives.Slot
require.Equal(t, nilSlot, lowest)
bounds = countCommitments(bwb, higher).blobRange(nil)
var nilBounds *blobRange
require.Equal(t, nilBounds, bounds)
blks, _ = util.ExtendBlocksPlusBlobs(t, []blocks.ROBlock{}, 10)
sbbs = make([]interfaces.ReadOnlySignedBeaconBlock, len(blks))
@@ -1008,14 +987,14 @@ func TestLowestSlotNeedsBlob(t *testing.T) {
next := bwb[6].Block.Block().Slot()
skip := bwb[5].Block.Block()
bwb[5].Block, _ = util.GenerateTestDenebBlockWithSidecar(t, skip.ParentRoot(), skip.Slot(), 0)
lowest = lowestSlotNeedsBlob(retentionStart, bwb)
require.Equal(t, next, *lowest)
bounds = countCommitments(bwb, retentionStart).blobRange(nil)
require.Equal(t, next, bounds.low)
}
func TestBlobRequest(t *testing.T) {
var nilReq *ethpb.BlobSidecarsByRangeRequest
// no blocks
req := blobRequest([]blocks.BlockWithROBlobs{}, 0)
req := countCommitments([]blocks.BlockWithROBlobs{}, 0).blobRange(nil).Request()
require.Equal(t, nilReq, req)
blks, _ := util.ExtendBlocksPlusBlobs(t, []blocks.ROBlock{}, 10)
sbbs := make([]interfaces.ReadOnlySignedBeaconBlock, len(blks))
@@ -1027,26 +1006,180 @@ func TestBlobRequest(t *testing.T) {
maxBlkSlot := primitives.Slot(len(blks) - 1)
tooHigh := primitives.Slot(len(blks) + 1)
req = blobRequest(bwb, tooHigh)
req = countCommitments(bwb, tooHigh).blobRange(nil).Request()
require.Equal(t, nilReq, req)
req = blobRequest(bwb, maxBlkSlot)
req = countCommitments(bwb, maxBlkSlot).blobRange(nil).Request()
require.Equal(t, uint64(1), req.Count)
require.Equal(t, maxBlkSlot, req.StartSlot)
halfway := primitives.Slot(5)
req = blobRequest(bwb, halfway)
req = countCommitments(bwb, halfway).blobRange(nil).Request()
require.Equal(t, halfway, req.StartSlot)
// adding 1 to include the halfway slot itself
require.Equal(t, uint64(1+maxBlkSlot-halfway), req.Count)
before := bwb[0].Block.Block().Slot()
allAfter := bwb[1:]
req = blobRequest(allAfter, before)
req = countCommitments(allAfter, before).blobRange(nil).Request()
require.Equal(t, allAfter[0].Block.Block().Slot(), req.StartSlot)
require.Equal(t, len(allAfter), int(req.Count))
}
func TestCountCommitments(t *testing.T) {
// no blocks
// blocks before retention start filtered
// blocks without commitments filtered
// pre-deneb filtered
// variety of commitment counts are accurate, from 1 to max
type testcase struct {
name string
bwb func(t *testing.T, c testcase) []blocks.BlockWithROBlobs
numBlocks int
retStart primitives.Slot
resCount int
}
cases := []testcase{
{
name: "nil blocks is safe",
bwb: func(t *testing.T, c testcase) []blocks.BlockWithROBlobs {
return nil
},
retStart: 0,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
bwb := c.bwb(t, c)
cc := countCommitments(bwb, c.retStart)
require.Equal(t, c.resCount, len(cc))
})
}
}
func TestCommitmentCountList(t *testing.T) {
cases := []struct {
name string
cc commitmentCountList
bss func(*testing.T) filesystem.BlobStorageSummarizer
expected *blobRange
request *ethpb.BlobSidecarsByRangeRequest
}{
{
name: "nil commitmentCount is safe",
cc: nil,
expected: nil,
request: nil,
},
{
name: "nil bss, single slot",
cc: []commitmentCount{
{slot: 11235, count: 1},
},
expected: &blobRange{low: 11235, high: 11235},
request: &ethpb.BlobSidecarsByRangeRequest{StartSlot: 11235, Count: 1},
},
{
name: "nil bss, sparse slots",
cc: []commitmentCount{
{slot: 11235, count: 1},
{slot: 11240, count: fieldparams.MaxBlobsPerBlock},
{slot: 11250, count: 3},
},
expected: &blobRange{low: 11235, high: 11250},
request: &ethpb.BlobSidecarsByRangeRequest{StartSlot: 11235, Count: 16},
},
{
name: "AllAvailable in middle, some avail low, none high",
bss: func(t *testing.T) filesystem.BlobStorageSummarizer {
onDisk := map[[32]byte][]int{
bytesutil.ToBytes32([]byte("0")): {0, 1},
bytesutil.ToBytes32([]byte("1")): {0, 1, 2, 3, 4, 5},
}
return filesystem.NewMockBlobStorageSummarizer(t, onDisk)
},
cc: []commitmentCount{
{slot: 0, count: 3, root: bytesutil.ToBytes32([]byte("0"))},
{slot: 5, count: fieldparams.MaxBlobsPerBlock, root: bytesutil.ToBytes32([]byte("1"))},
{slot: 15, count: 3},
},
expected: &blobRange{low: 0, high: 15},
request: &ethpb.BlobSidecarsByRangeRequest{StartSlot: 0, Count: 16},
},
{
name: "AllAvailable at high and low",
bss: func(t *testing.T) filesystem.BlobStorageSummarizer {
onDisk := map[[32]byte][]int{
bytesutil.ToBytes32([]byte("0")): {0, 1},
bytesutil.ToBytes32([]byte("2")): {0, 1, 2, 3, 4, 5},
}
return filesystem.NewMockBlobStorageSummarizer(t, onDisk)
},
cc: []commitmentCount{
{slot: 0, count: 2, root: bytesutil.ToBytes32([]byte("0"))},
{slot: 5, count: 3},
{slot: 15, count: fieldparams.MaxBlobsPerBlock, root: bytesutil.ToBytes32([]byte("2"))},
},
expected: &blobRange{low: 5, high: 5},
request: &ethpb.BlobSidecarsByRangeRequest{StartSlot: 5, Count: 1},
},
{
name: "AllAvailable at high and low, adjacent range in middle",
bss: func(t *testing.T) filesystem.BlobStorageSummarizer {
onDisk := map[[32]byte][]int{
bytesutil.ToBytes32([]byte("0")): {0, 1},
bytesutil.ToBytes32([]byte("2")): {0, 1, 2, 3, 4, 5},
}
return filesystem.NewMockBlobStorageSummarizer(t, onDisk)
},
cc: []commitmentCount{
{slot: 0, count: 2, root: bytesutil.ToBytes32([]byte("0"))},
{slot: 5, count: 3},
{slot: 6, count: 3},
{slot: 15, count: fieldparams.MaxBlobsPerBlock, root: bytesutil.ToBytes32([]byte("2"))},
},
expected: &blobRange{low: 5, high: 6},
request: &ethpb.BlobSidecarsByRangeRequest{StartSlot: 5, Count: 2},
},
{
name: "AllAvailable at high and low, range in middle",
bss: func(t *testing.T) filesystem.BlobStorageSummarizer {
onDisk := map[[32]byte][]int{
bytesutil.ToBytes32([]byte("0")): {0, 1},
bytesutil.ToBytes32([]byte("1")): {0, 1},
bytesutil.ToBytes32([]byte("2")): {0, 1, 2, 3, 4, 5},
}
return filesystem.NewMockBlobStorageSummarizer(t, onDisk)
},
cc: []commitmentCount{
{slot: 0, count: 2, root: bytesutil.ToBytes32([]byte("0"))},
{slot: 5, count: 3, root: bytesutil.ToBytes32([]byte("1"))},
{slot: 10, count: 3},
{slot: 15, count: fieldparams.MaxBlobsPerBlock, root: bytesutil.ToBytes32([]byte("2"))},
},
expected: &blobRange{low: 5, high: 10},
request: &ethpb.BlobSidecarsByRangeRequest{StartSlot: 5, Count: 6},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
var bss filesystem.BlobStorageSummarizer
if c.bss != nil {
bss = c.bss(t)
}
br := c.cc.blobRange(bss)
require.DeepEqual(t, c.expected, br)
if c.request == nil {
require.IsNil(t, br.Request())
} else {
req := br.Request()
require.DeepEqual(t, req.StartSlot, c.request.StartSlot)
require.DeepEqual(t, req.Count, c.request.Count)
}
})
}
}
func testSequenceBlockWithBlob(t *testing.T, nblocks int) ([]blocks.BlockWithROBlobs, []blocks.ROBlob) {
blks, blobs := util.ExtendBlocksPlusBlobs(t, []blocks.ROBlock{}, nblocks)
sbbs := make([]interfaces.ReadOnlySignedBeaconBlock, len(blks))
@@ -1058,91 +1191,75 @@ func testSequenceBlockWithBlob(t *testing.T, nblocks int) ([]blocks.BlockWithROB
return bwb, blobs
}
func testReqFromResp(bwb []blocks.BlockWithROBlobs) *ethpb.BlobSidecarsByRangeRequest {
return &ethpb.BlobSidecarsByRangeRequest{
StartSlot: bwb[0].Block.Block().Slot(),
Count: uint64(bwb[len(bwb)-1].Block.Block().Slot()-bwb[0].Block.Block().Slot()) + 1,
}
}
func TestVerifyAndPopulateBlobs(t *testing.T) {
bwb, blobs := testSequenceBlockWithBlob(t, 10)
lastBlobIdx := len(blobs) - 1
// Blocks are all before the retention window, blobs argument is ignored.
windowAfter := bwb[len(bwb)-1].Block.Block().Slot() + 1
_, err := verifyAndPopulateBlobs(bwb, nil, windowAfter)
require.NoError(t, err)
t.Run("happy path", func(t *testing.T) {
bwb, blobs := testSequenceBlockWithBlob(t, 10)
firstBlockSlot := bwb[0].Block.Block().Slot()
// slice off blobs for the last block so we hit the out of bounds / blob exhaustion check.
_, err = verifyAndPopulateBlobs(bwb, blobs[0:len(blobs)-6], firstBlockSlot)
require.ErrorIs(t, err, errMissingBlobsForBlockCommitments)
bwb, blobs = testSequenceBlockWithBlob(t, 10)
// Misalign the slots of the blobs for the first block to simulate them being missing from the response.
offByOne := blobs[0].Slot()
for i := range blobs {
if blobs[i].Slot() == offByOne {
blobs[i].SignedBlockHeader.Header.Slot = offByOne + 1
expectedCommits := make(map[[48]byte]bool)
for _, bl := range blobs {
expectedCommits[bytesutil.ToBytes48(bl.KzgCommitment)] = true
}
}
_, err = verifyAndPopulateBlobs(bwb, blobs, firstBlockSlot)
require.ErrorIs(t, err, verify.ErrBlobBlockMisaligned)
require.Equal(t, len(blobs), len(expectedCommits))
bwb, blobs = testSequenceBlockWithBlob(t, 10)
blobs[lastBlobIdx], err = blocks.NewROBlobWithRoot(blobs[lastBlobIdx].BlobSidecar, blobs[0].BlockRoot())
require.NoError(t, err)
_, err = verifyAndPopulateBlobs(bwb, blobs, firstBlockSlot)
require.ErrorIs(t, err, verify.ErrBlobBlockMisaligned)
bwb, blobs = testSequenceBlockWithBlob(t, 10)
blobs[lastBlobIdx].Index = 100
_, err = verifyAndPopulateBlobs(bwb, blobs, firstBlockSlot)
require.ErrorIs(t, err, verify.ErrIncorrectBlobIndex)
bwb, blobs = testSequenceBlockWithBlob(t, 10)
blobs[lastBlobIdx].SignedBlockHeader.Header.ProposerIndex = 100
blobs[lastBlobIdx], err = blocks.NewROBlob(blobs[lastBlobIdx].BlobSidecar)
require.NoError(t, err)
_, err = verifyAndPopulateBlobs(bwb, blobs, firstBlockSlot)
require.ErrorIs(t, err, verify.ErrBlobBlockMisaligned)
bwb, blobs = testSequenceBlockWithBlob(t, 10)
blobs[lastBlobIdx].SignedBlockHeader.Header.ParentRoot = blobs[0].SignedBlockHeader.Header.ParentRoot
blobs[lastBlobIdx], err = blocks.NewROBlob(blobs[lastBlobIdx].BlobSidecar)
require.NoError(t, err)
_, err = verifyAndPopulateBlobs(bwb, blobs, firstBlockSlot)
require.ErrorIs(t, err, verify.ErrBlobBlockMisaligned)
var emptyKzg [48]byte
bwb, blobs = testSequenceBlockWithBlob(t, 10)
blobs[lastBlobIdx].KzgCommitment = emptyKzg[:]
blobs[lastBlobIdx], err = blocks.NewROBlob(blobs[lastBlobIdx].BlobSidecar)
require.NoError(t, err)
_, err = verifyAndPopulateBlobs(bwb, blobs, firstBlockSlot)
require.ErrorIs(t, err, verify.ErrMismatchedBlobCommitments)
// happy path
bwb, blobs = testSequenceBlockWithBlob(t, 10)
expectedCommits := make(map[[48]byte]bool)
for _, bl := range blobs {
expectedCommits[bytesutil.ToBytes48(bl.KzgCommitment)] = true
}
// The assertions using this map expect all commitments to be unique, so make sure that stays true.
require.Equal(t, len(blobs), len(expectedCommits))
bwb, err = verifyAndPopulateBlobs(bwb, blobs, firstBlockSlot)
require.NoError(t, err)
for _, bw := range bwb {
commits, err := bw.Block.Block().Body().BlobKzgCommitments()
bwb, err := verifyAndPopulateBlobs(bwb, blobs, testReqFromResp(bwb), nil)
require.NoError(t, err)
require.Equal(t, len(commits), len(bw.Blobs))
for i := range commits {
bc := bytesutil.ToBytes48(commits[i])
require.Equal(t, bc, bytesutil.ToBytes48(bw.Blobs[i].KzgCommitment))
// Since we delete entries we've seen, duplicates will cause an error here.
_, ok := expectedCommits[bc]
// Make sure this was an expected delete, then delete it from the map so we can make sure we saw all of them.
require.Equal(t, true, ok)
delete(expectedCommits, bc)
for _, bw := range bwb {
commits, err := bw.Block.Block().Body().BlobKzgCommitments()
require.NoError(t, err)
require.Equal(t, len(commits), len(bw.Blobs))
for i := range commits {
bc := bytesutil.ToBytes48(commits[i])
require.Equal(t, bc, bytesutil.ToBytes48(bw.Blobs[i].KzgCommitment))
// Since we delete entries we've seen, duplicates will cause an error here.
_, ok := expectedCommits[bc]
// Make sure this was an expected delete, then delete it from the map so we can make sure we saw all of them.
require.Equal(t, true, ok)
delete(expectedCommits, bc)
}
}
}
// We delete each entry we've seen, so if we see all expected commits, the map should be empty at the end.
require.Equal(t, 0, len(expectedCommits))
// We delete each entry we've seen, so if we see all expected commits, the map should be empty at the end.
require.Equal(t, 0, len(expectedCommits))
})
t.Run("missing blobs", func(t *testing.T) {
bwb, blobs := testSequenceBlockWithBlob(t, 10)
_, err := verifyAndPopulateBlobs(bwb, blobs[1:], testReqFromResp(bwb), nil)
require.ErrorIs(t, err, errMissingBlobsForBlockCommitments)
})
t.Run("no blobs for last block", func(t *testing.T) {
bwb, blobs := testSequenceBlockWithBlob(t, 10)
lastIdx := len(bwb) - 1
lastBlk := bwb[lastIdx].Block
cmts, err := lastBlk.Block().Body().BlobKzgCommitments()
require.NoError(t, err)
blobs = blobs[0 : len(blobs)-len(cmts)]
lastBlk, _ = util.GenerateTestDenebBlockWithSidecar(t, lastBlk.Block().ParentRoot(), lastBlk.Block().Slot(), 0)
bwb[lastIdx].Block = lastBlk
_, err = verifyAndPopulateBlobs(bwb, blobs, testReqFromResp(bwb), nil)
require.NoError(t, err)
})
t.Run("blobs not copied if all locally available", func(t *testing.T) {
bwb, blobs := testSequenceBlockWithBlob(t, 10)
// r1 only has some blobs locally available, so we'll still copy them all.
// r7 has all blobs locally available, so we shouldn't copy them.
i1, i7 := 1, 7
r1, r7 := bwb[i1].Block.Root(), bwb[i7].Block.Root()
onDisk := map[[32]byte][]int{
r1: {0, 1},
r7: {0, 1, 2, 3, 4, 5},
}
bss := filesystem.NewMockBlobStorageSummarizer(t, onDisk)
bwb, err := verifyAndPopulateBlobs(bwb, blobs, testReqFromResp(bwb), bss)
require.NoError(t, err)
require.Equal(t, 6, len(bwb[i1].Blobs))
require.Equal(t, 0, len(bwb[i7].Blobs))
})
}
func TestBatchLimit(t *testing.T) {

View File

@@ -7,6 +7,7 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/startup"
beaconsync "github.com/prysmaticlabs/prysm/v5/beacon-chain/sync"
@@ -69,6 +70,7 @@ type blocksQueueConfig struct {
p2p p2p.P2P
db db.ReadOnlyDatabase
mode syncMode
bs filesystem.BlobStorageSummarizer
}
// blocksQueue is a priority queue that serves as a intermediary between block fetchers (producers)
@@ -101,12 +103,16 @@ func newBlocksQueue(ctx context.Context, cfg *blocksQueueConfig) *blocksQueue {
blocksFetcher := cfg.blocksFetcher
if blocksFetcher == nil {
if cfg.bs == nil {
log.Warn("rpc fetcher starting without blob availability cache, duplicate blobs may be requested.")
}
blocksFetcher = newBlocksFetcher(ctx, &blocksFetcherConfig{
ctxMap: cfg.ctxMap,
chain: cfg.chain,
p2p: cfg.p2p,
db: cfg.db,
clock: cfg.clock,
bs: cfg.bs,
})
}
highestExpectedSlot := cfg.highestExpectedSlot
@@ -139,7 +145,7 @@ func newBlocksQueue(ctx context.Context, cfg *blocksQueueConfig) *blocksQueue {
queue.smm.addEventHandler(eventDataReceived, stateScheduled, queue.onDataReceivedEvent(ctx))
queue.smm.addEventHandler(eventTick, stateDataParsed, queue.onReadyToSendEvent(ctx))
queue.smm.addEventHandler(eventTick, stateSkipped, queue.onProcessSkippedEvent(ctx))
queue.smm.addEventHandler(eventTick, stateSent, queue.onCheckStaleEvent(ctx))
queue.smm.addEventHandler(eventTick, stateSent, onCheckStaleEvent(ctx))
return queue
}
@@ -451,7 +457,7 @@ func (q *blocksQueue) onProcessSkippedEvent(ctx context.Context) eventHandlerFn
// onCheckStaleEvent is an event that allows to mark stale epochs,
// so that they can be re-processed.
func (_ *blocksQueue) onCheckStaleEvent(ctx context.Context) eventHandlerFn {
func onCheckStaleEvent(ctx context.Context) eventHandlerFn {
return func(m *stateMachine, in interface{}) (stateID, error) {
if ctx.Err() != nil {
return m.state, ctx.Err()

View File

@@ -971,24 +971,12 @@ func TestBlocksQueue_onProcessSkippedEvent(t *testing.T) {
}
func TestBlocksQueue_onCheckStaleEvent(t *testing.T) {
blockBatchLimit := flags.Get().BlockBatchLimit
mc, p2p, _ := initializeTestServices(t, []primitives.Slot{}, []*peerData{})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
chain: mc,
p2p: p2p,
})
t.Run("expired context", func(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
chain: mc,
highestExpectedSlot: primitives.Slot(blockBatchLimit),
})
handlerFn := queue.onCheckStaleEvent(ctx)
handlerFn := onCheckStaleEvent(ctx)
cancel()
updatedState, err := handlerFn(&stateMachine{
state: stateSkipped,
@@ -998,16 +986,10 @@ func TestBlocksQueue_onCheckStaleEvent(t *testing.T) {
})
t.Run("invalid input state", func(t *testing.T) {
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
chain: mc,
highestExpectedSlot: primitives.Slot(blockBatchLimit),
})
invalidStates := []stateID{stateNew, stateScheduled, stateDataParsed, stateSkipped}
for _, state := range invalidStates {
t.Run(state.String(), func(t *testing.T) {
handlerFn := queue.onCheckStaleEvent(ctx)
handlerFn := onCheckStaleEvent(ctx)
updatedState, err := handlerFn(&stateMachine{
state: state,
}, nil)
@@ -1018,12 +1000,7 @@ func TestBlocksQueue_onCheckStaleEvent(t *testing.T) {
})
t.Run("process non stale machine", func(t *testing.T) {
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
chain: mc,
highestExpectedSlot: primitives.Slot(blockBatchLimit),
})
handlerFn := queue.onCheckStaleEvent(ctx)
handlerFn := onCheckStaleEvent(ctx)
updatedState, err := handlerFn(&stateMachine{
state: stateSent,
updated: prysmTime.Now().Add(-staleEpochTimeout / 2),
@@ -1034,12 +1011,7 @@ func TestBlocksQueue_onCheckStaleEvent(t *testing.T) {
})
t.Run("process stale machine", func(t *testing.T) {
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
chain: mc,
highestExpectedSlot: primitives.Slot(blockBatchLimit),
})
handlerFn := queue.onCheckStaleEvent(ctx)
handlerFn := onCheckStaleEvent(ctx)
updatedState, err := handlerFn(&stateMachine{
state: stateSent,
updated: prysmTime.Now().Add(-staleEpochTimeout),

View File

@@ -11,6 +11,7 @@ import (
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
@@ -62,7 +63,39 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
return s.syncToNonFinalizedEpoch(ctx, genesis)
}
// syncToFinalizedEpoch sync from head to best known finalized epoch.
func (s *Service) startBlocksQueue(ctx context.Context, highestSlot primitives.Slot, mode syncMode) (*blocksQueue, error) {
vr := s.clock.GenesisValidatorsRoot()
ctxMap, err := sync.ContextByteVersionsForValRoot(vr)
if err != nil {
return nil, errors.Wrapf(err, "unable to initialize context version map using genesis validator root = %#x", vr)
}
summarizer, err := s.cfg.BlobStorage.WaitForSummarizer(ctx)
if err != nil {
// The summarizer is an optional optimization, we can continue without, only stop if there is a different error.
if !errors.Is(err, filesystem.ErrBlobStorageSummarizerUnavailable) {
return nil, err
}
summarizer = nil // This should already be nil, but we'll set it just to be safe.
}
cfg := &blocksQueueConfig{
p2p: s.cfg.P2P,
db: s.cfg.DB,
chain: s.cfg.Chain,
clock: s.clock,
ctxMap: ctxMap,
highestExpectedSlot: highestSlot,
mode: mode,
bs: summarizer,
}
queue := newBlocksQueue(ctx, cfg)
if err := queue.start(); err != nil {
return nil, err
}
return queue, nil
}
// syncToFinalizedEpoch sync from head to the best known finalized epoch.
func (s *Service) syncToFinalizedEpoch(ctx context.Context, genesis time.Time) error {
highestFinalizedSlot, err := slots.EpochStart(s.highestFinalizedEpoch())
if err != nil {
@@ -74,28 +107,12 @@ func (s *Service) syncToFinalizedEpoch(ctx context.Context, genesis time.Time) e
return nil
}
vr := s.clock.GenesisValidatorsRoot()
ctxMap, err := sync.ContextByteVersionsForValRoot(vr)
queue, err := s.startBlocksQueue(ctx, highestFinalizedSlot, modeStopOnFinalizedEpoch)
if err != nil {
return errors.Wrapf(err, "unable to initialize context version map using genesis validator root = %#x", vr)
}
queue := newBlocksQueue(ctx, &blocksQueueConfig{
p2p: s.cfg.P2P,
db: s.cfg.DB,
chain: s.cfg.Chain,
clock: s.clock,
ctxMap: ctxMap,
highestExpectedSlot: highestFinalizedSlot,
mode: modeStopOnFinalizedEpoch,
})
if err := queue.start(); err != nil {
return err
}
for data := range queue.fetchedData {
// If blobs are available. Verify blobs and blocks are consistence.
// We can't import a block if there's no associated blob within DA bound.
// The blob has to pass aggregated proof check.
s.processFetchedData(ctx, genesis, s.cfg.Chain.HeadSlot(), data)
}
@@ -113,21 +130,8 @@ func (s *Service) syncToFinalizedEpoch(ctx context.Context, genesis time.Time) e
// syncToNonFinalizedEpoch sync from head to best known non-finalized epoch supported by majority
// of peers (no less than MinimumSyncPeers*2 peers).
func (s *Service) syncToNonFinalizedEpoch(ctx context.Context, genesis time.Time) error {
vr := s.clock.GenesisValidatorsRoot()
ctxMap, err := sync.ContextByteVersionsForValRoot(vr)
queue, err := s.startBlocksQueue(ctx, slots.Since(genesis), modeNonConstrained)
if err != nil {
return errors.Wrapf(err, "unable to initialize context version map using genesis validator root = %#x", vr)
}
queue := newBlocksQueue(ctx, &blocksQueueConfig{
p2p: s.cfg.P2P,
db: s.cfg.DB,
chain: s.cfg.Chain,
clock: s.clock,
ctxMap: ctxMap,
highestExpectedSlot: slots.Since(genesis),
mode: modeNonConstrained,
})
if err := queue.start(); err != nil {
return err
}
for data := range queue.fetchedData {

View File

@@ -10,7 +10,6 @@ import (
libp2pcore "github.com/libp2p/go-libp2p/core"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/multiformats/go-multiaddr"
ssz "github.com/prysmaticlabs/fastssz"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
p2ptypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
@@ -146,25 +145,8 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
// About the special case for quic-v1, please see:
// https://github.com/quic-go/quic-go/issues/3291
defer func() {
isQuic := false
multiaddr.ForEach(stream.Conn().RemoteMultiaddr(), func(c multiaddr.Component) bool {
pCode := c.Protocol().Code
if pCode == multiaddr.P_QUIC || pCode == multiaddr.P_QUIC_V1 {
isQuic = true
return false
}
return true
})
// We special case for QUIC connections as unlike yamux where a reset is a no-op for a successful close. An abrupt
// stream termination can lead to the remote peer dropping the inbound data. For that reason, we only reset streams
// in the event there is an error while closing them.
if isQuic {
if err := stream.Close(); err != nil {
_err := stream.Reset()
_ = _err
}
return
if strings.Contains(stream.Conn().RemoteMultiaddr().String(), "quic-v1") {
time.Sleep(2 * time.Second)
}
_err := stream.Reset()

View File

@@ -4,6 +4,7 @@ package cmd
import (
"fmt"
"math"
"slices"
"strings"
"github.com/prysmaticlabs/prysm/v5/config/params"
@@ -329,10 +330,10 @@ func ValidateNoArgs(ctx *cli.Context) error {
// verifies that the provided command is in the command list.
func checkCommandList(commands []*cli.Command, name string) *cli.Command {
for _, c := range commands {
if c.Name == name {
return c
}
if i := slices.IndexFunc(commands, func(c *cli.Command) bool {
return c.Name == name
}); i >= 0 {
return commands[i]
}
return nil
}

View File

@@ -85,7 +85,7 @@ var (
},
&cli.StringFlag{
Name: "config-name",
Usage: "Config kind to be used for generating the genesis state. Default: mainnet. Options include mainnet, interop, minimal, prater, sepolia. --chain-config-file will override this flag.",
Usage: "Config kind to be used for generating the genesis state. Default: mainnet. Options include mainnet, interop, minimal, sepolia, holesky. --chain-config-file will override this flag.",
Destination: &generateGenesisStateFlags.ConfigName,
Value: params.MainnetName,
},

View File

@@ -167,7 +167,6 @@ var Commands = []*cli.Command{
flags.ForceExitFlag,
flags.VoluntaryExitJSONOutputPath,
features.Mainnet,
features.PraterTestnet,
features.SepoliaTestnet,
features.HoleskyTestnet,
cmd.AcceptTosFlag,

View File

@@ -27,7 +27,6 @@ var Commands = &cli.Command{
flags.WalletPasswordFileFlag,
flags.DeletePublicKeysFlag,
features.Mainnet,
features.PraterTestnet,
features.SepoliaTestnet,
features.HoleskyTestnet,
cmd.AcceptTosFlag,
@@ -63,7 +62,6 @@ var Commands = &cli.Command{
flags.GrpcRetriesFlag,
flags.GrpcRetryDelayFlag,
features.Mainnet,
features.PraterTestnet,
features.SepoliaTestnet,
features.HoleskyTestnet,
cmd.AcceptTosFlag,
@@ -97,7 +95,6 @@ var Commands = &cli.Command{
flags.BackupPublicKeysFlag,
flags.BackupPasswordFile,
features.Mainnet,
features.PraterTestnet,
features.SepoliaTestnet,
features.HoleskyTestnet,
cmd.AcceptTosFlag,
@@ -128,7 +125,6 @@ var Commands = &cli.Command{
flags.AccountPasswordFileFlag,
flags.ImportPrivateKeyFileFlag,
features.Mainnet,
features.PraterTestnet,
features.SepoliaTestnet,
features.HoleskyTestnet,
cmd.AcceptTosFlag,
@@ -171,7 +167,6 @@ var Commands = &cli.Command{
flags.ForceExitFlag,
flags.VoluntaryExitJSONOutputPath,
features.Mainnet,
features.PraterTestnet,
features.SepoliaTestnet,
features.HoleskyTestnet,
cmd.AcceptTosFlag,

View File

@@ -22,7 +22,6 @@ var Commands = &cli.Command{
cmd.DataDirFlag,
flags.SlashingProtectionExportDirFlag,
features.Mainnet,
features.PraterTestnet,
features.SepoliaTestnet,
features.HoleskyTestnet,
features.EnableMinimalSlashingProtection,
@@ -51,7 +50,6 @@ var Commands = &cli.Command{
cmd.DataDirFlag,
flags.SlashingProtectionJSONFileFlag,
features.Mainnet,
features.PraterTestnet,
features.SepoliaTestnet,
features.HoleskyTestnet,
features.EnableMinimalSlashingProtection,

View File

@@ -31,7 +31,6 @@ var Commands = &cli.Command{
flags.Mnemonic25thWordFileFlag,
flags.SkipMnemonic25thWordCheckFlag,
features.Mainnet,
features.PraterTestnet,
features.SepoliaTestnet,
features.HoleskyTestnet,
cmd.AcceptTosFlag,
@@ -63,7 +62,6 @@ var Commands = &cli.Command{
flags.Mnemonic25thWordFileFlag,
flags.SkipMnemonic25thWordCheckFlag,
features.Mainnet,
features.PraterTestnet,
features.SepoliaTestnet,
features.HoleskyTestnet,
cmd.AcceptTosFlag,

View File

@@ -42,6 +42,7 @@ type Flags struct {
WriteSSZStateTransitions bool // WriteSSZStateTransitions to tmp directory.
EnablePeerScorer bool // EnablePeerScorer enables experimental peer scoring in p2p.
EnableLightClient bool // EnableLightClient enables light client APIs.
EnableQUIC bool // EnableQUIC specifies whether to enable QUIC transport for libp2p.
WriteWalletPasswordOnWebOnboarding bool // WriteWalletPasswordOnWebOnboarding writes the password to disk after Prysm web signup.
EnableDoppelGanger bool // EnableDoppelGanger enables doppelganger protection on startup for the validator.
EnableHistoricalSpaceRepresentation bool // EnableHistoricalSpaceRepresentation enables the saving of registry validators in separate buckets to save space
@@ -123,14 +124,7 @@ func InitWithReset(c *Flags) func() {
// configureTestnet sets the config according to specified testnet flag
func configureTestnet(ctx *cli.Context) error {
if ctx.Bool(PraterTestnet.Name) {
log.Info("Running on the Prater Testnet")
if err := params.SetActive(params.PraterConfig().Copy()); err != nil {
return err
}
applyPraterFeatureFlags(ctx)
params.UsePraterNetworkConfig()
} else if ctx.Bool(SepoliaTestnet.Name) {
if ctx.Bool(SepoliaTestnet.Name) {
log.Info("Running on the Sepolia Beacon Chain Testnet")
if err := params.SetActive(params.SepoliaConfig().Copy()); err != nil {
return err
@@ -157,10 +151,6 @@ func configureTestnet(ctx *cli.Context) error {
return nil
}
// Insert feature flags within the function to be enabled for Prater testnet.
func applyPraterFeatureFlags(ctx *cli.Context) {
}
// Insert feature flags within the function to be enabled for Sepolia testnet.
func applySepoliaFeatureFlags(ctx *cli.Context) {
}
@@ -265,6 +255,10 @@ func ConfigureBeaconChain(ctx *cli.Context) error {
logEnabled(BlobSaveFsync)
cfg.BlobSaveFsync = true
}
if ctx.IsSet(EnableQUIC.Name) {
logEnabled(EnableQUIC)
cfg.EnableQUIC = true
}
cfg.AggregateIntervals = [3]time.Duration{aggregateFirstInterval.Value, aggregateSecondInterval.Value, aggregateThirdInterval.Value}
Init(cfg)

View File

@@ -8,12 +8,6 @@ import (
)
var (
// PraterTestnet flag for the multiclient Ethereum consensus testnet.
PraterTestnet = &cli.BoolFlag{
Name: "prater",
Usage: "Runs Prysm configured for the Prater / Goerli test network.",
Aliases: []string{"goerli"},
}
// SepoliaTestnet flag for the multiclient Ethereum consensus testnet.
SepoliaTestnet = &cli.BoolFlag{
Name: "sepolia",
@@ -171,19 +165,24 @@ var (
Name: "blob-save-fsync",
Usage: "Forces new blob files to be fysnc'd before continuing, ensuring durable blob writes.",
}
// EnableQUIC enables connection using the QUIC protocol for peers which support it.
EnableQUIC = &cli.BoolFlag{
Name: "enable-quic",
Usage: "Enables connection using the QUIC protocol for peers which support it.",
}
)
// devModeFlags holds list of flags that are set when development mode is on.
var devModeFlags = []cli.Flag{
enableExperimentalState,
backfill.EnableExperimentalBackfill,
EnableQUIC,
}
// ValidatorFlags contains a list of all the feature flags that apply to the validator client.
var ValidatorFlags = append(deprecatedFlags, []cli.Flag{
writeWalletPasswordOnWebOnboarding,
HoleskyTestnet,
PraterTestnet,
SepoliaTestnet,
Mainnet,
dynamicKeyReloadDebounceInterval,
@@ -208,7 +207,6 @@ var BeaconChainFlags = append(deprecatedBeaconFlags, append(deprecatedFlags, []c
saveInvalidBlobTempFlag,
disableGRPCConnectionLogging,
HoleskyTestnet,
PraterTestnet,
SepoliaTestnet,
Mainnet,
disablePeerScorer,
@@ -229,6 +227,7 @@ var BeaconChainFlags = append(deprecatedBeaconFlags, append(deprecatedFlags, []c
DisableRegistrationCache,
EnableLightClient,
BlobSaveFsync,
EnableQUIC,
}...)...)
// E2EBeaconChainFlags contains a list of the beacon chain feature flags to be tested in E2E.
@@ -239,7 +238,6 @@ var E2EBeaconChainFlags = []string{
// NetworkFlags contains a list of network flags.
var NetworkFlags = []cli.Flag{
Mainnet,
PraterTestnet,
SepoliaTestnet,
HoleskyTestnet,
}

View File

@@ -16,7 +16,6 @@ go_library(
"network_config.go",
"testnet_e2e_config.go",
"testnet_holesky_config.go",
"testnet_prater_config.go",
"testnet_sepolia_config.go",
"testutils.go",
"testutils_develop.go", # keep
@@ -50,7 +49,6 @@ go_test(
"mainnet_config_test.go",
"testnet_config_test.go",
"testnet_holesky_config_test.go",
"testnet_prater_config_test.go",
],
data = glob(["*.yaml"]) + [
"testdata/e2e_config.yaml",
@@ -58,14 +56,12 @@ go_test(
"@consensus_spec_tests_mainnet//:test_data",
"@consensus_spec_tests_minimal//:test_data",
"@eth2_networks//:configs",
"@goerli_testnet//:configs",
"@holesky_testnet//:configs",
],
embed = [":go_default_library"],
gotags = ["develop"],
tags = ["CI_race_detection"],
deps = [
"//build/bazel:go_default_library",
"//consensus-types/primitives:go_default_library",
"//encoding/bytesutil:go_default_library",
"//io/file:go_default_library",

View File

@@ -3,7 +3,6 @@ package params
func init() {
defaults := []*BeaconChainConfig{
MainnetConfig(),
PraterConfig(),
MinimalSpecConfig(),
E2ETestConfig(),
E2EMainnetTestConfig(),

View File

@@ -82,7 +82,7 @@ PROPOSER_SCORE_BOOST: 40
# Deposit contract
# ---------------------------------------------------------------
# Ethereum Goerli testnet
# Testnet
DEPOSIT_CHAIN_ID: 1337 # Override for e2e tests
DEPOSIT_NETWORK_ID: 1337 # Override for e2e tests
# Configured on a per testnet basis

View File

@@ -1,49 +0,0 @@
package params
import (
eth1Params "github.com/ethereum/go-ethereum/params"
)
// UsePraterNetworkConfig uses the Prater specific
// network config.
func UsePraterNetworkConfig() {
cfg := BeaconNetworkConfig().Copy()
cfg.ContractDeploymentBlock = 4367322
cfg.BootstrapNodes = []string{
// Prysm's bootnode
"enr:-Ku4QFmUkNp0g9bsLX2PfVeIyT-9WO-PZlrqZBNtEyofOOfLMScDjaTzGxIb1Ns9Wo5Pm_8nlq-SZwcQfTH2cgO-s88Bh2F0dG5ldHOIAAAAAAAAAACEZXRoMpDkvpOTAAAQIP__________gmlkgnY0gmlwhBLf22SJc2VjcDI1NmsxoQLV_jMOIxKbjHFKgrkFvwDvpexo6Nd58TK5k7ss4Vt0IoN1ZHCCG1g",
// Lighthouse's bootnode by Afri
"enr:-LK4QH1xnjotgXwg25IDPjrqRGFnH1ScgNHA3dv1Z8xHCp4uP3N3Jjl_aYv_WIxQRdwZvSukzbwspXZ7JjpldyeVDzMCh2F0dG5ldHOIAAAAAAAAAACEZXRoMpB53wQoAAAQIP__________gmlkgnY0gmlwhIe1te-Jc2VjcDI1NmsxoQOkcGXqbCJYbcClZ3z5f6NWhX_1YPFRYRRWQpJjwSHpVIN0Y3CCIyiDdWRwgiMo",
// Lighthouse's bootnode by Sigp
"enr:-Ly4QFPk-cTMxZ3jWTafiNblEZkQIXGF2aVzCIGW0uHp6KaEAvBMoctE8S7YU0qZtuS7By0AA4YMfKoN9ls_GJRccVpFh2F0dG5ldHOI__________-EZXRoMpCC9KcrAgAQIIS2AQAAAAAAgmlkgnY0gmlwhKh3joWJc2VjcDI1NmsxoQKrxz8M1IHwJqRIpDqdVW_U1PeixMW5SfnBD-8idYIQrIhzeW5jbmV0cw-DdGNwgiMog3VkcIIjKA",
"enr:-L64QJmwSDtaHVgGiqIxJWUtxWg6uLCipsms6j-8BdsOJfTWAs7CLF9HJnVqFE728O-JYUDCxzKvRdeMqBSauHVCMdaCAVWHYXR0bmV0c4j__________4RldGgykIL0pysCABAghLYBAAAAAACCaWSCdjSCaXCEQWxOdolzZWNwMjU2azGhA7Qmod9fK86WidPOzLsn5_8QyzL7ZcJ1Reca7RnD54vuiHN5bmNuZXRzD4N0Y3CCIyiDdWRwgiMo",
// Teku's bootnode By Afri
"enr:-KG4QCIzJZTY_fs_2vqWEatJL9RrtnPwDCv-jRBuO5FQ2qBrfJubWOWazri6s9HsyZdu-fRUfEzkebhf1nvO42_FVzwDhGV0aDKQed8EKAAAECD__________4JpZIJ2NIJpcISHtbYziXNlY3AyNTZrMaED4m9AqVs6F32rSCGsjtYcsyfQE2K8nDiGmocUY_iq-TSDdGNwgiMog3VkcIIjKA",
}
OverrideBeaconNetworkConfig(cfg)
}
// PraterConfig defines the config for the
// Prater testnet.
func PraterConfig() *BeaconChainConfig {
cfg := MainnetConfig().Copy()
cfg.MinGenesisTime = 1614588812
cfg.GenesisDelay = 1919188
cfg.ConfigName = PraterName
cfg.GenesisForkVersion = []byte{0x00, 0x00, 0x10, 0x20}
cfg.SecondsPerETH1Block = 14
cfg.DepositChainID = eth1Params.GoerliChainConfig.ChainID.Uint64()
cfg.DepositNetworkID = eth1Params.GoerliChainConfig.ChainID.Uint64()
cfg.AltairForkEpoch = 36660
cfg.AltairForkVersion = []byte{0x1, 0x0, 0x10, 0x20}
cfg.BellatrixForkEpoch = 112260
cfg.BellatrixForkVersion = []byte{0x2, 0x0, 0x10, 0x20}
cfg.CapellaForkEpoch = 162304
cfg.CapellaForkVersion = []byte{0x3, 0x0, 0x10, 0x20}
cfg.DenebForkEpoch = 231680 // 2024-01-17 06:32:00 (UTC)
cfg.DenebForkVersion = []byte{0x4, 0x0, 0x10, 0x20}
cfg.TerminalTotalDifficulty = "10790000"
cfg.DepositContractAddress = "0xff50ed3d0ec03aC01D4C79aAd74928BFF48a7b2b"
cfg.InitializeForkSchedule()
return cfg
}

View File

@@ -1,28 +0,0 @@
package params_test
import (
"path"
"testing"
"github.com/prysmaticlabs/prysm/v5/build/bazel"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/testing/require"
)
func TestPraterConfigMatchesUpstreamYaml(t *testing.T) {
presetFPs := presetsFilePath(t, "mainnet")
mn, err := params.ByName(params.MainnetName)
require.NoError(t, err)
cfg := mn.Copy()
for _, fp := range presetFPs {
cfg, err = params.UnmarshalConfigFile(fp, cfg)
require.NoError(t, err)
}
fPath, err := bazel.Runfile("external/goerli_testnet")
require.NoError(t, err)
configFP := path.Join(fPath, "prater", "config.yaml")
pcfg, err := params.UnmarshalConfigFile(configFP, nil)
require.NoError(t, err)
fields := fieldsFromYamls(t, append(presetFPs, configFP))
assertYamlFieldsMatch(t, "prater", fields, pcfg, params.PraterConfig())
}

View File

@@ -8,8 +8,6 @@ const (
MainnetName = "mainnet"
MainnetTestName = "mainnet-test"
MinimalName = "minimal"
PraterName = "prater"
GoerliName = "goerli"
SepoliaName = "sepolia"
HoleskyName = "holesky"
)

View File

@@ -91,6 +91,7 @@ package mvslice
import (
"fmt"
"slices"
"sync"
"github.com/pkg/errors"
@@ -578,10 +579,8 @@ func (s *Slice[V]) updateAppendedItem(obj Identifiable, index uint64, val V) err
}
func containsId(ids []uint64, wanted uint64) (int, bool) {
for i, id := range ids {
if id == wanted {
return i, true
}
if i := slices.Index(ids, wanted); i >= 0 {
return i, true
}
return 0, false
}

View File

@@ -13,8 +13,8 @@ var urltests = []struct {
maskedUrl string
}{
{"https://a:b@xyz.net", "https://***@xyz.net"},
{"https://eth-goerli.alchemyapi.io/v2/tOZG5mjl3.zl_nZdZTNIBUzsDq62R_dkOtY",
"https://eth-goerli.alchemyapi.io/***"},
{"https://eth-holesky.alchemyapi.io/v2/tOZG5mjl3.zl_nZdZTNIBUzsDq62R_dkOtY",
"https://eth-holesky.alchemyapi.io/***"},
{"https://google.com/search?q=golang", "https://google.com/***"},
{"https://user@example.com/foo%2fbar", "https://***@example.com/***"},
{"http://john@example.com/#x/y%2Fz", "http://***@example.com/#***"},

View File

@@ -276,6 +276,7 @@ func (node *BeaconNode) Start(ctx context.Context) error {
"--" + cmdshared.ForceClearDB.Name,
"--" + cmdshared.AcceptTosFlag.Name,
"--" + flags.EnableDebugRPCEndpoints.Name,
"--" + features.EnableQUIC.Name,
}
if config.UsePprof {
args = append(args, "--pprof", fmt.Sprintf("--pprofport=%d", e2e.TestParams.Ports.PrysmBeaconNodePprofPort+index))
@@ -314,7 +315,7 @@ func (node *BeaconNode) Start(ctx context.Context) error {
}
if config.UseFixedPeerIDs {
peerId, err := helpers.FindFollowingTextInFile(stdOutFile, "Running node with peerId=")
peerId, err := helpers.FindFollowingTextInFile(stdOutFile, "Running node with id")
if err != nil {
return fmt.Errorf("could not find peer id: %w", err)
}

View File

@@ -30,7 +30,7 @@ var (
var (
port = flag.Int("port", 9090, "Port to serve /metrics")
web3URL = flag.String("web3-provider", "https://goerli.prylabs.net", "Web3 URL to access information about ETH1")
web3URL = flag.String("web3-provider", "https://holesky.prylabs.net", "Web3 URL to access information about ETH1")
prefix = flag.String("prefix", "", "Metrics prefix.")
addressFilePath = flag.String("addresses", "", "File path to addresses text file.")
)

View File

@@ -7,7 +7,7 @@ Flags:
-beacon string
gRPC address of the Prysm beacon node (default "127.0.0.1:4000")
-genesis uint
Genesis time. mainnet=1606824023, prater=1616508000 (default 1606824023)
Genesis time. mainnet=1606824023, holesky=1695902400 (default 1606824023)
```
Usage:

View File

@@ -19,7 +19,7 @@ import (
var (
beacon = flag.String("beacon", "127.0.0.1:4000", "gRPC address of the Prysm beacon node")
genesis = flag.Uint64("genesis", 1606824023, "Genesis time. mainnet=1606824023, prater=1616508000")
genesis = flag.Uint64("genesis", 1606824023, "Genesis time. mainnet=1606824023, holesky=1695902400")
)
func main() {

View File

@@ -209,14 +209,6 @@ var stateTransitionCommand = &cli.Command{
Action: func(c *cli.Context) error {
if network != "" {
switch network {
case params.PraterName:
if err := params.SetActive(params.PraterConfig()); err != nil {
log.Fatal(err)
}
case params.GoerliName:
if err := params.SetActive(params.PraterConfig()); err != nil {
log.Fatal(err)
}
case params.SepoliaName:
if err := params.SetActive(params.SepoliaConfig()); err != nil {
log.Fatal(err)

View File

@@ -175,8 +175,6 @@ func formatBeaconChaURL(key []byte) string {
keyWithout0x := hexutil.Encode(key)[2:]
switch env := params.BeaconConfig().ConfigName; env {
case params.PraterName, params.GoerliName:
return fmt.Sprintf(baseURL, "prater.", keyWithout0x)
case params.HoleskyName:
return fmt.Sprintf(baseURL, "holesky.", keyWithout0x)
case params.SepoliaName:

View File

@@ -24,14 +24,6 @@ func TestDisplayExitInfo(t *testing.T) {
displayExitInfo([][]byte{key}, []string{string(key)})
assert.LogsContain(t, logHook, "https://beaconcha.in/validator/3078313233343536")
params.BeaconConfig().ConfigName = params.GoerliName
displayExitInfo([][]byte{key}, []string{string(key)})
assert.LogsContain(t, logHook, "https://prater.beaconcha.in/validator/3078313233343536")
params.BeaconConfig().ConfigName = params.PraterName
displayExitInfo([][]byte{key}, []string{string(key)})
assert.LogsContain(t, logHook, "https://prater.beaconcha.in/validator/3078313233343536")
params.BeaconConfig().ConfigName = params.HoleskyName
displayExitInfo([][]byte{key}, []string{string(key)})
assert.LogsContain(t, logHook, "https://holesky.beaconcha.in/validator/3078313233343536")