mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-26 23:58:46 -05:00
feat(network): Added Option for dispatching range updates to remote peer (#16776)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
@@ -43,10 +43,16 @@ use tokio_stream::wrappers::ReceiverStream;
|
||||
use tokio_util::sync::PollSender;
|
||||
use tracing::{debug, trace};
|
||||
|
||||
/// The recommended interval at which a new range update should be sent to the remote peer.
|
||||
///
|
||||
/// This is set to 120 seconds (2 minutes) as per the Ethereum specification for eth69.
|
||||
pub(super) const RANGE_UPDATE_INTERVAL: Duration = Duration::from_secs(120);
|
||||
|
||||
// Constants for timeout updating.
|
||||
|
||||
/// Minimum timeout value
|
||||
const MINIMUM_TIMEOUT: Duration = Duration::from_secs(2);
|
||||
|
||||
/// Maximum timeout value
|
||||
const MAXIMUM_TIMEOUT: Duration = INITIAL_REQUEST_TIMEOUT;
|
||||
/// How much the new measurements affect the current timeout (X percent)
|
||||
@@ -119,6 +125,9 @@ pub(crate) struct ActiveSession<N: NetworkPrimitives> {
|
||||
/// The eth69 range info for the local node (this node).
|
||||
/// This represents the range of blocks that this node can serve to other peers.
|
||||
pub(crate) local_range_info: BlockRangeInfo,
|
||||
/// Optional interval for sending periodic range updates to the remote peer (eth69+)
|
||||
/// Recommended frequency is ~2 minutes per spec
|
||||
pub(crate) range_update_interval: Option<Interval>,
|
||||
}
|
||||
|
||||
impl<N: NetworkPrimitives> ActiveSession<N> {
|
||||
@@ -707,6 +716,15 @@ impl<N: NetworkPrimitives> Future for ActiveSession<N> {
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(interval) = &mut this.range_update_interval {
|
||||
// queue in new range updates if the interval is ready
|
||||
while interval.poll_tick(cx).is_ready() {
|
||||
this.queued_outgoing.push_back(
|
||||
EthMessage::BlockRangeUpdate(this.local_range_info.to_message()).into(),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
while this.internal_request_timeout_interval.poll_tick(cx).is_ready() {
|
||||
// check for timed out requests
|
||||
if this.check_timed_out_requests(Instant::now()) {
|
||||
@@ -1010,6 +1028,7 @@ mod tests {
|
||||
1000,
|
||||
alloy_primitives::B256::ZERO,
|
||||
),
|
||||
range_update_interval: None,
|
||||
}
|
||||
}
|
||||
ev => {
|
||||
|
||||
@@ -48,6 +48,7 @@ use tokio_stream::wrappers::ReceiverStream;
|
||||
use tokio_util::sync::PollSender;
|
||||
use tracing::{debug, instrument, trace};
|
||||
|
||||
use crate::session::active::RANGE_UPDATE_INTERVAL;
|
||||
pub use conn::EthRlpxConnection;
|
||||
pub use handle::{
|
||||
ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle,
|
||||
@@ -532,6 +533,14 @@ impl<N: NetworkPrimitives> SessionManager<N> {
|
||||
// negotiated version
|
||||
let version = conn.version();
|
||||
|
||||
// Configure the interval at which the range information is updated, starting with
|
||||
// ETH69
|
||||
let range_update_interval = (conn.version() >= EthVersion::Eth69).then(|| {
|
||||
let mut interval = tokio::time::interval(RANGE_UPDATE_INTERVAL);
|
||||
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
|
||||
interval
|
||||
});
|
||||
|
||||
let session = ActiveSession {
|
||||
next_id: 0,
|
||||
remote_peer_id: peer_id,
|
||||
@@ -556,6 +565,7 @@ impl<N: NetworkPrimitives> SessionManager<N> {
|
||||
terminate_message: None,
|
||||
range_info: None,
|
||||
local_range_info: self.local_range_info.clone(),
|
||||
range_update_interval,
|
||||
};
|
||||
|
||||
self.spawn(session);
|
||||
|
||||
Reference in New Issue
Block a user