From 3a3bd3902cd2dba82d9f7f0da33074b9e6042eef Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Wed, 28 May 2025 17:23:19 +0200 Subject: [PATCH] PeerDAS: Implement P2P (#15347) * PeerDAS: Implement P2P. * Fix Terence's comment. * Fix Terence's comment. * Fix Terence's comment. * Fix Preston's comment. * Fix Preston's comment. * `TopicFromMessage`: Exit early. * Fix Preston's comment. * `TestService_BroadcastDataColumn`: Avoid ugly sleep. * Fix Kasey's comment. * Fix Kasey's comment. * Fix Kasey's comment. * Fix Kasey's comment. --- beacon-chain/blockchain/setup_test.go | 13 ++ beacon-chain/p2p/BUILD.bazel | 6 + beacon-chain/p2p/broadcaster.go | 147 ++++++++++++++++- beacon-chain/p2p/broadcaster_test.go | 99 +++++++++++ beacon-chain/p2p/config.go | 2 + beacon-chain/p2p/custody.go | 74 +++++++++ beacon-chain/p2p/custody_test.go | 112 +++++++++++++ beacon-chain/p2p/discovery.go | 53 +++++- beacon-chain/p2p/discovery_test.go | 85 +++++++++- beacon-chain/p2p/gossip_scoring_params.go | 2 +- beacon-chain/p2p/handshake.go | 4 +- beacon-chain/p2p/interfaces.go | 6 + beacon-chain/p2p/monitoring.go | 24 ++- beacon-chain/p2p/pubsub_filter.go | 7 +- beacon-chain/p2p/rpc_topic_mappings.go | 49 ++++-- beacon-chain/p2p/rpc_topic_mappings_test.go | 106 ++++++++---- beacon-chain/p2p/sender.go | 15 +- beacon-chain/p2p/subnets.go | 84 +++++++++- beacon-chain/p2p/subnets_test.go | 21 +++ beacon-chain/p2p/testing/BUILD.bazel | 3 + beacon-chain/p2p/testing/fuzz_p2p.go | 12 +- beacon-chain/p2p/testing/mock_broadcaster.go | 7 + beacon-chain/p2p/testing/mock_peermanager.go | 2 +- beacon-chain/p2p/testing/p2p.go | 28 ++++ beacon-chain/p2p/types/BUILD.bazel | 1 + beacon-chain/p2p/types/object_mapping.go | 2 +- beacon-chain/p2p/types/rpc_errors.go | 2 +- beacon-chain/p2p/types/types.go | 143 +++++++++++++++- beacon-chain/p2p/types/types_test.go | 155 ++++++++++++++++++ .../sync/rpc_blob_sidecars_by_root.go | 23 ++- .../sync/rpc_blob_sidecars_by_root_test.go | 2 +- changelog/manu-peerdas-p2p.md | 2 + consensus-types/wrapper/metadata.go | 116 +++++++++++++ .../v1alpha1/metadata/metadata_interfaces.go | 2 + 34 files changed, 1301 insertions(+), 108 deletions(-) create mode 100644 beacon-chain/p2p/custody.go create mode 100644 beacon-chain/p2p/custody_test.go create mode 100644 changelog/manu-peerdas-p2p.md diff --git a/beacon-chain/blockchain/setup_test.go b/beacon-chain/blockchain/setup_test.go index 5b74595cfe..f9f7c01580 100644 --- a/beacon-chain/blockchain/setup_test.go +++ b/beacon-chain/blockchain/setup_test.go @@ -20,8 +20,10 @@ import ( "github.com/OffchainLabs/prysm/v6/beacon-chain/operations/attestations" "github.com/OffchainLabs/prysm/v6/beacon-chain/operations/blstoexec" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p" + p2pTesting "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing" "github.com/OffchainLabs/prysm/v6/beacon-chain/startup" "github.com/OffchainLabs/prysm/v6/beacon-chain/state/stategen" + fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" "github.com/OffchainLabs/prysm/v6/consensus-types/interfaces" ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" "github.com/OffchainLabs/prysm/v6/testing/require" @@ -47,6 +49,11 @@ type mockBroadcaster struct { broadcastCalled bool } +type mockAccesser struct { + mockBroadcaster + p2pTesting.MockPeerManager +} + func (mb *mockBroadcaster) Broadcast(_ context.Context, _ proto.Message) error { mb.broadcastCalled = true return nil @@ -77,6 +84,11 @@ func (mb *mockBroadcaster) BroadcastLightClientFinalityUpdate(_ context.Context, return nil } +func (mb *mockBroadcaster) BroadcastDataColumn(_ [fieldparams.RootLength]byte, _ uint64, _ *ethpb.DataColumnSidecar, _ ...chan<- bool) error { + mb.broadcastCalled = true + return nil +} + func (mb *mockBroadcaster) BroadcastBLSChanges(_ context.Context, _ []*ethpb.SignedBLSToExecutionChange) { } @@ -134,6 +146,7 @@ func minimalTestService(t *testing.T, opts ...Option) (*Service, *testServiceReq WithBlobStorage(filesystem.NewEphemeralBlobStorage(t)), WithSyncChecker(mock.MockChecker{}), WithExecutionEngineCaller(&mockExecution.EngineClient{}), + WithP2PBroadcaster(&mockAccesser{}), WithLightClientStore(&lightclient.Store{}), } // append the variadic opts so they override the defaults by being processed afterwards diff --git a/beacon-chain/p2p/BUILD.bazel b/beacon-chain/p2p/BUILD.bazel index 6bc5619f87..51d0be0d51 100644 --- a/beacon-chain/p2p/BUILD.bazel +++ b/beacon-chain/p2p/BUILD.bazel @@ -7,6 +7,7 @@ go_library( "broadcaster.go", "config.go", "connection_gater.go", + "custody.go", "dial_relay_node.go", "discovery.go", "doc.go", @@ -45,6 +46,7 @@ go_library( "//beacon-chain/core/altair:go_default_library", "//beacon-chain/core/feed/state:go_default_library", "//beacon-chain/core/helpers:go_default_library", + "//beacon-chain/core/peerdas:go_default_library", "//beacon-chain/core/time:go_default_library", "//beacon-chain/db:go_default_library", "//beacon-chain/p2p/encoder:go_default_library", @@ -118,6 +120,7 @@ go_test( "addr_factory_test.go", "broadcaster_test.go", "connection_gater_test.go", + "custody_test.go", "dial_relay_node_test.go", "discovery_test.go", "fork_test.go", @@ -139,10 +142,12 @@ go_test( flaky = True, tags = ["requires-network"], deps = [ + "//beacon-chain/blockchain/kzg:go_default_library", "//beacon-chain/blockchain/testing:go_default_library", "//beacon-chain/cache:go_default_library", "//beacon-chain/core/helpers:go_default_library", "//beacon-chain/core/light-client:go_default_library", + "//beacon-chain/core/peerdas:go_default_library", "//beacon-chain/core/signing:go_default_library", "//beacon-chain/db/testing:go_default_library", "//beacon-chain/p2p/encoder:go_default_library", @@ -166,6 +171,7 @@ go_test( "//network/forks:go_default_library", "//proto/eth/v1:go_default_library", "//proto/prysm/v1alpha1:go_default_library", + "//proto/prysm/v1alpha1/metadata:go_default_library", "//proto/testing:go_default_library", "//runtime/version:go_default_library", "//testing/assert:go_default_library", diff --git a/beacon-chain/p2p/broadcaster.go b/beacon-chain/p2p/broadcaster.go index bc47f297bd..e376b08963 100644 --- a/beacon-chain/p2p/broadcaster.go +++ b/beacon-chain/p2p/broadcaster.go @@ -9,6 +9,7 @@ import ( "github.com/OffchainLabs/prysm/v6/beacon-chain/core/altair" "github.com/OffchainLabs/prysm/v6/beacon-chain/core/helpers" + fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" "github.com/OffchainLabs/prysm/v6/config/params" "github.com/OffchainLabs/prysm/v6/consensus-types/interfaces" "github.com/OffchainLabs/prysm/v6/crypto/hash" @@ -98,7 +99,7 @@ func (s *Service) BroadcastSyncCommitteeMessage(ctx context.Context, subnet uint return nil } -func (s *Service) internalBroadcastAttestation(ctx context.Context, subnet uint64, att ethpb.Att, forkDigest [4]byte) { +func (s *Service) internalBroadcastAttestation(ctx context.Context, subnet uint64, att ethpb.Att, forkDigest [fieldparams.VersionLength]byte) { _, span := trace.StartSpan(ctx, "p2p.internalBroadcastAttestation") defer span.End() ctx = trace.NewContext(context.Background(), span) // clear parent context / deadline. @@ -154,7 +155,7 @@ func (s *Service) internalBroadcastAttestation(ctx context.Context, subnet uint6 } } -func (s *Service) broadcastSyncCommittee(ctx context.Context, subnet uint64, sMsg *ethpb.SyncCommitteeMessage, forkDigest [4]byte) { +func (s *Service) broadcastSyncCommittee(ctx context.Context, subnet uint64, sMsg *ethpb.SyncCommitteeMessage, forkDigest [fieldparams.VersionLength]byte) { _, span := trace.StartSpan(ctx, "p2p.broadcastSyncCommittee") defer span.End() ctx = trace.NewContext(context.Background(), span) // clear parent context / deadline. @@ -230,7 +231,7 @@ func (s *Service) BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb. return nil } -func (s *Service) internalBroadcastBlob(ctx context.Context, subnet uint64, blobSidecar *ethpb.BlobSidecar, forkDigest [4]byte) { +func (s *Service) internalBroadcastBlob(ctx context.Context, subnet uint64, blobSidecar *ethpb.BlobSidecar, forkDigest [fieldparams.VersionLength]byte) { _, span := trace.StartSpan(ctx, "p2p.internalBroadcastBlob") defer span.End() ctx = trace.NewContext(context.Background(), span) // clear parent context / deadline. @@ -245,7 +246,7 @@ func (s *Service) internalBroadcastBlob(ctx context.Context, subnet uint64, blob s.subnetLocker(wrappedSubIdx).RUnlock() if !hasPeer { - blobSidecarCommitteeBroadcastAttempts.Inc() + blobSidecarBroadcastAttempts.Inc() if err := func() error { s.subnetLocker(wrappedSubIdx).Lock() defer s.subnetLocker(wrappedSubIdx).Unlock() @@ -254,7 +255,7 @@ func (s *Service) internalBroadcastBlob(ctx context.Context, subnet uint64, blob return err } if ok { - blobSidecarCommitteeBroadcasts.Inc() + blobSidecarBroadcasts.Inc() return nil } return errors.New("failed to find peers for subnet") @@ -322,6 +323,132 @@ func (s *Service) BroadcastLightClientFinalityUpdate(ctx context.Context, update return nil } +// BroadcastDataColumn broadcasts a data column to the p2p network, the message is assumed to be +// broadcasted to the current fork and to the input column subnet. +func (s *Service) BroadcastDataColumn( + root [fieldparams.RootLength]byte, + dataColumnSubnet uint64, + dataColumnSidecar *ethpb.DataColumnSidecar, + peersCheckedChans ...chan<- bool, // Used for testing purposes to signal when peers are checked. +) error { + // Add tracing to the function. + ctx, span := trace.StartSpan(s.ctx, "p2p.BroadcastDataColumn") + defer span.End() + + // Ensure the data column sidecar is not nil. + if dataColumnSidecar == nil { + return errors.Errorf("attempted to broadcast nil data column sidecar at subnet %d", dataColumnSubnet) + } + + // Retrieve the current fork digest. + forkDigest, err := s.currentForkDigest() + if err != nil { + err := errors.Wrap(err, "current fork digest") + tracing.AnnotateError(span, err) + return err + } + + // Non-blocking broadcast, with attempts to discover a column subnet peer if none available. + go s.internalBroadcastDataColumn(ctx, root, dataColumnSubnet, dataColumnSidecar, forkDigest, peersCheckedChans) + + return nil +} + +func (s *Service) internalBroadcastDataColumn( + ctx context.Context, + root [fieldparams.RootLength]byte, + columnSubnet uint64, + dataColumnSidecar *ethpb.DataColumnSidecar, + forkDigest [fieldparams.VersionLength]byte, + peersCheckedChans []chan<- bool, // Used for testing purposes to signal when peers are checked. +) { + // Add tracing to the function. + _, span := trace.StartSpan(ctx, "p2p.internalBroadcastDataColumn") + defer span.End() + + // Increase the number of broadcast attempts. + dataColumnSidecarBroadcastAttempts.Inc() + + // Define a one-slot length context timeout. + secondsPerSlot := params.BeaconConfig().SecondsPerSlot + oneSlot := time.Duration(secondsPerSlot) * time.Second + ctx, cancel := context.WithTimeout(ctx, oneSlot) + defer cancel() + + // Build the topic corresponding to this column subnet and this fork digest. + topic := dataColumnSubnetToTopic(columnSubnet, forkDigest) + + // Compute the wrapped subnet index. + wrappedSubIdx := columnSubnet + dataColumnSubnetVal + + // Find peers if needed. + if err := s.findPeersIfNeeded(ctx, wrappedSubIdx, topic, columnSubnet, peersCheckedChans); err != nil { + log.WithError(err).Error("Failed to find peers for data column subnet") + tracing.AnnotateError(span, err) + } + + // Broadcast the data column sidecar to the network. + if err := s.broadcastObject(ctx, dataColumnSidecar, topic); err != nil { + log.WithError(err).Error("Failed to broadcast data column sidecar") + tracing.AnnotateError(span, err) + return + } + + header := dataColumnSidecar.SignedBlockHeader.GetHeader() + slot := header.GetSlot() + + slotStartTime, err := slots.ToTime(uint64(s.genesisTime.Unix()), slot) + if err != nil { + log.WithError(err).Error("Failed to convert slot to time") + } + + log.WithFields(logrus.Fields{ + "slot": slot, + "timeSinceSlotStart": time.Since(slotStartTime), + "root": fmt.Sprintf("%#x", root), + "columnSubnet": columnSubnet, + }).Debug("Broadcasted data column sidecar") + + // Increase the number of successful broadcasts. + dataColumnSidecarBroadcasts.Inc() +} + +func (s *Service) findPeersIfNeeded( + ctx context.Context, + wrappedSubIdx uint64, + topic string, + subnet uint64, + peersCheckedChans []chan<- bool, // Used for testing purposes to signal when peers are checked. +) error { + s.subnetLocker(wrappedSubIdx).Lock() + defer s.subnetLocker(wrappedSubIdx).Unlock() + + // Sending a data column sidecar to only one peer is not ideal, + // but it ensures at least one peer receives it. + const peerCount = 1 + + if s.hasPeerWithSubnet(topic) { + // Exit early if we already have peers with this subnet. + return nil + } + + // Used for testing purposes. + if len(peersCheckedChans) > 0 { + peersCheckedChans[0] <- true + } + + // No peers found, attempt to find peers with this subnet. + ok, err := s.FindPeersWithSubnet(ctx, topic, subnet, peerCount) + if err != nil { + return errors.Wrap(err, "find peers with subnet") + } + if !ok { + return errors.Errorf("failed to find peers for topic %s with subnet %d", topic, subnet) + } + + return nil +} + // method to broadcast messages to other peers in our gossip mesh. func (s *Service) broadcastObject(ctx context.Context, obj ssz.Marshaler, topic string) error { ctx, span := trace.StartSpan(ctx, "p2p.broadcastObject") @@ -351,15 +478,15 @@ func (s *Service) broadcastObject(ctx context.Context, obj ssz.Marshaler, topic return nil } -func attestationToTopic(subnet uint64, forkDigest [4]byte) string { +func attestationToTopic(subnet uint64, forkDigest [fieldparams.VersionLength]byte) string { return fmt.Sprintf(AttestationSubnetTopicFormat, forkDigest, subnet) } -func syncCommitteeToTopic(subnet uint64, forkDigest [4]byte) string { +func syncCommitteeToTopic(subnet uint64, forkDigest [fieldparams.VersionLength]byte) string { return fmt.Sprintf(SyncCommitteeSubnetTopicFormat, forkDigest, subnet) } -func blobSubnetToTopic(subnet uint64, forkDigest [4]byte) string { +func blobSubnetToTopic(subnet uint64, forkDigest [fieldparams.VersionLength]byte) string { return fmt.Sprintf(BlobSubnetTopicFormat, forkDigest, subnet) } @@ -370,3 +497,7 @@ func lcOptimisticToTopic(forkDigest [4]byte) string { func lcFinalityToTopic(forkDigest [4]byte) string { return fmt.Sprintf(LightClientFinalityUpdateTopicFormat, forkDigest) } + +func dataColumnSubnetToTopic(subnet uint64, forkDigest [fieldparams.VersionLength]byte) string { + return fmt.Sprintf(DataColumnSubnetTopicFormat, forkDigest, subnet) +} diff --git a/beacon-chain/p2p/broadcaster_test.go b/beacon-chain/p2p/broadcaster_test.go index e35556711b..f0415b1f6d 100644 --- a/beacon-chain/p2p/broadcaster_test.go +++ b/beacon-chain/p2p/broadcaster_test.go @@ -9,11 +9,14 @@ import ( "testing" "time" + "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/kzg" "github.com/OffchainLabs/prysm/v6/beacon-chain/core/helpers" lightClient "github.com/OffchainLabs/prysm/v6/beacon-chain/core/light-client" + "github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers/scorers" p2ptest "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing" + "github.com/OffchainLabs/prysm/v6/cmd/beacon-chain/flags" fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" "github.com/OffchainLabs/prysm/v6/consensus-types/interfaces" "github.com/OffchainLabs/prysm/v6/consensus-types/wrapper" @@ -655,3 +658,99 @@ func TestService_BroadcastLightClientFinalityUpdate(t *testing.T) { t.Error("Failed to receive pubsub within 1s") } } + +func TestService_BroadcastDataColumn(t *testing.T) { + const ( + port = 2000 + columnIndex = 12 + topicFormat = DataColumnSubnetTopicFormat + ) + + // Load the KZG trust setup. + err := kzg.Start() + require.NoError(t, err) + + gFlags := new(flags.GlobalFlags) + gFlags.MinimumPeersPerSubnet = 1 + flags.Init(gFlags) + + // Reset config. + defer flags.Init(new(flags.GlobalFlags)) + + // Create two peers and connect them. + p1, p2 := p2ptest.NewTestP2P(t), p2ptest.NewTestP2P(t) + p1.Connect(p2) + + // Test the peers are connected. + require.NotEqual(t, 0, len(p1.BHost.Network().Peers()), "No peers") + + // Create a host. + _, pkey, ipAddr := createHost(t, port) + + p := &Service{ + ctx: context.Background(), + host: p1.BHost, + pubsub: p1.PubSub(), + joinedTopics: map[string]*pubsub.Topic{}, + cfg: &Config{}, + genesisTime: time.Now(), + genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32), + subnetsLock: make(map[uint64]*sync.RWMutex), + subnetsLockLock: sync.Mutex{}, + peers: peers.NewStatus(context.Background(), &peers.StatusConfig{ScorerParams: &scorers.Config{}}), + } + + // Create a listener. + listener, err := p.startDiscoveryV5(ipAddr, pkey) + require.NoError(t, err) + + p.dv5Listener = listener + + digest, err := p.currentForkDigest() + require.NoError(t, err) + + subnet := peerdas.ComputeSubnetForDataColumnSidecar(columnIndex) + topic := fmt.Sprintf(topicFormat, digest, subnet) + + roSidecars, _ := util.CreateTestVerifiedRoDataColumnSidecars(t, util.DataColumnsParamsByRoot{{}: {{ColumnIndex: columnIndex}}}) + sidecar := roSidecars[0].DataColumnSidecar + + // Async listen for the pubsub, must be before the broadcast. + var wg sync.WaitGroup + wg.Add(1) + + peersChecked := make(chan bool, 0) + + go func(tt *testing.T) { + defer wg.Done() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Wait for the peers to be checked. + <-peersChecked + + // External peer subscribes to the topic. + topic += p.Encoding().ProtocolSuffix() + sub, err := p2.SubscribeToTopic(topic) + require.NoError(tt, err) + + msg, err := sub.Next(ctx) + require.NoError(tt, err) + + var result ethpb.DataColumnSidecar + require.NoError(tt, p.Encoding().DecodeGossip(msg.Data, &result)) + require.DeepEqual(tt, &result, sidecar) + }(t) + + var emptyRoot [fieldparams.RootLength]byte + + // Attempt to broadcast nil object should fail. + err = p.BroadcastDataColumn(emptyRoot, subnet, nil) + require.ErrorContains(t, "attempted to broadcast nil", err) + + // Broadcast to peers and wait. + err = p.BroadcastDataColumn(emptyRoot, subnet, sidecar, peersChecked) + require.NoError(t, err) + require.Equal(t, false, util.WaitTimeout(&wg, 1*time.Minute), "Failed to receive pubsub within 1s") +} diff --git a/beacon-chain/p2p/config.go b/beacon-chain/p2p/config.go index d8fa021f70..d7de4a0acd 100644 --- a/beacon-chain/p2p/config.go +++ b/beacon-chain/p2p/config.go @@ -4,6 +4,7 @@ import ( "time" statefeed "github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed/state" + "github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas" "github.com/OffchainLabs/prysm/v6/beacon-chain/db" "github.com/OffchainLabs/prysm/v6/beacon-chain/startup" ) @@ -38,6 +39,7 @@ type Config struct { StateNotifier statefeed.Notifier DB db.ReadOnlyDatabase ClockWaiter startup.ClockWaiter + CustodyInfo *peerdas.CustodyInfo } // validateConfig validates whether the values provided are accurate and will set diff --git a/beacon-chain/p2p/custody.go b/beacon-chain/p2p/custody.go new file mode 100644 index 0000000000..b78512f3d2 --- /dev/null +++ b/beacon-chain/p2p/custody.go @@ -0,0 +1,74 @@ +package p2p + +import ( + "github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas" + "github.com/OffchainLabs/prysm/v6/config/params" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/sirupsen/logrus" +) + +var _ DataColumnsHandler = (*Service)(nil) + +// CustodyGroupCountFromPeer retrieves custody group count from a peer. +// It first tries to get the custody group count from the peer's metadata, +// then falls back to the ENR value if the metadata is not available, then +// falls back to the minimum number of custody groups an honest node should custodiy +// and serve samples from if ENR is not available. +func (s *Service) CustodyGroupCountFromPeer(pid peer.ID) uint64 { + log := log.WithField("peerID", pid) + // Try to get the custody group count from the peer's metadata. + metadata, err := s.peers.Metadata(pid) + if err != nil { + // On error, default to the ENR value. + log.WithError(err).Debug("Failed to retrieve metadata for peer, defaulting to the ENR value") + return s.custodyGroupCountFromPeerENR(pid) + } + + // If the metadata is nil, default to the ENR value. + if metadata == nil { + log.Debug("Metadata is nil, defaulting to the ENR value") + return s.custodyGroupCountFromPeerENR(pid) + } + + // Get the custody subnets count from the metadata. + custodyCount := metadata.CustodyGroupCount() + + // If the custody count is null, default to the ENR value. + if custodyCount == 0 { + log.Debug("The custody count extracted from the metadata equals to 0, defaulting to the ENR value") + return s.custodyGroupCountFromPeerENR(pid) + } + + return custodyCount +} + +// custodyGroupCountFromPeerENR retrieves the custody count from the peer's ENR. +// If the ENR is not available, it defaults to the minimum number of custody groups +// an honest node custodies and serves samples from. +func (s *Service) custodyGroupCountFromPeerENR(pid peer.ID) uint64 { + // By default, we assume the peer custodies the minimum number of groups. + custodyRequirement := params.BeaconConfig().CustodyRequirement + + log := log.WithFields(logrus.Fields{ + "peerID": pid, + "defaultValue": custodyRequirement, + }) + + // Retrieve the ENR of the peer. + record, err := s.peers.ENR(pid) + if err != nil { + log.WithError(err).Debug("Failed to retrieve ENR for peer, defaulting to the default value") + + return custodyRequirement + } + + // Retrieve the custody group count from the ENR. + custodyGroupCount, err := peerdas.CustodyGroupCountFromRecord(record) + if err != nil { + log.WithError(err).Debug("Failed to retrieve custody group count from ENR for peer, defaulting to the default value") + + return custodyRequirement + } + + return custodyGroupCount +} diff --git a/beacon-chain/p2p/custody_test.go b/beacon-chain/p2p/custody_test.go new file mode 100644 index 0000000000..2579ee3e46 --- /dev/null +++ b/beacon-chain/p2p/custody_test.go @@ -0,0 +1,112 @@ +package p2p + +import ( + "context" + "testing" + + "github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas" + "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers" + "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers/scorers" + "github.com/OffchainLabs/prysm/v6/config/params" + "github.com/OffchainLabs/prysm/v6/consensus-types/wrapper" + pb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" + "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1/metadata" + "github.com/OffchainLabs/prysm/v6/testing/require" + "github.com/ethereum/go-ethereum/p2p/enr" + "github.com/libp2p/go-libp2p/core/network" +) + +func TestCustodyGroupCountFromPeer(t *testing.T) { + const ( + expectedENR uint64 = 7 + expectedMetadata uint64 = 8 + pid = "test-id" + ) + + cgc := peerdas.Cgc(expectedENR) + + // Define a nil record + var nilRecord *enr.Record = nil + + // Define an empty record (record with non `cgc` entry) + emptyRecord := &enr.Record{} + + // Define a nominal record + nominalRecord := &enr.Record{} + nominalRecord.Set(cgc) + + // Define a metadata with zero custody. + zeroMetadata := wrapper.WrappedMetadataV2(&pb.MetaDataV2{ + CustodyGroupCount: 0, + }) + + // Define a nominal metadata. + nominalMetadata := wrapper.WrappedMetadataV2(&pb.MetaDataV2{ + CustodyGroupCount: expectedMetadata, + }) + + testCases := []struct { + name string + record *enr.Record + metadata metadata.Metadata + expected uint64 + }{ + { + name: "No metadata - No ENR", + record: nilRecord, + expected: params.BeaconConfig().CustodyRequirement, + }, + { + name: "No metadata - Empty ENR", + record: emptyRecord, + expected: params.BeaconConfig().CustodyRequirement, + }, + { + name: "No Metadata - ENR", + record: nominalRecord, + expected: expectedENR, + }, + { + name: "Metadata with 0 value - ENR", + record: nominalRecord, + metadata: zeroMetadata, + expected: expectedENR, + }, + { + name: "Metadata - ENR", + record: nominalRecord, + metadata: nominalMetadata, + expected: expectedMetadata, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Create peers status. + peers := peers.NewStatus(context.Background(), &peers.StatusConfig{ + ScorerParams: &scorers.Config{}, + }) + + // Set the metadata. + if tc.metadata != nil { + peers.SetMetadata(pid, tc.metadata) + } + + // Add a new peer with the record. + peers.Add(tc.record, pid, nil, network.DirOutbound) + + // Create a new service. + service := &Service{ + peers: peers, + metaData: tc.metadata, + } + + // Retrieve the custody count from the remote peer. + actual := service.CustodyGroupCountFromPeer(pid) + + // Verify the result. + require.Equal(t, tc.expected, actual) + }) + } + +} diff --git a/beacon-chain/p2p/discovery.go b/beacon-chain/p2p/discovery.go index 7c944ddedd..2f384ae411 100644 --- a/beacon-chain/p2p/discovery.go +++ b/beacon-chain/p2p/discovery.go @@ -8,6 +8,7 @@ import ( "time" "github.com/OffchainLabs/prysm/v6/beacon-chain/cache" + "github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas" "github.com/OffchainLabs/prysm/v6/cmd/beacon-chain/flags" "github.com/OffchainLabs/prysm/v6/config/features" "github.com/OffchainLabs/prysm/v6/config/params" @@ -187,7 +188,8 @@ func (s *Service) RefreshPersistentSubnets() { // Compare current epoch with Altair fork epoch altairForkEpoch := params.BeaconConfig().AltairForkEpoch - if currentEpoch < altairForkEpoch { + // We add `1` to the current epoch because we want to prepare one epoch before the Altair fork. + if currentEpoch+1 < altairForkEpoch { // Phase 0 behaviour. if isBitVUpToDate { // Return early if bitfield hasn't changed. @@ -223,15 +225,51 @@ func (s *Service) RefreshPersistentSubnets() { // Is our sync bitvector record up to date? isBitSUpToDate := bytes.Equal(bitS, inRecordBitS) && bytes.Equal(bitS, currentBitSInMetadata) - if metadataVersion == version.Altair && isBitVUpToDate && isBitSUpToDate { + // Compare current epoch with the Fulu fork epoch. + fuluForkEpoch := params.BeaconConfig().FuluForkEpoch + + // We add `1` to the current epoch because we want to prepare one epoch before the Fulu fork. + if currentEpoch+1 < fuluForkEpoch { + // Altair behaviour. + if metadataVersion == version.Altair && isBitVUpToDate && isBitSUpToDate { + // Nothing to do, return early. + return + } + + // Some data have changed, update our record and metadata. + s.updateSubnetRecordWithMetadataV2(bitV, bitS) + + // Ping all peers to inform them of new metadata + s.pingPeersAndLogEnr() + + return + } + + // Get the current custody group count. + custodyGroupCount := s.cfg.CustodyInfo.ActualGroupCount() + + // Get the custody group count we store in our record. + inRecordCustodyGroupCount, err := peerdas.CustodyGroupCountFromRecord(record) + if err != nil { + log.WithError(err).Error("Could not retrieve custody subnet count") + return + } + + // Get the custody group count in our metadata. + inMetadataCustodyGroupCount := s.Metadata().CustodyGroupCount() + + // Is our custody group count record up to date? + isCustodyGroupCountUpToDate := (custodyGroupCount == inRecordCustodyGroupCount && custodyGroupCount == inMetadataCustodyGroupCount) + + if isBitVUpToDate && isBitSUpToDate && isCustodyGroupCountUpToDate { // Nothing to do, return early. return } - // Some data have changed, update our record and metadata. - s.updateSubnetRecordWithMetadataV2(bitV, bitS) + // Some data changed. Update the record and the metadata. + s.updateSubnetRecordWithMetadataV3(bitV, bitS, custodyGroupCount) - // Ping all peers to inform them of new metadata + // Ping all peers. s.pingPeersAndLogEnr() } @@ -458,6 +496,11 @@ func (s *Service) createLocalNode( localNode.Set(quicEntry) } + if params.FuluEnabled() { + custodyGroupCount := s.cfg.CustodyInfo.ActualGroupCount() + localNode.Set(peerdas.Cgc(custodyGroupCount)) + } + localNode.SetFallbackIP(ipAddr) localNode.SetFallbackUDP(udpPort) diff --git a/beacon-chain/p2p/discovery_test.go b/beacon-chain/p2p/discovery_test.go index 3a6014841c..7c03aadef4 100644 --- a/beacon-chain/p2p/discovery_test.go +++ b/beacon-chain/p2p/discovery_test.go @@ -16,6 +16,7 @@ import ( mock "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing" "github.com/OffchainLabs/prysm/v6/beacon-chain/cache" + "github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers/peerdata" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers/scorers" @@ -140,6 +141,15 @@ func TestStartDiscV5_DiscoverAllPeers(t *testing.T) { func TestCreateLocalNode(t *testing.T) { params.SetupTestConfigCleanup(t) + + // Set the fulu fork epoch to something other than the far future epoch. + initFuluForkEpoch := params.BeaconConfig().FuluForkEpoch + params.BeaconConfig().FuluForkEpoch = 42 + + defer func() { + params.BeaconConfig().FuluForkEpoch = initFuluForkEpoch + }() + testCases := []struct { name string cfg *Config @@ -147,30 +157,31 @@ func TestCreateLocalNode(t *testing.T) { }{ { name: "valid config", - cfg: nil, + cfg: &Config{CustodyInfo: &peerdas.CustodyInfo{}}, expectedError: false, }, { name: "invalid host address", - cfg: &Config{HostAddress: "invalid"}, + cfg: &Config{HostAddress: "invalid", CustodyInfo: &peerdas.CustodyInfo{}}, expectedError: true, }, { name: "valid host address", - cfg: &Config{HostAddress: "192.168.0.1"}, + cfg: &Config{HostAddress: "192.168.0.1", CustodyInfo: &peerdas.CustodyInfo{}}, expectedError: false, }, { name: "invalid host DNS", - cfg: &Config{HostDNS: "invalid"}, + cfg: &Config{HostDNS: "invalid", CustodyInfo: &peerdas.CustodyInfo{}}, expectedError: true, }, { name: "valid host DNS", - cfg: &Config{HostDNS: "www.google.com"}, + cfg: &Config{HostDNS: "www.google.com", CustodyInfo: &peerdas.CustodyInfo{}}, expectedError: false, }, } + for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { // Define ports. @@ -199,7 +210,7 @@ func TestCreateLocalNode(t *testing.T) { require.NoError(t, err) expectedAddress := address - if tt.cfg != nil && tt.cfg.HostAddress != "" { + if tt.cfg.HostAddress != "" { expectedAddress = net.ParseIP(tt.cfg.HostAddress) } @@ -236,6 +247,11 @@ func TestCreateLocalNode(t *testing.T) { syncSubnets := new([]byte) require.NoError(t, localNode.Node().Record().Load(enr.WithEntry(syncCommsSubnetEnrKey, syncSubnets))) require.DeepSSZEqual(t, []byte{0}, *syncSubnets) + + // Check cgc config. + custodyGroupCount := new(uint64) + require.NoError(t, localNode.Node().Record().Load(enr.WithEntry(peerdas.CustodyGroupCountEnrKey, custodyGroupCount))) + require.Equal(t, params.BeaconConfig().CustodyRequirement, *custodyGroupCount) }) } } @@ -535,7 +551,7 @@ type check struct { metadataSequenceNumber uint64 attestationSubnets []uint64 syncSubnets []uint64 - custodySubnetCount *uint64 + custodyGroupCount *uint64 } func checkPingCountCacheMetadataRecord( @@ -601,6 +617,18 @@ func checkPingCountCacheMetadataRecord( actualBitSMetadata := service.metaData.SyncnetsBitfield() require.DeepSSZEqual(t, expectedBitS, actualBitSMetadata) } + + if expected.custodyGroupCount != nil { + // Check custody subnet count in ENR. + var actualCustodyGroupCount uint64 + err := service.dv5Listener.LocalNode().Node().Record().Load(enr.WithEntry(peerdas.CustodyGroupCountEnrKey, &actualCustodyGroupCount)) + require.NoError(t, err) + require.Equal(t, *expected.custodyGroupCount, actualCustodyGroupCount) + + // Check custody subnet count in metadata. + actualGroupCountMetadata := service.metaData.CustodyGroupCount() + require.Equal(t, *expected.custodyGroupCount, actualGroupCountMetadata) + } } func TestRefreshPersistentSubnets(t *testing.T) { @@ -610,12 +638,18 @@ func TestRefreshPersistentSubnets(t *testing.T) { defer cache.SubnetIDs.EmptyAllCaches() defer cache.SyncSubnetIDs.EmptyAllCaches() - const altairForkEpoch = 5 + const ( + altairForkEpoch = 5 + fuluForkEpoch = 10 + ) + + custodyGroupCount := params.BeaconConfig().CustodyRequirement // Set up epochs. defaultCfg := params.BeaconConfig() cfg := defaultCfg.Copy() cfg.AltairForkEpoch = altairForkEpoch + cfg.FuluForkEpoch = fuluForkEpoch params.OverrideBeaconConfig(cfg) // Compute the number of seconds per epoch. @@ -684,6 +718,39 @@ func TestRefreshPersistentSubnets(t *testing.T) { }, }, }, + { + name: "Fulu", + epochSinceGenesis: fuluForkEpoch, + checks: []check{ + { + pingCount: 0, + metadataSequenceNumber: 0, + attestationSubnets: []uint64{}, + syncSubnets: nil, + }, + { + pingCount: 1, + metadataSequenceNumber: 1, + attestationSubnets: []uint64{40, 41}, + syncSubnets: nil, + custodyGroupCount: &custodyGroupCount, + }, + { + pingCount: 2, + metadataSequenceNumber: 2, + attestationSubnets: []uint64{40, 41}, + syncSubnets: []uint64{1, 2}, + custodyGroupCount: &custodyGroupCount, + }, + { + pingCount: 2, + metadataSequenceNumber: 2, + attestationSubnets: []uint64{40, 41}, + syncSubnets: []uint64{1, 2}, + custodyGroupCount: &custodyGroupCount, + }, + }, + }, } for _, tc := range testCases { @@ -717,7 +784,7 @@ func TestRefreshPersistentSubnets(t *testing.T) { actualPingCount++ return nil }, - cfg: &Config{UDPPort: 2000}, + cfg: &Config{UDPPort: 2000, CustodyInfo: &peerdas.CustodyInfo{}}, peers: p2p.Peers(), genesisTime: time.Now().Add(-time.Duration(tc.epochSinceGenesis*secondsPerEpoch) * time.Second), genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32), diff --git a/beacon-chain/p2p/gossip_scoring_params.go b/beacon-chain/p2p/gossip_scoring_params.go index 3d854c1d63..84611428cd 100644 --- a/beacon-chain/p2p/gossip_scoring_params.go +++ b/beacon-chain/p2p/gossip_scoring_params.go @@ -127,7 +127,7 @@ func (s *Service) topicScoreParams(topic string) (*pubsub.TopicScoreParams, erro return defaultAttesterSlashingTopicParams(), nil case strings.Contains(topic, GossipBlsToExecutionChangeMessage): return defaultBlsToExecutionChangeTopicParams(), nil - case strings.Contains(topic, GossipBlobSidecarMessage): + case strings.Contains(topic, GossipBlobSidecarMessage), strings.Contains(topic, GossipDataColumnSidecarMessage): // TODO(Deneb): Using the default block scoring. But this should be updated. return defaultBlockTopicParams(), nil case strings.Contains(topic, GossipLightClientOptimisticUpdateMessage): diff --git a/beacon-chain/p2p/handshake.go b/beacon-chain/p2p/handshake.go index 96f43649c7..e9c0a82eae 100644 --- a/beacon-chain/p2p/handshake.go +++ b/beacon-chain/p2p/handshake.go @@ -22,7 +22,9 @@ const ( ) func peerMultiaddrString(conn network.Conn) string { - return fmt.Sprintf("%s/p2p/%s", conn.RemoteMultiaddr().String(), conn.RemotePeer().String()) + remoteMultiaddr := conn.RemoteMultiaddr().String() + remotePeerID := conn.RemotePeer().String() + return fmt.Sprintf("%s/p2p/%s", remoteMultiaddr, remotePeerID) } func (s *Service) connectToPeer(conn network.Conn) { diff --git a/beacon-chain/p2p/interfaces.go b/beacon-chain/p2p/interfaces.go index d8aa5bf6cb..2b4d81fb76 100644 --- a/beacon-chain/p2p/interfaces.go +++ b/beacon-chain/p2p/interfaces.go @@ -5,6 +5,7 @@ import ( "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/encoder" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers" + fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" "github.com/OffchainLabs/prysm/v6/consensus-types/interfaces" ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1/metadata" @@ -40,6 +41,7 @@ type Broadcaster interface { BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.BlobSidecar) error BroadcastLightClientOptimisticUpdate(ctx context.Context, update interfaces.LightClientOptimisticUpdate) error BroadcastLightClientFinalityUpdate(ctx context.Context, update interfaces.LightClientFinalityUpdate) error + BroadcastDataColumn(root [fieldparams.RootLength]byte, columnSubnet uint64, dataColumnSidecar *ethpb.DataColumnSidecar, peersChecked ...chan<- bool) error } // SetStreamHandler configures p2p to handle streams of a certain topic ID. @@ -107,3 +109,7 @@ type MetadataProvider interface { Metadata() metadata.Metadata MetadataSeq() uint64 } + +type DataColumnsHandler interface { + CustodyGroupCountFromPeer(peer.ID) uint64 +} diff --git a/beacon-chain/p2p/monitoring.go b/beacon-chain/p2p/monitoring.go index a6649eb9bb..50b7dd447e 100644 --- a/beacon-chain/p2p/monitoring.go +++ b/beacon-chain/p2p/monitoring.go @@ -46,31 +46,39 @@ var ( }) savedAttestationBroadcasts = promauto.NewCounter(prometheus.CounterOpts{ Name: "p2p_attestation_subnet_recovered_broadcasts", - Help: "The number of attestations that were attempted to be broadcast with no peers on " + + Help: "The number of attestations message broadcast attempts with no peers on " + "the subnet. The beacon node increments this counter when the broadcast is blocked " + "until a subnet peer can be found.", }) attestationBroadcastAttempts = promauto.NewCounter(prometheus.CounterOpts{ Name: "p2p_attestation_subnet_attempted_broadcasts", - Help: "The number of attestations that were attempted to be broadcast.", + Help: "The number of attestations message broadcast attempts.", }) savedSyncCommitteeBroadcasts = promauto.NewCounter(prometheus.CounterOpts{ Name: "p2p_sync_committee_subnet_recovered_broadcasts", - Help: "The number of sync committee messages that were attempted to be broadcast with no peers on " + + Help: "The number of sync committee messages broadcast attempts with no peers on " + "the subnet. The beacon node increments this counter when the broadcast is blocked " + "until a subnet peer can be found.", }) - blobSidecarCommitteeBroadcasts = promauto.NewCounter(prometheus.CounterOpts{ + blobSidecarBroadcasts = promauto.NewCounter(prometheus.CounterOpts{ Name: "p2p_blob_sidecar_committee_broadcasts", - Help: "The number of blob sidecar committee messages that were broadcast with no peer on.", + Help: "The number of blob sidecar messages that were broadcast with no peer on.", }) syncCommitteeBroadcastAttempts = promauto.NewCounter(prometheus.CounterOpts{ Name: "p2p_sync_committee_subnet_attempted_broadcasts", - Help: "The number of sync committee that were attempted to be broadcast.", + Help: "The number of sync committee message broadcast attempts.", }) - blobSidecarCommitteeBroadcastAttempts = promauto.NewCounter(prometheus.CounterOpts{ + blobSidecarBroadcastAttempts = promauto.NewCounter(prometheus.CounterOpts{ Name: "p2p_blob_sidecar_committee_attempted_broadcasts", - Help: "The number of blob sidecar committee messages that were attempted to be broadcast.", + Help: "The number of blob sidecar message broadcast attempts.", + }) + dataColumnSidecarBroadcasts = promauto.NewCounter(prometheus.CounterOpts{ + Name: "p2p_data_column_sidecar_broadcasts", + Help: "The number of data column sidecar messages that were broadcasted.", + }) + dataColumnSidecarBroadcastAttempts = promauto.NewCounter(prometheus.CounterOpts{ + Name: "p2p_data_column_sidecar_attempted_broadcasts", + Help: "The number of data column sidecar message broadcast attempts.", }) // Gossip Tracer Metrics diff --git a/beacon-chain/p2p/pubsub_filter.go b/beacon-chain/p2p/pubsub_filter.go index cd44966b67..5168a1ea84 100644 --- a/beacon-chain/p2p/pubsub_filter.go +++ b/beacon-chain/p2p/pubsub_filter.go @@ -26,10 +26,11 @@ var _ pubsub.SubscriptionFilter = (*Service)(nil) // -> SyncContributionAndProof * 2 = 2 // -> 4 SyncCommitteeSubnets * 2 = 8 // -> BlsToExecutionChange * 2 = 2 -// -> 6 BlobSidecar * 2 = 12 +// -> 128 DataColumnSidecar * 2 = 256 // ------------------------------------- -// TOTAL = 162 -const pubsubSubscriptionRequestLimit = 200 +// TOTAL = 406 +// (Note: BlobSidecar is not included in this list since it is superseded by DataColumnSidecar) +const pubsubSubscriptionRequestLimit = 500 // CanSubscribe returns true if the topic is of interest and we could subscribe to it. func (s *Service) CanSubscribe(topic string) bool { diff --git a/beacon-chain/p2p/rpc_topic_mappings.go b/beacon-chain/p2p/rpc_topic_mappings.go index fc35fc2d2b..d4bef88b5c 100644 --- a/beacon-chain/p2p/rpc_topic_mappings.go +++ b/beacon-chain/p2p/rpc_topic_mappings.go @@ -11,11 +11,16 @@ import ( "github.com/pkg/errors" ) -// SchemaVersionV1 specifies the schema version for our rpc protocol ID. -const SchemaVersionV1 = "/1" +const ( + // SchemaVersionV1 specifies the schema version for our rpc protocol ID. + SchemaVersionV1 = "/1" -// SchemaVersionV2 specifies the next schema version for our rpc protocol ID. -const SchemaVersionV2 = "/2" + // SchemaVersionV2 specifies the next schema version for our rpc protocol ID. + SchemaVersionV2 = "/2" + + // SchemaVersionV3 specifies the next schema version for our rpc protocol ID. + SchemaVersionV3 = "/3" +) // Specifies the protocol prefix for all our Req/Resp topics. const protocolPrefix = "/eth2/beacon_chain/req" @@ -73,10 +78,10 @@ const ( // RPCBlobSidecarsByRangeTopicV1 is a topic for requesting blob sidecars // in the slot range [start_slot, start_slot + count), leading up to the current head block as selected by fork choice. - // Protocol ID: /eth2/beacon_chain/req/blob_sidecars_by_range/1/ - New in deneb. + // /eth2/beacon_chain/req/blob_sidecars_by_range/1/ - New in deneb. RPCBlobSidecarsByRangeTopicV1 = protocolPrefix + BlobSidecarsByRangeName + SchemaVersionV1 - // RPCBlobSidecarsByRootTopicV1 is a topic for requesting blob sidecars by their block root. New in deneb. - // /eth2/beacon_chain/req/blob_sidecars_by_root/1/ + // RPCBlobSidecarsByRootTopicV1 is a topic for requesting blob sidecars by their block root. + // /eth2/beacon_chain/req/blob_sidecars_by_root/1/ - New in deneb. RPCBlobSidecarsByRootTopicV1 = protocolPrefix + BlobSidecarsByRootName + SchemaVersionV1 // RPCLightClientBootstrapTopicV1 is a topic for requesting a light client bootstrap. @@ -95,6 +100,10 @@ const ( RPCBlocksByRootTopicV2 = protocolPrefix + BeaconBlocksByRootsMessageName + SchemaVersionV2 // RPCMetaDataTopicV2 defines the v2 topic for the metadata rpc method. RPCMetaDataTopicV2 = protocolPrefix + MetadataMessageName + SchemaVersionV2 + + // V3 RPC Topics + // RPCMetaDataTopicV3 defines the v3 topic for the metadata rpc method. + RPCMetaDataTopicV3 = protocolPrefix + MetadataMessageName + SchemaVersionV3 ) // RPC errors for topic parsing. @@ -119,6 +128,7 @@ var RPCTopicMappings = map[string]interface{}{ // RPC Metadata Message RPCMetaDataTopicV1: new(interface{}), RPCMetaDataTopicV2: new(interface{}), + RPCMetaDataTopicV3: new(interface{}), // BlobSidecarsByRange v1 Message RPCBlobSidecarsByRangeTopicV1: new(pb.BlobSidecarsByRangeRequest), // BlobSidecarsByRoot v1 Message @@ -160,9 +170,15 @@ var altairMapping = map[string]bool{ MetadataMessageName: true, } +// Maps all the RPC messages which are to updated in fulu. +var fuluMapping = map[string]bool{ + MetadataMessageName: true, +} + var versionMapping = map[string]bool{ SchemaVersionV1: true, SchemaVersionV2: true, + SchemaVersionV3: true, } // OmitContextBytesV1 keeps track of which RPC methods do not write context bytes in their v1 incarnations. @@ -290,13 +306,22 @@ func (r RPCTopic) Version() string { // TopicFromMessage constructs the rpc topic from the provided message // type and epoch. func TopicFromMessage(msg string, epoch primitives.Epoch) (string, error) { + // Check if the topic is known. if !messageMapping[msg] { return "", errors.Errorf("%s: %s", invalidRPCMessageType, msg) } - version := SchemaVersionV1 - isAltair := epoch >= params.BeaconConfig().AltairForkEpoch - if isAltair && altairMapping[msg] { - version = SchemaVersionV2 + + beaconConfig := params.BeaconConfig() + + // Check if the message is to be updated in fulu. + if epoch >= beaconConfig.FuluForkEpoch && fuluMapping[msg] { + return protocolPrefix + msg + SchemaVersionV3, nil } - return protocolPrefix + msg + version, nil + + // Check if the message is to be updated in altair. + if epoch >= beaconConfig.AltairForkEpoch && altairMapping[msg] { + return protocolPrefix + msg + SchemaVersionV2, nil + } + + return protocolPrefix + msg + SchemaVersionV1, nil } diff --git a/beacon-chain/p2p/rpc_topic_mappings_test.go b/beacon-chain/p2p/rpc_topic_mappings_test.go index ea422c5215..c5c16107bc 100644 --- a/beacon-chain/p2p/rpc_topic_mappings_test.go +++ b/beacon-chain/p2p/rpc_topic_mappings_test.go @@ -82,43 +82,87 @@ func TestTopicDeconstructor(t *testing.T) { } func TestTopicFromMessage_CorrectType(t *testing.T) { + const ( + genesisEpoch = primitives.Epoch(0) + altairForkEpoch = primitives.Epoch(100) + fuluForkEpoch = primitives.Epoch(200) + ) + params.SetupTestConfigCleanup(t) bCfg := params.BeaconConfig().Copy() - forkEpoch := primitives.Epoch(100) - bCfg.AltairForkEpoch = forkEpoch - bCfg.ForkVersionSchedule[bytesutil.ToBytes4(bCfg.AltairForkVersion)] = primitives.Epoch(100) + + bCfg.AltairForkEpoch = altairForkEpoch + bCfg.ForkVersionSchedule[bytesutil.ToBytes4(bCfg.AltairForkVersion)] = altairForkEpoch + + bCfg.FuluForkEpoch = fuluForkEpoch + bCfg.ForkVersionSchedule[bytesutil.ToBytes4(bCfg.FuluForkVersion)] = fuluForkEpoch + params.OverrideBeaconConfig(bCfg) - // Garbage Message - badMsg := "wljdjska" - _, err := TopicFromMessage(badMsg, 0) - assert.ErrorContains(t, fmt.Sprintf("%s: %s", invalidRPCMessageType, badMsg), err) - // Before Fork - for m := range messageMapping { - topic, err := TopicFromMessage(m, 0) - assert.NoError(t, err) + t.Run("garbage message", func(t *testing.T) { + // Garbage Message + const badMsg = "wljdjska" + _, err := TopicFromMessage(badMsg, genesisEpoch) + require.ErrorContains(t, fmt.Sprintf("%s: %s", invalidRPCMessageType, badMsg), err) + }) - assert.Equal(t, true, strings.Contains(topic, SchemaVersionV1)) - _, _, version, err := TopicDeconstructor(topic) - assert.NoError(t, err) - assert.Equal(t, SchemaVersionV1, version) - } + t.Run("before altair fork", func(t *testing.T) { + for m := range messageMapping { + topic, err := TopicFromMessage(m, genesisEpoch) + require.NoError(t, err) - // Altair Fork - for m := range messageMapping { - topic, err := TopicFromMessage(m, forkEpoch) - assert.NoError(t, err) - - if altairMapping[m] { - assert.Equal(t, true, strings.Contains(topic, SchemaVersionV2)) + require.Equal(t, true, strings.Contains(topic, SchemaVersionV1)) _, _, version, err := TopicDeconstructor(topic) - assert.NoError(t, err) - assert.Equal(t, SchemaVersionV2, version) - } else { - assert.Equal(t, true, strings.Contains(topic, SchemaVersionV1)) - _, _, version, err := TopicDeconstructor(topic) - assert.NoError(t, err) - assert.Equal(t, SchemaVersionV1, version) + require.NoError(t, err) + require.Equal(t, SchemaVersionV1, version) } - } + }) + + t.Run("after altair fork but before fulu fork", func(t *testing.T) { + for m := range messageMapping { + topic, err := TopicFromMessage(m, altairForkEpoch) + require.NoError(t, err) + + if altairMapping[m] { + require.Equal(t, true, strings.Contains(topic, SchemaVersionV2)) + _, _, version, err := TopicDeconstructor(topic) + require.NoError(t, err) + require.Equal(t, SchemaVersionV2, version) + continue + } + + require.Equal(t, true, strings.Contains(topic, SchemaVersionV1)) + _, _, version, err := TopicDeconstructor(topic) + require.NoError(t, err) + require.Equal(t, SchemaVersionV1, version) + } + }) + + t.Run("after fulu fork", func(t *testing.T) { + for m := range messageMapping { + topic, err := TopicFromMessage(m, fuluForkEpoch) + require.NoError(t, err) + + if fuluMapping[m] { + require.Equal(t, true, strings.Contains(topic, SchemaVersionV3)) + _, _, version, err := TopicDeconstructor(topic) + require.NoError(t, err) + require.Equal(t, SchemaVersionV3, version) + continue + } + + if altairMapping[m] { + require.Equal(t, true, strings.Contains(topic, SchemaVersionV2)) + _, _, version, err := TopicDeconstructor(topic) + require.NoError(t, err) + require.Equal(t, SchemaVersionV2, version) + continue + } + + require.Equal(t, true, strings.Contains(topic, SchemaVersionV1)) + _, _, version, err := TopicDeconstructor(topic) + require.NoError(t, err) + require.Equal(t, SchemaVersionV1, version) + } + }) } diff --git a/beacon-chain/p2p/sender.go b/beacon-chain/p2p/sender.go index 51d584bb8a..fa6e8b4324 100644 --- a/beacon-chain/p2p/sender.go +++ b/beacon-chain/p2p/sender.go @@ -22,8 +22,9 @@ func (s *Service) Send(ctx context.Context, message interface{}, baseTopic strin ctx, span := trace.StartSpan(ctx, "p2p.Send") defer span.End() if err := VerifyTopicMapping(baseTopic, message); err != nil { - return nil, err + return nil, errors.Wrap(err, "verify topic mapping") } + topic := baseTopic + s.Encoding().ProtocolSuffix() span.SetAttributes(trace.StringAttribute("topic", topic)) @@ -39,19 +40,21 @@ func (s *Service) Send(ctx context.Context, message interface{}, baseTopic strin stream, err := s.host.NewStream(ctx, pid, protocol.ID(topic)) if err != nil { tracing.AnnotateError(span, err) - return nil, err + return nil, errors.Wrap(err, "new stream") } - // do not encode anything if we are sending a metadata request - if baseTopic != RPCMetaDataTopicV1 && baseTopic != RPCMetaDataTopicV2 { + + // Do not encode anything if we are sending a metadata request + if baseTopic != RPCMetaDataTopicV1 && baseTopic != RPCMetaDataTopicV2 && baseTopic != RPCMetaDataTopicV3 { castedMsg, ok := message.(ssz.Marshaler) if !ok { return nil, errors.Errorf("%T does not support the ssz marshaller interface", message) } + if _, err := s.Encoding().EncodeWithMaxLength(stream, castedMsg); err != nil { tracing.AnnotateError(span, err) _err := stream.Reset() _ = _err - return nil, err + return nil, errors.Wrap(err, "encode with max length") } } @@ -60,7 +63,7 @@ func (s *Service) Send(ctx context.Context, message interface{}, baseTopic strin tracing.AnnotateError(span, err) _err := stream.Reset() _ = _err - return nil, err + return nil, errors.Wrap(err, "close write") } return stream, nil diff --git a/beacon-chain/p2p/subnets.go b/beacon-chain/p2p/subnets.go index c5cd3f27c6..68e357bf0e 100644 --- a/beacon-chain/p2p/subnets.go +++ b/beacon-chain/p2p/subnets.go @@ -9,6 +9,7 @@ import ( "github.com/OffchainLabs/prysm/v6/beacon-chain/cache" "github.com/OffchainLabs/prysm/v6/beacon-chain/core/helpers" + "github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas" "github.com/OffchainLabs/prysm/v6/cmd/beacon-chain/flags" "github.com/OffchainLabs/prysm/v6/config/params" "github.com/OffchainLabs/prysm/v6/consensus-types/primitives" @@ -29,8 +30,9 @@ var ( attestationSubnetCount = params.BeaconConfig().AttestationSubnetCount syncCommsSubnetCount = params.BeaconConfig().SyncCommitteeSubnetCount - attSubnetEnrKey = params.BeaconNetworkConfig().AttSubnetKey - syncCommsSubnetEnrKey = params.BeaconNetworkConfig().SyncCommsSubnetKey + attSubnetEnrKey = params.BeaconNetworkConfig().AttSubnetKey + syncCommsSubnetEnrKey = params.BeaconNetworkConfig().SyncCommsSubnetKey + custodyGroupCountEnrKey = params.BeaconNetworkConfig().CustodyGroupCountKey ) // The value used with the subnet, in order @@ -47,7 +49,14 @@ const syncLockerVal = 100 // chosen more than sync and attestation subnet combined. const blobSubnetLockerVal = 110 -// nodeFilter return a function that filters nodes based on the subnet topic and subnet index. +// The value used with the data column sidecar subnet, in order +// to create an appropriate key to retrieve +// the relevant lock. This is used to differentiate +// data column subnets from others. This is deliberately +// chosen more than sync, attestation and blob subnet (6) combined. +const dataColumnSubnetVal = 150 + +// nodeFilter returns a function that filters nodes based on the subnet topic and subnet index. func (s *Service) nodeFilter(topic string, index uint64) (func(node *enode.Node) bool, error) { switch { case strings.Contains(topic, GossipAttestationMessage): @@ -56,6 +65,8 @@ func (s *Service) nodeFilter(topic string, index uint64) (func(node *enode.Node) return s.filterPeerForSyncSubnet(index), nil case strings.Contains(topic, GossipBlobSidecarMessage): return s.filterPeerForBlobSubnet(), nil + case strings.Contains(topic, GossipDataColumnSidecarMessage): + return s.filterPeerForDataColumnsSubnet(index), nil default: return nil, errors.Errorf("no subnet exists for provided topic: %s", topic) } @@ -276,6 +287,22 @@ func (s *Service) filterPeerForBlobSubnet() func(_ *enode.Node) bool { } } +// returns a method with filters peers specifically for a particular data column subnet. +func (s *Service) filterPeerForDataColumnsSubnet(index uint64) func(node *enode.Node) bool { + return func(node *enode.Node) bool { + if !s.filterPeer(node) { + return false + } + + subnets, err := dataColumnSubnets(node.ID(), node.Record()) + if err != nil { + return false + } + + return subnets[index] + } +} + // lower threshold to broadcast object compared to searching // for a subnet. So that even in the event of poor peer // connectivity, we can still broadcast an attestation. @@ -321,6 +348,35 @@ func (s *Service) updateSubnetRecordWithMetadataV2(bitVAtt bitfield.Bitvector64, }) } +// updateSubnetRecordWithMetadataV3 updates: +// - attestation subnet tracked, +// - sync subnets tracked, and +// - custody subnet count +// both in the node's record and in the node's metadata. +func (s *Service) updateSubnetRecordWithMetadataV3( + bitVAtt bitfield.Bitvector64, + bitVSync bitfield.Bitvector4, + custodyGroupCount uint64, +) { + attSubnetsEntry := enr.WithEntry(attSubnetEnrKey, &bitVAtt) + syncSubnetsEntry := enr.WithEntry(syncCommsSubnetEnrKey, &bitVSync) + custodyGroupCountEntry := enr.WithEntry(custodyGroupCountEnrKey, custodyGroupCount) + + localNode := s.dv5Listener.LocalNode() + localNode.Set(attSubnetsEntry) + localNode.Set(syncSubnetsEntry) + localNode.Set(custodyGroupCountEntry) + + newSeqNumber := s.metaData.SequenceNumber() + 1 + + s.metaData = wrapper.WrappedMetadataV2(&pb.MetaDataV2{ + SeqNumber: newSeqNumber, + Attnets: bitVAtt, + Syncnets: bitVSync, + CustodyGroupCount: custodyGroupCount, + }) +} + func initializePersistentSubnets(id enode.ID, epoch primitives.Epoch) error { _, ok, expTime := cache.SubnetIDs.GetPersistentSubnets() if ok && expTime.After(time.Now()) { @@ -458,6 +514,24 @@ func syncSubnets(record *enr.Record) ([]uint64, error) { return committeeIdxs, nil } +// Retrieve the data columns subnets from a node's ENR and node ID. +func dataColumnSubnets(nodeID enode.ID, record *enr.Record) (map[uint64]bool, error) { + // Retrieve the custody count from the ENR. + custodyGroupCount, err := peerdas.CustodyGroupCountFromRecord(record) + if err != nil { + return nil, errors.Wrap(err, "custody group count from record") + } + + // Retrieve the peer info. + peerInfo, _, err := peerdas.Info(nodeID, custodyGroupCount) + if err != nil { + return nil, errors.Wrap(err, "peer info") + } + + // Get custody columns subnets from the columns. + return peerInfo.DataColumnsSubnets, nil +} + // Parses the attestation subnets ENR entry in a node and extracts its value // as a bitvector for further manipulation. func attBitvector(record *enr.Record) (bitfield.Bitvector64, error) { @@ -484,14 +558,16 @@ func syncBitvector(record *enr.Record) (bitfield.Bitvector4, error) { // The subnet locker is a map which keeps track of all // mutexes stored per subnet. This locker is reused -// between both the attestation, sync and blob subnets. +// between both the attestation, sync blob and data column subnets. // Sync subnets are stored by (subnet+syncLockerVal). // Blob subnets are stored by (subnet+blobSubnetLockerVal). +// Data column subnets are stored by (subnet+dataColumnSubnetVal). // This is to prevent conflicts while allowing subnets // to use a single locker. func (s *Service) subnetLocker(i uint64) *sync.RWMutex { s.subnetsLockLock.Lock() defer s.subnetsLockLock.Unlock() + l, ok := s.subnetsLock[i] if !ok { l = &sync.RWMutex{} diff --git a/beacon-chain/p2p/subnets_test.go b/beacon-chain/p2p/subnets_test.go index 138ed3de11..231815a3b8 100644 --- a/beacon-chain/p2p/subnets_test.go +++ b/beacon-chain/p2p/subnets_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/OffchainLabs/prysm/v6/beacon-chain/cache" + "github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas" "github.com/OffchainLabs/prysm/v6/cmd/beacon-chain/flags" "github.com/OffchainLabs/prysm/v6/config/params" ecdsaprysm "github.com/OffchainLabs/prysm/v6/crypto/ecdsa" @@ -503,6 +504,26 @@ func Test_SyncSubnets(t *testing.T) { } } +func TestDataColumnSubnets(t *testing.T) { + const cgc = 3 + + var ( + nodeID enode.ID + record enr.Record + ) + + record.Set(peerdas.Cgc(cgc)) + + expected := map[uint64]bool{1: true, 87: true, 102: true} + actual, err := dataColumnSubnets(nodeID, &record) + assert.NoError(t, err) + + require.Equal(t, len(expected), len(actual)) + for subnet := range expected { + require.Equal(t, true, actual[subnet]) + } +} + func TestSubnetComputation(t *testing.T) { db, err := enode.OpenDB("") assert.NoError(t, err) diff --git a/beacon-chain/p2p/testing/BUILD.bazel b/beacon-chain/p2p/testing/BUILD.bazel index 665bef06fc..389d8f58fa 100644 --- a/beacon-chain/p2p/testing/BUILD.bazel +++ b/beacon-chain/p2p/testing/BUILD.bazel @@ -17,9 +17,12 @@ go_library( "//beacon-chain:__subpackages__", ], deps = [ + "//beacon-chain/core/peerdas:go_default_library", "//beacon-chain/p2p/encoder:go_default_library", "//beacon-chain/p2p/peers:go_default_library", "//beacon-chain/p2p/peers/scorers:go_default_library", + "//config/fieldparams:go_default_library", + "//config/params:go_default_library", "//consensus-types/interfaces:go_default_library", "//proto/prysm/v1alpha1:go_default_library", "//proto/prysm/v1alpha1/metadata:go_default_library", diff --git a/beacon-chain/p2p/testing/fuzz_p2p.go b/beacon-chain/p2p/testing/fuzz_p2p.go index d72aa34f2c..9d2f8cd3f5 100644 --- a/beacon-chain/p2p/testing/fuzz_p2p.go +++ b/beacon-chain/p2p/testing/fuzz_p2p.go @@ -5,6 +5,7 @@ import ( "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/encoder" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers" + fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" "github.com/OffchainLabs/prysm/v6/consensus-types/interfaces" ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1/metadata" @@ -59,7 +60,7 @@ func (*FakeP2P) ENR() *enr.Record { // NodeID returns the node id of the local peer. func (*FakeP2P) NodeID() enode.ID { - return [32]byte{} + return enode.ID{} } // DiscoveryAddresses -- fake @@ -165,6 +166,11 @@ func (*FakeP2P) BroadcastLightClientFinalityUpdate(_ context.Context, _ interfac return nil } +// BroadcastDataColumn -- fake. +func (*FakeP2P) BroadcastDataColumn(_ [fieldparams.RootLength]byte, _ uint64, _ *ethpb.DataColumnSidecar, _ ...chan<- bool) error { + return nil +} + // InterceptPeerDial -- fake. func (*FakeP2P) InterceptPeerDial(peer.ID) (allow bool) { return true @@ -189,3 +195,7 @@ func (*FakeP2P) InterceptSecured(network.Direction, peer.ID, network.ConnMultiad func (*FakeP2P) InterceptUpgraded(network.Conn) (allow bool, reason control.DisconnectReason) { return true, 0 } + +func (*FakeP2P) CustodyGroupCountFromPeer(peer.ID) uint64 { + return 0 +} diff --git a/beacon-chain/p2p/testing/mock_broadcaster.go b/beacon-chain/p2p/testing/mock_broadcaster.go index bbf5fc29bc..209cefc564 100644 --- a/beacon-chain/p2p/testing/mock_broadcaster.go +++ b/beacon-chain/p2p/testing/mock_broadcaster.go @@ -5,6 +5,7 @@ import ( "sync" "sync/atomic" + fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" "github.com/OffchainLabs/prysm/v6/consensus-types/interfaces" ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" "google.golang.org/protobuf/proto" @@ -61,6 +62,12 @@ func (m *MockBroadcaster) BroadcastLightClientFinalityUpdate(_ context.Context, return nil } +// BroadcastDataColumn broadcasts a data column for mock. +func (m *MockBroadcaster) BroadcastDataColumn([fieldparams.RootLength]byte, uint64, *ethpb.DataColumnSidecar, ...chan<- bool) error { + m.BroadcastCalled.Store(true) + return nil +} + // NumMessages returns the number of messages broadcasted. func (m *MockBroadcaster) NumMessages() int { m.msgLock.Lock() diff --git a/beacon-chain/p2p/testing/mock_peermanager.go b/beacon-chain/p2p/testing/mock_peermanager.go index 5b1d488323..a23341ff69 100644 --- a/beacon-chain/p2p/testing/mock_peermanager.go +++ b/beacon-chain/p2p/testing/mock_peermanager.go @@ -42,7 +42,7 @@ func (m *MockPeerManager) ENR() *enr.Record { // NodeID . func (m MockPeerManager) NodeID() enode.ID { - return [32]byte{} + return enode.ID{} } // DiscoveryAddresses . diff --git a/beacon-chain/p2p/testing/p2p.go b/beacon-chain/p2p/testing/p2p.go index d0bef801f3..0eab911c06 100644 --- a/beacon-chain/p2p/testing/p2p.go +++ b/beacon-chain/p2p/testing/p2p.go @@ -10,9 +10,12 @@ import ( "testing" "time" + "github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/encoder" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers/scorers" + fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" + "github.com/OffchainLabs/prysm/v6/config/params" "github.com/OffchainLabs/prysm/v6/consensus-types/interfaces" ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1/metadata" @@ -220,6 +223,12 @@ func (p *TestP2P) BroadcastLightClientFinalityUpdate(_ context.Context, _ interf return nil } +// BroadcastDataColumn broadcasts a data column for mock. +func (p *TestP2P) BroadcastDataColumn([fieldparams.RootLength]byte, uint64, *ethpb.DataColumnSidecar, ...chan<- bool) error { + p.BroadcastCalled.Store(true) + return nil +} + // SetStreamHandler for RPC. func (p *TestP2P) SetStreamHandler(topic string, handler network.StreamHandler) { p.BHost.SetStreamHandler(protocol.ID(topic), handler) @@ -451,3 +460,22 @@ func (*TestP2P) InterceptSecured(network.Direction, peer.ID, network.ConnMultiad func (*TestP2P) InterceptUpgraded(network.Conn) (allow bool, reason control.DisconnectReason) { return true, 0 } + +func (s *TestP2P) CustodyGroupCountFromPeer(pid peer.ID) uint64 { + // By default, we assume the peer custodies the minimum number of groups. + custodyRequirement := params.BeaconConfig().CustodyRequirement + + // Retrieve the ENR of the peer. + record, err := s.peers.ENR(pid) + if err != nil { + return custodyRequirement + } + + // Retrieve the custody subnets count from the ENR. + custodyGroupCount, err := peerdas.CustodyGroupCountFromRecord(record) + if err != nil { + return custodyRequirement + } + + return custodyGroupCount +} diff --git a/beacon-chain/p2p/types/BUILD.bazel b/beacon-chain/p2p/types/BUILD.bazel index b812c95cca..61bdc78c93 100644 --- a/beacon-chain/p2p/types/BUILD.bazel +++ b/beacon-chain/p2p/types/BUILD.bazel @@ -41,6 +41,7 @@ go_test( ], embed = [":go_default_library"], deps = [ + "//config/fieldparams:go_default_library", "//config/params:go_default_library", "//consensus-types/primitives:go_default_library", "//encoding/bytesutil:go_default_library", diff --git a/beacon-chain/p2p/types/object_mapping.go b/beacon-chain/p2p/types/object_mapping.go index 5764618094..e96a4a983e 100644 --- a/beacon-chain/p2p/types/object_mapping.go +++ b/beacon-chain/p2p/types/object_mapping.go @@ -108,7 +108,7 @@ func InitializeDataMaps() { return wrapper.WrappedMetadataV1(ðpb.MetaDataV1{}), nil }, bytesutil.ToBytes4(params.BeaconConfig().FuluForkVersion): func() (metadata.Metadata, error) { - return wrapper.WrappedMetadataV1(ðpb.MetaDataV1{}), nil + return wrapper.WrappedMetadataV2(ðpb.MetaDataV2{}), nil }, } diff --git a/beacon-chain/p2p/types/rpc_errors.go b/beacon-chain/p2p/types/rpc_errors.go index 46381876c1..de99e0ecba 100644 --- a/beacon-chain/p2p/types/rpc_errors.go +++ b/beacon-chain/p2p/types/rpc_errors.go @@ -12,7 +12,7 @@ var ( ErrRateLimited = errors.New("rate limited") ErrIODeadline = errors.New("i/o deadline exceeded") ErrInvalidRequest = errors.New("invalid range, step or count") - ErrBlobLTMinRequest = errors.New("blob slot < minimum_request_epoch") + ErrBlobLTMinRequest = errors.New("blob epoch < minimum_request_epoch") ErrMaxBlobReqExceeded = errors.New("requested more than MAX_REQUEST_BLOB_SIDECARS") ErrResourceUnavailable = errors.New("resource requested unavailable") ) diff --git a/beacon-chain/p2p/types/types.go b/beacon-chain/p2p/types/types.go index 386eb40479..3121a57183 100644 --- a/beacon-chain/p2p/types/types.go +++ b/beacon-chain/p2p/types/types.go @@ -5,6 +5,7 @@ package types import ( "bytes" + "encoding/binary" "sort" fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" @@ -14,7 +15,10 @@ import ( ssz "github.com/prysmaticlabs/fastssz" ) -const maxErrorLength = 256 +const ( + maxErrorLength = 256 + bytesPerLengthOffset = 4 +) // SSZBytes is a bytes slice that satisfies the fast-ssz interface. type SSZBytes []byte @@ -182,23 +186,23 @@ func (b *BlobSidecarsByRootReq) UnmarshalSSZ(buf []byte) error { return nil } -var _ sort.Interface = BlobSidecarsByRootReq{} +var _ sort.Interface = (*BlobSidecarsByRootReq)(nil) // Less reports whether the element with index i must sort before the element with index j. // BlobIdentifier will be sorted in lexicographic order by root, with Blob Index as tiebreaker for a given root. func (s BlobSidecarsByRootReq) Less(i, j int) bool { - rootCmp := bytes.Compare(s[i].BlockRoot, s[j].BlockRoot) + rootCmp := bytes.Compare((s)[i].BlockRoot, (s)[j].BlockRoot) if rootCmp != 0 { // They aren't equal; return true if i < j, false if i > j. return rootCmp < 0 } // They are equal; blob index is the tie breaker. - return s[i].Index < s[j].Index + return (s)[i].Index < (s)[j].Index } // Swap swaps the elements with indexes i and j. func (s BlobSidecarsByRootReq) Swap(i, j int) { - s[i], s[j] = s[j], s[i] + (s)[i], (s)[j] = (s)[j], (s)[i] } // Len is the number of elements in the collection. @@ -206,7 +210,130 @@ func (s BlobSidecarsByRootReq) Len() int { return len(s) } -func init() { - sizer := ð.BlobIdentifier{} - blobIdSize = sizer.SizeSSZ() +// ==================================== +// DataColumnsByRootIdentifiers section +// ==================================== +var _ ssz.Marshaler = (*DataColumnsByRootIdentifiers)(nil) +var _ ssz.Unmarshaler = (*DataColumnsByRootIdentifiers)(nil) + +// DataColumnsByRootIdentifiers is used to specify a list of data column targets (root+index) in a DataColumnSidecarsByRoot RPC request. +type DataColumnsByRootIdentifiers []*eth.DataColumnsByRootIdentifier + +// DataColumnIdentifier is a fixed size value, so we can compute its fixed size at start time (see init below) +var dataColumnIdSize int + +// UnmarshalSSZ implements ssz.Unmarshaler. It unmarshals the provided bytes buffer into the DataColumnSidecarsByRootReq value. +func (d *DataColumnsByRootIdentifiers) UnmarshalSSZ(buf []byte) error { + // Exit early if the buffer is too small. + if len(buf) < bytesPerLengthOffset { + return nil + } + + // Get the size of the offsets. + offsetEnd := binary.LittleEndian.Uint32(buf[:bytesPerLengthOffset]) + if offsetEnd%bytesPerLengthOffset != 0 { + return errors.Errorf("expected offsets size to be a multiple of %d but got %d", bytesPerLengthOffset, offsetEnd) + } + + count := offsetEnd / bytesPerLengthOffset + if count < 1 { + return nil + } + + maxSize := params.BeaconConfig().MaxRequestBlocksDeneb + if uint64(count) > maxSize { + return errors.Errorf("data column identifiers list exceeds max size: %d > %d", count, maxSize) + } + + if offsetEnd > uint32(len(buf)) { + return errors.Errorf("offsets value %d larger than buffer %d", offsetEnd, len(buf)) + } + valueStart := offsetEnd + + // Decode the identifers. + *d = make([]*eth.DataColumnsByRootIdentifier, count) + var start uint32 + end := uint32(len(buf)) + for i := count; i > 0; i-- { + offsetEnd -= bytesPerLengthOffset + start = binary.LittleEndian.Uint32(buf[offsetEnd : offsetEnd+bytesPerLengthOffset]) + if start > end { + return errors.Errorf("expected offset[%d] %d to be less than %d", i-1, start, end) + } + if start < valueStart { + return errors.Errorf("offset[%d] %d indexes before value section %d", i-1, start, valueStart) + } + // Decode the identifier. + ident := ð.DataColumnsByRootIdentifier{} + if err := ident.UnmarshalSSZ(buf[start:end]); err != nil { + return err + } + (*d)[i-1] = ident + end = start + } + + return nil +} + +func (d *DataColumnsByRootIdentifiers) MarshalSSZ() ([]byte, error) { + var err error + count := len(*d) + maxSize := params.BeaconConfig().MaxRequestBlocksDeneb + if uint64(count) > maxSize { + return nil, errors.Errorf("data column identifiers list exceeds max size: %d > %d", count, maxSize) + } + + if len(*d) == 0 { + return []byte{}, nil + } + sizes := make([]uint32, count) + valTotal := uint32(0) + for i, elem := range *d { + if elem == nil { + return nil, errors.New("nil item in DataColumnsByRootIdentifiers list") + } + sizes[i] = uint32(elem.SizeSSZ()) + valTotal += sizes[i] + } + offSize := uint32(4 * len(*d)) + out := make([]byte, offSize, offSize+valTotal) + for i := range sizes { + binary.LittleEndian.PutUint32(out[i*4:i*4+4], offSize) + offSize += sizes[i] + } + for _, elem := range *d { + out, err = elem.MarshalSSZTo(out) + if err != nil { + return nil, err + } + } + + return out, nil +} + +// MarshalSSZTo implements ssz.Marshaler. It appends the serialized DataColumnSidecarsByRootReq value to the provided byte slice. +func (d *DataColumnsByRootIdentifiers) MarshalSSZTo(dst []byte) ([]byte, error) { + obj, err := d.MarshalSSZ() + if err != nil { + return nil, err + } + return append(dst, obj...), nil +} + +// SizeSSZ implements ssz.Marshaler. It returns the size of the serialized representation. +func (d *DataColumnsByRootIdentifiers) SizeSSZ() int { + size := 0 + for i := 0; i < len(*d); i++ { + size += 4 + size += (*d)[i].SizeSSZ() + } + return size +} + +func init() { + blobSizer := ð.BlobIdentifier{} + blobIdSize = blobSizer.SizeSSZ() + + dataColumnSizer := ð.DataColumnSidecarsByRangeRequest{} + dataColumnIdSize = dataColumnSizer.SizeSSZ() } diff --git a/beacon-chain/p2p/types/types_test.go b/beacon-chain/p2p/types/types_test.go index 12f4139987..15065e051e 100644 --- a/beacon-chain/p2p/types/types_test.go +++ b/beacon-chain/p2p/types/types_test.go @@ -4,6 +4,7 @@ import ( "encoding/hex" "testing" + fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" "github.com/OffchainLabs/prysm/v6/config/params" "github.com/OffchainLabs/prysm/v6/consensus-types/primitives" "github.com/OffchainLabs/prysm/v6/encoding/bytesutil" @@ -203,3 +204,157 @@ func hexDecodeOrDie(t *testing.T, str string) []byte { require.NoError(t, err) return decoded } + +// ==================================== +// DataColumnsByRootIdentifiers section +// ==================================== +func generateDataColumnIdentifiers(n int) []*eth.DataColumnsByRootIdentifier { + r := make([]*eth.DataColumnsByRootIdentifier, n) + for i := 0; i < n; i++ { + r[i] = ð.DataColumnsByRootIdentifier{ + BlockRoot: bytesutil.PadTo([]byte{byte(i)}, 32), + Columns: []uint64{uint64(i)}, + } + } + return r +} + +func TestDataColumnSidecarsByRootReq_Marshal(t *testing.T) { + /* + + SSZ encoding of DataColumnsByRootIdentifiers is tested in spectests. + However, encoding a list of DataColumnsByRootIdentifier is not. + We are testing it here. + + Python code to generate the expected value + + # pip install eth2spec + + from eth2spec.utils.ssz import ssz_typing + + Container = ssz_typing.Container + List = ssz_typing.List + + Root = ssz_typing.Bytes32 + ColumnIndex = ssz_typing.uint64 + + NUMBER_OF_COLUMNS=128 + + class DataColumnsByRootIdentifier(Container): + block_root: Root + columns: List[ColumnIndex, NUMBER_OF_COLUMNS] + + first = DataColumnsByRootIdentifier(block_root="0x0100000000000000000000000000000000000000000000000000000000000000", columns=[3,5,7]) + second = DataColumnsByRootIdentifier(block_root="0x0200000000000000000000000000000000000000000000000000000000000000", columns=[]) + third = DataColumnsByRootIdentifier(block_root="0x0300000000000000000000000000000000000000000000000000000000000000", columns=[6, 4]) + + expected = List[DataColumnsByRootIdentifier, 42](first, second, third).encode_bytes().hex() + */ + + const expected = "0c000000480000006c00000001000000000000000000000000000000000000000000000000000000000000002400000003000000000000000500000000000000070000000000000002000000000000000000000000000000000000000000000000000000000000002400000003000000000000000000000000000000000000000000000000000000000000002400000006000000000000000400000000000000" + identifiers := &DataColumnsByRootIdentifiers{ + { + BlockRoot: bytesutil.PadTo([]byte{1}, fieldparams.RootLength), + Columns: []uint64{3, 5, 7}, + }, + { + BlockRoot: bytesutil.PadTo([]byte{2}, fieldparams.RootLength), + Columns: []uint64{}, + }, + { + BlockRoot: bytesutil.PadTo([]byte{3}, fieldparams.RootLength), + Columns: []uint64{6, 4}, + }, + } + + marshalled, err := identifiers.MarshalSSZ() + require.NoError(t, err) + + actual := hex.EncodeToString(marshalled) + require.Equal(t, expected, actual) +} + +func TestDataColumnSidecarsByRootReq_MarshalUnmarshal(t *testing.T) { + cases := []struct { + name string + ids []*eth.DataColumnsByRootIdentifier + marshalErr error + unmarshalErr string + unmarshalMod func([]byte) []byte + }{ + { + name: "empty list", + }, + { + name: "single item list", + ids: generateDataColumnIdentifiers(1), + }, + { + name: "10 item list", + ids: generateDataColumnIdentifiers(10), + }, + { + name: "wonky unmarshal size", + ids: generateDataColumnIdentifiers(10), + unmarshalMod: func(in []byte) []byte { + in = append(in, byte(0)) + return in + }, + unmarshalErr: "a is not evenly divisble by b", + }, + { + name: "size too big", + ids: generateDataColumnIdentifiers(1), + unmarshalMod: func(in []byte) []byte { + maxLen := params.BeaconConfig().MaxRequestDataColumnSidecars * uint64(dataColumnIdSize) + add := make([]byte, maxLen) + in = append(in, add...) + return in + }, + unmarshalErr: "a/b is greater than max", + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + req := DataColumnsByRootIdentifiers(c.ids) + bytes, err := req.MarshalSSZ() + if c.marshalErr != nil { + require.ErrorIs(t, err, c.marshalErr) + return + } + + require.NoError(t, err) + if c.unmarshalMod != nil { + bytes = c.unmarshalMod(bytes) + } + + got := &DataColumnsByRootIdentifiers{} + err = got.UnmarshalSSZ(bytes) + if c.unmarshalErr != "" { + require.ErrorContains(t, c.unmarshalErr, err) + return + } + require.NoError(t, err) + + require.Equal(t, len(c.ids), len(*got)) + + for i, expected := range c.ids { + actual := (*got)[i] + require.DeepEqual(t, expected, actual) + } + }) + } + + // Test MarshalSSZTo + req := DataColumnsByRootIdentifiers(generateDataColumnIdentifiers(10)) + buf := make([]byte, 0) + buf, err := req.MarshalSSZTo(buf) + require.NoError(t, err) + require.Equal(t, len(buf), int(req.SizeSSZ())) + + var unmarshalled DataColumnsByRootIdentifiers + err = unmarshalled.UnmarshalSSZ(buf) + require.NoError(t, err) + require.DeepEqual(t, req, unmarshalled) +} diff --git a/beacon-chain/sync/rpc_blob_sidecars_by_root.go b/beacon-chain/sync/rpc_blob_sidecars_by_root.go index b300fb643d..c80efb822e 100644 --- a/beacon-chain/sync/rpc_blob_sidecars_by_root.go +++ b/beacon-chain/sync/rpc_blob_sidecars_by_root.go @@ -37,8 +37,9 @@ func (s *Service) blobSidecarByRootRPCHandler(ctx context.Context, msg interface blobIdents := *ref cs := s.cfg.clock.CurrentSlot() + remotePeer := stream.Conn().RemotePeer() if err := validateBlobByRootRequest(blobIdents, cs); err != nil { - s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer()) + s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeer) s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream) return err } @@ -75,6 +76,7 @@ func (s *Service) blobSidecarByRootRPCHandler(ctx context.Context, msg interface log.WithError(err).WithFields(logrus.Fields{ "root": fmt.Sprintf("%#x", root), "index": idx, + "peer": remotePeer.String(), }).Debugf("Peer requested blob sidecar by root not found in db") continue } @@ -107,14 +109,21 @@ func (s *Service) blobSidecarByRootRPCHandler(ctx context.Context, msg interface } func validateBlobByRootRequest(blobIdents types.BlobSidecarsByRootReq, slot primitives.Slot) error { - if slots.ToEpoch(slot) >= params.BeaconConfig().ElectraForkEpoch { - if uint64(len(blobIdents)) > params.BeaconConfig().MaxRequestBlobSidecarsElectra { - return types.ErrMaxBlobReqExceeded - } - } else { - if uint64(len(blobIdents)) > params.BeaconConfig().MaxRequestBlobSidecars { + beaconConfig := params.BeaconConfig() + epoch := slots.ToEpoch(slot) + blobIdentCount := uint64(len(blobIdents)) + + if epoch >= beaconConfig.ElectraForkEpoch { + if blobIdentCount > beaconConfig.MaxRequestBlobSidecarsElectra { return types.ErrMaxBlobReqExceeded } + + return nil } + + if blobIdentCount > beaconConfig.MaxRequestBlobSidecars { + return types.ErrMaxBlobReqExceeded + } + return nil } diff --git a/beacon-chain/sync/rpc_blob_sidecars_by_root_test.go b/beacon-chain/sync/rpc_blob_sidecars_by_root_test.go index 4687f826fa..a291524d2b 100644 --- a/beacon-chain/sync/rpc_blob_sidecars_by_root_test.go +++ b/beacon-chain/sync/rpc_blob_sidecars_by_root_test.go @@ -44,7 +44,7 @@ func (c *blobsTestCase) filterExpectedByRoot(t *testing.T, scs []blocks.ROBlob, message: p2pTypes.ErrBlobLTMinRequest.Error(), }} } - sort.Sort(req) + sort.Sort(&req) var expect []*expectedBlobChunk blockOffset := 0 if len(scs) == 0 { diff --git a/changelog/manu-peerdas-p2p.md b/changelog/manu-peerdas-p2p.md new file mode 100644 index 0000000000..70498a08f4 --- /dev/null +++ b/changelog/manu-peerdas-p2p.md @@ -0,0 +1,2 @@ +### Added +- PeerDAS: Implement P2P. diff --git a/consensus-types/wrapper/metadata.go b/consensus-types/wrapper/metadata.go index e251f734bd..e0728f5bb8 100644 --- a/consensus-types/wrapper/metadata.go +++ b/consensus-types/wrapper/metadata.go @@ -36,6 +36,11 @@ func (m MetadataV0) SyncnetsBitfield() bitfield.Bitvector4 { return bitfield.Bitvector4{0} } +// CustodyGroupCount returns custody subnet count from the metadata. +func (m MetadataV0) CustodyGroupCount() uint64 { + return 0 +} + // InnerObject returns the underlying metadata protobuf structure. func (m MetadataV0) InnerObject() interface{} { return m.md @@ -86,6 +91,12 @@ func (MetadataV0) MetadataObjV1() *pb.MetaDataV1 { return nil } +// MetadataObjV2 returns the inner metadata object in its type +// specified form. If it doesn't exist then we return nothing. +func (MetadataV0) MetadataObjV2() *pb.MetaDataV2 { + return nil +} + // Version returns the fork version of the underlying object. func (MetadataV0) Version() int { return version.Phase0 @@ -119,6 +130,11 @@ func (m MetadataV1) SyncnetsBitfield() bitfield.Bitvector4 { return m.md.Syncnets } +// CustodyGroupCount returns custody subnet count from the metadata. +func (m MetadataV1) CustodyGroupCount() uint64 { + return 0 +} + // InnerObject returns the underlying metadata protobuf structure. func (m MetadataV1) InnerObject() interface{} { return m.md @@ -169,7 +185,107 @@ func (m MetadataV1) MetadataObjV1() *pb.MetaDataV1 { return m.md } +// MetadataObjV2 returns the inner metadata object in its type +// specified form. If it doesn't exist then we return nothing. +func (m MetadataV1) MetadataObjV2() *pb.MetaDataV2 { + return nil +} + // Version returns the fork version of the underlying object. func (MetadataV1) Version() int { return version.Altair } + +// MetadataV2 +// ---------- + +// MetadataV2 is a convenience wrapper around our metadata v3 protobuf object. +type MetadataV2 struct { + md *pb.MetaDataV2 +} + +// WrappedMetadataV2 wrappers around the provided protobuf object. +func WrappedMetadataV2(md *pb.MetaDataV2) MetadataV2 { + return MetadataV2{md: md} +} + +// SequenceNumber returns the sequence number from the metadata. +func (m MetadataV2) SequenceNumber() uint64 { + return m.md.SeqNumber +} + +// AttnetsBitfield returns the bitfield stored in the metadata. +func (m MetadataV2) AttnetsBitfield() bitfield.Bitvector64 { + return m.md.Attnets +} + +// SyncnetsBitfield returns the bitfield stored in the metadata. +func (m MetadataV2) SyncnetsBitfield() bitfield.Bitvector4 { + return m.md.Syncnets +} + +// CustodyGroupCount returns custody subnet count from the metadata. +func (m MetadataV2) CustodyGroupCount() uint64 { + return m.md.CustodyGroupCount +} + +// InnerObject returns the underlying metadata protobuf structure. +func (m MetadataV2) InnerObject() interface{} { + return m.md +} + +// IsNil checks for the nilness of the underlying object. +func (m MetadataV2) IsNil() bool { + return m.md == nil +} + +// Copy performs a full copy of the underlying metadata object. +func (m MetadataV2) Copy() metadata.Metadata { + return WrappedMetadataV2(proto.Clone(m.md).(*pb.MetaDataV2)) +} + +// MarshalSSZ marshals the underlying metadata object +// into its serialized form. +func (m MetadataV2) MarshalSSZ() ([]byte, error) { + return m.md.MarshalSSZ() +} + +// MarshalSSZTo marshals the underlying metadata object +// into its serialized form into the provided byte buffer. +func (m MetadataV2) MarshalSSZTo(dst []byte) ([]byte, error) { + return m.md.MarshalSSZTo(dst) +} + +// SizeSSZ returns the serialized size of the metadata object. +func (m MetadataV2) SizeSSZ() int { + return m.md.SizeSSZ() +} + +// UnmarshalSSZ unmarshals the provided byte buffer into +// the underlying metadata object. +func (m MetadataV2) UnmarshalSSZ(buf []byte) error { + return m.md.UnmarshalSSZ(buf) +} + +// MetadataObjV0 returns the inner metadata object in its type +// specified form. If it doesn't exist then we return nothing. +func (MetadataV2) MetadataObjV0() *pb.MetaDataV0 { + return nil +} + +// MetadataObjV1 returns the inner metadata object in its type +// specified form. If it doesn't exist then we return nothing. +func (m MetadataV2) MetadataObjV1() *pb.MetaDataV1 { + return nil +} + +// MetadataObjV2 returns the inner metadata object in its type +// specified form. If it doesn't exist then we return nothing. +func (m MetadataV2) MetadataObjV2() *pb.MetaDataV2 { + return m.md +} + +// Version returns the fork version of the underlying object. +func (MetadataV2) Version() int { + return version.Fulu +} diff --git a/proto/prysm/v1alpha1/metadata/metadata_interfaces.go b/proto/prysm/v1alpha1/metadata/metadata_interfaces.go index 240791f582..032fd26e3a 100644 --- a/proto/prysm/v1alpha1/metadata/metadata_interfaces.go +++ b/proto/prysm/v1alpha1/metadata/metadata_interfaces.go @@ -11,6 +11,7 @@ type Metadata interface { SequenceNumber() uint64 AttnetsBitfield() bitfield.Bitvector64 SyncnetsBitfield() bitfield.Bitvector4 + CustodyGroupCount() uint64 InnerObject() interface{} IsNil() bool Copy() Metadata @@ -18,5 +19,6 @@ type Metadata interface { ssz.Unmarshaler MetadataObjV0() *pb.MetaDataV0 MetadataObjV1() *pb.MetaDataV1 + MetadataObjV2() *pb.MetaDataV2 Version() int }