mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-02-03 03:25:01 -05:00
fix: use recv timeout if persistence task is active (#10087)
This commit is contained in:
@@ -44,7 +44,10 @@ use reth_trie::HashedPostState;
|
||||
use std::{
|
||||
collections::{BTreeMap, HashMap, HashSet},
|
||||
ops::Bound,
|
||||
sync::{mpsc::Receiver, Arc},
|
||||
sync::{
|
||||
mpsc::{Receiver, RecvError, RecvTimeoutError},
|
||||
Arc,
|
||||
},
|
||||
time::Instant,
|
||||
};
|
||||
use tokio::sync::{
|
||||
@@ -539,15 +542,54 @@ where
|
||||
///
|
||||
/// This will block the current thread and process incoming messages.
|
||||
pub fn run(mut self) {
|
||||
while let Ok(msg) = self.incoming.recv() {
|
||||
self.run_once(msg);
|
||||
loop {
|
||||
match self.try_recv_engine_message() {
|
||||
Ok(Some(msg)) => {
|
||||
self.on_engine_message(msg);
|
||||
}
|
||||
Ok(None) => {
|
||||
debug!(target: "engine", "received no engine message for some time, while waiting for persistence task to complete");
|
||||
}
|
||||
Err(err) => {
|
||||
error!(target: "engine", "Engine channel disconnected");
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
self.advance_persistence();
|
||||
}
|
||||
}
|
||||
|
||||
/// Run the engine API handler once.
|
||||
fn run_once(&mut self, msg: FromEngine<BeaconEngineMessage<T>>) {
|
||||
self.on_engine_message(msg);
|
||||
/// Attempts to receive the next engine request.
|
||||
///
|
||||
/// If there's currently no persistence action in progress, this will block until a new request
|
||||
/// is received. If there's a persistence action in progress, this will try to receive the
|
||||
/// next request with a timeout to not block indefinitely and return `Ok(None)` if no request is
|
||||
/// received in time.
|
||||
///
|
||||
/// Returns an error if the engine channel is disconnected.
|
||||
fn try_recv_engine_message(
|
||||
&self,
|
||||
) -> Result<Option<FromEngine<BeaconEngineMessage<T>>>, RecvError> {
|
||||
if self.persistence_state.in_progress() {
|
||||
// try to receive the next request with a timeout to not block indefinitely
|
||||
match self.incoming.recv_timeout(std::time::Duration::from_millis(500)) {
|
||||
Ok(msg) => Ok(Some(msg)),
|
||||
Err(err) => match err {
|
||||
RecvTimeoutError::Timeout => Ok(None),
|
||||
RecvTimeoutError::Disconnected => Err(RecvError),
|
||||
},
|
||||
}
|
||||
} else {
|
||||
self.incoming.recv().map(Some)
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempts to advance the persistence state.
|
||||
///
|
||||
/// If we're currently awaiting a response this will try to receive the response (non-blocking)
|
||||
/// or send a new persistence action if necessary.
|
||||
fn advance_persistence(&mut self) {
|
||||
if self.should_persist() && !self.persistence_state.in_progress() {
|
||||
let blocks_to_persist = self.get_canonical_blocks_to_persist();
|
||||
if !blocks_to_persist.is_empty() {
|
||||
|
||||
Reference in New Issue
Block a user