From e511c3b330dc298613cc3fd168244619e81ac740 Mon Sep 17 00:00:00 2001 From: sinu <65924192+sinui0@users.noreply.github.com> Date: Mon, 15 Jan 2024 12:19:12 -0800 Subject: [PATCH] remove some degrees of abstraction in address/mailbox, add detach feature --- README.md | 9 +- ludi-core/Cargo.toml | 3 +- ludi-core/src/address.rs | 182 +++++++++++---------------- ludi-core/src/channel.rs | 181 ++++++++++++++++++++++++++ ludi-core/src/envelope.rs | 74 +++++------ ludi-core/src/error.rs | 16 +-- ludi-core/src/futures.rs | 171 +++++++++++++++++++++++++ ludi-core/src/lib.rs | 62 +++------ ludi-core/src/mailbox.rs | 117 ++++++----------- ludi-macros-test/Cargo.toml | 4 + ludi-macros-test/tests/controller.rs | 27 ++++ ludi-macros/src/controller.rs | 33 ++--- ludi-macros/src/items/item_impl.rs | 23 ++-- ludi-macros/src/items/method.rs | 5 +- ludi-macros/src/wrap.rs | 4 +- ludi/Cargo.toml | 3 +- rustfmt.toml | 3 + 17 files changed, 585 insertions(+), 332 deletions(-) create mode 100644 ludi-core/src/channel.rs create mode 100644 ludi-core/src/futures.rs create mode 100644 ludi-macros-test/tests/controller.rs create mode 100644 rustfmt.toml diff --git a/README.md b/README.md index 6294675..d317103 100644 --- a/README.md +++ b/README.md @@ -8,13 +8,14 @@ ludi is mostly a collection of traits which compose together to resemble an acto Check out this blog post on [tokio actors](https://ryhl.io/blog/actors-with-tokio/) which serves as a nice introduction to this paradigm. -A pitfall of message-based synchronization, in this author's view, is the boilerplate that comes with it. To address this, ludi comes with (optional) macros which can be used to generate APIs which encapsulate the implementation details of message passing, and instead provide more ergonomic OOP-style interfaces (traits, methods). This approach was inspired by [`spaad`](https://github.com/Restioson/spaad), a crate built on [`xtra`](https://github.com/Restioson/xtra). +A pitfall of message-based synchronization, in this author's view, is the boilerplate that comes with it. To address this, ludi comes with (optional) macros which can be used to generate APIs which encapsulate the implementation details of message passing, and instead provide more ergonomic OOP-style interfaces (traits, methods). + +This project was inspired by [`xtra`](https://github.com/Restioson/xtra), and [`spaad`](https://github.com/Restioson/spaad) an extension crate built on it. ## Features - Small - - Contains very little implementation code, mostly traits and helpers. - - The "batteries" that are included are feature gated, eg `futures-mailbox`. + - Contains very little implementation code. - Ergonomic - Generate APIs which resemble lock-based interfaces. - Safe @@ -89,7 +90,7 @@ impl Counter for CounterBoi { #[tokio::main] async fn main() { // Create a mailbox and address for sending `CounterMsg` messages. - let (mut mailbox, addr) = ludi::FuturesMailbox::::new(100); + let (mut mailbox, addr) = ludi::mailbox::(8); // Create a new actor. let mut actor = CounterBoi::default(); diff --git a/ludi-core/Cargo.toml b/ludi-core/Cargo.toml index 0f64d1b..3c749ed 100644 --- a/ludi-core/Cargo.toml +++ b/ludi-core/Cargo.toml @@ -4,8 +4,7 @@ version = "0.1.0" edition = "2021" [features] -default = ["futures-mailbox"] -futures-mailbox = [] +default = [] [dependencies] futures-core = { version = "0.3" } diff --git a/ludi-core/src/address.rs b/ludi-core/src/address.rs index 35d912d..76a456d 100644 --- a/ludi-core/src/address.rs +++ b/ludi-core/src/address.rs @@ -1,130 +1,90 @@ -use std::future::Future; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; use futures_util::Sink; -use crate::{Envelope, Message, MessageError, Wrap}; +use crate::{ + channel::Sender, + futures::{MessageFuture, QueueFuture, Wait}, + Envelope, Error, Message, Wrap, +}; -/// An address of a mailbox. -/// -/// An address is used to send messages to a mailbox, which can be dispatched to an actor. -pub trait Address: - Sink::Return>, Error = MessageError> - + Send - + 'static -{ - /// The type of message that can be sent to this address. - type Message: Message; - - /// Sends a message and awaits a response. - fn send_await(&self, msg: T) -> impl Future> + Send - where - Self::Message: Wrap, - T: Message; - - /// Sends a message. - fn send(&self, msg: T) -> impl Future> + Send - where - Self::Message: Wrap, - T: Message; +/// An address which can be used to send messages to a mailbox. +#[derive(Debug)] +pub struct Address { + sender: Sender, } -#[cfg(feature = "futures-mailbox")] -pub(crate) mod futures_address { - use super::*; - use futures_channel::mpsc; - use futures_util::SinkExt; - use std::{pin::Pin, task::Poll}; - - /// A MPSC address implemented using channels from the [`futures_channel`](https://crates.io/crates/futures_channel) crate. - pub struct FuturesAddress { - pub(crate) send: mpsc::Sender>, +impl Address { + pub(crate) fn new(sender: Sender) -> Self { + Self { sender } } - impl Clone for FuturesAddress { - fn clone(&self) -> Self { - Self { - send: self.send.clone(), - } - } + /// Closes the mailbox with this address. + pub fn close(&mut self) { + self.sender.close(); } - impl Sink> for FuturesAddress + /// Returns whether the mailbox is connected. + pub fn is_connected(&self) -> bool { + self.sender.is_closed() + } + + /// Sends a message and waits for a response. + pub async fn send(&self, msg: U) -> Result where - T: Message, + T: Wrap, + U: Message, { - type Error = MessageError; - - fn poll_ready( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - Pin::new(&mut self.send) - .poll_ready(cx) - .map_err(|_| MessageError::Closed) - } - - fn start_send( - mut self: Pin<&mut Self>, - item: Envelope, - ) -> Result<(), Self::Error> { - Pin::new(&mut self.send) - .start_send(item) - .map_err(|_| MessageError::Closed) - } - - fn poll_flush( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - Pin::new(&mut self.send) - .poll_flush(cx) - .map_err(|_| MessageError::Closed) - } - - fn poll_close( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - Pin::new(&mut self.send) - .poll_close(cx) - .map_err(|_| MessageError::Closed) - } + T::unwrap_return(self.wait(msg.into()).await?) } - impl Address for FuturesAddress - where - T: Message, - { - type Message = T; + /// Returns a future which resolves immediately when a message is queued. + pub fn queue(&self, msg: T) -> QueueFuture { + QueueFuture::new(self.sender.clone(), Envelope::new(msg)) + } - async fn send_await(&self, msg: U) -> Result - where - Self::Message: Wrap, - U: Message, - { - let (env, ret) = Envelope::new_returning(msg.into()); + /// Returns a future which will send a message and wait for a response. + pub fn wait(&self, msg: T) -> MessageFuture { + let (envelope, response) = Envelope::new_with_response(msg); + MessageFuture::new(QueueFuture::new(self.sender.clone(), envelope), response) + } +} - self.send - .clone() - .send(env) - .await - .map_err(|_| MessageError::Closed)?; - - let ret = ret.await.map_err(|_| MessageError::Interrupted)?; - - Self::Message::unwrap_return(ret) - } - - async fn send(&self, msg: U) -> Result<(), MessageError> - where - Self::Message: Wrap, - U: Message, - { - self.send - .clone() - .send(Envelope::new(msg.into())) - .await - .map_err(|_| MessageError::Closed) +impl Clone for Address { + fn clone(&self) -> Self { + Self { + sender: self.sender.clone(), } } } + +impl Sink> for Address { + type Error = Error; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.get_mut().sender) + .poll_ready(cx) + .map_err(|_| Error::Disconnected) + } + + fn start_send(self: Pin<&mut Self>, item: Envelope) -> Result<(), Self::Error> { + Pin::new(&mut self.get_mut().sender) + .start_send(item) + .map_err(|_| Error::Disconnected) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.get_mut().sender) + .poll_flush(cx) + .map_err(|_| Error::Disconnected) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.get_mut().sender) + .poll_close(cx) + .map_err(|_| Error::Disconnected) + } +} diff --git a/ludi-core/src/channel.rs b/ludi-core/src/channel.rs new file mode 100644 index 0000000..c40616a --- /dev/null +++ b/ludi-core/src/channel.rs @@ -0,0 +1,181 @@ +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +use futures_core::Stream; +use futures_util::Sink; + +use crate::{futures::ResponseFuture, Envelope, Error, Message}; + +// TODO: Support other channel implementations using conditional compilation. +pub(crate) type OneshotSender = futures_channel::oneshot::Sender; +pub(crate) type OneshotReceiver = futures_channel::oneshot::Receiver; +pub(crate) type BoundedSender = futures_channel::mpsc::Sender>; +pub(crate) type BoundedReceiver = futures_channel::mpsc::Receiver>; +pub(crate) type UnboundedSender = futures_channel::mpsc::UnboundedSender>; +pub(crate) type UnboundedReceiver = futures_channel::mpsc::UnboundedReceiver>; + +pub(crate) fn new_response() -> (ResponseSender, ResponseFuture) { + let (sender, receiver) = futures_channel::oneshot::channel(); + + (ResponseSender(sender), ResponseFuture(receiver)) +} + +/// A channel for sending a response to a message. +pub struct ResponseSender(OneshotSender); + +impl std::fmt::Debug for ResponseSender +where + T: Message + std::fmt::Debug, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_tuple("ResponseSender").field(&self.0).finish() + } +} + +impl ResponseSender { + /// Sends the response. + pub fn send(self, msg: T::Return) { + // Ignore the error if the receiver has been dropped. + _ = self.0.send(msg); + } +} + +pub(crate) fn new_channel(capacity: usize) -> (Sender, Receiver) { + let (sender, receiver) = futures_channel::mpsc::channel(capacity); + + (Sender::Bounded(sender), Receiver::Bounded(receiver)) +} + +pub(crate) fn new_unbounded_channel() -> (Sender, Receiver) { + let (sender, receiver) = futures_channel::mpsc::unbounded(); + + (Sender::Unbounded(sender), Receiver::Unbounded(receiver)) +} + +#[derive(Debug)] +pub(crate) enum Sender { + Bounded(BoundedSender), + Unbounded(UnboundedSender), +} + +impl Clone for Sender { + fn clone(&self) -> Self { + match self { + Self::Bounded(sender) => Self::Bounded(sender.clone()), + Self::Unbounded(sender) => Self::Unbounded(sender.clone()), + } + } +} + +pub(crate) enum ChannelError { + /// The channel is closed. + Disconnected, + /// The channel is full. + Full(T), +} + +pub(crate) struct Disconnected; + +impl Sender { + pub(crate) fn close(&mut self) { + match self { + Self::Bounded(sender) => sender.close_channel(), + Self::Unbounded(sender) => sender.close_channel(), + } + } + + pub(crate) fn is_closed(&self) -> bool { + match self { + Self::Bounded(sender) => sender.is_closed(), + Self::Unbounded(sender) => sender.is_closed(), + } + } + + pub(crate) fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll> { + match self { + Self::Bounded(sender) => sender.poll_ready(ctx).map_err(|_| Disconnected), + Self::Unbounded(sender) => sender.poll_ready(ctx).map_err(|_| Disconnected), + } + } + + pub(crate) fn try_send( + &mut self, + envelope: Envelope, + ) -> Result<(), ChannelError>> { + match self { + Self::Bounded(sender) => sender.try_send(envelope).map_err(|e| { + if e.is_full() { + ChannelError::Full(e.into_inner()) + } else { + ChannelError::Disconnected + } + }), + Self::Unbounded(sender) => sender.unbounded_send(envelope).map_err(|e| { + if e.is_full() { + unreachable!("Unbounded channels cannot be full") + } else { + ChannelError::Disconnected + } + }), + } + } +} + +impl Sink> for Sender { + type Error = Error; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.get_mut() { + Sender::Bounded(sender) => sender.poll_ready(cx).map_err(|_| Error::Disconnected), + Sender::Unbounded(sender) => sender.poll_ready(cx).map_err(|_| Error::Disconnected), + } + } + + fn start_send(self: Pin<&mut Self>, item: Envelope) -> Result<(), Self::Error> { + match self.get_mut() { + Sender::Bounded(sender) => sender.start_send(item).map_err(|_| Error::Disconnected), + Sender::Unbounded(sender) => sender.start_send(item).map_err(|_| Error::Disconnected), + } + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.get_mut() { + Sender::Bounded(sender) => Pin::new(sender) + .poll_flush(cx) + .map_err(|_| Error::Disconnected), + Sender::Unbounded(sender) => Pin::new(sender) + .poll_flush(cx) + .map_err(|_| Error::Disconnected), + } + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.get_mut() { + Sender::Bounded(sender) => Pin::new(sender) + .poll_close(cx) + .map_err(|_| Error::Disconnected), + Sender::Unbounded(sender) => Pin::new(sender) + .poll_close(cx) + .map_err(|_| Error::Disconnected), + } + } +} + +#[derive(Debug)] +pub(crate) enum Receiver { + Bounded(BoundedReceiver), + Unbounded(UnboundedReceiver), +} + +impl Stream for Receiver { + type Item = Envelope; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.get_mut() { + Receiver::Bounded(receiver) => Pin::new(receiver).poll_next(cx), + Receiver::Unbounded(receiver) => Pin::new(receiver).poll_next(cx), + } + } +} diff --git a/ludi-core/src/envelope.rs b/ludi-core/src/envelope.rs index d6a1744..c1054e7 100644 --- a/ludi-core/src/envelope.rs +++ b/ludi-core/src/envelope.rs @@ -1,56 +1,60 @@ -use futures_channel::oneshot::{channel, Receiver, Sender}; +use crate::{channel::ResponseSender, futures::ResponseFuture, Actor, Context, Dispatch, Message}; -use crate::{Actor, Context, Dispatch, Message}; - -/// An envelope containing a message and optionally a channel which can be -/// used to return a value back to the sender. -pub struct Envelope { - msg: M, - send: Option>, +#[derive(Debug)] +pub(crate) enum EnvelopeInner { + /// A message which does not expect a response. + NoResponse(T), + /// A message which expects a response. + WantsResponse(T, ResponseSender), } -impl Envelope { +/// An envelope containing a message and optionally a channel which can be +/// used to return a response back to the sender. +#[derive(Debug)] +pub struct Envelope(EnvelopeInner); + +impl Envelope { /// Create a new envelope. - pub fn new(msg: M) -> Self { - Self { msg, send: None } + pub fn new(msg: T) -> Self { + Self(EnvelopeInner::NoResponse(msg)) } /// Create a new envelope with a channel which can be used to return /// a response to the sender. - pub fn new_returning(msg: M) -> (Self, Receiver) { - let (send, recv) = channel(); - ( - Self { - msg, - send: Some(send), - }, - recv, - ) + pub fn new_with_response(msg: T) -> (Self, ResponseFuture) { + let (send, recv) = ResponseFuture::new(); + (Self(EnvelopeInner::WantsResponse(msg, send)), recv) + } + + /// Returns `true` if the envelope has a channel which will receive a response. + pub fn wants_response(&self) -> bool { + match &self.0 { + EnvelopeInner::NoResponse(_) => false, + EnvelopeInner::WantsResponse(_, _) => true, + } } -} -impl Envelope -where - M: Message, - R: Send, -{ /// Dispatches the message and return channel to the actor for handling. /// /// # Arguments /// /// * `actor` - The actor which will handle the message. /// * `ctx` - The context of the actor. - pub async fn dispatch(mut self, actor: &mut A, ctx: &mut Context) + pub async fn dispatch(self, actor: &mut A, ctx: &mut Context) where A: Actor, - M: Dispatch, + T: Dispatch, { - self.msg - .dispatch(actor, ctx, move |ret| { - if let Some(send) = self.send.take() { - let _ = send.send(ret); - } - }) - .await + match self.0 { + EnvelopeInner::NoResponse(msg) => { + msg.dispatch(actor, ctx, move |_| {}).await; + } + EnvelopeInner::WantsResponse(msg, sender) => { + msg.dispatch(actor, ctx, move |ret| { + let _ = sender.send(ret); + }) + .await; + } + } } } diff --git a/ludi-core/src/error.rs b/ludi-core/src/error.rs index ffccf0b..5c21d69 100644 --- a/ludi-core/src/error.rs +++ b/ludi-core/src/error.rs @@ -3,23 +3,23 @@ use std::fmt::Display; /// Errors that can occur when sending a message. #[derive(Debug, PartialEq, Eq, Hash)] #[non_exhaustive] -pub enum MessageError { - /// The mailbox has been closed. - Closed, +pub enum Error { + /// The mailbox has been disconnected. + Disconnected, /// Handling of the message was interrupted. Interrupted, /// Error occurred while wrapping a message. Wrapper, } -impl Display for MessageError { +impl Display for Error { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - MessageError::Closed => write!(f, "mailbox closed"), - MessageError::Interrupted => write!(f, "message handling interrupted"), - MessageError::Wrapper => write!(f, "wrapper error"), + Error::Disconnected => write!(f, "mailbox disconnected"), + Error::Interrupted => write!(f, "message handling interrupted"), + Error::Wrapper => write!(f, "wrapper error"), } } } -impl std::error::Error for MessageError {} +impl std::error::Error for Error {} diff --git a/ludi-core/src/futures.rs b/ludi-core/src/futures.rs new file mode 100644 index 0000000..c3bfd04 --- /dev/null +++ b/ludi-core/src/futures.rs @@ -0,0 +1,171 @@ +//! Futures for sending messages and waiting for responses. + +use std::{ + marker::PhantomData, + pin::Pin, + task::{Context, Poll}, +}; + +use futures_core::{ready, FusedFuture, Future}; +use futures_util::FutureExt; + +use crate::{ + channel::{new_response, ChannelError, OneshotReceiver, ResponseSender, Sender}, + Envelope, Error, Message, +}; + +/// A [`MessageFuture`] mode which will wait for a response. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct Wait; +/// A [`MessageFuture`] mode which will return a [`ResponseFuture`]. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct Detach; + +/// A future which sends a message and optionally waits for a response depending on the mode. +/// +/// # Modes +/// +/// * [`Wait`] - Waits for a response. +/// * [`Detach`] - Returns a [`ResponseFuture`] which can be used to wait for the response. +#[must_use = "futures do nothing unless polled"] +pub struct MessageFuture { + queue: QueueFuture, + response: Option>, + _mode: PhantomData, +} + +impl MessageFuture { + pub(crate) fn new(queue: QueueFuture, response: ResponseFuture) -> Self { + Self { + queue, + response: Some(response), + _mode: PhantomData, + } + } + + /// Returns a new [`MessageFuture`] which will instead resolve when the message is sent and + /// return a [`ResponseFuture`] which can be used to wait for the response. + pub fn detach(self) -> MessageFuture { + MessageFuture { + queue: self.queue, + response: self.response, + _mode: PhantomData, + } + } +} + +impl Future for MessageFuture { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + if !this.queue.is_terminated() { + ready!(this.queue.poll_unpin(cx))?; + } + + let Some(response) = this.response.as_mut() else { + panic!("future is not polled after completion") + }; + + response.poll_unpin(cx) + } +} + +impl Future for MessageFuture { + type Output = Result, Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + if !this.queue.is_terminated() { + ready!(this.queue.poll_unpin(cx))?; + } + + Poll::Ready(Ok(this + .response + .take() + .expect("future is not polled after completion"))) + } +} + +impl FusedFuture for MessageFuture { + fn is_terminated(&self) -> bool { + self.queue.is_terminated() + && self + .response + .as_ref() + .map(|r| r.is_terminated()) + .unwrap_or(true) + } +} + +/// A future which resolves when a message is successfully queued. +#[must_use = "futures do nothing unless polled"] +pub struct QueueFuture { + sender: Sender, + msg: Option>, +} + +impl QueueFuture { + pub(crate) fn new(sender: Sender, msg: Envelope) -> Self { + Self { + sender, + msg: Some(msg), + } + } +} + +impl Future for QueueFuture { + type Output = Result<(), Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + if this.msg.is_some() { + ready!(this.sender.poll_ready(cx).map_err(|_| Error::Disconnected))?; + if let Err(err) = this.sender.try_send(this.msg.take().unwrap()) { + match err { + ChannelError::Disconnected => return Poll::Ready(Err(Error::Disconnected)), + ChannelError::Full(msg) => { + this.msg = Some(msg); + return Poll::Pending; + } + } + } + } + + Poll::Ready(Ok(())) + } +} + +impl FusedFuture for QueueFuture { + fn is_terminated(&self) -> bool { + self.msg.is_none() + } +} + +/// A future which returns the response to a message. +#[must_use = "futures do nothing unless polled"] +pub struct ResponseFuture(pub(crate) OneshotReceiver); + +impl ResponseFuture { + /// Returns a new [`ResponseSender`] and [`ResponseFuture`]. + pub fn new() -> (ResponseSender, Self) { + new_response() + } +} + +impl Future for ResponseFuture { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut self.get_mut().0) + .poll(cx) + .map_err(|_| Error::Interrupted) + } +} + +impl FusedFuture for ResponseFuture { + fn is_terminated(&self) -> bool { + self.0.is_terminated() + } +} diff --git a/ludi-core/src/lib.rs b/ludi-core/src/lib.rs index 99794ea..4e0bf18 100644 --- a/ludi-core/src/lib.rs +++ b/ludi-core/src/lib.rs @@ -7,37 +7,36 @@ #![deny(clippy::all)] mod address; +mod channel; mod envelope; mod error; +pub mod futures; mod mailbox; +use futures_core::Stream; use futures_util::StreamExt; use std::future::Future; pub use address::Address; +pub use channel::ResponseSender; pub use envelope::Envelope; -pub use error::MessageError; -pub use mailbox::{IntoMail, IntoMailbox, Mailbox}; - -#[cfg(feature = "futures-mailbox")] -pub use address::futures_address::FuturesAddress; -#[cfg(feature = "futures-mailbox")] -pub use mailbox::futures_mailbox::{mailbox, FuturesMailbox}; +pub use error::Error; +pub use mailbox::{mailbox, unbounded_mailbox, IntoMail, IntoMailbox, Mailbox}; /// A message type. -pub trait Message: Send + 'static { +pub trait Message: Send + Unpin + 'static { /// The return value of the message. - type Return: Send + 'static; + type Return: Send + Unpin + 'static; } /// A message which can wrap another type of message. pub trait Wrap: From + Message { /// Unwraps the return value of the message. - fn unwrap_return(ret: Self::Return) -> Result; + fn unwrap_return(ret: Self::Return) -> Result; } impl Wrap for T { - fn unwrap_return(ret: Self::Return) -> Result { + fn unwrap_return(ret: Self::Return) -> Result { Ok(ret) } } @@ -61,11 +60,6 @@ pub trait Dispatch: Message { /// An actor. /// -/// # Message Type -/// -/// Each actor must specify the type of message it can handle. See the [`Message`] trait for -/// more information and examples. -/// /// # Start /// /// When an actor is first started, the [`Actor::started`] method will be called. By default this method @@ -75,22 +69,7 @@ pub trait Dispatch: Message { /// /// When an actor receives a stop signal it will stop processing messages and the [`Actor::stopped`] method /// will be called before returning. -/// -/// # Example -/// -/// ```ignore -/// # use ludi_core::*; -/// struct PingActor; -/// -/// impl Actor for PingActor { -/// type Stop = (); -/// -/// async fn stopped(&mut self) -> Self::Stop { -/// println!("actor stopped"); -/// } -/// } -/// ``` -pub trait Actor: Send + Sized + 'static { +pub trait Actor: Send + Sized { /// The type of value returned when this actor is stopped. type Stop; /// The type of error which may occur during handling. @@ -154,19 +133,6 @@ pub trait Handler: Actor { } } -/// A controller for an actor. -pub trait Controller { - /// The type of actor that this controller controls. - type Actor: Actor; - /// The type of address that this controller uses to send messages to the actor. - type Address: Address; - /// The type of message that this controller can send to the actor. - type Message: Message + Dispatch; - - /// Returns the address of the actor. - fn address(&self) -> &Self::Address; -} - /// An actor's execution context. pub struct Context { stopped: bool, @@ -233,11 +199,11 @@ impl Context { /// /// * `actor` - The actor to run. /// * `mailbox` - The mailbox which will be used to receive messages. -pub async fn run(actor: &mut A, mailbox: &mut M) -> Result +pub async fn run(actor: &mut A, mailbox: &mut M) -> Result where A: Actor, - M: Mailbox, - M::Message: Dispatch, + M: Stream> + Unpin, + T: Dispatch, { let mut ctx = Context::default(); actor.started(&mut ctx)?; diff --git a/ludi-core/src/mailbox.rs b/ludi-core/src/mailbox.rs index 5b945a7..0530756 100644 --- a/ludi-core/src/mailbox.rs +++ b/ludi-core/src/mailbox.rs @@ -1,48 +1,55 @@ -use std::{pin::Pin, task::Poll}; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; use futures_core::Stream; +use futures_util::StreamExt; -use crate::{Envelope, Message}; +use crate::{ + channel::{new_channel, new_unbounded_channel, Receiver}, + Address, Envelope, Message, +}; -/// A mailbox. -/// -/// A mailbox is an asynchronous stream of messages which can be dispatched to an actor. The counter-part -/// to a mailbox is an [`Address`](crate::Address), which is used to send messages. -pub trait Mailbox: - Stream::Return>> + Send + Unpin + 'static -{ - /// The type of message that can be sent to this mailbox. - type Message: Message; +/// Returns a new mailbox and address. +pub fn mailbox(capacity: usize) -> (Mailbox, Address) { + let (sender, recv) = new_channel(capacity); + + (Mailbox { recv }, Address::new(sender)) } -impl Mailbox for T -where - T: Stream::Return>> + Send + Unpin + 'static, - U: Message, -{ - type Message = U; +/// Returns a new unbounded mailbox and address. +pub fn unbounded_mailbox() -> (Mailbox, Address) { + let (sender, recv) = new_unbounded_channel(); + + (Mailbox { recv }, Address::new(sender)) +} + +/// A mailbox. +pub struct Mailbox { + recv: Receiver, +} + +impl Stream for Mailbox { + type Item = Envelope; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.recv.poll_next_unpin(cx) + } } /// An extension trait which converts a stream of messages into a mailbox. -pub trait IntoMailbox { - /// The type of message that can be sent to the mailbox. - type Message: Message; - /// The type of mailbox. - type IntoMail: Mailbox; - +pub trait IntoMailbox: Sized { /// Convert self into a mailbox. - fn into_mailbox(self) -> Self::IntoMail; + fn into_mailbox(self) -> IntoMail; } impl IntoMailbox for T where - T: Stream + Send + Unpin + 'static, + T: Stream + Unpin, T::Item: Message, { - type Message = T::Item; - type IntoMail = IntoMail; - - fn into_mailbox(self) -> Self::IntoMail { + fn into_mailbox(self) -> IntoMail { IntoMail(self) } } @@ -71,10 +78,10 @@ impl IntoMail { impl Stream for IntoMail where - T: Stream + Send + Unpin + 'static, + T: Stream + Unpin, T::Item: Message, { - type Item = Envelope::Return>; + type Item = Envelope; fn poll_next( self: Pin<&mut Self>, @@ -83,51 +90,3 @@ where Stream::poll_next(Pin::new(&mut self.get_mut().0), cx).map(|m| m.map(|m| Envelope::new(m))) } } - -#[cfg(feature = "futures-mailbox")] -pub(crate) mod futures_mailbox { - use super::*; - use crate::FuturesAddress; - use futures_channel::mpsc; - - /// Returns a new mailbox and its' address. - /// - /// # Arguments - /// - /// * `capacity` - The number of messages that can be buffered in the mailbox. - pub fn mailbox(capacity: usize) -> (FuturesMailbox, FuturesAddress) - where - T: Message, - { - FuturesMailbox::new(capacity) - } - - /// A MPSC mailbox implemented using channels from the [`futures_channel`](https://crates.io/crates/futures_channel) crate. - pub struct FuturesMailbox { - recv: mpsc::Receiver>, - } - - impl FuturesMailbox { - /// Create a new mailbox, returning the mailbox and its' address. - /// - /// # Arguments - /// - /// * `buffer` - The number of messages that can be buffered in the mailbox. - pub fn new(buffer: usize) -> (Self, FuturesAddress) { - let (send, recv) = mpsc::channel(buffer); - - (Self { recv }, FuturesAddress { send }) - } - } - - impl Stream for FuturesMailbox { - type Item = Envelope; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - Pin::new(&mut self.recv).poll_next(cx) - } - } -} diff --git a/ludi-macros-test/Cargo.toml b/ludi-macros-test/Cargo.toml index dc6ca3b..c307710 100644 --- a/ludi-macros-test/Cargo.toml +++ b/ludi-macros-test/Cargo.toml @@ -49,3 +49,7 @@ path = "tests/implement_async_trait.rs" [[test]] name = "implement_trait_foreign" path = "tests/implement_trait_foreign.rs" + +[[test]] +name = "controller" +path = "tests/controller.rs" diff --git a/ludi-macros-test/tests/controller.rs b/ludi-macros-test/tests/controller.rs new file mode 100644 index 0000000..3fca956 --- /dev/null +++ b/ludi-macros-test/tests/controller.rs @@ -0,0 +1,27 @@ +#![allow(dead_code)] + +#[derive(ludi::Controller)] +pub struct Foo; + +impl ::ludi::Actor for Foo { + type Stop = (); + type Error = (); + + async fn stopped(&mut self) -> Result { + Ok(()) + } +} + +#[derive(ludi::Controller)] +pub struct FooGenerics(T) +where + T: Send; + +impl ::ludi::Actor for FooGenerics { + type Stop = (); + type Error = (); + + async fn stopped(&mut self) -> Result { + Ok(()) + } +} diff --git a/ludi-macros/src/controller.rs b/ludi-macros/src/controller.rs index 3f96453..2b5b5f1 100644 --- a/ludi-macros/src/controller.rs +++ b/ludi-macros/src/controller.rs @@ -23,22 +23,19 @@ pub(crate) fn impl_controller(input: syn::DeriveInput) -> proc_macro2::TokenStre input.generics.split_for_impl(); let mut generics = input.generics.clone(); - generics.params.push(syn::parse_quote!(A)); + generics.params.push(syn::parse_quote!(CtrlMsg)); let where_clause = generics.make_where_clause(); where_clause .predicates - .push(syn::parse_quote!(A: ::ludi::Address)); - where_clause - .predicates - .push(syn::parse_quote!(::Message: ::ludi::Dispatch<#actor_ident #actor_ty_generics>)); + .push(syn::parse_quote!(CtrlMsg: ::ludi::Message)); let (impl_generics, ty_generics, where_clause) = generics.split_for_impl(); let fields = if input.generics.params.is_empty() { - quote!(addr: A) + quote!(addr: ludi::Address) } else { - quote!(addr: A, _pd: std::marker::PhantomData #actor_ty_generics) + quote!(addr: ludi::Address, _pd: std::marker::PhantomData #actor_ty_generics) }; let from_fields = if input.generics.params.is_empty() { @@ -53,39 +50,27 @@ pub(crate) fn impl_controller(input: syn::DeriveInput) -> proc_macro2::TokenStre quote!( #[derive(Debug, Clone)] #[doc = #ctrl_doc] - #vis struct #ctrl_ident #ty_generics { + #vis struct #ctrl_ident #ty_generics where CtrlMsg: ::ludi::Message { #fields } impl #actor_impl_generics #actor_ident #actor_ty_generics #actor_where_clause { #[doc = #ctrl_fn_doc] - pub fn controller(addr: A) -> #ctrl_ident #ty_generics + pub fn controller(addr: ::ludi::Address) -> #ctrl_ident #ty_generics where - A: ::ludi::Address, - ::Message: ::ludi::Dispatch, + CtrlMsg: ::ludi::Message + ::ludi::Dispatch, { #ctrl_ident ::from(addr) } } - impl #impl_generics From for #ctrl_ident #ty_generics #actor_where_clause + impl #impl_generics From<::ludi::Address> for #ctrl_ident #ty_generics #where_clause { - fn from(addr: A) -> Self { + fn from(addr: ludi::Address) -> Self { Self { #from_fields } } } - - impl #ty_generics ::ludi::Controller for #ctrl_ident #ty_generics #where_clause - { - type Actor = #actor_ident #actor_ty_generics; - type Address = A; - type Message = ::Message; - - fn address(&self) -> &Self::Address { - &self.addr - } - } ) } diff --git a/ludi-macros/src/items/item_impl.rs b/ludi-macros/src/items/item_impl.rs index 4d3efe1..04a6936 100644 --- a/ludi-macros/src/items/item_impl.rs +++ b/ludi-macros/src/items/item_impl.rs @@ -237,31 +237,28 @@ impl ItemImpl { if let Some(generic_args) = &self.actor_generic_args { let mut generic_args = generic_args.clone(); - generic_args.args.push(parse_quote!(A)); + generic_args.args.push(parse_quote!(CtrlMsg)); ctrl_path.segments.last_mut().unwrap().arguments = syn::PathArguments::AngleBracketed(generic_args); } else { ctrl_path.segments.last_mut().unwrap().arguments = - syn::PathArguments::AngleBracketed(parse_quote!()); + syn::PathArguments::AngleBracketed(parse_quote!()); } let mut generics = self.impl_generics.clone(); - generics.params.push(parse_quote!(A)); + generics.params.push(parse_quote!(CtrlMsg)); let where_clause = generics.make_where_clause(); where_clause .predicates - .push(parse_quote!(A: Send + Sync + 'static)); - where_clause - .predicates - .push(parse_quote!(Self: ::ludi::Controller)); + .push(parse_quote!(CtrlMsg: ::ludi::Message + ludi::Dispatch<#actor_path>)); if let Some(ImplTrait { trait_path, .. }) = &self.impl_trait { self.methods.iter().for_each(|method| { let struct_path = &method.struct_path; - where_clause.predicates.push( - parse_quote!(::Message: ::ludi::Wrap<#struct_path>), - ); + where_clause + .predicates + .push(parse_quote!(CtrlMsg: ::ludi::Wrap<#struct_path>)); }); let methods = self.methods.iter().map(|method| method.expand_ctrl(true)); @@ -279,9 +276,9 @@ impl ItemImpl { let mut generics = generics.clone(); let where_clause = generics.make_where_clause(); - where_clause.predicates.push( - parse_quote!(::Message: ::ludi::Wrap<#struct_path>), - ); + where_clause + .predicates + .push(parse_quote!(CtrlMsg: ::ludi::Wrap<#struct_path>)); let (impl_generics, _, where_clause) = generics.split_for_impl(); quote!( diff --git a/ludi-macros/src/items/method.rs b/ludi-macros/src/items/method.rs index 078e46b..b4b1c83 100644 --- a/ludi-macros/src/items/method.rs +++ b/ludi-macros/src/items/method.rs @@ -405,10 +405,7 @@ impl Method { #(#doc_attrs)* #(#[#attrs])* #vis #ctrl_sig { - ::ludi::Address::send_await( - ::ludi::Controller::address(self), - #struct_arg - ).await #err_handler + self.addr.send(#struct_arg).await #err_handler } ) } diff --git a/ludi-macros/src/wrap.rs b/ludi-macros/src/wrap.rs index d63c1c8..b771daa 100644 --- a/ludi-macros/src/wrap.rs +++ b/ludi-macros/src/wrap.rs @@ -142,10 +142,10 @@ impl ToTokens for Wrap { #( impl #impl_generics ::ludi::Wrap<#variant_tys> for #ident #ty_generics #where_clause { - fn unwrap_return(ret: Self::Return) -> Result<<#variant_tys as ::ludi::Message>::Return, ::ludi::MessageError> { + fn unwrap_return(ret: Self::Return) -> Result<<#variant_tys as ::ludi::Message>::Return, ::ludi::Error> { match ret { Self::Return :: #variant_idents (value) => Ok(value), - _ => Err(::ludi::MessageError::Wrapper), + _ => Err(::ludi::Error::Wrapper), } } } diff --git a/ludi/Cargo.toml b/ludi/Cargo.toml index 90baae6..1cdb9e0 100644 --- a/ludi/Cargo.toml +++ b/ludi/Cargo.toml @@ -4,9 +4,8 @@ version = "0.1.0" edition = "2021" [features] -default = ["macros", "futures-mailbox"] +default = ["macros"] macros = ["dep:ludi-macros"] -futures-mailbox = ["ludi-core/futures-mailbox"] [dependencies] ludi-core = { path = "../ludi-core" } diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 0000000..5a25ea3 --- /dev/null +++ b/rustfmt.toml @@ -0,0 +1,3 @@ +ignore = ["tls-core/", "tls-client/"] + +imports_granularity = "Crate"