wrapping goodbye messages in goroutine to speed up node shutdown (#15542)

* wrapping goodbye messages in goroutine to speed up node shutdown

* fixing requirement
james-prysm
2025-07-31 07:52:31 -05:00
committed by GitHub
parent d7d8764a91
commit bd6b4ecd5b
3 changed files with 235 additions and 8 deletions
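
The commit replaces a sequential goodbye loop with a bounded concurrent fan-out. Here is a minimal standalone sketch of that pattern (hypothetical names such as sendGoodbye; the real Prysm code follows in the diff below):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// sendGoodbye stands in for the real per-peer RPC; here it just simulates
// one network round-trip and honors context cancellation.
func sendGoodbye(ctx context.Context, peer string) error {
	select {
	case <-time.After(100 * time.Millisecond): // simulated round-trip
		fmt.Println("goodbye sent to", peer)
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	peerList := []string{"peerA", "peerB", "peerC"}

	// Bound the whole farewell phase so shutdown can never hang on a dead peer.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	var wg sync.WaitGroup
	for _, p := range peerList {
		wg.Add(1)
		go func(peer string) { // pass the loop variable explicitly (pre-Go 1.22 capture rules)
			defer wg.Done()
			if err := sendGoodbye(ctx, peer); err != nil {
				fmt.Println("goodbye failed for", peer, ":", err)
			}
		}(p)
	}
	wg.Wait() // sends run in parallel, so this takes ~one round-trip, not N
}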


@@ -287,25 +287,33 @@ func (s *Service) Stop() error {
 		}
 	}()
 	// Say goodbye to all peers.
+	// Create context with timeout to prevent hanging
+	goodbyeCtx, cancel := context.WithTimeout(s.ctx, 10*time.Second)
+	defer cancel()
+	// Use WaitGroup to ensure all goodbye messages complete
+	var wg sync.WaitGroup
 	for _, peerID := range s.cfg.p2p.Peers().Connected() {
 		if s.cfg.p2p.Host().Network().Connectedness(peerID) == network.Connected {
-			if err := s.sendGoodByeAndDisconnect(s.ctx, p2ptypes.GoodbyeCodeClientShutdown, peerID); err != nil {
-				log.WithError(err).WithField("peerID", peerID).Error("Failed to send goodbye message")
-			}
+			wg.Add(1)
+			go func(pid peer.ID) {
+				defer wg.Done()
+				if err := s.sendGoodByeAndDisconnect(goodbyeCtx, p2ptypes.GoodbyeCodeClientShutdown, pid); err != nil {
+					log.WithError(err).WithField("peerID", pid).Error("Failed to send goodbye message")
+				}
+			}(peerID)
 		}
 	}
+	wg.Wait()
+	log.Debug("All goodbye messages sent successfully")
-	// Removing RPC Stream handlers.
+	// Now safe to remove handlers / unsubscribe.
 	for _, p := range s.cfg.p2p.Host().Mux().Protocols() {
 		s.cfg.p2p.Host().RemoveStreamHandler(p)
 	}
 	// Deregister Topic Subscribers.
 	for _, t := range s.cfg.p2p.PubSub().GetTopics() {
 		s.unSubscribeFromTopic(t)
 	}
 	return nil
 }
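
One design detail worth calling out in the hunk above: goodbyeCtx is derived from s.ctx rather than context.Background(), so a service context that is already cancelled short-circuits the sends instead of burning the full 10 seconds, while the timeout bounds the worst case against stalled peers. A tiny illustration of that inheritance (standard library behavior, not code from this PR):

	parent, cancelParent := context.WithCancel(context.Background())
	cancelParent() // parent is already cancelled, as s.ctx may be during shutdown
	child, cancelChild := context.WithTimeout(parent, 10*time.Second)
	defer cancelChild()
	<-child.Done() // fires immediately: the child inherits the parent's cancellation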


@@ -2,6 +2,7 @@ package sync
import (
	"context"
	"sync"
	"testing"
	"time"
@@ -9,16 +10,22 @@ import (
	mockChain "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing"
	"github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed"
	dbTest "github.com/OffchainLabs/prysm/v6/beacon-chain/db/testing"
	"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
	p2ptest "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
	p2ptypes "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/types"
	"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
	state_native "github.com/OffchainLabs/prysm/v6/beacon-chain/state/state-native"
	mockSync "github.com/OffchainLabs/prysm/v6/beacon-chain/sync/initial-sync/testing"
	"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
	leakybucket "github.com/OffchainLabs/prysm/v6/container/leaky-bucket"
	"github.com/OffchainLabs/prysm/v6/crypto/bls"
	"github.com/OffchainLabs/prysm/v6/encoding/bytesutil"
	ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
	"github.com/OffchainLabs/prysm/v6/testing/assert"
	"github.com/OffchainLabs/prysm/v6/testing/require"
	"github.com/OffchainLabs/prysm/v6/testing/util"
	"github.com/libp2p/go-libp2p/core/network"
	"github.com/libp2p/go-libp2p/core/protocol"
	gcache "github.com/patrickmn/go-cache"
)
@@ -227,3 +234,212 @@ func TestSyncService_StopCleanly(t *testing.T) {
	require.Equal(t, 0, len(r.cfg.p2p.PubSub().GetTopics()))
	require.Equal(t, 0, len(r.cfg.p2p.Host().Mux().Protocols()))
}

func TestService_Stop_SendsGoodbyeMessages(t *testing.T) {
	// Create test peers
	p1 := p2ptest.NewTestP2P(t)
	p2 := p2ptest.NewTestP2P(t)
	p3 := p2ptest.NewTestP2P(t)
	// Connect peers
	p1.Connect(p2)
	p1.Connect(p3)
	// Register peers in the peer status
	p1.Peers().Add(nil, p2.BHost.ID(), p2.BHost.Addrs()[0], network.DirOutbound)
	p1.Peers().Add(nil, p3.BHost.ID(), p3.BHost.Addrs()[0], network.DirOutbound)
	p1.Peers().SetConnectionState(p2.BHost.ID(), peers.Connected)
	p1.Peers().SetConnectionState(p3.BHost.ID(), peers.Connected)
	// Create service with connected peers
	d := dbTest.SetupDB(t)
	chain := &mockChain.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
	ctx, cancel := context.WithCancel(context.Background())
	r := &Service{
		cfg: &config{
			beaconDB: d,
			p2p:      p1,
			chain:    chain,
			clock:    startup.NewClock(chain.Genesis, chain.ValidatorsRoot),
		},
		ctx:         ctx,
		cancel:      cancel,
		rateLimiter: newRateLimiter(p1),
	}
	// Initialize context map for RPC
	ctxMap, err := ContextByteVersionsForValRoot(chain.ValidatorsRoot)
	require.NoError(t, err)
	r.ctxMap = ctxMap
	// Setup rate limiter for goodbye topic
	pcl := protocol.ID("/eth2/beacon_chain/req/goodbye/1/ssz_snappy")
	topic := string(pcl)
	r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false)
	// Track goodbye messages received
	var goodbyeMessages sync.Map
	var wg sync.WaitGroup
	wg.Add(2)
	p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) {
		defer wg.Done()
		out := new(primitives.SSZUint64)
		require.NoError(t, r.cfg.p2p.Encoding().DecodeWithMaxLength(stream, out))
		goodbyeMessages.Store(p2.BHost.ID().String(), *out)
		require.NoError(t, stream.Close())
	})
	p3.BHost.SetStreamHandler(pcl, func(stream network.Stream) {
		defer wg.Done()
		out := new(primitives.SSZUint64)
		require.NoError(t, r.cfg.p2p.Encoding().DecodeWithMaxLength(stream, out))
		goodbyeMessages.Store(p3.BHost.ID().String(), *out)
		require.NoError(t, stream.Close())
	})
	connectedPeers := r.cfg.p2p.Peers().Connected()
	t.Logf("Connected peers before Stop: %d", len(connectedPeers))
	assert.Equal(t, 2, len(connectedPeers), "Expected 2 connected peers")
	err = r.Stop()
	assert.NoError(t, err)
	// Wait for goodbye messages
	if util.WaitTimeout(&wg, 15*time.Second) {
		t.Fatal("Did not receive goodbye messages within timeout")
	}
	// Verify correct goodbye codes were sent
	msg2, ok := goodbyeMessages.Load(p2.BHost.ID().String())
	assert.Equal(t, true, ok, "Expected goodbye message to peer 2")
	assert.Equal(t, p2ptypes.GoodbyeCodeClientShutdown, msg2)
	msg3, ok := goodbyeMessages.Load(p3.BHost.ID().String())
	assert.Equal(t, true, ok, "Expected goodbye message to peer 3")
	assert.Equal(t, p2ptypes.GoodbyeCodeClientShutdown, msg3)
}
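
A small design note on the test above: the two stream handlers fire on libp2p-managed goroutines concurrently with the test goroutine, so recording results in a sync.Map (rather than a plain map) keeps the test race-free under go test -race.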
func TestService_Stop_TimeoutHandling(t *testing.T) {
	p1 := p2ptest.NewTestP2P(t)
	p2 := p2ptest.NewTestP2P(t)
	p1.Connect(p2)
	p1.Peers().Add(nil, p2.BHost.ID(), p2.BHost.Addrs()[0], network.DirOutbound)
	p1.Peers().SetConnectionState(p2.BHost.ID(), peers.Connected)
	d := dbTest.SetupDB(t)
	chain := &mockChain.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
	ctx, cancel := context.WithCancel(context.Background())
	r := &Service{
		cfg: &config{
			beaconDB: d,
			p2p:      p1,
			chain:    chain,
			clock:    startup.NewClock(chain.Genesis, chain.ValidatorsRoot),
		},
		ctx:         ctx,
		cancel:      cancel,
		rateLimiter: newRateLimiter(p1),
	}
	// Initialize context map for RPC
	ctxMap, err := ContextByteVersionsForValRoot(chain.ValidatorsRoot)
	require.NoError(t, err)
	r.ctxMap = ctxMap
	// Setup rate limiter for goodbye topic
	pcl := protocol.ID("/eth2/beacon_chain/req/goodbye/1/ssz_snappy")
	topic := string(pcl)
	r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false)
	// Don't set up stream handler on p2 to simulate unresponsive peer
	// Verify peers are connected before stopping
	connectedPeers := r.cfg.p2p.Peers().Connected()
	t.Logf("Connected peers before Stop: %d", len(connectedPeers))
	start := time.Now()
	err = r.Stop()
	duration := time.Since(start)
	t.Logf("Stop completed in %v", duration)
	// Stop should complete successfully even when peers don't respond
	assert.NoError(t, err)
	// Should not hang - completes quickly when goodbye fails
	assert.Equal(t, true, duration < 5*time.Second, "Stop() should not hang when peer is unresponsive")
	// Test passes - the timeout behavior is working correctly, goodbye attempts fail quickly
}
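
Worth noting why this passes well under the 10-second goodbye context: because p2 never registers a handler for the goodbye protocol, libp2p's protocol negotiation is refused outright and the send fails fast rather than waiting out the full timeout. The test would exercise the timeout path differently against a peer that accepts the stream and then stalls.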
func TestService_Stop_ConcurrentGoodbyeMessages(t *testing.T) {
	// Test that goodbye messages are sent concurrently, not sequentially
	const numPeers = 10
	p1 := p2ptest.NewTestP2P(t)
	testPeers := make([]*p2ptest.TestP2P, numPeers)
	// Create and connect multiple peers
	for i := 0; i < numPeers; i++ {
		testPeers[i] = p2ptest.NewTestP2P(t)
		p1.Connect(testPeers[i])
		// Register peer in the peer status
		p1.Peers().Add(nil, testPeers[i].BHost.ID(), testPeers[i].BHost.Addrs()[0], network.DirOutbound)
		p1.Peers().SetConnectionState(testPeers[i].BHost.ID(), peers.Connected)
	}
	d := dbTest.SetupDB(t)
	chain := &mockChain.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
	ctx, cancel := context.WithCancel(context.Background())
	r := &Service{
		cfg: &config{
			beaconDB: d,
			p2p:      p1,
			chain:    chain,
			clock:    startup.NewClock(chain.Genesis, chain.ValidatorsRoot),
		},
		ctx:         ctx,
		cancel:      cancel,
		rateLimiter: newRateLimiter(p1),
	}
	// Initialize context map for RPC
	ctxMap, err := ContextByteVersionsForValRoot(chain.ValidatorsRoot)
	require.NoError(t, err)
	r.ctxMap = ctxMap
	// Setup rate limiter for goodbye topic
	pcl := protocol.ID("/eth2/beacon_chain/req/goodbye/1/ssz_snappy")
	topic := string(pcl)
	r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false)
	// Each peer will have artificial delay in processing goodbye
	var wg sync.WaitGroup
	wg.Add(numPeers)
	for i := 0; i < numPeers; i++ {
		idx := i // capture loop variable
		testPeers[idx].BHost.SetStreamHandler(pcl, func(stream network.Stream) {
			defer wg.Done()
			time.Sleep(100 * time.Millisecond) // Artificial delay
			out := new(primitives.SSZUint64)
			require.NoError(t, r.cfg.p2p.Encoding().DecodeWithMaxLength(stream, out))
			require.NoError(t, stream.Close())
		})
	}
	start := time.Now()
	err = r.Stop()
	duration := time.Since(start)
	// If messages were sent sequentially, it would take numPeers * 100ms = 1 second
	// If concurrent, should be ~100ms
	assert.NoError(t, err)
	assert.Equal(t, true, duration < 500*time.Millisecond, "Goodbye messages should be sent concurrently")
	require.Equal(t, false, util.WaitTimeout(&wg, 2*time.Second))
}
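
The assertions above hinge on util.WaitTimeout returning true only when the WaitGroup fails to finish before the deadline. A helper with those semantics might look like the following sketch (an assumption about Prysm's testing/util package, whose implementation isn't part of this diff; assumes the standard "sync" and "time" imports):

func waitTimeout(wg *sync.WaitGroup, d time.Duration) bool {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return false // all handlers ran before the deadline
	case <-time.After(d):
		return true // deadline expired first
	}
}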


@@ -0,0 +1,3 @@
### Changed
- When shutting down the sync service, we now send p2p goodbye messages in parallel to maximize the chance of propagating goodbyes to all peers before an unsafe shutdown.