Mirror of https://github.com/OffchainLabs/prysm.git, synced 2026-01-10 05:47:59 -05:00

Compare commits: fulu-devne ... peerDAS-at (1 commit)

Commit fbe25f426a
@@ -55,7 +55,7 @@ func HigherEqualThanAltairVersionAndEpoch(s state.BeaconState, e primitives.Epoc
 
 // PeerDASIsActive checks whether peerDAS is active at the provided slot.
 func PeerDASIsActive(slot primitives.Slot) bool {
-	return params.PeerDASEnabled() && slots.ToEpoch(slot) >= params.BeaconConfig().Eip7594ForkEpoch
+	return params.PeerDASEnabled() && slots.ToEpoch(slot) >= params.BeaconConfig().ElectraForkEpoch
 }
 
 // CanUpgradeToAltair returns true if the input `slot` can upgrade to Altair.
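The gate above boils down to mapping a slot to its epoch and comparing it with the configured Electra fork epoch. A minimal, self-contained sketch of that comparison, assuming the mainnet value of 32 slots per epoch (the helper and constant names below are illustrative, not Prysm APIs):

```go
package main

import "fmt"

const slotsPerEpoch = 32 // assumed mainnet SLOTS_PER_EPOCH

// peerDASActive mirrors the shape of the check above: peerDAS is considered
// active once the slot's epoch has reached the Electra fork epoch.
func peerDASActive(slot, electraForkEpoch uint64, peerDASEnabled bool) bool {
	epoch := slot / slotsPerEpoch
	return peerDASEnabled && epoch >= electraForkEpoch
}

func main() {
	// With an Electra fork epoch of 1, slots 0-31 are pre-fork and slot 32 is the first active slot.
	fmt.Println(peerDASActive(31, 1, true)) // false
	fmt.Println(peerDASActive(32, 1, true)) // true
}
```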
@@ -113,7 +113,7 @@ func (s *LazilyPersistentStoreColumn) IsDataAvailable(
 	// ignore their response and decrease their peer score.
 	roDataColumns, err := entry.filterColumns(blockRoot, blockCommitments)
 	if err != nil {
-		return errors.Wrap(err, "incomplete BlobSidecar batch")
+		return errors.Wrap(err, "incomplete DataColumnSidecar batch")
 	}
 
 	// Create verified RO data columns from RO data columns.
@@ -110,7 +110,7 @@ func (e *cacheEntry) filter(root [32]byte, kc safeCommitmentArray) ([]blocks.ROB
 		return nil, nil
 	}
 	scs := make([]blocks.ROBlob, 0, kc.count())
-	for i := uint64(0); i < fieldparams.MaxBlobsPerBlock; i++ {
+	for i := range uint64(fieldparams.MaxBlobsPerBlock) {
 		// We already have this blob, we don't need to write it or validate it.
 		if e.diskSummary.HasIndex(i) {
 			continue
@@ -143,8 +143,8 @@ func (e *cacheEntry) filterColumns(root [32]byte, commitmentsArray *safeCommitme
 	commitmentsCount := commitmentsArray.count()
 	sidecars := make([]blocks.RODataColumn, 0, commitmentsCount)
 
-	for i := uint64(0); i < fieldparams.NumberOfColumns; i++ {
-		// Skip if we arleady store this data column.
+	for i := range uint64(fieldparams.NumberOfColumns) {
+		// Skip if we already store this data column.
 		if e.diskSummary.HasIndex(i) {
 			continue
 		}
@@ -228,10 +228,10 @@ func (s *Service) RefreshPersistentSubnets() {
 
 	isBitSUpToDate := bytes.Equal(bitS, inRecordBitS) && bytes.Equal(bitS, currentBitSInMetadata)
 
-	// Compare current epoch with EIP-7594 fork epoch.
-	eip7594ForkEpoch := params.BeaconConfig().Eip7594ForkEpoch
+	// Compare current epoch with the Electra fork epoch.
+	electraForkEpoch := params.BeaconConfig().ElectraForkEpoch
 
-	if currentEpoch < eip7594ForkEpoch {
+	if currentEpoch < electraForkEpoch {
 		// Altair behaviour.
 		if metadataVersion == version.Altair && isBitVUpToDate && isBitSUpToDate {
 			// Nothing to do, return early.
@@ -135,7 +135,7 @@ func TestStartDiscV5_DiscoverAllPeers(t *testing.T) {
 func TestCreateLocalNode(t *testing.T) {
 	params.SetupTestConfigCleanup(t)
 	cfg := params.BeaconConfig()
-	cfg.Eip7594ForkEpoch = 1
+	cfg.ElectraForkEpoch = 1
 	params.OverrideBeaconConfig(cfg)
 	testCases := []struct {
 		name string
@@ -626,7 +626,7 @@ func TestRefreshPersistentSubnets(t *testing.T) {
 
 	const (
 		altairForkEpoch  = 5
-		eip7594ForkEpoch = 10
+		electraForkEpoch = 10
 	)
 
 	custodySubnetCount := params.BeaconConfig().CustodyRequirement
@@ -635,7 +635,7 @@ func TestRefreshPersistentSubnets(t *testing.T) {
 	defaultCfg := params.BeaconConfig()
 	cfg := defaultCfg.Copy()
 	cfg.AltairForkEpoch = altairForkEpoch
-	cfg.Eip7594ForkEpoch = eip7594ForkEpoch
+	cfg.ElectraForkEpoch = electraForkEpoch
 	params.OverrideBeaconConfig(cfg)
 
 	// Compute the number of seconds per epoch.
@@ -706,7 +706,7 @@ func TestRefreshPersistentSubnets(t *testing.T) {
 		},
 		{
 			name:              "PeerDAS",
-			epochSinceGenesis: eip7594ForkEpoch,
+			epochSinceGenesis: electraForkEpoch,
 			checks: []check{
 				{
 					pingCount: 0,
@@ -306,7 +306,7 @@ func TopicFromMessage(msg string, epoch primitives.Epoch) (string, error) {
 	}
 
 	// Check if the message is to be updated in peerDAS.
-	isPeerDAS := epoch >= params.BeaconConfig().Eip7594ForkEpoch
+	isPeerDAS := epoch >= params.BeaconConfig().ElectraForkEpoch
 	if isPeerDAS && peerDASMapping[msg] {
 		version = SchemaVersionV3
 	}
@@ -79,7 +79,6 @@ func TestGetSpec(t *testing.T) {
 	config.DenebForkEpoch = 105
 	config.ElectraForkVersion = []byte("ElectraForkVersion")
 	config.ElectraForkEpoch = 107
-	config.Eip7594ForkEpoch = 109
 	config.BLSWithdrawalPrefixByte = byte('b')
 	config.ETH1AddressWithdrawalPrefixByte = byte('c')
 	config.GenesisDelay = 24
@@ -190,7 +189,7 @@ func TestGetSpec(t *testing.T) {
 	data, ok := resp.Data.(map[string]interface{})
 	require.Equal(t, true, ok)
 
-	assert.Equal(t, 156, len(data))
+	assert.Equal(t, 155, len(data))
 	for k, v := range data {
 		t.Run(k, func(t *testing.T) {
 			switch k {
@@ -268,8 +267,6 @@ func TestGetSpec(t *testing.T) {
 				assert.Equal(t, "0x"+hex.EncodeToString([]byte("ElectraForkVersion")), v)
 			case "ELECTRA_FORK_EPOCH":
 				assert.Equal(t, "107", v)
-			case "EIP7594_FORK_EPOCH":
-				assert.Equal(t, "109", v)
 			case "MIN_ANCHOR_POW_BLOCK_DIFFICULTY":
 				assert.Equal(t, "1000", v)
 			case "BLS_WITHDRAWAL_PREFIX":
@@ -471,12 +471,12 @@ func (p *BeaconDbBlocker) Blobs(ctx context.Context, id string, indices map[uint
 	blockSlot := b.Block().Slot()
 
 	// Get the first peerDAS epoch.
-	eip7594ForkEpoch := params.BeaconConfig().Eip7594ForkEpoch
+	electraForkEpoch := params.BeaconConfig().ElectraForkEpoch
 
 	// Compute the first peerDAS slot.
 	peerDASStartSlot := primitives.Slot(math.MaxUint64)
-	if eip7594ForkEpoch != primitives.Epoch(math.MaxUint64) {
-		peerDASStartSlot, err = slots.EpochStart(eip7594ForkEpoch)
+	if electraForkEpoch != primitives.Epoch(math.MaxUint64) {
+		peerDASStartSlot, err = slots.EpochStart(electraForkEpoch)
 		if err != nil {
 			return nil, &core.RpcError{Err: errors.Wrap(err, "could not calculate peerDAS start slot"), Reason: core.Internal}
 		}
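The MaxUint64 guard above matters because an unscheduled fork epoch is encoded as math.MaxUint64, so converting it directly to a start slot would overflow. A standalone sketch of the same sentinel handling, assuming 32 slots per epoch (helper names are illustrative, not the Prysm API):

```go
package main

import (
	"fmt"
	"math"
)

const slotsPerEpoch = 32 // assumed mainnet value

// peerDASStartSlot returns the first peerDAS slot, or MaxUint64 when the
// Electra fork epoch is not scheduled (itself encoded as MaxUint64).
func peerDASStartSlot(electraForkEpoch uint64) uint64 {
	if electraForkEpoch == math.MaxUint64 {
		return math.MaxUint64 // fork not scheduled: no slot ever qualifies
	}
	return electraForkEpoch * slotsPerEpoch
}

func main() {
	fmt.Println(peerDASStartSlot(math.MaxUint64)) // 18446744073709551615
	fmt.Println(peerDASStartSlot(10))             // 320
}
```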
@@ -199,7 +199,7 @@ func setupDataColumnSamplerTest(t *testing.T, blobCount uint64) (*dataSamplerTes
 		ctx:                context.Background(),
 		p2pSvc:             p2pSvc,
 		peers:              []*p2ptest.TestP2P{},
-		ctxMap:             map[[4]byte]int{{245, 165, 253, 66}: version.Deneb},
+		ctxMap:             map[[4]byte]int{{245, 165, 253, 66}: version.Electra},
 		chainSvc:           chainSvc,
 		blockProcessedData: blockProcessedData,
 		blobs:              blobs,
@@ -1,6 +1,7 @@
 package sync
 
 import (
+	"github.com/libp2p/go-libp2p/core/protocol"
 	"github.com/pkg/errors"
 	"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
 	"github.com/prysmaticlabs/prysm/v5/config/params"
@@ -39,80 +40,114 @@ func (s *Service) forkWatcher() {
 		}
 	}
 }
 
-// Checks if there is a fork in the next epoch and if there is
-// it registers the appropriate gossip and rpc topics.
-func (s *Service) registerForUpcomingFork(currEpoch primitives.Epoch) error {
-	genRoot := s.cfg.clock.GenesisValidatorsRoot()
-	isNextForkEpoch, err := forks.IsForkNextEpoch(s.cfg.clock.GenesisTime(), genRoot[:])
+// Register appropriate gossip and RPC topic if there is a fork in the next epoch.
+func (s *Service) registerForUpcomingFork(currentEpoch primitives.Epoch) error {
+	// Get the genesis validators root.
+	genesisValidatorsRoot := s.cfg.clock.GenesisValidatorsRoot()
+
+	// Check if there is a fork in the next epoch.
+	isForkNextEpoch, err := forks.IsForkNextEpoch(s.cfg.clock.GenesisTime(), genesisValidatorsRoot[:])
 	if err != nil {
 		return errors.Wrap(err, "Could not retrieve next fork epoch")
 	}
-	// In preparation for the upcoming fork
-	// in the following epoch, the node
-	// will subscribe the new topics in advance.
-	if isNextForkEpoch {
-		nextEpoch := currEpoch + 1
-		digest, err := forks.ForkDigestFromEpoch(nextEpoch, genRoot[:])
-		if err != nil {
-			return errors.Wrap(err, "could not retrieve fork digest")
-		}
-		if s.subHandler.digestExists(digest) {
-			return nil
-		}
-		s.registerSubscribers(nextEpoch, digest)
-		if nextEpoch == params.BeaconConfig().AltairForkEpoch {
-			s.registerRPCHandlersAltair()
-		}
-		if nextEpoch == params.BeaconConfig().DenebForkEpoch {
-			s.registerRPCHandlersDeneb()
-		}
+
+	// Exit early if there is no fork in the next epoch.
+	if !isForkNextEpoch {
+		return nil
 	}
-	// Specially handle peerDAS
-	if params.PeerDASEnabled() && currEpoch+1 == params.BeaconConfig().Eip7594ForkEpoch {
-		s.registerRPCHandlersPeerDAS()
+
+	// Compute the next epoch.
+	nextEpoch := currentEpoch + 1
+
+	// Get the fork digest for the next epoch.
+	digest, err := forks.ForkDigestFromEpoch(nextEpoch, genesisValidatorsRoot[:])
+	if err != nil {
+		return errors.Wrap(err, "could not retrieve fork digest")
+	}
+
+	// Exit early if the topics for the next epoch are already registered.
+	// It is likely to be the case for all slots of the epoch that are not the first one.
+	if s.subHandler.digestExists(digest) {
+		return nil
 	}
+
+	// Register the subscribers (gossipsub) for the next epoch.
+	s.registerSubscribers(nextEpoch, digest)
+
+	// Get the handlers for the current and next fork.
+	currentHandlerByTopic, err := s.rpcHandlerByTopicFromEpoch(currentEpoch)
+	if err != nil {
+		return errors.Wrap(err, "RPC handler by topic")
+	}
+
+	nextHandlerByTopic, err := s.rpcHandlerByTopicFromEpoch(nextEpoch)
+	if err != nil {
+		return errors.Wrap(err, "RPC handler by topic")
+	}
+
+	// Compute newly added topics.
+	newRPCHandlerByTopic := addedRPCHandlerByTopic(currentHandlerByTopic, nextHandlerByTopic)
+
+	// Register the new RPC handlers.
+	for topic, handler := range newRPCHandlerByTopic {
+		s.registerRPC(topic, handler)
+	}
+
 	return nil
 }
 
-// Checks if there was a fork in the previous epoch, and if there
-// was then we deregister the topics from that particular fork.
-func (s *Service) deregisterFromPastFork(currEpoch primitives.Epoch) error {
-	genRoot := s.cfg.clock.GenesisValidatorsRoot()
-	// This method takes care of the de-registration of
-	// old gossip pubsub handlers. Once we are at the epoch
-	// after the fork, we de-register from all the outdated topics.
-	currFork, err := forks.Fork(currEpoch)
+// deregisterFromPastFork checks if there was a fork in the previous epoch,
+// and if there was then we deregister the gossipsub topics from that particular fork,
+// and the RPC handlers that are no longer relevant.
+func (s *Service) deregisterFromPastFork(currentEpoch primitives.Epoch) error {
+	// Extract the genesis validators root.
+	genesisValidatorsRoot := s.cfg.clock.GenesisValidatorsRoot()
+
+	// Get the fork.
+	currentFork, err := forks.Fork(currentEpoch)
 	if err != nil {
-		return err
+		return errors.Wrap(err, "genesis validators root")
 	}
-	// If we are still in our genesis fork version then
-	// we simply exit early.
-	if currFork.Epoch == params.BeaconConfig().GenesisEpoch {
+
+	// If we are still in our genesis fork version then exit early.
+	if currentFork.Epoch == params.BeaconConfig().GenesisEpoch {
 		return nil
 	}
-	epochAfterFork := currFork.Epoch + 1
-	// If we are in the epoch after the fork, we start de-registering.
-	if epochAfterFork == currEpoch {
+
+	epochAfterFork := currentFork.Epoch + 1
+
+	// Start de-registering if the current epoch is the first epoch after the fork.
+	if epochAfterFork == currentEpoch {
 		// Look at the previous fork's digest.
-		epochBeforeFork := currFork.Epoch - 1
-		prevDigest, err := forks.ForkDigestFromEpoch(epochBeforeFork, genRoot[:])
+		epochBeforeFork := currentFork.Epoch - 1
+
+		previousDigest, err := forks.ForkDigestFromEpoch(epochBeforeFork, genesisValidatorsRoot[:])
 		if err != nil {
-			return errors.Wrap(err, "Failed to determine previous epoch fork digest")
+			return errors.Wrap(err, "fork digest from epoch")
 		}
-		// Exit early if there are no topics with that particular
-		// digest.
-		if !s.subHandler.digestExists(prevDigest) {
+
+		// Exit early if there are no topics with that particular digest.
+		if !s.subHandler.digestExists(previousDigest) {
 			return nil
 		}
-		prevFork, err := forks.Fork(epochBeforeFork)
+
+		// Compute the RPC handlers that are no longer needed.
+		currentHandlerByTopic, err := s.rpcHandlerByTopicFromEpoch(currentEpoch)
 		if err != nil {
-			return errors.Wrap(err, "failed to determine previous epoch fork data")
+			return errors.Wrap(err, "RPC handler by topic from epoch")
 		}
-		if prevFork.Epoch == params.BeaconConfig().GenesisEpoch {
-			s.unregisterPhase0Handlers()
+
+		nextHandlerByTopic, err := s.rpcHandlerByTopicFromEpoch(epochAfterFork)
+		if err != nil {
+			return errors.Wrap(err, "RPC handler by topic from epoch")
 		}
+
+		topicsToRemove := removedRPCTopics(currentHandlerByTopic, nextHandlerByTopic)
+		for topic := range topicsToRemove {
+			fullTopic := topic + s.cfg.p2p.Encoding().ProtocolSuffix()
+			s.cfg.p2p.Host().RemoveStreamHandler(protocol.ID(fullTopic))
+		}
+
 		// Run through all our current active topics and see
 		// if there are any subscriptions to be removed.
 		for _, t := range s.subHandler.allTopics() {
@@ -121,14 +156,11 @@ func (s *Service) deregisterFromPastFork(currEpoch primitives.Epoch) error {
 				log.WithError(err).Error("Could not retrieve digest")
 				continue
 			}
-			if retDigest == prevDigest {
+			if retDigest == previousDigest {
 				s.unSubscribeFromTopic(t)
 			}
 		}
 	}
-	// Handle PeerDAS as its a special case.
-	if params.PeerDASEnabled() && currEpoch > 0 && (currEpoch-1) == params.BeaconConfig().Eip7594ForkEpoch {
-		s.unregisterBlobHandlers()
-	}
 
 	return nil
 }
@@ -14,6 +14,7 @@ import (
 	"github.com/prysmaticlabs/prysm/v5/config/params"
 	"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
 	"github.com/prysmaticlabs/prysm/v5/network/forks"
+	"github.com/prysmaticlabs/prysm/v5/runtime/version"
 	"github.com/prysmaticlabs/prysm/v5/testing/assert"
 )
@@ -230,7 +231,8 @@ func TestService_CheckForPreviousEpochFork(t *testing.T) {
 				chainStarted: abool.New(),
 				subHandler:   newSubTopicHandler(),
 			}
-			r.registerRPCHandlers()
+			err := r.registerRPCHandlers()
+			assert.NoError(t, err)
 			return r
 		},
 		currEpoch: 10,
@@ -278,10 +280,21 @@ func TestService_CheckForPreviousEpochFork(t *testing.T) {
 			prevGenesis := chainService.Genesis
 			// To allow registration of v1 handlers
 			chainService.Genesis = time.Now().Add(-1 * oneEpoch())
-			r.registerRPCHandlers()
+			err := r.registerRPCHandlers()
+			assert.NoError(t, err)
+
 			chainService.Genesis = prevGenesis
-			r.registerRPCHandlersAltair()
+			previous, err := r.rpcHandlerByTopicFromFork(version.Phase0)
+			assert.NoError(t, err)
+
+			next, err := r.rpcHandlerByTopicFromFork(version.Altair)
+			assert.NoError(t, err)
+
+			handlerByTopic := addedRPCHandlerByTopic(previous, next)
+
+			for topic, handler := range handlerByTopic {
+				r.registerRPC(topic, handler)
+			}
+
 			genRoot := r.cfg.clock.GenesisValidatorsRoot()
 			digest, err := forks.ForkDigestFromEpoch(0, genRoot[:])
@@ -3,6 +3,7 @@ package initialsync
 import (
 	"context"
 	"fmt"
 	"math"
 	"slices"
 	"sort"
 	"strings"
@@ -12,7 +13,6 @@ import (
 	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/pkg/errors"
 	"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
-	coreTime "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/time"
 	"github.com/prysmaticlabs/prysm/v5/beacon-chain/db"
 	"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
 	"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
@@ -337,19 +337,33 @@ func (f *blocksFetcher) handleRequest(ctx context.Context, start primitives.Slot
 		return response
 	}
 
-	if coreTime.PeerDASIsActive(start) {
-		response.err = f.fetchDataColumnsFromPeers(ctx, response.bwb, nil, delay, batchSize)
+	// Compute the first electra slot.
+	firstElectraSlot, err := slots.EpochStart(params.BeaconConfig().ElectraForkEpoch)
+	if err != nil {
+		firstElectraSlot = math.MaxUint64
+	}
+
+	// Find the first block with a slot greater than or equal to the first electra slot.
+	// (Blocks are sorted by slot)
+	firstElectraIndex := sort.Search(len(response.bwb), func(i int) bool {
+		return response.bwb[i].Block.Block().Slot() >= firstElectraSlot
+	})
+
+	preElectraBwbs := response.bwb[:firstElectraIndex]
+	postElectraBwbs := response.bwb[firstElectraIndex:]
+
+	// Fetch blobs.
+	if err := f.fetchBlobsFromPeer(ctx, preElectraBwbs, response.pid, peers); err != nil {
+		response.err = err
 		return response
 	}
 
-	if err := f.fetchBlobsFromPeer(ctx, response.bwb, response.pid, peers); err != nil {
-		response.err = err
-	}
+	// Fetch data columns.
+	response.err = f.fetchDataColumnsFromPeers(ctx, postElectraBwbs, nil, delay, batchSize)
 	return response
 }
 
-// fetchBlocksFromPeer fetches blocks from a single randomly selected peer.
+// fetchBlocksFromPeer fetches blocks from a single randomly selected peer, sorted by slot.
 func (f *blocksFetcher) fetchBlocksFromPeer(
 	ctx context.Context,
 	start primitives.Slot, count uint64,
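The split above relies on the batch being sorted by slot, so sort.Search finds the first post-Electra block with a binary search and a single slicing then yields the two halves: blobs are fetched for the pre-Electra part, data columns for the rest. A self-contained sketch of the same pattern, using a hypothetical block type rather than Prysm's BlockWithROBlobs:

```go
package main

import (
	"fmt"
	"sort"
)

type block struct{ slot uint64 }

// splitAtSlot partitions a slot-sorted slice into blocks before the boundary
// slot and blocks at or after it, mirroring the preElectra/postElectra split.
func splitAtSlot(blocks []block, boundary uint64) (pre, post []block) {
	i := sort.Search(len(blocks), func(i int) bool {
		return blocks[i].slot >= boundary
	})
	return blocks[:i], blocks[i:]
}

func main() {
	batch := []block{{25}, {26}, {27}, {32}, {33}}
	// Slot 32 would be the first Electra slot if the fork epoch were 1 (32 slots per epoch assumed).
	pre, post := splitAtSlot(batch, 32)
	fmt.Println(len(pre), len(post)) // 3 2
}
```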
@@ -369,20 +383,19 @@ func (f *blocksFetcher) fetchBlocksFromPeer(
 	// peers are dialed first.
 	peers = append(bestPeers, peers...)
 	peers = dedupPeers(peers)
-	for i := 0; i < len(peers); i++ {
-		p := peers[i]
-		blocks, err := f.requestBlocks(ctx, req, p)
+	for _, peer := range peers {
+		blocks, err := f.requestBlocks(ctx, req, peer)
 		if err != nil {
-			log.WithField("peer", p).WithError(err).Debug("Could not request blocks by range from peer")
+			log.WithField("peer", peer).WithError(err).Debug("Could not request blocks by range from peer")
 			continue
 		}
-		f.p2p.Peers().Scorers().BlockProviderScorer().Touch(p)
+		f.p2p.Peers().Scorers().BlockProviderScorer().Touch(peer)
 		robs, err := sortedBlockWithVerifiedBlobSlice(blocks)
 		if err != nil {
-			log.WithField("peer", p).WithError(err).Debug("invalid BeaconBlocksByRange response")
+			log.WithField("peer", peer).WithError(err).Debug("invalid BeaconBlocksByRange response")
 			continue
 		}
-		return robs, p, err
+		return robs, peer, err
 	}
 	return nil, "", errNoPeersAvailable
 }
@@ -565,6 +578,10 @@ func missingCommitError(root [32]byte, slot primitives.Slot, missing [][]byte) e
 // fetchBlobsFromPeer fetches blocks from a single randomly selected peer.
 // This function mutates the input `bwb` argument.
 func (f *blocksFetcher) fetchBlobsFromPeer(ctx context.Context, bwb []blocks.BlockWithROBlobs, pid peer.ID, peers []peer.ID) error {
+	if len(bwb) == 0 {
+		return nil
+	}
+
 	ctx, span := trace.StartSpan(ctx, "initialsync.fetchBlobsFromPeer")
 	defer span.End()
 	if slots.ToEpoch(f.clock.CurrentSlot()) < params.BeaconConfig().DenebForkEpoch {
@@ -936,7 +953,7 @@ type bwbsMissingColumns struct {
 // fetchDataColumnsFromPeers looks at the blocks in `bwb` and retrieves all
 // data columns for with the block has blob commitments, and for which our store is missing data columns
 // we should custody.
-// This function mutates `bwb` by adding the retrieved data columns.
+// This function mutates `bwbs` by adding the retrieved data columns.
 // Prerequisite: bwb is sorted by slot.
 func (f *blocksFetcher) fetchDataColumnsFromPeers(
 	ctx context.Context,
@@ -951,6 +968,10 @@ func (f *blocksFetcher) fetchDataColumnsFromPeers(
 		maxAllowedStall = 5 // Number of trials before giving up.
 	)
 
+	if len(bwbs) == 0 {
+		return nil
+	}
+
 	// Generate random identifier.
 	identifier := f.rand.Intn(maxIdentifier)
 
@@ -1713,7 +1713,7 @@ func TestFetchDataColumnsFromPeers(t *testing.T) {
 
 		// Fork epochs.
 		denebForkEpoch   primitives.Epoch
-		eip7954ForkEpoch primitives.Epoch
+		electraForkEpoch primitives.Epoch
 
 		// Current slot.
 		currentSlot uint64
@@ -1753,29 +1753,29 @@ func TestFetchDataColumnsFromPeers(t *testing.T) {
 			isError:            false,
 		},
 		{
-			name:             "All blocks are before EIP-7954 fork epoch",
+			name:             "All blocks are before Electra fork epoch",
 			denebForkEpoch:   0,
-			eip7954ForkEpoch: 1,
+			electraForkEpoch: 1,
 			currentSlot:      40,
 			blocksParams: []blockParams{
-				{slot: 25, hasBlobs: false}, // Before EIP-7954 fork epoch
-				{slot: 26, hasBlobs: false}, // Before EIP-7954 fork epoch
-				{slot: 27, hasBlobs: false}, // Before EIP-7954 fork epoch
-				{slot: 28, hasBlobs: false}, // Before EIP-7954 fork epoch
+				{slot: 25, hasBlobs: false}, // Before Electra fork epoch
+				{slot: 26, hasBlobs: false}, // Before Electra fork epoch
+				{slot: 27, hasBlobs: false}, // Before Electra fork epoch
+				{slot: 28, hasBlobs: false}, // Before Electra fork epoch
 			},
 			batchSize:          32,
 			addedRODataColumns: [][]int{nil, nil, nil, nil},
 			isError:            false,
 		},
 		{
-			name:             "All blocks with commitments before are EIP-7954 fork epoch",
+			name:             "All blocks with commitments before are Electra fork epoch",
 			denebForkEpoch:   0,
-			eip7954ForkEpoch: 1,
+			electraForkEpoch: 1,
 			currentSlot:      40,
 			blocksParams: []blockParams{
-				{slot: 25, hasBlobs: false}, // Before EIP-7954 fork epoch
-				{slot: 26, hasBlobs: true},  // Before EIP-7954 fork epoch
-				{slot: 27, hasBlobs: true},  // Before EIP-7954 fork epoch
+				{slot: 25, hasBlobs: false}, // Before Electra fork epoch
+				{slot: 26, hasBlobs: true},  // Before Electra fork epoch
+				{slot: 27, hasBlobs: true},  // Before Electra fork epoch
 				{slot: 32, hasBlobs: false},
 				{slot: 33, hasBlobs: false},
 			},
@@ -1785,12 +1785,12 @@ func TestFetchDataColumnsFromPeers(t *testing.T) {
 		{
 			name:             "Some blocks with blobs but without any missing data columns",
 			denebForkEpoch:   0,
-			eip7954ForkEpoch: 1,
+			electraForkEpoch: 1,
 			currentSlot:      40,
 			blocksParams: []blockParams{
-				{slot: 25, hasBlobs: false}, // Before EIP-7954 fork epoch
-				{slot: 26, hasBlobs: true},  // Before EIP-7954 fork epoch
-				{slot: 27, hasBlobs: true},  // Before EIP-7954 fork epoch
+				{slot: 25, hasBlobs: false}, // Before Electra fork epoch
+				{slot: 26, hasBlobs: true},  // Before Electra fork epoch
+				{slot: 27, hasBlobs: true},  // Before Electra fork epoch
 				{slot: 32, hasBlobs: false},
 				{slot: 33, hasBlobs: true},
 			},
@@ -1808,11 +1808,11 @@ func TestFetchDataColumnsFromPeers(t *testing.T) {
 		{
 			name:             "Some blocks with blobs with missing data columns - one round needed",
 			denebForkEpoch:   0,
-			eip7954ForkEpoch: 1,
+			electraForkEpoch: 1,
 			currentSlot:      40,
 			blocksParams: []blockParams{
-				{slot: 25, hasBlobs: false}, // Before EIP-7954 fork epoch
-				{slot: 27, hasBlobs: true},  // Before EIP-7954 fork epoch
+				{slot: 25, hasBlobs: false}, // Before Electra fork epoch
+				{slot: 27, hasBlobs: true},  // Before Electra fork epoch
 				{slot: 32, hasBlobs: false},
 				{slot: 33, hasBlobs: true},
 				{slot: 34, hasBlobs: true},
@@ -1916,7 +1916,7 @@ func TestFetchDataColumnsFromPeers(t *testing.T) {
 		{
 			name:             "Some blocks with blobs with missing data columns - partial responses",
 			denebForkEpoch:   0,
-			eip7954ForkEpoch: 1,
+			electraForkEpoch: 1,
 			currentSlot:      40,
 			blocksParams: []blockParams{
 				{slot: 33, hasBlobs: true},
@@ -1970,7 +1970,7 @@ func TestFetchDataColumnsFromPeers(t *testing.T) {
 		{
 			name:             "Some blocks with blobs with missing data columns - first response is invalid",
 			denebForkEpoch:   0,
-			eip7954ForkEpoch: 1,
+			electraForkEpoch: 1,
 			currentSlot:      40,
 			blocksParams: []blockParams{
 				{slot: 38, hasBlobs: true},
@@ -2004,7 +2004,7 @@ func TestFetchDataColumnsFromPeers(t *testing.T) {
 		{
 			name:              "Some blocks with blobs with missing data columns - first response is empty",
 			denebForkEpoch:    0,
-			eip7954ForkEpoch:  1,
+			electraForkEpoch:  1,
 			currentSlot:       40,
 			blocksParams:      []blockParams{{slot: 38, hasBlobs: true}},
 			storedDataColumns: []map[int]bool{{38: true, 102: true}},
@@ -2033,7 +2033,7 @@ func TestFetchDataColumnsFromPeers(t *testing.T) {
 		{
 			name:              "Some blocks with blobs with missing data columns - no response at all",
 			denebForkEpoch:    0,
-			eip7954ForkEpoch:  1,
+			electraForkEpoch:  1,
 			currentSlot:       40,
 			blocksParams:      []blockParams{{slot: 38, hasBlobs: true}},
 			storedDataColumns: []map[int]bool{{38: true, 102: true}},
@@ -2056,7 +2056,7 @@ func TestFetchDataColumnsFromPeers(t *testing.T) {
 		{
 			name:             "Some blocks with blobs with missing data columns - request has to be split",
 			denebForkEpoch:   0,
-			eip7954ForkEpoch: 1,
+			electraForkEpoch: 1,
 			currentSlot:      40,
 			blocksParams: []blockParams{
 				{slot: 32, hasBlobs: true}, {slot: 33, hasBlobs: true}, {slot: 34, hasBlobs: true}, {slot: 35, hasBlobs: true}, // 4
@@ -2176,8 +2176,8 @@ func TestFetchDataColumnsFromPeers(t *testing.T) {
 			// Set the Deneb fork epoch.
 			params.BeaconConfig().DenebForkEpoch = tc.denebForkEpoch
 
-			// Set the EIP-7594 fork epoch.
-			params.BeaconConfig().Eip7594ForkEpoch = tc.eip7954ForkEpoch
+			// Set the Electra fork epoch.
+			params.BeaconConfig().ElectraForkEpoch = tc.electraForkEpoch
 
 			// Save the blocks in the store.
 			storage := make(map[[fieldparams.RootLength]byte][]int)
@@ -4,17 +4,19 @@ import (
 	"context"
 	"encoding/hex"
 	"fmt"
+	"math"
+	"sort"
 	"time"
 
 	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/paulbellamy/ratecounter"
 	"github.com/pkg/errors"
 	coreTime "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/time"
 	"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/transition"
 	"github.com/prysmaticlabs/prysm/v5/beacon-chain/das"
 	"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
 	"github.com/prysmaticlabs/prysm/v5/beacon-chain/sync"
 	"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
 	"github.com/prysmaticlabs/prysm/v5/config/params"
 	"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
 	"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
 	"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
@@ -172,59 +174,83 @@ func (s *Service) processFetchedDataRegSync(
 		log.WithError(err).Debug("batch did not contain a valid sequence of unprocessed blocks")
 		return
 	}
 
 	if len(bwb) == 0 {
 		return
 	}
-	if coreTime.PeerDASIsActive(startSlot) {
-		avs := das.NewLazilyPersistentStoreColumn(s.cfg.BlobStorage)
-		batchFields := logrus.Fields{
-			"firstSlot":        data.bwb[0].Block.Block().Slot(),
-			"firstUnprocessed": bwb[0].Block.Block().Slot(),
+
+	// Compute the first electra slot.
+	firstElectraSlot, err := slots.EpochStart(params.BeaconConfig().ElectraForkEpoch)
+	if err != nil {
+		firstElectraSlot = math.MaxUint64
+	}
+
+	// Find the first block with a slot greater than or equal to the first electra slot.
+	// (Blocks are sorted by slot)
+	firstElectraIndex := sort.Search(len(bwb), func(i int) bool {
+		return bwb[i].Block.Block().Slot() >= firstElectraSlot
+	})
+
+	preElectraBwbs := bwb[:firstElectraIndex]
+	postElectraBwbs := bwb[firstElectraIndex:]
+
+	blobBatchVerifier := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.InitsyncBlobSidecarRequirements)
+	lazilyPersistentStore := das.NewLazilyPersistentStore(s.cfg.BlobStorage, blobBatchVerifier)
+
+	log := log.WithField("firstSlot", data.bwb[0].Block.Block().Slot())
+
+	logPre := log
+	if len(preElectraBwbs) > 0 {
+		logPre = logPre.WithField("firstUnprocessed", preElectraBwbs[0].Block.Block().Slot())
+	}
+
+	for _, b := range preElectraBwbs {
+		log := logPre.WithFields(syncFields(b.Block))
+
+		if err := lazilyPersistentStore.Persist(s.clock.CurrentSlot(), b.Blobs...); err != nil {
+			log.WithError(err).Warning("Batch failure due to BlobSidecar issues")
+			return
 		}
-		for _, b := range bwb {
-			if err := avs.PersistColumns(s.clock.CurrentSlot(), b.Columns...); err != nil {
-				log.WithError(err).WithFields(batchFields).WithFields(syncFields(b.Block)).Warn("Batch failure due to DataColumnSidecar issues")
+
+		if err := s.processBlock(ctx, genesis, b, s.cfg.Chain.ReceiveBlock, lazilyPersistentStore); err != nil {
+			switch {
+			case errors.Is(err, errParentDoesNotExist):
+				log.
+					WithField("missingParent", fmt.Sprintf("%#x", b.Block.Block().ParentRoot())).
+					Debug("Could not process batch blocks due to missing parent")
+				return
+			default:
+				log.WithError(err).Warning("Block processing failure")
 				return
 			}
-			if err := s.processBlock(ctx, genesis, b, s.cfg.Chain.ReceiveBlock, avs); err != nil {
-				switch {
-				case errors.Is(err, errParentDoesNotExist):
-					log.WithFields(batchFields).WithField("missingParent", fmt.Sprintf("%#x", b.Block.Block().ParentRoot())).
-						WithFields(syncFields(b.Block)).Debug("Could not process batch blocks due to missing parent")
-					return
-				default:
-					log.WithError(err).WithFields(batchFields).WithFields(syncFields(b.Block)).Warn("Block processing failure")
-					return
-				}
-			}
 		}
-	} else {
-		bv := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.InitsyncBlobSidecarRequirements)
-		avs := das.NewLazilyPersistentStore(s.cfg.BlobStorage, bv)
 	}
 
-	batchFields := logrus.Fields{
-		"firstSlot":        data.bwb[0].Block.Block().Slot(),
-		"firstUnprocessed": bwb[0].Block.Block().Slot(),
+	logPost := log
+	if len(postElectraBwbs) > 0 {
+		logPost = log.WithField("firstUnprocessed", postElectraBwbs[0].Block.Block().Slot())
+	}
+
+	lazilyPersistentStoreColumn := das.NewLazilyPersistentStoreColumn(s.cfg.BlobStorage)
+
+	for _, b := range postElectraBwbs {
+		log := logPost.WithFields(syncFields(b.Block))
+
+		if err := lazilyPersistentStoreColumn.PersistColumns(s.clock.CurrentSlot(), b.Columns...); err != nil {
+			log.WithError(err).Warning("Batch failure due to DataColumnSidecar issues")
+			return
 		}
-	for _, b := range bwb {
-		if err := avs.Persist(s.clock.CurrentSlot(), b.Blobs...); err != nil {
-			log.WithError(err).WithFields(batchFields).WithFields(syncFields(b.Block)).Warn("Batch failure due to BlobSidecar issues")
+
+		if err := s.processBlock(ctx, genesis, b, s.cfg.Chain.ReceiveBlock, lazilyPersistentStoreColumn); err != nil {
+			switch {
+			case errors.Is(err, errParentDoesNotExist):
+				log.
+					WithField("missingParent", fmt.Sprintf("%#x", b.Block.Block().ParentRoot())).
+					Debug("Could not process batch blocks due to missing parent")
+				return
+			default:
+				log.WithError(err).Warning("Block processing failure")
 				return
 			}
-		if err := s.processBlock(ctx, genesis, b, s.cfg.Chain.ReceiveBlock, avs); err != nil {
-			switch {
-			case errors.Is(err, errParentDoesNotExist):
-				log.WithFields(batchFields).WithField("missingParent", fmt.Sprintf("%#x", b.Block.Block().ParentRoot())).
-					WithFields(syncFields(b.Block)).Debug("Could not process batch blocks due to missing parent")
-				return
-			default:
-				log.WithError(err).WithFields(batchFields).WithFields(syncFields(b.Block)).Warn("Block processing failure")
-				return
-			}
-		}
 		}
 	}
 }
@@ -349,54 +375,83 @@ func validUnprocessed(ctx context.Context, bwb []blocks.BlockWithROBlobs, headSl
 	return bwb[nonProcessedIdx:], nil
 }
 
-func (s *Service) processBatchedBlocks(ctx context.Context, genesis time.Time,
-	bwb []blocks.BlockWithROBlobs, bFunc batchBlockReceiverFn) error {
-	if len(bwb) == 0 {
+func (s *Service) processBatchedBlocks(
+	ctx context.Context,
+	genesis time.Time,
+	bwbs []blocks.BlockWithROBlobs,
+	bFunc batchBlockReceiverFn,
+) error {
+	if len(bwbs) == 0 {
 		return errors.New("0 blocks provided into method")
 	}
 
 	headSlot := s.cfg.Chain.HeadSlot()
-	var err error
-	bwb, err = validUnprocessed(ctx, bwb, headSlot, s.isProcessedBlock)
+
+	bwbs, err := validUnprocessed(ctx, bwbs, headSlot, s.isProcessedBlock)
 	if err != nil {
-		return err
+		return errors.Wrap(err, "validating unprocessed blocks")
 	}
-	if len(bwb) == 0 {
+
+	if len(bwbs) == 0 {
 		return nil
 	}
 
-	first := bwb[0].Block
+	first := bwbs[0].Block
 	if !s.cfg.Chain.HasBlock(ctx, first.Block().ParentRoot()) {
 		return fmt.Errorf("%w: %#x (in processBatchedBlocks, slot=%d)",
 			errParentDoesNotExist, first.Block().ParentRoot(), first.Block().Slot())
 	}
-	var aStore das.AvailabilityStore
-	if coreTime.PeerDASIsActive(first.Block().Slot()) {
-		avs := das.NewLazilyPersistentStoreColumn(s.cfg.BlobStorage)
-		s.logBatchSyncStatus(genesis, first, len(bwb))
-		for _, bb := range bwb {
-			if len(bb.Columns) == 0 {
-				continue
-			}
-			if err := avs.PersistColumns(s.clock.CurrentSlot(), bb.Columns...); err != nil {
-				return err
-			}
-		}
-		aStore = avs
-	} else {
-		bv := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.InitsyncBlobSidecarRequirements)
-		avs := das.NewLazilyPersistentStore(s.cfg.BlobStorage, bv)
-		s.logBatchSyncStatus(genesis, first, len(bwb))
-		for _, bb := range bwb {
-			if len(bb.Blobs) == 0 {
-				continue
-			}
-			if err := avs.Persist(s.clock.CurrentSlot(), bb.Blobs...); err != nil {
-				return err
-			}
-		}
-		aStore = avs
 
+	// Compute the first electra slot.
+	firstElectraSlot, err := slots.EpochStart(params.BeaconConfig().ElectraForkEpoch)
+	if err != nil {
+		firstElectraSlot = math.MaxUint64
 	}
-	return bFunc(ctx, blocks.BlockWithROBlobsSlice(bwb).ROBlocks(), aStore)
 
+	// Find the first block with a slot greater than or equal to the first electra slot.
+	// (Blocks are sorted by slot)
+	firstElectraIndex := sort.Search(len(bwbs), func(i int) bool {
+		return bwbs[i].Block.Block().Slot() >= firstElectraSlot
+	})
+
+	preElectraBwbs := bwbs[:firstElectraIndex]
+	postElectraBwbs := bwbs[firstElectraIndex:]
+
+	batchVerifier := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.InitsyncBlobSidecarRequirements)
+	persistentStore := das.NewLazilyPersistentStore(s.cfg.BlobStorage, batchVerifier)
+	s.logBatchSyncStatus(genesis, first, len(preElectraBwbs))
+
+	for _, bwb := range preElectraBwbs {
+		if len(bwb.Blobs) == 0 {
+			continue
+		}
+
+		if err := persistentStore.Persist(s.clock.CurrentSlot(), bwb.Blobs...); err != nil {
+			return err
+		}
+	}
+
+	if err := bFunc(ctx, blocks.BlockWithROBlobsSlice(preElectraBwbs).ROBlocks(), persistentStore); err != nil {
+		return errors.Wrap(err, "process pre-electra blocks")
+	}
+
+	persistentStoreColumn := das.NewLazilyPersistentStoreColumn(s.cfg.BlobStorage)
+	s.logBatchSyncStatus(genesis, first, len(postElectraBwbs))
+	for _, bwb := range postElectraBwbs {
+		if len(bwb.Columns) == 0 {
+			continue
+		}
+
+		if err := persistentStoreColumn.PersistColumns(s.clock.CurrentSlot(), bwb.Columns...); err != nil {
+			return err
+		}
+	}
+
+	if err := bFunc(ctx, blocks.BlockWithROBlobsSlice(postElectraBwbs).ROBlocks(), persistentStoreColumn); err != nil {
+		return errors.Wrap(err, "process post-electra blocks")
+	}
+
+	return nil
 }
 
 // updatePeerScorerStats adjusts monitored metrics for a peer.
@@ -424,11 +479,13 @@ func (s *Service) isProcessedBlock(ctx context.Context, blk blocks.ROBlock) bool
 	// If block is before our finalized checkpoint
 	// we do not process it.
 	if blk.Block().Slot() <= finalizedSlot {
+		log.WithFields(logrus.Fields{"blockSlot": blk.Block().Slot(), "finalizedSlot": finalizedSlot}).Debug("AAAAAAAAAAAA")
 		return true
 	}
 	// If block exists in our db and is before or equal to our current head
 	// we ignore it.
 	if s.cfg.Chain.HeadSlot() >= blk.Block().Slot() && s.cfg.Chain.HasBlock(ctx, blk.Root()) {
+		log.WithFields(logrus.Fields{"blockSlot": blk.Block().Slot(), "headSlot": s.cfg.Chain.HeadSlot(), "hasBlock": s.cfg.Chain.HasBlock(ctx, blk.Root()), "blockRoot": blk.Root()}).Debug("BBBBBBBBBB")
 		return true
 	}
 	return false
@@ -9,143 +9,165 @@ import (
 
 	libp2pcore "github.com/libp2p/go-libp2p/core"
 	"github.com/libp2p/go-libp2p/core/network"
 	"github.com/libp2p/go-libp2p/core/protocol"
 	"github.com/pkg/errors"
 	ssz "github.com/prysmaticlabs/fastssz"
 	coreTime "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/time"
 	"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
 	p2ptypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
 	"github.com/prysmaticlabs/prysm/v5/config/params"
 	"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
 	"github.com/prysmaticlabs/prysm/v5/monitoring/tracing"
 	"github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace"
 	"github.com/prysmaticlabs/prysm/v5/runtime/version"
 	"github.com/prysmaticlabs/prysm/v5/time/slots"
 )
 
-// Time to first byte timeout. The maximum time to wait for first byte of
-// request response (time-to-first-byte). The client is expected to give up if
-// they don't receive the first byte within 5 seconds.
-var ttfbTimeout = params.BeaconConfig().TtfbTimeoutDuration()
+var (
+	// Time to first byte timeout. The maximum time to wait for first byte of
+	// request response (time-to-first-byte). The client is expected to give up if
+	// they don't receive the first byte within 5 seconds.
+	ttfbTimeout = params.BeaconConfig().TtfbTimeoutDuration()
 
-// respTimeout is the maximum time for complete response transfer.
-var respTimeout = params.BeaconConfig().RespTimeoutDuration()
+	// respTimeout is the maximum time for complete response transfer.
+	respTimeout = params.BeaconConfig().RespTimeoutDuration()
+)
 
 // rpcHandler is responsible for handling and responding to any incoming message.
 // This method may return an error to internal monitoring, but the error will
 // not be relayed to the peer.
 type rpcHandler func(context.Context, interface{}, libp2pcore.Stream) error
 
-// registerRPCHandlers for p2p RPC.
-func (s *Service) registerRPCHandlers() {
-	currEpoch := slots.ToEpoch(s.cfg.clock.CurrentSlot())
-	// Register V2 handlers if we are past altair fork epoch.
-	if currEpoch >= params.BeaconConfig().AltairForkEpoch {
-		s.registerRPC(
-			p2p.RPCStatusTopicV1,
-			s.statusRPCHandler,
-		)
-		s.registerRPC(
-			p2p.RPCGoodByeTopicV1,
-			s.goodbyeRPCHandler,
-		)
-		s.registerRPC(
-			p2p.RPCPingTopicV1,
-			s.pingHandler,
-		)
-		s.registerRPCHandlersAltair()
+// rpcHandlerByTopicFromFork returns the RPC handlers for a given fork index.
+func (s *Service) rpcHandlerByTopicFromFork(forkIndex int) (map[string]rpcHandler, error) {
+	switch forkIndex {
+	// Phase0: https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#messages
+	case version.Phase0:
+		return map[string]rpcHandler{
+			p2p.RPCStatusTopicV1:        s.statusRPCHandler,
+			p2p.RPCGoodByeTopicV1:       s.goodbyeRPCHandler,
+			p2p.RPCBlocksByRangeTopicV1: s.beaconBlocksByRangeRPCHandler,
+			p2p.RPCBlocksByRootTopicV1:  s.beaconBlocksRootRPCHandler,
+			p2p.RPCPingTopicV1:          s.pingHandler,
+			p2p.RPCMetaDataTopicV1:      s.metaDataHandler,
+		}, nil
 
-		if coreTime.PeerDASIsActive(slots.UnsafeEpochStart(currEpoch)) {
-			s.registerRPCHandlersPeerDAS()
-		} else if currEpoch >= params.BeaconConfig().DenebForkEpoch {
-			s.registerRPCHandlersDeneb()
-		}
-		return
+	// Altair: https://github.com/ethereum/consensus-specs/tree/dev/specs/altair#messages
+	// Bellatrix: https://github.com/ethereum/consensus-specs/tree/dev/specs/bellatrix#messages
+	// Capella: https://github.com/ethereum/consensus-specs/tree/dev/specs/capella#messages
+	case version.Altair, version.Bellatrix, version.Capella:
+		return map[string]rpcHandler{
+			p2p.RPCStatusTopicV1:        s.statusRPCHandler,
+			p2p.RPCGoodByeTopicV1:       s.goodbyeRPCHandler,
+			p2p.RPCBlocksByRangeTopicV2: s.beaconBlocksByRangeRPCHandler, // Modified in Altair
+			p2p.RPCBlocksByRootTopicV2:  s.beaconBlocksRootRPCHandler,    // Modified in Altair
+			p2p.RPCPingTopicV1:          s.pingHandler,
+			p2p.RPCMetaDataTopicV2:      s.metaDataHandler, // Modified in Altair
+		}, nil
 
+	// Deneb: https://github.com/ethereum/consensus-specs/blob/dev/specs/deneb/p2p-interface.md#messages
+	case version.Deneb:
+		return map[string]rpcHandler{
+			p2p.RPCStatusTopicV1:              s.statusRPCHandler,
+			p2p.RPCGoodByeTopicV1:             s.goodbyeRPCHandler,
+			p2p.RPCBlocksByRangeTopicV2:       s.beaconBlocksByRangeRPCHandler,
+			p2p.RPCBlocksByRootTopicV2:        s.beaconBlocksRootRPCHandler,
+			p2p.RPCPingTopicV1:                s.pingHandler,
+			p2p.RPCMetaDataTopicV2:            s.metaDataHandler,
+			p2p.RPCBlobSidecarsByRootTopicV1:  s.blobSidecarByRootRPCHandler,   // Added in Deneb
+			p2p.RPCBlobSidecarsByRangeTopicV1: s.blobSidecarsByRangeRPCHandler, // Added in Deneb
+		}, nil
 
+	// Electra: https://github.com/ethereum/consensus-specs/blob/dev/specs/_features/eip7594/p2p-interface.md#messages
+	case version.Electra:
+		return map[string]rpcHandler{
+			p2p.RPCStatusTopicV1:                    s.statusRPCHandler,
+			p2p.RPCGoodByeTopicV1:                   s.goodbyeRPCHandler,
+			p2p.RPCBlocksByRangeTopicV2:             s.beaconBlocksByRangeRPCHandler,
+			p2p.RPCBlocksByRootTopicV2:              s.beaconBlocksRootRPCHandler,
+			p2p.RPCPingTopicV1:                      s.pingHandler,
+			p2p.RPCMetaDataTopicV3:                  s.metaDataHandler, // Modified in Electra
+			p2p.RPCBlobSidecarsByRootTopicV1:        s.blobSidecarByRootRPCHandler,
+			p2p.RPCBlobSidecarsByRangeTopicV1:       s.blobSidecarsByRangeRPCHandler,
+			p2p.RPCDataColumnSidecarsByRootTopicV1:  s.dataColumnSidecarByRootRPCHandler,   // Added in Electra
+			p2p.RPCDataColumnSidecarsByRangeTopicV1: s.dataColumnSidecarsByRangeRPCHandler, // Added in Electra
+		}, nil
 
+	default:
+		return nil, errors.Errorf("RPC handler not found for fork index %d", forkIndex)
 	}
-	s.registerRPC(
-		p2p.RPCStatusTopicV1,
-		s.statusRPCHandler,
-	)
-	s.registerRPC(
-		p2p.RPCGoodByeTopicV1,
-		s.goodbyeRPCHandler,
-	)
-	s.registerRPC(
-		p2p.RPCBlocksByRangeTopicV1,
-		s.beaconBlocksByRangeRPCHandler,
-	)
-	s.registerRPC(
-		p2p.RPCBlocksByRootTopicV1,
-		s.beaconBlocksRootRPCHandler,
-	)
-	s.registerRPC(
-		p2p.RPCPingTopicV1,
-		s.pingHandler,
-	)
-	s.registerRPC(
-		p2p.RPCMetaDataTopicV1,
-		s.metaDataHandler,
-	)
 }
 
-// registerRPCHandlers for altair.
-func (s *Service) registerRPCHandlersAltair() {
-	s.registerRPC(
-		p2p.RPCBlocksByRangeTopicV2,
-		s.beaconBlocksByRangeRPCHandler,
-	)
-	s.registerRPC(
-		p2p.RPCBlocksByRootTopicV2,
-		s.beaconBlocksRootRPCHandler,
-	)
-	s.registerRPC(
-		p2p.RPCMetaDataTopicV2,
-		s.metaDataHandler,
-	)
+// rpcHandlerByTopicFromEpoch returns the RPC handlers for a given epoch.
+func (s *Service) rpcHandlerByTopicFromEpoch(epoch primitives.Epoch) (map[string]rpcHandler, error) {
+	// Get the beacon config.
+	beaconConfig := params.BeaconConfig()
+
+	if epoch >= beaconConfig.ElectraForkEpoch {
+		return s.rpcHandlerByTopicFromFork(version.Electra)
+	}
+
+	if epoch >= beaconConfig.DenebForkEpoch {
+		return s.rpcHandlerByTopicFromFork(version.Deneb)
+	}
+
+	if epoch >= beaconConfig.CapellaForkEpoch {
+		return s.rpcHandlerByTopicFromFork(version.Capella)
+	}
+
+	if epoch >= beaconConfig.BellatrixForkEpoch {
+		return s.rpcHandlerByTopicFromFork(version.Bellatrix)
+	}
+
+	if epoch >= beaconConfig.AltairForkEpoch {
+		return s.rpcHandlerByTopicFromFork(version.Altair)
+	}
+
+	return s.rpcHandlerByTopicFromFork(version.Phase0)
 }
 
-func (s *Service) registerRPCHandlersDeneb() {
-	s.registerRPC(
-		p2p.RPCBlobSidecarsByRangeTopicV1,
-		s.blobSidecarsByRangeRPCHandler,
-	)
-	s.registerRPC(
-		p2p.RPCBlobSidecarsByRootTopicV1,
-		s.blobSidecarByRootRPCHandler,
-	)
+// addedRPCHandlerByTopic returns the RPC handlers that are present in the next map but not in the previous one.
+func addedRPCHandlerByTopic(previous, next map[string]rpcHandler) map[string]rpcHandler {
+	added := make(map[string]rpcHandler)
+
+	for topic, handler := range next {
+		if _, ok := previous[topic]; !ok {
+			added[topic] = handler
+		}
+	}
+
+	return added
 }
 
-func (s *Service) registerRPCHandlersPeerDAS() {
-	s.registerRPC(
-		p2p.RPCDataColumnSidecarsByRootTopicV1,
-		s.dataColumnSidecarByRootRPCHandler,
-	)
-	s.registerRPC(
-		p2p.RPCDataColumnSidecarsByRangeTopicV1,
-		s.dataColumnSidecarsByRangeRPCHandler,
-	)
-	s.registerRPC(
-		p2p.RPCMetaDataTopicV3,
-		s.metaDataHandler,
-	)
+// removedRPCTopics returns the topics that are present in the previous map but not in the next one.
+func removedRPCTopics(previous, next map[string]rpcHandler) map[string]bool {
+	removed := make(map[string]bool)
+
+	for topic := range previous {
+		if _, ok := next[topic]; !ok {
+			removed[topic] = true
+		}
+	}
+
+	return removed
 }
 
-// Remove all v1 Stream handlers that are no longer supported
-// from altair onwards.
-func (s *Service) unregisterPhase0Handlers() {
-	fullBlockRangeTopic := p2p.RPCBlocksByRangeTopicV1 + s.cfg.p2p.Encoding().ProtocolSuffix()
-	fullBlockRootTopic := p2p.RPCBlocksByRootTopicV1 + s.cfg.p2p.Encoding().ProtocolSuffix()
-	fullMetadataTopic := p2p.RPCMetaDataTopicV1 + s.cfg.p2p.Encoding().ProtocolSuffix()
+// registerRPCHandlers for p2p RPC.
+func (s *Service) registerRPCHandlers() error {
+	// Get the current epoch.
+	currentSlot := s.cfg.clock.CurrentSlot()
+	currentEpoch := slots.ToEpoch(currentSlot)
 
-	s.cfg.p2p.Host().RemoveStreamHandler(protocol.ID(fullBlockRangeTopic))
-	s.cfg.p2p.Host().RemoveStreamHandler(protocol.ID(fullBlockRootTopic))
-	s.cfg.p2p.Host().RemoveStreamHandler(protocol.ID(fullMetadataTopic))
-}
+	// Get the RPC handlers for the current epoch.
+	handlerByTopic, err := s.rpcHandlerByTopicFromEpoch(currentEpoch)
+	if err != nil {
+		return errors.Wrap(err, "rpc handler by topic from epoch")
+	}
 
-func (s *Service) unregisterBlobHandlers() {
-	fullBlobRangeTopic := p2p.RPCBlobSidecarsByRangeTopicV1 + s.cfg.p2p.Encoding().ProtocolSuffix()
-	fullBlobRootTopic := p2p.RPCBlobSidecarsByRootTopicV1 + s.cfg.p2p.Encoding().ProtocolSuffix()
+	// Register the RPC handlers for the current epoch.
+	for topic, handler := range handlerByTopic {
+		s.registerRPC(topic, handler)
+	}
 
-	s.cfg.p2p.Host().RemoveStreamHandler(protocol.ID(fullBlobRangeTopic))
-	s.cfg.p2p.Host().RemoveStreamHandler(protocol.ID(fullBlobRootTopic))
+	return nil
 }
 
 // registerRPC for a given topic with an expected protobuf message type.
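The two new helpers above are plain set differences over topic-to-handler maps: on an upcoming fork the watcher registers only the topics that appear in the next fork's map, and after a fork it removes stream handlers for topics that disappeared. A minimal sketch of that diffing, using a simplified handler type and made-up topic strings (not the Prysm rpcHandler signature or real topic constants):

```go
package main

import "fmt"

type handler func()

// addedTopics returns entries present in next but not in previous.
func addedTopics(previous, next map[string]handler) map[string]handler {
	added := make(map[string]handler)
	for topic, h := range next {
		if _, ok := previous[topic]; !ok {
			added[topic] = h
		}
	}
	return added
}

// removedTopics returns topics present in previous but not in next.
func removedTopics(previous, next map[string]handler) []string {
	var removed []string
	for topic := range previous {
		if _, ok := next[topic]; !ok {
			removed = append(removed, topic)
		}
	}
	return removed
}

func main() {
	noop := func() {}
	deneb := map[string]handler{"status/1": noop, "metadata/2": noop, "blob_sidecars_by_range/1": noop}
	electra := map[string]handler{"status/1": noop, "metadata/3": noop, "blob_sidecars_by_range/1": noop, "data_column_sidecars_by_range/1": noop}
	fmt.Println(len(addedTopics(deneb, electra))) // 2: metadata/3 and the data-column topic
	fmt.Println(removedTopics(deneb, electra))    // [metadata/2]
}
```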
@@ -209,7 +209,7 @@ func DataColumnsRPCMinValidSlot(current primitives.Slot) (primitives.Slot, error
 	}
 	minReqEpochs := params.BeaconConfig().MinEpochsForDataColumnSidecarsRequest
 	currEpoch := slots.ToEpoch(current)
-	minStart := params.BeaconConfig().Eip7594ForkEpoch
+	minStart := params.BeaconConfig().ElectraForkEpoch
 	if currEpoch > minReqEpochs && currEpoch-minReqEpochs > minStart {
 		minStart = currEpoch - minReqEpochs
 	}
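The clamp above keeps the serving window at max(ElectraForkEpoch, currentEpoch − MinEpochsForDataColumnSidecarsRequest), so a node never advertises data columns from before the fork nor from further back than the retention period. A small sketch of that arithmetic with made-up values (function and parameter names are illustrative):

```go
package main

import "fmt"

// minValidEpoch returns the earliest epoch for which data column sidecars
// must still be served: never earlier than the Electra fork epoch, and never
// more than minReqEpochs behind the current epoch.
func minValidEpoch(currentEpoch, electraForkEpoch, minReqEpochs uint64) uint64 {
	minStart := electraForkEpoch
	if currentEpoch > minReqEpochs && currentEpoch-minReqEpochs > minStart {
		minStart = currentEpoch - minReqEpochs
	}
	return minStart
}

func main() {
	// Shortly after the fork the window is pinned to the fork epoch...
	fmt.Println(minValidEpoch(12, 10, 4096)) // 10
	// ...and far from the fork it trails the current epoch by minReqEpochs.
	fmt.Println(minValidEpoch(10000, 10, 4096)) // 5904
}
```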
@@ -70,7 +70,7 @@ func (s *Service) metaDataHandler(_ context.Context, _ interface{}, stream libp2
 	switch streamVersion {
 	case p2p.SchemaVersionV1:
 		switch metadataVersion {
-		case version.Altair, version.Deneb:
+		case version.Altair, version.Electra:
 			metadata = wrapper.WrappedMetadataV0(
 				&pb.MetaDataV0{
 					Attnets: metadata.AttnetsBitfield(),
@@ -87,7 +87,7 @@ func (s *Service) metaDataHandler(_ context.Context, _ interface{}, stream libp2
 					SeqNumber: metadata.SequenceNumber(),
 					Syncnets:  bitfield.Bitvector4{byte(0x00)},
 				})
-		case version.Deneb:
+		case version.Electra:
 			metadata = wrapper.WrappedMetadataV1(
 				&pb.MetaDataV1{
 					Attnets: metadata.AttnetsBitfield(),
@@ -190,7 +190,7 @@ func (s *Service) sendMetaDataRequest(ctx context.Context, peerID peer.ID) (meta
 		topicVersion = p2p.SchemaVersionV1
 	case version.Altair:
 		topicVersion = p2p.SchemaVersionV2
-	case version.Deneb:
+	case version.Electra:
 		topicVersion = p2p.SchemaVersionV3
 	}
 
@@ -101,8 +101,7 @@ func TestMetadataRPCHandler_SendMetadataRequest(t *testing.T) {
 	params.SetupTestConfigCleanup(t)
 	beaconChainConfig := params.BeaconConfig().Copy()
 	beaconChainConfig.AltairForkEpoch = 5
 	beaconChainConfig.DenebForkEpoch = 10
-	beaconChainConfig.Eip7594ForkEpoch = 10
 	beaconChainConfig.ElectraForkEpoch = 15
 	params.OverrideBeaconConfig(beaconChainConfig)
 	params.BeaconConfig().InitializeForkSchedule()
@@ -145,10 +144,10 @@ func TestMetadataRPCHandler_SendMetadataRequest(t *testing.T) {
 			}),
 		},
 		{
-			name:                    "Phase0-PeerDAS",
+			name:                    "Phase0-Electra",
 			topic:                   p2p.RPCMetaDataTopicV1,
 			epochsSinceGenesisPeer1: 0,
-			epochsSinceGenesisPeer2: 10,
+			epochsSinceGenesisPeer2: 15,
 			metadataPeer2: wrapper.WrappedMetadataV2(&pb.MetaDataV2{
 				SeqNumber: seqNumber,
 				Attnets:   attnets,
@@ -192,10 +191,10 @@ func TestMetadataRPCHandler_SendMetadataRequest(t *testing.T) {
 			}),
 		},
 		{
-			name:                    "Altair-PeerDAS",
+			name:                    "Altair-Electra",
 			topic:                   p2p.RPCMetaDataTopicV2,
 			epochsSinceGenesisPeer1: 5,
-			epochsSinceGenesisPeer2: 10,
+			epochsSinceGenesisPeer2: 15,
 			metadataPeer2: wrapper.WrappedMetadataV2(&pb.MetaDataV2{
 				SeqNumber: seqNumber,
 				Attnets:   attnets,
@@ -209,9 +208,9 @@ func TestMetadataRPCHandler_SendMetadataRequest(t *testing.T) {
 			}),
 		},
 		{
-			name:                    "PeerDAS-Phase0",
+			name:                    "Electra-Phase0",
 			topic:                   p2p.RPCMetaDataTopicV3,
-			epochsSinceGenesisPeer1: 10,
+			epochsSinceGenesisPeer1: 15,
 			epochsSinceGenesisPeer2: 0,
 			metadataPeer2: wrapper.WrappedMetadataV0(&pb.MetaDataV0{
 				SeqNumber: seqNumber,
@@ -225,9 +224,9 @@ func TestMetadataRPCHandler_SendMetadataRequest(t *testing.T) {
 			}),
 		},
 		{
-			name:                    "PeerDAS-Altail",
+			name:                    "Electra-Altair",
 			topic:                   p2p.RPCMetaDataTopicV3,
-			epochsSinceGenesisPeer1: 10,
+			epochsSinceGenesisPeer1: 15,
 			epochsSinceGenesisPeer2: 5,
 			metadataPeer2: wrapper.WrappedMetadataV1(&pb.MetaDataV1{
 				SeqNumber: seqNumber,
@@ -242,10 +241,10 @@ func TestMetadataRPCHandler_SendMetadataRequest(t *testing.T) {
 			}),
 		},
 		{
-			name:                    "PeerDAS-PeerDAS",
+			name:                    "Electra-Electra",
 			topic:                   p2p.RPCMetaDataTopicV3,
-			epochsSinceGenesisPeer1: 10,
-			epochsSinceGenesisPeer2: 10,
+			epochsSinceGenesisPeer1: 15,
+			epochsSinceGenesisPeer2: 15,
 			metadataPeer2: wrapper.WrappedMetadataV2(&pb.MetaDataV2{
 				SeqNumber: seqNumber,
 				Attnets:   attnets,
@@ -262,53 +261,55 @@ func TestMetadataRPCHandler_SendMetadataRequest(t *testing.T) {
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
var wg sync.WaitGroup
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
ctx := context.Background()
|
||||
ctx := context.Background()
|
||||
|
||||
// Setup and connect peers.
|
||||
peer1, peer2 := p2ptest.NewTestP2P(t), p2ptest.NewTestP2P(t)
|
||||
peer1.Connect(peer2)
|
||||
// Setup and connect peers.
|
||||
peer1, peer2 := p2ptest.NewTestP2P(t), p2ptest.NewTestP2P(t)
|
||||
peer1.Connect(peer2)
|
||||
|
||||
// Ensure the peers are connected.
|
||||
peersCount := len(peer1.BHost.Network().Peers())
|
||||
assert.Equal(t, 1, peersCount, "Expected peers to be connected")
|
||||
// Ensure the peers are connected.
|
||||
peersCount := len(peer1.BHost.Network().Peers())
|
||||
require.Equal(t, 1, peersCount, "Expected peers to be connected")
|
||||
|
||||
// Setup sync services.
|
||||
genesisPeer1 := time.Now().Add(-time.Duration(tc.epochsSinceGenesisPeer1) * secondsPerEpoch)
|
||||
genesisPeer2 := time.Now().Add(-time.Duration(tc.epochsSinceGenesisPeer2) * secondsPerEpoch)
|
||||
// Setup sync services.
|
||||
genesisPeer1 := time.Now().Add(-time.Duration(tc.epochsSinceGenesisPeer1) * secondsPerEpoch)
|
||||
genesisPeer2 := time.Now().Add(-time.Duration(tc.epochsSinceGenesisPeer2) * secondsPerEpoch)
|
||||
|
||||
chainPeer1 := &mock.ChainService{Genesis: genesisPeer1, ValidatorsRoot: [32]byte{}}
|
||||
chainPeer2 := &mock.ChainService{Genesis: genesisPeer2, ValidatorsRoot: [32]byte{}}
|
||||
chainPeer1 := &mock.ChainService{Genesis: genesisPeer1, ValidatorsRoot: [32]byte{}}
|
||||
chainPeer2 := &mock.ChainService{Genesis: genesisPeer2, ValidatorsRoot: [32]byte{}}
|
||||
|
||||
servicePeer1 := createService(peer1, chainPeer1)
|
||||
servicePeer2 := createService(peer2, chainPeer2)
|
||||
servicePeer1 := createService(peer1, chainPeer1)
|
||||
servicePeer2 := createService(peer2, chainPeer2)
|
||||
|
||||
// Define the behavior of peer2 when receiving a METADATA request.
|
||||
protocolSuffix := servicePeer2.cfg.p2p.Encoding().ProtocolSuffix()
|
||||
protocolID := protocol.ID(tc.topic + protocolSuffix)
|
||||
peer2.LocalMetadata = tc.metadataPeer2
|
||||
// Define the behavior of peer2 when receiving a METADATA request.
|
||||
protocolSuffix := servicePeer2.cfg.p2p.Encoding().ProtocolSuffix()
|
||||
protocolID := protocol.ID(tc.topic + protocolSuffix)
|
||||
peer2.LocalMetadata = tc.metadataPeer2
|
||||
|
||||
wg.Add(1)
|
||||
peer2.BHost.SetStreamHandler(protocolID, func(stream network.Stream) {
|
||||
defer wg.Done()
|
||||
err := servicePeer2.metaDataHandler(ctx, new(interface{}), stream)
|
||||
assert.NoError(t, err)
|
||||
wg.Add(1)
|
||||
peer2.BHost.SetStreamHandler(protocolID, func(stream network.Stream) {
|
||||
defer wg.Done()
|
||||
err := servicePeer2.metaDataHandler(ctx, new(interface{}), stream)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
// Send a METADATA request from peer1 to peer2.
|
||||
actual, err := servicePeer1.sendMetaDataRequest(ctx, peer2.BHost.ID())
|
||||
require.NoError(t, err)
|
||||
|
||||
// Wait until the METADATA request is received by peer2 or timeout.
|
||||
timeOutReached := util.WaitTimeout(&wg, requestTimeout)
|
||||
require.Equal(t, false, timeOutReached, "Did not receive METADATA request within timeout")
|
||||
|
||||
// Compare the received METADATA object with the expected METADATA object.
|
||||
require.DeepSSZEqual(t, tc.expected.InnerObject(), actual.InnerObject(), "Metadata unequal")
|
||||
|
||||
// Ensure the peers are still connected.
|
||||
peersCount = len(peer1.BHost.Network().Peers())
|
||||
assert.Equal(t, 1, peersCount, "Expected peers to be connected")
|
||||
})
|
||||
|
||||
// Send a METADATA request from peer1 to peer2.
|
||||
actual, err := servicePeer1.sendMetaDataRequest(ctx, peer2.BHost.ID())
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Wait until the METADATA request is received by peer2 or timeout.
|
||||
timeOutReached := util.WaitTimeout(&wg, requestTimeout)
|
||||
require.Equal(t, false, timeOutReached, "Did not receive METADATA request within timeout")
|
||||
|
||||
// Compare the received METADATA object with the expected METADATA object.
|
||||
require.DeepSSZEqual(t, tc.expected.InnerObject(), actual.InnerObject(), "Metadata unequal")
|
||||
|
||||
// Ensure the peers are still connected.
|
||||
peersCount = len(peer1.BHost.Network().Peers())
|
||||
assert.Equal(t, 1, peersCount, "Expected peers to be connected")
|
||||
}
|
||||
}
|
||||
|
||||
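The test table above drops the peerDAS-era case names in favour of Electra ones and moves the post-fork peers to 15 epochs past genesis, so they sit unambiguously beyond the configured Electra fork epoch. The requesting side picks the metadata RPC topic from its own fork, which is why the Altair case stays on the V2 topic while every Electra case moves to V3. The following is a standalone Go sketch of that selection, not part of the diff; the topic constants and the pickMetadataTopic helper are illustrative, not Prysm's API.

package main

import "fmt"

// Illustrative topic names mirroring the constants exercised in the tests above;
// the real values live in the p2p package.
const (
	rpcMetaDataTopicV1 = "metadata/1"
	rpcMetaDataTopicV2 = "metadata/2"
	rpcMetaDataTopicV3 = "metadata/3"
)

// pickMetadataTopic is a hypothetical helper showing the selection the tests
// exercise: the requester chooses the metadata RPC topic from its own fork.
func pickMetadataTopic(epoch, altairForkEpoch, electraForkEpoch uint64) string {
	switch {
	case epoch >= electraForkEpoch:
		return rpcMetaDataTopicV3 // Electra-era peers request on the V3 topic.
	case epoch >= altairForkEpoch:
		return rpcMetaDataTopicV2 // Altair through Deneb peers request on the V2 topic.
	default:
		return rpcMetaDataTopicV1 // Phase0 peers request on the V1 topic (MetaDataV0 payload).
	}
}

func main() {
	// With illustrative fork epochs (Altair at 5, Electra at 10), a peer
	// 15 epochs after genesis requests on the V3 topic.
	fmt.Println(pickMetadataTopic(15, 5, 10))
}
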
@@ -438,15 +438,18 @@ func readChunkedDataColumnSideCar(
}

// Check if the fork digest is recognized.
v, ok := ctxMap[bytesutil.ToBytes4(ctxBytes)]
msgVersion, ok := ctxMap[bytesutil.ToBytes4(ctxBytes)]
if !ok {
return nil, errors.Errorf("unrecognized fork digest %#x", ctxBytes)
}

// Check if we are on debeb.
// Only deneb is supported at this time, because we lack a fork-spanning interface/union type for blobs.
if v != version.Deneb {
return nil, errors.Errorf("unexpected context bytes for deneb DataColumnSidecar, ctx=%#x, v=%v", ctxBytes, v)
if msgVersion < version.Electra {
return nil, errors.Errorf(
"unexpected context bytes for DataColumnSidecar, ctx=%#x, msgVersion=%v, minimalSupportedVersion=%v",
ctxBytes, version.String(msgVersion), version.String(version.Electra),
)
}

// Decode the data column sidecar from the stream.

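The hunk above relaxes the hard equality against version.Deneb to a minimum-version check, so any fork at or after Electra is accepted when decoding a chunked DataColumnSidecar, and the error now reports both the received and the minimum supported version. Below is a standalone sketch of that gate, not part of the diff; the ctxMap shape, version ordinals, and names are illustrative assumptions.

package main

import (
	"errors"
	"fmt"
)

// Illustrative fork-version ordinals; Prysm's runtime/version package defines the real ones.
const (
	versionDeneb   = 4
	versionElectra = 5
)

// versionForContext is a hypothetical stand-in for the ctxMap lookup above:
// map the 4-byte context (fork digest) read from the stream to a fork version,
// then reject anything older than Electra, since data column sidecars only
// exist from that fork onward in this commit.
func versionForContext(ctxMap map[[4]byte]int, ctx [4]byte) (int, error) {
	v, ok := ctxMap[ctx]
	if !ok {
		return 0, fmt.Errorf("unrecognized fork digest %#x", ctx[:])
	}
	if v < versionElectra {
		return 0, errors.New("context bytes predate Electra: no data column sidecars before that fork")
	}
	return v, nil
}

func main() {
	ctxMap := map[[4]byte]int{
		{0x01}: versionDeneb,
		{0x02}: versionElectra,
	}
	if _, err := versionForContext(ctxMap, [4]byte{0x01}); err != nil {
		fmt.Println(err) // A Deneb digest is now rejected for column sidecars.
	}
}
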
@@ -327,14 +327,21 @@ func (s *Service) waitForChainStart() {

ctxMap, err := ContextByteVersionsForValRoot(clock.GenesisValidatorsRoot())
if err != nil {
log.WithError(err).WithField("genesisValidatorRoot", clock.GenesisValidatorsRoot()).
log.
WithError(err).
WithField("genesisValidatorRoot", clock.GenesisValidatorsRoot()).
Error("sync service failed to initialize context version map")
return
}
s.ctxMap = ctxMap

// Register respective rpc handlers at state initialized event.
s.registerRPCHandlers()
err = s.registerRPCHandlers()
if err != nil {
log.WithError(err).Error("Could not register rpc handlers")
return
}

// Wait for chainstart in separate routine.
if startTime.After(prysmTime.Now()) {
time.Sleep(prysmTime.Until(startTime))

@@ -16,7 +16,6 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/altair"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
coreTime "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/time"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
@@ -119,7 +118,7 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
s.attesterSubnetIndices,
)
// Altair Fork Version
if epoch >= params.BeaconConfig().AltairForkEpoch {
if params.BeaconConfig().AltairForkEpoch <= epoch {
s.subscribe(
p2p.SyncContributionAndProofSubnetTopicFormat,
s.validateSyncContributionAndProof,
@@ -137,7 +136,7 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
}

// New Gossip Topic in Capella
if epoch >= params.BeaconConfig().CapellaForkEpoch {
if params.BeaconConfig().CapellaForkEpoch <= epoch {
s.subscribe(
p2p.BlsToExecutionChangeSubnetTopicFormat,
s.validateBlsToExecutionChange,
@@ -146,22 +145,8 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
)
}

// New Gossip Topic in Deneb
if epoch >= params.BeaconConfig().DenebForkEpoch {
if coreTime.PeerDASIsActive(slots.UnsafeEpochStart(epoch)) {
s.subscribeWithParameters(
p2p.DataColumnSubnetTopicFormat,
s.validateDataColumn,
s.dataColumnSubscriber,
digest,
s.dataColumnSubnetIndices,
// TODO: Should we find peers always? When validators are managed? When validators are managed AND when we are going to propose a block?
func(currentSlot primitives.Slot) []uint64 { return []uint64{} },
)

return
}

// New Gossip Topic in Deneb, removed in Electra
if params.BeaconConfig().DenebForkEpoch <= epoch && epoch < params.BeaconConfig().ElectraForkEpoch {
s.subscribeWithParameters(
p2p.BlobSubnetTopicFormat,
s.validateBlob,
@@ -171,6 +156,19 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
func(currentSlot primitives.Slot) []uint64 { return []uint64{} },
)
}

// New Gossip Topic in Electra
if params.BeaconConfig().ElectraForkEpoch <= epoch {
s.subscribeWithParameters(
p2p.DataColumnSubnetTopicFormat,
s.validateDataColumn,
s.dataColumnSubscriber,
digest,
s.dataColumnSubnetIndices,
// TODO: Should we find peers always? When validators are managed? When validators are managed AND when we are going to propose a block?
func(currentSlot primitives.Slot) []uint64 { return []uint64{} },
)
}
}

// subscribe to a given topic with a given validator and subscription handler.

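The rewritten registerSubscribers above replaces the PeerDASIsActive short-circuit with plain fork-epoch comparisons: blob subnets are subscribed only for epochs in [DenebForkEpoch, ElectraForkEpoch), data-column subnets take over from ElectraForkEpoch onward, and the early return disappears because the two topic families no longer overlap. Here is a standalone sketch of that gating, not part of the diff; registerDASubnets and the callbacks are illustrative stand-ins for the subscribeWithParameters calls.

package main

import "fmt"

// registerDASubnets is an illustrative reduction of the gating above: blob
// subnets are only wanted in [DenebForkEpoch, ElectraForkEpoch), and data
// column subnets take over from ElectraForkEpoch onward.
func registerDASubnets(epoch, denebForkEpoch, electraForkEpoch uint64, subscribeBlobs, subscribeColumns func()) {
	if denebForkEpoch <= epoch && epoch < electraForkEpoch {
		subscribeBlobs() // BlobSubnetTopicFormat, retired at Electra.
	}
	if electraForkEpoch <= epoch {
		subscribeColumns() // DataColumnSubnetTopicFormat.
	}
}

func main() {
	blobs := func() { fmt.Println("subscribe blob subnets") }
	columns := func() { fmt.Println("subscribe data column subnets") }

	registerDASubnets(12, 10, 20, blobs, columns) // epoch 12: blob subnets only.
	registerDASubnets(25, 10, 20, blobs, columns) // epoch 25: data column subnets only.
}
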
@@ -166,7 +166,6 @@ type BeaconChainConfig struct {
DenebForkEpoch primitives.Epoch `yaml:"DENEB_FORK_EPOCH" spec:"true"` // DenebForkEpoch is used to represent the assigned fork epoch for deneb.
ElectraForkVersion []byte `yaml:"ELECTRA_FORK_VERSION" spec:"true"` // ElectraForkVersion is used to represent the fork version for electra.
ElectraForkEpoch primitives.Epoch `yaml:"ELECTRA_FORK_EPOCH" spec:"true"` // ElectraForkEpoch is used to represent the assigned fork epoch for electra.
Eip7594ForkEpoch primitives.Epoch `yaml:"EIP7594_FORK_EPOCH" spec:"true"` // EIP7594ForkEpoch is used to represent the assigned fork epoch for peer das.

ForkVersionSchedule map[[fieldparams.VersionLength]byte]primitives.Epoch // Schedule of fork epochs by version.
ForkVersionNames map[[fieldparams.VersionLength]byte]string // Human-readable names of fork versions.
@@ -367,7 +366,7 @@ func DenebEnabled() bool {
// PeerDASEnabled centralizes the check to determine if code paths
// that are specific to peerdas should be allowed to execute.
func PeerDASEnabled() bool {
return BeaconConfig().Eip7594ForkEpoch < math.MaxUint64
return BeaconConfig().ElectraForkEpoch < math.MaxUint64
}

// WithinDAPeriod checks if the block epoch is within MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS of the given current epoch.

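With Eip7594ForkEpoch removed from BeaconChainConfig, PeerDASEnabled now keys off ElectraForkEpoch: the feature counts as enabled whenever an Electra fork epoch is scheduled at all, i.e. anything below math.MaxUint64, while "active" additionally requires the chain to have reached that epoch. A standalone sketch of the enabled/active distinction follows; it is not part of the diff, and plain uint64 epochs stand in for primitives.Epoch.

package main

import (
	"fmt"
	"math"
)

// peerDASEnabled mirrors the new check above: the feature is "enabled" as soon
// as an Electra fork epoch is scheduled (MaxUint64 means "never").
func peerDASEnabled(electraForkEpoch uint64) bool {
	return electraForkEpoch < math.MaxUint64
}

// peerDASActive is a companion sketch: enabled, and the chain has reached the fork.
func peerDASActive(currentEpoch, electraForkEpoch uint64) bool {
	return peerDASEnabled(electraForkEpoch) && currentEpoch >= electraForkEpoch
}

func main() {
	const electraForkEpoch = 10
	fmt.Println(peerDASEnabled(math.MaxUint64))      // false: fork not scheduled.
	fmt.Println(peerDASActive(9, electraForkEpoch))  // false: scheduled but not reached.
	fmt.Println(peerDASActive(10, electraForkEpoch)) // true.
}
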
@@ -217,7 +217,6 @@ func ConfigToYaml(cfg *BeaconChainConfig) []byte {
fmt.Sprintf("DENEB_FORK_VERSION: %#x", cfg.DenebForkVersion),
fmt.Sprintf("ELECTRA_FORK_EPOCH: %d", cfg.ElectraForkEpoch),
fmt.Sprintf("ELECTRA_FORK_VERSION: %#x", cfg.ElectraForkVersion),
fmt.Sprintf("EIP7594_FORK_EPOCH: %d", cfg.Eip7594ForkEpoch),
fmt.Sprintf("EPOCHS_PER_SUBNET_SUBSCRIPTION: %d", cfg.EpochsPerSubnetSubscription),
fmt.Sprintf("ATTESTATION_SUBNET_EXTRA_BITS: %d", cfg.AttestationSubnetExtraBits),
fmt.Sprintf("ATTESTATION_SUBNET_PREFIX_BITS: %d", cfg.AttestationSubnetPrefixBits),

@@ -30,6 +30,7 @@ var placeholderFields = []string{
"EIP6110_FORK_VERSION",
"EIP7002_FORK_EPOCH",
"EIP7002_FORK_VERSION",
"EIP7594_FORK_EPOCH",
"EIP7594_FORK_VERSION",
"EIP7732_FORK_EPOCH",
"EIP7732_FORK_VERSION",

@@ -217,7 +217,6 @@ var mainnetBeaconConfig = &BeaconChainConfig{
DenebForkEpoch: mainnetDenebForkEpoch,
ElectraForkVersion: []byte{5, 0, 0, 0},
ElectraForkEpoch: mainnetElectraForkEpoch,
Eip7594ForkEpoch: math.MaxUint64,

// New values introduced in Altair hard fork 1.
// Participation flag indices.

@@ -96,7 +96,6 @@ func MinimalSpecConfig() *BeaconChainConfig {
minimalConfig.DenebForkEpoch = math.MaxUint64
minimalConfig.ElectraForkVersion = []byte{5, 0, 0, 1}
minimalConfig.ElectraForkEpoch = math.MaxUint64
minimalConfig.Eip7594ForkEpoch = math.MaxUint64

minimalConfig.SyncCommitteeSize = 32
minimalConfig.InactivityScoreBias = 4

@@ -44,7 +44,6 @@ func E2ETestConfig() *BeaconChainConfig {
e2eConfig.CapellaForkEpoch = CapellaE2EForkEpoch
e2eConfig.DenebForkEpoch = DenebE2EForkEpoch
e2eConfig.ElectraForkEpoch = ElectraE2EForkEpoch
e2eConfig.Eip7594ForkEpoch = DenebE2EForkEpoch

// Terminal Total Difficulty.
e2eConfig.TerminalTotalDifficulty = "480"
@@ -89,7 +88,6 @@ func E2EMainnetTestConfig() *BeaconChainConfig {
e2eConfig.CapellaForkEpoch = CapellaE2EForkEpoch
e2eConfig.DenebForkEpoch = DenebE2EForkEpoch
e2eConfig.ElectraForkEpoch = ElectraE2EForkEpoch
e2eConfig.Eip7594ForkEpoch = DenebE2EForkEpoch

// Terminal Total Difficulty.
e2eConfig.TerminalTotalDifficulty = "480"

@@ -40,7 +40,6 @@ func HoleskyConfig() *BeaconChainConfig {
cfg.DenebForkEpoch = 29696
cfg.DenebForkVersion = []byte{0x05, 0x1, 0x70, 0x0}
cfg.ElectraForkEpoch = math.MaxUint64
cfg.Eip7594ForkEpoch = math.MaxUint64
cfg.ElectraForkVersion = []byte{0x06, 0x1, 0x70, 0x0} // TODO: Define holesky fork version for electra. This is a placeholder value.
cfg.TerminalTotalDifficulty = "0"
cfg.DepositContractAddress = "0x4242424242424242424242424242424242424242"

@@ -46,7 +46,6 @@ func SepoliaConfig() *BeaconChainConfig {
cfg.DenebForkVersion = []byte{0x90, 0x00, 0x00, 0x73}
cfg.ElectraForkEpoch = math.MaxUint64
cfg.ElectraForkVersion = []byte{0x90, 0x00, 0x00, 0x74} // TODO: Define sepolia fork version for electra. This is a placeholder value.
cfg.Eip7594ForkEpoch = math.MaxUint64
cfg.TerminalTotalDifficulty = "17000000000000000"
cfg.DepositContractAddress = "0x7f02C3E3c98b133055B8B348B2Ac625669Ed295D"
cfg.InitializeForkSchedule()

@@ -287,5 +287,5 @@ func (m MetadataV2) MetadataObjV2() *pb.MetaDataV2 {

// Version returns the fork version of the underlying object.
func (MetadataV2) Version() int {
return version.Deneb
return version.Electra
}

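Bumping the version reported by the MetadataV2 wrapper from Deneb to Electra matters wherever callers branch on Version() to decide what a peer's metadata can carry. Below is a standalone, hypothetical sketch of such a consumer; it is not part of the diff, and the interface, helper, and version ordinals are illustrative assumptions rather than Prysm's types.

package main

import "fmt"

// Illustrative fork-version ordinals; the real ones live in runtime/version.
const (
	versionDeneb   = 4
	versionElectra = 5
)

// wrappedMetadata is a hypothetical slice of the metadata wrapper interface;
// only Version matters for this sketch.
type wrappedMetadata interface {
	Version() int
}

type metadataV2 struct{}

// Version now reports Electra for the V2 wrapper, as in the hunk above.
func (metadataV2) Version() int { return versionElectra }

// supportsDataColumns is an illustrative consumer: treat any metadata at or
// past Electra as coming from a data-column-capable fork.
func supportsDataColumns(m wrappedMetadata) bool {
	return m.Version() >= versionElectra
}

func main() {
	fmt.Println(supportsDataColumns(metadataV2{})) // true after this change; was false while Version() reported Deneb.
}
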
@@ -24,11 +24,9 @@ func InitForkCfg(start, end int, c *params.BeaconChainConfig) *params.BeaconChai
}
if start >= version.Deneb {
c.DenebForkEpoch = 0
c.Eip7594ForkEpoch = 0
}
if end < version.Deneb {
c.DenebForkEpoch = math.MaxUint64
c.Eip7594ForkEpoch = math.MaxUint64
}
if end < version.Capella {
c.CapellaForkEpoch = math.MaxUint64