Mirror of https://github.com/paradigmxyz/reth.git
fix: small networking fixes (#16742)
@@ -83,12 +83,16 @@ impl Peer {
     }
 
     /// Applies a reputation change to the peer and returns what action should be taken.
-    pub fn apply_reputation(&mut self, reputation: i32) -> ReputationChangeOutcome {
+    pub fn apply_reputation(
+        &mut self,
+        reputation: i32,
+        kind: ReputationChangeKind,
+    ) -> ReputationChangeOutcome {
         let previous = self.reputation;
         // we add reputation since negative reputation change decrease total reputation
         self.reputation = previous.saturating_add(reputation);
 
-        trace!(target: "net::peers", reputation=%self.reputation, banned=%self.is_banned(), "applied reputation change");
+        trace!(target: "net::peers", reputation=%self.reputation, banned=%self.is_banned(), ?kind, "applied reputation change");
 
         if self.state.is_connected() && self.is_banned() {
             self.state.disconnect();

@@ -480,7 +480,7 @@ impl PeersManager {
                         reputation_change = MAX_TRUSTED_PEER_REPUTATION_CHANGE;
                     }
                 }
-                peer.apply_reputation(reputation_change)
+                peer.apply_reputation(reputation_change, rep)
             }
         } else {
             return
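
The two hunks above thread a ReputationChangeKind through Peer::apply_reputation so the trace line records why the reputation changed; the reputation arithmetic itself is untouched. A rough, self-contained sketch of that pattern, with simplified stand-in types rather than reth's actual definitions:

// Simplified stand-ins for illustration only; not reth's actual type definitions.
#[derive(Debug, Clone, Copy)]
enum ReputationChangeKind {
    BadMessage,
    Timeout,
}

#[derive(Debug, PartialEq)]
enum ReputationChangeOutcome {
    None,
    DisconnectAndBan,
}

struct Peer {
    reputation: i32,
    ban_threshold: i32,
}

impl Peer {
    fn is_banned(&self) -> bool {
        self.reputation < self.ban_threshold
    }

    // Mirrors the shape of the new signature; `kind` is only used for observability here.
    fn apply_reputation(
        &mut self,
        reputation: i32,
        kind: ReputationChangeKind,
    ) -> ReputationChangeOutcome {
        // negative changes lower the total; saturating_add avoids overflow
        self.reputation = self.reputation.saturating_add(reputation);
        println!(
            "applied reputation change: reputation={} banned={} kind={:?}",
            self.reputation,
            self.is_banned(),
            kind
        );
        if self.is_banned() {
            ReputationChangeOutcome::DisconnectAndBan
        } else {
            ReputationChangeOutcome::None
        }
    }
}

fn main() {
    let mut peer = Peer { reputation: 0, ban_threshold: -100 };
    assert_eq!(
        peer.apply_reputation(-150, ReputationChangeKind::BadMessage),
        ReputationChangeOutcome::DisconnectAndBan
    );

    let mut healthy = Peer { reputation: 0, ban_threshold: -100 };
    assert_eq!(
        healthy.apply_reputation(-10, ReputationChangeKind::Timeout),
        ReputationChangeOutcome::None
    );
}

Since the kind is only used for logging in this diff, the outcome of the call stays identical for any kind passed in.
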
@@ -174,6 +174,7 @@ impl<N: NetworkPrimitives> ActiveSession<N> {
                 if let Some(req) = self.inflight_requests.remove(&request_id) {
                     match req.request {
                         RequestState::Waiting(PeerRequest::$item { response, .. }) => {
+                            trace!(peer_id=?self.remote_peer_id, ?request_id, "received response from peer");
                             let _ = response.send(Ok(message));
                             self.update_request_timeout(req.timestamp, Instant::now());
                         }

@@ -186,6 +187,7 @@ impl<N: NetworkPrimitives> ActiveSession<N> {
                         }
                     }
                 } else {
+                    trace!(peer_id=?self.remote_peer_id, ?request_id, "received response to unknown request");
                     // we received a response to a request we never sent
                     self.on_bad_message();
                 }

@@ -277,6 +279,8 @@ impl<N: NetworkPrimitives> ActiveSession<N> {
     /// Handle an internal peer request that will be sent to the remote.
     fn on_internal_peer_request(&mut self, request: PeerRequest<N>, deadline: Instant) {
         let request_id = self.next_id();
+
+        trace!(?request, peer_id=?self.remote_peer_id, ?request_id, "sending request to peer");
         let msg = request.create_request_message(request_id);
         self.queued_outgoing.push_back(msg.into());
         let req = InflightRequest {
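
The three ActiveSession hunks above only add trace statements, but they sit on the path that correlates responses with inflight requests by request id: a logged send, a logged match on a known id, and a logged bad message when the id is unknown. A minimal sketch of that bookkeeping, assuming simplified types (the real session stores PeerRequest values and oneshot response channels, not strings):

use std::collections::HashMap;

// Hypothetical, condensed stand-in for the session bookkeeping the hunks touch.
struct Session {
    next_id: u64,
    inflight_requests: HashMap<u64, &'static str>,
    bad_messages: u32,
}

impl Session {
    fn send_request(&mut self, request: &'static str) -> u64 {
        let request_id = self.next_id;
        self.next_id += 1;
        // corresponds to the new "sending request to peer" trace
        println!("sending request to peer: request_id={request_id} request={request}");
        self.inflight_requests.insert(request_id, request);
        request_id
    }

    fn on_response(&mut self, request_id: u64) {
        if let Some(request) = self.inflight_requests.remove(&request_id) {
            // corresponds to the new "received response from peer" trace
            println!("received response from peer: request_id={request_id} request={request}");
        } else {
            // corresponds to the new "received response to unknown request" trace;
            // a response we never asked for counts as a bad message
            println!("received response to unknown request: request_id={request_id}");
            self.bad_messages += 1;
        }
    }
}

fn main() {
    let mut session = Session { next_id: 0, inflight_requests: HashMap::new(), bad_messages: 0 };
    let id = session.send_request("GetBlockHeaders");
    session.on_response(id); // matched against the inflight entry
    session.on_response(42); // unknown request id
    assert_eq!(session.bad_messages, 1);
}

Logging the request_id on both the send and the response side is what lets the two trace lines be joined up when reading the output.
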
@@ -138,11 +138,15 @@ where
             .await?;
 
         // create pipeline
-        let network_client = ctx.components().network().fetch_client().await?;
+        let network_handle = ctx.components().network().clone();
+        let network_client = network_handle.fetch_client().await?;
         let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel();
 
         let node_config = ctx.node_config();
 
+        // We always assume that node is syncing after a restart
+        network_handle.update_sync_state(SyncState::Syncing);
+
         let max_block = ctx.max_block(network_client.clone()).await?;
 
         let static_file_producer = ctx.static_file_producer();

@@ -289,7 +293,6 @@ where
 
         // Run consensus engine to completion
        let initial_target = ctx.initial_backfill_target()?;
-        let network_handle = ctx.components().network().clone();
        let mut built_payloads = ctx
            .components()
            .payload_builder_handle()

@@ -329,8 +332,6 @@ where
                         debug!(target: "reth::cli", "Terminating after initial backfill");
                         break
                     }
-
-                    network_handle.update_sync_state(SyncState::Idle);
                 }
                 ChainEvent::BackfillSyncStarted => {
                     network_handle.update_sync_state(SyncState::Syncing);

@@ -342,6 +343,8 @@ where
                 }
                 ChainEvent::Handler(ev) => {
                     if let Some(head) = ev.canonical_header() {
+                        // Once we're progressing via live sync, we can consider the node is not syncing anymore
+                        network_handle.update_sync_state(SyncState::Idle);
                         let head_block = Head {
                             number: head.number(),
                             hash: head.hash(),
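
The node-builder hunks above rearrange the sync-state signalling: the network handle is cloned once up front, SyncState::Syncing is reported immediately after a restart, and SyncState::Idle is reported only once live sync delivers a canonical header, instead of at the end of backfill. A condensed, hypothetical sketch of that lifecycle (the event variants are stand-ins for reth's actual ChainEvent types):

// Heavily condensed, hypothetical view of the sync-state handling these hunks rearrange;
// the real code drives a NetworkHandle from the consensus engine's event stream.
#[derive(Debug, PartialEq)]
enum SyncState {
    Syncing,
    Idle,
}

enum ChainEvent {
    BackfillSyncStarted,
    CanonicalHeader, // stands in for ChainEvent::Handler(ev) carrying a canonical header
}

struct NetworkHandle {
    sync_state: SyncState,
}

impl NetworkHandle {
    fn update_sync_state(&mut self, state: SyncState) {
        self.sync_state = state;
    }
}

fn main() {
    // We always assume the node is syncing right after a restart.
    let mut network_handle = NetworkHandle { sync_state: SyncState::Idle };
    network_handle.update_sync_state(SyncState::Syncing);

    for event in [ChainEvent::BackfillSyncStarted, ChainEvent::CanonicalHeader] {
        match event {
            // backfill keeps the node in the syncing state
            ChainEvent::BackfillSyncStarted => network_handle.update_sync_state(SyncState::Syncing),
            // only once live sync delivers canonical headers is the node considered idle
            ChainEvent::CanonicalHeader => network_handle.update_sync_state(SyncState::Idle),
        }
    }
    assert_eq!(network_handle.sync_state, SyncState::Idle);
}

Cloning the handle early also removes the need for the second clone that used to sit next to the engine loop, which is what the hunk at -289 deletes.
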