diff --git a/bin/akula.rs b/bin/akula.rs index adef945..10068a3 100644 --- a/bin/akula.rs +++ b/bin/akula.rs @@ -7,7 +7,10 @@ use akula::{ erigon::ErigonApiServerImpl, eth::EthApiServerImpl, net::NetApiServerImpl, otterscan::OtterscanApiServerImpl, }, - stagedsync::{self}, + stagedsync::{ + self, + stages::{BODIES, HEADERS}, + }, stages::*, version_string, }; @@ -221,14 +224,20 @@ fn main() -> anyhow::Result<()> { let sentry_api_addr = opt.sentry_opts.sentry_addr; let swarm = akula::sentry::run(opt.sentry_opts).await?; + let current_stage = staged_sync.current_stage(); + tokio::spawn(async move { loop { - info!( - "P2P node peer info: {} active (+{} dialing) / {} max.", - swarm.connected_peers(), - swarm.dialing(), - max_peers - ); + if let Some(stage) = *current_stage.borrow() { + if stage == HEADERS || stage == BODIES { + info!( + "P2P node peer info: {} active (+{} dialing) / {} max.", + swarm.connected_peers(), + swarm.dialing(), + max_peers + ); + } + } sleep(Duration::from_secs(5)).await; } diff --git a/src/stagedsync/mod.rs b/src/stagedsync/mod.rs index 3c73d91..4a9426b 100644 --- a/src/stagedsync/mod.rs +++ b/src/stagedsync/mod.rs @@ -3,8 +3,9 @@ pub mod stages; pub mod util; use self::stage::{Stage, StageInput, UnwindInput}; -use crate::{kv::mdbx::*, models::*, stagedsync::stage::*}; +use crate::{kv::mdbx::*, models::*, stagedsync::stage::*, StageId}; use std::time::{Duration, Instant}; +use tokio::sync::watch::{Receiver as WatchReceiver, Sender as WatchSender}; use tracing::*; struct QueuedStage<'db, E> @@ -32,6 +33,8 @@ where E: EnvironmentKind, { stages: Vec>, + current_stage_sender: WatchSender>, + current_stage_receiver: WatchReceiver>, min_progress_to_commit_after_stage: u64, pruning_interval: u64, max_block: Option, @@ -53,8 +56,11 @@ where E: EnvironmentKind, { pub fn new() -> Self { + let (current_stage_sender, current_stage_receiver) = tokio::sync::watch::channel(None); Self { stages: Vec::new(), + current_stage_sender, + current_stage_receiver, min_progress_to_commit_after_stage: 0, pruning_interval: 0, max_block: None, @@ -98,6 +104,10 @@ where self } + pub fn current_stage(&self) -> WatchReceiver> { + self.current_stage_receiver.clone() + } + /// Run staged sync loop. /// Invokes each loaded stage, and does unwinds if necessary. /// @@ -109,6 +119,8 @@ where let mut maximum_progress = None; let mut unwind_to = None; 'run_loop: loop { + self.current_stage_sender.send(None).unwrap(); + let mut tx = db.begin_mutable()?; // Start with unwinding if it's been requested. @@ -184,6 +196,8 @@ where let stage_id = stage.id(); + self.current_stage_sender.send(Some(stage.id())).unwrap(); + let start_time = Instant::now(); let start_progress = stage_id.get_progress(&tx)?; // Re-invoke the stage until it reports `StageOutput::done`. @@ -350,6 +364,8 @@ where previous_stage = Some((stage_id, done_progress)) } + self.current_stage_sender.send(None).unwrap(); + let t = timings .into_iter() .fold(String::new(), |acc, (stage_id, time)| { diff --git a/src/stagedsync/stages.rs b/src/stagedsync/stages.rs index 1ea0f0c..d1d6bfb 100644 --- a/src/stagedsync/stages.rs +++ b/src/stagedsync/stages.rs @@ -5,7 +5,7 @@ use crate::{ use std::fmt::Display; use tracing::*; -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Copy, Debug, PartialEq)] pub struct StageId(pub &'static str); pub const HEADERS: StageId = StageId("Headers");