protect one peer per topic

This commit is contained in:
aarshkshah1992
2026-01-06 21:31:05 +05:30
parent ea1962bf17
commit 3e7cd8c2f1
4 changed files with 22 additions and 25 deletions

View File

@@ -303,33 +303,31 @@ func (g *GossipPeerDialer) peersForTopic(topic string, targetCount int) []*enode
return newPeers return newPeers
} }
// SoleProviderPeers returns peer IDs that are the sole provider for at least one topic. // ProtectedPeers returns peer IDs that should be protected from pruning.
// A peer is considered a sole provider if it's the only connected peer for a topic (listPeers returns only this peer) // For each topic, one connected peer is marked as protected to ensure
// // we maintain connectivity to all subscribed topics.
// These peers should be protected from pruning since losing them would mean func (g *GossipPeerDialer) ProtectedPeers() []peer.ID {
// losing connectivity to that topic entirely.
func (g *GossipPeerDialer) SoleProviderPeers() []peer.ID {
if g.topicsProvider == nil { if g.topicsProvider == nil {
return nil return nil
} }
topics := g.topicsProvider() topics := g.topicsProvider()
soleProviders := make(map[peer.ID]struct{}) protectedPeers := make(map[peer.ID]struct{})
for topic := range topics { for topic := range topics {
connectedPeers := g.listPeers(topic) connectedPeers := g.listPeers(topic)
// Skip if no peers or more than one connected peer // Skip if no peers connected
if len(connectedPeers) != 1 { if len(connectedPeers) == 0 {
continue continue
} }
// This peer is the sole known provider // Protect the first peer for this topic
soleProviders[connectedPeers[0]] = struct{}{} protectedPeers[connectedPeers[0]] = struct{}{}
} }
result := make([]peer.ID, 0, len(soleProviders)) result := make([]peer.ID, 0, len(protectedPeers))
for pid := range soleProviders { for pid := range protectedPeers {
result = append(result, pid) result = append(result, pid)
} }
return result return result

View File

@@ -442,7 +442,7 @@ func TestGossipPeerDialer_selectPeersForTopics(t *testing.T) {
} }
} }
func TestGossipPeerDialer_SoleProviderPeers(t *testing.T) { func TestGossipPeerDialer_ProtectedPeers(t *testing.T) {
peerA := peer.ID("peerA") peerA := peer.ID("peerA")
peerB := peer.ID("peerB") peerB := peer.ID("peerB")
peerC := peer.ID("peerC") peerC := peer.ID("peerC")
@@ -472,10 +472,10 @@ func TestGossipPeerDialer_SoleProviderPeers(t *testing.T) {
expected: []peer.ID{}, expected: []peer.ID{},
}, },
{ {
name: "multiple peers for all topics", name: "multiple peers for all topics protects first peer from each",
topicsProvider: func() map[string]int { return map[string]int{"topic/a": 2, "topic/b": 2} }, topicsProvider: func() map[string]int { return map[string]int{"topic/a": 2, "topic/b": 2} },
connectedPeers: map[string][]peer.ID{"topic/a": {peerA, peerB}, "topic/b": {peerB, peerC}}, connectedPeers: map[string][]peer.ID{"topic/a": {peerA, peerB}, "topic/b": {peerB, peerC}},
expected: []peer.ID{}, expected: []peer.ID{peerA, peerB},
}, },
{ {
name: "single peer for one topic", name: "single peer for one topic",
@@ -484,22 +484,22 @@ func TestGossipPeerDialer_SoleProviderPeers(t *testing.T) {
expected: []peer.ID{peerA}, expected: []peer.ID{peerA},
}, },
{ {
name: "same peer is sole provider for multiple topics", name: "same peer is first for multiple topics",
topicsProvider: func() map[string]int { return map[string]int{"topic/a": 1, "topic/b": 1} }, topicsProvider: func() map[string]int { return map[string]int{"topic/a": 1, "topic/b": 1} },
connectedPeers: map[string][]peer.ID{"topic/a": {peerA}, "topic/b": {peerA}}, connectedPeers: map[string][]peer.ID{"topic/a": {peerA}, "topic/b": {peerA}},
expected: []peer.ID{peerA}, expected: []peer.ID{peerA},
}, },
{ {
name: "different sole providers for different topics", name: "different first peers for different topics",
topicsProvider: func() map[string]int { return map[string]int{"topic/a": 1, "topic/b": 1} }, topicsProvider: func() map[string]int { return map[string]int{"topic/a": 1, "topic/b": 1} },
connectedPeers: map[string][]peer.ID{"topic/a": {peerA}, "topic/b": {peerB}}, connectedPeers: map[string][]peer.ID{"topic/a": {peerA}, "topic/b": {peerB}},
expected: []peer.ID{peerA, peerB}, expected: []peer.ID{peerA, peerB},
}, },
{ {
name: "mix of single and multiple peers", name: "protects first peer from each topic",
topicsProvider: func() map[string]int { return map[string]int{"topic/a": 1, "topic/b": 2, "topic/c": 1} }, topicsProvider: func() map[string]int { return map[string]int{"topic/a": 1, "topic/b": 2, "topic/c": 1} },
connectedPeers: map[string][]peer.ID{"topic/a": {peerA}, "topic/b": {peerB, peerC}, "topic/c": {peerC}}, connectedPeers: map[string][]peer.ID{"topic/a": {peerA}, "topic/b": {peerB, peerC}, "topic/c": {peerC}},
expected: []peer.ID{peerA, peerC}, expected: []peer.ID{peerA, peerB, peerC},
}, },
} }
@@ -514,7 +514,7 @@ func TestGossipPeerDialer_SoleProviderPeers(t *testing.T) {
listPeers: listPeers, listPeers: listPeers,
} }
got := dialer.SoleProviderPeers() got := dialer.ProtectedPeers()
if tt.expected == nil { if tt.expected == nil {
require.Nil(t, got) require.Nil(t, got)

View File

@@ -33,5 +33,5 @@ type SubnetTopicsProvider func() map[string]int
type GossipDialer interface { type GossipDialer interface {
Start(provider SubnetTopicsProvider) error Start(provider SubnetTopicsProvider) error
DialPeersForTopicBlocking(ctx context.Context, topic string, nPeers int) error DialPeersForTopicBlocking(ctx context.Context, topic string, nPeers int) error
SoleProviderPeers() []peer.ID ProtectedPeers() []peer.ID
} }

View File

@@ -248,9 +248,8 @@ func (s *Service) filterNeededPeers(pids []peer.ID) []peer.ID {
dialer := s.cfg.p2p.GossipDialer() dialer := s.cfg.p2p.GossipDialer()
if dialer != nil { if dialer != nil {
// Protect peers that are the sole provider for any gossip topic. // ask the dialer for peers that should be protected from pruning.
// These peers should not be pruned since we have no alternative. for _, pid := range dialer.ProtectedPeers() {
for _, pid := range dialer.SoleProviderPeers() {
peerMap[pid] = true peerMap[pid] = true
} }
} }