diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index 316e52654e..ead33c7ae7 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -287,25 +287,33 @@ func (s *Service) Stop() error { } }() - // Say goodbye to all peers. + // Create context with timeout to prevent hanging + goodbyeCtx, cancel := context.WithTimeout(s.ctx, 10*time.Second) + defer cancel() + + // Use WaitGroup to ensure all goodbye messages complete + var wg sync.WaitGroup for _, peerID := range s.cfg.p2p.Peers().Connected() { if s.cfg.p2p.Host().Network().Connectedness(peerID) == network.Connected { - if err := s.sendGoodByeAndDisconnect(s.ctx, p2ptypes.GoodbyeCodeClientShutdown, peerID); err != nil { - log.WithError(err).WithField("peerID", peerID).Error("Failed to send goodbye message") - } + wg.Add(1) + go func(pid peer.ID) { + defer wg.Done() + if err := s.sendGoodByeAndDisconnect(goodbyeCtx, p2ptypes.GoodbyeCodeClientShutdown, pid); err != nil { + log.WithError(err).WithField("peerID", pid).Error("Failed to send goodbye message") + } + }(peerID) } } + wg.Wait() + log.Debug("All goodbye messages sent successfully") - // Removing RPC Stream handlers. + // Now safe to remove handlers / unsubscribe. for _, p := range s.cfg.p2p.Host().Mux().Protocols() { s.cfg.p2p.Host().RemoveStreamHandler(p) } - - // Deregister Topic Subscribers. 
for _, t := range s.cfg.p2p.PubSub().GetTopics() { s.unSubscribeFromTopic(t) } - return nil } diff --git a/beacon-chain/sync/service_test.go b/beacon-chain/sync/service_test.go index e2da1e1512..fceed9ab79 100644 --- a/beacon-chain/sync/service_test.go +++ b/beacon-chain/sync/service_test.go @@ -2,6 +2,7 @@ package sync import ( "context" + "sync" "testing" "time" @@ -9,16 +10,22 @@ import ( mockChain "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing" "github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed" dbTest "github.com/OffchainLabs/prysm/v6/beacon-chain/db/testing" + "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers" p2ptest "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing" + p2ptypes "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/types" "github.com/OffchainLabs/prysm/v6/beacon-chain/startup" state_native "github.com/OffchainLabs/prysm/v6/beacon-chain/state/state-native" mockSync "github.com/OffchainLabs/prysm/v6/beacon-chain/sync/initial-sync/testing" + "github.com/OffchainLabs/prysm/v6/consensus-types/primitives" + leakybucket "github.com/OffchainLabs/prysm/v6/container/leaky-bucket" "github.com/OffchainLabs/prysm/v6/crypto/bls" "github.com/OffchainLabs/prysm/v6/encoding/bytesutil" ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" "github.com/OffchainLabs/prysm/v6/testing/assert" "github.com/OffchainLabs/prysm/v6/testing/require" "github.com/OffchainLabs/prysm/v6/testing/util" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/protocol" gcache "github.com/patrickmn/go-cache" ) @@ -227,3 +234,212 @@ func TestSyncService_StopCleanly(t *testing.T) { require.Equal(t, 0, len(r.cfg.p2p.PubSub().GetTopics())) require.Equal(t, 0, len(r.cfg.p2p.Host().Mux().Protocols())) } + +func TestService_Stop_SendsGoodbyeMessages(t *testing.T) { + // Create test peers + p1 := p2ptest.NewTestP2P(t) + p2 := p2ptest.NewTestP2P(t) + p3 := p2ptest.NewTestP2P(t) + + // Connect peers + p1.Connect(p2) + 
p1.Connect(p3) + + // Register peers in the peer status + p1.Peers().Add(nil, p2.BHost.ID(), p2.BHost.Addrs()[0], network.DirOutbound) + p1.Peers().Add(nil, p3.BHost.ID(), p3.BHost.Addrs()[0], network.DirOutbound) + p1.Peers().SetConnectionState(p2.BHost.ID(), peers.Connected) + p1.Peers().SetConnectionState(p3.BHost.ID(), peers.Connected) + + // Create service with connected peers + d := dbTest.SetupDB(t) + chain := &mockChain.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}} + ctx, cancel := context.WithCancel(context.Background()) + + r := &Service{ + cfg: &config{ + beaconDB: d, + p2p: p1, + chain: chain, + clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot), + }, + ctx: ctx, + cancel: cancel, + rateLimiter: newRateLimiter(p1), + } + + // Initialize context map for RPC + ctxMap, err := ContextByteVersionsForValRoot(chain.ValidatorsRoot) + require.NoError(t, err) + r.ctxMap = ctxMap + + // Setup rate limiter for goodbye topic + pcl := protocol.ID("/eth2/beacon_chain/req/goodbye/1/ssz_snappy") + topic := string(pcl) + r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false) + + // Track goodbye messages received + var goodbyeMessages sync.Map + var wg sync.WaitGroup + + wg.Add(2) + + p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) { + defer wg.Done() + out := new(primitives.SSZUint64) + require.NoError(t, r.cfg.p2p.Encoding().DecodeWithMaxLength(stream, out)) + goodbyeMessages.Store(p2.BHost.ID().String(), *out) + require.NoError(t, stream.Close()) + }) + + p3.BHost.SetStreamHandler(pcl, func(stream network.Stream) { + defer wg.Done() + out := new(primitives.SSZUint64) + require.NoError(t, r.cfg.p2p.Encoding().DecodeWithMaxLength(stream, out)) + goodbyeMessages.Store(p3.BHost.ID().String(), *out) + require.NoError(t, stream.Close()) + }) + + connectedPeers := r.cfg.p2p.Peers().Connected() + t.Logf("Connected peers before Stop: %d", len(connectedPeers)) + assert.Equal(t, 2, len(connectedPeers), "Expected 
2 connected peers") + + err = r.Stop() + assert.NoError(t, err) + + // Wait for goodbye messages + if util.WaitTimeout(&wg, 15*time.Second) { + t.Fatal("Did not receive goodbye messages within timeout") + } + + // Verify correct goodbye codes were sent + msg2, ok := goodbyeMessages.Load(p2.BHost.ID().String()) + assert.Equal(t, true, ok, "Expected goodbye message to peer 2") + assert.Equal(t, p2ptypes.GoodbyeCodeClientShutdown, msg2) + + msg3, ok := goodbyeMessages.Load(p3.BHost.ID().String()) + assert.Equal(t, true, ok, "Expected goodbye message to peer 3") + assert.Equal(t, p2ptypes.GoodbyeCodeClientShutdown, msg3) +} + +func TestService_Stop_TimeoutHandling(t *testing.T) { + p1 := p2ptest.NewTestP2P(t) + p2 := p2ptest.NewTestP2P(t) + p1.Connect(p2) + + p1.Peers().Add(nil, p2.BHost.ID(), p2.BHost.Addrs()[0], network.DirOutbound) + p1.Peers().SetConnectionState(p2.BHost.ID(), peers.Connected) + + d := dbTest.SetupDB(t) + chain := &mockChain.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}} + ctx, cancel := context.WithCancel(context.Background()) + + r := &Service{ + cfg: &config{ + beaconDB: d, + p2p: p1, + chain: chain, + clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot), + }, + ctx: ctx, + cancel: cancel, + rateLimiter: newRateLimiter(p1), + } + + // Initialize context map for RPC + ctxMap, err := ContextByteVersionsForValRoot(chain.ValidatorsRoot) + require.NoError(t, err) + r.ctxMap = ctxMap + + // Setup rate limiter for goodbye topic + pcl := protocol.ID("/eth2/beacon_chain/req/goodbye/1/ssz_snappy") + topic := string(pcl) + r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false) + + // Don't set up stream handler on p2 to simulate unresponsive peer + + // Verify peers are connected before stopping + connectedPeers := r.cfg.p2p.Peers().Connected() + t.Logf("Connected peers before Stop: %d", len(connectedPeers)) + + start := time.Now() + err = r.Stop() + duration := time.Since(start) + + t.Logf("Stop 
completed in %v", duration) + + // Stop should complete successfully even when peers don't respond + assert.NoError(t, err) + // Should not hang - completes quickly when goodbye fails + assert.Equal(t, true, duration < 5*time.Second, "Stop() should not hang when peer is unresponsive") + // Test passes - the timeout behavior is working correctly, goodbye attempts fail quickly +} + +func TestService_Stop_ConcurrentGoodbyeMessages(t *testing.T) { + // Test that goodbye messages are sent concurrently, not sequentially + const numPeers = 10 + + p1 := p2ptest.NewTestP2P(t) + testPeers := make([]*p2ptest.TestP2P, numPeers) + + // Create and connect multiple peers + for i := 0; i < numPeers; i++ { + testPeers[i] = p2ptest.NewTestP2P(t) + p1.Connect(testPeers[i]) + // Register peer in the peer status + p1.Peers().Add(nil, testPeers[i].BHost.ID(), testPeers[i].BHost.Addrs()[0], network.DirOutbound) + p1.Peers().SetConnectionState(testPeers[i].BHost.ID(), peers.Connected) + } + + d := dbTest.SetupDB(t) + chain := &mockChain.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}} + ctx, cancel := context.WithCancel(context.Background()) + + r := &Service{ + cfg: &config{ + beaconDB: d, + p2p: p1, + chain: chain, + clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot), + }, + ctx: ctx, + cancel: cancel, + rateLimiter: newRateLimiter(p1), + } + + // Initialize context map for RPC + ctxMap, err := ContextByteVersionsForValRoot(chain.ValidatorsRoot) + require.NoError(t, err) + r.ctxMap = ctxMap + + // Setup rate limiter for goodbye topic + pcl := protocol.ID("/eth2/beacon_chain/req/goodbye/1/ssz_snappy") + topic := string(pcl) + r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false) + + // Each peer will have artificial delay in processing goodbye + var wg sync.WaitGroup + wg.Add(numPeers) + + for i := 0; i < numPeers; i++ { + idx := i // capture loop variable + testPeers[idx].BHost.SetStreamHandler(pcl, func(stream network.Stream) { + 
defer wg.Done() + time.Sleep(100 * time.Millisecond) // Artificial delay + out := new(primitives.SSZUint64) + require.NoError(t, r.cfg.p2p.Encoding().DecodeWithMaxLength(stream, out)) + require.NoError(t, stream.Close()) + }) + } + + start := time.Now() + err = r.Stop() + duration := time.Since(start) + + // If messages were sent sequentially, it would take numPeers * 100ms = 1 second + // If concurrent, should be ~100ms + assert.NoError(t, err) + assert.Equal(t, true, duration < 500*time.Millisecond, "Goodbye messages should be sent concurrently") + + require.Equal(t, false, util.WaitTimeout(&wg, 2*time.Second)) +} diff --git a/changelog/James-prysm_parallel-goodbyes.md b/changelog/James-prysm_parallel-goodbyes.md new file mode 100644 index 0000000000..ae45a85a78 --- /dev/null +++ b/changelog/James-prysm_parallel-goodbyes.md @@ -0,0 +1,3 @@ +### Changed + +- when shutting down the sync service we now send p2p goodbye messages in parallel to maximize chances of propagating goodbyes to all peers before an unsafe shutdown. \ No newline at end of file