From ceaceeb33cb92f8a3dbad8ec7ef12150382e629d Mon Sep 17 00:00:00 2001 From: Nishant Das Date: Mon, 1 Jun 2020 16:14:47 +0800 Subject: [PATCH] Reset Streams Properly (#6066) * reset streams * fix build --- beacon-chain/p2p/handshake.go | 5 ----- beacon-chain/sync/initial-sync/blocks_fetcher.go | 4 ++-- beacon-chain/sync/rpc_beacon_blocks_by_root.go | 5 +++++ beacon-chain/sync/rpc_goodbye.go | 11 ++++++++--- beacon-chain/sync/rpc_metadata.go | 4 ++-- beacon-chain/sync/rpc_ping.go | 5 +++++ beacon-chain/sync/rpc_status.go | 5 +++++ 7 files changed, 27 insertions(+), 12 deletions(-) diff --git a/beacon-chain/p2p/handshake.go b/beacon-chain/p2p/handshake.go index ad47eac6a2..7339d9fc1b 100644 --- a/beacon-chain/p2p/handshake.go +++ b/beacon-chain/p2p/handshake.go @@ -15,8 +15,6 @@ import ( ) const ( - // The time to wait before disconnecting a peer. - flushDuration = 50 * time.Millisecond // The time to wait for a status request. timeForStatus = 10 * time.Second ) @@ -85,9 +83,6 @@ func (s *Service) AddConnectionHandler(reqFunc func(ctx context.Context, id peer if err := goodbyeFunc(context.Background(), remotePeer); err != nil { log.WithError(err).Trace("Unable to send goodbye message to peer") } - // Add a short delay to allow the stream to flush before closing the connection. - // There is still a chance that the peer won't receive the message. - time.Sleep(flushDuration) disconnectFromPeer() return } diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher.go b/beacon-chain/sync/initial-sync/blocks_fetcher.go index f35f50ea13..5f1856fbe4 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher.go @@ -458,8 +458,8 @@ func (f *blocksFetcher) requestBlocks( return nil, err } defer func() { - if err := stream.Close(); err != nil { - log.WithError(err).Error("Failed to close stream") + if err := stream.Reset(); err != nil { + log.WithError(err).Errorf("Failed to close stream with protocol %s", stream.Protocol()) } }() diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_root.go b/beacon-chain/sync/rpc_beacon_blocks_by_root.go index d7dc019a21..cc4b335828 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_root.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_root.go @@ -22,6 +22,11 @@ func (r *Service) sendRecentBeaconBlocksRequest(ctx context.Context, blockRoots if err != nil { return err } + defer func() { + if err := stream.Reset(); err != nil { + log.WithError(err).Errorf("Failed to reset stream with protocol %s", stream.Protocol()) + } + }() for i := 0; i < len(blockRoots); i++ { blk, err := ReadChunkedBlock(stream, r.p2p) if err == io.EOF { diff --git a/beacon-chain/sync/rpc_goodbye.go b/beacon-chain/sync/rpc_goodbye.go index 5fa3c1c33b..e4be06c107 100644 --- a/beacon-chain/sync/rpc_goodbye.go +++ b/beacon-chain/sync/rpc_goodbye.go @@ -51,9 +51,6 @@ func (r *Service) sendGoodByeAndDisconnect(ctx context.Context, code uint64, id "peer": id, }).Debug("Could not send goodbye message to peer") } - // Add a short delay to allow the stream to flush before closing the connection. - // There is still a chance that the peer won't receive the message. - time.Sleep(50 * time.Millisecond) if err := r.p2p.Disconnect(id); err != nil { return err } @@ -68,8 +65,16 @@ func (r *Service) sendGoodByeMessage(ctx context.Context, code uint64, id peer.I if err != nil { return err } + defer func() { + if err := stream.Reset(); err != nil { + log.WithError(err).Errorf("Failed to reset stream with protocol %s", stream.Protocol()) + } + }() log := log.WithField("Reason", goodbyeMessage(code)) log.WithField("peer", stream.Conn().RemotePeer()).Debug("Sending Goodbye message to peer") + // Add a short delay to allow the stream to flush before resetting it. + // There is still a chance that the peer won't receive the message. + time.Sleep(50 * time.Millisecond) return nil } diff --git a/beacon-chain/sync/rpc_metadata.go b/beacon-chain/sync/rpc_metadata.go index ae5a6ba2be..a174923c5f 100644 --- a/beacon-chain/sync/rpc_metadata.go +++ b/beacon-chain/sync/rpc_metadata.go @@ -41,8 +41,8 @@ func (r *Service) sendMetaDataRequest(ctx context.Context, id peer.ID) (*pb.Meta // metadata requests send no payload, so closing the // stream early leads it to a reset. defer func() { - if err := stream.Close(); err != nil { - log.WithError(err).Error("Failed to close stream") + if err := stream.Reset(); err != nil { + log.WithError(err).Errorf("Failed to reset stream for protocol %s", stream.Protocol()) } }() code, errMsg, err := ReadStatusCode(stream, r.p2p.Encoding()) diff --git a/beacon-chain/sync/rpc_ping.go b/beacon-chain/sync/rpc_ping.go index 61e8f54be7..1996d88cce 100644 --- a/beacon-chain/sync/rpc_ping.go +++ b/beacon-chain/sync/rpc_ping.go @@ -58,6 +58,11 @@ func (r *Service) sendPingRequest(ctx context.Context, id peer.ID) error { if err != nil { return err } + defer func() { + if err := stream.Reset(); err != nil { + log.WithError(err).Errorf("Failed to reset stream with protocol %s", stream.Protocol()) + } + }() code, errMsg, err := ReadStatusCode(stream, r.p2p.Encoding()) if err != nil { diff --git a/beacon-chain/sync/rpc_status.go b/beacon-chain/sync/rpc_status.go index 1f0e74acd5..2162d1b22d 100644 --- a/beacon-chain/sync/rpc_status.go +++ b/beacon-chain/sync/rpc_status.go @@ -95,6 +95,11 @@ func (r *Service) sendRPCStatusRequest(ctx context.Context, id peer.ID) error { if err != nil { return err } + defer func() { + if err := stream.Reset(); err != nil { + log.WithError(err).Errorf("Failed to reset stream with protocol %s", stream.Protocol()) + } + }() code, errMsg, err := ReadStatusCode(stream, r.p2p.Encoding()) if err != nil {