mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-10 07:58:22 -05:00
QSP-BestPractice: Refactors inline ttfb/resp time constants (#6463)
* refactors inline ttfb/resp time constants
This commit is contained in:
@@ -25,7 +25,6 @@ import (
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/go-bitfield"
|
||||
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
|
||||
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
|
||||
@@ -33,6 +32,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
|
||||
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
|
||||
"github.com/prysmaticlabs/prysm/shared"
|
||||
"github.com/prysmaticlabs/prysm/shared/params"
|
||||
"github.com/prysmaticlabs/prysm/shared/runutil"
|
||||
"github.com/prysmaticlabs/prysm/shared/slotutil"
|
||||
)
|
||||
@@ -203,11 +203,11 @@ func (s *Service) Start() {
|
||||
}
|
||||
|
||||
// Periodic functions.
|
||||
runutil.RunEvery(s.ctx, 5*time.Second, func() {
|
||||
runutil.RunEvery(s.ctx, params.BeaconNetworkConfig().TtfbTimeout, func() {
|
||||
ensurePeerConnections(s.ctx, s.host, peersToWatch...)
|
||||
})
|
||||
runutil.RunEvery(s.ctx, time.Hour, s.Peers().Decay)
|
||||
runutil.RunEvery(s.ctx, 10*time.Second, s.updateMetrics)
|
||||
runutil.RunEvery(s.ctx, params.BeaconNetworkConfig().RespTimeout, s.updateMetrics)
|
||||
runutil.RunEvery(s.ctx, refreshRate, func() {
|
||||
s.RefreshENR()
|
||||
})
|
||||
|
||||
@@ -20,6 +20,9 @@ import (
|
||||
// they don't receive the first byte within 5 seconds.
|
||||
var ttfbTimeout = params.BeaconNetworkConfig().TtfbTimeout
|
||||
|
||||
// respTimeout is the maximum time for complete response transfer.
|
||||
var respTimeout = params.BeaconNetworkConfig().RespTimeout
|
||||
|
||||
// rpcHandler is responsible for handling and responding to any incoming message.
|
||||
// This method may return an error to internal monitoring, but the error will
|
||||
// not be relayed to the peer.
|
||||
|
||||
@@ -25,7 +25,7 @@ func (s *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa
|
||||
log.WithError(err).Error("Failed to close stream")
|
||||
}
|
||||
}()
|
||||
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
ctx, cancel := context.WithTimeout(ctx, respTimeout)
|
||||
defer cancel()
|
||||
SetRPCStreamDeadlines(stream)
|
||||
log := log.WithField("handler", "beacon_blocks_by_range")
|
||||
|
||||
@@ -3,7 +3,6 @@ package sync
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
libp2pcore "github.com/libp2p/go-libp2p-core"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
@@ -17,7 +16,7 @@ import (
|
||||
// sendRecentBeaconBlocksRequest sends a recent beacon blocks request to a peer to get
|
||||
// those corresponding blocks from that peer.
|
||||
func (s *Service) sendRecentBeaconBlocksRequest(ctx context.Context, blockRoots [][32]byte, id peer.ID) error {
|
||||
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
ctx, cancel := context.WithTimeout(ctx, respTimeout)
|
||||
defer cancel()
|
||||
|
||||
stream, err := s.p2p.Send(ctx, blockRoots, p2p.RPCBlocksByRootTopic, id)
|
||||
@@ -62,7 +61,7 @@ func (s *Service) sendRecentBeaconBlocksRequest(ctx context.Context, blockRoots
|
||||
// backward compatible with old Onyx nodes.
|
||||
// TODO(#6408)
|
||||
func (s *Service) sendRecentBeaconBlocksRequestFallback(ctx context.Context, blockRoots [][32]byte, id peer.ID) error {
|
||||
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
ctx, cancel := context.WithTimeout(ctx, respTimeout)
|
||||
defer cancel()
|
||||
req := &pbp2p.BeaconBlocksByRootRequest{}
|
||||
for _, root := range blockRoots {
|
||||
@@ -111,7 +110,7 @@ func (s *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{
|
||||
log.WithError(err).Error("Failed to close stream")
|
||||
}
|
||||
}()
|
||||
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
ctx, cancel := context.WithTimeout(ctx, ttfbTimeout)
|
||||
defer cancel()
|
||||
SetRPCStreamDeadlines(stream)
|
||||
log := log.WithField("handler", "beacon_blocks_by_root")
|
||||
|
||||
@@ -2,7 +2,6 @@ package sync
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
libp2pcore "github.com/libp2p/go-libp2p-core"
|
||||
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
||||
@@ -41,7 +40,7 @@ func ReadChunkedBlock(stream libp2pcore.Stream, p2p p2p.P2P) (*eth.SignedBeaconB
|
||||
// readResponseChunk reads the response from the stream and decodes it into the
|
||||
// provided message type.
|
||||
func readResponseChunk(stream libp2pcore.Stream, p2p p2p.P2P, to interface{}) error {
|
||||
SetStreamReadDeadline(stream, 10*time.Second)
|
||||
SetStreamReadDeadline(stream, respTimeout)
|
||||
code, errMsg, err := ReadStatusCode(stream, p2p.Encoding())
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -30,7 +30,7 @@ func (s *Service) goodbyeRPCHandler(ctx context.Context, msg interface{}, stream
|
||||
log.WithError(err).Error("Failed to close stream")
|
||||
}
|
||||
}()
|
||||
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
ctx, cancel := context.WithTimeout(ctx, ttfbTimeout)
|
||||
defer cancel()
|
||||
SetRPCStreamDeadlines(stream)
|
||||
|
||||
@@ -58,7 +58,7 @@ func (s *Service) sendGoodByeAndDisconnect(ctx context.Context, code uint64, id
|
||||
}
|
||||
|
||||
func (s *Service) sendGoodByeMessage(ctx context.Context, code uint64, id peer.ID) error {
|
||||
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
ctx, cancel := context.WithTimeout(ctx, respTimeout)
|
||||
defer cancel()
|
||||
|
||||
stream, err := s.p2p.Send(ctx, &code, p2p.RPCGoodByeTopic, id)
|
||||
|
||||
@@ -2,7 +2,6 @@ package sync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
libp2pcore "github.com/libp2p/go-libp2p-core"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
@@ -18,7 +17,7 @@ func (s *Service) metaDataHandler(ctx context.Context, msg interface{}, stream l
|
||||
log.WithError(err).Error("Failed to close stream")
|
||||
}
|
||||
}()
|
||||
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
ctx, cancel := context.WithTimeout(ctx, ttfbTimeout)
|
||||
defer cancel()
|
||||
SetRPCStreamDeadlines(stream)
|
||||
|
||||
@@ -30,7 +29,7 @@ func (s *Service) metaDataHandler(ctx context.Context, msg interface{}, stream l
|
||||
}
|
||||
|
||||
func (s *Service) sendMetaDataRequest(ctx context.Context, id peer.ID) (*pb.MetaData, error) {
|
||||
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
ctx, cancel := context.WithTimeout(ctx, respTimeout)
|
||||
defer cancel()
|
||||
|
||||
stream, err := s.p2p.Send(ctx, new(interface{}), p2p.RPCMetaDataTopic, id)
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
libp2pcore "github.com/libp2p/go-libp2p-core"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
@@ -74,7 +73,7 @@ func (s *Service) pingHandler(ctx context.Context, msg interface{}, stream libp2
|
||||
}
|
||||
|
||||
func (s *Service) sendPingRequest(ctx context.Context, id peer.ID) error {
|
||||
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
ctx, cancel := context.WithTimeout(ctx, respTimeout)
|
||||
defer cancel()
|
||||
|
||||
metadataSeq := s.p2p.MetadataSeq()
|
||||
|
||||
@@ -101,7 +101,7 @@ func (s *Service) shouldReSync() bool {
|
||||
|
||||
// sendRPCStatusRequest for a given topic with an expected protobuf message type.
|
||||
func (s *Service) sendRPCStatusRequest(ctx context.Context, id peer.ID) error {
|
||||
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
ctx, cancel := context.WithTimeout(ctx, respTimeout)
|
||||
defer cancel()
|
||||
|
||||
headRoot, err := s.chain.HeadRoot(ctx)
|
||||
@@ -178,7 +178,7 @@ func (s *Service) statusRPCHandler(ctx context.Context, msg interface{}, stream
|
||||
log.WithError(err).Error("Failed to close stream")
|
||||
}
|
||||
}()
|
||||
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
ctx, cancel := context.WithTimeout(ctx, ttfbTimeout)
|
||||
defer cancel()
|
||||
SetRPCStreamDeadlines(stream)
|
||||
log := log.WithField("handler", "status")
|
||||
|
||||
Reference in New Issue
Block a user