diff --git a/bin/darkfid/src/task/consensus.rs b/bin/darkfid/src/task/consensus.rs index 1cf8e563d..ff68ae188 100644 --- a/bin/darkfid/src/task/consensus.rs +++ b/bin/darkfid/src/task/consensus.rs @@ -107,7 +107,7 @@ pub async fn replicator_task(node: Arc, ex: Arc // Grab proposals subscriber and subscribe to it let proposals_sub = node.subscribers.get("proposals").unwrap(); - let subscription = proposals_sub.sub.clone().subscribe().await; + let subscription = proposals_sub.publisher.clone().subscribe().await; // Create the garbage collection task using a dummy task let gc_task = StoppableTask::new(); diff --git a/bin/darkfid/src/task/miner.rs b/bin/darkfid/src/task/miner.rs index b36f4aa87..629e8f503 100644 --- a/bin/darkfid/src/task/miner.rs +++ b/bin/darkfid/src/task/miner.rs @@ -93,7 +93,7 @@ pub async fn miner_task( // Grab proposals subscriber and subscribe to it let proposals_sub = node.subscribers.get("proposals").unwrap(); - let subscription = proposals_sub.sub.clone().subscribe().await; + let subscription = proposals_sub.publisher.clone().subscribe().await; // Listen for blocks until next finalization, for optimal conditions if !skip_sync { diff --git a/bin/darkirc/src/irc/server.rs b/bin/darkirc/src/irc/server.rs index f98f90b0e..27613bdf8 100644 --- a/bin/darkirc/src/irc/server.rs +++ b/bin/darkirc/src/irc/server.rs @@ -202,7 +202,7 @@ impl IrcServer { }; // Subscribe to incoming events and set up the connection. - let incoming = self.darkirc.event_graph.event_sub.clone().subscribe().await; + let incoming = self.darkirc.event_graph.event_pub.clone().subscribe().await; if let Err(e) = self .clone() .process_connection(stream, peer_addr, incoming, ex.clone()) @@ -216,7 +216,7 @@ impl IrcServer { // Expecting plain TCP connection None => { // Subscribe to incoming events and set up the connection. - let incoming = self.darkirc.event_graph.event_sub.clone().subscribe().await; + let incoming = self.darkirc.event_graph.event_pub.clone().subscribe().await; if let Err(e) = self .clone() .process_connection(stream, peer_addr, incoming, ex.clone()) diff --git a/bin/drk/src/rpc.rs b/bin/drk/src/rpc.rs index 772ddf619..ab7a3a7e9 100644 --- a/bin/drk/src/rpc.rs +++ b/bin/drk/src/rpc.rs @@ -27,7 +27,7 @@ use darkfi::{ jsonrpc::{JsonRequest, JsonResult}, util::JsonValue, }, - system::{StoppableTask, Subscriber}, + system::{Publisher, StoppableTask}, tx::Transaction, util::encoding::base64, Error, Result, @@ -76,8 +76,8 @@ impl Drk { } println!("Subscribing to receive notifications of incoming blocks"); - let subscriber = Subscriber::new(); - let subscription = subscriber.clone().subscribe().await; + let publisher = Publisher::new(); + let subscription = publisher.clone().subscribe().await; let _ex = ex.clone(); StoppableTask::new().start( // Weird hack to prevent lifetimes hell @@ -85,7 +85,7 @@ impl Drk { let ex = _ex.clone(); let rpc_client = RpcClient::new(endpoint, ex).await?; let req = JsonRequest::new("blockchain.subscribe_blocks", JsonValue::Array(vec![])); - rpc_client.subscribe(req, subscriber).await + rpc_client.subscribe(req, publisher).await }, |res| async move { match res { diff --git a/bin/genev/genevd/src/main.rs b/bin/genev/genevd/src/main.rs index b2d26dd5e..7b9200bf2 100644 --- a/bin/genev/genevd/src/main.rs +++ b/bin/genev/genevd/src/main.rs @@ -85,7 +85,7 @@ async fn start_sync_loop( last_sent: RwLock, seen: OnceLock, ) -> Result<()> { - let incoming = event_graph.event_sub.clone().subscribe().await; + let incoming = event_graph.event_pub.clone().subscribe().await; let seen_events = seen.get().unwrap(); loop { let event = incoming.receive().await; diff --git a/bin/tau/taud/src/main.rs b/bin/tau/taud/src/main.rs index e572869a0..60bfe4773 100644 --- a/bin/tau/taud/src/main.rs +++ b/bin/tau/taud/src/main.rs @@ -155,7 +155,7 @@ async fn start_sync_loop( last_sent: RwLock, seen: OnceLock, ) -> TaudResult<()> { - let incoming = event_graph.event_sub.clone().subscribe().await; + let incoming = event_graph.event_pub.clone().subscribe().await; let seen_events = seen.get().unwrap(); loop { diff --git a/src/event_graph/mod.rs b/src/event_graph/mod.rs index d839e6b59..a874f41ef 100644 --- a/src/event_graph/mod.rs +++ b/src/event_graph/mod.rs @@ -43,7 +43,7 @@ use crate::{ util::json_map, }, system::{ - sleep, timeout::timeout, StoppableTask, StoppableTaskPtr, Subscriber, SubscriberPtr, + sleep, timeout::timeout, Publisher, PublisherPtr, StoppableTask, StoppableTaskPtr, Subscription, }, Error, Result, @@ -104,9 +104,9 @@ pub struct EventGraph { broadcasted_ids: RwLock>, /// DAG Pruning Task prune_task: OnceCell, - /// Event subscriber, this notifies whenever an event is + /// Event publisher, this notifies whenever an event is /// inserted into the DAG - pub event_sub: SubscriberPtr, + pub event_pub: PublisherPtr, /// Current genesis event current_genesis: RwLock, /// Currently configured DAG rotation, in days @@ -115,8 +115,8 @@ pub struct EventGraph { pub synced: RwLock, /// Enable graph debugging pub deg_enabled: RwLock, - /// The subscriber for which we can give deg info over - deg_subscriber: SubscriberPtr, + /// The publisher for which we can give deg info over + deg_publisher: PublisherPtr, } impl EventGraph { @@ -134,7 +134,7 @@ impl EventGraph { let dag = sled_db.open_tree(dag_tree_name)?; let unreferenced_tips = RwLock::new(BTreeMap::new()); let broadcasted_ids = RwLock::new(HashSet::new()); - let event_sub = Subscriber::new(); + let event_pub = Publisher::new(); // Create the current genesis event based on the `days_rotation` let current_genesis = generate_genesis(days_rotation); @@ -146,12 +146,12 @@ impl EventGraph { unreferenced_tips, broadcasted_ids, prune_task: OnceCell::new(), - event_sub, + event_pub, current_genesis: RwLock::new(current_genesis.clone()), days_rotation, synced: RwLock::new(false), deg_enabled: RwLock::new(false), - deg_subscriber: Subscriber::new(), + deg_publisher: Publisher::new(), }); // Check if we have it in our DAG. @@ -640,7 +640,7 @@ impl EventGraph { } // Send out notifications about the new event - self.event_sub.notify(event.clone()).await; + self.event_pub.notify(event.clone()).await; } // Drop the exclusive locks @@ -832,12 +832,12 @@ impl EventGraph { /// Subscribe to deg events pub async fn deg_subscribe(&self) -> Subscription { - self.deg_subscriber.clone().subscribe().await + self.deg_publisher.clone().subscribe().await } - /// Send a deg notification over the subscriber + /// Send a deg notification over the publisher pub async fn deg_notify(&self, event: DegEvent) { - self.deg_subscriber.notify(event).await; + self.deg_publisher.notify(event).await; } pub async fn eventgraph_info(&self, id: u16, _params: JsonValue) -> JsonResult { diff --git a/src/net/acceptor.rs b/src/net/acceptor.rs index f52612bda..88d01b9b6 100644 --- a/src/net/acceptor.rs +++ b/src/net/acceptor.rs @@ -35,7 +35,7 @@ use super::{ transport::{Listener, PtListener}, }; use crate::{ - system::{CondVar, StoppableTask, StoppableTaskPtr, Subscriber, SubscriberPtr, Subscription}, + system::{CondVar, Publisher, PublisherPtr, StoppableTask, StoppableTaskPtr, Subscription}, Error, Result, }; @@ -44,7 +44,7 @@ pub type AcceptorPtr = Arc; /// Create inbound socket connections pub struct Acceptor { - channel_subscriber: SubscriberPtr>, + channel_publisher: PublisherPtr>, task: StoppableTaskPtr, session: SessionWeakPtr, conn_count: AtomicUsize, @@ -54,7 +54,7 @@ impl Acceptor { /// Create new Acceptor object. pub fn new(session: SessionWeakPtr) -> AcceptorPtr { Arc::new(Self { - channel_subscriber: Subscriber::new(), + channel_publisher: Publisher::new(), task: StoppableTask::new(), session, conn_count: AtomicUsize::new(0), @@ -76,7 +76,7 @@ impl Acceptor { /// Start receiving network messages. pub async fn subscribe(self: Arc) -> Subscription> { - self.channel_subscriber.clone().subscribe().await + self.channel_publisher.clone().subscribe().await } /// Run the accept loop in a new thread and error if a connection problem occurs @@ -147,8 +147,8 @@ impl Acceptor { }) .detach(); - // Finally, notify any subscribers about the new channel. - self.channel_subscriber.notify(Ok(channel)).await; + // Finally, notify any publishers about the new channel. + self.channel_publisher.notify(Ok(channel)).await; } // As per accept(2) recommendation: @@ -223,11 +223,11 @@ impl Acceptor { } /// Handles network errors. Panics if errors pass silently, otherwise broadcasts it - /// to all channel subscribers. + /// to all channel publishers. async fn handle_stop(self: Arc, result: Result<()>) { match result { Ok(()) => panic!("Acceptor task should never complete without error status"), - Err(err) => self.channel_subscriber.notify(Err(err)).await, + Err(err) => self.channel_publisher.notify(Err(err)).await, } } } diff --git a/src/net/channel.rs b/src/net/channel.rs index 8d01ed96d..40493cd58 100644 --- a/src/net/channel.rs +++ b/src/net/channel.rs @@ -42,13 +42,13 @@ use super::{ hosts::HostColor, message, message::{VersionMessage, MAGIC_BYTES}, - message_subscriber::{MessageSubscription, MessageSubsystem}, + message_publisher::{MessageSubscription, MessageSubsystem}, p2p::P2pPtr, session::{Session, SessionBitFlag, SessionWeakPtr, SESSION_ALL, SESSION_REFINE}, transport::PtStream, }; use crate::{ - system::{StoppableTask, StoppableTaskPtr, Subscriber, SubscriberPtr, Subscription}, + system::{Publisher, PublisherPtr, StoppableTask, StoppableTaskPtr, Subscription}, util::time::NanoTimestamp, Error, Result, }; @@ -79,8 +79,8 @@ pub struct Channel { writer: Mutex>>, /// The message subsystem instance for this channel message_subsystem: MessageSubsystem, - /// Subscriber listening for stop signal for closing this channel - stop_subscriber: SubscriberPtr, + /// Publisher listening for stop signal for closing this channel + stop_publisher: PublisherPtr, /// Task that is listening for the stop signal receive_task: StoppableTaskPtr, /// A boolean marking if this channel is stopped @@ -97,7 +97,7 @@ pub struct Channel { impl Channel { /// Sets up a new channel. Creates a reader and writer [`PtStream`] and - /// the message subscriber subsystem. Performs a network handshake on the + /// the message publisher subsystem. Performs a network handshake on the /// subsystem dispatchers. pub async fn new( stream: Box, @@ -120,7 +120,7 @@ impl Channel { reader, writer, message_subsystem, - stop_subscriber: Subscriber::new(), + stop_publisher: Publisher::new(), receive_task: StoppableTask::new(), stopped: AtomicBool::new(false), session, @@ -156,7 +156,7 @@ impl Channel { } /// Stops the channel. - /// Notifies all subscribers that the channel has been closed in `handle_stop()`. + /// Notifies all publishers that the channel has been closed in `handle_stop()`. pub async fn stop(&self) { debug!(target: "net::channel::stop()", "START {:?}", self); self.receive_task.stop().await; @@ -172,7 +172,7 @@ impl Channel { return Err(Error::ChannelStopped) } - let sub = self.stop_subscriber.clone().subscribe().await; + let sub = self.stop_publisher.clone().subscribe().await; debug!(target: "net::channel::subscribe_stop()", "END {:?}", self); @@ -319,7 +319,7 @@ impl Channel { Ok(()) => panic!("Channel task should never complete without error status"), // Send this error to all channel subscribers Err(e) => { - self.stop_subscriber.notify(Error::ChannelStopped).await; + self.stop_publisher.notify(Error::ChannelStopped).await; self.message_subsystem.trigger_error(e).await; } } @@ -370,7 +370,7 @@ impl Channel { time: NanoTimestamp::current_time(), }); - // Send result to our subscribers + // Send result to our publishers match self.message_subsystem.notify(&command, reader).await { Ok(()) => {} // If we're getting messages without dispatchers, it's spam. diff --git a/src/net/hosts.rs b/src/net/hosts.rs index e00c64052..e6e66fde1 100644 --- a/src/net/hosts.rs +++ b/src/net/hosts.rs @@ -25,7 +25,7 @@ use url::Url; use super::{settings::SettingsPtr, ChannelPtr}; use crate::{ - system::{Subscriber, SubscriberPtr, Subscription}, + system::{Publisher, PublisherPtr, Subscription}, util::{ file::{load_file, save_file}, path::expand_path, @@ -777,7 +777,7 @@ impl HostContainer { /// Main parent class for the management and manipulation of /// hostlists. Keeps track of hosts and their current state via the /// HostRegistry, and stores hostlists and associated methods in the -/// HostContainer. Also operates two subscribers to notify other parts +/// HostContainer. Also operates two publishers to notify other parts /// of the code base when new channels have been created or new hosts /// have been added to the hostlist. pub struct Hosts { @@ -787,11 +787,11 @@ pub struct Hosts { /// Hostlists and associated methods. pub container: HostContainer, - /// Subscriber listening for store updates - store_subscriber: SubscriberPtr, + /// Publisher listening for store updates + store_publisher: PublisherPtr, - /// Subscriber for notifications of new channels - pub(in crate::net) channel_subscriber: SubscriberPtr>, + /// Publisher for notifications of new channels + pub(in crate::net) channel_publisher: PublisherPtr>, /// Keeps track of the last time a connection was made. pub(in crate::net) last_connection: RwLock, @@ -809,8 +809,8 @@ impl Hosts { Arc::new(Self { registry: RwLock::new(HashMap::new()), container: HostContainer::new(), - store_subscriber: Subscriber::new(), - channel_subscriber: Subscriber::new(), + store_publisher: Publisher::new(), + channel_publisher: Publisher::new(), last_connection: RwLock::new(Instant::now()), ipv6_available: Mutex::new(true), settings, @@ -818,7 +818,7 @@ impl Hosts { } /// Safely insert into the HostContainer. Filters the addresses first before storing and - /// notifies the subscriber. Must be called when first receiving greylist addresses. + /// notifies the publisher. Must be called when first receiving greylist addresses. pub(in crate::net) async fn insert(&self, color: HostColor, addrs: &[(Url, u64)]) { trace!(target: "net::hosts:insert()", "[START]"); @@ -849,7 +849,7 @@ impl Hosts { self.unregister(addr).await; } - self.store_subscriber.notify(addrs_len).await; + self.store_publisher.notify(addrs_len).await; trace!(target: "net::hosts:insert()", "[END]"); } @@ -984,18 +984,18 @@ impl Hosts { self.try_register(address.clone(), HostState::Connected(channel.clone())).await.unwrap(); // Notify that channel processing was successful - self.channel_subscriber.notify(Ok(channel.clone())).await; + self.channel_publisher.notify(Ok(channel.clone())).await; let mut last_online = self.last_connection.write().await; *last_online = Instant::now(); } pub async fn subscribe_store(&self) -> Subscription { - self.store_subscriber.clone().subscribe().await + self.store_publisher.clone().subscribe().await } pub async fn subscribe_channel(&self) -> Subscription> { - self.channel_subscriber.clone().subscribe().await + self.channel_publisher.clone().subscribe().await } // Verify whether a URL is local. diff --git a/src/net/message_subscriber.rs b/src/net/message_publisher.rs similarity index 94% rename from src/net/message_subscriber.rs rename to src/net/message_publisher.rs index c93aa5379..8c1a7e909 100644 --- a/src/net/message_subscriber.rs +++ b/src/net/message_publisher.rs @@ -33,8 +33,8 @@ pub type MessageSubscriptionId = u64; type MessageResult = Result>; /// A dispatcher that is unique to every [`Message`]. -/// Maintains a list of subscribers that are subscribed to that -/// unique Message type and handles sending messages across these +/// Maintains a list of subscriptions to a unique Message +/// type and handles sending messages across these /// subscriptions. #[derive(Debug)] struct MessageDispatcher { @@ -53,7 +53,7 @@ impl MessageDispatcher { } /// Subscribe to a channel. - /// Assigns a new ID and adds it to the list of subscribers. + /// Assigns a new ID and adds it to the list of subscriptions. pub async fn subscribe(self: Arc) -> MessageSubscription { let (sender, recv_queue) = smol::channel::unbounded(); // Guard against overwriting @@ -85,7 +85,7 @@ impl MessageDispatcher { let mut subs = self.subs.lock().await; debug!( - target: "net::message_subscriber::_trigger_all()", "START msg={}({}), subs={}", + target: "net::message_publisher::_trigger_all()", "START msg={}({}), subs={}", if message.is_ok() { "Ok" } else {"Err"}, M::NAME, subs.len(), ); @@ -119,7 +119,7 @@ impl MessageDispatcher { } debug!( - target: "net::message_subscriber::_trigger_all()", "END msg={}({}), subs={}", + target: "net::message_publisher::_trigger_all()", "END msg={}({}), subs={}", if message.is_ok() { "Ok" } else { "Err" }, M::NAME, subs.len(), ); @@ -210,7 +210,7 @@ impl MessageDispatcherInterface for MessageDispatcher { Err(err) => { error!( - target: "net::message_subscriber::trigger()", + target: "net::message_publisher::trigger()", "Unable to decode data. Dropping...: {}", err, ); @@ -219,7 +219,7 @@ impl MessageDispatcherInterface for MessageDispatcher { } Err(err) => { error!( - target: "net::message_subscriber::trigger()", + target: "net::message_publisher::trigger()", "Unable to decode VarInt. Dropping...: {}", err, ); @@ -291,8 +291,8 @@ impl MessageSubsystem { ) -> Result<()> { let Some(dispatcher) = self.dispatchers.lock().await.get(command).cloned() else { warn!( - target: "net::message_subscriber::notify", - "message_subscriber::notify: Command '{}' did not find a dispatcher", + target: "net::message_publisher::notify", + "message_publisher::notify: Command '{}' did not find a dispatcher", command, ); return Err(Error::MissingDispatcher) diff --git a/src/net/mod.rs b/src/net/mod.rs index 5e9620354..dc3391127 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -31,23 +31,23 @@ pub use message::Message; /// Generic publish/subscribe class that can dispatch any kind of message /// to a subscribed list of dispatchers. Dispatchers subscribe to a single /// message format of any type. This is a generalized version of the simple -/// publish-subscribe class in system::Subscriber. +/// publish-subscribe class in system::Publisher. /// /// Message Subsystem also enables the creation of new message subsystems, /// adding new dispatchers and clearing inactive channels. /// /// Message Subsystem maintains a list of dispatchers, which is a generalized -/// version of a subscriber. Pub-sub is called on dispatchers through the -/// functions `subscribe` and `notify`. Whereas system::Subscriber only allows +/// version of a publisher. Pub-sub is called on dispatchers through the +/// functions `subscribe` and `notify`. Whereas system::Publisher only allows /// messages of a single type, dispatchers can handle any kind of message. This /// generic message is called a payload and is processed and decoded by the /// Message Dispatcher. /// -/// The Message Dispatcher is a class of subscribers that implement a generic +/// The Message Dispatcher is a class of publishers that implement a generic /// trait called Message Dispatcher Interface, which allows us to process any /// kind of payload as a message. -pub mod message_subscriber; -pub use message_subscriber::MessageSubscription; +pub mod message_publisher; +pub use message_publisher::MessageSubscription; /// Network transports, holds implementations of pluggable transports. /// Exposes agnostic dialers and agnostic listeners. diff --git a/src/net/p2p.rs b/src/net/p2p.rs index a137a8a35..531ebebbe 100644 --- a/src/net/p2p.rs +++ b/src/net/p2p.rs @@ -37,7 +37,7 @@ use super::{ settings::{Settings, SettingsPtr}, }; use crate::{ - system::{ExecutorPtr, Subscriber, SubscriberPtr, Subscription}, + system::{ExecutorPtr, Publisher, PublisherPtr, Subscription}, Result, }; @@ -66,8 +66,8 @@ pub struct P2p { session_seedsync: SeedSyncSessionPtr, /// Enable network debugging pub dnet_enabled: Mutex, - /// The subscriber for which we can give dnet info over - dnet_subscriber: SubscriberPtr, + /// The publisher for which we can give dnet info over + dnet_publisher: PublisherPtr, } impl P2p { @@ -97,7 +97,7 @@ impl P2p { session_seedsync: SeedSyncSession::new(), dnet_enabled: Mutex::new(false), - dnet_subscriber: Subscriber::new(), + dnet_publisher: Publisher::new(), }); self_.session_manual.p2p.init(self_.clone()); @@ -267,11 +267,11 @@ impl P2p { /// Subscribe to dnet events pub async fn dnet_subscribe(&self) -> Subscription { - self.dnet_subscriber.clone().subscribe().await + self.dnet_publisher.clone().subscribe().await } - /// Send a dnet notification over the subscriber + /// Send a dnet notification over the publisher pub(super) async fn dnet_notify(&self, event: DnetEvent) { - self.dnet_subscriber.notify(event).await; + self.dnet_publisher.notify(event).await; } } diff --git a/src/net/protocol/protocol_address.rs b/src/net/protocol/protocol_address.rs index bbdd711fb..a62b05ef8 100644 --- a/src/net/protocol/protocol_address.rs +++ b/src/net/protocol/protocol_address.rs @@ -27,7 +27,7 @@ use super::{ channel::ChannelPtr, hosts::{HostColor, HostsPtr}, message::{AddrsMessage, GetAddrsMessage}, - message_subscriber::MessageSubscription, + message_publisher::MessageSubscription, p2p::P2pPtr, session::SESSION_OUTBOUND, settings::SettingsPtr, diff --git a/src/net/protocol/protocol_ping.rs b/src/net/protocol/protocol_ping.rs index 8a5641cd1..24e36610c 100644 --- a/src/net/protocol/protocol_ping.rs +++ b/src/net/protocol/protocol_ping.rs @@ -30,7 +30,7 @@ use super::{ super::{ channel::ChannelPtr, message::{PingMessage, PongMessage}, - message_subscriber::MessageSubscription, + message_publisher::MessageSubscription, p2p::P2pPtr, settings::SettingsPtr, }, diff --git a/src/net/protocol/protocol_seed.rs b/src/net/protocol/protocol_seed.rs index 3e89eed5e..659b0eb64 100644 --- a/src/net/protocol/protocol_seed.rs +++ b/src/net/protocol/protocol_seed.rs @@ -27,7 +27,7 @@ use super::{ channel::ChannelPtr, hosts::{HostColor, HostsPtr}, message::{AddrsMessage, GetAddrsMessage}, - message_subscriber::MessageSubscription, + message_publisher::MessageSubscription, p2p::P2pPtr, settings::SettingsPtr, }, diff --git a/src/net/protocol/protocol_version.rs b/src/net/protocol/protocol_version.rs index 5c8cbd813..f02551869 100644 --- a/src/net/protocol/protocol_version.rs +++ b/src/net/protocol/protocol_version.rs @@ -31,7 +31,7 @@ use smol::{Executor, Timer}; use super::super::{ channel::ChannelPtr, message::{VerackMessage, VersionMessage}, - message_subscriber::MessageSubscription, + message_publisher::MessageSubscription, settings::SettingsPtr, }; use crate::{Error, Result}; diff --git a/src/net/session/outbound_session.rs b/src/net/session/outbound_session.rs index cebca7aca..0bd5058f9 100644 --- a/src/net/session/outbound_session.rs +++ b/src/net/session/outbound_session.rs @@ -409,7 +409,7 @@ impl Slot { self.p2p().hosts().try_register(addr.clone(), HostState::Suspend).await.unwrap(); // Notify that channel processing failed - self.p2p().hosts().channel_subscriber.notify(Err(Error::ConnectFailed)).await; + self.p2p().hosts().channel_publisher.notify(Err(Error::ConnectFailed)).await; Err(Error::ConnectFailed) } @@ -592,7 +592,7 @@ impl PeerDiscoveryBase for PeerDiscovery { // NOTE: not every call to subscribe() in net/ has a // corresponding unsubscribe(). To do this we need async - // Drop. For now it's sufficient for subscribers to be + // Drop. For now it's sufficient for publishers to be // de-allocated when the Session completes. store_sub.unsubscribe().await; } else { diff --git a/src/net/session/refine_session.rs b/src/net/session/refine_session.rs index 7f389b911..0e24faf99 100644 --- a/src/net/session/refine_session.rs +++ b/src/net/session/refine_session.rs @@ -119,7 +119,7 @@ impl RefineSession { match connector.connect(&addr).await { Ok((url, channel)) => { debug!(target: "net::refinery::handshake_node()", "Successfully created a channel with {}", url); - // First initialize the version protocol and its Version, Verack subscribers. + // First initialize the version protocol and its Version, Verack subscriptions. let proto_ver = ProtocolVersion::new(channel.clone(), p2p.settings()).await; debug!(target: "net::refinery::handshake_node()", "Performing handshake protocols with {}", url); diff --git a/src/rpc/client.rs b/src/rpc/client.rs index 3b0c25d98..3c77a9c31 100644 --- a/src/rpc/client.rs +++ b/src/rpc/client.rs @@ -29,7 +29,7 @@ use super::{ }; use crate::{ net::transport::{Dialer, PtStream}, - system::{io_timeout, StoppableTask, StoppableTaskPtr, SubscriberPtr}, + system::{io_timeout, PublisherPtr, StoppableTask, StoppableTaskPtr}, Error, Result, }; @@ -204,7 +204,11 @@ impl RpcClient { /// Listen instantiated client for notifications. /// NOTE: Subscriber listeners must perform response handling. - pub async fn subscribe(&self, req: JsonRequest, sub: SubscriberPtr) -> Result<()> { + pub async fn subscribe( + &self, + req: JsonRequest, + publisher: PublisherPtr, + ) -> Result<()> { // Perform initial request debug!(target: "rpc::client", "--> {}", req.stringify()?); let req_id = req.id; @@ -224,7 +228,7 @@ impl RpcClient { JsonResult::Notification(ref n) => { debug!(target: "rpc::client", "<-- {}", n.stringify()?); self.req_skip_send.send(()).await?; - sub.notify(notification.clone()).await; + publisher.notify(notification.clone()).await; continue } diff --git a/src/rpc/jsonrpc.rs b/src/rpc/jsonrpc.rs index 7447751b4..dfbcb15a7 100644 --- a/src/rpc/jsonrpc.rs +++ b/src/rpc/jsonrpc.rs @@ -24,7 +24,7 @@ use tinyjson::JsonValue; use crate::{ error::RpcError, - system::{Subscriber, SubscriberPtr}, + system::{Publisher, PublisherPtr}, Result, }; @@ -501,19 +501,19 @@ impl TryFrom<&JsonValue> for JsonError { pub struct JsonSubscriber { /// Notification method pub method: &'static str, - /// Notification subscriber - pub sub: SubscriberPtr, + /// Notification publisher + pub publisher: PublisherPtr, } impl JsonSubscriber { pub fn new(method: &'static str) -> Self { - let sub = Subscriber::new(); - Self { method, sub } + let publisher = Publisher::new(); + Self { method, publisher } } - /// Send a notification to the subscriber with the given JSON object + /// Send a notification to the publisher with the given JSON object pub async fn notify(&self, params: JsonValue) { let notification = JsonNotification::new(self.method, params); - self.sub.notify(notification).await; + self.publisher.notify(notification).await; } } diff --git a/src/rpc/server.rs b/src/rpc/server.rs index cbc18f256..4240dc9b4 100644 --- a/src/rpc/server.rs +++ b/src/rpc/server.rs @@ -159,7 +159,7 @@ pub async fn accept( task.clone().start( async move { // Subscribe to the inner method subscriber - let subscription = subscriber.sub.subscribe().await; + let subscription = subscriber.publisher.subscribe().await; loop { // Listen for notifications let notification = subscription.receive().await; @@ -209,7 +209,7 @@ pub async fn accept( task.clone().start( async move { // Start the subscriber loop - let subscription = subscriber.sub.subscribe().await; + let subscription = subscriber.publisher.subscribe().await; loop { // Listen for notifications let notification = subscription.receive().await; diff --git a/src/system/mod.rs b/src/system/mod.rs index 763394257..44072fc0f 100644 --- a/src/system/mod.rs +++ b/src/system/mod.rs @@ -34,8 +34,8 @@ pub mod stoppable_task; pub use stoppable_task::{StoppableTask, StoppableTaskPtr}; /// Simple broadcast (publish-subscribe) class -pub mod subscriber; -pub use subscriber::{Subscriber, SubscriberPtr, Subscription}; +pub mod publisher; +pub use publisher::{Publisher, PublisherPtr, Subscription}; /// Async timeout implementations pub mod timeout; diff --git a/src/system/subscriber.rs b/src/system/publisher.rs similarity index 89% rename from src/system/subscriber.rs rename to src/system/publisher.rs index 7bb80c9c8..c164459cf 100644 --- a/src/system/subscriber.rs +++ b/src/system/publisher.rs @@ -22,15 +22,15 @@ use log::warn; use rand::{rngs::OsRng, Rng}; use smol::lock::Mutex; -pub type SubscriberPtr = Arc>; +pub type PublisherPtr = Arc>; pub type SubscriptionId = usize; #[derive(Debug)] -/// Subscription to the Subscriber. Created using `subscriber.subscribe().await`. +/// Subscription to the Publisher. Created using `publisher.subscribe().await`. pub struct Subscription { id: SubscriptionId, recv_queue: smol::channel::Receiver, - parent: Arc>, + parent: Arc>, } impl Subscription { @@ -58,12 +58,12 @@ impl Subscription { /// Simple broadcast (publish-subscribe) class. #[derive(Debug)] -pub struct Subscriber { +pub struct Publisher { subs: Mutex>>, } -impl Subscriber { - /// Construct a new subscriber. +impl Publisher { + /// Construct a new publisher. pub fn new() -> Arc { Arc::new(Self { subs: Mutex::new(HashMap::new()) }) } @@ -109,8 +109,8 @@ impl Subscriber { if let Err(e) = sub.send(message_result.clone()).await { warn!( - target: "system::subscriber", - "[system::subscriber] Error returned sending message in notify_with_exclude() call! {}", e, + target: "system::publisher", + "[system::publisher] Error returned sending message in notify_with_exclude() call! {}", e, ); } } diff --git a/src/util/cli.rs b/src/util/cli.rs index ab98a8541..86d6c024c 100644 --- a/src/util/cli.rs +++ b/src/util/cli.rs @@ -223,8 +223,8 @@ macro_rules! async_daemonize { term_rx: smol::channel::Receiver<()>, /// Signals handle handle: signal_hook_async_std::Handle, - /// SIGHUP subscriber to retrieve new configuration, - sighup_sub: darkfi::system::SubscriberPtr, + /// SIGHUP publisher to retrieve new configuration, + sighup_pub: darkfi::system::PublisherPtr, } impl SignalHandler { @@ -239,10 +239,10 @@ macro_rules! async_daemonize { signal_hook::consts::SIGQUIT, ])?; let handle = signals.handle(); - let sighup_sub = darkfi::system::Subscriber::new(); - let signals_task = ex.spawn(handle_signals(signals, term_tx, sighup_sub.clone())); + let sighup_pub = darkfi::system::Publisher::new(); + let signals_task = ex.spawn(handle_signals(signals, term_tx, sighup_pub.clone())); - Ok((Self { term_rx, handle, sighup_sub }, signals_task)) + Ok((Self { term_rx, handle, sighup_pub }, signals_task)) } /// Handler waits for termination signal @@ -260,7 +260,7 @@ macro_rules! async_daemonize { async fn handle_signals( mut signals: signal_hook_async_std::Signals, term_tx: smol::channel::Sender<()>, - subscriber: darkfi::system::SubscriberPtr, + publisher: darkfi::system::PublisherPtr, ) -> Result<()> { while let Some(signal) = signals.next().await { match signal { @@ -277,7 +277,7 @@ macro_rules! async_daemonize { println!("handle_signals():: Error parsing the config file"); continue } - subscriber.notify(args.unwrap()).await; + publisher.notify(args.unwrap()).await; } signal_hook::consts::SIGTERM | signal_hook::consts::SIGINT |