mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-08 21:08:10 -05:00
Ping handler timeout (#6302)
* Fix goroutine using cancelled context. * Only close stream when finished * Simplify function structure
This commit is contained in:
@@ -13,40 +13,63 @@ import (
|
||||
|
||||
// pingHandler reads the incoming ping rpc message from the peer.
|
||||
func (r *Service) pingHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error {
|
||||
defer func() {
|
||||
if err := stream.Close(); err != nil {
|
||||
log.WithError(err).Error("Failed to close stream")
|
||||
}
|
||||
}()
|
||||
_, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
defer cancel()
|
||||
setRPCStreamDeadlines(stream)
|
||||
|
||||
m, ok := msg.(*uint64)
|
||||
if !ok {
|
||||
if err := stream.Close(); err != nil {
|
||||
log.WithError(err).Error("Failed to close stream")
|
||||
}
|
||||
return fmt.Errorf("wrong message type for ping, got %T, wanted *uint64", msg)
|
||||
}
|
||||
valid, err := r.validateSequenceNum(*m, stream.Conn().RemotePeer())
|
||||
if err != nil {
|
||||
if err := stream.Close(); err != nil {
|
||||
log.WithError(err).Error("Failed to close stream")
|
||||
}
|
||||
return err
|
||||
}
|
||||
if !valid {
|
||||
// send metadata request in a new routine and stream.
|
||||
go func() {
|
||||
md, err := r.sendMetaDataRequest(ctx, stream.Conn().RemotePeer())
|
||||
if err != nil {
|
||||
log.WithField("peer", stream.Conn().RemotePeer()).WithError(err).Debug("Failed to send metadata request")
|
||||
return
|
||||
}
|
||||
// update metadata if there is no error
|
||||
r.p2p.Peers().SetMetadata(stream.Conn().RemotePeer(), md)
|
||||
}()
|
||||
}
|
||||
if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil {
|
||||
if err := stream.Close(); err != nil {
|
||||
log.WithError(err).Error("Failed to close stream")
|
||||
}
|
||||
return err
|
||||
}
|
||||
_, err = r.p2p.Encoding().EncodeWithLength(stream, r.p2p.MetadataSeq())
|
||||
return err
|
||||
if _, err := r.p2p.Encoding().EncodeWithLength(stream, r.p2p.MetadataSeq()); err != nil {
|
||||
if err := stream.Close(); err != nil {
|
||||
log.WithError(err).Error("Failed to close stream")
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
if valid {
|
||||
// If the sequence number was valid we're done.
|
||||
if err := stream.Close(); err != nil {
|
||||
log.WithError(err).Error("Failed to close stream")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// The sequence number was not valid. Start our own ping back to the peer.
|
||||
go func() {
|
||||
defer func() {
|
||||
if err := stream.Close(); err != nil {
|
||||
log.WithError(err).Error("Failed to close stream")
|
||||
}
|
||||
}()
|
||||
// New context so the calling function doesn't cancel on us.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), ttfbTimeout)
|
||||
defer cancel()
|
||||
md, err := r.sendMetaDataRequest(ctx, stream.Conn().RemotePeer())
|
||||
if err != nil {
|
||||
log.WithField("peer", stream.Conn().RemotePeer()).WithError(err).Debug("Failed to send metadata request")
|
||||
return
|
||||
}
|
||||
// update metadata if there is no error
|
||||
r.p2p.Peers().SetMetadata(stream.Conn().RemotePeer(), md)
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Service) sendPingRequest(ctx context.Context, id peer.ID) error {
|
||||
|
||||
Reference in New Issue
Block a user