package p2p

import (
    "context"
    "fmt"
    "net"
    "reflect"
    "sync"
    "testing"
    "time"

    "github.com/OffchainLabs/go-bitfield"
    "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/kzg"
    "github.com/OffchainLabs/prysm/v6/beacon-chain/core/helpers"
    "github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
    testDB "github.com/OffchainLabs/prysm/v6/beacon-chain/db/testing"
    "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
    "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers/scorers"
    p2ptest "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
    "github.com/OffchainLabs/prysm/v6/cmd/beacon-chain/flags"
    fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
    "github.com/OffchainLabs/prysm/v6/config/params"
    "github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
    "github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
    "github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
    "github.com/OffchainLabs/prysm/v6/consensus-types/wrapper"
    "github.com/OffchainLabs/prysm/v6/encoding/bytesutil"
    ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
    testpb "github.com/OffchainLabs/prysm/v6/proto/testing"
    "github.com/OffchainLabs/prysm/v6/testing/assert"
    "github.com/OffchainLabs/prysm/v6/testing/require"
    "github.com/OffchainLabs/prysm/v6/testing/util"
    "github.com/OffchainLabs/prysm/v6/time/slots"
    pubsub "github.com/libp2p/go-libp2p-pubsub"
    "github.com/libp2p/go-libp2p/core/host"
    "google.golang.org/protobuf/proto"
)

func TestService_Broadcast(t *testing.T) {
    p1 := p2ptest.NewTestP2P(t)
    p2 := p2ptest.NewTestP2P(t)
    p1.Connect(p2)
    if len(p1.BHost.Network().Peers()) == 0 {
        t.Fatal("No peers")
    }

    p := &Service{
        host:                  p1.BHost,
        pubsub:                p1.PubSub(),
        joinedTopics:          map[string]*pubsub.Topic{},
        cfg:                   &Config{},
        genesisTime:           time.Now(),
        genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32),
    }

    msg := &ethpb.Fork{
        Epoch:           55,
        CurrentVersion:  []byte("fooo"),
        PreviousVersion: []byte("barr"),
    }

    topic := "/eth2/%x/testing"
    // Set a test gossip mapping for the Fork message.
    GossipTypeMapping[reflect.TypeOf(msg)] = topic
    digest, err := p.currentForkDigest()
    require.NoError(t, err)
    topic = fmt.Sprintf(topic, digest)

    // External peer subscribes to the topic.
    topic += p.Encoding().ProtocolSuffix()
    sub, err := p2.SubscribeToTopic(topic)
    require.NoError(t, err)

    time.Sleep(50 * time.Millisecond) // libp2p fails without this delay...

    // Async listen for the pubsub, must be before the broadcast.
    var wg sync.WaitGroup
    wg.Add(1)
    go func(tt *testing.T) {
        defer wg.Done()
        ctx, cancel := context.WithTimeout(t.Context(), 1*time.Second)
        defer cancel()

        incomingMessage, err := sub.Next(ctx)
        require.NoError(t, err)

        result := &ethpb.Fork{}
        require.NoError(t, p.Encoding().DecodeGossip(incomingMessage.Data, result))
        if !proto.Equal(result, msg) {
            tt.Errorf("Did not receive expected message, got %+v, wanted %+v", result, msg)
        }
    }(t)

    // Broadcast to peers and wait.
    require.NoError(t, p.Broadcast(t.Context(), msg))
    if util.WaitTimeout(&wg, 1*time.Second) {
        t.Error("Failed to receive pubsub within 1s")
    }
}

func TestService_Broadcast_ReturnsErr_TopicNotMapped(t *testing.T) {
    p := Service{
        genesisTime:           time.Now(),
        genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32),
    }
    assert.ErrorContains(t, ErrMessageNotMapped.Error(), p.Broadcast(t.Context(), &testpb.AddressBook{}))
}

func TestService_Attestation_Subnet(t *testing.T) {
    if gtm := GossipTypeMapping[reflect.TypeOf(&ethpb.Attestation{})]; gtm != AttestationSubnetTopicFormat {
        t.Errorf("Constant is out of date. Wanted %s, got %s", AttestationSubnetTopicFormat, gtm)
    }

    tests := []struct {
        att   *ethpb.Attestation
        topic string
    }{
        {
            att: &ethpb.Attestation{
                Data: &ethpb.AttestationData{
                    CommitteeIndex: 0,
                    Slot:           2,
                },
            },
            topic: "/eth2/00000000/beacon_attestation_2",
        },
        {
            att: &ethpb.Attestation{
                Data: &ethpb.AttestationData{
                    CommitteeIndex: 11,
                    Slot:           10,
                },
            },
            topic: "/eth2/00000000/beacon_attestation_21",
        },
        {
            att: &ethpb.Attestation{
                Data: &ethpb.AttestationData{
                    CommitteeIndex: 55,
                    Slot:           529,
                },
            },
            topic: "/eth2/00000000/beacon_attestation_8",
        },
    }
    for _, tt := range tests {
        subnet := helpers.ComputeSubnetFromCommitteeAndSlot(100, tt.att.Data.CommitteeIndex, tt.att.Data.Slot)
        assert.Equal(t, tt.topic, attestationToTopic(subnet, [4]byte{} /* fork digest */), "Wrong topic")
    }
}

func TestService_BroadcastAttestation(t *testing.T) {
    p1 := p2ptest.NewTestP2P(t)
    p2 := p2ptest.NewTestP2P(t)
    p1.Connect(p2)
    if len(p1.BHost.Network().Peers()) == 0 {
        t.Fatal("No peers")
    }

    p := &Service{
        host:                  p1.BHost,
        pubsub:                p1.PubSub(),
        joinedTopics:          map[string]*pubsub.Topic{},
        cfg:                   &Config{},
        genesisTime:           time.Now(),
        genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32),
        subnetsLock:           make(map[uint64]*sync.RWMutex),
        subnetsLockLock:       sync.Mutex{},
        peers: peers.NewStatus(t.Context(), &peers.StatusConfig{
            ScorerParams: &scorers.Config{},
        }),
    }

    msg := util.HydrateAttestation(&ethpb.Attestation{AggregationBits: bitfield.NewBitlist(7)})
    subnet := uint64(5)

    topic := AttestationSubnetTopicFormat
    GossipTypeMapping[reflect.TypeOf(msg)] = topic
    digest, err := p.currentForkDigest()
    require.NoError(t, err)
    topic = fmt.Sprintf(topic, digest, subnet)

    // External peer subscribes to the topic.
    topic += p.Encoding().ProtocolSuffix()
    sub, err := p2.SubscribeToTopic(topic)
    require.NoError(t, err)

    time.Sleep(50 * time.Millisecond) // libp2p fails without this delay...

    // Async listen for the pubsub, must be before the broadcast.
    var wg sync.WaitGroup
    wg.Add(1)
    go func(tt *testing.T) {
        defer wg.Done()
        ctx, cancel := context.WithTimeout(t.Context(), 1*time.Second)
        defer cancel()

        incomingMessage, err := sub.Next(ctx)
        require.NoError(t, err)

        result := &ethpb.Attestation{}
        require.NoError(t, p.Encoding().DecodeGossip(incomingMessage.Data, result))
        if !proto.Equal(result, msg) {
            tt.Errorf("Did not receive expected message, got %+v, wanted %+v", result, msg)
        }
    }(t)

    // Attempting to broadcast a nil object should fail.
    ctx := t.Context()
    require.ErrorContains(t, "attempted to broadcast nil", p.BroadcastAttestation(ctx, subnet, nil))

    // Broadcast to peers and wait.
    require.NoError(t, p.BroadcastAttestation(ctx, subnet, msg))
    if util.WaitTimeout(&wg, 1*time.Second) {
        t.Error("Failed to receive pubsub within 1s")
    }
}

func TestService_BroadcastAttestationWithDiscoveryAttempts(t *testing.T) {
    const port = uint(2000)

    // The DB has to be shared across all peers to avoid duplicate
    // metrics collector registration; beyond that, it does not matter
    // for this test.
    db := testDB.SetupDB(t)
    // Setup bootnode.
    cfg := &Config{PingInterval: testPingInterval, DB: db}
    cfg.UDPPort = uint(port)
    _, pkey := createAddrAndPrivKey(t)
    ipAddr := net.ParseIP("127.0.0.1")
    genesisTime := time.Now()
    genesisValidatorsRoot := make([]byte, 32)
    s := &Service{
        cfg:                   cfg,
        genesisTime:           genesisTime,
        genesisValidatorsRoot: genesisValidatorsRoot,
        custodyInfo:           &custodyInfo{},
        ctx:                   t.Context(),
        custodyInfoSet:        make(chan struct{}),
    }
    close(s.custodyInfoSet)

    bootListener, err := s.createListener(ipAddr, pkey)
    require.NoError(t, err)
    defer bootListener.Close()

    bootNode := bootListener.Self()
    subnet := uint64(5)

    var listeners []*listenerWrapper
    var hosts []host.Host

    // Setup other nodes.
    cfg = &Config{
        Discv5BootStrapAddrs: []string{bootNode.String()},
        MaxPeers:             2,
        PingInterval:         testPingInterval,
        DB:                   db,
    }

    // Setup 2 different hosts.
    for i := uint(1); i <= 2; i++ {
        h, pkey, ipAddr := createHost(t, port+i)
        cfg.UDPPort = uint(port + i)
        cfg.TCPPort = uint(port + i)
        if len(listeners) > 0 {
            cfg.Discv5BootStrapAddrs = append(cfg.Discv5BootStrapAddrs, listeners[len(listeners)-1].Self().String())
        }
        s := &Service{
            cfg:                   cfg,
            genesisTime:           genesisTime,
            genesisValidatorsRoot: genesisValidatorsRoot,
            custodyInfo:           &custodyInfo{},
            ctx:                   t.Context(),
            custodyInfoSet:        make(chan struct{}),
        }
        close(s.custodyInfoSet)

        listener, err := s.startDiscoveryV5(ipAddr, pkey)

        // Set the subnet in the record of the 2nd peer.
        if i == 2 {
            s.dv5Listener = listener
            s.metaData = wrapper.WrappedMetadataV0(new(ethpb.MetaDataV0))
            bitV := bitfield.NewBitvector64()
            bitV.SetBitAt(subnet, true)
            err := s.updateSubnetRecordWithMetadata(bitV)
            require.NoError(t, err)
        }
        assert.NoError(t, err, "Could not start discovery for node")

        listeners = append(listeners, listener)
        hosts = append(hosts, h)
    }

    // Close down all discovery listeners upon exit of the test.
    defer func() {
        for _, listener := range listeners {
            listener.Close()
        }
    }()
    // Close all hosts upon exit of the test.
    defer func() {
        for _, h := range hosts {
            if err := h.Close(); err != nil {
                t.Log(err)
            }
        }
    }()

    ps1, err := pubsub.NewGossipSub(t.Context(), hosts[0],
        pubsub.WithMessageSigning(false),
        pubsub.WithStrictSignatureVerification(false),
    )
    require.NoError(t, err)

    ps2, err := pubsub.NewGossipSub(t.Context(), hosts[1],
        pubsub.WithMessageSigning(false),
        pubsub.WithStrictSignatureVerification(false),
    )
    require.NoError(t, err)

    p := &Service{
        host:                  hosts[0],
        ctx:                   t.Context(),
        pubsub:                ps1,
        dv5Listener:           listeners[0],
        joinedTopics:          map[string]*pubsub.Topic{},
        cfg:                   cfg,
        genesisTime:           time.Now(),
        genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32),
        subnetsLock:           make(map[uint64]*sync.RWMutex),
        subnetsLockLock:       sync.Mutex{},
        peers: peers.NewStatus(t.Context(), &peers.StatusConfig{
            ScorerParams: &scorers.Config{},
        }),
    }

    p2 := &Service{
        host:                  hosts[1],
        ctx:                   t.Context(),
        pubsub:                ps2,
        dv5Listener:           listeners[1],
        joinedTopics:          map[string]*pubsub.Topic{},
        cfg:                   cfg,
        genesisTime:           time.Now(),
        genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32),
        subnetsLock:           make(map[uint64]*sync.RWMutex),
        subnetsLockLock:       sync.Mutex{},
        peers: peers.NewStatus(t.Context(), &peers.StatusConfig{
            ScorerParams: &scorers.Config{},
        }),
    }

    go p.listenForNewNodes()
    go p2.listenForNewNodes()

    msg := util.HydrateAttestation(&ethpb.Attestation{AggregationBits: bitfield.NewBitlist(7)})
    topic := AttestationSubnetTopicFormat
    GossipTypeMapping[reflect.TypeOf(msg)] = topic
    digest, err := p.currentForkDigest()
    require.NoError(t, err)
    topic = fmt.Sprintf(topic, digest, subnet)

    // External peer subscribes to the topic.
    topic += p.Encoding().ProtocolSuffix()

    // We don't use our internal subscribe method here,
    // due to using floodsub in this test.
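    // Joining the topic on both services lets the pubsub mesh form
    // between the two hosts; the ListPeers assertions below verify
    // that each side sees exactly one peer on the topic.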
    tpHandle, err := p2.JoinTopic(topic)
    require.NoError(t, err)
    sub, err := tpHandle.Subscribe()
    require.NoError(t, err)

    tpHandle, err = p.JoinTopic(topic)
    require.NoError(t, err)
    _, err = tpHandle.Subscribe()
    require.NoError(t, err)

    time.Sleep(500 * time.Millisecond) // libp2p fails without this delay...

    nodePeers := p.pubsub.ListPeers(topic)
    nodePeers2 := p2.pubsub.ListPeers(topic)
    assert.Equal(t, 1, len(nodePeers))
    assert.Equal(t, 1, len(nodePeers2))

    // Async listen for the pubsub, must be before the broadcast.
    var wg sync.WaitGroup
    wg.Add(1)
    go func(tt *testing.T) {
        defer wg.Done()
        ctx, cancel := context.WithTimeout(t.Context(), 4*time.Second)
        defer cancel()

        incomingMessage, err := sub.Next(ctx)
        require.NoError(t, err)

        result := &ethpb.Attestation{}
        require.NoError(t, p.Encoding().DecodeGossip(incomingMessage.Data, result))
        if !proto.Equal(result, msg) {
            tt.Errorf("Did not receive expected message, got %+v, wanted %+v", result, msg)
        }
    }(t)

    // Broadcast to peers and wait.
    require.NoError(t, p.BroadcastAttestation(t.Context(), subnet, msg))
    if util.WaitTimeout(&wg, 4*time.Second) {
        t.Error("Failed to receive pubsub within 4s")
    }
}

func TestService_BroadcastSyncCommittee(t *testing.T) {
    p1 := p2ptest.NewTestP2P(t)
    p2 := p2ptest.NewTestP2P(t)
    p1.Connect(p2)
    if len(p1.BHost.Network().Peers()) == 0 {
        t.Fatal("No peers")
    }

    p := &Service{
        host:                  p1.BHost,
        pubsub:                p1.PubSub(),
        joinedTopics:          map[string]*pubsub.Topic{},
        cfg:                   &Config{},
        genesisTime:           time.Now(),
        genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32),
        subnetsLock:           make(map[uint64]*sync.RWMutex),
        subnetsLockLock:       sync.Mutex{},
        peers: peers.NewStatus(t.Context(), &peers.StatusConfig{
            ScorerParams: &scorers.Config{},
        }),
    }

    msg := util.HydrateSyncCommittee(&ethpb.SyncCommitteeMessage{})
    subnet := uint64(5)

    topic := SyncCommitteeSubnetTopicFormat
    GossipTypeMapping[reflect.TypeOf(msg)] = topic
    digest, err := p.currentForkDigest()
    require.NoError(t, err)
    topic = fmt.Sprintf(topic, digest, subnet)

    // External peer subscribes to the topic.
    topic += p.Encoding().ProtocolSuffix()
    sub, err := p2.SubscribeToTopic(topic)
    require.NoError(t, err)

    time.Sleep(50 * time.Millisecond) // libp2p fails without this delay...

    // Async listen for the pubsub, must be before the broadcast.
    var wg sync.WaitGroup
    wg.Add(1)
    go func(tt *testing.T) {
        defer wg.Done()
        ctx, cancel := context.WithTimeout(t.Context(), 1*time.Second)
        defer cancel()

        incomingMessage, err := sub.Next(ctx)
        require.NoError(t, err)

        result := &ethpb.SyncCommitteeMessage{}
        require.NoError(t, p.Encoding().DecodeGossip(incomingMessage.Data, result))
        if !proto.Equal(result, msg) {
            tt.Errorf("Did not receive expected message, got %+v, wanted %+v", result, msg)
        }
    }(t)

    // Broadcasting nil should fail.
    ctx := t.Context()
    require.ErrorContains(t, "attempted to broadcast nil", p.BroadcastSyncCommitteeMessage(ctx, subnet, nil))
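    // The sync committee subnet is encoded into the topic above, so the
    // external peer's subscription should receive exactly this message.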
    // Broadcast to peers and wait.
    require.NoError(t, p.BroadcastSyncCommitteeMessage(ctx, subnet, msg))
    if util.WaitTimeout(&wg, 1*time.Second) {
        t.Error("Failed to receive pubsub within 1s")
    }
}

func TestService_BroadcastBlob(t *testing.T) {
    p1 := p2ptest.NewTestP2P(t)
    p2 := p2ptest.NewTestP2P(t)
    p1.Connect(p2)
    require.NotEqual(t, 0, len(p1.BHost.Network().Peers()), "No peers")

    p := &Service{
        host:                  p1.BHost,
        pubsub:                p1.PubSub(),
        joinedTopics:          map[string]*pubsub.Topic{},
        cfg:                   &Config{},
        genesisTime:           time.Now(),
        genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32),
        subnetsLock:           make(map[uint64]*sync.RWMutex),
        subnetsLockLock:       sync.Mutex{},
        peers: peers.NewStatus(t.Context(), &peers.StatusConfig{
            ScorerParams: &scorers.Config{},
        }),
    }

    header := util.HydrateSignedBeaconHeader(&ethpb.SignedBeaconBlockHeader{})
    commitmentInclusionProof := make([][]byte, 17)
    for i := range commitmentInclusionProof {
        commitmentInclusionProof[i] = bytesutil.PadTo([]byte{}, 32)
    }
    blobSidecar := &ethpb.BlobSidecar{
        Index:                    1,
        Blob:                     bytesutil.PadTo([]byte{'C'}, fieldparams.BlobLength),
        KzgCommitment:            bytesutil.PadTo([]byte{'D'}, fieldparams.BLSPubkeyLength),
        KzgProof:                 bytesutil.PadTo([]byte{'E'}, fieldparams.BLSPubkeyLength),
        SignedBlockHeader:        header,
        CommitmentInclusionProof: commitmentInclusionProof,
    }
    subnet := uint64(0)

    topic := BlobSubnetTopicFormat
    GossipTypeMapping[reflect.TypeOf(blobSidecar)] = topic
    digest, err := p.currentForkDigest()
    require.NoError(t, err)
    topic = fmt.Sprintf(topic, digest, subnet)

    // External peer subscribes to the topic.
    topic += p.Encoding().ProtocolSuffix()
    sub, err := p2.SubscribeToTopic(topic)
    require.NoError(t, err)

    time.Sleep(50 * time.Millisecond) // libp2p fails without this delay...

    // Async listen for the pubsub, must be before the broadcast.
    var wg sync.WaitGroup
    wg.Add(1)
    go func(tt *testing.T) {
        defer wg.Done()
        ctx, cancel := context.WithTimeout(t.Context(), 1*time.Second)
        defer cancel()

        incomingMessage, err := sub.Next(ctx)
        require.NoError(t, err)

        result := &ethpb.BlobSidecar{}
        require.NoError(t, p.Encoding().DecodeGossip(incomingMessage.Data, result))
        require.DeepEqual(t, result, blobSidecar)
    }(t)

    // Attempting to broadcast a nil object should fail.
    ctx := t.Context()
    require.ErrorContains(t, "attempted to broadcast nil", p.BroadcastBlob(ctx, subnet, nil))
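    // BroadcastBlob publishes on the blob subnet topic built from the
    // current fork digest and the explicit subnet argument (0 here).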
    // Broadcast to peers and wait.
    require.NoError(t, p.BroadcastBlob(ctx, subnet, blobSidecar))
    require.Equal(t, false, util.WaitTimeout(&wg, 1*time.Second), "Failed to receive pubsub within 1s")
}

func TestService_BroadcastLightClientOptimisticUpdate(t *testing.T) {
    params.SetupTestConfigCleanup(t)
    config := params.BeaconConfig().Copy()
    config.SyncMessageDueBPS = 60 // ~72 milliseconds
    params.OverrideBeaconConfig(config)

    p1 := p2ptest.NewTestP2P(t)
    p2 := p2ptest.NewTestP2P(t)
    p1.Connect(p2)
    require.NotEqual(t, 0, len(p1.BHost.Network().Peers()))

    p := &Service{
        host:                  p1.BHost,
        pubsub:                p1.PubSub(),
        joinedTopics:          map[string]*pubsub.Topic{},
        cfg:                   &Config{},
        genesisTime:           time.Now().Add(-33 * time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second), // the signature slot of the mock update is 33
        genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32),
        subnetsLock:           make(map[uint64]*sync.RWMutex),
        subnetsLockLock:       sync.Mutex{},
        peers: peers.NewStatus(t.Context(), &peers.StatusConfig{
            ScorerParams: &scorers.Config{},
        }),
    }

    msg, err := util.MockOptimisticUpdate()
    require.NoError(t, err)
    GossipTypeMapping[reflect.TypeOf(msg)] = LightClientOptimisticUpdateTopicFormat
    topic := fmt.Sprintf(LightClientOptimisticUpdateTopicFormat, params.ForkDigest(slots.ToEpoch(msg.AttestedHeader().Beacon().Slot)))

    // External peer subscribes to the topic.
    topic += p.Encoding().ProtocolSuffix()
    sub, err := p2.SubscribeToTopic(topic)
    require.NoError(t, err)

    time.Sleep(50 * time.Millisecond) // libp2p fails without this delay...

    // Async listen for the pubsub, must be before the broadcast.
    var wg sync.WaitGroup
    wg.Add(1)
    go func(tt *testing.T) {
        defer wg.Done()
        ctx, cancel := context.WithTimeout(t.Context(), 150*time.Millisecond)
        defer cancel()

        incomingMessage, err := sub.Next(ctx)
        require.NoError(t, err)

        slotStartTime, err := slots.StartTime(p.genesisTime, msg.SignatureSlot())
        require.NoError(t, err)
        expectedDelay := slots.ComponentDuration(primitives.BP(params.BeaconConfig().SyncMessageDueBPS))
        if time.Now().Before(slotStartTime.Add(expectedDelay)) {
            tt.Errorf("Message received too early, now %v, expected at least %v", time.Now(), slotStartTime.Add(expectedDelay))
        }

        result := &ethpb.LightClientOptimisticUpdateAltair{}
        require.NoError(t, p.Encoding().DecodeGossip(incomingMessage.Data, result))
        if !proto.Equal(result, msg.Proto()) {
            tt.Errorf("Did not receive expected message, got %+v, wanted %+v", result, msg)
        }
    }(t)

    // Broadcasting nil should fail.
    ctx := t.Context()
    require.ErrorContains(t, "attempted to broadcast nil", p.BroadcastLightClientOptimisticUpdate(ctx, nil))
    var nilUpdate interfaces.LightClientOptimisticUpdate
    require.ErrorContains(t, "attempted to broadcast nil", p.BroadcastLightClientOptimisticUpdate(ctx, nilUpdate))
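    // The broadcast is expected to be delayed until the sync-message due
    // point of the update's signature slot; the listener above fails if
    // the message arrives before slotStartTime + expectedDelay.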
    // Broadcast to peers and wait.
    require.NoError(t, p.BroadcastLightClientOptimisticUpdate(ctx, msg))
    if util.WaitTimeout(&wg, 1*time.Second) {
        t.Error("Failed to receive pubsub within 1s")
    }
}

func TestService_BroadcastLightClientFinalityUpdate(t *testing.T) {
    params.SetupTestConfigCleanup(t)
    config := params.BeaconConfig().Copy()
    config.SyncMessageDueBPS = 60 // ~72 milliseconds
    params.OverrideBeaconConfig(config)

    p1 := p2ptest.NewTestP2P(t)
    p2 := p2ptest.NewTestP2P(t)
    p1.Connect(p2)
    require.NotEqual(t, 0, len(p1.BHost.Network().Peers()))

    p := &Service{
        host:                  p1.BHost,
        pubsub:                p1.PubSub(),
        joinedTopics:          map[string]*pubsub.Topic{},
        cfg:                   &Config{},
        genesisTime:           time.Now().Add(-33 * time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second), // the signature slot of the mock update is 33
        genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32),
        subnetsLock:           make(map[uint64]*sync.RWMutex),
        subnetsLockLock:       sync.Mutex{},
        peers: peers.NewStatus(t.Context(), &peers.StatusConfig{
            ScorerParams: &scorers.Config{},
        }),
    }

    msg, err := util.MockFinalityUpdate()
    require.NoError(t, err)
    GossipTypeMapping[reflect.TypeOf(msg)] = LightClientFinalityUpdateTopicFormat
    topic := fmt.Sprintf(LightClientFinalityUpdateTopicFormat, params.ForkDigest(slots.ToEpoch(msg.AttestedHeader().Beacon().Slot)))

    // External peer subscribes to the topic.
    topic += p.Encoding().ProtocolSuffix()
    sub, err := p2.SubscribeToTopic(topic)
    require.NoError(t, err)

    time.Sleep(50 * time.Millisecond) // libp2p fails without this delay...

    // Async listen for the pubsub, must be before the broadcast.
    var wg sync.WaitGroup
    wg.Add(1)
    go func(tt *testing.T) {
        defer wg.Done()
        ctx, cancel := context.WithTimeout(t.Context(), 150*time.Millisecond)
        defer cancel()

        incomingMessage, err := sub.Next(ctx)
        require.NoError(t, err)

        slotStartTime, err := slots.StartTime(p.genesisTime, msg.SignatureSlot())
        require.NoError(t, err)
        expectedDelay := slots.ComponentDuration(primitives.BP(params.BeaconConfig().SyncMessageDueBPS))
        if time.Now().Before(slotStartTime.Add(expectedDelay)) {
            tt.Errorf("Message received too early, now %v, expected at least %v", time.Now(), slotStartTime.Add(expectedDelay))
        }

        result := &ethpb.LightClientFinalityUpdateAltair{}
        require.NoError(t, p.Encoding().DecodeGossip(incomingMessage.Data, result))
        if !proto.Equal(result, msg.Proto()) {
            tt.Errorf("Did not receive expected message, got %+v, wanted %+v", result, msg)
        }
    }(t)

    // Broadcasting nil should fail.
    ctx := t.Context()
    require.ErrorContains(t, "attempted to broadcast nil", p.BroadcastLightClientFinalityUpdate(ctx, nil))
    var nilUpdate interfaces.LightClientFinalityUpdate
    require.ErrorContains(t, "attempted to broadcast nil", p.BroadcastLightClientFinalityUpdate(ctx, nilUpdate))

    // Broadcast to peers and wait.
    require.NoError(t, p.BroadcastLightClientFinalityUpdate(ctx, msg))
    if util.WaitTimeout(&wg, 1*time.Second) {
        t.Error("Failed to receive pubsub within 1s")
    }
}

func TestService_BroadcastDataColumn(t *testing.T) {
    const (
        port        = 2000
        columnIndex = 12
        topicFormat = DataColumnSubnetTopicFormat
    )

    ctx := t.Context()

    // Load the KZG trusted setup.
    err := kzg.Start()
    require.NoError(t, err)

    gFlags := new(flags.GlobalFlags)
    gFlags.MinimumPeersPerSubnet = 1
    flags.Init(gFlags)
    // Reset config.
    defer flags.Init(new(flags.GlobalFlags))

    // Create two peers and connect them.
    p1, p2 := p2ptest.NewTestP2P(t), p2ptest.NewTestP2P(t)
    p1.Connect(p2)

    // Check that the peers are connected.
    require.NotEqual(t, 0, len(p1.BHost.Network().Peers()), "No peers")
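    // A discv5 listener is attached below because broadcasting data
    // column sidecars may search for peers on the column's subnet
    // (hence MinimumPeersPerSubnet set to 1 above).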
    // Create a host.
    _, pkey, ipAddr := createHost(t, port)

    // Create a DB for the service.
    db := testDB.SetupDB(t)

    // Create and immediately close the custody info channel, since the custody info is already set.
    custodyInfoSet := make(chan struct{})
    close(custodyInfoSet)

    service := &Service{
        ctx:                   ctx,
        host:                  p1.BHost,
        pubsub:                p1.PubSub(),
        joinedTopics:          map[string]*pubsub.Topic{},
        cfg:                   &Config{DB: db},
        genesisTime:           time.Now(),
        genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32),
        subnetsLock:           make(map[uint64]*sync.RWMutex),
        subnetsLockLock:       sync.Mutex{},
        peers:                 peers.NewStatus(ctx, &peers.StatusConfig{ScorerParams: &scorers.Config{}}),
        custodyInfo:           &custodyInfo{},
        custodyInfoSet:        custodyInfoSet,
    }

    // Create a listener.
    listener, err := service.startDiscoveryV5(ipAddr, pkey)
    require.NoError(t, err)
    service.dv5Listener = listener

    digest, err := service.currentForkDigest()
    require.NoError(t, err)

    subnet := peerdas.ComputeSubnetForDataColumnSidecar(columnIndex)
    topic := fmt.Sprintf(topicFormat, digest, subnet) + service.Encoding().ProtocolSuffix()

    _, verifiedRoSidecars := util.CreateTestVerifiedRoDataColumnSidecars(t, []util.DataColumnParam{{Index: columnIndex}})
    verifiedRoSidecar := verifiedRoSidecars[0]

    // Subscribe to the topic.
    sub, err := p2.SubscribeToTopic(topic)
    require.NoError(t, err)

    // libp2p fails without this delay.
    time.Sleep(50 * time.Millisecond)

    // Broadcast to peers and wait.
    err = service.BroadcastDataColumnSidecars(ctx, []blocks.VerifiedRODataColumn{verifiedRoSidecar})
    require.NoError(t, err)

    // Receive the message.
    ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second)
    defer cancel()

    msg, err := sub.Next(ctx)
    require.NoError(t, err)

    var result ethpb.DataColumnSidecar
    require.NoError(t, service.Encoding().DecodeGossip(msg.Data, &result))
    require.DeepEqual(t, &result, verifiedRoSidecar)
}