PeerDAS: Implement P2P (#15347)

* PeerDAS: Implement P2P.

* Fix Terence's comment.

* Fix Terence's comment.

* Fix Terence's comment.

* Fix Preston's comment.

* Fix Preston's comment.

* `TopicFromMessage`: Exit early.

* Fix Preston's comment.

* `TestService_BroadcastDataColumn`: Avoid ugly sleep.

* Fix Kasey's comment.

* Fix Kasey's comment.

* Fix Kasey's comment.

* Fix Kasey's comment.
Manu NALEPA
2025-05-28 17:23:19 +02:00
committed by GitHub
parent 2c09bc65a4
commit 3a3bd3902c
34 changed files with 1301 additions and 108 deletions

View File

@@ -20,8 +20,10 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/operations/attestations"
"github.com/OffchainLabs/prysm/v6/beacon-chain/operations/blstoexec"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
p2pTesting "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
"github.com/OffchainLabs/prysm/v6/beacon-chain/state/stategen"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v6/testing/require"
@@ -47,6 +49,11 @@ type mockBroadcaster struct {
broadcastCalled bool
}
type mockAccesser struct {
mockBroadcaster
p2pTesting.MockPeerManager
}
func (mb *mockBroadcaster) Broadcast(_ context.Context, _ proto.Message) error {
mb.broadcastCalled = true
return nil
@@ -77,6 +84,11 @@ func (mb *mockBroadcaster) BroadcastLightClientFinalityUpdate(_ context.Context,
return nil
}
func (mb *mockBroadcaster) BroadcastDataColumn(_ [fieldparams.RootLength]byte, _ uint64, _ *ethpb.DataColumnSidecar, _ ...chan<- bool) error {
mb.broadcastCalled = true
return nil
}
func (mb *mockBroadcaster) BroadcastBLSChanges(_ context.Context, _ []*ethpb.SignedBLSToExecutionChange) {
}
@@ -134,6 +146,7 @@ func minimalTestService(t *testing.T, opts ...Option) (*Service, *testServiceReq
WithBlobStorage(filesystem.NewEphemeralBlobStorage(t)),
WithSyncChecker(mock.MockChecker{}),
WithExecutionEngineCaller(&mockExecution.EngineClient{}),
WithP2PBroadcaster(&mockAccesser{}),
WithLightClientStore(&lightclient.Store{}),
}
// append the variadic opts so they override the defaults by being processed afterwards

View File

@@ -7,6 +7,7 @@ go_library(
"broadcaster.go",
"config.go",
"connection_gater.go",
"custody.go",
"dial_relay_node.go",
"discovery.go",
"doc.go",
@@ -45,6 +46,7 @@ go_library(
"//beacon-chain/core/altair:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/peerdas:go_default_library",
"//beacon-chain/core/time:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/p2p/encoder:go_default_library",
@@ -118,6 +120,7 @@ go_test(
"addr_factory_test.go",
"broadcaster_test.go",
"connection_gater_test.go",
"custody_test.go",
"dial_relay_node_test.go",
"discovery_test.go",
"fork_test.go",
@@ -139,10 +142,12 @@ go_test(
flaky = True,
tags = ["requires-network"],
deps = [
"//beacon-chain/blockchain/kzg:go_default_library",
"//beacon-chain/blockchain/testing:go_default_library",
"//beacon-chain/cache:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/light-client:go_default_library",
"//beacon-chain/core/peerdas:go_default_library",
"//beacon-chain/core/signing:go_default_library",
"//beacon-chain/db/testing:go_default_library",
"//beacon-chain/p2p/encoder:go_default_library",
@@ -166,6 +171,7 @@ go_test(
"//network/forks:go_default_library",
"//proto/eth/v1:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//proto/prysm/v1alpha1/metadata:go_default_library",
"//proto/testing:go_default_library",
"//runtime/version:go_default_library",
"//testing/assert:go_default_library",

View File

@@ -9,6 +9,7 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/altair"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/helpers"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v6/crypto/hash"
@@ -98,7 +99,7 @@ func (s *Service) BroadcastSyncCommitteeMessage(ctx context.Context, subnet uint
return nil
}
func (s *Service) internalBroadcastAttestation(ctx context.Context, subnet uint64, att ethpb.Att, forkDigest [4]byte) {
func (s *Service) internalBroadcastAttestation(ctx context.Context, subnet uint64, att ethpb.Att, forkDigest [fieldparams.VersionLength]byte) {
_, span := trace.StartSpan(ctx, "p2p.internalBroadcastAttestation")
defer span.End()
ctx = trace.NewContext(context.Background(), span) // clear parent context / deadline.
@@ -154,7 +155,7 @@ func (s *Service) internalBroadcastAttestation(ctx context.Context, subnet uint6
}
}
func (s *Service) broadcastSyncCommittee(ctx context.Context, subnet uint64, sMsg *ethpb.SyncCommitteeMessage, forkDigest [4]byte) {
func (s *Service) broadcastSyncCommittee(ctx context.Context, subnet uint64, sMsg *ethpb.SyncCommitteeMessage, forkDigest [fieldparams.VersionLength]byte) {
_, span := trace.StartSpan(ctx, "p2p.broadcastSyncCommittee")
defer span.End()
ctx = trace.NewContext(context.Background(), span) // clear parent context / deadline.
@@ -230,7 +231,7 @@ func (s *Service) BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.
return nil
}
func (s *Service) internalBroadcastBlob(ctx context.Context, subnet uint64, blobSidecar *ethpb.BlobSidecar, forkDigest [4]byte) {
func (s *Service) internalBroadcastBlob(ctx context.Context, subnet uint64, blobSidecar *ethpb.BlobSidecar, forkDigest [fieldparams.VersionLength]byte) {
_, span := trace.StartSpan(ctx, "p2p.internalBroadcastBlob")
defer span.End()
ctx = trace.NewContext(context.Background(), span) // clear parent context / deadline.
@@ -245,7 +246,7 @@ func (s *Service) internalBroadcastBlob(ctx context.Context, subnet uint64, blob
s.subnetLocker(wrappedSubIdx).RUnlock()
if !hasPeer {
blobSidecarCommitteeBroadcastAttempts.Inc()
blobSidecarBroadcastAttempts.Inc()
if err := func() error {
s.subnetLocker(wrappedSubIdx).Lock()
defer s.subnetLocker(wrappedSubIdx).Unlock()
@@ -254,7 +255,7 @@ func (s *Service) internalBroadcastBlob(ctx context.Context, subnet uint64, blob
return err
}
if ok {
blobSidecarCommitteeBroadcasts.Inc()
blobSidecarBroadcasts.Inc()
return nil
}
return errors.New("failed to find peers for subnet")
@@ -322,6 +323,132 @@ func (s *Service) BroadcastLightClientFinalityUpdate(ctx context.Context, update
return nil
}
// BroadcastDataColumn broadcasts a data column sidecar to the p2p network. The message is
// published on the topic for the current fork digest and the given column subnet.
func (s *Service) BroadcastDataColumn(
root [fieldparams.RootLength]byte,
dataColumnSubnet uint64,
dataColumnSidecar *ethpb.DataColumnSidecar,
peersCheckedChans ...chan<- bool, // Used for testing purposes to signal when peers are checked.
) error {
// Add tracing to the function.
ctx, span := trace.StartSpan(s.ctx, "p2p.BroadcastDataColumn")
defer span.End()
// Ensure the data column sidecar is not nil.
if dataColumnSidecar == nil {
return errors.Errorf("attempted to broadcast nil data column sidecar at subnet %d", dataColumnSubnet)
}
// Retrieve the current fork digest.
forkDigest, err := s.currentForkDigest()
if err != nil {
err := errors.Wrap(err, "current fork digest")
tracing.AnnotateError(span, err)
return err
}
// Non-blocking broadcast, with attempts to discover a column subnet peer if none available.
go s.internalBroadcastDataColumn(ctx, root, dataColumnSubnet, dataColumnSidecar, forkDigest, peersCheckedChans)
return nil
}
func (s *Service) internalBroadcastDataColumn(
ctx context.Context,
root [fieldparams.RootLength]byte,
columnSubnet uint64,
dataColumnSidecar *ethpb.DataColumnSidecar,
forkDigest [fieldparams.VersionLength]byte,
peersCheckedChans []chan<- bool, // Used for testing purposes to signal when peers are checked.
) {
// Add tracing to the function.
_, span := trace.StartSpan(ctx, "p2p.internalBroadcastDataColumn")
defer span.End()
// Increase the number of broadcast attempts.
dataColumnSidecarBroadcastAttempts.Inc()
// Define a one-slot length context timeout.
secondsPerSlot := params.BeaconConfig().SecondsPerSlot
oneSlot := time.Duration(secondsPerSlot) * time.Second
ctx, cancel := context.WithTimeout(ctx, oneSlot)
defer cancel()
// Build the topic corresponding to this column subnet and this fork digest.
topic := dataColumnSubnetToTopic(columnSubnet, forkDigest)
// Compute the wrapped subnet index.
wrappedSubIdx := columnSubnet + dataColumnSubnetVal
// Find peers if needed.
if err := s.findPeersIfNeeded(ctx, wrappedSubIdx, topic, columnSubnet, peersCheckedChans); err != nil {
log.WithError(err).Error("Failed to find peers for data column subnet")
tracing.AnnotateError(span, err)
}
// Broadcast the data column sidecar to the network.
if err := s.broadcastObject(ctx, dataColumnSidecar, topic); err != nil {
log.WithError(err).Error("Failed to broadcast data column sidecar")
tracing.AnnotateError(span, err)
return
}
header := dataColumnSidecar.SignedBlockHeader.GetHeader()
slot := header.GetSlot()
slotStartTime, err := slots.ToTime(uint64(s.genesisTime.Unix()), slot)
if err != nil {
log.WithError(err).Error("Failed to convert slot to time")
}
log.WithFields(logrus.Fields{
"slot": slot,
"timeSinceSlotStart": time.Since(slotStartTime),
"root": fmt.Sprintf("%#x", root),
"columnSubnet": columnSubnet,
}).Debug("Broadcasted data column sidecar")
// Increase the number of successful broadcasts.
dataColumnSidecarBroadcasts.Inc()
}
func (s *Service) findPeersIfNeeded(
ctx context.Context,
wrappedSubIdx uint64,
topic string,
subnet uint64,
peersCheckedChans []chan<- bool, // Used for testing purposes to signal when peers are checked.
) error {
s.subnetLocker(wrappedSubIdx).Lock()
defer s.subnetLocker(wrappedSubIdx).Unlock()
// Sending a data column sidecar to only one peer is not ideal,
// but it ensures at least one peer receives it.
const peerCount = 1
if s.hasPeerWithSubnet(topic) {
// Exit early if we already have peers with this subnet.
return nil
}
// Used for testing purposes.
if len(peersCheckedChans) > 0 {
peersCheckedChans[0] <- true
}
// No peers found, attempt to find peers with this subnet.
ok, err := s.FindPeersWithSubnet(ctx, topic, subnet, peerCount)
if err != nil {
return errors.Wrap(err, "find peers with subnet")
}
if !ok {
return errors.Errorf("failed to find peers for topic %s with subnet %d", topic, subnet)
}
return nil
}
// method to broadcast messages to other peers in our gossip mesh.
func (s *Service) broadcastObject(ctx context.Context, obj ssz.Marshaler, topic string) error {
ctx, span := trace.StartSpan(ctx, "p2p.broadcastObject")
@@ -351,15 +478,15 @@ func (s *Service) broadcastObject(ctx context.Context, obj ssz.Marshaler, topic
return nil
}
func attestationToTopic(subnet uint64, forkDigest [4]byte) string {
func attestationToTopic(subnet uint64, forkDigest [fieldparams.VersionLength]byte) string {
return fmt.Sprintf(AttestationSubnetTopicFormat, forkDigest, subnet)
}
func syncCommitteeToTopic(subnet uint64, forkDigest [4]byte) string {
func syncCommitteeToTopic(subnet uint64, forkDigest [fieldparams.VersionLength]byte) string {
return fmt.Sprintf(SyncCommitteeSubnetTopicFormat, forkDigest, subnet)
}
func blobSubnetToTopic(subnet uint64, forkDigest [4]byte) string {
func blobSubnetToTopic(subnet uint64, forkDigest [fieldparams.VersionLength]byte) string {
return fmt.Sprintf(BlobSubnetTopicFormat, forkDigest, subnet)
}
@@ -370,3 +497,7 @@ func lcOptimisticToTopic(forkDigest [4]byte) string {
func lcFinalityToTopic(forkDigest [4]byte) string {
return fmt.Sprintf(LightClientFinalityUpdateTopicFormat, forkDigest)
}
func dataColumnSubnetToTopic(subnet uint64, forkDigest [fieldparams.VersionLength]byte) string {
return fmt.Sprintf(DataColumnSubnetTopicFormat, forkDigest, subnet)
}
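
Below is a minimal caller-side sketch of the broadcast flow added above. It assumes an initialized *p2p.Service (here `svc`); the helper name `broadcastColumn` is hypothetical. Only the nil-sidecar and fork-digest failures surface synchronously; subnet peer discovery and the publish itself run on a goroutine bounded by the one-slot timeout.

package example

import (
	"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
	"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
	fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
	ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
)

// broadcastColumn is a hypothetical helper: derive the gossip subnet for a
// column index, then fire the non-blocking broadcast.
func broadcastColumn(svc *p2p.Service, root [fieldparams.RootLength]byte, columnIndex uint64, sidecar *ethpb.DataColumnSidecar) error {
	// Map the column index onto its data column subnet.
	subnet := peerdas.ComputeSubnetForDataColumnSidecar(columnIndex)
	// Returns quickly: discovery of subnet peers and the publish itself
	// happen in internalBroadcastDataColumn on a separate goroutine.
	return svc.BroadcastDataColumn(root, subnet, sidecar)
}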

View File

@@ -9,11 +9,14 @@ import (
"testing"
"time"
"github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/kzg"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/helpers"
lightClient "github.com/OffchainLabs/prysm/v6/beacon-chain/core/light-client"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers/scorers"
p2ptest "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
"github.com/OffchainLabs/prysm/v6/cmd/beacon-chain/flags"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v6/consensus-types/wrapper"
@@ -655,3 +658,99 @@ func TestService_BroadcastLightClientFinalityUpdate(t *testing.T) {
t.Error("Failed to receive pubsub within 1s")
}
}
func TestService_BroadcastDataColumn(t *testing.T) {
const (
port = 2000
columnIndex = 12
topicFormat = DataColumnSubnetTopicFormat
)
// Load the KZG trust setup.
err := kzg.Start()
require.NoError(t, err)
gFlags := new(flags.GlobalFlags)
gFlags.MinimumPeersPerSubnet = 1
flags.Init(gFlags)
// Reset config.
defer flags.Init(new(flags.GlobalFlags))
// Create two peers and connect them.
p1, p2 := p2ptest.NewTestP2P(t), p2ptest.NewTestP2P(t)
p1.Connect(p2)
// Test the peers are connected.
require.NotEqual(t, 0, len(p1.BHost.Network().Peers()), "No peers")
// Create a host.
_, pkey, ipAddr := createHost(t, port)
p := &Service{
ctx: context.Background(),
host: p1.BHost,
pubsub: p1.PubSub(),
joinedTopics: map[string]*pubsub.Topic{},
cfg: &Config{},
genesisTime: time.Now(),
genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32),
subnetsLock: make(map[uint64]*sync.RWMutex),
subnetsLockLock: sync.Mutex{},
peers: peers.NewStatus(context.Background(), &peers.StatusConfig{ScorerParams: &scorers.Config{}}),
}
// Create a listener.
listener, err := p.startDiscoveryV5(ipAddr, pkey)
require.NoError(t, err)
p.dv5Listener = listener
digest, err := p.currentForkDigest()
require.NoError(t, err)
subnet := peerdas.ComputeSubnetForDataColumnSidecar(columnIndex)
topic := fmt.Sprintf(topicFormat, digest, subnet)
roSidecars, _ := util.CreateTestVerifiedRoDataColumnSidecars(t, util.DataColumnsParamsByRoot{{}: {{ColumnIndex: columnIndex}}})
sidecar := roSidecars[0].DataColumnSidecar
// Async listen for the pubsub, must be before the broadcast.
var wg sync.WaitGroup
wg.Add(1)
peersChecked := make(chan bool, 0)
go func(tt *testing.T) {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Wait for the peers to be checked.
<-peersChecked
// External peer subscribes to the topic.
topic += p.Encoding().ProtocolSuffix()
sub, err := p2.SubscribeToTopic(topic)
require.NoError(tt, err)
msg, err := sub.Next(ctx)
require.NoError(tt, err)
var result ethpb.DataColumnSidecar
require.NoError(tt, p.Encoding().DecodeGossip(msg.Data, &result))
require.DeepEqual(tt, &result, sidecar)
}(t)
var emptyRoot [fieldparams.RootLength]byte
// Attempt to broadcast nil object should fail.
err = p.BroadcastDataColumn(emptyRoot, subnet, nil)
require.ErrorContains(t, "attempted to broadcast nil", err)
// Broadcast to peers and wait.
err = p.BroadcastDataColumn(emptyRoot, subnet, sidecar, peersChecked)
require.NoError(t, err)
require.Equal(t, false, util.WaitTimeout(&wg, 1*time.Minute), "Failed to receive pubsub within 1 minute")
}

View File

@@ -4,6 +4,7 @@ import (
"time"
statefeed "github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed/state"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v6/beacon-chain/db"
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
)
@@ -38,6 +39,7 @@ type Config struct {
StateNotifier statefeed.Notifier
DB db.ReadOnlyDatabase
ClockWaiter startup.ClockWaiter
CustodyInfo *peerdas.CustodyInfo
}
// validateConfig validates whether the values provided are accurate and will set

View File

@@ -0,0 +1,74 @@
package p2p
import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/sirupsen/logrus"
)
var _ DataColumnsHandler = (*Service)(nil)
// CustodyGroupCountFromPeer retrieves custody group count from a peer.
// It first tries to get the custody group count from the peer's metadata,
// then falls back to the ENR value if the metadata is not available, then
// falls back to the minimum number of custody groups an honest node should custody
// and serve samples from if ENR is not available.
func (s *Service) CustodyGroupCountFromPeer(pid peer.ID) uint64 {
log := log.WithField("peerID", pid)
// Try to get the custody group count from the peer's metadata.
metadata, err := s.peers.Metadata(pid)
if err != nil {
// On error, default to the ENR value.
log.WithError(err).Debug("Failed to retrieve metadata for peer, defaulting to the ENR value")
return s.custodyGroupCountFromPeerENR(pid)
}
// If the metadata is nil, default to the ENR value.
if metadata == nil {
log.Debug("Metadata is nil, defaulting to the ENR value")
return s.custodyGroupCountFromPeerENR(pid)
}
// Get the custody subnets count from the metadata.
custodyCount := metadata.CustodyGroupCount()
// If the custody count is zero, default to the ENR value.
if custodyCount == 0 {
log.Debug("The custody count extracted from the metadata is 0, defaulting to the ENR value")
return s.custodyGroupCountFromPeerENR(pid)
}
return custodyCount
}
// custodyGroupCountFromPeerENR retrieves the custody count from the peer's ENR.
// If the ENR is not available, it defaults to the minimum number of custody groups
// an honest node custodies and serves samples from.
func (s *Service) custodyGroupCountFromPeerENR(pid peer.ID) uint64 {
// By default, we assume the peer custodies the minimum number of groups.
custodyRequirement := params.BeaconConfig().CustodyRequirement
log := log.WithFields(logrus.Fields{
"peerID": pid,
"defaultValue": custodyRequirement,
})
// Retrieve the ENR of the peer.
record, err := s.peers.ENR(pid)
if err != nil {
log.WithError(err).Debug("Failed to retrieve ENR for peer, defaulting to the default value")
return custodyRequirement
}
// Retrieve the custody group count from the ENR.
custodyGroupCount, err := peerdas.CustodyGroupCountFromRecord(record)
if err != nil {
log.WithError(err).Debug("Failed to retrieve custody group count from ENR for peer, defaulting to the default value")
return custodyRequirement
}
return custodyGroupCount
}
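
As a usage sketch, a requesting component can gate column requests on the fallback chain above (metadata, then ENR, then CustodyRequirement). The DataColumnsHandler interface mirrors the one added in this PR; the predicate `hasEnoughCustody` is hypothetical.

package example

import "github.com/libp2p/go-libp2p/core/peer"

// DataColumnsHandler mirrors the interface this PR adds to the p2p package.
type DataColumnsHandler interface {
	CustodyGroupCountFromPeer(peer.ID) uint64
}

// hasEnoughCustody is a hypothetical predicate: the handler already applies
// the metadata -> ENR -> CustodyRequirement fallback, so callers only need
// to compare the result against the number of groups they want served.
func hasEnoughCustody(h DataColumnsHandler, pid peer.ID, want uint64) bool {
	return h.CustodyGroupCountFromPeer(pid) >= want
}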

View File

@@ -0,0 +1,112 @@
package p2p
import (
"context"
"testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers/scorers"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/wrapper"
pb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1/metadata"
"github.com/OffchainLabs/prysm/v6/testing/require"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/libp2p/go-libp2p/core/network"
)
func TestCustodyGroupCountFromPeer(t *testing.T) {
const (
expectedENR uint64 = 7
expectedMetadata uint64 = 8
pid = "test-id"
)
cgc := peerdas.Cgc(expectedENR)
// Define a nil record.
var nilRecord *enr.Record = nil
// Define an empty record (a record with no `cgc` entry).
emptyRecord := &enr.Record{}
// Define a nominal record.
nominalRecord := &enr.Record{}
nominalRecord.Set(cgc)
// Define a metadata with zero custody.
zeroMetadata := wrapper.WrappedMetadataV2(&pb.MetaDataV2{
CustodyGroupCount: 0,
})
// Define a nominal metadata.
nominalMetadata := wrapper.WrappedMetadataV2(&pb.MetaDataV2{
CustodyGroupCount: expectedMetadata,
})
testCases := []struct {
name string
record *enr.Record
metadata metadata.Metadata
expected uint64
}{
{
name: "No metadata - No ENR",
record: nilRecord,
expected: params.BeaconConfig().CustodyRequirement,
},
{
name: "No metadata - Empty ENR",
record: emptyRecord,
expected: params.BeaconConfig().CustodyRequirement,
},
{
name: "No Metadata - ENR",
record: nominalRecord,
expected: expectedENR,
},
{
name: "Metadata with 0 value - ENR",
record: nominalRecord,
metadata: zeroMetadata,
expected: expectedENR,
},
{
name: "Metadata - ENR",
record: nominalRecord,
metadata: nominalMetadata,
expected: expectedMetadata,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Create peers status.
peers := peers.NewStatus(context.Background(), &peers.StatusConfig{
ScorerParams: &scorers.Config{},
})
// Set the metadata.
if tc.metadata != nil {
peers.SetMetadata(pid, tc.metadata)
}
// Add a new peer with the record.
peers.Add(tc.record, pid, nil, network.DirOutbound)
// Create a new service.
service := &Service{
peers: peers,
metaData: tc.metadata,
}
// Retrieve the custody count from the remote peer.
actual := service.CustodyGroupCountFromPeer(pid)
// Verify the result.
require.Equal(t, tc.expected, actual)
})
}
}

View File

@@ -8,6 +8,7 @@ import (
"time"
"github.com/OffchainLabs/prysm/v6/beacon-chain/cache"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v6/cmd/beacon-chain/flags"
"github.com/OffchainLabs/prysm/v6/config/features"
"github.com/OffchainLabs/prysm/v6/config/params"
@@ -187,7 +188,8 @@ func (s *Service) RefreshPersistentSubnets() {
// Compare current epoch with Altair fork epoch
altairForkEpoch := params.BeaconConfig().AltairForkEpoch
if currentEpoch < altairForkEpoch {
// We add `1` to the current epoch because we want to prepare one epoch before the Altair fork.
if currentEpoch+1 < altairForkEpoch {
// Phase 0 behaviour.
if isBitVUpToDate {
// Return early if bitfield hasn't changed.
@@ -223,15 +225,51 @@ func (s *Service) RefreshPersistentSubnets() {
// Is our sync bitvector record up to date?
isBitSUpToDate := bytes.Equal(bitS, inRecordBitS) && bytes.Equal(bitS, currentBitSInMetadata)
if metadataVersion == version.Altair && isBitVUpToDate && isBitSUpToDate {
// Compare current epoch with the Fulu fork epoch.
fuluForkEpoch := params.BeaconConfig().FuluForkEpoch
// We add `1` to the current epoch because we want to prepare one epoch before the Fulu fork.
if currentEpoch+1 < fuluForkEpoch {
// Altair behaviour.
if metadataVersion == version.Altair && isBitVUpToDate && isBitSUpToDate {
// Nothing to do, return early.
return
}
// Some data have changed, update our record and metadata.
s.updateSubnetRecordWithMetadataV2(bitV, bitS)
// Ping all peers to inform them of new metadata
s.pingPeersAndLogEnr()
return
}
// Get the current custody group count.
custodyGroupCount := s.cfg.CustodyInfo.ActualGroupCount()
// Get the custody group count we store in our record.
inRecordCustodyGroupCount, err := peerdas.CustodyGroupCountFromRecord(record)
if err != nil {
log.WithError(err).Error("Could not retrieve custody subnet count")
return
}
// Get the custody group count in our metadata.
inMetadataCustodyGroupCount := s.Metadata().CustodyGroupCount()
// Is our custody group count record up to date?
isCustodyGroupCountUpToDate := (custodyGroupCount == inRecordCustodyGroupCount && custodyGroupCount == inMetadataCustodyGroupCount)
if isBitVUpToDate && isBitSUpToDate && isCustodyGroupCountUpToDate {
// Nothing to do, return early.
return
}
// Some data have changed, update our record and metadata.
s.updateSubnetRecordWithMetadataV2(bitV, bitS)
// Some data changed. Update the record and the metadata.
s.updateSubnetRecordWithMetadataV3(bitV, bitS, custodyGroupCount)
// Ping all peers to inform them of new metadata
// Ping all peers.
s.pingPeersAndLogEnr()
}
@@ -458,6 +496,11 @@ func (s *Service) createLocalNode(
localNode.Set(quicEntry)
}
if params.FuluEnabled() {
custodyGroupCount := s.cfg.CustodyInfo.ActualGroupCount()
localNode.Set(peerdas.Cgc(custodyGroupCount))
}
localNode.SetFallbackIP(ipAddr)
localNode.SetFallbackUDP(udpPort)
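
A round-trip sketch of the ENR `cgc` entry used in the two hunks above, assuming only the peerdas helpers that appear in this diff; the count value 8 is arbitrary.

package example

import (
	"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
	"github.com/ethereum/go-ethereum/p2p/enr"
)

// cgcRoundTrip writes a custody group count into a record the same way
// createLocalNode does once Fulu is enabled, then reads it back the way
// RefreshPersistentSubnets does.
func cgcRoundTrip() (uint64, error) {
	var record enr.Record
	record.Set(peerdas.Cgc(8)) // arbitrary example count
	return peerdas.CustodyGroupCountFromRecord(&record)
}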

View File

@@ -16,6 +16,7 @@ import (
mock "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/cache"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers/peerdata"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers/scorers"
@@ -140,6 +141,15 @@ func TestStartDiscV5_DiscoverAllPeers(t *testing.T) {
func TestCreateLocalNode(t *testing.T) {
params.SetupTestConfigCleanup(t)
// Set the fulu fork epoch to something other than the far future epoch.
initFuluForkEpoch := params.BeaconConfig().FuluForkEpoch
params.BeaconConfig().FuluForkEpoch = 42
defer func() {
params.BeaconConfig().FuluForkEpoch = initFuluForkEpoch
}()
testCases := []struct {
name string
cfg *Config
@@ -147,30 +157,31 @@ func TestCreateLocalNode(t *testing.T) {
}{
{
name: "valid config",
cfg: nil,
cfg: &Config{CustodyInfo: &peerdas.CustodyInfo{}},
expectedError: false,
},
{
name: "invalid host address",
cfg: &Config{HostAddress: "invalid"},
cfg: &Config{HostAddress: "invalid", CustodyInfo: &peerdas.CustodyInfo{}},
expectedError: true,
},
{
name: "valid host address",
cfg: &Config{HostAddress: "192.168.0.1"},
cfg: &Config{HostAddress: "192.168.0.1", CustodyInfo: &peerdas.CustodyInfo{}},
expectedError: false,
},
{
name: "invalid host DNS",
cfg: &Config{HostDNS: "invalid"},
cfg: &Config{HostDNS: "invalid", CustodyInfo: &peerdas.CustodyInfo{}},
expectedError: true,
},
{
name: "valid host DNS",
cfg: &Config{HostDNS: "www.google.com"},
cfg: &Config{HostDNS: "www.google.com", CustodyInfo: &peerdas.CustodyInfo{}},
expectedError: false,
},
}
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
// Define ports.
@@ -199,7 +210,7 @@ func TestCreateLocalNode(t *testing.T) {
require.NoError(t, err)
expectedAddress := address
if tt.cfg != nil && tt.cfg.HostAddress != "" {
if tt.cfg.HostAddress != "" {
expectedAddress = net.ParseIP(tt.cfg.HostAddress)
}
@@ -236,6 +247,11 @@ func TestCreateLocalNode(t *testing.T) {
syncSubnets := new([]byte)
require.NoError(t, localNode.Node().Record().Load(enr.WithEntry(syncCommsSubnetEnrKey, syncSubnets)))
require.DeepSSZEqual(t, []byte{0}, *syncSubnets)
// Check cgc config.
custodyGroupCount := new(uint64)
require.NoError(t, localNode.Node().Record().Load(enr.WithEntry(peerdas.CustodyGroupCountEnrKey, custodyGroupCount)))
require.Equal(t, params.BeaconConfig().CustodyRequirement, *custodyGroupCount)
})
}
}
@@ -535,7 +551,7 @@ type check struct {
metadataSequenceNumber uint64
attestationSubnets []uint64
syncSubnets []uint64
custodySubnetCount *uint64
custodyGroupCount *uint64
}
func checkPingCountCacheMetadataRecord(
@@ -601,6 +617,18 @@ func checkPingCountCacheMetadataRecord(
actualBitSMetadata := service.metaData.SyncnetsBitfield()
require.DeepSSZEqual(t, expectedBitS, actualBitSMetadata)
}
if expected.custodyGroupCount != nil {
// Check custody subnet count in ENR.
var actualCustodyGroupCount uint64
err := service.dv5Listener.LocalNode().Node().Record().Load(enr.WithEntry(peerdas.CustodyGroupCountEnrKey, &actualCustodyGroupCount))
require.NoError(t, err)
require.Equal(t, *expected.custodyGroupCount, actualCustodyGroupCount)
// Check custody subnet count in metadata.
actualGroupCountMetadata := service.metaData.CustodyGroupCount()
require.Equal(t, *expected.custodyGroupCount, actualGroupCountMetadata)
}
}
func TestRefreshPersistentSubnets(t *testing.T) {
@@ -610,12 +638,18 @@ func TestRefreshPersistentSubnets(t *testing.T) {
defer cache.SubnetIDs.EmptyAllCaches()
defer cache.SyncSubnetIDs.EmptyAllCaches()
const altairForkEpoch = 5
const (
altairForkEpoch = 5
fuluForkEpoch = 10
)
custodyGroupCount := params.BeaconConfig().CustodyRequirement
// Set up epochs.
defaultCfg := params.BeaconConfig()
cfg := defaultCfg.Copy()
cfg.AltairForkEpoch = altairForkEpoch
cfg.FuluForkEpoch = fuluForkEpoch
params.OverrideBeaconConfig(cfg)
// Compute the number of seconds per epoch.
@@ -684,6 +718,39 @@ func TestRefreshPersistentSubnets(t *testing.T) {
},
},
},
{
name: "Fulu",
epochSinceGenesis: fuluForkEpoch,
checks: []check{
{
pingCount: 0,
metadataSequenceNumber: 0,
attestationSubnets: []uint64{},
syncSubnets: nil,
},
{
pingCount: 1,
metadataSequenceNumber: 1,
attestationSubnets: []uint64{40, 41},
syncSubnets: nil,
custodyGroupCount: &custodyGroupCount,
},
{
pingCount: 2,
metadataSequenceNumber: 2,
attestationSubnets: []uint64{40, 41},
syncSubnets: []uint64{1, 2},
custodyGroupCount: &custodyGroupCount,
},
{
pingCount: 2,
metadataSequenceNumber: 2,
attestationSubnets: []uint64{40, 41},
syncSubnets: []uint64{1, 2},
custodyGroupCount: &custodyGroupCount,
},
},
},
}
for _, tc := range testCases {
@@ -717,7 +784,7 @@ func TestRefreshPersistentSubnets(t *testing.T) {
actualPingCount++
return nil
},
cfg: &Config{UDPPort: 2000},
cfg: &Config{UDPPort: 2000, CustodyInfo: &peerdas.CustodyInfo{}},
peers: p2p.Peers(),
genesisTime: time.Now().Add(-time.Duration(tc.epochSinceGenesis*secondsPerEpoch) * time.Second),
genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32),

View File

@@ -127,7 +127,7 @@ func (s *Service) topicScoreParams(topic string) (*pubsub.TopicScoreParams, erro
return defaultAttesterSlashingTopicParams(), nil
case strings.Contains(topic, GossipBlsToExecutionChangeMessage):
return defaultBlsToExecutionChangeTopicParams(), nil
case strings.Contains(topic, GossipBlobSidecarMessage):
case strings.Contains(topic, GossipBlobSidecarMessage), strings.Contains(topic, GossipDataColumnSidecarMessage):
// TODO(Deneb): Using the default block scoring. But this should be updated.
return defaultBlockTopicParams(), nil
case strings.Contains(topic, GossipLightClientOptimisticUpdateMessage):

View File

@@ -22,7 +22,9 @@ const (
)
func peerMultiaddrString(conn network.Conn) string {
return fmt.Sprintf("%s/p2p/%s", conn.RemoteMultiaddr().String(), conn.RemotePeer().String())
remoteMultiaddr := conn.RemoteMultiaddr().String()
remotePeerID := conn.RemotePeer().String()
return fmt.Sprintf("%s/p2p/%s", remoteMultiaddr, remotePeerID)
}
func (s *Service) connectToPeer(conn network.Conn) {

View File

@@ -5,6 +5,7 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/encoder"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1/metadata"
@@ -40,6 +41,7 @@ type Broadcaster interface {
BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.BlobSidecar) error
BroadcastLightClientOptimisticUpdate(ctx context.Context, update interfaces.LightClientOptimisticUpdate) error
BroadcastLightClientFinalityUpdate(ctx context.Context, update interfaces.LightClientFinalityUpdate) error
BroadcastDataColumn(root [fieldparams.RootLength]byte, columnSubnet uint64, dataColumnSidecar *ethpb.DataColumnSidecar, peersChecked ...chan<- bool) error
}
// SetStreamHandler configures p2p to handle streams of a certain topic ID.
@@ -107,3 +109,7 @@ type MetadataProvider interface {
Metadata() metadata.Metadata
MetadataSeq() uint64
}
type DataColumnsHandler interface {
CustodyGroupCountFromPeer(peer.ID) uint64
}

View File

@@ -46,31 +46,39 @@ var (
})
savedAttestationBroadcasts = promauto.NewCounter(prometheus.CounterOpts{
Name: "p2p_attestation_subnet_recovered_broadcasts",
Help: "The number of attestations that were attempted to be broadcast with no peers on " +
Help: "The number of attestations message broadcast attempts with no peers on " +
"the subnet. The beacon node increments this counter when the broadcast is blocked " +
"until a subnet peer can be found.",
})
attestationBroadcastAttempts = promauto.NewCounter(prometheus.CounterOpts{
Name: "p2p_attestation_subnet_attempted_broadcasts",
Help: "The number of attestations that were attempted to be broadcast.",
Help: "The number of attestations message broadcast attempts.",
})
savedSyncCommitteeBroadcasts = promauto.NewCounter(prometheus.CounterOpts{
Name: "p2p_sync_committee_subnet_recovered_broadcasts",
Help: "The number of sync committee messages that were attempted to be broadcast with no peers on " +
Help: "The number of sync committee messages broadcast attempts with no peers on " +
"the subnet. The beacon node increments this counter when the broadcast is blocked " +
"until a subnet peer can be found.",
})
blobSidecarCommitteeBroadcasts = promauto.NewCounter(prometheus.CounterOpts{
blobSidecarBroadcasts = promauto.NewCounter(prometheus.CounterOpts{
Name: "p2p_blob_sidecar_committee_broadcasts",
Help: "The number of blob sidecar committee messages that were broadcast with no peer on.",
Help: "The number of blob sidecar messages that were broadcast with no peer on.",
})
syncCommitteeBroadcastAttempts = promauto.NewCounter(prometheus.CounterOpts{
Name: "p2p_sync_committee_subnet_attempted_broadcasts",
Help: "The number of sync committee that were attempted to be broadcast.",
Help: "The number of sync committee message broadcast attempts.",
})
blobSidecarCommitteeBroadcastAttempts = promauto.NewCounter(prometheus.CounterOpts{
blobSidecarBroadcastAttempts = promauto.NewCounter(prometheus.CounterOpts{
Name: "p2p_blob_sidecar_committee_attempted_broadcasts",
Help: "The number of blob sidecar committee messages that were attempted to be broadcast.",
Help: "The number of blob sidecar message broadcast attempts.",
})
dataColumnSidecarBroadcasts = promauto.NewCounter(prometheus.CounterOpts{
Name: "p2p_data_column_sidecar_broadcasts",
Help: "The number of data column sidecar messages that were broadcasted.",
})
dataColumnSidecarBroadcastAttempts = promauto.NewCounter(prometheus.CounterOpts{
Name: "p2p_data_column_sidecar_attempted_broadcasts",
Help: "The number of data column sidecar message broadcast attempts.",
})
// Gossip Tracer Metrics

View File

@@ -26,10 +26,11 @@ var _ pubsub.SubscriptionFilter = (*Service)(nil)
// -> SyncContributionAndProof * 2 = 2
// -> 4 SyncCommitteeSubnets * 2 = 8
// -> BlsToExecutionChange * 2 = 2
// -> 6 BlobSidecar * 2 = 12
// -> 128 DataColumnSidecar * 2 = 256
// -------------------------------------
// TOTAL = 162
const pubsubSubscriptionRequestLimit = 200
// TOTAL = 406
// (Note: BlobSidecar is not included in this list since it is superseded by DataColumnSidecar)
const pubsubSubscriptionRequestLimit = 500
// CanSubscribe returns true if the topic is of interest and we could subscribe to it.
func (s *Service) CanSubscribe(topic string) bool {

View File

@@ -11,11 +11,16 @@ import (
"github.com/pkg/errors"
)
// SchemaVersionV1 specifies the schema version for our rpc protocol ID.
const SchemaVersionV1 = "/1"
const (
// SchemaVersionV1 specifies the schema version for our rpc protocol ID.
SchemaVersionV1 = "/1"
// SchemaVersionV2 specifies the next schema version for our rpc protocol ID.
const SchemaVersionV2 = "/2"
// SchemaVersionV2 specifies the next schema version for our rpc protocol ID.
SchemaVersionV2 = "/2"
// SchemaVersionV3 specifies the next schema version for our rpc protocol ID.
SchemaVersionV3 = "/3"
)
// Specifies the protocol prefix for all our Req/Resp topics.
const protocolPrefix = "/eth2/beacon_chain/req"
@@ -73,10 +78,10 @@ const (
// RPCBlobSidecarsByRangeTopicV1 is a topic for requesting blob sidecars
// in the slot range [start_slot, start_slot + count), leading up to the current head block as selected by fork choice.
// Protocol ID: /eth2/beacon_chain/req/blob_sidecars_by_range/1/ - New in deneb.
// /eth2/beacon_chain/req/blob_sidecars_by_range/1/ - New in deneb.
RPCBlobSidecarsByRangeTopicV1 = protocolPrefix + BlobSidecarsByRangeName + SchemaVersionV1
// RPCBlobSidecarsByRootTopicV1 is a topic for requesting blob sidecars by their block root. New in deneb.
// /eth2/beacon_chain/req/blob_sidecars_by_root/1/
// RPCBlobSidecarsByRootTopicV1 is a topic for requesting blob sidecars by their block root.
// /eth2/beacon_chain/req/blob_sidecars_by_root/1/ - New in deneb.
RPCBlobSidecarsByRootTopicV1 = protocolPrefix + BlobSidecarsByRootName + SchemaVersionV1
// RPCLightClientBootstrapTopicV1 is a topic for requesting a light client bootstrap.
@@ -95,6 +100,10 @@ const (
RPCBlocksByRootTopicV2 = protocolPrefix + BeaconBlocksByRootsMessageName + SchemaVersionV2
// RPCMetaDataTopicV2 defines the v2 topic for the metadata rpc method.
RPCMetaDataTopicV2 = protocolPrefix + MetadataMessageName + SchemaVersionV2
// V3 RPC Topics
// RPCMetaDataTopicV3 defines the v3 topic for the metadata rpc method.
RPCMetaDataTopicV3 = protocolPrefix + MetadataMessageName + SchemaVersionV3
)
// RPC errors for topic parsing.
@@ -119,6 +128,7 @@ var RPCTopicMappings = map[string]interface{}{
// RPC Metadata Message
RPCMetaDataTopicV1: new(interface{}),
RPCMetaDataTopicV2: new(interface{}),
RPCMetaDataTopicV3: new(interface{}),
// BlobSidecarsByRange v1 Message
RPCBlobSidecarsByRangeTopicV1: new(pb.BlobSidecarsByRangeRequest),
// BlobSidecarsByRoot v1 Message
@@ -160,9 +170,15 @@ var altairMapping = map[string]bool{
MetadataMessageName: true,
}
// Maps all the RPC messages which are to be updated in fulu.
var fuluMapping = map[string]bool{
MetadataMessageName: true,
}
var versionMapping = map[string]bool{
SchemaVersionV1: true,
SchemaVersionV2: true,
SchemaVersionV3: true,
}
// OmitContextBytesV1 keeps track of which RPC methods do not write context bytes in their v1 incarnations.
@@ -290,13 +306,22 @@ func (r RPCTopic) Version() string {
// TopicFromMessage constructs the rpc topic from the provided message
// type and epoch.
func TopicFromMessage(msg string, epoch primitives.Epoch) (string, error) {
// Check if the message is known.
if !messageMapping[msg] {
return "", errors.Errorf("%s: %s", invalidRPCMessageType, msg)
}
version := SchemaVersionV1
isAltair := epoch >= params.BeaconConfig().AltairForkEpoch
if isAltair && altairMapping[msg] {
version = SchemaVersionV2
beaconConfig := params.BeaconConfig()
// Check if the message is to be updated in fulu.
if epoch >= beaconConfig.FuluForkEpoch && fuluMapping[msg] {
return protocolPrefix + msg + SchemaVersionV3, nil
}
return protocolPrefix + msg + version, nil
// Check if the message is to be updated in altair.
if epoch >= beaconConfig.AltairForkEpoch && altairMapping[msg] {
return protocolPrefix + msg + SchemaVersionV2, nil
}
return protocolPrefix + msg + SchemaVersionV1, nil
}
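
A small sketch of the resulting fork-ordered selection, assuming the exported p2p identifiers shown in this diff: the same message name resolves to schema /3 from the Fulu fork, /2 from Altair, and /1 before either, which is why the Fulu check must run before the Altair one.

package example

import (
	"fmt"

	"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
	"github.com/OffchainLabs/prysm/v6/config/params"
)

// printMetadataTopics resolves the metadata RPC topic at three epochs; the
// message is in both fuluMapping and altairMapping, so ordering matters.
func printMetadataTopics() {
	cfg := params.BeaconConfig()
	v1, _ := p2p.TopicFromMessage(p2p.MetadataMessageName, 0)                   // schema /1
	v2, _ := p2p.TopicFromMessage(p2p.MetadataMessageName, cfg.AltairForkEpoch) // schema /2
	v3, _ := p2p.TopicFromMessage(p2p.MetadataMessageName, cfg.FuluForkEpoch)   // schema /3
	fmt.Println(v1, v2, v3)
}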

View File

@@ -82,43 +82,87 @@ func TestTopicDeconstructor(t *testing.T) {
}
func TestTopicFromMessage_CorrectType(t *testing.T) {
const (
genesisEpoch = primitives.Epoch(0)
altairForkEpoch = primitives.Epoch(100)
fuluForkEpoch = primitives.Epoch(200)
)
params.SetupTestConfigCleanup(t)
bCfg := params.BeaconConfig().Copy()
forkEpoch := primitives.Epoch(100)
bCfg.AltairForkEpoch = forkEpoch
bCfg.ForkVersionSchedule[bytesutil.ToBytes4(bCfg.AltairForkVersion)] = primitives.Epoch(100)
bCfg.AltairForkEpoch = altairForkEpoch
bCfg.ForkVersionSchedule[bytesutil.ToBytes4(bCfg.AltairForkVersion)] = altairForkEpoch
bCfg.FuluForkEpoch = fuluForkEpoch
bCfg.ForkVersionSchedule[bytesutil.ToBytes4(bCfg.FuluForkVersion)] = fuluForkEpoch
params.OverrideBeaconConfig(bCfg)
// Garbage Message
badMsg := "wljdjska"
_, err := TopicFromMessage(badMsg, 0)
assert.ErrorContains(t, fmt.Sprintf("%s: %s", invalidRPCMessageType, badMsg), err)
// Before Fork
for m := range messageMapping {
topic, err := TopicFromMessage(m, 0)
assert.NoError(t, err)
t.Run("garbage message", func(t *testing.T) {
// Garbage Message
const badMsg = "wljdjska"
_, err := TopicFromMessage(badMsg, genesisEpoch)
require.ErrorContains(t, fmt.Sprintf("%s: %s", invalidRPCMessageType, badMsg), err)
})
assert.Equal(t, true, strings.Contains(topic, SchemaVersionV1))
_, _, version, err := TopicDeconstructor(topic)
assert.NoError(t, err)
assert.Equal(t, SchemaVersionV1, version)
}
t.Run("before altair fork", func(t *testing.T) {
for m := range messageMapping {
topic, err := TopicFromMessage(m, genesisEpoch)
require.NoError(t, err)
// Altair Fork
for m := range messageMapping {
topic, err := TopicFromMessage(m, forkEpoch)
assert.NoError(t, err)
if altairMapping[m] {
assert.Equal(t, true, strings.Contains(topic, SchemaVersionV2))
require.Equal(t, true, strings.Contains(topic, SchemaVersionV1))
_, _, version, err := TopicDeconstructor(topic)
assert.NoError(t, err)
assert.Equal(t, SchemaVersionV2, version)
} else {
assert.Equal(t, true, strings.Contains(topic, SchemaVersionV1))
_, _, version, err := TopicDeconstructor(topic)
assert.NoError(t, err)
assert.Equal(t, SchemaVersionV1, version)
require.NoError(t, err)
require.Equal(t, SchemaVersionV1, version)
}
}
})
t.Run("after altair fork but before fulu fork", func(t *testing.T) {
for m := range messageMapping {
topic, err := TopicFromMessage(m, altairForkEpoch)
require.NoError(t, err)
if altairMapping[m] {
require.Equal(t, true, strings.Contains(topic, SchemaVersionV2))
_, _, version, err := TopicDeconstructor(topic)
require.NoError(t, err)
require.Equal(t, SchemaVersionV2, version)
continue
}
require.Equal(t, true, strings.Contains(topic, SchemaVersionV1))
_, _, version, err := TopicDeconstructor(topic)
require.NoError(t, err)
require.Equal(t, SchemaVersionV1, version)
}
})
t.Run("after fulu fork", func(t *testing.T) {
for m := range messageMapping {
topic, err := TopicFromMessage(m, fuluForkEpoch)
require.NoError(t, err)
if fuluMapping[m] {
require.Equal(t, true, strings.Contains(topic, SchemaVersionV3))
_, _, version, err := TopicDeconstructor(topic)
require.NoError(t, err)
require.Equal(t, SchemaVersionV3, version)
continue
}
if altairMapping[m] {
require.Equal(t, true, strings.Contains(topic, SchemaVersionV2))
_, _, version, err := TopicDeconstructor(topic)
require.NoError(t, err)
require.Equal(t, SchemaVersionV2, version)
continue
}
require.Equal(t, true, strings.Contains(topic, SchemaVersionV1))
_, _, version, err := TopicDeconstructor(topic)
require.NoError(t, err)
require.Equal(t, SchemaVersionV1, version)
}
})
}

View File

@@ -22,8 +22,9 @@ func (s *Service) Send(ctx context.Context, message interface{}, baseTopic strin
ctx, span := trace.StartSpan(ctx, "p2p.Send")
defer span.End()
if err := VerifyTopicMapping(baseTopic, message); err != nil {
return nil, err
return nil, errors.Wrap(err, "verify topic mapping")
}
topic := baseTopic + s.Encoding().ProtocolSuffix()
span.SetAttributes(trace.StringAttribute("topic", topic))
@@ -39,19 +40,21 @@ func (s *Service) Send(ctx context.Context, message interface{}, baseTopic strin
stream, err := s.host.NewStream(ctx, pid, protocol.ID(topic))
if err != nil {
tracing.AnnotateError(span, err)
return nil, err
return nil, errors.Wrap(err, "new stream")
}
// do not encode anything if we are sending a metadata request
if baseTopic != RPCMetaDataTopicV1 && baseTopic != RPCMetaDataTopicV2 {
// Do not encode anything if we are sending a metadata request
if baseTopic != RPCMetaDataTopicV1 && baseTopic != RPCMetaDataTopicV2 && baseTopic != RPCMetaDataTopicV3 {
castedMsg, ok := message.(ssz.Marshaler)
if !ok {
return nil, errors.Errorf("%T does not support the ssz marshaller interface", message)
}
if _, err := s.Encoding().EncodeWithMaxLength(stream, castedMsg); err != nil {
tracing.AnnotateError(span, err)
_err := stream.Reset()
_ = _err
return nil, err
return nil, errors.Wrap(err, "encode with max length")
}
}
@@ -60,7 +63,7 @@ func (s *Service) Send(ctx context.Context, message interface{}, baseTopic strin
tracing.AnnotateError(span, err)
_err := stream.Reset()
_ = _err
return nil, err
return nil, errors.Wrap(err, "close write")
}
return stream, nil

View File

@@ -9,6 +9,7 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/cache"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/helpers"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v6/cmd/beacon-chain/flags"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
@@ -29,8 +30,9 @@ var (
attestationSubnetCount = params.BeaconConfig().AttestationSubnetCount
syncCommsSubnetCount = params.BeaconConfig().SyncCommitteeSubnetCount
attSubnetEnrKey = params.BeaconNetworkConfig().AttSubnetKey
syncCommsSubnetEnrKey = params.BeaconNetworkConfig().SyncCommsSubnetKey
attSubnetEnrKey = params.BeaconNetworkConfig().AttSubnetKey
syncCommsSubnetEnrKey = params.BeaconNetworkConfig().SyncCommsSubnetKey
custodyGroupCountEnrKey = params.BeaconNetworkConfig().CustodyGroupCountKey
)
// The value used with the subnet, in order
@@ -47,7 +49,14 @@ const syncLockerVal = 100
// chosen more than sync and attestation subnet combined.
const blobSubnetLockerVal = 110
// nodeFilter return a function that filters nodes based on the subnet topic and subnet index.
// The value used with the data column sidecar subnet, in order
// to create an appropriate key to retrieve
// the relevant lock. This is used to differentiate
// data column subnets from others. This is deliberately
chosen to be greater than the sync, attestation, and blob (6) subnets combined.
const dataColumnSubnetVal = 150
// nodeFilter returns a function that filters nodes based on the subnet topic and subnet index.
func (s *Service) nodeFilter(topic string, index uint64) (func(node *enode.Node) bool, error) {
switch {
case strings.Contains(topic, GossipAttestationMessage):
@@ -56,6 +65,8 @@ func (s *Service) nodeFilter(topic string, index uint64) (func(node *enode.Node)
return s.filterPeerForSyncSubnet(index), nil
case strings.Contains(topic, GossipBlobSidecarMessage):
return s.filterPeerForBlobSubnet(), nil
case strings.Contains(topic, GossipDataColumnSidecarMessage):
return s.filterPeerForDataColumnsSubnet(index), nil
default:
return nil, errors.Errorf("no subnet exists for provided topic: %s", topic)
}
@@ -276,6 +287,22 @@ func (s *Service) filterPeerForBlobSubnet() func(_ *enode.Node) bool {
}
}
// returns a function that filters peers specifically for a particular data column subnet.
func (s *Service) filterPeerForDataColumnsSubnet(index uint64) func(node *enode.Node) bool {
return func(node *enode.Node) bool {
if !s.filterPeer(node) {
return false
}
subnets, err := dataColumnSubnets(node.ID(), node.Record())
if err != nil {
return false
}
return subnets[index]
}
}
// lower threshold to broadcast object compared to searching
// for a subnet. So that even in the event of poor peer
// connectivity, we can still broadcast an attestation.
@@ -321,6 +348,35 @@ func (s *Service) updateSubnetRecordWithMetadataV2(bitVAtt bitfield.Bitvector64,
})
}
// updateSubnetRecordWithMetadataV3 updates:
// - the attestation subnets tracked,
// - the sync subnets tracked, and
// - the custody group count
// in both the node's record and the node's metadata.
func (s *Service) updateSubnetRecordWithMetadataV3(
bitVAtt bitfield.Bitvector64,
bitVSync bitfield.Bitvector4,
custodyGroupCount uint64,
) {
attSubnetsEntry := enr.WithEntry(attSubnetEnrKey, &bitVAtt)
syncSubnetsEntry := enr.WithEntry(syncCommsSubnetEnrKey, &bitVSync)
custodyGroupCountEntry := enr.WithEntry(custodyGroupCountEnrKey, custodyGroupCount)
localNode := s.dv5Listener.LocalNode()
localNode.Set(attSubnetsEntry)
localNode.Set(syncSubnetsEntry)
localNode.Set(custodyGroupCountEntry)
newSeqNumber := s.metaData.SequenceNumber() + 1
s.metaData = wrapper.WrappedMetadataV2(&pb.MetaDataV2{
SeqNumber: newSeqNumber,
Attnets: bitVAtt,
Syncnets: bitVSync,
CustodyGroupCount: custodyGroupCount,
})
}
func initializePersistentSubnets(id enode.ID, epoch primitives.Epoch) error {
_, ok, expTime := cache.SubnetIDs.GetPersistentSubnets()
if ok && expTime.After(time.Now()) {
@@ -458,6 +514,24 @@ func syncSubnets(record *enr.Record) ([]uint64, error) {
return committeeIdxs, nil
}
// Retrieve the data columns subnets from a node's ENR and node ID.
func dataColumnSubnets(nodeID enode.ID, record *enr.Record) (map[uint64]bool, error) {
// Retrieve the custody count from the ENR.
custodyGroupCount, err := peerdas.CustodyGroupCountFromRecord(record)
if err != nil {
return nil, errors.Wrap(err, "custody group count from record")
}
// Retrieve the peer info.
peerInfo, _, err := peerdas.Info(nodeID, custodyGroupCount)
if err != nil {
return nil, errors.Wrap(err, "peer info")
}
// Get custody columns subnets from the columns.
return peerInfo.DataColumnsSubnets, nil
}
// Parses the attestation subnets ENR entry in a node and extracts its value
// as a bitvector for further manipulation.
func attBitvector(record *enr.Record) (bitfield.Bitvector64, error) {
@@ -484,14 +558,16 @@ func syncBitvector(record *enr.Record) (bitfield.Bitvector4, error) {
// The subnet locker is a map which keeps track of all
// mutexes stored per subnet. This locker is reused
// between both the attestation, sync and blob subnets.
// between the attestation, sync, blob, and data column subnets.
// Sync subnets are stored by (subnet+syncLockerVal).
// Blob subnets are stored by (subnet+blobSubnetLockerVal).
// Data column subnets are stored by (subnet+dataColumnSubnetVal).
// This is to prevent conflicts while allowing subnets
// to use a single locker.
func (s *Service) subnetLocker(i uint64) *sync.RWMutex {
s.subnetsLockLock.Lock()
defer s.subnetsLockLock.Unlock()
l, ok := s.subnetsLock[i]
if !ok {
l = &sync.RWMutex{}

View File

@@ -10,6 +10,7 @@ import (
"time"
"github.com/OffchainLabs/prysm/v6/beacon-chain/cache"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v6/cmd/beacon-chain/flags"
"github.com/OffchainLabs/prysm/v6/config/params"
ecdsaprysm "github.com/OffchainLabs/prysm/v6/crypto/ecdsa"
@@ -503,6 +504,26 @@ func Test_SyncSubnets(t *testing.T) {
}
}
func TestDataColumnSubnets(t *testing.T) {
const cgc = 3
var (
nodeID enode.ID
record enr.Record
)
record.Set(peerdas.Cgc(cgc))
expected := map[uint64]bool{1: true, 87: true, 102: true}
actual, err := dataColumnSubnets(nodeID, &record)
assert.NoError(t, err)
require.Equal(t, len(expected), len(actual))
for subnet := range expected {
require.Equal(t, true, actual[subnet])
}
}
func TestSubnetComputation(t *testing.T) {
db, err := enode.OpenDB("")
assert.NoError(t, err)

View File

@@ -17,9 +17,12 @@ go_library(
"//beacon-chain:__subpackages__",
],
deps = [
"//beacon-chain/core/peerdas:go_default_library",
"//beacon-chain/p2p/encoder:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/p2p/peers/scorers:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//proto/prysm/v1alpha1/metadata:go_default_library",

View File

@@ -5,6 +5,7 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/encoder"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1/metadata"
@@ -59,7 +60,7 @@ func (*FakeP2P) ENR() *enr.Record {
// NodeID returns the node id of the local peer.
func (*FakeP2P) NodeID() enode.ID {
return [32]byte{}
return enode.ID{}
}
// DiscoveryAddresses -- fake
@@ -165,6 +166,11 @@ func (*FakeP2P) BroadcastLightClientFinalityUpdate(_ context.Context, _ interfac
return nil
}
// BroadcastDataColumn -- fake.
func (*FakeP2P) BroadcastDataColumn(_ [fieldparams.RootLength]byte, _ uint64, _ *ethpb.DataColumnSidecar, _ ...chan<- bool) error {
return nil
}
// InterceptPeerDial -- fake.
func (*FakeP2P) InterceptPeerDial(peer.ID) (allow bool) {
return true
@@ -189,3 +195,7 @@ func (*FakeP2P) InterceptSecured(network.Direction, peer.ID, network.ConnMultiad
func (*FakeP2P) InterceptUpgraded(network.Conn) (allow bool, reason control.DisconnectReason) {
return true, 0
}
func (*FakeP2P) CustodyGroupCountFromPeer(peer.ID) uint64 {
return 0
}

View File

@@ -5,6 +5,7 @@ import (
"sync"
"sync/atomic"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"google.golang.org/protobuf/proto"
@@ -61,6 +62,12 @@ func (m *MockBroadcaster) BroadcastLightClientFinalityUpdate(_ context.Context,
return nil
}
// BroadcastDataColumn broadcasts a data column for mock.
func (m *MockBroadcaster) BroadcastDataColumn([fieldparams.RootLength]byte, uint64, *ethpb.DataColumnSidecar, ...chan<- bool) error {
m.BroadcastCalled.Store(true)
return nil
}
// NumMessages returns the number of messages broadcasted.
func (m *MockBroadcaster) NumMessages() int {
m.msgLock.Lock()

View File

@@ -42,7 +42,7 @@ func (m *MockPeerManager) ENR() *enr.Record {
// NodeID .
func (m MockPeerManager) NodeID() enode.ID {
return [32]byte{}
return enode.ID{}
}
// DiscoveryAddresses .

View File

@@ -10,9 +10,12 @@ import (
"testing"
"time"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/encoder"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers/scorers"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1/metadata"
@@ -220,6 +223,12 @@ func (p *TestP2P) BroadcastLightClientFinalityUpdate(_ context.Context, _ interf
return nil
}
// BroadcastDataColumn broadcasts a data column for mock.
func (p *TestP2P) BroadcastDataColumn([fieldparams.RootLength]byte, uint64, *ethpb.DataColumnSidecar, ...chan<- bool) error {
p.BroadcastCalled.Store(true)
return nil
}
// SetStreamHandler for RPC.
func (p *TestP2P) SetStreamHandler(topic string, handler network.StreamHandler) {
p.BHost.SetStreamHandler(protocol.ID(topic), handler)
@@ -451,3 +460,22 @@ func (*TestP2P) InterceptSecured(network.Direction, peer.ID, network.ConnMultiad
func (*TestP2P) InterceptUpgraded(network.Conn) (allow bool, reason control.DisconnectReason) {
return true, 0
}
func (s *TestP2P) CustodyGroupCountFromPeer(pid peer.ID) uint64 {
// By default, we assume the peer custodies the minimum number of groups.
custodyRequirement := params.BeaconConfig().CustodyRequirement
// Retrieve the ENR of the peer.
record, err := s.peers.ENR(pid)
if err != nil {
return custodyRequirement
}
// Retrieve the custody group count from the ENR.
custodyGroupCount, err := peerdas.CustodyGroupCountFromRecord(record)
if err != nil {
return custodyRequirement
}
return custodyGroupCount
}

View File

@@ -41,6 +41,7 @@ go_test(
],
embed = [":go_default_library"],
deps = [
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/primitives:go_default_library",
"//encoding/bytesutil:go_default_library",

View File

@@ -108,7 +108,7 @@ func InitializeDataMaps() {
return wrapper.WrappedMetadataV1(&ethpb.MetaDataV1{}), nil
},
bytesutil.ToBytes4(params.BeaconConfig().FuluForkVersion): func() (metadata.Metadata, error) {
return wrapper.WrappedMetadataV1(&ethpb.MetaDataV1{}), nil
return wrapper.WrappedMetadataV2(&ethpb.MetaDataV2{}), nil
},
}

View File

@@ -12,7 +12,7 @@ var (
ErrRateLimited = errors.New("rate limited")
ErrIODeadline = errors.New("i/o deadline exceeded")
ErrInvalidRequest = errors.New("invalid range, step or count")
ErrBlobLTMinRequest = errors.New("blob slot < minimum_request_epoch")
ErrBlobLTMinRequest = errors.New("blob epoch < minimum_request_epoch")
ErrMaxBlobReqExceeded = errors.New("requested more than MAX_REQUEST_BLOB_SIDECARS")
ErrResourceUnavailable = errors.New("resource requested unavailable")
)

View File

@@ -5,6 +5,7 @@ package types
import (
"bytes"
"encoding/binary"
"sort"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
@@ -14,7 +15,10 @@ import (
ssz "github.com/prysmaticlabs/fastssz"
)
const maxErrorLength = 256
const (
maxErrorLength = 256
bytesPerLengthOffset = 4
)
// SSZBytes is a bytes slice that satisfies the fast-ssz interface.
type SSZBytes []byte
@@ -182,23 +186,23 @@ func (b *BlobSidecarsByRootReq) UnmarshalSSZ(buf []byte) error {
return nil
}
var _ sort.Interface = BlobSidecarsByRootReq{}
var _ sort.Interface = (*BlobSidecarsByRootReq)(nil)
// Less reports whether the element with index i must sort before the element with index j.
// BlobIdentifier will be sorted in lexicographic order by root, with Blob Index as tiebreaker for a given root.
func (s BlobSidecarsByRootReq) Less(i, j int) bool {
rootCmp := bytes.Compare(s[i].BlockRoot, s[j].BlockRoot)
rootCmp := bytes.Compare((s)[i].BlockRoot, (s)[j].BlockRoot)
if rootCmp != 0 {
// They aren't equal; return true if i < j, false if i > j.
return rootCmp < 0
}
// They are equal; blob index is the tie breaker.
return s[i].Index < s[j].Index
return (s)[i].Index < (s)[j].Index
}
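For example, the identifiers [(rootB, 0), (rootA, 2), (rootA, 1)] sort to [(rootA, 1), (rootA, 2), (rootB, 0)]: roots compare lexicographically first, and the blob index only breaks ties within a single root.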
// Swap swaps the elements with indexes i and j.
func (s BlobSidecarsByRootReq) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
(s)[i], (s)[j] = (s)[j], (s)[i]
}
// Len is the number of elements in the collection.
@@ -206,7 +210,130 @@ func (s BlobSidecarsByRootReq) Len() int {
return len(s)
}
func init() {
sizer := &eth.BlobIdentifier{}
blobIdSize = sizer.SizeSSZ()
// ====================================
// DataColumnsByRootIdentifiers section
// ====================================
var _ ssz.Marshaler = (*DataColumnsByRootIdentifiers)(nil)
var _ ssz.Unmarshaler = (*DataColumnsByRootIdentifiers)(nil)
// DataColumnsByRootIdentifiers is used to specify a list of data column targets (block root plus column indices) in a DataColumnSidecarsByRoot RPC request.
type DataColumnsByRootIdentifiers []*eth.DataColumnsByRootIdentifier
// A data column identifier has a fixed serialized size, so we can compute it once at start time (see init below).
var dataColumnIdSize int
// UnmarshalSSZ implements ssz.Unmarshaler. It unmarshals the provided byte buffer into the DataColumnsByRootIdentifiers value.
func (d *DataColumnsByRootIdentifiers) UnmarshalSSZ(buf []byte) error {
// Exit early (leaving the list empty) if the buffer is too small to hold even one offset.
if len(buf) < bytesPerLengthOffset {
return nil
}
// Get the size of the offsets.
offsetEnd := binary.LittleEndian.Uint32(buf[:bytesPerLengthOffset])
if offsetEnd%bytesPerLengthOffset != 0 {
return errors.Errorf("expected offsets size to be a multiple of %d but got %d", bytesPerLengthOffset, offsetEnd)
}
count := offsetEnd / bytesPerLengthOffset
if count < 1 {
return nil
}
maxSize := params.BeaconConfig().MaxRequestBlocksDeneb
if uint64(count) > maxSize {
return errors.Errorf("data column identifiers list exceeds max size: %d > %d", count, maxSize)
}
if offsetEnd > uint32(len(buf)) {
return errors.Errorf("offsets value %d larger than buffer %d", offsetEnd, len(buf))
}
valueStart := offsetEnd
// Decode the identifiers.
*d = make([]*eth.DataColumnsByRootIdentifier, count)
var start uint32
end := uint32(len(buf))
for i := count; i > 0; i-- {
offsetEnd -= bytesPerLengthOffset
start = binary.LittleEndian.Uint32(buf[offsetEnd : offsetEnd+bytesPerLengthOffset])
if start > end {
return errors.Errorf("expected offset[%d] %d to be less than %d", i-1, start, end)
}
if start < valueStart {
return errors.Errorf("offset[%d] %d indexes before value section %d", i-1, start, valueStart)
}
// Decode the identifier.
ident := &eth.DataColumnsByRootIdentifier{}
if err := ident.UnmarshalSSZ(buf[start:end]); err != nil {
return err
}
(*d)[i-1] = ident
end = start
}
return nil
}
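To make the offset walking above concrete, here is a tiny, self-contained illustration of the SSZ layout being decoded: a list of variable-size values is serialized as a table of 4-byte little-endian offsets followed by the concatenated values, and the first offset doubles as the table size, which is how the element count is recovered. This toy uses raw byte slices rather than real identifiers:
package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	values := [][]byte{{0xaa}, {0xbb, 0xcc}, {0xdd, 0xee, 0xff}}
	offSize := uint32(4 * len(values))

	// Offset table first: each entry records where its value starts.
	out := make([]byte, offSize)
	cursor := offSize
	for i, v := range values {
		binary.LittleEndian.PutUint32(out[i*4:i*4+4], cursor)
		cursor += uint32(len(v))
	}
	// Then the concatenated variable-size values.
	for _, v := range values {
		out = append(out, v...)
	}

	// The first offset equals the offset-table size, so a decoder can
	// recover the element count as out[0:4] / 4, exactly as above.
	first := binary.LittleEndian.Uint32(out[:4])
	fmt.Printf("encoded=%x count=%d\n", out, first/4)
}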
// MarshalSSZ implements ssz.Marshaler. It serializes the DataColumnsByRootIdentifiers value into a newly allocated byte slice.
func (d *DataColumnsByRootIdentifiers) MarshalSSZ() ([]byte, error) {
var err error
count := len(*d)
maxSize := params.BeaconConfig().MaxRequestBlocksDeneb
if uint64(count) > maxSize {
return nil, errors.Errorf("data column identifiers list exceeds max size: %d > %d", count, maxSize)
}
if len(*d) == 0 {
return []byte{}, nil
}
sizes := make([]uint32, count)
valTotal := uint32(0)
for i, elem := range *d {
if elem == nil {
return nil, errors.New("nil item in DataColumnsByRootIdentifiers list")
}
sizes[i] = uint32(elem.SizeSSZ())
valTotal += sizes[i]
}
offSize := uint32(bytesPerLengthOffset * len(*d))
out := make([]byte, offSize, offSize+valTotal)
for i := range sizes {
binary.LittleEndian.PutUint32(out[i*bytesPerLengthOffset:(i+1)*bytesPerLengthOffset], offSize)
offSize += sizes[i]
}
for _, elem := range *d {
out, err = elem.MarshalSSZTo(out)
if err != nil {
return nil, err
}
}
return out, nil
}
// MarshalSSZTo implements ssz.Marshaler. It appends the serialized DataColumnsByRootIdentifiers value to the provided byte slice.
func (d *DataColumnsByRootIdentifiers) MarshalSSZTo(dst []byte) ([]byte, error) {
obj, err := d.MarshalSSZ()
if err != nil {
return nil, err
}
return append(dst, obj...), nil
}
// SizeSSZ implements ssz.Marshaler. It returns the size of the serialized representation.
func (d *DataColumnsByRootIdentifiers) SizeSSZ() int {
size := 0
for i := 0; i < len(*d); i++ {
size += bytesPerLengthOffset
size += (*d)[i].SizeSSZ()
}
return size
}
func init() {
blobSizer := &eth.BlobIdentifier{}
blobIdSize = blobSizer.SizeSSZ()
dataColumnSizer := &eth.DataColumnSidecarsByRangeRequest{}
dataColumnIdSize = dataColumnSizer.SizeSSZ()
}

View File

@@ -4,6 +4,7 @@ import (
"encoding/hex"
"testing"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v6/encoding/bytesutil"
@@ -203,3 +204,157 @@ func hexDecodeOrDie(t *testing.T, str string) []byte {
require.NoError(t, err)
return decoded
}
// ====================================
// DataColumnsByRootIdentifiers section
// ====================================
func generateDataColumnIdentifiers(n int) []*eth.DataColumnsByRootIdentifier {
r := make([]*eth.DataColumnsByRootIdentifier, n)
for i := 0; i < n; i++ {
r[i] = &eth.DataColumnsByRootIdentifier{
BlockRoot: bytesutil.PadTo([]byte{byte(i)}, 32),
Columns: []uint64{uint64(i)},
}
}
return r
}
func TestDataColumnSidecarsByRootReq_Marshal(t *testing.T) {
/*
SSZ encoding of DataColumnsByRootIdentifiers is tested in spectests.
However, encoding a list of DataColumnsByRootIdentifier is not.
We are testing it here.
Python code to generate the expected value
# pip install eth2spec
from eth2spec.utils.ssz import ssz_typing
Container = ssz_typing.Container
List = ssz_typing.List
Root = ssz_typing.Bytes32
ColumnIndex = ssz_typing.uint64
NUMBER_OF_COLUMNS=128
class DataColumnsByRootIdentifier(Container):
block_root: Root
columns: List[ColumnIndex, NUMBER_OF_COLUMNS]
first = DataColumnsByRootIdentifier(block_root="0x0100000000000000000000000000000000000000000000000000000000000000", columns=[3,5,7])
second = DataColumnsByRootIdentifier(block_root="0x0200000000000000000000000000000000000000000000000000000000000000", columns=[])
third = DataColumnsByRootIdentifier(block_root="0x0300000000000000000000000000000000000000000000000000000000000000", columns=[6, 4])
expected = List[DataColumnsByRootIdentifier, 42](first, second, third).encode_bytes().hex()
*/
const expected = "0c000000480000006c00000001000000000000000000000000000000000000000000000000000000000000002400000003000000000000000500000000000000070000000000000002000000000000000000000000000000000000000000000000000000000000002400000003000000000000000000000000000000000000000000000000000000000000002400000006000000000000000400000000000000"
identifiers := &DataColumnsByRootIdentifiers{
{
BlockRoot: bytesutil.PadTo([]byte{1}, fieldparams.RootLength),
Columns: []uint64{3, 5, 7},
},
{
BlockRoot: bytesutil.PadTo([]byte{2}, fieldparams.RootLength),
Columns: []uint64{},
},
{
BlockRoot: bytesutil.PadTo([]byte{3}, fieldparams.RootLength),
Columns: []uint64{6, 4},
},
}
marshalled, err := identifiers.MarshalSSZ()
require.NoError(t, err)
actual := hex.EncodeToString(marshalled)
require.Equal(t, expected, actual)
}
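Reading the expected bytes back confirms the layout: the first 12 bytes are three offsets (0x0c = 12, 0x48 = 72, 0x6c = 108), and each element is a 32-byte root, a 4-byte inner offset (0x24 = 36, the fixed-part size), then 8 bytes per column index. That gives element sizes of 60, 36, and 52 bytes, and indeed 12 + 60 = 72 and 72 + 36 = 108, matching the second and third offsets.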
func TestDataColumnSidecarsByRootReq_MarshalUnmarshal(t *testing.T) {
cases := []struct {
name string
ids []*eth.DataColumnsByRootIdentifier
marshalErr error
unmarshalErr string
unmarshalMod func([]byte) []byte
}{
{
name: "empty list",
},
{
name: "single item list",
ids: generateDataColumnIdentifiers(1),
},
{
name: "10 item list",
ids: generateDataColumnIdentifiers(10),
},
{
name: "wonky unmarshal size",
ids: generateDataColumnIdentifiers(10),
unmarshalMod: func(in []byte) []byte {
in = append(in, byte(0))
return in
},
unmarshalErr: "a is not evenly divisble by b",
},
{
name: "size too big",
ids: generateDataColumnIdentifiers(1),
unmarshalMod: func(in []byte) []byte {
maxLen := params.BeaconConfig().MaxRequestDataColumnSidecars * uint64(dataColumnIdSize)
add := make([]byte, maxLen)
in = append(in, add...)
return in
},
unmarshalErr: "a/b is greater than max",
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
req := DataColumnsByRootIdentifiers(c.ids)
bytes, err := req.MarshalSSZ()
if c.marshalErr != nil {
require.ErrorIs(t, err, c.marshalErr)
return
}
require.NoError(t, err)
if c.unmarshalMod != nil {
bytes = c.unmarshalMod(bytes)
}
got := &DataColumnsByRootIdentifiers{}
err = got.UnmarshalSSZ(bytes)
if c.unmarshalErr != "" {
require.ErrorContains(t, c.unmarshalErr, err)
return
}
require.NoError(t, err)
require.Equal(t, len(c.ids), len(*got))
for i, expected := range c.ids {
actual := (*got)[i]
require.DeepEqual(t, expected, actual)
}
})
}
// Test MarshalSSZTo
req := DataColumnsByRootIdentifiers(generateDataColumnIdentifiers(10))
buf := make([]byte, 0)
buf, err := req.MarshalSSZTo(buf)
require.NoError(t, err)
require.Equal(t, len(buf), int(req.SizeSSZ()))
var unmarshalled DataColumnsByRootIdentifiers
err = unmarshalled.UnmarshalSSZ(buf)
require.NoError(t, err)
require.DeepEqual(t, req, unmarshalled)
}

View File

@@ -37,8 +37,9 @@ func (s *Service) blobSidecarByRootRPCHandler(ctx context.Context, msg interface
blobIdents := *ref
cs := s.cfg.clock.CurrentSlot()
remotePeer := stream.Conn().RemotePeer()
if err := validateBlobByRootRequest(blobIdents, cs); err != nil {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeer)
s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream)
return err
}
@@ -75,6 +76,7 @@ func (s *Service) blobSidecarByRootRPCHandler(ctx context.Context, msg interface
log.WithError(err).WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", root),
"index": idx,
"peer": remotePeer.String(),
}).Debugf("Peer requested blob sidecar by root not found in db")
continue
}
@@ -107,14 +109,21 @@ func (s *Service) blobSidecarByRootRPCHandler(ctx context.Context, msg interface
}
func validateBlobByRootRequest(blobIdents types.BlobSidecarsByRootReq, slot primitives.Slot) error {
if slots.ToEpoch(slot) >= params.BeaconConfig().ElectraForkEpoch {
if uint64(len(blobIdents)) > params.BeaconConfig().MaxRequestBlobSidecarsElectra {
return types.ErrMaxBlobReqExceeded
}
} else {
if uint64(len(blobIdents)) > params.BeaconConfig().MaxRequestBlobSidecars {
beaconConfig := params.BeaconConfig()
epoch := slots.ToEpoch(slot)
blobIdentCount := uint64(len(blobIdents))
if epoch >= beaconConfig.ElectraForkEpoch {
if blobIdentCount > beaconConfig.MaxRequestBlobSidecarsElectra {
return types.ErrMaxBlobReqExceeded
}
return nil
}
if blobIdentCount > beaconConfig.MaxRequestBlobSidecars {
return types.ErrMaxBlobReqExceeded
}
return nil
}
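As a concrete check with the mainnet spec values (MAX_REQUEST_BLOB_SIDECARS = 768, MAX_REQUEST_BLOB_SIDECARS_ELECTRA = 1152), a request for 1000 blob identifiers is rejected before the Electra fork epoch and accepted from it onward.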

View File

@@ -44,7 +44,7 @@ func (c *blobsTestCase) filterExpectedByRoot(t *testing.T, scs []blocks.ROBlob,
message: p2pTypes.ErrBlobLTMinRequest.Error(),
}}
}
sort.Sort(req)
sort.Sort(&req)
var expect []*expectedBlobChunk
blockOffset := 0
if len(scs) == 0 {

View File

@@ -0,0 +1,2 @@
### Added
- PeerDAS: Implement P2P.

View File

@@ -36,6 +36,11 @@ func (m MetadataV0) SyncnetsBitfield() bitfield.Bitvector4 {
return bitfield.Bitvector4{0}
}
// CustodyGroupCount returns the custody group count from the metadata.
// Phase 0 metadata predates PeerDAS and has no such field, so this always returns 0.
func (m MetadataV0) CustodyGroupCount() uint64 {
return 0
}
// InnerObject returns the underlying metadata protobuf structure.
func (m MetadataV0) InnerObject() interface{} {
return m.md
@@ -86,6 +91,12 @@ func (MetadataV0) MetadataObjV1() *pb.MetaDataV1 {
return nil
}
// MetadataObjV2 returns the inner metadata object in its type
// specified form. If it doesn't exist then we return nothing.
func (MetadataV0) MetadataObjV2() *pb.MetaDataV2 {
return nil
}
// Version returns the fork version of the underlying object.
func (MetadataV0) Version() int {
return version.Phase0
@@ -119,6 +130,11 @@ func (m MetadataV1) SyncnetsBitfield() bitfield.Bitvector4 {
return m.md.Syncnets
}
// CustodyGroupCount returns the custody group count from the metadata.
// Altair metadata predates PeerDAS and has no such field, so this always returns 0.
func (m MetadataV1) CustodyGroupCount() uint64 {
return 0
}
// InnerObject returns the underlying metadata protobuf structure.
func (m MetadataV1) InnerObject() interface{} {
return m.md
@@ -169,7 +185,107 @@ func (m MetadataV1) MetadataObjV1() *pb.MetaDataV1 {
return m.md
}
// MetadataObjV2 returns the inner metadata object in its type
// specified form. If it doesn't exist then we return nothing.
func (m MetadataV1) MetadataObjV2() *pb.MetaDataV2 {
return nil
}
// Version returns the fork version of the underlying object.
func (MetadataV1) Version() int {
return version.Altair
}
// MetadataV2
// ----------
// MetadataV2 is a convenience wrapper around our metadata v3 protobuf object (pb.MetaDataV2).
type MetadataV2 struct {
md *pb.MetaDataV2
}
// WrappedMetadataV2 wraps the provided protobuf object.
func WrappedMetadataV2(md *pb.MetaDataV2) MetadataV2 {
return MetadataV2{md: md}
}
// SequenceNumber returns the sequence number from the metadata.
func (m MetadataV2) SequenceNumber() uint64 {
return m.md.SeqNumber
}
// AttnetsBitfield returns the bitfield stored in the metadata.
func (m MetadataV2) AttnetsBitfield() bitfield.Bitvector64 {
return m.md.Attnets
}
// SyncnetsBitfield returns the bitfield stored in the metadata.
func (m MetadataV2) SyncnetsBitfield() bitfield.Bitvector4 {
return m.md.Syncnets
}
// CustodyGroupCount returns the custody group count from the metadata.
func (m MetadataV2) CustodyGroupCount() uint64 {
return m.md.CustodyGroupCount
}
// InnerObject returns the underlying metadata protobuf structure.
func (m MetadataV2) InnerObject() interface{} {
return m.md
}
// IsNil checks for the nilness of the underlying object.
func (m MetadataV2) IsNil() bool {
return m.md == nil
}
// Copy performs a full copy of the underlying metadata object.
func (m MetadataV2) Copy() metadata.Metadata {
return WrappedMetadataV2(proto.Clone(m.md).(*pb.MetaDataV2))
}
// MarshalSSZ marshals the underlying metadata object
// into its serialized form.
func (m MetadataV2) MarshalSSZ() ([]byte, error) {
return m.md.MarshalSSZ()
}
// MarshalSSZTo marshals the underlying metadata object
// into its serialized form into the provided byte buffer.
func (m MetadataV2) MarshalSSZTo(dst []byte) ([]byte, error) {
return m.md.MarshalSSZTo(dst)
}
// SizeSSZ returns the serialized size of the metadata object.
func (m MetadataV2) SizeSSZ() int {
return m.md.SizeSSZ()
}
// UnmarshalSSZ unmarshals the provided byte buffer into
// the underlying metadata object.
func (m MetadataV2) UnmarshalSSZ(buf []byte) error {
return m.md.UnmarshalSSZ(buf)
}
// MetadataObjV0 returns the inner metadata object in its type
// specified form. If it doesn't exist then we return nothing.
func (MetadataV2) MetadataObjV0() *pb.MetaDataV0 {
return nil
}
// MetadataObjV1 returns the inner metadata object in its type
// specified form. If it doesn't exist then we return nothing.
func (m MetadataV2) MetadataObjV1() *pb.MetaDataV1 {
return nil
}
// MetadataObjV2 returns the inner metadata object in its type
// specified form. If it doesn't exist then we return nothing.
func (m MetadataV2) MetadataObjV2() *pb.MetaDataV2 {
return m.md
}
// Version returns the fork version of the underlying object.
func (MetadataV2) Version() int {
return version.Fulu
}

View File

@@ -11,6 +11,7 @@ type Metadata interface {
SequenceNumber() uint64
AttnetsBitfield() bitfield.Bitvector64
SyncnetsBitfield() bitfield.Bitvector4
CustodyGroupCount() uint64
InnerObject() interface{}
IsNil() bool
Copy() Metadata
@@ -18,5 +19,6 @@ type Metadata interface {
ssz.Unmarshaler
MetadataObjV0() *pb.MetaDataV0
MetadataObjV1() *pb.MetaDataV1
MetadataObjV2() *pb.MetaDataV2
Version() int
}
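Since CustodyGroupCount is now part of the interface, callers can stay version-agnostic across Phase 0, Altair, and Fulu metadata. A minimal sketch of such a consumer, assuming the metadata package import shown above (the clamping policy is illustrative, not Prysm's):
// effectiveCustodyGroupCount is an illustrative consumer of the extended
// interface: pre-Fulu metadata reports 0, which is clamped to the minimum.
func effectiveCustodyGroupCount(md metadata.Metadata, minimum uint64) uint64 {
	if count := md.CustodyGroupCount(); count > minimum {
		return count
	}
	return minimum
}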