mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 07:28:06 -05:00
Implement the new Fulu Metadata. (#15440)
This commit is contained in:
@@ -55,9 +55,10 @@ func newRateLimiter(p2pProvider p2p.P2P) *limiter {
|
||||
topicMap := make(map[string]*leakybucket.Collector, len(p2p.RPCTopicMappings))
|
||||
// Goodbye Message
|
||||
topicMap[addEncoding(p2p.RPCGoodByeTopicV1)] = leakybucket.NewCollector(1, 1, leakyBucketPeriod, false /* deleteEmptyBuckets */)
|
||||
// MetadataV0 Message
|
||||
// Metadata Message
|
||||
topicMap[addEncoding(p2p.RPCMetaDataTopicV1)] = leakybucket.NewCollector(1, defaultBurstLimit, leakyBucketPeriod, false /* deleteEmptyBuckets */)
|
||||
topicMap[addEncoding(p2p.RPCMetaDataTopicV2)] = leakybucket.NewCollector(1, defaultBurstLimit, leakyBucketPeriod, false /* deleteEmptyBuckets */)
|
||||
topicMap[addEncoding(p2p.RPCMetaDataTopicV3)] = leakybucket.NewCollector(1, defaultBurstLimit, leakyBucketPeriod, false /* deleteEmptyBuckets */)
|
||||
// Ping Message
|
||||
topicMap[addEncoding(p2p.RPCPingTopicV1)] = leakybucket.NewCollector(1, defaultBurstLimit, leakyBucketPeriod, false /* deleteEmptyBuckets */)
|
||||
// Status Message
|
||||
|
||||
@@ -17,7 +17,7 @@ import (
|
||||
|
||||
func TestNewRateLimiter(t *testing.T) {
|
||||
rlimiter := newRateLimiter(mockp2p.NewTestP2P(t))
|
||||
assert.Equal(t, len(rlimiter.limiterMap), 18, "correct number of topics not registered")
|
||||
assert.Equal(t, len(rlimiter.limiterMap), 19, "correct number of topics not registered")
|
||||
}
|
||||
|
||||
func TestNewRateLimiter_FreeCorrectly(t *testing.T) {
|
||||
|
||||
@@ -17,7 +17,7 @@ import (
|
||||
"github.com/prysmaticlabs/go-bitfield"
|
||||
)
|
||||
|
||||
// metaDataHandler reads the incoming metadata rpc request from the peer.
|
||||
// metaDataHandler reads the incoming metadata RPC request from the peer.
|
||||
func (s *Service) metaDataHandler(_ context.Context, _ interface{}, stream libp2pcore.Stream) error {
|
||||
SetRPCStreamDeadlines(stream)
|
||||
|
||||
@@ -70,7 +70,9 @@ func (s *Service) metaDataHandler(_ context.Context, _ interface{}, stream libp2
|
||||
switch streamVersion {
|
||||
case p2p.SchemaVersionV1:
|
||||
switch metadataVersion {
|
||||
case version.Altair, version.Deneb:
|
||||
case version.Altair, version.Fulu:
|
||||
// If the stream version corresponds to Phase 0 but our metadata
|
||||
// corresponds to Altair or Fulu, convert our metadata to the Phase 0 one.
|
||||
metadata = wrapper.WrappedMetadataV0(
|
||||
&pb.MetaDataV0{
|
||||
Attnets: metadata.AttnetsBitfield(),
|
||||
@@ -81,13 +83,18 @@ func (s *Service) metaDataHandler(_ context.Context, _ interface{}, stream libp2
|
||||
case p2p.SchemaVersionV2:
|
||||
switch metadataVersion {
|
||||
case version.Phase0:
|
||||
// If the stream version corresponds to Altair but our metadata
|
||||
// corresponds to Phase 0, convert our metadata to the Altair one,
|
||||
// and use a zeroed syncnets bitfield.
|
||||
metadata = wrapper.WrappedMetadataV1(
|
||||
&pb.MetaDataV1{
|
||||
Attnets: metadata.AttnetsBitfield(),
|
||||
SeqNumber: metadata.SequenceNumber(),
|
||||
Syncnets: bitfield.Bitvector4{byte(0x00)},
|
||||
})
|
||||
case version.Deneb:
|
||||
case version.Fulu:
|
||||
// If the stream version corresponds to Altair but our metadata
|
||||
// corresponds to Fulu, convert our metadata to the Altair one.
|
||||
metadata = wrapper.WrappedMetadataV1(
|
||||
&pb.MetaDataV1{
|
||||
Attnets: metadata.AttnetsBitfield(),
|
||||
@@ -95,6 +102,32 @@ func (s *Service) metaDataHandler(_ context.Context, _ interface{}, stream libp2
|
||||
Syncnets: metadata.SyncnetsBitfield(),
|
||||
})
|
||||
}
|
||||
|
||||
case p2p.SchemaVersionV3:
|
||||
switch metadataVersion {
|
||||
case version.Phase0:
|
||||
// If the stream version corresponds to Fulu but our metadata
|
||||
// corresponds to Phase 0, convert our metadata to the Fulu one,
|
||||
// and use a zeroed syncnets bitfield and custody group count.
|
||||
metadata = wrapper.WrappedMetadataV2(
|
||||
&pb.MetaDataV2{
|
||||
Attnets: metadata.AttnetsBitfield(),
|
||||
SeqNumber: metadata.SequenceNumber(),
|
||||
Syncnets: bitfield.Bitvector4{byte(0x00)},
|
||||
CustodyGroupCount: 0,
|
||||
})
|
||||
case version.Altair:
|
||||
// If the stream version corresponds to Fulu but our metadata
|
||||
// corresponds to Altair, convert our metadata to the Fulu one and
|
||||
// use a zeroed custody group count.
|
||||
metadata = wrapper.WrappedMetadataV2(
|
||||
&pb.MetaDataV2{
|
||||
Attnets: metadata.AttnetsBitfield(),
|
||||
SeqNumber: metadata.SequenceNumber(),
|
||||
Syncnets: metadata.SyncnetsBitfield(),
|
||||
CustodyGroupCount: 0,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Write the METADATA response into the stream.
|
||||
@@ -164,12 +197,14 @@ func (s *Service) sendMetaDataRequest(ctx context.Context, peerID peer.ID) (meta
|
||||
}
|
||||
|
||||
// Defensive check to ensure valid objects are being sent.
|
||||
topicVersion := ""
|
||||
var topicVersion string
|
||||
switch msg.Version() {
|
||||
case version.Phase0:
|
||||
topicVersion = p2p.SchemaVersionV1
|
||||
case version.Altair:
|
||||
topicVersion = p2p.SchemaVersionV2
|
||||
case version.Fulu:
|
||||
topicVersion = p2p.SchemaVersionV3
|
||||
}
|
||||
|
||||
// Validate the version of the topic.
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package sync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -15,6 +16,7 @@ import (
|
||||
leakybucket "github.com/OffchainLabs/prysm/v6/container/leaky-bucket"
|
||||
"github.com/OffchainLabs/prysm/v6/encoding/ssz/equality"
|
||||
pb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
|
||||
"github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1/metadata"
|
||||
"github.com/OffchainLabs/prysm/v6/testing/assert"
|
||||
"github.com/OffchainLabs/prysm/v6/testing/require"
|
||||
"github.com/OffchainLabs/prysm/v6/testing/util"
|
||||
@@ -22,6 +24,7 @@ import (
|
||||
"github.com/libp2p/go-libp2p/core/network"
|
||||
"github.com/libp2p/go-libp2p/core/protocol"
|
||||
libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic"
|
||||
"github.com/prysmaticlabs/go-bitfield"
|
||||
)
|
||||
|
||||
func TestMetaDataRPCHandler_ReceivesMetadata(t *testing.T) {
|
||||
@@ -76,158 +79,241 @@ func TestMetaDataRPCHandler_ReceivesMetadata(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestMetadataRPCHandler_SendsMetadata(t *testing.T) {
|
||||
p1 := p2ptest.NewTestP2P(t)
|
||||
p2 := p2ptest.NewTestP2P(t)
|
||||
p1.Connect(p2)
|
||||
assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected")
|
||||
bitfield := [8]byte{'A', 'B'}
|
||||
p2.LocalMetadata = wrapper.WrappedMetadataV0(&pb.MetaDataV0{
|
||||
SeqNumber: 2,
|
||||
Attnets: bitfield[:],
|
||||
})
|
||||
|
||||
// Set up a head state in the database with data we expect.
|
||||
chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
|
||||
d := db.SetupDB(t)
|
||||
r := &Service{
|
||||
func createService(peer p2p.P2P, chain *mock.ChainService) *Service {
|
||||
return &Service{
|
||||
cfg: &config{
|
||||
beaconDB: d,
|
||||
p2p: p1,
|
||||
chain: chain,
|
||||
clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot),
|
||||
p2p: peer,
|
||||
chain: chain,
|
||||
clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot),
|
||||
},
|
||||
rateLimiter: newRateLimiter(p1),
|
||||
}
|
||||
|
||||
r2 := &Service{
|
||||
cfg: &config{
|
||||
beaconDB: d,
|
||||
p2p: p2,
|
||||
chain: &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}},
|
||||
},
|
||||
rateLimiter: newRateLimiter(p2),
|
||||
}
|
||||
|
||||
// Setup streams
|
||||
pcl := protocol.ID(p2p.RPCMetaDataTopicV1 + r.cfg.p2p.Encoding().ProtocolSuffix())
|
||||
topic := string(pcl)
|
||||
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false)
|
||||
r2.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) {
|
||||
defer wg.Done()
|
||||
assert.NoError(t, r2.metaDataHandler(t.Context(), new(interface{}), stream))
|
||||
})
|
||||
|
||||
md, err := r.sendMetaDataRequest(t.Context(), p2.BHost.ID())
|
||||
assert.NoError(t, err)
|
||||
|
||||
if !equality.DeepEqual(md.InnerObject(), p2.LocalMetadata.InnerObject()) {
|
||||
t.Fatalf("MetadataV0 unequal, received %v but wanted %v", md, p2.LocalMetadata)
|
||||
}
|
||||
|
||||
if util.WaitTimeout(&wg, 1*time.Second) {
|
||||
t.Fatal("Did not receive stream within 1 sec")
|
||||
}
|
||||
|
||||
conns := p1.BHost.Network().ConnsToPeer(p2.BHost.ID())
|
||||
if len(conns) == 0 {
|
||||
t.Error("Peer is disconnected despite receiving a valid ping")
|
||||
rateLimiter: newRateLimiter(peer),
|
||||
}
|
||||
}
|
||||
|
||||
func TestMetadataRPCHandler_SendsMetadataAltair(t *testing.T) {
|
||||
func TestMetadataRPCHandler_SendMetadataRequest(t *testing.T) {
|
||||
const (
|
||||
requestTimeout = 1 * time.Second
|
||||
seqNumber = 2
|
||||
custodyGroupCount = 4
|
||||
)
|
||||
|
||||
attnets := []byte{'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H'}
|
||||
syncnets := []byte{0x4}
|
||||
|
||||
// Configure the test beacon chain.
|
||||
params.SetupTestConfigCleanup(t)
|
||||
bCfg := params.BeaconConfig().Copy()
|
||||
bCfg.AltairForkEpoch = 5
|
||||
params.OverrideBeaconConfig(bCfg)
|
||||
beaconChainConfig := params.BeaconConfig().Copy()
|
||||
beaconChainConfig.AltairForkEpoch = 5
|
||||
beaconChainConfig.FuluForkEpoch = 15
|
||||
params.OverrideBeaconConfig(beaconChainConfig)
|
||||
params.BeaconConfig().InitializeForkSchedule()
|
||||
|
||||
p1 := p2ptest.NewTestP2P(t)
|
||||
p2 := p2ptest.NewTestP2P(t)
|
||||
p1.Connect(p2)
|
||||
assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected")
|
||||
bitfield := [8]byte{'A', 'B'}
|
||||
p2.LocalMetadata = wrapper.WrappedMetadataV0(&pb.MetaDataV0{
|
||||
SeqNumber: 2,
|
||||
Attnets: bitfield[:],
|
||||
})
|
||||
// Compute the number of seconds in an epoch.
|
||||
secondsPerEpoch := oneEpoch()
|
||||
|
||||
// Set up a head state in the database with data we expect.
|
||||
d := db.SetupDB(t)
|
||||
chain := &mock.ChainService{Genesis: time.Now().Add(-5 * oneEpoch()), ValidatorsRoot: [32]byte{}}
|
||||
r := &Service{
|
||||
cfg: &config{
|
||||
beaconDB: d,
|
||||
p2p: p1,
|
||||
chain: chain,
|
||||
clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot),
|
||||
testCases := []struct {
|
||||
name string
|
||||
topic string
|
||||
epochsSinceGenesisPeer1, epochsSinceGenesisPeer2 int
|
||||
metadataPeer2, expected metadata.Metadata
|
||||
}{
|
||||
{
|
||||
name: "Phase0-Phase0",
|
||||
topic: p2p.RPCMetaDataTopicV1,
|
||||
epochsSinceGenesisPeer1: 0,
|
||||
epochsSinceGenesisPeer2: 0,
|
||||
metadataPeer2: wrapper.WrappedMetadataV0(&pb.MetaDataV0{
|
||||
SeqNumber: seqNumber,
|
||||
Attnets: attnets,
|
||||
}),
|
||||
expected: wrapper.WrappedMetadataV0(&pb.MetaDataV0{
|
||||
SeqNumber: seqNumber,
|
||||
Attnets: attnets,
|
||||
}),
|
||||
},
|
||||
rateLimiter: newRateLimiter(p1),
|
||||
}
|
||||
|
||||
chain2 := &mock.ChainService{Genesis: time.Now().Add(-5 * oneEpoch()), ValidatorsRoot: [32]byte{}}
|
||||
r2 := &Service{
|
||||
cfg: &config{
|
||||
beaconDB: d,
|
||||
p2p: p2,
|
||||
chain: chain2,
|
||||
clock: startup.NewClock(chain2.Genesis, chain2.ValidatorsRoot),
|
||||
{
|
||||
name: "Phase0-Altair",
|
||||
topic: p2p.RPCMetaDataTopicV1,
|
||||
epochsSinceGenesisPeer1: 0,
|
||||
epochsSinceGenesisPeer2: 5,
|
||||
metadataPeer2: wrapper.WrappedMetadataV1(&pb.MetaDataV1{
|
||||
SeqNumber: seqNumber,
|
||||
Attnets: attnets,
|
||||
Syncnets: syncnets,
|
||||
}),
|
||||
expected: wrapper.WrappedMetadataV0(&pb.MetaDataV0{
|
||||
SeqNumber: seqNumber,
|
||||
Attnets: attnets,
|
||||
}),
|
||||
},
|
||||
{
|
||||
name: "Phase0-Fulu",
|
||||
topic: p2p.RPCMetaDataTopicV1,
|
||||
epochsSinceGenesisPeer1: 0,
|
||||
epochsSinceGenesisPeer2: 15,
|
||||
metadataPeer2: wrapper.WrappedMetadataV2(&pb.MetaDataV2{
|
||||
SeqNumber: seqNumber,
|
||||
Attnets: attnets,
|
||||
Syncnets: syncnets,
|
||||
CustodyGroupCount: custodyGroupCount,
|
||||
}),
|
||||
expected: wrapper.WrappedMetadataV0(&pb.MetaDataV0{
|
||||
SeqNumber: seqNumber,
|
||||
Attnets: attnets,
|
||||
}),
|
||||
},
|
||||
{
|
||||
name: "Altair-Phase0",
|
||||
topic: p2p.RPCMetaDataTopicV2,
|
||||
epochsSinceGenesisPeer1: 5,
|
||||
epochsSinceGenesisPeer2: 0,
|
||||
metadataPeer2: wrapper.WrappedMetadataV0(&pb.MetaDataV0{
|
||||
SeqNumber: seqNumber,
|
||||
Attnets: attnets,
|
||||
}),
|
||||
expected: wrapper.WrappedMetadataV1(&pb.MetaDataV1{
|
||||
SeqNumber: seqNumber,
|
||||
Attnets: attnets,
|
||||
Syncnets: bitfield.Bitvector4{byte(0x00)},
|
||||
}),
|
||||
},
|
||||
{
|
||||
name: "Altair-Altair",
|
||||
topic: p2p.RPCMetaDataTopicV2,
|
||||
epochsSinceGenesisPeer1: 5,
|
||||
epochsSinceGenesisPeer2: 5,
|
||||
metadataPeer2: wrapper.WrappedMetadataV1(&pb.MetaDataV1{
|
||||
SeqNumber: seqNumber,
|
||||
Attnets: attnets,
|
||||
Syncnets: syncnets,
|
||||
}),
|
||||
expected: wrapper.WrappedMetadataV1(&pb.MetaDataV1{
|
||||
SeqNumber: seqNumber,
|
||||
Attnets: attnets,
|
||||
Syncnets: syncnets,
|
||||
}),
|
||||
},
|
||||
{
|
||||
name: "Altair-Fulu",
|
||||
topic: p2p.RPCMetaDataTopicV2,
|
||||
epochsSinceGenesisPeer1: 5,
|
||||
epochsSinceGenesisPeer2: 15,
|
||||
metadataPeer2: wrapper.WrappedMetadataV2(&pb.MetaDataV2{
|
||||
SeqNumber: seqNumber,
|
||||
Attnets: attnets,
|
||||
Syncnets: syncnets,
|
||||
CustodyGroupCount: custodyGroupCount,
|
||||
}),
|
||||
expected: wrapper.WrappedMetadataV1(&pb.MetaDataV1{
|
||||
SeqNumber: seqNumber,
|
||||
Attnets: attnets,
|
||||
Syncnets: syncnets,
|
||||
}),
|
||||
},
|
||||
{
|
||||
name: "Fulu-Phase0",
|
||||
topic: p2p.RPCMetaDataTopicV3,
|
||||
epochsSinceGenesisPeer1: 15,
|
||||
epochsSinceGenesisPeer2: 0,
|
||||
metadataPeer2: wrapper.WrappedMetadataV0(&pb.MetaDataV0{
|
||||
SeqNumber: seqNumber,
|
||||
Attnets: attnets,
|
||||
}),
|
||||
expected: wrapper.WrappedMetadataV2(&pb.MetaDataV2{
|
||||
SeqNumber: seqNumber,
|
||||
Attnets: attnets,
|
||||
Syncnets: bitfield.Bitvector4{byte(0x00)},
|
||||
CustodyGroupCount: 0,
|
||||
}),
|
||||
},
|
||||
{
|
||||
name: "Fulu-Altair",
|
||||
topic: p2p.RPCMetaDataTopicV3,
|
||||
epochsSinceGenesisPeer1: 15,
|
||||
epochsSinceGenesisPeer2: 5,
|
||||
metadataPeer2: wrapper.WrappedMetadataV1(&pb.MetaDataV1{
|
||||
SeqNumber: seqNumber,
|
||||
Attnets: attnets,
|
||||
Syncnets: syncnets,
|
||||
}),
|
||||
expected: wrapper.WrappedMetadataV2(&pb.MetaDataV2{
|
||||
SeqNumber: seqNumber,
|
||||
Attnets: attnets,
|
||||
Syncnets: syncnets,
|
||||
CustodyGroupCount: 0,
|
||||
}),
|
||||
},
|
||||
{
|
||||
name: "Fulu-Fulu",
|
||||
topic: p2p.RPCMetaDataTopicV3,
|
||||
epochsSinceGenesisPeer1: 15,
|
||||
epochsSinceGenesisPeer2: 15,
|
||||
metadataPeer2: wrapper.WrappedMetadataV2(&pb.MetaDataV2{
|
||||
SeqNumber: seqNumber,
|
||||
Attnets: attnets,
|
||||
Syncnets: syncnets,
|
||||
CustodyGroupCount: custodyGroupCount,
|
||||
}),
|
||||
expected: wrapper.WrappedMetadataV2(&pb.MetaDataV2{
|
||||
SeqNumber: seqNumber,
|
||||
Attnets: attnets,
|
||||
Syncnets: syncnets,
|
||||
CustodyGroupCount: custodyGroupCount,
|
||||
}),
|
||||
},
|
||||
rateLimiter: newRateLimiter(p2),
|
||||
}
|
||||
|
||||
// Setup streams
|
||||
pcl := protocol.ID(p2p.RPCMetaDataTopicV2 + r.cfg.p2p.Encoding().ProtocolSuffix())
|
||||
topic := string(pcl)
|
||||
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(2, 2, time.Second, false)
|
||||
r2.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(2, 2, time.Second, false)
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) {
|
||||
defer wg.Done()
|
||||
err := r2.metaDataHandler(t.Context(), new(interface{}), stream)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
ctx := context.Background()
|
||||
|
||||
_, err := r.sendMetaDataRequest(t.Context(), p2.BHost.ID())
|
||||
assert.NoError(t, err)
|
||||
// Setup and connect peers.
|
||||
peer1, peer2 := p2ptest.NewTestP2P(t), p2ptest.NewTestP2P(t)
|
||||
peer1.Connect(peer2)
|
||||
|
||||
if util.WaitTimeout(&wg, 1*time.Second) {
|
||||
t.Fatal("Did not receive stream within 1 sec")
|
||||
}
|
||||
// Ensure the peers are connected.
|
||||
peersCount := len(peer1.BHost.Network().Peers())
|
||||
require.Equal(t, 1, peersCount, "Expected peers to be connected")
|
||||
|
||||
// Fix up peer with the correct metadata.
|
||||
p2.LocalMetadata = wrapper.WrappedMetadataV1(&pb.MetaDataV1{
|
||||
SeqNumber: 2,
|
||||
Attnets: bitfield[:],
|
||||
Syncnets: []byte{0x0},
|
||||
})
|
||||
// Setup sync services.
|
||||
genesisPeer1 := time.Now().Add(-time.Duration(tc.epochsSinceGenesisPeer1) * secondsPerEpoch)
|
||||
genesisPeer2 := time.Now().Add(-time.Duration(tc.epochsSinceGenesisPeer2) * secondsPerEpoch)
|
||||
|
||||
wg.Add(1)
|
||||
p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) {
|
||||
defer wg.Done()
|
||||
assert.NoError(t, r2.metaDataHandler(t.Context(), new(interface{}), stream))
|
||||
})
|
||||
chainPeer1 := &mock.ChainService{Genesis: genesisPeer1, ValidatorsRoot: [32]byte{}}
|
||||
chainPeer2 := &mock.ChainService{Genesis: genesisPeer2, ValidatorsRoot: [32]byte{}}
|
||||
|
||||
md, err := r.sendMetaDataRequest(t.Context(), p2.BHost.ID())
|
||||
assert.NoError(t, err)
|
||||
servicePeer1 := createService(peer1, chainPeer1)
|
||||
servicePeer2 := createService(peer2, chainPeer2)
|
||||
|
||||
if !equality.DeepEqual(md.InnerObject(), p2.LocalMetadata.InnerObject()) {
|
||||
t.Fatalf("MetadataV1 unequal, received %v but wanted %v", md, p2.LocalMetadata)
|
||||
}
|
||||
// Define the behavior of peer2 when receiving a METADATA request.
|
||||
protocolSuffix := servicePeer2.cfg.p2p.Encoding().ProtocolSuffix()
|
||||
protocolID := protocol.ID(tc.topic + protocolSuffix)
|
||||
peer2.LocalMetadata = tc.metadataPeer2
|
||||
|
||||
if util.WaitTimeout(&wg, 1*time.Second) {
|
||||
t.Fatal("Did not receive stream within 1 sec")
|
||||
}
|
||||
wg.Add(1)
|
||||
peer2.BHost.SetStreamHandler(protocolID, func(stream network.Stream) {
|
||||
defer wg.Done()
|
||||
err := servicePeer2.metaDataHandler(ctx, new(interface{}), stream)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
conns := p1.BHost.Network().ConnsToPeer(p2.BHost.ID())
|
||||
if len(conns) == 0 {
|
||||
t.Error("Peer is disconnected despite receiving a valid ping")
|
||||
// Send a METADATA request from peer1 to peer2.
|
||||
actual, err := servicePeer1.sendMetaDataRequest(ctx, peer2.BHost.ID())
|
||||
require.NoError(t, err)
|
||||
|
||||
// Wait until the METADATA request is received by peer2 or timeout.
|
||||
timeOutReached := util.WaitTimeout(&wg, requestTimeout)
|
||||
require.Equal(t, false, timeOutReached, "Did not receive METADATA request within timeout")
|
||||
|
||||
// Compare the received METADATA object with the expected METADATA object.
|
||||
require.DeepSSZEqual(t, tc.expected.InnerObject(), actual.InnerObject(), "Metadata unequal")
|
||||
|
||||
// Ensure the peers are still connected.
|
||||
peersCount = len(peer1.BHost.Network().Peers())
|
||||
assert.Equal(t, 1, peersCount, "Expected peers to be connected")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
2
changelog/manu-peerdas-metadata.md
Normal file
2
changelog/manu-peerdas-metadata.md
Normal file
@@ -0,0 +1,2 @@
|
||||
### Added
|
||||
- PeerDAS: Implement the new Fulu Metadata.
|
||||
Reference in New Issue
Block a user