retrieve "ircd: create entry point socket for bots and plugins"

This commit is contained in:
ghassmo
2022-04-28 22:16:59 +03:00
parent 01724a544d
commit f15f5906b8
3 changed files with 1 additions and 85 deletions

View File

@@ -4,9 +4,6 @@
## IRC listen URL
#irc_listen="127.0.0.1:11066"
## Plugin listen URL
#plugin_listen="127.0.0.1:11077"
## Sets Datastore Path
#datastore="~/.config/ircd"

View File

@@ -17,7 +17,7 @@ use darkfi::{
Error, Result,
};
use easy_parallel::Parallel;
use futures::{io::BufReader, AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, FutureExt};
use futures::{io::BufReader, AsyncBufReadExt, AsyncReadExt, FutureExt};
use log::{debug, error, info, warn};
use simplelog::{ColorChoice, TermLogger, TerminalMode};
use smol::future;
@@ -74,43 +74,11 @@ async fn broadcast_msg(
Ok(())
}
async fn plugin_process(
recv_for_plugin: Receiver<String>,
send_from_plugin: Sender<String>,
stream: TcpStream,
) -> Result<()> {
let peer_addr = stream.peer_addr()?.clone();
let (reader, mut writer) = stream.split();
let mut reader = BufReader::new(reader);
loop {
let mut line = String::new();
futures::select! {
irc_msg = recv_for_plugin.recv().fuse() => {
let irc_msg = irc_msg?;
info!("Plugin recv {}", irc_msg);
writer.write_all(irc_msg.as_bytes()).await?;
},
err = reader.read_line(&mut line).fuse() => {
if let Err(e) = err {
warn!("Read line error: {}", e);
return Ok(())
}
let irc_msg = clean_input(line, &peer_addr)?;
info!("Plugin send {}", irc_msg);
send_from_plugin.send(irc_msg).await?;
}
};
}
}
async fn process(
raft_receiver: Receiver<Privmsg>,
stream: TcpStream,
peer_addr: SocketAddr,
raft_sender: Sender<Privmsg>,
send_for_plugin: Sender<String>,
recv_from_plugin: Receiver<String>,
seen_msg_id: SeenMsgIds,
) -> Result<()> {
let (reader, writer) = stream.split();
@@ -134,13 +102,6 @@ async fn process(
let irc_msg = build_irc_msg(&msg);
conn.reply(&irc_msg).await?;
send_for_plugin.send(irc_msg).await?;
}
irc_msg = recv_from_plugin.recv().fuse() => {
let irc_msg = irc_msg?;
info!("Receive msg from plugin");
broadcast_msg(irc_msg, peer_addr, &mut conn).await?;
}
err = reader.read_line(&mut line).fuse() => {
if let Err(e) = err {
@@ -188,9 +149,6 @@ async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
listen_and_serve(rpc_config, rpc_interface, executor_cloned.clone()).await
});
let (send_from_plugin, recv_from_plugin) = async_channel::unbounded::<String>();
let (send_for_plugin, recv_for_plugin) = async_channel::unbounded::<String>();
//
// IRC instance
//
@@ -213,47 +171,12 @@ async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
stream,
peer_addr,
raft_sender.clone(),
send_for_plugin.clone(),
recv_from_plugin.clone(),
seen_msg_id.clone(),
))
.detach();
}
});
//
// Plugin instance
//
let executor_cloned = executor.clone();
let send_from_plugin_cloned = send_from_plugin.clone();
let recv_for_plugin_cloned = recv_for_plugin.clone();
let plugin_task: smol::Task<Result<()>> = executor.spawn(async move {
if settings.plugin_listen.is_none() {
return Ok(())
}
let plugin_listener = TcpListener::bind(settings.plugin_listen.unwrap()).await?;
let plugin_local_addr = plugin_listener.local_addr()?;
info!("Plugin listening on {}", plugin_local_addr);
loop {
let (stream, peer_addr) = match plugin_listener.accept().await {
Ok((s, a)) => (s, a),
Err(e) => {
error!("Failed listening for connections: {}", e);
return Err(Error::ServiceStopped)
}
};
info!("Plugin Accepted client: {}", peer_addr);
executor_cloned
.spawn(plugin_process(
recv_for_plugin_cloned.clone(),
send_from_plugin_cloned.clone(),
stream,
))
.detach();
}
});
let (signal, shutdown) = async_channel::bounded::<()>(1);
ctrlc_async::set_async_handler(async move {
warn!(target: "ircd", "ircd start Exit Signal");
@@ -261,7 +184,6 @@ async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
signal.send(()).await.unwrap();
rpc_task.cancel().await;
irc_task.cancel().await;
plugin_task.cancel().await;
})
.unwrap();

View File

@@ -23,9 +23,6 @@ pub struct Args {
/// IRC listen URL
#[structopt(long = "irc", default_value = "127.0.0.1:11066")]
pub irc_listen: SocketAddr,
/// Plugin listen URL
#[structopt(long)]
pub plugin_listen: Option<SocketAddr>,
/// Sets Datastore Path
#[structopt(long, default_value = "~/.config/ircd")]
pub datastore: String,