mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-29 00:58:11 -05:00
perf: handle sync and engine messages in same loop (#3276)
This commit is contained in:
@@ -1301,20 +1301,9 @@ where
|
||||
// Process all incoming messages from the CL, these can affect the state of the
|
||||
// SyncController, hence they are polled first, and they're also time sensitive.
|
||||
loop {
|
||||
// If a new pipeline run is pending we poll the sync controller first so that it takes
|
||||
// precedence over any FCU messages. This ensures that a queued pipeline run via
|
||||
// [EngineSyncController::set_pipeline_sync_target] are processed before any forkchoice
|
||||
// updates.
|
||||
if this.sync.is_pipeline_sync_pending() {
|
||||
// the next event is guaranteed to be a [EngineSyncEvent::PipelineStarted]
|
||||
if let Poll::Ready(sync_event) = this.sync.poll(cx) {
|
||||
if let Some(res) = this.on_sync_event(sync_event) {
|
||||
return Poll::Ready(res)
|
||||
}
|
||||
}
|
||||
}
|
||||
let mut engine_messages_pending = false;
|
||||
|
||||
// handle next engine message, else exit the loop
|
||||
// handle next engine message
|
||||
match this.engine_message_rx.poll_next_unpin(cx) {
|
||||
Poll::Ready(Some(msg)) => match msg {
|
||||
BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
|
||||
@@ -1339,19 +1328,25 @@ where
|
||||
}
|
||||
Poll::Pending => {
|
||||
// no more CL messages to process
|
||||
break
|
||||
engine_messages_pending = true;
|
||||
}
|
||||
}
|
||||
|
||||
// process sync events if any
|
||||
match this.sync.poll(cx) {
|
||||
Poll::Ready(sync_event) => {
|
||||
if let Some(res) = this.on_sync_event(sync_event) {
|
||||
return Poll::Ready(res)
|
||||
}
|
||||
}
|
||||
Poll::Pending => {
|
||||
if engine_messages_pending {
|
||||
// both the sync and the engine message receiver are pending
|
||||
return Poll::Pending
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// drain the sync controller
|
||||
while let Poll::Ready(sync_event) = this.sync.poll(cx) {
|
||||
if let Some(res) = this.on_sync_event(sync_event) {
|
||||
return Poll::Ready(res)
|
||||
}
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -107,6 +107,7 @@ where
|
||||
}
|
||||
|
||||
/// Returns `true` if a pipeline target is queued and will be triggered on the next `poll`.
|
||||
#[allow(unused)]
|
||||
pub(crate) fn is_pipeline_sync_pending(&self) -> bool {
|
||||
self.pending_pipeline_target.is_some() && self.pipeline_state.is_idle()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user