mirror of
https://github.com/akula-bft/akula.git
synced 2026-04-19 03:00:13 -04:00
Print peer info only during net stages
This commit is contained in:
23
bin/akula.rs
23
bin/akula.rs
@@ -7,7 +7,10 @@ use akula::{
|
||||
erigon::ErigonApiServerImpl, eth::EthApiServerImpl, net::NetApiServerImpl,
|
||||
otterscan::OtterscanApiServerImpl,
|
||||
},
|
||||
stagedsync::{self},
|
||||
stagedsync::{
|
||||
self,
|
||||
stages::{BODIES, HEADERS},
|
||||
},
|
||||
stages::*,
|
||||
version_string,
|
||||
};
|
||||
@@ -221,14 +224,20 @@ fn main() -> anyhow::Result<()> {
|
||||
let sentry_api_addr = opt.sentry_opts.sentry_addr;
|
||||
let swarm = akula::sentry::run(opt.sentry_opts).await?;
|
||||
|
||||
let current_stage = staged_sync.current_stage();
|
||||
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
info!(
|
||||
"P2P node peer info: {} active (+{} dialing) / {} max.",
|
||||
swarm.connected_peers(),
|
||||
swarm.dialing(),
|
||||
max_peers
|
||||
);
|
||||
if let Some(stage) = *current_stage.borrow() {
|
||||
if stage == HEADERS || stage == BODIES {
|
||||
info!(
|
||||
"P2P node peer info: {} active (+{} dialing) / {} max.",
|
||||
swarm.connected_peers(),
|
||||
swarm.dialing(),
|
||||
max_peers
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
sleep(Duration::from_secs(5)).await;
|
||||
}
|
||||
|
||||
@@ -3,8 +3,9 @@ pub mod stages;
|
||||
pub mod util;
|
||||
|
||||
use self::stage::{Stage, StageInput, UnwindInput};
|
||||
use crate::{kv::mdbx::*, models::*, stagedsync::stage::*};
|
||||
use crate::{kv::mdbx::*, models::*, stagedsync::stage::*, StageId};
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::sync::watch::{Receiver as WatchReceiver, Sender as WatchSender};
|
||||
use tracing::*;
|
||||
|
||||
struct QueuedStage<'db, E>
|
||||
@@ -32,6 +33,8 @@ where
|
||||
E: EnvironmentKind,
|
||||
{
|
||||
stages: Vec<QueuedStage<'db, E>>,
|
||||
current_stage_sender: WatchSender<Option<StageId>>,
|
||||
current_stage_receiver: WatchReceiver<Option<StageId>>,
|
||||
min_progress_to_commit_after_stage: u64,
|
||||
pruning_interval: u64,
|
||||
max_block: Option<BlockNumber>,
|
||||
@@ -53,8 +56,11 @@ where
|
||||
E: EnvironmentKind,
|
||||
{
|
||||
pub fn new() -> Self {
|
||||
let (current_stage_sender, current_stage_receiver) = tokio::sync::watch::channel(None);
|
||||
Self {
|
||||
stages: Vec::new(),
|
||||
current_stage_sender,
|
||||
current_stage_receiver,
|
||||
min_progress_to_commit_after_stage: 0,
|
||||
pruning_interval: 0,
|
||||
max_block: None,
|
||||
@@ -98,6 +104,10 @@ where
|
||||
self
|
||||
}
|
||||
|
||||
pub fn current_stage(&self) -> WatchReceiver<Option<StageId>> {
|
||||
self.current_stage_receiver.clone()
|
||||
}
|
||||
|
||||
/// Run staged sync loop.
|
||||
/// Invokes each loaded stage, and does unwinds if necessary.
|
||||
///
|
||||
@@ -109,6 +119,8 @@ where
|
||||
let mut maximum_progress = None;
|
||||
let mut unwind_to = None;
|
||||
'run_loop: loop {
|
||||
self.current_stage_sender.send(None).unwrap();
|
||||
|
||||
let mut tx = db.begin_mutable()?;
|
||||
|
||||
// Start with unwinding if it's been requested.
|
||||
@@ -184,6 +196,8 @@ where
|
||||
|
||||
let stage_id = stage.id();
|
||||
|
||||
self.current_stage_sender.send(Some(stage.id())).unwrap();
|
||||
|
||||
let start_time = Instant::now();
|
||||
let start_progress = stage_id.get_progress(&tx)?;
|
||||
// Re-invoke the stage until it reports `StageOutput::done`.
|
||||
@@ -350,6 +364,8 @@ where
|
||||
previous_stage = Some((stage_id, done_progress))
|
||||
}
|
||||
|
||||
self.current_stage_sender.send(None).unwrap();
|
||||
|
||||
let t = timings
|
||||
.into_iter()
|
||||
.fold(String::new(), |acc, (stage_id, time)| {
|
||||
|
||||
@@ -5,7 +5,7 @@ use crate::{
|
||||
use std::fmt::Display;
|
||||
use tracing::*;
|
||||
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
#[derive(Clone, Copy, Debug, PartialEq)]
|
||||
pub struct StageId(pub &'static str);
|
||||
|
||||
pub const HEADERS: StageId = StageId("Headers");
|
||||
|
||||
Reference in New Issue
Block a user