Compare commits

...

1 Commits

Author SHA1 Message Date
Manu NALEPA
fbe25f426a Activate peerDAS at electra. 2024-12-18 21:27:20 +01:00
31 changed files with 541 additions and 399 deletions

View File

@@ -55,7 +55,7 @@ func HigherEqualThanAltairVersionAndEpoch(s state.BeaconState, e primitives.Epoc
// PeerDASIsActive checks whether peerDAS is active at the provided slot.
func PeerDASIsActive(slot primitives.Slot) bool {
return params.PeerDASEnabled() && slots.ToEpoch(slot) >= params.BeaconConfig().Eip7594ForkEpoch
return params.PeerDASEnabled() && slots.ToEpoch(slot) >= params.BeaconConfig().ElectraForkEpoch
}
// CanUpgradeToAltair returns true if the input `slot` can upgrade to Altair.

View File

@@ -113,7 +113,7 @@ func (s *LazilyPersistentStoreColumn) IsDataAvailable(
// ignore their response and decrease their peer score.
roDataColumns, err := entry.filterColumns(blockRoot, blockCommitments)
if err != nil {
return errors.Wrap(err, "incomplete BlobSidecar batch")
return errors.Wrap(err, "incomplete DataColumnSidecar batch")
}
// Create verified RO data columns from RO data columns.

View File

@@ -110,7 +110,7 @@ func (e *cacheEntry) filter(root [32]byte, kc safeCommitmentArray) ([]blocks.ROB
return nil, nil
}
scs := make([]blocks.ROBlob, 0, kc.count())
for i := uint64(0); i < fieldparams.MaxBlobsPerBlock; i++ {
for i := range uint64(fieldparams.MaxBlobsPerBlock) {
// We already have this blob, we don't need to write it or validate it.
if e.diskSummary.HasIndex(i) {
continue
@@ -143,8 +143,8 @@ func (e *cacheEntry) filterColumns(root [32]byte, commitmentsArray *safeCommitme
commitmentsCount := commitmentsArray.count()
sidecars := make([]blocks.RODataColumn, 0, commitmentsCount)
for i := uint64(0); i < fieldparams.NumberOfColumns; i++ {
// Skip if we arleady store this data column.
for i := range uint64(fieldparams.NumberOfColumns) {
// Skip if we already store this data column.
if e.diskSummary.HasIndex(i) {
continue
}

View File

@@ -228,10 +228,10 @@ func (s *Service) RefreshPersistentSubnets() {
isBitSUpToDate := bytes.Equal(bitS, inRecordBitS) && bytes.Equal(bitS, currentBitSInMetadata)
// Compare current epoch with EIP-7594 fork epoch.
eip7594ForkEpoch := params.BeaconConfig().Eip7594ForkEpoch
// Compare current epoch with the Electra fork epoch.
electraForkEpoch := params.BeaconConfig().ElectraForkEpoch
if currentEpoch < eip7594ForkEpoch {
if currentEpoch < electraForkEpoch {
// Altair behaviour.
if metadataVersion == version.Altair && isBitVUpToDate && isBitSUpToDate {
// Nothing to do, return early.

View File

@@ -135,7 +135,7 @@ func TestStartDiscV5_DiscoverAllPeers(t *testing.T) {
func TestCreateLocalNode(t *testing.T) {
params.SetupTestConfigCleanup(t)
cfg := params.BeaconConfig()
cfg.Eip7594ForkEpoch = 1
cfg.ElectraForkEpoch = 1
params.OverrideBeaconConfig(cfg)
testCases := []struct {
name string
@@ -626,7 +626,7 @@ func TestRefreshPersistentSubnets(t *testing.T) {
const (
altairForkEpoch = 5
eip7594ForkEpoch = 10
electraForkEpoch = 10
)
custodySubnetCount := params.BeaconConfig().CustodyRequirement
@@ -635,7 +635,7 @@ func TestRefreshPersistentSubnets(t *testing.T) {
defaultCfg := params.BeaconConfig()
cfg := defaultCfg.Copy()
cfg.AltairForkEpoch = altairForkEpoch
cfg.Eip7594ForkEpoch = eip7594ForkEpoch
cfg.ElectraForkEpoch = electraForkEpoch
params.OverrideBeaconConfig(cfg)
// Compute the number of seconds per epoch.
@@ -706,7 +706,7 @@ func TestRefreshPersistentSubnets(t *testing.T) {
},
{
name: "PeerDAS",
epochSinceGenesis: eip7594ForkEpoch,
epochSinceGenesis: electraForkEpoch,
checks: []check{
{
pingCount: 0,

View File

@@ -306,7 +306,7 @@ func TopicFromMessage(msg string, epoch primitives.Epoch) (string, error) {
}
// Check if the message is to be updated in peerDAS.
isPeerDAS := epoch >= params.BeaconConfig().Eip7594ForkEpoch
isPeerDAS := epoch >= params.BeaconConfig().ElectraForkEpoch
if isPeerDAS && peerDASMapping[msg] {
version = SchemaVersionV3
}

View File

@@ -79,7 +79,6 @@ func TestGetSpec(t *testing.T) {
config.DenebForkEpoch = 105
config.ElectraForkVersion = []byte("ElectraForkVersion")
config.ElectraForkEpoch = 107
config.Eip7594ForkEpoch = 109
config.BLSWithdrawalPrefixByte = byte('b')
config.ETH1AddressWithdrawalPrefixByte = byte('c')
config.GenesisDelay = 24
@@ -190,7 +189,7 @@ func TestGetSpec(t *testing.T) {
data, ok := resp.Data.(map[string]interface{})
require.Equal(t, true, ok)
assert.Equal(t, 156, len(data))
assert.Equal(t, 155, len(data))
for k, v := range data {
t.Run(k, func(t *testing.T) {
switch k {
@@ -268,8 +267,6 @@ func TestGetSpec(t *testing.T) {
assert.Equal(t, "0x"+hex.EncodeToString([]byte("ElectraForkVersion")), v)
case "ELECTRA_FORK_EPOCH":
assert.Equal(t, "107", v)
case "EIP7594_FORK_EPOCH":
assert.Equal(t, "109", v)
case "MIN_ANCHOR_POW_BLOCK_DIFFICULTY":
assert.Equal(t, "1000", v)
case "BLS_WITHDRAWAL_PREFIX":

View File

@@ -471,12 +471,12 @@ func (p *BeaconDbBlocker) Blobs(ctx context.Context, id string, indices map[uint
blockSlot := b.Block().Slot()
// Get the first peerDAS epoch.
eip7594ForkEpoch := params.BeaconConfig().Eip7594ForkEpoch
electraForkEpoch := params.BeaconConfig().ElectraForkEpoch
// Compute the first peerDAS slot.
peerDASStartSlot := primitives.Slot(math.MaxUint64)
if eip7594ForkEpoch != primitives.Epoch(math.MaxUint64) {
peerDASStartSlot, err = slots.EpochStart(eip7594ForkEpoch)
if electraForkEpoch != primitives.Epoch(math.MaxUint64) {
peerDASStartSlot, err = slots.EpochStart(electraForkEpoch)
if err != nil {
return nil, &core.RpcError{Err: errors.Wrap(err, "could not calculate peerDAS start slot"), Reason: core.Internal}
}

View File

@@ -199,7 +199,7 @@ func setupDataColumnSamplerTest(t *testing.T, blobCount uint64) (*dataSamplerTes
ctx: context.Background(),
p2pSvc: p2pSvc,
peers: []*p2ptest.TestP2P{},
ctxMap: map[[4]byte]int{{245, 165, 253, 66}: version.Deneb},
ctxMap: map[[4]byte]int{{245, 165, 253, 66}: version.Electra},
chainSvc: chainSvc,
blockProcessedData: blockProcessedData,
blobs: blobs,

View File

@@ -1,6 +1,7 @@
package sync
import (
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v5/config/params"
@@ -39,80 +40,114 @@ func (s *Service) forkWatcher() {
}
}
// Checks if there is a fork in the next epoch and if there is
// it registers the appropriate gossip and rpc topics.
func (s *Service) registerForUpcomingFork(currEpoch primitives.Epoch) error {
genRoot := s.cfg.clock.GenesisValidatorsRoot()
isNextForkEpoch, err := forks.IsForkNextEpoch(s.cfg.clock.GenesisTime(), genRoot[:])
// Register appropriate gossip and RPC topic if there is a fork in the next epoch.
func (s *Service) registerForUpcomingFork(currentEpoch primitives.Epoch) error {
// Get the genesis validators root.
genesisValidatorsRoot := s.cfg.clock.GenesisValidatorsRoot()
// Check if there is a fork in the next epoch.
isForkNextEpoch, err := forks.IsForkNextEpoch(s.cfg.clock.GenesisTime(), genesisValidatorsRoot[:])
if err != nil {
return errors.Wrap(err, "Could not retrieve next fork epoch")
}
// In preparation for the upcoming fork
// in the following epoch, the node
// will subscribe the new topics in advance.
if isNextForkEpoch {
nextEpoch := currEpoch + 1
digest, err := forks.ForkDigestFromEpoch(nextEpoch, genRoot[:])
if err != nil {
return errors.Wrap(err, "could not retrieve fork digest")
}
if s.subHandler.digestExists(digest) {
return nil
}
s.registerSubscribers(nextEpoch, digest)
if nextEpoch == params.BeaconConfig().AltairForkEpoch {
s.registerRPCHandlersAltair()
}
if nextEpoch == params.BeaconConfig().DenebForkEpoch {
s.registerRPCHandlersDeneb()
}
// Exit early if there is no fork in the next epoch.
if !isForkNextEpoch {
return nil
}
// Specially handle peerDAS
if params.PeerDASEnabled() && currEpoch+1 == params.BeaconConfig().Eip7594ForkEpoch {
s.registerRPCHandlersPeerDAS()
// Compute the next epoch.
nextEpoch := currentEpoch + 1
// Get the fork digest for the next epoch.
digest, err := forks.ForkDigestFromEpoch(nextEpoch, genesisValidatorsRoot[:])
if err != nil {
return errors.Wrap(err, "could not retrieve fork digest")
}
// Exit early if the topics for the next epoch are already registered.
// It likely to be the case for all slots of the epoch that are not the first one.
if s.subHandler.digestExists(digest) {
return nil
}
// Register the subscribers (gossipsub) for the next epoch.
s.registerSubscribers(nextEpoch, digest)
// Get the handlers for the current and next fork.
currentHandlerByTopic, err := s.rpcHandlerByTopicFromEpoch(currentEpoch)
if err != nil {
return errors.Wrap(err, "RPC handler by topic")
}
nextHandlerByTopic, err := s.rpcHandlerByTopicFromEpoch(nextEpoch)
if err != nil {
return errors.Wrap(err, "RPC handler by topic")
}
// Compute newsly added topics.
newRPCHandlerByTopic := addedRPCHandlerByTopic(currentHandlerByTopic, nextHandlerByTopic)
// Register the new RPC handlers.
for topic, handler := range newRPCHandlerByTopic {
s.registerRPC(topic, handler)
}
return nil
}
// Checks if there was a fork in the previous epoch, and if there
// was then we deregister the topics from that particular fork.
func (s *Service) deregisterFromPastFork(currEpoch primitives.Epoch) error {
genRoot := s.cfg.clock.GenesisValidatorsRoot()
// This method takes care of the de-registration of
// old gossip pubsub handlers. Once we are at the epoch
// after the fork, we de-register from all the outdated topics.
currFork, err := forks.Fork(currEpoch)
// deregisterFromPastFork checks if there was a fork in the previous epoch,
// and if there was then we deregister the gossipsub topics from that particular fork,
// and the RPC handlers that are no longer relevant.
func (s *Service) deregisterFromPastFork(currentEpoch primitives.Epoch) error {
// Extract the genesis validators root.
genesisValidatorsRoot := s.cfg.clock.GenesisValidatorsRoot()
// Get the fork.
currentFork, err := forks.Fork(currentEpoch)
if err != nil {
return err
return errors.Wrap(err, "genesis validators root")
}
// If we are still in our genesis fork version then
// we simply exit early.
if currFork.Epoch == params.BeaconConfig().GenesisEpoch {
// If we are still in our genesis fork version then exit early.
if currentFork.Epoch == params.BeaconConfig().GenesisEpoch {
return nil
}
epochAfterFork := currFork.Epoch + 1
// If we are in the epoch after the fork, we start de-registering.
if epochAfterFork == currEpoch {
epochAfterFork := currentFork.Epoch + 1
// Start de-registring if the current epoch is the first epoch after the fork.
if epochAfterFork == currentEpoch {
// Look at the previous fork's digest.
epochBeforeFork := currFork.Epoch - 1
prevDigest, err := forks.ForkDigestFromEpoch(epochBeforeFork, genRoot[:])
epochBeforeFork := currentFork.Epoch - 1
previousDigest, err := forks.ForkDigestFromEpoch(epochBeforeFork, genesisValidatorsRoot[:])
if err != nil {
return errors.Wrap(err, "Failed to determine previous epoch fork digest")
return errors.Wrap(err, "fork digest from epoch")
}
// Exit early if there are no topics with that particular
// digest.
if !s.subHandler.digestExists(prevDigest) {
// Exit early if there are no topics with that particular digest.
if !s.subHandler.digestExists(previousDigest) {
return nil
}
prevFork, err := forks.Fork(epochBeforeFork)
// Compute the RPC handlers that are no longer needed.
currentHandlerByTopic, err := s.rpcHandlerByTopicFromEpoch(currentEpoch)
if err != nil {
return errors.Wrap(err, "failed to determine previous epoch fork data")
return errors.Wrap(err, "RPC handler by topic from epoch")
}
if prevFork.Epoch == params.BeaconConfig().GenesisEpoch {
s.unregisterPhase0Handlers()
nextHandlerByTopic, err := s.rpcHandlerByTopicFromEpoch(epochAfterFork)
if err != nil {
return errors.Wrap(err, "RPC handler by topic from epoch")
}
topicsToRemove := removedRPCTopics(currentHandlerByTopic, nextHandlerByTopic)
for topic := range topicsToRemove {
fullTopic := topic + s.cfg.p2p.Encoding().ProtocolSuffix()
s.cfg.p2p.Host().RemoveStreamHandler(protocol.ID(fullTopic))
}
// Run through all our current active topics and see
// if there are any subscriptions to be removed.
for _, t := range s.subHandler.allTopics() {
@@ -121,14 +156,11 @@ func (s *Service) deregisterFromPastFork(currEpoch primitives.Epoch) error {
log.WithError(err).Error("Could not retrieve digest")
continue
}
if retDigest == prevDigest {
if retDigest == previousDigest {
s.unSubscribeFromTopic(t)
}
}
}
// Handle PeerDAS as its a special case.
if params.PeerDASEnabled() && currEpoch > 0 && (currEpoch-1) == params.BeaconConfig().Eip7594ForkEpoch {
s.unregisterBlobHandlers()
}
return nil
}

View File

@@ -14,6 +14,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/network/forks"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
"github.com/prysmaticlabs/prysm/v5/testing/assert"
)
@@ -230,7 +231,8 @@ func TestService_CheckForPreviousEpochFork(t *testing.T) {
chainStarted: abool.New(),
subHandler: newSubTopicHandler(),
}
r.registerRPCHandlers()
err := r.registerRPCHandlers()
assert.NoError(t, err)
return r
},
currEpoch: 10,
@@ -278,10 +280,21 @@ func TestService_CheckForPreviousEpochFork(t *testing.T) {
prevGenesis := chainService.Genesis
// To allow registration of v1 handlers
chainService.Genesis = time.Now().Add(-1 * oneEpoch())
r.registerRPCHandlers()
err := r.registerRPCHandlers()
assert.NoError(t, err)
chainService.Genesis = prevGenesis
r.registerRPCHandlersAltair()
previous, err := r.rpcHandlerByTopicFromFork(version.Phase0)
assert.NoError(t, err)
next, err := r.rpcHandlerByTopicFromFork(version.Altair)
assert.NoError(t, err)
handlerByTopic := addedRPCHandlerByTopic(previous, next)
for topic, handler := range handlerByTopic {
r.registerRPC(topic, handler)
}
genRoot := r.cfg.clock.GenesisValidatorsRoot()
digest, err := forks.ForkDigestFromEpoch(0, genRoot[:])

View File

@@ -3,6 +3,7 @@ package initialsync
import (
"context"
"fmt"
"math"
"slices"
"sort"
"strings"
@@ -12,7 +13,6 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
coreTime "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/time"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
@@ -337,19 +337,33 @@ func (f *blocksFetcher) handleRequest(ctx context.Context, start primitives.Slot
return response
}
if coreTime.PeerDASIsActive(start) {
response.err = f.fetchDataColumnsFromPeers(ctx, response.bwb, nil, delay, batchSize)
// Compute the first electra slot.
firstElectraSlot, err := slots.EpochStart(params.BeaconConfig().ElectraForkEpoch)
if err != nil {
firstElectraSlot = math.MaxUint64
}
// Find the first block with a slot greater than or equal to the first electra slot.
// (Blocks are sorted by slot)
firstElectraIndex := sort.Search(len(response.bwb), func(i int) bool {
return response.bwb[i].Block.Block().Slot() >= firstElectraSlot
})
preElectraBwbs := response.bwb[:firstElectraIndex]
postElectraBwbs := response.bwb[firstElectraIndex:]
// Fetch blobs.
if err := f.fetchBlobsFromPeer(ctx, preElectraBwbs, response.pid, peers); err != nil {
response.err = err
return response
}
if err := f.fetchBlobsFromPeer(ctx, response.bwb, response.pid, peers); err != nil {
response.err = err
}
// Fetch data columns.
response.err = f.fetchDataColumnsFromPeers(ctx, postElectraBwbs, nil, delay, batchSize)
return response
}
// fetchBlocksFromPeer fetches blocks from a single randomly selected peer.
// fetchBlocksFromPeer fetches blocks from a single randomly selected peer, sorted by slot.
func (f *blocksFetcher) fetchBlocksFromPeer(
ctx context.Context,
start primitives.Slot, count uint64,
@@ -369,20 +383,19 @@ func (f *blocksFetcher) fetchBlocksFromPeer(
// peers are dialed first.
peers = append(bestPeers, peers...)
peers = dedupPeers(peers)
for i := 0; i < len(peers); i++ {
p := peers[i]
blocks, err := f.requestBlocks(ctx, req, p)
for _, peer := range peers {
blocks, err := f.requestBlocks(ctx, req, peer)
if err != nil {
log.WithField("peer", p).WithError(err).Debug("Could not request blocks by range from peer")
log.WithField("peer", peer).WithError(err).Debug("Could not request blocks by range from peer")
continue
}
f.p2p.Peers().Scorers().BlockProviderScorer().Touch(p)
f.p2p.Peers().Scorers().BlockProviderScorer().Touch(peer)
robs, err := sortedBlockWithVerifiedBlobSlice(blocks)
if err != nil {
log.WithField("peer", p).WithError(err).Debug("invalid BeaconBlocksByRange response")
log.WithField("peer", peer).WithError(err).Debug("invalid BeaconBlocksByRange response")
continue
}
return robs, p, err
return robs, peer, err
}
return nil, "", errNoPeersAvailable
}
@@ -565,6 +578,10 @@ func missingCommitError(root [32]byte, slot primitives.Slot, missing [][]byte) e
// fetchBlobsFromPeer fetches blocks from a single randomly selected peer.
// This function mutates the input `bwb` argument.
func (f *blocksFetcher) fetchBlobsFromPeer(ctx context.Context, bwb []blocks.BlockWithROBlobs, pid peer.ID, peers []peer.ID) error {
if len(bwb) == 0 {
return nil
}
ctx, span := trace.StartSpan(ctx, "initialsync.fetchBlobsFromPeer")
defer span.End()
if slots.ToEpoch(f.clock.CurrentSlot()) < params.BeaconConfig().DenebForkEpoch {
@@ -936,7 +953,7 @@ type bwbsMissingColumns struct {
// fetchDataColumnsFromPeers looks at the blocks in `bwb` and retrieves all
// data columns for with the block has blob commitments, and for which our store is missing data columns
// we should custody.
// This function mutates `bwb` by adding the retrieved data columns.
// This function mutates `bwbs` by adding the retrieved data columns.
// Prerequisite: bwb is sorted by slot.
func (f *blocksFetcher) fetchDataColumnsFromPeers(
ctx context.Context,
@@ -951,6 +968,10 @@ func (f *blocksFetcher) fetchDataColumnsFromPeers(
maxAllowedStall = 5 // Number of trials before giving up.
)
if len(bwbs) == 0 {
return nil
}
// Generate random identifier.
identifier := f.rand.Intn(maxIdentifier)

View File

@@ -1713,7 +1713,7 @@ func TestFetchDataColumnsFromPeers(t *testing.T) {
// Fork epochs.
denebForkEpoch primitives.Epoch
eip7954ForkEpoch primitives.Epoch
electraForkEpoch primitives.Epoch
// Current slot.
currentSlot uint64
@@ -1753,29 +1753,29 @@ func TestFetchDataColumnsFromPeers(t *testing.T) {
isError: false,
},
{
name: "All blocks are before EIP-7954 fork epoch",
name: "All blocks are before Electra fork epoch",
denebForkEpoch: 0,
eip7954ForkEpoch: 1,
electraForkEpoch: 1,
currentSlot: 40,
blocksParams: []blockParams{
{slot: 25, hasBlobs: false}, // Before EIP-7954 fork epoch
{slot: 26, hasBlobs: false}, // Before EIP-7954 fork epoch
{slot: 27, hasBlobs: false}, // Before EIP-7954 fork epoch
{slot: 28, hasBlobs: false}, // Before EIP-7954 fork epoch
{slot: 25, hasBlobs: false}, // Before Electra fork epoch
{slot: 26, hasBlobs: false}, // Before Electra fork epoch
{slot: 27, hasBlobs: false}, // Before Electra fork epoch
{slot: 28, hasBlobs: false}, // Before Electra fork epoch
},
batchSize: 32,
addedRODataColumns: [][]int{nil, nil, nil, nil},
isError: false,
},
{
name: "All blocks with commitments before are EIP-7954 fork epoch",
name: "All blocks with commitments before are Electra fork epoch",
denebForkEpoch: 0,
eip7954ForkEpoch: 1,
electraForkEpoch: 1,
currentSlot: 40,
blocksParams: []blockParams{
{slot: 25, hasBlobs: false}, // Before EIP-7954 fork epoch
{slot: 26, hasBlobs: true}, // Before EIP-7954 fork epoch
{slot: 27, hasBlobs: true}, // Before EIP-7954 fork epoch
{slot: 25, hasBlobs: false}, // Before Electra fork epoch
{slot: 26, hasBlobs: true}, // Before Electra fork epoch
{slot: 27, hasBlobs: true}, // Before Electra fork epoch
{slot: 32, hasBlobs: false},
{slot: 33, hasBlobs: false},
},
@@ -1785,12 +1785,12 @@ func TestFetchDataColumnsFromPeers(t *testing.T) {
{
name: "Some blocks with blobs but without any missing data columns",
denebForkEpoch: 0,
eip7954ForkEpoch: 1,
electraForkEpoch: 1,
currentSlot: 40,
blocksParams: []blockParams{
{slot: 25, hasBlobs: false}, // Before EIP-7954 fork epoch
{slot: 26, hasBlobs: true}, // Before EIP-7954 fork epoch
{slot: 27, hasBlobs: true}, // Before EIP-7954 fork epoch
{slot: 25, hasBlobs: false}, // Before Electra fork epoch
{slot: 26, hasBlobs: true}, // Before Electra fork epoch
{slot: 27, hasBlobs: true}, // Before Electra fork epoch
{slot: 32, hasBlobs: false},
{slot: 33, hasBlobs: true},
},
@@ -1808,11 +1808,11 @@ func TestFetchDataColumnsFromPeers(t *testing.T) {
{
name: "Some blocks with blobs with missing data columns - one round needed",
denebForkEpoch: 0,
eip7954ForkEpoch: 1,
electraForkEpoch: 1,
currentSlot: 40,
blocksParams: []blockParams{
{slot: 25, hasBlobs: false}, // Before EIP-7954 fork epoch
{slot: 27, hasBlobs: true}, // Before EIP-7954 fork epoch
{slot: 25, hasBlobs: false}, // Before Electra fork epoch
{slot: 27, hasBlobs: true}, // Before Electra fork epoch
{slot: 32, hasBlobs: false},
{slot: 33, hasBlobs: true},
{slot: 34, hasBlobs: true},
@@ -1916,7 +1916,7 @@ func TestFetchDataColumnsFromPeers(t *testing.T) {
{
name: "Some blocks with blobs with missing data columns - partial responses",
denebForkEpoch: 0,
eip7954ForkEpoch: 1,
electraForkEpoch: 1,
currentSlot: 40,
blocksParams: []blockParams{
{slot: 33, hasBlobs: true},
@@ -1970,7 +1970,7 @@ func TestFetchDataColumnsFromPeers(t *testing.T) {
{
name: "Some blocks with blobs with missing data columns - first response is invalid",
denebForkEpoch: 0,
eip7954ForkEpoch: 1,
electraForkEpoch: 1,
currentSlot: 40,
blocksParams: []blockParams{
{slot: 38, hasBlobs: true},
@@ -2004,7 +2004,7 @@ func TestFetchDataColumnsFromPeers(t *testing.T) {
{
name: "Some blocks with blobs with missing data columns - first response is empty",
denebForkEpoch: 0,
eip7954ForkEpoch: 1,
electraForkEpoch: 1,
currentSlot: 40,
blocksParams: []blockParams{{slot: 38, hasBlobs: true}},
storedDataColumns: []map[int]bool{{38: true, 102: true}},
@@ -2033,7 +2033,7 @@ func TestFetchDataColumnsFromPeers(t *testing.T) {
{
name: "Some blocks with blobs with missing data columns - no response at all",
denebForkEpoch: 0,
eip7954ForkEpoch: 1,
electraForkEpoch: 1,
currentSlot: 40,
blocksParams: []blockParams{{slot: 38, hasBlobs: true}},
storedDataColumns: []map[int]bool{{38: true, 102: true}},
@@ -2056,7 +2056,7 @@ func TestFetchDataColumnsFromPeers(t *testing.T) {
{
name: "Some blocks with blobs with missing data columns - request has to be split",
denebForkEpoch: 0,
eip7954ForkEpoch: 1,
electraForkEpoch: 1,
currentSlot: 40,
blocksParams: []blockParams{
{slot: 32, hasBlobs: true}, {slot: 33, hasBlobs: true}, {slot: 34, hasBlobs: true}, {slot: 35, hasBlobs: true}, // 4
@@ -2176,8 +2176,8 @@ func TestFetchDataColumnsFromPeers(t *testing.T) {
// Set the Deneb fork epoch.
params.BeaconConfig().DenebForkEpoch = tc.denebForkEpoch
// Set the EIP-7594 fork epoch.
params.BeaconConfig().Eip7594ForkEpoch = tc.eip7954ForkEpoch
// Set the Electra fork epoch.
params.BeaconConfig().ElectraForkEpoch = tc.electraForkEpoch
// Save the blocks in the store.
storage := make(map[[fieldparams.RootLength]byte][]int)

View File

@@ -4,17 +4,19 @@ import (
"context"
"encoding/hex"
"fmt"
"math"
"sort"
"time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/paulbellamy/ratecounter"
"github.com/pkg/errors"
coreTime "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/time"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
@@ -172,59 +174,83 @@ func (s *Service) processFetchedDataRegSync(
log.WithError(err).Debug("batch did not contain a valid sequence of unprocessed blocks")
return
}
if len(bwb) == 0 {
return
}
if coreTime.PeerDASIsActive(startSlot) {
avs := das.NewLazilyPersistentStoreColumn(s.cfg.BlobStorage)
batchFields := logrus.Fields{
"firstSlot": data.bwb[0].Block.Block().Slot(),
"firstUnprocessed": bwb[0].Block.Block().Slot(),
// Compute the first electra slot.
firstElectraSlot, err := slots.EpochStart(params.BeaconConfig().ElectraForkEpoch)
if err != nil {
firstElectraSlot = math.MaxUint64
}
// Find the first block with a slot greater than or equal to the first electra slot.
// (Blocks are sorted by slot)
firstElectraIndex := sort.Search(len(bwb), func(i int) bool {
return bwb[i].Block.Block().Slot() >= firstElectraSlot
})
preElectraBwbs := bwb[:firstElectraIndex]
postElectraBwbs := bwb[firstElectraIndex:]
blobBatchVerifier := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.InitsyncBlobSidecarRequirements)
lazilyPersistentStore := das.NewLazilyPersistentStore(s.cfg.BlobStorage, blobBatchVerifier)
log := log.WithField("firstSlot", data.bwb[0].Block.Block().Slot())
logPre := log
if len(preElectraBwbs) > 0 {
logPre = logPre.WithField("firstUnprocessed", preElectraBwbs[0].Block.Block().Slot())
}
for _, b := range preElectraBwbs {
log := logPre.WithFields(syncFields(b.Block))
if err := lazilyPersistentStore.Persist(s.clock.CurrentSlot(), b.Blobs...); err != nil {
log.WithError(err).Warning("Batch failure due to BlobSidecar issues")
return
}
for _, b := range bwb {
if err := avs.PersistColumns(s.clock.CurrentSlot(), b.Columns...); err != nil {
log.WithError(err).WithFields(batchFields).WithFields(syncFields(b.Block)).Warn("Batch failure due to DataColumnSidecar issues")
if err := s.processBlock(ctx, genesis, b, s.cfg.Chain.ReceiveBlock, lazilyPersistentStore); err != nil {
switch {
case errors.Is(err, errParentDoesNotExist):
log.
WithField("missingParent", fmt.Sprintf("%#x", b.Block.Block().ParentRoot())).
Debug("Could not process batch blocks due to missing parent")
return
default:
log.WithError(err).Warning("Block processing failure")
return
}
if err := s.processBlock(ctx, genesis, b, s.cfg.Chain.ReceiveBlock, avs); err != nil {
switch {
case errors.Is(err, errParentDoesNotExist):
log.WithFields(batchFields).WithField("missingParent", fmt.Sprintf("%#x", b.Block.Block().ParentRoot())).
WithFields(syncFields(b.Block)).Debug("Could not process batch blocks due to missing parent")
return
default:
log.WithError(err).WithFields(batchFields).WithFields(syncFields(b.Block)).Warn("Block processing failure")
return
}
}
}
} else {
bv := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.InitsyncBlobSidecarRequirements)
avs := das.NewLazilyPersistentStore(s.cfg.BlobStorage, bv)
}
batchFields := logrus.Fields{
"firstSlot": data.bwb[0].Block.Block().Slot(),
"firstUnprocessed": bwb[0].Block.Block().Slot(),
logPost := log
if len(postElectraBwbs) > 0 {
logPost = log.WithField("firstUnprocessed", postElectraBwbs[0].Block.Block().Slot())
}
lazilyPersistentStoreColumn := das.NewLazilyPersistentStoreColumn(s.cfg.BlobStorage)
for _, b := range postElectraBwbs {
log := logPost.WithFields(syncFields(b.Block))
if err := lazilyPersistentStoreColumn.PersistColumns(s.clock.CurrentSlot(), b.Columns...); err != nil {
log.WithError(err).Warning("Batch failure due to DataColumnSidecar issues")
return
}
for _, b := range bwb {
if err := avs.Persist(s.clock.CurrentSlot(), b.Blobs...); err != nil {
log.WithError(err).WithFields(batchFields).WithFields(syncFields(b.Block)).Warn("Batch failure due to BlobSidecar issues")
if err := s.processBlock(ctx, genesis, b, s.cfg.Chain.ReceiveBlock, lazilyPersistentStoreColumn); err != nil {
switch {
case errors.Is(err, errParentDoesNotExist):
log.
WithField("missingParent", fmt.Sprintf("%#x", b.Block.Block().ParentRoot())).
Debug("Could not process batch blocks due to missing parent")
return
default:
log.WithError(err).Warning("Block processing failure")
return
}
if err := s.processBlock(ctx, genesis, b, s.cfg.Chain.ReceiveBlock, avs); err != nil {
switch {
case errors.Is(err, errParentDoesNotExist):
log.WithFields(batchFields).WithField("missingParent", fmt.Sprintf("%#x", b.Block.Block().ParentRoot())).
WithFields(syncFields(b.Block)).Debug("Could not process batch blocks due to missing parent")
return
default:
log.WithError(err).WithFields(batchFields).WithFields(syncFields(b.Block)).Warn("Block processing failure")
return
}
}
}
}
@@ -349,54 +375,83 @@ func validUnprocessed(ctx context.Context, bwb []blocks.BlockWithROBlobs, headSl
return bwb[nonProcessedIdx:], nil
}
func (s *Service) processBatchedBlocks(ctx context.Context, genesis time.Time,
bwb []blocks.BlockWithROBlobs, bFunc batchBlockReceiverFn) error {
if len(bwb) == 0 {
func (s *Service) processBatchedBlocks(
ctx context.Context,
genesis time.Time,
bwbs []blocks.BlockWithROBlobs,
bFunc batchBlockReceiverFn,
) error {
if len(bwbs) == 0 {
return errors.New("0 blocks provided into method")
}
headSlot := s.cfg.Chain.HeadSlot()
var err error
bwb, err = validUnprocessed(ctx, bwb, headSlot, s.isProcessedBlock)
bwbs, err := validUnprocessed(ctx, bwbs, headSlot, s.isProcessedBlock)
if err != nil {
return err
return errors.Wrap(err, "validating unprocessed blocks")
}
if len(bwb) == 0 {
if len(bwbs) == 0 {
return nil
}
first := bwb[0].Block
first := bwbs[0].Block
if !s.cfg.Chain.HasBlock(ctx, first.Block().ParentRoot()) {
return fmt.Errorf("%w: %#x (in processBatchedBlocks, slot=%d)",
errParentDoesNotExist, first.Block().ParentRoot(), first.Block().Slot())
}
var aStore das.AvailabilityStore
if coreTime.PeerDASIsActive(first.Block().Slot()) {
avs := das.NewLazilyPersistentStoreColumn(s.cfg.BlobStorage)
s.logBatchSyncStatus(genesis, first, len(bwb))
for _, bb := range bwb {
if len(bb.Columns) == 0 {
continue
}
if err := avs.PersistColumns(s.clock.CurrentSlot(), bb.Columns...); err != nil {
return err
}
}
aStore = avs
} else {
bv := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.InitsyncBlobSidecarRequirements)
avs := das.NewLazilyPersistentStore(s.cfg.BlobStorage, bv)
s.logBatchSyncStatus(genesis, first, len(bwb))
for _, bb := range bwb {
if len(bb.Blobs) == 0 {
continue
}
if err := avs.Persist(s.clock.CurrentSlot(), bb.Blobs...); err != nil {
return err
}
}
aStore = avs
// Compute the first electra slot.
firstElectraSlot, err := slots.EpochStart(params.BeaconConfig().ElectraForkEpoch)
if err != nil {
firstElectraSlot = math.MaxUint64
}
return bFunc(ctx, blocks.BlockWithROBlobsSlice(bwb).ROBlocks(), aStore)
// Find the first block with a slot greater than or equal to the first electra slot.
// (Blocks are sorted by slot)
firstElectraIndex := sort.Search(len(bwbs), func(i int) bool {
return bwbs[i].Block.Block().Slot() >= firstElectraSlot
})
preElectraBwbs := bwbs[:firstElectraIndex]
postElectraBwbs := bwbs[firstElectraIndex:]
batchVerifier := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.InitsyncBlobSidecarRequirements)
persistentStore := das.NewLazilyPersistentStore(s.cfg.BlobStorage, batchVerifier)
s.logBatchSyncStatus(genesis, first, len(preElectraBwbs))
for _, bwb := range preElectraBwbs {
if len(bwb.Blobs) == 0 {
continue
}
if err := persistentStore.Persist(s.clock.CurrentSlot(), bwb.Blobs...); err != nil {
return err
}
}
if err := bFunc(ctx, blocks.BlockWithROBlobsSlice(preElectraBwbs).ROBlocks(), persistentStore); err != nil {
return errors.Wrap(err, "process pre-electra blocks")
}
persistentStoreColumn := das.NewLazilyPersistentStoreColumn(s.cfg.BlobStorage)
s.logBatchSyncStatus(genesis, first, len(postElectraBwbs))
for _, bwb := range postElectraBwbs {
if len(bwb.Columns) == 0 {
continue
}
if err := persistentStoreColumn.PersistColumns(s.clock.CurrentSlot(), bwb.Columns...); err != nil {
return err
}
}
if err := bFunc(ctx, blocks.BlockWithROBlobsSlice(postElectraBwbs).ROBlocks(), persistentStoreColumn); err != nil {
return errors.Wrap(err, "process post-electra blocks")
}
return nil
}
// updatePeerScorerStats adjusts monitored metrics for a peer.
@@ -424,11 +479,13 @@ func (s *Service) isProcessedBlock(ctx context.Context, blk blocks.ROBlock) bool
// If block is before our finalized checkpoint
// we do not process it.
if blk.Block().Slot() <= finalizedSlot {
log.WithFields(logrus.Fields{"blockSlot": blk.Block().Slot(), "finalizedSlot": finalizedSlot}).Debug("AAAAAAAAAAAA")
return true
}
// If block exists in our db and is before or equal to our current head
// we ignore it.
if s.cfg.Chain.HeadSlot() >= blk.Block().Slot() && s.cfg.Chain.HasBlock(ctx, blk.Root()) {
log.WithFields(logrus.Fields{"blockSlot": blk.Block().Slot(), "headSlot": s.cfg.Chain.HeadSlot(), "hasBlock": s.cfg.Chain.HasBlock(ctx, blk.Root()), "blockRoot": blk.Root()}).Debug("BBBBBBBBBB")
return true
}
return false

View File

@@ -9,143 +9,165 @@ import (
libp2pcore "github.com/libp2p/go-libp2p/core"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/pkg/errors"
ssz "github.com/prysmaticlabs/fastssz"
coreTime "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/time"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
p2ptypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing"
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
"github.com/prysmaticlabs/prysm/v5/time/slots"
)
// Time to first byte timeout. The maximum time to wait for first byte of
// request response (time-to-first-byte). The client is expected to give up if
// they don't receive the first byte within 5 seconds.
var ttfbTimeout = params.BeaconConfig().TtfbTimeoutDuration()
var (
// Time to first byte timeout. The maximum time to wait for first byte of
// request response (time-to-first-byte). The client is expected to give up if
// they don't receive the first byte within 5 seconds.
ttfbTimeout = params.BeaconConfig().TtfbTimeoutDuration()
// respTimeout is the maximum time for complete response transfer.
var respTimeout = params.BeaconConfig().RespTimeoutDuration()
// respTimeout is the maximum time for complete response transfer.
respTimeout = params.BeaconConfig().RespTimeoutDuration()
)
// rpcHandler is responsible for handling and responding to any incoming message.
// This method may return an error to internal monitoring, but the error will
// not be relayed to the peer.
type rpcHandler func(context.Context, interface{}, libp2pcore.Stream) error
// registerRPCHandlers for p2p RPC.
func (s *Service) registerRPCHandlers() {
currEpoch := slots.ToEpoch(s.cfg.clock.CurrentSlot())
// Register V2 handlers if we are past altair fork epoch.
if currEpoch >= params.BeaconConfig().AltairForkEpoch {
s.registerRPC(
p2p.RPCStatusTopicV1,
s.statusRPCHandler,
)
s.registerRPC(
p2p.RPCGoodByeTopicV1,
s.goodbyeRPCHandler,
)
s.registerRPC(
p2p.RPCPingTopicV1,
s.pingHandler,
)
s.registerRPCHandlersAltair()
// rpcHandlerByTopicFromFork returns the RPC handlers for a given fork index.
func (s *Service) rpcHandlerByTopicFromFork(forkIndex int) (map[string]rpcHandler, error) {
switch forkIndex {
// PhaseO: https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#messages
case version.Phase0:
return map[string]rpcHandler{
p2p.RPCStatusTopicV1: s.statusRPCHandler,
p2p.RPCGoodByeTopicV1: s.goodbyeRPCHandler,
p2p.RPCBlocksByRangeTopicV1: s.beaconBlocksByRangeRPCHandler,
p2p.RPCBlocksByRootTopicV1: s.beaconBlocksRootRPCHandler,
p2p.RPCPingTopicV1: s.pingHandler,
p2p.RPCMetaDataTopicV1: s.metaDataHandler,
}, nil
if coreTime.PeerDASIsActive(slots.UnsafeEpochStart(currEpoch)) {
s.registerRPCHandlersPeerDAS()
} else if currEpoch >= params.BeaconConfig().DenebForkEpoch {
s.registerRPCHandlersDeneb()
}
return
// Altair: https://github.com/ethereum/consensus-specs/tree/dev/specs/altair#messages
// Bellatrix: https://github.com/ethereum/consensus-specs/tree/dev/specs/bellatrix#messages
// Capella: https://github.com/ethereum/consensus-specs/tree/dev/specs/capella#messages
case version.Altair, version.Bellatrix, version.Capella:
return map[string]rpcHandler{
p2p.RPCStatusTopicV1: s.statusRPCHandler,
p2p.RPCGoodByeTopicV1: s.goodbyeRPCHandler,
p2p.RPCBlocksByRangeTopicV2: s.beaconBlocksByRangeRPCHandler, // Modified in Altair
p2p.RPCBlocksByRootTopicV2: s.beaconBlocksRootRPCHandler, // Modified in Altair
p2p.RPCPingTopicV1: s.pingHandler,
p2p.RPCMetaDataTopicV2: s.metaDataHandler, // Modified in Altair
}, nil
// Deneb: https://github.com/ethereum/consensus-specs/blob/dev/specs/deneb/p2p-interface.md#messages
case version.Deneb:
return map[string]rpcHandler{
p2p.RPCStatusTopicV1: s.statusRPCHandler,
p2p.RPCGoodByeTopicV1: s.goodbyeRPCHandler,
p2p.RPCBlocksByRangeTopicV2: s.beaconBlocksByRangeRPCHandler,
p2p.RPCBlocksByRootTopicV2: s.beaconBlocksRootRPCHandler,
p2p.RPCPingTopicV1: s.pingHandler,
p2p.RPCMetaDataTopicV2: s.metaDataHandler,
p2p.RPCBlobSidecarsByRootTopicV1: s.blobSidecarByRootRPCHandler, // Added in Deneb
p2p.RPCBlobSidecarsByRangeTopicV1: s.blobSidecarsByRangeRPCHandler, // Added in Deneb
}, nil
// Electra: https://github.com/ethereum/consensus-specs/blob/dev/specs/_features/eip7594/p2p-interface.md#messages
case version.Electra:
return map[string]rpcHandler{
p2p.RPCStatusTopicV1: s.statusRPCHandler,
p2p.RPCGoodByeTopicV1: s.goodbyeRPCHandler,
p2p.RPCBlocksByRangeTopicV2: s.beaconBlocksByRangeRPCHandler,
p2p.RPCBlocksByRootTopicV2: s.beaconBlocksRootRPCHandler,
p2p.RPCPingTopicV1: s.pingHandler,
p2p.RPCMetaDataTopicV3: s.metaDataHandler, // Modified in Electra
p2p.RPCBlobSidecarsByRootTopicV1: s.blobSidecarByRootRPCHandler,
p2p.RPCBlobSidecarsByRangeTopicV1: s.blobSidecarsByRangeRPCHandler,
p2p.RPCDataColumnSidecarsByRootTopicV1: s.dataColumnSidecarByRootRPCHandler, // Added in Electra
p2p.RPCDataColumnSidecarsByRangeTopicV1: s.dataColumnSidecarsByRangeRPCHandler, // Added in Electra
}, nil
default:
return nil, errors.Errorf("RPC handler not found for fork index %d", forkIndex)
}
s.registerRPC(
p2p.RPCStatusTopicV1,
s.statusRPCHandler,
)
s.registerRPC(
p2p.RPCGoodByeTopicV1,
s.goodbyeRPCHandler,
)
s.registerRPC(
p2p.RPCBlocksByRangeTopicV1,
s.beaconBlocksByRangeRPCHandler,
)
s.registerRPC(
p2p.RPCBlocksByRootTopicV1,
s.beaconBlocksRootRPCHandler,
)
s.registerRPC(
p2p.RPCPingTopicV1,
s.pingHandler,
)
s.registerRPC(
p2p.RPCMetaDataTopicV1,
s.metaDataHandler,
)
}
// registerRPCHandlers for altair.
func (s *Service) registerRPCHandlersAltair() {
s.registerRPC(
p2p.RPCBlocksByRangeTopicV2,
s.beaconBlocksByRangeRPCHandler,
)
s.registerRPC(
p2p.RPCBlocksByRootTopicV2,
s.beaconBlocksRootRPCHandler,
)
s.registerRPC(
p2p.RPCMetaDataTopicV2,
s.metaDataHandler,
)
// rpcHandlerByTopic returns the RPC handlers for a given epoch.
func (s *Service) rpcHandlerByTopicFromEpoch(epoch primitives.Epoch) (map[string]rpcHandler, error) {
// Get the beacon config.
beaconConfig := params.BeaconConfig()
if epoch >= beaconConfig.ElectraForkEpoch {
return s.rpcHandlerByTopicFromFork(version.Electra)
}
if epoch >= beaconConfig.DenebForkEpoch {
return s.rpcHandlerByTopicFromFork(version.Deneb)
}
if epoch >= beaconConfig.CapellaForkEpoch {
return s.rpcHandlerByTopicFromFork(version.Capella)
}
if epoch >= beaconConfig.BellatrixForkEpoch {
return s.rpcHandlerByTopicFromFork(version.Bellatrix)
}
if epoch >= beaconConfig.AltairForkEpoch {
return s.rpcHandlerByTopicFromFork(version.Altair)
}
return s.rpcHandlerByTopicFromFork(version.Phase0)
}
func (s *Service) registerRPCHandlersDeneb() {
s.registerRPC(
p2p.RPCBlobSidecarsByRangeTopicV1,
s.blobSidecarsByRangeRPCHandler,
)
s.registerRPC(
p2p.RPCBlobSidecarsByRootTopicV1,
s.blobSidecarByRootRPCHandler,
)
// addedRPCHandlerByTopic returns the RPC handlers that are added in the new map that are not present in the old map.
func addedRPCHandlerByTopic(previous, next map[string]rpcHandler) map[string]rpcHandler {
added := make(map[string]rpcHandler)
for topic, handler := range next {
if _, ok := previous[topic]; !ok {
added[topic] = handler
}
}
return added
}
func (s *Service) registerRPCHandlersPeerDAS() {
s.registerRPC(
p2p.RPCDataColumnSidecarsByRootTopicV1,
s.dataColumnSidecarByRootRPCHandler,
)
s.registerRPC(
p2p.RPCDataColumnSidecarsByRangeTopicV1,
s.dataColumnSidecarsByRangeRPCHandler,
)
s.registerRPC(
p2p.RPCMetaDataTopicV3,
s.metaDataHandler,
)
// removedTopics returns the topics that are removed in the new map that are not present in the old map.
func removedRPCTopics(previous, next map[string]rpcHandler) map[string]bool {
removed := make(map[string]bool)
for topic := range previous {
if _, ok := next[topic]; !ok {
removed[topic] = true
}
}
return removed
}
// Remove all v1 Stream handlers that are no longer supported
// from altair onwards.
func (s *Service) unregisterPhase0Handlers() {
fullBlockRangeTopic := p2p.RPCBlocksByRangeTopicV1 + s.cfg.p2p.Encoding().ProtocolSuffix()
fullBlockRootTopic := p2p.RPCBlocksByRootTopicV1 + s.cfg.p2p.Encoding().ProtocolSuffix()
fullMetadataTopic := p2p.RPCMetaDataTopicV1 + s.cfg.p2p.Encoding().ProtocolSuffix()
// registerRPCHandlers for p2p RPC.
func (s *Service) registerRPCHandlers() error {
// Get the current epoch.
currentSlot := s.cfg.clock.CurrentSlot()
currentEpoch := slots.ToEpoch(currentSlot)
s.cfg.p2p.Host().RemoveStreamHandler(protocol.ID(fullBlockRangeTopic))
s.cfg.p2p.Host().RemoveStreamHandler(protocol.ID(fullBlockRootTopic))
s.cfg.p2p.Host().RemoveStreamHandler(protocol.ID(fullMetadataTopic))
}
// Get the RPC handlers for the current epoch.
handlerByTopic, err := s.rpcHandlerByTopicFromEpoch(currentEpoch)
if err != nil {
return errors.Wrap(err, "rpc handler by topic from epoch")
}
func (s *Service) unregisterBlobHandlers() {
fullBlobRangeTopic := p2p.RPCBlobSidecarsByRangeTopicV1 + s.cfg.p2p.Encoding().ProtocolSuffix()
fullBlobRootTopic := p2p.RPCBlobSidecarsByRootTopicV1 + s.cfg.p2p.Encoding().ProtocolSuffix()
// Register the RPC handlers for the current epoch.
for topic, handler := range handlerByTopic {
s.registerRPC(topic, handler)
}
s.cfg.p2p.Host().RemoveStreamHandler(protocol.ID(fullBlobRangeTopic))
s.cfg.p2p.Host().RemoveStreamHandler(protocol.ID(fullBlobRootTopic))
return nil
}
// registerRPC for a given topic with an expected protobuf message type.

View File

@@ -209,7 +209,7 @@ func DataColumnsRPCMinValidSlot(current primitives.Slot) (primitives.Slot, error
}
minReqEpochs := params.BeaconConfig().MinEpochsForDataColumnSidecarsRequest
currEpoch := slots.ToEpoch(current)
minStart := params.BeaconConfig().Eip7594ForkEpoch
minStart := params.BeaconConfig().ElectraForkEpoch
if currEpoch > minReqEpochs && currEpoch-minReqEpochs > minStart {
minStart = currEpoch - minReqEpochs
}

View File

@@ -70,7 +70,7 @@ func (s *Service) metaDataHandler(_ context.Context, _ interface{}, stream libp2
switch streamVersion {
case p2p.SchemaVersionV1:
switch metadataVersion {
case version.Altair, version.Deneb:
case version.Altair, version.Electra:
metadata = wrapper.WrappedMetadataV0(
&pb.MetaDataV0{
Attnets: metadata.AttnetsBitfield(),
@@ -87,7 +87,7 @@ func (s *Service) metaDataHandler(_ context.Context, _ interface{}, stream libp2
SeqNumber: metadata.SequenceNumber(),
Syncnets: bitfield.Bitvector4{byte(0x00)},
})
case version.Deneb:
case version.Electra:
metadata = wrapper.WrappedMetadataV1(
&pb.MetaDataV1{
Attnets: metadata.AttnetsBitfield(),
@@ -190,7 +190,7 @@ func (s *Service) sendMetaDataRequest(ctx context.Context, peerID peer.ID) (meta
topicVersion = p2p.SchemaVersionV1
case version.Altair:
topicVersion = p2p.SchemaVersionV2
case version.Deneb:
case version.Electra:
topicVersion = p2p.SchemaVersionV3
}

View File

@@ -101,8 +101,7 @@ func TestMetadataRPCHandler_SendMetadataRequest(t *testing.T) {
params.SetupTestConfigCleanup(t)
beaconChainConfig := params.BeaconConfig().Copy()
beaconChainConfig.AltairForkEpoch = 5
beaconChainConfig.DenebForkEpoch = 10
beaconChainConfig.Eip7594ForkEpoch = 10
beaconChainConfig.ElectraForkEpoch = 15
params.OverrideBeaconConfig(beaconChainConfig)
params.BeaconConfig().InitializeForkSchedule()
@@ -145,10 +144,10 @@ func TestMetadataRPCHandler_SendMetadataRequest(t *testing.T) {
}),
},
{
name: "Phase0-PeerDAS",
name: "Phase0-Electra",
topic: p2p.RPCMetaDataTopicV1,
epochsSinceGenesisPeer1: 0,
epochsSinceGenesisPeer2: 10,
epochsSinceGenesisPeer2: 15,
metadataPeer2: wrapper.WrappedMetadataV2(&pb.MetaDataV2{
SeqNumber: seqNumber,
Attnets: attnets,
@@ -192,10 +191,10 @@ func TestMetadataRPCHandler_SendMetadataRequest(t *testing.T) {
}),
},
{
name: "Altair-PeerDAS",
name: "Altair-Electra",
topic: p2p.RPCMetaDataTopicV2,
epochsSinceGenesisPeer1: 5,
epochsSinceGenesisPeer2: 10,
epochsSinceGenesisPeer2: 15,
metadataPeer2: wrapper.WrappedMetadataV2(&pb.MetaDataV2{
SeqNumber: seqNumber,
Attnets: attnets,
@@ -209,9 +208,9 @@ func TestMetadataRPCHandler_SendMetadataRequest(t *testing.T) {
}),
},
{
name: "PeerDAS-Phase0",
name: "Electra-Phase0",
topic: p2p.RPCMetaDataTopicV3,
epochsSinceGenesisPeer1: 10,
epochsSinceGenesisPeer1: 15,
epochsSinceGenesisPeer2: 0,
metadataPeer2: wrapper.WrappedMetadataV0(&pb.MetaDataV0{
SeqNumber: seqNumber,
@@ -225,9 +224,9 @@ func TestMetadataRPCHandler_SendMetadataRequest(t *testing.T) {
}),
},
{
name: "PeerDAS-Altail",
name: "Electra-Altair",
topic: p2p.RPCMetaDataTopicV3,
epochsSinceGenesisPeer1: 10,
epochsSinceGenesisPeer1: 15,
epochsSinceGenesisPeer2: 5,
metadataPeer2: wrapper.WrappedMetadataV1(&pb.MetaDataV1{
SeqNumber: seqNumber,
@@ -242,10 +241,10 @@ func TestMetadataRPCHandler_SendMetadataRequest(t *testing.T) {
}),
},
{
name: "PeerDAS-PeerDAS",
name: "Electra-Electra",
topic: p2p.RPCMetaDataTopicV3,
epochsSinceGenesisPeer1: 10,
epochsSinceGenesisPeer2: 10,
epochsSinceGenesisPeer1: 15,
epochsSinceGenesisPeer2: 15,
metadataPeer2: wrapper.WrappedMetadataV2(&pb.MetaDataV2{
SeqNumber: seqNumber,
Attnets: attnets,
@@ -262,53 +261,55 @@ func TestMetadataRPCHandler_SendMetadataRequest(t *testing.T) {
}
for _, tc := range testCases {
var wg sync.WaitGroup
t.Run(tc.name, func(t *testing.T) {
var wg sync.WaitGroup
ctx := context.Background()
ctx := context.Background()
// Setup and connect peers.
peer1, peer2 := p2ptest.NewTestP2P(t), p2ptest.NewTestP2P(t)
peer1.Connect(peer2)
// Setup and connect peers.
peer1, peer2 := p2ptest.NewTestP2P(t), p2ptest.NewTestP2P(t)
peer1.Connect(peer2)
// Ensure the peers are connected.
peersCount := len(peer1.BHost.Network().Peers())
assert.Equal(t, 1, peersCount, "Expected peers to be connected")
// Ensure the peers are connected.
peersCount := len(peer1.BHost.Network().Peers())
require.Equal(t, 1, peersCount, "Expected peers to be connected")
// Setup sync services.
genesisPeer1 := time.Now().Add(-time.Duration(tc.epochsSinceGenesisPeer1) * secondsPerEpoch)
genesisPeer2 := time.Now().Add(-time.Duration(tc.epochsSinceGenesisPeer2) * secondsPerEpoch)
// Setup sync services.
genesisPeer1 := time.Now().Add(-time.Duration(tc.epochsSinceGenesisPeer1) * secondsPerEpoch)
genesisPeer2 := time.Now().Add(-time.Duration(tc.epochsSinceGenesisPeer2) * secondsPerEpoch)
chainPeer1 := &mock.ChainService{Genesis: genesisPeer1, ValidatorsRoot: [32]byte{}}
chainPeer2 := &mock.ChainService{Genesis: genesisPeer2, ValidatorsRoot: [32]byte{}}
chainPeer1 := &mock.ChainService{Genesis: genesisPeer1, ValidatorsRoot: [32]byte{}}
chainPeer2 := &mock.ChainService{Genesis: genesisPeer2, ValidatorsRoot: [32]byte{}}
servicePeer1 := createService(peer1, chainPeer1)
servicePeer2 := createService(peer2, chainPeer2)
servicePeer1 := createService(peer1, chainPeer1)
servicePeer2 := createService(peer2, chainPeer2)
// Define the behavior of peer2 when receiving a METADATA request.
protocolSuffix := servicePeer2.cfg.p2p.Encoding().ProtocolSuffix()
protocolID := protocol.ID(tc.topic + protocolSuffix)
peer2.LocalMetadata = tc.metadataPeer2
// Define the behavior of peer2 when receiving a METADATA request.
protocolSuffix := servicePeer2.cfg.p2p.Encoding().ProtocolSuffix()
protocolID := protocol.ID(tc.topic + protocolSuffix)
peer2.LocalMetadata = tc.metadataPeer2
wg.Add(1)
peer2.BHost.SetStreamHandler(protocolID, func(stream network.Stream) {
defer wg.Done()
err := servicePeer2.metaDataHandler(ctx, new(interface{}), stream)
assert.NoError(t, err)
wg.Add(1)
peer2.BHost.SetStreamHandler(protocolID, func(stream network.Stream) {
defer wg.Done()
err := servicePeer2.metaDataHandler(ctx, new(interface{}), stream)
require.NoError(t, err)
})
// Send a METADATA request from peer1 to peer2.
actual, err := servicePeer1.sendMetaDataRequest(ctx, peer2.BHost.ID())
require.NoError(t, err)
// Wait until the METADATA request is received by peer2 or timeout.
timeOutReached := util.WaitTimeout(&wg, requestTimeout)
require.Equal(t, false, timeOutReached, "Did not receive METADATA request within timeout")
// Compare the received METADATA object with the expected METADATA object.
require.DeepSSZEqual(t, tc.expected.InnerObject(), actual.InnerObject(), "Metadata unequal")
// Ensure the peers are still connected.
peersCount = len(peer1.BHost.Network().Peers())
assert.Equal(t, 1, peersCount, "Expected peers to be connected")
})
// Send a METADATA request from peer1 to peer2.
actual, err := servicePeer1.sendMetaDataRequest(ctx, peer2.BHost.ID())
assert.NoError(t, err)
// Wait until the METADATA request is received by peer2 or timeout.
timeOutReached := util.WaitTimeout(&wg, requestTimeout)
require.Equal(t, false, timeOutReached, "Did not receive METADATA request within timeout")
// Compare the received METADATA object with the expected METADATA object.
require.DeepSSZEqual(t, tc.expected.InnerObject(), actual.InnerObject(), "Metadata unequal")
// Ensure the peers are still connected.
peersCount = len(peer1.BHost.Network().Peers())
assert.Equal(t, 1, peersCount, "Expected peers to be connected")
}
}

View File

@@ -438,15 +438,18 @@ func readChunkedDataColumnSideCar(
}
// Check if the fork digest is recognized.
v, ok := ctxMap[bytesutil.ToBytes4(ctxBytes)]
msgVersion, ok := ctxMap[bytesutil.ToBytes4(ctxBytes)]
if !ok {
return nil, errors.Errorf("unrecognized fork digest %#x", ctxBytes)
}
// Check if we are on debeb.
// Only deneb is supported at this time, because we lack a fork-spanning interface/union type for blobs.
if v != version.Deneb {
return nil, errors.Errorf("unexpected context bytes for deneb DataColumnSidecar, ctx=%#x, v=%v", ctxBytes, v)
if msgVersion < version.Electra {
return nil, errors.Errorf(
"unexpected context bytes for DataColumnSidecar, ctx=%#x, msgVersion=%v, minimalSupportedVersion=%v",
ctxBytes, version.String(msgVersion), version.String(version.Electra),
)
}
// Decode the data column sidecar from the stream.

View File

@@ -327,14 +327,21 @@ func (s *Service) waitForChainStart() {
ctxMap, err := ContextByteVersionsForValRoot(clock.GenesisValidatorsRoot())
if err != nil {
log.WithError(err).WithField("genesisValidatorRoot", clock.GenesisValidatorsRoot()).
log.
WithError(err).
WithField("genesisValidatorRoot", clock.GenesisValidatorsRoot()).
Error("sync service failed to initialize context version map")
return
}
s.ctxMap = ctxMap
// Register respective rpc handlers at state initialized event.
s.registerRPCHandlers()
err = s.registerRPCHandlers()
if err != nil {
log.WithError(err).Error("Could not register rpc handlers")
return
}
// Wait for chainstart in separate routine.
if startTime.After(prysmTime.Now()) {
time.Sleep(prysmTime.Until(startTime))

View File

@@ -16,7 +16,6 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/altair"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
coreTime "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/time"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
@@ -119,7 +118,7 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
s.attesterSubnetIndices,
)
// Altair Fork Version
if epoch >= params.BeaconConfig().AltairForkEpoch {
if params.BeaconConfig().AltairForkEpoch <= epoch {
s.subscribe(
p2p.SyncContributionAndProofSubnetTopicFormat,
s.validateSyncContributionAndProof,
@@ -137,7 +136,7 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
}
// New Gossip Topic in Capella
if epoch >= params.BeaconConfig().CapellaForkEpoch {
if params.BeaconConfig().CapellaForkEpoch <= epoch {
s.subscribe(
p2p.BlsToExecutionChangeSubnetTopicFormat,
s.validateBlsToExecutionChange,
@@ -146,22 +145,8 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
)
}
// New Gossip Topic in Deneb
if epoch >= params.BeaconConfig().DenebForkEpoch {
if coreTime.PeerDASIsActive(slots.UnsafeEpochStart(epoch)) {
s.subscribeWithParameters(
p2p.DataColumnSubnetTopicFormat,
s.validateDataColumn,
s.dataColumnSubscriber,
digest,
s.dataColumnSubnetIndices,
// TODO: Should we find peers always? When validators are managed? When validators are managed AND when we are going to propose a block?
func(currentSlot primitives.Slot) []uint64 { return []uint64{} },
)
return
}
// New Gossip Topic in Deneb, removed in Electra
if params.BeaconConfig().DenebForkEpoch <= epoch && epoch < params.BeaconConfig().ElectraForkEpoch {
s.subscribeWithParameters(
p2p.BlobSubnetTopicFormat,
s.validateBlob,
@@ -171,6 +156,19 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
func(currentSlot primitives.Slot) []uint64 { return []uint64{} },
)
}
// New Gossip Topic in Electra
if params.BeaconConfig().ElectraForkEpoch <= epoch {
s.subscribeWithParameters(
p2p.DataColumnSubnetTopicFormat,
s.validateDataColumn,
s.dataColumnSubscriber,
digest,
s.dataColumnSubnetIndices,
// TODO: Should we find peers always? When validators are managed? When validators are managed AND when we are going to propose a block?
func(currentSlot primitives.Slot) []uint64 { return []uint64{} },
)
}
}
// subscribe to a given topic with a given validator and subscription handler.

View File

@@ -166,7 +166,6 @@ type BeaconChainConfig struct {
DenebForkEpoch primitives.Epoch `yaml:"DENEB_FORK_EPOCH" spec:"true"` // DenebForkEpoch is used to represent the assigned fork epoch for deneb.
ElectraForkVersion []byte `yaml:"ELECTRA_FORK_VERSION" spec:"true"` // ElectraForkVersion is used to represent the fork version for electra.
ElectraForkEpoch primitives.Epoch `yaml:"ELECTRA_FORK_EPOCH" spec:"true"` // ElectraForkEpoch is used to represent the assigned fork epoch for electra.
Eip7594ForkEpoch primitives.Epoch `yaml:"EIP7594_FORK_EPOCH" spec:"true"` // EIP7594ForkEpoch is used to represent the assigned fork epoch for peer das.
ForkVersionSchedule map[[fieldparams.VersionLength]byte]primitives.Epoch // Schedule of fork epochs by version.
ForkVersionNames map[[fieldparams.VersionLength]byte]string // Human-readable names of fork versions.
@@ -367,7 +366,7 @@ func DenebEnabled() bool {
// PeerDASEnabled centralizes the check to determine if code paths
// that are specific to peerdas should be allowed to execute.
func PeerDASEnabled() bool {
return BeaconConfig().Eip7594ForkEpoch < math.MaxUint64
return BeaconConfig().ElectraForkEpoch < math.MaxUint64
}
// WithinDAPeriod checks if the block epoch is within MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS of the given current epoch.

View File

@@ -217,7 +217,6 @@ func ConfigToYaml(cfg *BeaconChainConfig) []byte {
fmt.Sprintf("DENEB_FORK_VERSION: %#x", cfg.DenebForkVersion),
fmt.Sprintf("ELECTRA_FORK_EPOCH: %d", cfg.ElectraForkEpoch),
fmt.Sprintf("ELECTRA_FORK_VERSION: %#x", cfg.ElectraForkVersion),
fmt.Sprintf("EIP7594_FORK_EPOCH: %d", cfg.Eip7594ForkEpoch),
fmt.Sprintf("EPOCHS_PER_SUBNET_SUBSCRIPTION: %d", cfg.EpochsPerSubnetSubscription),
fmt.Sprintf("ATTESTATION_SUBNET_EXTRA_BITS: %d", cfg.AttestationSubnetExtraBits),
fmt.Sprintf("ATTESTATION_SUBNET_PREFIX_BITS: %d", cfg.AttestationSubnetPrefixBits),

View File

@@ -30,6 +30,7 @@ var placeholderFields = []string{
"EIP6110_FORK_VERSION",
"EIP7002_FORK_EPOCH",
"EIP7002_FORK_VERSION",
"EIP7594_FORK_EPOCH",
"EIP7594_FORK_VERSION",
"EIP7732_FORK_EPOCH",
"EIP7732_FORK_VERSION",

View File

@@ -217,7 +217,6 @@ var mainnetBeaconConfig = &BeaconChainConfig{
DenebForkEpoch: mainnetDenebForkEpoch,
ElectraForkVersion: []byte{5, 0, 0, 0},
ElectraForkEpoch: mainnetElectraForkEpoch,
Eip7594ForkEpoch: math.MaxUint64,
// New values introduced in Altair hard fork 1.
// Participation flag indices.

View File

@@ -96,7 +96,6 @@ func MinimalSpecConfig() *BeaconChainConfig {
minimalConfig.DenebForkEpoch = math.MaxUint64
minimalConfig.ElectraForkVersion = []byte{5, 0, 0, 1}
minimalConfig.ElectraForkEpoch = math.MaxUint64
minimalConfig.Eip7594ForkEpoch = math.MaxUint64
minimalConfig.SyncCommitteeSize = 32
minimalConfig.InactivityScoreBias = 4

View File

@@ -44,7 +44,6 @@ func E2ETestConfig() *BeaconChainConfig {
e2eConfig.CapellaForkEpoch = CapellaE2EForkEpoch
e2eConfig.DenebForkEpoch = DenebE2EForkEpoch
e2eConfig.ElectraForkEpoch = ElectraE2EForkEpoch
e2eConfig.Eip7594ForkEpoch = DenebE2EForkEpoch
// Terminal Total Difficulty.
e2eConfig.TerminalTotalDifficulty = "480"
@@ -89,7 +88,6 @@ func E2EMainnetTestConfig() *BeaconChainConfig {
e2eConfig.CapellaForkEpoch = CapellaE2EForkEpoch
e2eConfig.DenebForkEpoch = DenebE2EForkEpoch
e2eConfig.ElectraForkEpoch = ElectraE2EForkEpoch
e2eConfig.Eip7594ForkEpoch = DenebE2EForkEpoch
// Terminal Total Difficulty.
e2eConfig.TerminalTotalDifficulty = "480"

View File

@@ -40,7 +40,6 @@ func HoleskyConfig() *BeaconChainConfig {
cfg.DenebForkEpoch = 29696
cfg.DenebForkVersion = []byte{0x05, 0x1, 0x70, 0x0}
cfg.ElectraForkEpoch = math.MaxUint64
cfg.Eip7594ForkEpoch = math.MaxUint64
cfg.ElectraForkVersion = []byte{0x06, 0x1, 0x70, 0x0} // TODO: Define holesky fork version for electra. This is a placeholder value.
cfg.TerminalTotalDifficulty = "0"
cfg.DepositContractAddress = "0x4242424242424242424242424242424242424242"

View File

@@ -46,7 +46,6 @@ func SepoliaConfig() *BeaconChainConfig {
cfg.DenebForkVersion = []byte{0x90, 0x00, 0x00, 0x73}
cfg.ElectraForkEpoch = math.MaxUint64
cfg.ElectraForkVersion = []byte{0x90, 0x00, 0x00, 0x74} // TODO: Define sepolia fork version for electra. This is a placeholder value.
cfg.Eip7594ForkEpoch = math.MaxUint64
cfg.TerminalTotalDifficulty = "17000000000000000"
cfg.DepositContractAddress = "0x7f02C3E3c98b133055B8B348B2Ac625669Ed295D"
cfg.InitializeForkSchedule()

View File

@@ -287,5 +287,5 @@ func (m MetadataV2) MetadataObjV2() *pb.MetaDataV2 {
// Version returns the fork version of the underlying object.
func (MetadataV2) Version() int {
return version.Deneb
return version.Electra
}

View File

@@ -24,11 +24,9 @@ func InitForkCfg(start, end int, c *params.BeaconChainConfig) *params.BeaconChai
}
if start >= version.Deneb {
c.DenebForkEpoch = 0
c.Eip7594ForkEpoch = 0
}
if end < version.Deneb {
c.DenebForkEpoch = math.MaxUint64
c.Eip7594ForkEpoch = math.MaxUint64
}
if end < version.Capella {
c.CapellaForkEpoch = math.MaxUint64