From bc7664321b477f159ad92559ddfb9de62e95d718 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Tue, 1 Jul 2025 09:07:32 +0200 Subject: [PATCH] Implement the new Fulu Metadata. (#15440) --- beacon-chain/sync/rate_limiter.go | 3 +- beacon-chain/sync/rate_limiter_test.go | 2 +- beacon-chain/sync/rpc_metadata.go | 43 ++- beacon-chain/sync/rpc_metadata_test.go | 346 +++++++++++++++---------- changelog/manu-peerdas-metadata.md | 2 + 5 files changed, 260 insertions(+), 136 deletions(-) create mode 100644 changelog/manu-peerdas-metadata.md diff --git a/beacon-chain/sync/rate_limiter.go b/beacon-chain/sync/rate_limiter.go index 21be07d52b..b7d825be39 100644 --- a/beacon-chain/sync/rate_limiter.go +++ b/beacon-chain/sync/rate_limiter.go @@ -55,9 +55,10 @@ func newRateLimiter(p2pProvider p2p.P2P) *limiter { topicMap := make(map[string]*leakybucket.Collector, len(p2p.RPCTopicMappings)) // Goodbye Message topicMap[addEncoding(p2p.RPCGoodByeTopicV1)] = leakybucket.NewCollector(1, 1, leakyBucketPeriod, false /* deleteEmptyBuckets */) - // MetadataV0 Message + // Metadata Message topicMap[addEncoding(p2p.RPCMetaDataTopicV1)] = leakybucket.NewCollector(1, defaultBurstLimit, leakyBucketPeriod, false /* deleteEmptyBuckets */) topicMap[addEncoding(p2p.RPCMetaDataTopicV2)] = leakybucket.NewCollector(1, defaultBurstLimit, leakyBucketPeriod, false /* deleteEmptyBuckets */) + topicMap[addEncoding(p2p.RPCMetaDataTopicV3)] = leakybucket.NewCollector(1, defaultBurstLimit, leakyBucketPeriod, false /* deleteEmptyBuckets */) // Ping Message topicMap[addEncoding(p2p.RPCPingTopicV1)] = leakybucket.NewCollector(1, defaultBurstLimit, leakyBucketPeriod, false /* deleteEmptyBuckets */) // Status Message diff --git a/beacon-chain/sync/rate_limiter_test.go b/beacon-chain/sync/rate_limiter_test.go index 535a3924bd..7cf19f151d 100644 --- a/beacon-chain/sync/rate_limiter_test.go +++ b/beacon-chain/sync/rate_limiter_test.go @@ -17,7 +17,7 @@ import ( func TestNewRateLimiter(t *testing.T) { rlimiter := newRateLimiter(mockp2p.NewTestP2P(t)) - assert.Equal(t, len(rlimiter.limiterMap), 18, "correct number of topics not registered") + assert.Equal(t, len(rlimiter.limiterMap), 19, "correct number of topics not registered") } func TestNewRateLimiter_FreeCorrectly(t *testing.T) { diff --git a/beacon-chain/sync/rpc_metadata.go b/beacon-chain/sync/rpc_metadata.go index 58835ba208..6a66504c87 100644 --- a/beacon-chain/sync/rpc_metadata.go +++ b/beacon-chain/sync/rpc_metadata.go @@ -17,7 +17,7 @@ import ( "github.com/prysmaticlabs/go-bitfield" ) -// metaDataHandler reads the incoming metadata rpc request from the peer. +// metaDataHandler reads the incoming metadata RPC request from the peer. func (s *Service) metaDataHandler(_ context.Context, _ interface{}, stream libp2pcore.Stream) error { SetRPCStreamDeadlines(stream) @@ -70,7 +70,9 @@ func (s *Service) metaDataHandler(_ context.Context, _ interface{}, stream libp2 switch streamVersion { case p2p.SchemaVersionV1: switch metadataVersion { - case version.Altair, version.Deneb: + case version.Altair, version.Fulu: + // If the stream version corresponds to Phase 0 but our metadata + // corresponds to Altair or Fulu, convert our metadata to the Phase 0 one. metadata = wrapper.WrappedMetadataV0( &pb.MetaDataV0{ Attnets: metadata.AttnetsBitfield(), @@ -81,13 +83,18 @@ func (s *Service) metaDataHandler(_ context.Context, _ interface{}, stream libp2 case p2p.SchemaVersionV2: switch metadataVersion { case version.Phase0: + // If the stream version corresponds to Altair but our metadata + // corresponds to Phase 0, convert our metadata to the Altair one, + // and use a zeroed syncnets bitfield. metadata = wrapper.WrappedMetadataV1( &pb.MetaDataV1{ Attnets: metadata.AttnetsBitfield(), SeqNumber: metadata.SequenceNumber(), Syncnets: bitfield.Bitvector4{byte(0x00)}, }) - case version.Deneb: + case version.Fulu: + // If the stream version corresponds to Altair but our metadata + // corresponds to Fulu, convert our metadata to the Altair one. metadata = wrapper.WrappedMetadataV1( &pb.MetaDataV1{ Attnets: metadata.AttnetsBitfield(), @@ -95,6 +102,32 @@ func (s *Service) metaDataHandler(_ context.Context, _ interface{}, stream libp2 Syncnets: metadata.SyncnetsBitfield(), }) } + + case p2p.SchemaVersionV3: + switch metadataVersion { + case version.Phase0: + // If the stream version corresponds to Fulu but our metadata + // corresponds to Phase 0, convert our metadata to the Fulu one, + // and use a zeroed syncnets bitfield and custody group count. + metadata = wrapper.WrappedMetadataV2( + &pb.MetaDataV2{ + Attnets: metadata.AttnetsBitfield(), + SeqNumber: metadata.SequenceNumber(), + Syncnets: bitfield.Bitvector4{byte(0x00)}, + CustodyGroupCount: 0, + }) + case version.Altair: + // If the stream version corresponds to Fulu but our metadata + // corresponds to Altair, convert our metadata to the Fulu one and + // use a zeroed custody group count. + metadata = wrapper.WrappedMetadataV2( + &pb.MetaDataV2{ + Attnets: metadata.AttnetsBitfield(), + SeqNumber: metadata.SequenceNumber(), + Syncnets: metadata.SyncnetsBitfield(), + CustodyGroupCount: 0, + }) + } } // Write the METADATA response into the stream. @@ -164,12 +197,14 @@ func (s *Service) sendMetaDataRequest(ctx context.Context, peerID peer.ID) (meta } // Defensive check to ensure valid objects are being sent. - topicVersion := "" + var topicVersion string switch msg.Version() { case version.Phase0: topicVersion = p2p.SchemaVersionV1 case version.Altair: topicVersion = p2p.SchemaVersionV2 + case version.Fulu: + topicVersion = p2p.SchemaVersionV3 } // Validate the version of the topic. diff --git a/beacon-chain/sync/rpc_metadata_test.go b/beacon-chain/sync/rpc_metadata_test.go index 10c0b03c9a..e0288b9111 100644 --- a/beacon-chain/sync/rpc_metadata_test.go +++ b/beacon-chain/sync/rpc_metadata_test.go @@ -1,6 +1,7 @@ package sync import ( + "context" "sync" "testing" "time" @@ -15,6 +16,7 @@ import ( leakybucket "github.com/OffchainLabs/prysm/v6/container/leaky-bucket" "github.com/OffchainLabs/prysm/v6/encoding/ssz/equality" pb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" + "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1/metadata" "github.com/OffchainLabs/prysm/v6/testing/assert" "github.com/OffchainLabs/prysm/v6/testing/require" "github.com/OffchainLabs/prysm/v6/testing/util" @@ -22,6 +24,7 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/protocol" libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic" + "github.com/prysmaticlabs/go-bitfield" ) func TestMetaDataRPCHandler_ReceivesMetadata(t *testing.T) { @@ -76,158 +79,241 @@ func TestMetaDataRPCHandler_ReceivesMetadata(t *testing.T) { } } -func TestMetadataRPCHandler_SendsMetadata(t *testing.T) { - p1 := p2ptest.NewTestP2P(t) - p2 := p2ptest.NewTestP2P(t) - p1.Connect(p2) - assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected") - bitfield := [8]byte{'A', 'B'} - p2.LocalMetadata = wrapper.WrappedMetadataV0(&pb.MetaDataV0{ - SeqNumber: 2, - Attnets: bitfield[:], - }) - - // Set up a head state in the database with data we expect. - chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}} - d := db.SetupDB(t) - r := &Service{ +func createService(peer p2p.P2P, chain *mock.ChainService) *Service { + return &Service{ cfg: &config{ - beaconDB: d, - p2p: p1, - chain: chain, - clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot), + p2p: peer, + chain: chain, + clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot), }, - rateLimiter: newRateLimiter(p1), - } - - r2 := &Service{ - cfg: &config{ - beaconDB: d, - p2p: p2, - chain: &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}, - }, - rateLimiter: newRateLimiter(p2), - } - - // Setup streams - pcl := protocol.ID(p2p.RPCMetaDataTopicV1 + r.cfg.p2p.Encoding().ProtocolSuffix()) - topic := string(pcl) - r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false) - r2.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false) - - var wg sync.WaitGroup - wg.Add(1) - p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) { - defer wg.Done() - assert.NoError(t, r2.metaDataHandler(t.Context(), new(interface{}), stream)) - }) - - md, err := r.sendMetaDataRequest(t.Context(), p2.BHost.ID()) - assert.NoError(t, err) - - if !equality.DeepEqual(md.InnerObject(), p2.LocalMetadata.InnerObject()) { - t.Fatalf("MetadataV0 unequal, received %v but wanted %v", md, p2.LocalMetadata) - } - - if util.WaitTimeout(&wg, 1*time.Second) { - t.Fatal("Did not receive stream within 1 sec") - } - - conns := p1.BHost.Network().ConnsToPeer(p2.BHost.ID()) - if len(conns) == 0 { - t.Error("Peer is disconnected despite receiving a valid ping") + rateLimiter: newRateLimiter(peer), } } -func TestMetadataRPCHandler_SendsMetadataAltair(t *testing.T) { +func TestMetadataRPCHandler_SendMetadataRequest(t *testing.T) { + const ( + requestTimeout = 1 * time.Second + seqNumber = 2 + custodyGroupCount = 4 + ) + + attnets := []byte{'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H'} + syncnets := []byte{0x4} + + // Configure the test beacon chain. params.SetupTestConfigCleanup(t) - bCfg := params.BeaconConfig().Copy() - bCfg.AltairForkEpoch = 5 - params.OverrideBeaconConfig(bCfg) + beaconChainConfig := params.BeaconConfig().Copy() + beaconChainConfig.AltairForkEpoch = 5 + beaconChainConfig.FuluForkEpoch = 15 + params.OverrideBeaconConfig(beaconChainConfig) params.BeaconConfig().InitializeForkSchedule() - p1 := p2ptest.NewTestP2P(t) - p2 := p2ptest.NewTestP2P(t) - p1.Connect(p2) - assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected") - bitfield := [8]byte{'A', 'B'} - p2.LocalMetadata = wrapper.WrappedMetadataV0(&pb.MetaDataV0{ - SeqNumber: 2, - Attnets: bitfield[:], - }) + // Compute the number of seconds in an epoch. + secondsPerEpoch := oneEpoch() - // Set up a head state in the database with data we expect. - d := db.SetupDB(t) - chain := &mock.ChainService{Genesis: time.Now().Add(-5 * oneEpoch()), ValidatorsRoot: [32]byte{}} - r := &Service{ - cfg: &config{ - beaconDB: d, - p2p: p1, - chain: chain, - clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot), + testCases := []struct { + name string + topic string + epochsSinceGenesisPeer1, epochsSinceGenesisPeer2 int + metadataPeer2, expected metadata.Metadata + }{ + { + name: "Phase0-Phase0", + topic: p2p.RPCMetaDataTopicV1, + epochsSinceGenesisPeer1: 0, + epochsSinceGenesisPeer2: 0, + metadataPeer2: wrapper.WrappedMetadataV0(&pb.MetaDataV0{ + SeqNumber: seqNumber, + Attnets: attnets, + }), + expected: wrapper.WrappedMetadataV0(&pb.MetaDataV0{ + SeqNumber: seqNumber, + Attnets: attnets, + }), }, - rateLimiter: newRateLimiter(p1), - } - - chain2 := &mock.ChainService{Genesis: time.Now().Add(-5 * oneEpoch()), ValidatorsRoot: [32]byte{}} - r2 := &Service{ - cfg: &config{ - beaconDB: d, - p2p: p2, - chain: chain2, - clock: startup.NewClock(chain2.Genesis, chain2.ValidatorsRoot), + { + name: "Phase0-Altair", + topic: p2p.RPCMetaDataTopicV1, + epochsSinceGenesisPeer1: 0, + epochsSinceGenesisPeer2: 5, + metadataPeer2: wrapper.WrappedMetadataV1(&pb.MetaDataV1{ + SeqNumber: seqNumber, + Attnets: attnets, + Syncnets: syncnets, + }), + expected: wrapper.WrappedMetadataV0(&pb.MetaDataV0{ + SeqNumber: seqNumber, + Attnets: attnets, + }), + }, + { + name: "Phase0-Fulu", + topic: p2p.RPCMetaDataTopicV1, + epochsSinceGenesisPeer1: 0, + epochsSinceGenesisPeer2: 15, + metadataPeer2: wrapper.WrappedMetadataV2(&pb.MetaDataV2{ + SeqNumber: seqNumber, + Attnets: attnets, + Syncnets: syncnets, + CustodyGroupCount: custodyGroupCount, + }), + expected: wrapper.WrappedMetadataV0(&pb.MetaDataV0{ + SeqNumber: seqNumber, + Attnets: attnets, + }), + }, + { + name: "Altair-Phase0", + topic: p2p.RPCMetaDataTopicV2, + epochsSinceGenesisPeer1: 5, + epochsSinceGenesisPeer2: 0, + metadataPeer2: wrapper.WrappedMetadataV0(&pb.MetaDataV0{ + SeqNumber: seqNumber, + Attnets: attnets, + }), + expected: wrapper.WrappedMetadataV1(&pb.MetaDataV1{ + SeqNumber: seqNumber, + Attnets: attnets, + Syncnets: bitfield.Bitvector4{byte(0x00)}, + }), + }, + { + name: "Altair-Altair", + topic: p2p.RPCMetaDataTopicV2, + epochsSinceGenesisPeer1: 5, + epochsSinceGenesisPeer2: 5, + metadataPeer2: wrapper.WrappedMetadataV1(&pb.MetaDataV1{ + SeqNumber: seqNumber, + Attnets: attnets, + Syncnets: syncnets, + }), + expected: wrapper.WrappedMetadataV1(&pb.MetaDataV1{ + SeqNumber: seqNumber, + Attnets: attnets, + Syncnets: syncnets, + }), + }, + { + name: "Altair-Fulu", + topic: p2p.RPCMetaDataTopicV2, + epochsSinceGenesisPeer1: 5, + epochsSinceGenesisPeer2: 15, + metadataPeer2: wrapper.WrappedMetadataV2(&pb.MetaDataV2{ + SeqNumber: seqNumber, + Attnets: attnets, + Syncnets: syncnets, + CustodyGroupCount: custodyGroupCount, + }), + expected: wrapper.WrappedMetadataV1(&pb.MetaDataV1{ + SeqNumber: seqNumber, + Attnets: attnets, + Syncnets: syncnets, + }), + }, + { + name: "Fulu-Phase0", + topic: p2p.RPCMetaDataTopicV3, + epochsSinceGenesisPeer1: 15, + epochsSinceGenesisPeer2: 0, + metadataPeer2: wrapper.WrappedMetadataV0(&pb.MetaDataV0{ + SeqNumber: seqNumber, + Attnets: attnets, + }), + expected: wrapper.WrappedMetadataV2(&pb.MetaDataV2{ + SeqNumber: seqNumber, + Attnets: attnets, + Syncnets: bitfield.Bitvector4{byte(0x00)}, + CustodyGroupCount: 0, + }), + }, + { + name: "Fulu-Altair", + topic: p2p.RPCMetaDataTopicV3, + epochsSinceGenesisPeer1: 15, + epochsSinceGenesisPeer2: 5, + metadataPeer2: wrapper.WrappedMetadataV1(&pb.MetaDataV1{ + SeqNumber: seqNumber, + Attnets: attnets, + Syncnets: syncnets, + }), + expected: wrapper.WrappedMetadataV2(&pb.MetaDataV2{ + SeqNumber: seqNumber, + Attnets: attnets, + Syncnets: syncnets, + CustodyGroupCount: 0, + }), + }, + { + name: "Fulu-Fulu", + topic: p2p.RPCMetaDataTopicV3, + epochsSinceGenesisPeer1: 15, + epochsSinceGenesisPeer2: 15, + metadataPeer2: wrapper.WrappedMetadataV2(&pb.MetaDataV2{ + SeqNumber: seqNumber, + Attnets: attnets, + Syncnets: syncnets, + CustodyGroupCount: custodyGroupCount, + }), + expected: wrapper.WrappedMetadataV2(&pb.MetaDataV2{ + SeqNumber: seqNumber, + Attnets: attnets, + Syncnets: syncnets, + CustodyGroupCount: custodyGroupCount, + }), }, - rateLimiter: newRateLimiter(p2), } - // Setup streams - pcl := protocol.ID(p2p.RPCMetaDataTopicV2 + r.cfg.p2p.Encoding().ProtocolSuffix()) - topic := string(pcl) - r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(2, 2, time.Second, false) - r2.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(2, 2, time.Second, false) + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + var wg sync.WaitGroup - var wg sync.WaitGroup - wg.Add(1) - p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) { - defer wg.Done() - err := r2.metaDataHandler(t.Context(), new(interface{}), stream) - assert.NoError(t, err) - }) + ctx := context.Background() - _, err := r.sendMetaDataRequest(t.Context(), p2.BHost.ID()) - assert.NoError(t, err) + // Setup and connect peers. + peer1, peer2 := p2ptest.NewTestP2P(t), p2ptest.NewTestP2P(t) + peer1.Connect(peer2) - if util.WaitTimeout(&wg, 1*time.Second) { - t.Fatal("Did not receive stream within 1 sec") - } + // Ensure the peers are connected. + peersCount := len(peer1.BHost.Network().Peers()) + require.Equal(t, 1, peersCount, "Expected peers to be connected") - // Fix up peer with the correct metadata. - p2.LocalMetadata = wrapper.WrappedMetadataV1(&pb.MetaDataV1{ - SeqNumber: 2, - Attnets: bitfield[:], - Syncnets: []byte{0x0}, - }) + // Setup sync services. + genesisPeer1 := time.Now().Add(-time.Duration(tc.epochsSinceGenesisPeer1) * secondsPerEpoch) + genesisPeer2 := time.Now().Add(-time.Duration(tc.epochsSinceGenesisPeer2) * secondsPerEpoch) - wg.Add(1) - p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) { - defer wg.Done() - assert.NoError(t, r2.metaDataHandler(t.Context(), new(interface{}), stream)) - }) + chainPeer1 := &mock.ChainService{Genesis: genesisPeer1, ValidatorsRoot: [32]byte{}} + chainPeer2 := &mock.ChainService{Genesis: genesisPeer2, ValidatorsRoot: [32]byte{}} - md, err := r.sendMetaDataRequest(t.Context(), p2.BHost.ID()) - assert.NoError(t, err) + servicePeer1 := createService(peer1, chainPeer1) + servicePeer2 := createService(peer2, chainPeer2) - if !equality.DeepEqual(md.InnerObject(), p2.LocalMetadata.InnerObject()) { - t.Fatalf("MetadataV1 unequal, received %v but wanted %v", md, p2.LocalMetadata) - } + // Define the behavior of peer2 when receiving a METADATA request. + protocolSuffix := servicePeer2.cfg.p2p.Encoding().ProtocolSuffix() + protocolID := protocol.ID(tc.topic + protocolSuffix) + peer2.LocalMetadata = tc.metadataPeer2 - if util.WaitTimeout(&wg, 1*time.Second) { - t.Fatal("Did not receive stream within 1 sec") - } + wg.Add(1) + peer2.BHost.SetStreamHandler(protocolID, func(stream network.Stream) { + defer wg.Done() + err := servicePeer2.metaDataHandler(ctx, new(interface{}), stream) + require.NoError(t, err) + }) - conns := p1.BHost.Network().ConnsToPeer(p2.BHost.ID()) - if len(conns) == 0 { - t.Error("Peer is disconnected despite receiving a valid ping") + // Send a METADATA request from peer1 to peer2. + actual, err := servicePeer1.sendMetaDataRequest(ctx, peer2.BHost.ID()) + require.NoError(t, err) + + // Wait until the METADATA request is received by peer2 or timeout. + timeOutReached := util.WaitTimeout(&wg, requestTimeout) + require.Equal(t, false, timeOutReached, "Did not receive METADATA request within timeout") + + // Compare the received METADATA object with the expected METADATA object. + require.DeepSSZEqual(t, tc.expected.InnerObject(), actual.InnerObject(), "Metadata unequal") + + // Ensure the peers are still connected. + peersCount = len(peer1.BHost.Network().Peers()) + assert.Equal(t, 1, peersCount, "Expected peers to be connected") + }) } } diff --git a/changelog/manu-peerdas-metadata.md b/changelog/manu-peerdas-metadata.md new file mode 100644 index 0000000000..937f84288b --- /dev/null +++ b/changelog/manu-peerdas-metadata.md @@ -0,0 +1,2 @@ +### Added +- PeerDAS: Implement the new Fulu Metadata.