wallet: add event_graph and get darkirc messages

This commit is contained in:
darkfi
2024-08-05 08:21:41 +02:00
parent 9cc8fa56b5
commit 8dd09484c5
2 changed files with 151 additions and 6 deletions

View File

@@ -37,7 +37,9 @@ futures = "0.3.30"
async-recursion = "1.1.1"
colored = "2.1.0"
#rustpython-vm = "0.3.1"
sled = "0.34.7"
sled = "0.34"
url = "2.5"
semver = "1.0"
[patch.crates-io]
freetype-rs = { git = "https://github.com/narodnik/freetype-rs" }

View File

@@ -27,6 +27,22 @@
use async_lock::Mutex;
use std::sync::{mpsc, Arc};
use darkfi::{
async_daemonize, cli_desc,
event_graph::{self, proto::ProtocolEventGraph, EventGraph, EventGraphPtr},
net::{session::SESSION_DEFAULT, settings::Settings as NetSettings, P2p, P2pPtr},
rpc::{
jsonrpc::JsonSubscriber,
server::{listen_and_serve, RequestHandler},
},
system::{sleep, sleep_forever, StoppableTask, StoppableTaskPtr, Subscription},
util::path::{expand_path, get_config_path},
Error, Result,
};
use darkfi_serial::{
async_trait, deserialize_async, AsyncDecodable, SerialDecodable, SerialEncodable,
};
#[macro_use]
extern crate log;
#[allow(unused_imports)]
@@ -65,13 +81,140 @@ fn panic_hook(panic_info: &std::panic::PanicInfo) {
std::process::exit(1);
}
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct Privmsg {
pub channel: String,
pub nick: String,
pub msg: String,
}
async fn print_evs(ev_sub: Subscription<event_graph::Event>) {
loop {
let ev = ev_sub.receive().await;
// Try to deserialize the `Event`'s content into a `Privmsg`
let mut privmsg: Privmsg = match deserialize_async(ev.content()).await {
Ok(v) => v,
Err(e) => {
error!("[IRC CLIENT] Failed deserializing incoming Privmsg event: {}", e);
continue
}
};
info!("ev_id={:?}", ev.id());
info!("ev: {:?}", ev);
info!("privmsg: {:?}", privmsg);
info!("");
}
}
async fn realmain(ex: ExecutorPtr) -> darkfi::Result<()> {
let sled_db = sled::open("evgrdb")?;
let mut p2p_settings: NetSettings = Default::default();
p2p_settings.app_version = semver::Version::parse("0.5.0").unwrap();
p2p_settings.seeds.push(url::Url::parse("tcp+tls://lilith1.dark.fi:5262").unwrap());
let p2p = P2p::new(p2p_settings, ex.clone()).await?;
let event_graph = EventGraph::new(
p2p.clone(),
sled_db.clone(),
std::path::PathBuf::new(),
false,
"darkirc_dag",
1,
ex.clone(),
)
.await?;
let prune_task = event_graph.prune_task.get().unwrap();
info!("Registering EventGraph P2P protocol");
let event_graph_ = Arc::clone(&event_graph);
let registry = p2p.protocol_registry();
registry
.register(SESSION_DEFAULT, move |channel, _| {
let event_graph_ = event_graph_.clone();
async move { ProtocolEventGraph::init(event_graph_, channel).await.unwrap() }
})
.await;
let ev_sub = event_graph.event_pub.clone().subscribe().await;
let ev_task = ex.spawn(print_evs(ev_sub));
info!("Starting P2P network");
p2p.clone().start().await?;
info!("Waiting for some P2P connections...");
sleep(5).await;
// We'll attempt to sync {sync_attempts} times
let sync_attempts = 4;
for i in 1..=sync_attempts {
info!("Syncing event DAG (attempt #{})", i);
match event_graph.dag_sync().await {
Ok(()) => break,
Err(e) => {
if i == sync_attempts {
error!("Failed syncing DAG. Exiting.");
p2p.stop().await;
return Err(Error::DagSyncFailed)
} else {
// TODO: Maybe at this point we should prune or something?
// TODO: Or maybe just tell the user to delete the DAG from FS.
error!("Failed syncing DAG ({}), retrying in {}s...", e, 4);
sleep(4).await;
}
}
}
}
sleep_forever().await;
// Signal handling for graceful termination.
//let (signals_handler, signals_task) = SignalHandler::new(ex)?;
//signals_handler.wait_termination(signals_task).await?;
info!("Caught termination signal, cleaning up and exiting...");
info!("Stopping P2P network");
p2p.stop().await;
info!("Stopping IRC server");
prune_task.stop().await;
info!("Flushing sled database...");
let flushed_bytes = sled_db.flush_async().await?;
info!("Flushed {} bytes", flushed_bytes);
info!("Shut down successfully");
Ok(())
}
fn newmain() {
simplelog::TermLogger::init(
simplelog::LevelFilter::Info,
simplelog::Config::default(),
simplelog::TerminalMode::Stdout,
simplelog::ColorChoice::Auto,
)
.unwrap();
let ex = Arc::new(smol::Executor::new());
smol::block_on(async {
let mut p2p_settings: darkfi::net::Settings = Default::default();
//p2p_settings.app_version = semver::Version::parse(env!("CARGO_PKG_VERSION")).unwrap();
let p2p = darkfi::net::P2p::new(p2p_settings, ex.clone()).await.unwrap();
});
let n_threads = std::thread::available_parallelism().unwrap().get();
let ex = std::sync::Arc::new(smol::Executor::new());
let (signal, shutdown) = smol::channel::unbounded::<()>();
let (_, result) = easy_parallel::Parallel::new()
// Run four executor threads
.each(0..n_threads, |_| smol::future::block_on(ex.run(shutdown.recv())))
// Run the main future on the current thread.
.finish(|| {
smol::future::block_on(async {
realmain(ex.clone()).await?;
drop(signal);
Ok::<(), darkfi::Error>(())
})
});
}
fn main() {