lilith: move whitelist_refinery() into session::refine_sesssion()

This allows us to avoid making a lot of Hosts functions public that
should rather be private to the net module.
This commit is contained in:
draoi
2024-04-10 10:41:49 +02:00
parent 38dcac45b5
commit 5b02abf032
3 changed files with 60 additions and 62 deletions

View File

@@ -20,11 +20,10 @@ use std::{
collections::{HashMap, HashSet},
process::exit,
sync::Arc,
time::UNIX_EPOCH,
};
use async_trait::async_trait;
use log::{debug, error, info, warn};
use log::{error, info, warn};
use semver::Version;
use smol::{
lock::{Mutex, MutexGuard},
@@ -39,16 +38,12 @@ use url::Url;
use darkfi::{
async_daemonize, cli_desc,
net::{
self,
hosts::{HostColor, HostState},
P2p, P2pPtr,
},
net::{self, hosts::HostColor, session::whitelist_refinery, P2p, P2pPtr},
rpc::{
jsonrpc::*,
server::{listen_and_serve, RequestHandler},
},
system::{sleep, StoppableTask, StoppableTaskPtr},
system::{StoppableTask, StoppableTaskPtr},
util::path::get_config_path,
Error, Result,
};
@@ -56,10 +51,6 @@ use darkfi::{
const CONFIG_FILE: &str = "lilith_config.toml";
const CONFIG_FILE_CONTENTS: &str = include_str!("../lilith_config.toml");
/// Interval after which the refinery happens (in seconds)
// TODO: Make this configurable
const REFINERY_INTERVAL: u64 = 60;
#[derive(Clone, Debug, serde::Deserialize, StructOpt, StructOptToml)]
#[serde(default)]
#[structopt(name = "lilith", about = cli_desc!())]
@@ -165,53 +156,6 @@ struct Lilith {
}
impl Lilith {
/// Periodically ping nodes on the whitelist. If they are still reachable, update their last
/// seen field. Otherwise, downgrade them to the greylist.
async fn whitelist_refinery(name: String, p2p: P2pPtr) -> Result<()> {
info!(target: "lilith", "Starting whitelist refinery for \"{}\"", name);
let hosts = p2p.hosts();
loop {
sleep(REFINERY_INTERVAL).await;
if hosts.container.is_empty(HostColor::White).await {
warn!(target: "lilith", "Whitelist is empty! Cannot start refinery process");
continue
}
let (entry, position) = hosts.container.fetch_last(HostColor::White).await;
let url = &entry.0;
let last_seen = &entry.1;
if let Err(e) = hosts.try_register(url.clone(), HostState::Refine).await {
debug!(target: "lilith", "Unable to refine addr={}, err={}",
url.clone(), e);
continue
}
if !p2p.session_refine().handshake_node(url.clone(), p2p.clone()).await {
debug!(target: "lilith", "Host {} is not responsive. Downgrading from whitelist", url);
hosts.move_host(url, *last_seen, HostColor::Grey).await?;
hosts.unregister(url).await;
continue
}
debug!(target: "lilith","Peer {} is responsive. Updating last_seen", url);
// This node is active. Update the last seen field.
let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
hosts
.container
.update_last_seen(HostColor::White as usize, url, last_seen, Some(position))
.await;
}
}
// RPCAPI:
// Returns all spawned networks names with their node addresses.
// --> {"jsonrpc": "2.0", "method": "spawns", "params": [], "id": 42}
@@ -393,7 +337,7 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
let name = network.name.clone();
let task = StoppableTask::new();
task.clone().start(
Lilith::whitelist_refinery(name.clone(), network.p2p.clone()),
whitelist_refinery(name.clone(), network.p2p.clone()),
|res| async move {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }

View File

@@ -37,7 +37,7 @@ pub use outbound_session::{OutboundSession, OutboundSessionPtr};
pub mod seedsync_session;
pub use seedsync_session::{SeedSyncSession, SeedSyncSessionPtr};
pub mod refine_session;
pub use refine_session::{RefineSession, RefineSessionPtr};
pub use refine_session::{whitelist_refinery, RefineSession, RefineSessionPtr};
/// Bitwise selectors for the `protocol_registry`
pub type SessionBitFlag = u32;

View File

@@ -48,9 +48,11 @@ use crate::{
session::{Session, SessionBitFlag, SESSION_REFINE},
},
system::{sleep, timeout::timeout, LazyWeak, StoppableTask, StoppableTaskPtr},
Error,
Error, Result,
};
// TODO: Make this configurable
const WHITELIST_REFINERY_INTERVAL: u64 = 60;
pub type RefineSessionPtr = Arc<RefineSession>;
pub struct RefineSession {
@@ -331,6 +333,58 @@ impl GreylistRefinery {
}
}
/// Periodically ping nodes on the whitelist. If they are still reachable, update their last
/// seen field. Otherwise, downgrade them to the greylist.
pub async fn whitelist_refinery(network_name: String, p2p: P2pPtr) -> Result<()> {
debug!(target: "net::refinery::whitelist_refinery", "Starting whitelist refinery for \"{}\"",
network_name);
let hosts = p2p.hosts();
loop {
sleep(WHITELIST_REFINERY_INTERVAL).await;
if hosts.container.is_empty(HostColor::White).await {
warn!(target: "net::refinery::whitelist_refinery",
"Whitelist is empty! Cannot start refinery process");
continue
}
let (entry, position) = hosts.container.fetch_last(HostColor::White).await;
let url = &entry.0;
let last_seen = &entry.1;
if let Err(e) = hosts.try_register(url.clone(), HostState::Refine).await {
debug!(target: "net::refinery::whitelist_refinery", "Unable to refine addr={}, err={}",
url.clone(), e);
continue
}
if p2p.session_refine().handshake_node(url.clone(), p2p.clone()).await {
debug!(target: "net::refinery:::whitelist_refinery",
"Host {} is not responsive. Downgrading from whitelist", url);
hosts.move_host(url, *last_seen, HostColor::Grey).await?;
hosts.unregister(url).await;
continue
}
debug!(target: "net::refinery::whitelist_refinery",
"Peer {} is responsive. Updating last_seen", url);
// This node is active. Update the last seen field.
let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
hosts
.container
.update_last_seen(HostColor::White as usize, url, last_seen, Some(position))
.await;
}
}
/// Periodically try to do a version exchange with our own external
/// addresses. If the version exchange is successful, take a timestamp and
/// save it along with the external addresses. Each address along with its