system: rename Subscriber to Publisher + propagate change

We now subscribe to a Publisher which is clearer conceptually than
subscribing to a Subscriber.
This commit is contained in:
draoi
2024-05-25 18:31:43 +02:00
parent 302d482475
commit b0b192c3f2
25 changed files with 115 additions and 111 deletions

View File

@@ -107,7 +107,7 @@ pub async fn replicator_task(node: Arc<Darkfid>, ex: Arc<smol::Executor<'static>
// Grab proposals subscriber and subscribe to it
let proposals_sub = node.subscribers.get("proposals").unwrap();
let subscription = proposals_sub.sub.clone().subscribe().await;
let subscription = proposals_sub.publisher.clone().subscribe().await;
// Create the garbage collection task using a dummy task
let gc_task = StoppableTask::new();

View File

@@ -93,7 +93,7 @@ pub async fn miner_task(
// Grab proposals subscriber and subscribe to it
let proposals_sub = node.subscribers.get("proposals").unwrap();
let subscription = proposals_sub.sub.clone().subscribe().await;
let subscription = proposals_sub.publisher.clone().subscribe().await;
// Listen for blocks until next finalization, for optimal conditions
if !skip_sync {

View File

@@ -202,7 +202,7 @@ impl IrcServer {
};
// Subscribe to incoming events and set up the connection.
let incoming = self.darkirc.event_graph.event_sub.clone().subscribe().await;
let incoming = self.darkirc.event_graph.event_pub.clone().subscribe().await;
if let Err(e) = self
.clone()
.process_connection(stream, peer_addr, incoming, ex.clone())
@@ -216,7 +216,7 @@ impl IrcServer {
// Expecting plain TCP connection
None => {
// Subscribe to incoming events and set up the connection.
let incoming = self.darkirc.event_graph.event_sub.clone().subscribe().await;
let incoming = self.darkirc.event_graph.event_pub.clone().subscribe().await;
if let Err(e) = self
.clone()
.process_connection(stream, peer_addr, incoming, ex.clone())

View File

@@ -27,7 +27,7 @@ use darkfi::{
jsonrpc::{JsonRequest, JsonResult},
util::JsonValue,
},
system::{StoppableTask, Subscriber},
system::{Publisher, StoppableTask},
tx::Transaction,
util::encoding::base64,
Error, Result,
@@ -76,8 +76,8 @@ impl Drk {
}
println!("Subscribing to receive notifications of incoming blocks");
let subscriber = Subscriber::new();
let subscription = subscriber.clone().subscribe().await;
let publisher = Publisher::new();
let subscription = publisher.clone().subscribe().await;
let _ex = ex.clone();
StoppableTask::new().start(
// Weird hack to prevent lifetimes hell
@@ -85,7 +85,7 @@ impl Drk {
let ex = _ex.clone();
let rpc_client = RpcClient::new(endpoint, ex).await?;
let req = JsonRequest::new("blockchain.subscribe_blocks", JsonValue::Array(vec![]));
rpc_client.subscribe(req, subscriber).await
rpc_client.subscribe(req, publisher).await
},
|res| async move {
match res {

View File

@@ -85,7 +85,7 @@ async fn start_sync_loop(
last_sent: RwLock<blake3::Hash>,
seen: OnceLock<sled::Tree>,
) -> Result<()> {
let incoming = event_graph.event_sub.clone().subscribe().await;
let incoming = event_graph.event_pub.clone().subscribe().await;
let seen_events = seen.get().unwrap();
loop {
let event = incoming.receive().await;

View File

@@ -155,7 +155,7 @@ async fn start_sync_loop(
last_sent: RwLock<blake3::Hash>,
seen: OnceLock<sled::Tree>,
) -> TaudResult<()> {
let incoming = event_graph.event_sub.clone().subscribe().await;
let incoming = event_graph.event_pub.clone().subscribe().await;
let seen_events = seen.get().unwrap();
loop {

View File

@@ -43,7 +43,7 @@ use crate::{
util::json_map,
},
system::{
sleep, timeout::timeout, StoppableTask, StoppableTaskPtr, Subscriber, SubscriberPtr,
sleep, timeout::timeout, Publisher, PublisherPtr, StoppableTask, StoppableTaskPtr,
Subscription,
},
Error, Result,
@@ -104,9 +104,9 @@ pub struct EventGraph {
broadcasted_ids: RwLock<HashSet<blake3::Hash>>,
/// DAG Pruning Task
prune_task: OnceCell<StoppableTaskPtr>,
/// Event subscriber, this notifies whenever an event is
/// Event publisher, this notifies whenever an event is
/// inserted into the DAG
pub event_sub: SubscriberPtr<Event>,
pub event_pub: PublisherPtr<Event>,
/// Current genesis event
current_genesis: RwLock<Event>,
/// Currently configured DAG rotation, in days
@@ -115,8 +115,8 @@ pub struct EventGraph {
pub synced: RwLock<bool>,
/// Enable graph debugging
pub deg_enabled: RwLock<bool>,
/// The subscriber for which we can give deg info over
deg_subscriber: SubscriberPtr<DegEvent>,
/// The publisher for which we can give deg info over
deg_publisher: PublisherPtr<DegEvent>,
}
impl EventGraph {
@@ -134,7 +134,7 @@ impl EventGraph {
let dag = sled_db.open_tree(dag_tree_name)?;
let unreferenced_tips = RwLock::new(BTreeMap::new());
let broadcasted_ids = RwLock::new(HashSet::new());
let event_sub = Subscriber::new();
let event_pub = Publisher::new();
// Create the current genesis event based on the `days_rotation`
let current_genesis = generate_genesis(days_rotation);
@@ -146,12 +146,12 @@ impl EventGraph {
unreferenced_tips,
broadcasted_ids,
prune_task: OnceCell::new(),
event_sub,
event_pub,
current_genesis: RwLock::new(current_genesis.clone()),
days_rotation,
synced: RwLock::new(false),
deg_enabled: RwLock::new(false),
deg_subscriber: Subscriber::new(),
deg_publisher: Publisher::new(),
});
// Check if we have it in our DAG.
@@ -640,7 +640,7 @@ impl EventGraph {
}
// Send out notifications about the new event
self.event_sub.notify(event.clone()).await;
self.event_pub.notify(event.clone()).await;
}
// Drop the exclusive locks
@@ -832,12 +832,12 @@ impl EventGraph {
/// Subscribe to deg events
pub async fn deg_subscribe(&self) -> Subscription<DegEvent> {
self.deg_subscriber.clone().subscribe().await
self.deg_publisher.clone().subscribe().await
}
/// Send a deg notification over the subscriber
/// Send a deg notification over the publisher
pub async fn deg_notify(&self, event: DegEvent) {
self.deg_subscriber.notify(event).await;
self.deg_publisher.notify(event).await;
}
pub async fn eventgraph_info(&self, id: u16, _params: JsonValue) -> JsonResult {

View File

@@ -35,7 +35,7 @@ use super::{
transport::{Listener, PtListener},
};
use crate::{
system::{CondVar, StoppableTask, StoppableTaskPtr, Subscriber, SubscriberPtr, Subscription},
system::{CondVar, Publisher, PublisherPtr, StoppableTask, StoppableTaskPtr, Subscription},
Error, Result,
};
@@ -44,7 +44,7 @@ pub type AcceptorPtr = Arc<Acceptor>;
/// Create inbound socket connections
pub struct Acceptor {
channel_subscriber: SubscriberPtr<Result<ChannelPtr>>,
channel_publisher: PublisherPtr<Result<ChannelPtr>>,
task: StoppableTaskPtr,
session: SessionWeakPtr,
conn_count: AtomicUsize,
@@ -54,7 +54,7 @@ impl Acceptor {
/// Create new Acceptor object.
pub fn new(session: SessionWeakPtr) -> AcceptorPtr {
Arc::new(Self {
channel_subscriber: Subscriber::new(),
channel_publisher: Publisher::new(),
task: StoppableTask::new(),
session,
conn_count: AtomicUsize::new(0),
@@ -76,7 +76,7 @@ impl Acceptor {
/// Start receiving network messages.
pub async fn subscribe(self: Arc<Self>) -> Subscription<Result<ChannelPtr>> {
self.channel_subscriber.clone().subscribe().await
self.channel_publisher.clone().subscribe().await
}
/// Run the accept loop in a new thread and error if a connection problem occurs
@@ -147,8 +147,8 @@ impl Acceptor {
})
.detach();
// Finally, notify any subscribers about the new channel.
self.channel_subscriber.notify(Ok(channel)).await;
// Finally, notify any publishers about the new channel.
self.channel_publisher.notify(Ok(channel)).await;
}
// As per accept(2) recommendation:
@@ -223,11 +223,11 @@ impl Acceptor {
}
/// Handles network errors. Panics if errors pass silently, otherwise broadcasts it
/// to all channel subscribers.
/// to all channel publishers.
async fn handle_stop(self: Arc<Self>, result: Result<()>) {
match result {
Ok(()) => panic!("Acceptor task should never complete without error status"),
Err(err) => self.channel_subscriber.notify(Err(err)).await,
Err(err) => self.channel_publisher.notify(Err(err)).await,
}
}
}

View File

@@ -42,13 +42,13 @@ use super::{
hosts::HostColor,
message,
message::{VersionMessage, MAGIC_BYTES},
message_subscriber::{MessageSubscription, MessageSubsystem},
message_publisher::{MessageSubscription, MessageSubsystem},
p2p::P2pPtr,
session::{Session, SessionBitFlag, SessionWeakPtr, SESSION_ALL, SESSION_REFINE},
transport::PtStream,
};
use crate::{
system::{StoppableTask, StoppableTaskPtr, Subscriber, SubscriberPtr, Subscription},
system::{Publisher, PublisherPtr, StoppableTask, StoppableTaskPtr, Subscription},
util::time::NanoTimestamp,
Error, Result,
};
@@ -79,8 +79,8 @@ pub struct Channel {
writer: Mutex<WriteHalf<Box<dyn PtStream>>>,
/// The message subsystem instance for this channel
message_subsystem: MessageSubsystem,
/// Subscriber listening for stop signal for closing this channel
stop_subscriber: SubscriberPtr<Error>,
/// Publisher listening for stop signal for closing this channel
stop_publisher: PublisherPtr<Error>,
/// Task that is listening for the stop signal
receive_task: StoppableTaskPtr,
/// A boolean marking if this channel is stopped
@@ -97,7 +97,7 @@ pub struct Channel {
impl Channel {
/// Sets up a new channel. Creates a reader and writer [`PtStream`] and
/// the message subscriber subsystem. Performs a network handshake on the
/// the message publisher subsystem. Performs a network handshake on the
/// subsystem dispatchers.
pub async fn new(
stream: Box<dyn PtStream>,
@@ -120,7 +120,7 @@ impl Channel {
reader,
writer,
message_subsystem,
stop_subscriber: Subscriber::new(),
stop_publisher: Publisher::new(),
receive_task: StoppableTask::new(),
stopped: AtomicBool::new(false),
session,
@@ -156,7 +156,7 @@ impl Channel {
}
/// Stops the channel.
/// Notifies all subscribers that the channel has been closed in `handle_stop()`.
/// Notifies all publishers that the channel has been closed in `handle_stop()`.
pub async fn stop(&self) {
debug!(target: "net::channel::stop()", "START {:?}", self);
self.receive_task.stop().await;
@@ -172,7 +172,7 @@ impl Channel {
return Err(Error::ChannelStopped)
}
let sub = self.stop_subscriber.clone().subscribe().await;
let sub = self.stop_publisher.clone().subscribe().await;
debug!(target: "net::channel::subscribe_stop()", "END {:?}", self);
@@ -319,7 +319,7 @@ impl Channel {
Ok(()) => panic!("Channel task should never complete without error status"),
// Send this error to all channel subscribers
Err(e) => {
self.stop_subscriber.notify(Error::ChannelStopped).await;
self.stop_publisher.notify(Error::ChannelStopped).await;
self.message_subsystem.trigger_error(e).await;
}
}
@@ -370,7 +370,7 @@ impl Channel {
time: NanoTimestamp::current_time(),
});
// Send result to our subscribers
// Send result to our publishers
match self.message_subsystem.notify(&command, reader).await {
Ok(()) => {}
// If we're getting messages without dispatchers, it's spam.

View File

@@ -25,7 +25,7 @@ use url::Url;
use super::{settings::SettingsPtr, ChannelPtr};
use crate::{
system::{Subscriber, SubscriberPtr, Subscription},
system::{Publisher, PublisherPtr, Subscription},
util::{
file::{load_file, save_file},
path::expand_path,
@@ -777,7 +777,7 @@ impl HostContainer {
/// Main parent class for the management and manipulation of
/// hostlists. Keeps track of hosts and their current state via the
/// HostRegistry, and stores hostlists and associated methods in the
/// HostContainer. Also operates two subscribers to notify other parts
/// HostContainer. Also operates two publishers to notify other parts
/// of the code base when new channels have been created or new hosts
/// have been added to the hostlist.
pub struct Hosts {
@@ -787,11 +787,11 @@ pub struct Hosts {
/// Hostlists and associated methods.
pub container: HostContainer,
/// Subscriber listening for store updates
store_subscriber: SubscriberPtr<usize>,
/// Publisher listening for store updates
store_publisher: PublisherPtr<usize>,
/// Subscriber for notifications of new channels
pub(in crate::net) channel_subscriber: SubscriberPtr<Result<ChannelPtr>>,
/// Publisher for notifications of new channels
pub(in crate::net) channel_publisher: PublisherPtr<Result<ChannelPtr>>,
/// Keeps track of the last time a connection was made.
pub(in crate::net) last_connection: RwLock<Instant>,
@@ -809,8 +809,8 @@ impl Hosts {
Arc::new(Self {
registry: RwLock::new(HashMap::new()),
container: HostContainer::new(),
store_subscriber: Subscriber::new(),
channel_subscriber: Subscriber::new(),
store_publisher: Publisher::new(),
channel_publisher: Publisher::new(),
last_connection: RwLock::new(Instant::now()),
ipv6_available: Mutex::new(true),
settings,
@@ -818,7 +818,7 @@ impl Hosts {
}
/// Safely insert into the HostContainer. Filters the addresses first before storing and
/// notifies the subscriber. Must be called when first receiving greylist addresses.
/// notifies the publisher. Must be called when first receiving greylist addresses.
pub(in crate::net) async fn insert(&self, color: HostColor, addrs: &[(Url, u64)]) {
trace!(target: "net::hosts:insert()", "[START]");
@@ -849,7 +849,7 @@ impl Hosts {
self.unregister(addr).await;
}
self.store_subscriber.notify(addrs_len).await;
self.store_publisher.notify(addrs_len).await;
trace!(target: "net::hosts:insert()", "[END]");
}
@@ -984,18 +984,18 @@ impl Hosts {
self.try_register(address.clone(), HostState::Connected(channel.clone())).await.unwrap();
// Notify that channel processing was successful
self.channel_subscriber.notify(Ok(channel.clone())).await;
self.channel_publisher.notify(Ok(channel.clone())).await;
let mut last_online = self.last_connection.write().await;
*last_online = Instant::now();
}
pub async fn subscribe_store(&self) -> Subscription<usize> {
self.store_subscriber.clone().subscribe().await
self.store_publisher.clone().subscribe().await
}
pub async fn subscribe_channel(&self) -> Subscription<Result<ChannelPtr>> {
self.channel_subscriber.clone().subscribe().await
self.channel_publisher.clone().subscribe().await
}
// Verify whether a URL is local.

View File

@@ -33,8 +33,8 @@ pub type MessageSubscriptionId = u64;
type MessageResult<M> = Result<Arc<M>>;
/// A dispatcher that is unique to every [`Message`].
/// Maintains a list of subscribers that are subscribed to that
/// unique Message type and handles sending messages across these
/// Maintains a list of subscriptions to a unique Message
/// type and handles sending messages across these
/// subscriptions.
#[derive(Debug)]
struct MessageDispatcher<M: Message> {
@@ -53,7 +53,7 @@ impl<M: Message> MessageDispatcher<M> {
}
/// Subscribe to a channel.
/// Assigns a new ID and adds it to the list of subscribers.
/// Assigns a new ID and adds it to the list of subscriptions.
pub async fn subscribe(self: Arc<Self>) -> MessageSubscription<M> {
let (sender, recv_queue) = smol::channel::unbounded();
// Guard against overwriting
@@ -85,7 +85,7 @@ impl<M: Message> MessageDispatcher<M> {
let mut subs = self.subs.lock().await;
debug!(
target: "net::message_subscriber::_trigger_all()", "START msg={}({}), subs={}",
target: "net::message_publisher::_trigger_all()", "START msg={}({}), subs={}",
if message.is_ok() { "Ok" } else {"Err"},
M::NAME, subs.len(),
);
@@ -119,7 +119,7 @@ impl<M: Message> MessageDispatcher<M> {
}
debug!(
target: "net::message_subscriber::_trigger_all()", "END msg={}({}), subs={}",
target: "net::message_publisher::_trigger_all()", "END msg={}({}), subs={}",
if message.is_ok() { "Ok" } else { "Err" },
M::NAME, subs.len(),
);
@@ -210,7 +210,7 @@ impl<M: Message> MessageDispatcherInterface for MessageDispatcher<M> {
Err(err) => {
error!(
target: "net::message_subscriber::trigger()",
target: "net::message_publisher::trigger()",
"Unable to decode data. Dropping...: {}",
err,
);
@@ -219,7 +219,7 @@ impl<M: Message> MessageDispatcherInterface for MessageDispatcher<M> {
}
Err(err) => {
error!(
target: "net::message_subscriber::trigger()",
target: "net::message_publisher::trigger()",
"Unable to decode VarInt. Dropping...: {}",
err,
);
@@ -291,8 +291,8 @@ impl MessageSubsystem {
) -> Result<()> {
let Some(dispatcher) = self.dispatchers.lock().await.get(command).cloned() else {
warn!(
target: "net::message_subscriber::notify",
"message_subscriber::notify: Command '{}' did not find a dispatcher",
target: "net::message_publisher::notify",
"message_publisher::notify: Command '{}' did not find a dispatcher",
command,
);
return Err(Error::MissingDispatcher)

View File

@@ -31,23 +31,23 @@ pub use message::Message;
/// Generic publish/subscribe class that can dispatch any kind of message
/// to a subscribed list of dispatchers. Dispatchers subscribe to a single
/// message format of any type. This is a generalized version of the simple
/// publish-subscribe class in system::Subscriber.
/// publish-subscribe class in system::Publisher.
///
/// Message Subsystem also enables the creation of new message subsystems,
/// adding new dispatchers and clearing inactive channels.
///
/// Message Subsystem maintains a list of dispatchers, which is a generalized
/// version of a subscriber. Pub-sub is called on dispatchers through the
/// functions `subscribe` and `notify`. Whereas system::Subscriber only allows
/// version of a publisher. Pub-sub is called on dispatchers through the
/// functions `subscribe` and `notify`. Whereas system::Publisher only allows
/// messages of a single type, dispatchers can handle any kind of message. This
/// generic message is called a payload and is processed and decoded by the
/// Message Dispatcher.
///
/// The Message Dispatcher is a class of subscribers that implement a generic
/// The Message Dispatcher is a class of publishers that implement a generic
/// trait called Message Dispatcher Interface, which allows us to process any
/// kind of payload as a message.
pub mod message_subscriber;
pub use message_subscriber::MessageSubscription;
pub mod message_publisher;
pub use message_publisher::MessageSubscription;
/// Network transports, holds implementations of pluggable transports.
/// Exposes agnostic dialers and agnostic listeners.

View File

@@ -37,7 +37,7 @@ use super::{
settings::{Settings, SettingsPtr},
};
use crate::{
system::{ExecutorPtr, Subscriber, SubscriberPtr, Subscription},
system::{ExecutorPtr, Publisher, PublisherPtr, Subscription},
Result,
};
@@ -66,8 +66,8 @@ pub struct P2p {
session_seedsync: SeedSyncSessionPtr,
/// Enable network debugging
pub dnet_enabled: Mutex<bool>,
/// The subscriber for which we can give dnet info over
dnet_subscriber: SubscriberPtr<DnetEvent>,
/// The publisher for which we can give dnet info over
dnet_publisher: PublisherPtr<DnetEvent>,
}
impl P2p {
@@ -97,7 +97,7 @@ impl P2p {
session_seedsync: SeedSyncSession::new(),
dnet_enabled: Mutex::new(false),
dnet_subscriber: Subscriber::new(),
dnet_publisher: Publisher::new(),
});
self_.session_manual.p2p.init(self_.clone());
@@ -267,11 +267,11 @@ impl P2p {
/// Subscribe to dnet events
pub async fn dnet_subscribe(&self) -> Subscription<DnetEvent> {
self.dnet_subscriber.clone().subscribe().await
self.dnet_publisher.clone().subscribe().await
}
/// Send a dnet notification over the subscriber
/// Send a dnet notification over the publisher
pub(super) async fn dnet_notify(&self, event: DnetEvent) {
self.dnet_subscriber.notify(event).await;
self.dnet_publisher.notify(event).await;
}
}

View File

@@ -27,7 +27,7 @@ use super::{
channel::ChannelPtr,
hosts::{HostColor, HostsPtr},
message::{AddrsMessage, GetAddrsMessage},
message_subscriber::MessageSubscription,
message_publisher::MessageSubscription,
p2p::P2pPtr,
session::SESSION_OUTBOUND,
settings::SettingsPtr,

View File

@@ -30,7 +30,7 @@ use super::{
super::{
channel::ChannelPtr,
message::{PingMessage, PongMessage},
message_subscriber::MessageSubscription,
message_publisher::MessageSubscription,
p2p::P2pPtr,
settings::SettingsPtr,
},

View File

@@ -27,7 +27,7 @@ use super::{
channel::ChannelPtr,
hosts::{HostColor, HostsPtr},
message::{AddrsMessage, GetAddrsMessage},
message_subscriber::MessageSubscription,
message_publisher::MessageSubscription,
p2p::P2pPtr,
settings::SettingsPtr,
},

View File

@@ -31,7 +31,7 @@ use smol::{Executor, Timer};
use super::super::{
channel::ChannelPtr,
message::{VerackMessage, VersionMessage},
message_subscriber::MessageSubscription,
message_publisher::MessageSubscription,
settings::SettingsPtr,
};
use crate::{Error, Result};

View File

@@ -409,7 +409,7 @@ impl Slot {
self.p2p().hosts().try_register(addr.clone(), HostState::Suspend).await.unwrap();
// Notify that channel processing failed
self.p2p().hosts().channel_subscriber.notify(Err(Error::ConnectFailed)).await;
self.p2p().hosts().channel_publisher.notify(Err(Error::ConnectFailed)).await;
Err(Error::ConnectFailed)
}
@@ -592,7 +592,7 @@ impl PeerDiscoveryBase for PeerDiscovery {
// NOTE: not every call to subscribe() in net/ has a
// corresponding unsubscribe(). To do this we need async
// Drop. For now it's sufficient for subscribers to be
// Drop. For now it's sufficient for publishers to be
// de-allocated when the Session completes.
store_sub.unsubscribe().await;
} else {

View File

@@ -119,7 +119,7 @@ impl RefineSession {
match connector.connect(&addr).await {
Ok((url, channel)) => {
debug!(target: "net::refinery::handshake_node()", "Successfully created a channel with {}", url);
// First initialize the version protocol and its Version, Verack subscribers.
// First initialize the version protocol and its Version, Verack subscriptions.
let proto_ver = ProtocolVersion::new(channel.clone(), p2p.settings()).await;
debug!(target: "net::refinery::handshake_node()", "Performing handshake protocols with {}", url);

View File

@@ -29,7 +29,7 @@ use super::{
};
use crate::{
net::transport::{Dialer, PtStream},
system::{io_timeout, StoppableTask, StoppableTaskPtr, SubscriberPtr},
system::{io_timeout, PublisherPtr, StoppableTask, StoppableTaskPtr},
Error, Result,
};
@@ -204,7 +204,11 @@ impl RpcClient {
/// Listen instantiated client for notifications.
/// NOTE: Subscriber listeners must perform response handling.
pub async fn subscribe(&self, req: JsonRequest, sub: SubscriberPtr<JsonResult>) -> Result<()> {
pub async fn subscribe(
&self,
req: JsonRequest,
publisher: PublisherPtr<JsonResult>,
) -> Result<()> {
// Perform initial request
debug!(target: "rpc::client", "--> {}", req.stringify()?);
let req_id = req.id;
@@ -224,7 +228,7 @@ impl RpcClient {
JsonResult::Notification(ref n) => {
debug!(target: "rpc::client", "<-- {}", n.stringify()?);
self.req_skip_send.send(()).await?;
sub.notify(notification.clone()).await;
publisher.notify(notification.clone()).await;
continue
}

View File

@@ -24,7 +24,7 @@ use tinyjson::JsonValue;
use crate::{
error::RpcError,
system::{Subscriber, SubscriberPtr},
system::{Publisher, PublisherPtr},
Result,
};
@@ -501,19 +501,19 @@ impl TryFrom<&JsonValue> for JsonError {
pub struct JsonSubscriber {
/// Notification method
pub method: &'static str,
/// Notification subscriber
pub sub: SubscriberPtr<JsonNotification>,
/// Notification publisher
pub publisher: PublisherPtr<JsonNotification>,
}
impl JsonSubscriber {
pub fn new(method: &'static str) -> Self {
let sub = Subscriber::new();
Self { method, sub }
let publisher = Publisher::new();
Self { method, publisher }
}
/// Send a notification to the subscriber with the given JSON object
/// Send a notification to the publisher with the given JSON object
pub async fn notify(&self, params: JsonValue) {
let notification = JsonNotification::new(self.method, params);
self.sub.notify(notification).await;
self.publisher.notify(notification).await;
}
}

View File

@@ -159,7 +159,7 @@ pub async fn accept(
task.clone().start(
async move {
// Subscribe to the inner method subscriber
let subscription = subscriber.sub.subscribe().await;
let subscription = subscriber.publisher.subscribe().await;
loop {
// Listen for notifications
let notification = subscription.receive().await;
@@ -209,7 +209,7 @@ pub async fn accept(
task.clone().start(
async move {
// Start the subscriber loop
let subscription = subscriber.sub.subscribe().await;
let subscription = subscriber.publisher.subscribe().await;
loop {
// Listen for notifications
let notification = subscription.receive().await;

View File

@@ -34,8 +34,8 @@ pub mod stoppable_task;
pub use stoppable_task::{StoppableTask, StoppableTaskPtr};
/// Simple broadcast (publish-subscribe) class
pub mod subscriber;
pub use subscriber::{Subscriber, SubscriberPtr, Subscription};
pub mod publisher;
pub use publisher::{Publisher, PublisherPtr, Subscription};
/// Async timeout implementations
pub mod timeout;

View File

@@ -22,15 +22,15 @@ use log::warn;
use rand::{rngs::OsRng, Rng};
use smol::lock::Mutex;
pub type SubscriberPtr<T> = Arc<Subscriber<T>>;
pub type PublisherPtr<T> = Arc<Publisher<T>>;
pub type SubscriptionId = usize;
#[derive(Debug)]
/// Subscription to the Subscriber. Created using `subscriber.subscribe().await`.
/// Subscription to the Publisher. Created using `publisher.subscribe().await`.
pub struct Subscription<T> {
id: SubscriptionId,
recv_queue: smol::channel::Receiver<T>,
parent: Arc<Subscriber<T>>,
parent: Arc<Publisher<T>>,
}
impl<T: Clone> Subscription<T> {
@@ -58,12 +58,12 @@ impl<T: Clone> Subscription<T> {
/// Simple broadcast (publish-subscribe) class.
#[derive(Debug)]
pub struct Subscriber<T> {
pub struct Publisher<T> {
subs: Mutex<HashMap<SubscriptionId, smol::channel::Sender<T>>>,
}
impl<T: Clone> Subscriber<T> {
/// Construct a new subscriber.
impl<T: Clone> Publisher<T> {
/// Construct a new publisher.
pub fn new() -> Arc<Self> {
Arc::new(Self { subs: Mutex::new(HashMap::new()) })
}
@@ -109,8 +109,8 @@ impl<T: Clone> Subscriber<T> {
if let Err(e) = sub.send(message_result.clone()).await {
warn!(
target: "system::subscriber",
"[system::subscriber] Error returned sending message in notify_with_exclude() call! {}", e,
target: "system::publisher",
"[system::publisher] Error returned sending message in notify_with_exclude() call! {}", e,
);
}
}

View File

@@ -223,8 +223,8 @@ macro_rules! async_daemonize {
term_rx: smol::channel::Receiver<()>,
/// Signals handle
handle: signal_hook_async_std::Handle,
/// SIGHUP subscriber to retrieve new configuration,
sighup_sub: darkfi::system::SubscriberPtr<Args>,
/// SIGHUP publisher to retrieve new configuration,
sighup_pub: darkfi::system::PublisherPtr<Args>,
}
impl SignalHandler {
@@ -239,10 +239,10 @@ macro_rules! async_daemonize {
signal_hook::consts::SIGQUIT,
])?;
let handle = signals.handle();
let sighup_sub = darkfi::system::Subscriber::new();
let signals_task = ex.spawn(handle_signals(signals, term_tx, sighup_sub.clone()));
let sighup_pub = darkfi::system::Publisher::new();
let signals_task = ex.spawn(handle_signals(signals, term_tx, sighup_pub.clone()));
Ok((Self { term_rx, handle, sighup_sub }, signals_task))
Ok((Self { term_rx, handle, sighup_pub }, signals_task))
}
/// Handler waits for termination signal
@@ -260,7 +260,7 @@ macro_rules! async_daemonize {
async fn handle_signals(
mut signals: signal_hook_async_std::Signals,
term_tx: smol::channel::Sender<()>,
subscriber: darkfi::system::SubscriberPtr<Args>,
publisher: darkfi::system::PublisherPtr<Args>,
) -> Result<()> {
while let Some(signal) = signals.next().await {
match signal {
@@ -277,7 +277,7 @@ macro_rules! async_daemonize {
println!("handle_signals():: Error parsing the config file");
continue
}
subscriber.notify(args.unwrap()).await;
publisher.notify(args.unwrap()).await;
}
signal_hook::consts::SIGTERM |
signal_hook::consts::SIGINT |