mirror of
https://github.com/sinui0/ludi.git
synced 2026-01-09 04:47:56 -05:00
remove some degrees of abstraction in address/mailbox, add detach feature
This commit is contained in:
@@ -8,13 +8,14 @@ ludi is mostly a collection of traits which compose together to resemble an acto
|
||||
|
||||
Check out this blog post on [tokio actors](https://ryhl.io/blog/actors-with-tokio/) which serves as a nice introduction to this paradigm.
|
||||
|
||||
A pitfall of message-based synchronization, in this author's view, is the boilerplate that comes with it. To address this, ludi comes with (optional) macros which can be used to generate APIs which encapsulate the implementation details of message passing, and instead provide more ergonomic OOP-style interfaces (traits, methods). This approach was inspired by [`spaad`](https://github.com/Restioson/spaad), a crate built on [`xtra`](https://github.com/Restioson/xtra).
|
||||
A pitfall of message-based synchronization, in this author's view, is the boilerplate that comes with it. To address this, ludi comes with (optional) macros which can be used to generate APIs which encapsulate the implementation details of message passing, and instead provide more ergonomic OOP-style interfaces (traits, methods).
|
||||
|
||||
This project was inspired by [`xtra`](https://github.com/Restioson/xtra), and [`spaad`](https://github.com/Restioson/spaad) an extension crate built on it.
|
||||
|
||||
## Features
|
||||
|
||||
- Small
|
||||
- Contains very little implementation code, mostly traits and helpers.
|
||||
- The "batteries" that are included are feature gated, eg `futures-mailbox`.
|
||||
- Contains very little implementation code.
|
||||
- Ergonomic
|
||||
- Generate APIs which resemble lock-based interfaces.
|
||||
- Safe
|
||||
@@ -89,7 +90,7 @@ impl Counter for CounterBoi {
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
// Create a mailbox and address for sending `CounterMsg` messages.
|
||||
let (mut mailbox, addr) = ludi::FuturesMailbox::<CounterMsg>::new(100);
|
||||
let (mut mailbox, addr) = ludi::mailbox::<CounterMsg>(8);
|
||||
|
||||
// Create a new actor.
|
||||
let mut actor = CounterBoi::default();
|
||||
|
||||
@@ -4,8 +4,7 @@ version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[features]
|
||||
default = ["futures-mailbox"]
|
||||
futures-mailbox = []
|
||||
default = []
|
||||
|
||||
[dependencies]
|
||||
futures-core = { version = "0.3" }
|
||||
|
||||
@@ -1,130 +1,90 @@
|
||||
use std::future::Future;
|
||||
use std::{
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use futures_util::Sink;
|
||||
|
||||
use crate::{Envelope, Message, MessageError, Wrap};
|
||||
use crate::{
|
||||
channel::Sender,
|
||||
futures::{MessageFuture, QueueFuture, Wait},
|
||||
Envelope, Error, Message, Wrap,
|
||||
};
|
||||
|
||||
/// An address of a mailbox.
|
||||
///
|
||||
/// An address is used to send messages to a mailbox, which can be dispatched to an actor.
|
||||
pub trait Address:
|
||||
Sink<Envelope<Self::Message, <Self::Message as Message>::Return>, Error = MessageError>
|
||||
+ Send
|
||||
+ 'static
|
||||
{
|
||||
/// The type of message that can be sent to this address.
|
||||
type Message: Message;
|
||||
|
||||
/// Sends a message and awaits a response.
|
||||
fn send_await<T>(&self, msg: T) -> impl Future<Output = Result<T::Return, MessageError>> + Send
|
||||
where
|
||||
Self::Message: Wrap<T>,
|
||||
T: Message;
|
||||
|
||||
/// Sends a message.
|
||||
fn send<T>(&self, msg: T) -> impl Future<Output = Result<(), MessageError>> + Send
|
||||
where
|
||||
Self::Message: Wrap<T>,
|
||||
T: Message;
|
||||
/// An address which can be used to send messages to a mailbox.
|
||||
#[derive(Debug)]
|
||||
pub struct Address<T: Message> {
|
||||
sender: Sender<T>,
|
||||
}
|
||||
|
||||
#[cfg(feature = "futures-mailbox")]
|
||||
pub(crate) mod futures_address {
|
||||
use super::*;
|
||||
use futures_channel::mpsc;
|
||||
use futures_util::SinkExt;
|
||||
use std::{pin::Pin, task::Poll};
|
||||
|
||||
/// A MPSC address implemented using channels from the [`futures_channel`](https://crates.io/crates/futures_channel) crate.
|
||||
pub struct FuturesAddress<T: Message> {
|
||||
pub(crate) send: mpsc::Sender<Envelope<T, T::Return>>,
|
||||
impl<T: Message> Address<T> {
|
||||
pub(crate) fn new(sender: Sender<T>) -> Self {
|
||||
Self { sender }
|
||||
}
|
||||
|
||||
impl<T: Message> Clone for FuturesAddress<T> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
send: self.send.clone(),
|
||||
}
|
||||
}
|
||||
/// Closes the mailbox with this address.
|
||||
pub fn close(&mut self) {
|
||||
self.sender.close();
|
||||
}
|
||||
|
||||
impl<T> Sink<Envelope<T, T::Return>> for FuturesAddress<T>
|
||||
/// Returns whether the mailbox is connected.
|
||||
pub fn is_connected(&self) -> bool {
|
||||
self.sender.is_closed()
|
||||
}
|
||||
|
||||
/// Sends a message and waits for a response.
|
||||
pub async fn send<U>(&self, msg: U) -> Result<U::Return, Error>
|
||||
where
|
||||
T: Message,
|
||||
T: Wrap<U>,
|
||||
U: Message,
|
||||
{
|
||||
type Error = MessageError;
|
||||
|
||||
fn poll_ready(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> Poll<Result<(), Self::Error>> {
|
||||
Pin::new(&mut self.send)
|
||||
.poll_ready(cx)
|
||||
.map_err(|_| MessageError::Closed)
|
||||
}
|
||||
|
||||
fn start_send(
|
||||
mut self: Pin<&mut Self>,
|
||||
item: Envelope<T, T::Return>,
|
||||
) -> Result<(), Self::Error> {
|
||||
Pin::new(&mut self.send)
|
||||
.start_send(item)
|
||||
.map_err(|_| MessageError::Closed)
|
||||
}
|
||||
|
||||
fn poll_flush(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> Poll<Result<(), Self::Error>> {
|
||||
Pin::new(&mut self.send)
|
||||
.poll_flush(cx)
|
||||
.map_err(|_| MessageError::Closed)
|
||||
}
|
||||
|
||||
fn poll_close(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> Poll<Result<(), Self::Error>> {
|
||||
Pin::new(&mut self.send)
|
||||
.poll_close(cx)
|
||||
.map_err(|_| MessageError::Closed)
|
||||
}
|
||||
T::unwrap_return(self.wait(msg.into()).await?)
|
||||
}
|
||||
|
||||
impl<T> Address for FuturesAddress<T>
|
||||
where
|
||||
T: Message,
|
||||
{
|
||||
type Message = T;
|
||||
/// Returns a future which resolves immediately when a message is queued.
|
||||
pub fn queue(&self, msg: T) -> QueueFuture<T> {
|
||||
QueueFuture::new(self.sender.clone(), Envelope::new(msg))
|
||||
}
|
||||
|
||||
async fn send_await<U>(&self, msg: U) -> Result<U::Return, MessageError>
|
||||
where
|
||||
Self::Message: Wrap<U>,
|
||||
U: Message,
|
||||
{
|
||||
let (env, ret) = Envelope::new_returning(msg.into());
|
||||
/// Returns a future which will send a message and wait for a response.
|
||||
pub fn wait(&self, msg: T) -> MessageFuture<T, Wait> {
|
||||
let (envelope, response) = Envelope::new_with_response(msg);
|
||||
MessageFuture::new(QueueFuture::new(self.sender.clone(), envelope), response)
|
||||
}
|
||||
}
|
||||
|
||||
self.send
|
||||
.clone()
|
||||
.send(env)
|
||||
.await
|
||||
.map_err(|_| MessageError::Closed)?;
|
||||
|
||||
let ret = ret.await.map_err(|_| MessageError::Interrupted)?;
|
||||
|
||||
Self::Message::unwrap_return(ret)
|
||||
}
|
||||
|
||||
async fn send<U>(&self, msg: U) -> Result<(), MessageError>
|
||||
where
|
||||
Self::Message: Wrap<U>,
|
||||
U: Message,
|
||||
{
|
||||
self.send
|
||||
.clone()
|
||||
.send(Envelope::new(msg.into()))
|
||||
.await
|
||||
.map_err(|_| MessageError::Closed)
|
||||
impl<T: Message> Clone for Address<T> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
sender: self.sender.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Message> Sink<Envelope<T>> for Address<T> {
|
||||
type Error = Error;
|
||||
|
||||
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
Pin::new(&mut self.get_mut().sender)
|
||||
.poll_ready(cx)
|
||||
.map_err(|_| Error::Disconnected)
|
||||
}
|
||||
|
||||
fn start_send(self: Pin<&mut Self>, item: Envelope<T>) -> Result<(), Self::Error> {
|
||||
Pin::new(&mut self.get_mut().sender)
|
||||
.start_send(item)
|
||||
.map_err(|_| Error::Disconnected)
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
Pin::new(&mut self.get_mut().sender)
|
||||
.poll_flush(cx)
|
||||
.map_err(|_| Error::Disconnected)
|
||||
}
|
||||
|
||||
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
Pin::new(&mut self.get_mut().sender)
|
||||
.poll_close(cx)
|
||||
.map_err(|_| Error::Disconnected)
|
||||
}
|
||||
}
|
||||
|
||||
181
ludi-core/src/channel.rs
Normal file
181
ludi-core/src/channel.rs
Normal file
@@ -0,0 +1,181 @@
|
||||
use std::{
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use futures_core::Stream;
|
||||
use futures_util::Sink;
|
||||
|
||||
use crate::{futures::ResponseFuture, Envelope, Error, Message};
|
||||
|
||||
// TODO: Support other channel implementations using conditional compilation.
|
||||
pub(crate) type OneshotSender<T> = futures_channel::oneshot::Sender<T>;
|
||||
pub(crate) type OneshotReceiver<T> = futures_channel::oneshot::Receiver<T>;
|
||||
pub(crate) type BoundedSender<T> = futures_channel::mpsc::Sender<Envelope<T>>;
|
||||
pub(crate) type BoundedReceiver<T> = futures_channel::mpsc::Receiver<Envelope<T>>;
|
||||
pub(crate) type UnboundedSender<T> = futures_channel::mpsc::UnboundedSender<Envelope<T>>;
|
||||
pub(crate) type UnboundedReceiver<T> = futures_channel::mpsc::UnboundedReceiver<Envelope<T>>;
|
||||
|
||||
pub(crate) fn new_response<T: Message>() -> (ResponseSender<T>, ResponseFuture<T>) {
|
||||
let (sender, receiver) = futures_channel::oneshot::channel();
|
||||
|
||||
(ResponseSender(sender), ResponseFuture(receiver))
|
||||
}
|
||||
|
||||
/// A channel for sending a response to a message.
|
||||
pub struct ResponseSender<T: Message>(OneshotSender<T::Return>);
|
||||
|
||||
impl<T> std::fmt::Debug for ResponseSender<T>
|
||||
where
|
||||
T: Message + std::fmt::Debug,
|
||||
{
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_tuple("ResponseSender").field(&self.0).finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Message> ResponseSender<T> {
|
||||
/// Sends the response.
|
||||
pub fn send(self, msg: T::Return) {
|
||||
// Ignore the error if the receiver has been dropped.
|
||||
_ = self.0.send(msg);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn new_channel<T: Message>(capacity: usize) -> (Sender<T>, Receiver<T>) {
|
||||
let (sender, receiver) = futures_channel::mpsc::channel(capacity);
|
||||
|
||||
(Sender::Bounded(sender), Receiver::Bounded(receiver))
|
||||
}
|
||||
|
||||
pub(crate) fn new_unbounded_channel<T: Message>() -> (Sender<T>, Receiver<T>) {
|
||||
let (sender, receiver) = futures_channel::mpsc::unbounded();
|
||||
|
||||
(Sender::Unbounded(sender), Receiver::Unbounded(receiver))
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum Sender<T: Message> {
|
||||
Bounded(BoundedSender<T>),
|
||||
Unbounded(UnboundedSender<T>),
|
||||
}
|
||||
|
||||
impl<T: Message> Clone for Sender<T> {
|
||||
fn clone(&self) -> Self {
|
||||
match self {
|
||||
Self::Bounded(sender) => Self::Bounded(sender.clone()),
|
||||
Self::Unbounded(sender) => Self::Unbounded(sender.clone()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) enum ChannelError<T> {
|
||||
/// The channel is closed.
|
||||
Disconnected,
|
||||
/// The channel is full.
|
||||
Full(T),
|
||||
}
|
||||
|
||||
pub(crate) struct Disconnected;
|
||||
|
||||
impl<T: Message> Sender<T> {
|
||||
pub(crate) fn close(&mut self) {
|
||||
match self {
|
||||
Self::Bounded(sender) => sender.close_channel(),
|
||||
Self::Unbounded(sender) => sender.close_channel(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn is_closed(&self) -> bool {
|
||||
match self {
|
||||
Self::Bounded(sender) => sender.is_closed(),
|
||||
Self::Unbounded(sender) => sender.is_closed(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Disconnected>> {
|
||||
match self {
|
||||
Self::Bounded(sender) => sender.poll_ready(ctx).map_err(|_| Disconnected),
|
||||
Self::Unbounded(sender) => sender.poll_ready(ctx).map_err(|_| Disconnected),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn try_send(
|
||||
&mut self,
|
||||
envelope: Envelope<T>,
|
||||
) -> Result<(), ChannelError<Envelope<T>>> {
|
||||
match self {
|
||||
Self::Bounded(sender) => sender.try_send(envelope).map_err(|e| {
|
||||
if e.is_full() {
|
||||
ChannelError::Full(e.into_inner())
|
||||
} else {
|
||||
ChannelError::Disconnected
|
||||
}
|
||||
}),
|
||||
Self::Unbounded(sender) => sender.unbounded_send(envelope).map_err(|e| {
|
||||
if e.is_full() {
|
||||
unreachable!("Unbounded channels cannot be full")
|
||||
} else {
|
||||
ChannelError::Disconnected
|
||||
}
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Message> Sink<Envelope<T>> for Sender<T> {
|
||||
type Error = Error;
|
||||
|
||||
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
match self.get_mut() {
|
||||
Sender::Bounded(sender) => sender.poll_ready(cx).map_err(|_| Error::Disconnected),
|
||||
Sender::Unbounded(sender) => sender.poll_ready(cx).map_err(|_| Error::Disconnected),
|
||||
}
|
||||
}
|
||||
|
||||
fn start_send(self: Pin<&mut Self>, item: Envelope<T>) -> Result<(), Self::Error> {
|
||||
match self.get_mut() {
|
||||
Sender::Bounded(sender) => sender.start_send(item).map_err(|_| Error::Disconnected),
|
||||
Sender::Unbounded(sender) => sender.start_send(item).map_err(|_| Error::Disconnected),
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
match self.get_mut() {
|
||||
Sender::Bounded(sender) => Pin::new(sender)
|
||||
.poll_flush(cx)
|
||||
.map_err(|_| Error::Disconnected),
|
||||
Sender::Unbounded(sender) => Pin::new(sender)
|
||||
.poll_flush(cx)
|
||||
.map_err(|_| Error::Disconnected),
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
match self.get_mut() {
|
||||
Sender::Bounded(sender) => Pin::new(sender)
|
||||
.poll_close(cx)
|
||||
.map_err(|_| Error::Disconnected),
|
||||
Sender::Unbounded(sender) => Pin::new(sender)
|
||||
.poll_close(cx)
|
||||
.map_err(|_| Error::Disconnected),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum Receiver<T: Message> {
|
||||
Bounded(BoundedReceiver<T>),
|
||||
Unbounded(UnboundedReceiver<T>),
|
||||
}
|
||||
|
||||
impl<T: Message> Stream for Receiver<T> {
|
||||
type Item = Envelope<T>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
match self.get_mut() {
|
||||
Receiver::Bounded(receiver) => Pin::new(receiver).poll_next(cx),
|
||||
Receiver::Unbounded(receiver) => Pin::new(receiver).poll_next(cx),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,56 +1,60 @@
|
||||
use futures_channel::oneshot::{channel, Receiver, Sender};
|
||||
use crate::{channel::ResponseSender, futures::ResponseFuture, Actor, Context, Dispatch, Message};
|
||||
|
||||
use crate::{Actor, Context, Dispatch, Message};
|
||||
|
||||
/// An envelope containing a message and optionally a channel which can be
|
||||
/// used to return a value back to the sender.
|
||||
pub struct Envelope<M, R> {
|
||||
msg: M,
|
||||
send: Option<Sender<R>>,
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum EnvelopeInner<T: Message> {
|
||||
/// A message which does not expect a response.
|
||||
NoResponse(T),
|
||||
/// A message which expects a response.
|
||||
WantsResponse(T, ResponseSender<T>),
|
||||
}
|
||||
|
||||
impl<M, R> Envelope<M, R> {
|
||||
/// An envelope containing a message and optionally a channel which can be
|
||||
/// used to return a response back to the sender.
|
||||
#[derive(Debug)]
|
||||
pub struct Envelope<T: Message>(EnvelopeInner<T>);
|
||||
|
||||
impl<T: Message> Envelope<T> {
|
||||
/// Create a new envelope.
|
||||
pub fn new(msg: M) -> Self {
|
||||
Self { msg, send: None }
|
||||
pub fn new(msg: T) -> Self {
|
||||
Self(EnvelopeInner::NoResponse(msg))
|
||||
}
|
||||
|
||||
/// Create a new envelope with a channel which can be used to return
|
||||
/// a response to the sender.
|
||||
pub fn new_returning(msg: M) -> (Self, Receiver<R>) {
|
||||
let (send, recv) = channel();
|
||||
(
|
||||
Self {
|
||||
msg,
|
||||
send: Some(send),
|
||||
},
|
||||
recv,
|
||||
)
|
||||
pub fn new_with_response(msg: T) -> (Self, ResponseFuture<T>) {
|
||||
let (send, recv) = ResponseFuture::new();
|
||||
(Self(EnvelopeInner::WantsResponse(msg, send)), recv)
|
||||
}
|
||||
|
||||
/// Returns `true` if the envelope has a channel which will receive a response.
|
||||
pub fn wants_response(&self) -> bool {
|
||||
match &self.0 {
|
||||
EnvelopeInner::NoResponse(_) => false,
|
||||
EnvelopeInner::WantsResponse(_, _) => true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<M, R> Envelope<M, R>
|
||||
where
|
||||
M: Message<Return = R>,
|
||||
R: Send,
|
||||
{
|
||||
/// Dispatches the message and return channel to the actor for handling.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `actor` - The actor which will handle the message.
|
||||
/// * `ctx` - The context of the actor.
|
||||
pub async fn dispatch<A>(mut self, actor: &mut A, ctx: &mut Context<A>)
|
||||
pub async fn dispatch<A>(self, actor: &mut A, ctx: &mut Context<A>)
|
||||
where
|
||||
A: Actor,
|
||||
M: Dispatch<A>,
|
||||
T: Dispatch<A>,
|
||||
{
|
||||
self.msg
|
||||
.dispatch(actor, ctx, move |ret| {
|
||||
if let Some(send) = self.send.take() {
|
||||
let _ = send.send(ret);
|
||||
}
|
||||
})
|
||||
.await
|
||||
match self.0 {
|
||||
EnvelopeInner::NoResponse(msg) => {
|
||||
msg.dispatch(actor, ctx, move |_| {}).await;
|
||||
}
|
||||
EnvelopeInner::WantsResponse(msg, sender) => {
|
||||
msg.dispatch(actor, ctx, move |ret| {
|
||||
let _ = sender.send(ret);
|
||||
})
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,23 +3,23 @@ use std::fmt::Display;
|
||||
/// Errors that can occur when sending a message.
|
||||
#[derive(Debug, PartialEq, Eq, Hash)]
|
||||
#[non_exhaustive]
|
||||
pub enum MessageError {
|
||||
/// The mailbox has been closed.
|
||||
Closed,
|
||||
pub enum Error {
|
||||
/// The mailbox has been disconnected.
|
||||
Disconnected,
|
||||
/// Handling of the message was interrupted.
|
||||
Interrupted,
|
||||
/// Error occurred while wrapping a message.
|
||||
Wrapper,
|
||||
}
|
||||
|
||||
impl Display for MessageError {
|
||||
impl Display for Error {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
MessageError::Closed => write!(f, "mailbox closed"),
|
||||
MessageError::Interrupted => write!(f, "message handling interrupted"),
|
||||
MessageError::Wrapper => write!(f, "wrapper error"),
|
||||
Error::Disconnected => write!(f, "mailbox disconnected"),
|
||||
Error::Interrupted => write!(f, "message handling interrupted"),
|
||||
Error::Wrapper => write!(f, "wrapper error"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for MessageError {}
|
||||
impl std::error::Error for Error {}
|
||||
|
||||
171
ludi-core/src/futures.rs
Normal file
171
ludi-core/src/futures.rs
Normal file
@@ -0,0 +1,171 @@
|
||||
//! Futures for sending messages and waiting for responses.
|
||||
|
||||
use std::{
|
||||
marker::PhantomData,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use futures_core::{ready, FusedFuture, Future};
|
||||
use futures_util::FutureExt;
|
||||
|
||||
use crate::{
|
||||
channel::{new_response, ChannelError, OneshotReceiver, ResponseSender, Sender},
|
||||
Envelope, Error, Message,
|
||||
};
|
||||
|
||||
/// A [`MessageFuture`] mode which will wait for a response.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub struct Wait;
|
||||
/// A [`MessageFuture`] mode which will return a [`ResponseFuture`].
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub struct Detach;
|
||||
|
||||
/// A future which sends a message and optionally waits for a response depending on the mode.
|
||||
///
|
||||
/// # Modes
|
||||
///
|
||||
/// * [`Wait`] - Waits for a response.
|
||||
/// * [`Detach`] - Returns a [`ResponseFuture`] which can be used to wait for the response.
|
||||
#[must_use = "futures do nothing unless polled"]
|
||||
pub struct MessageFuture<T: Message, M> {
|
||||
queue: QueueFuture<T>,
|
||||
response: Option<ResponseFuture<T>>,
|
||||
_mode: PhantomData<M>,
|
||||
}
|
||||
|
||||
impl<T: Message> MessageFuture<T, Wait> {
|
||||
pub(crate) fn new(queue: QueueFuture<T>, response: ResponseFuture<T>) -> Self {
|
||||
Self {
|
||||
queue,
|
||||
response: Some(response),
|
||||
_mode: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a new [`MessageFuture`] which will instead resolve when the message is sent and
|
||||
/// return a [`ResponseFuture`] which can be used to wait for the response.
|
||||
pub fn detach(self) -> MessageFuture<T, Detach> {
|
||||
MessageFuture {
|
||||
queue: self.queue,
|
||||
response: self.response,
|
||||
_mode: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Message> Future for MessageFuture<T, Wait> {
|
||||
type Output = Result<T::Return, Error>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.get_mut();
|
||||
|
||||
if !this.queue.is_terminated() {
|
||||
ready!(this.queue.poll_unpin(cx))?;
|
||||
}
|
||||
|
||||
let Some(response) = this.response.as_mut() else {
|
||||
panic!("future is not polled after completion")
|
||||
};
|
||||
|
||||
response.poll_unpin(cx)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Message> Future for MessageFuture<T, Detach> {
|
||||
type Output = Result<ResponseFuture<T>, Error>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.get_mut();
|
||||
if !this.queue.is_terminated() {
|
||||
ready!(this.queue.poll_unpin(cx))?;
|
||||
}
|
||||
|
||||
Poll::Ready(Ok(this
|
||||
.response
|
||||
.take()
|
||||
.expect("future is not polled after completion")))
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Message> FusedFuture for MessageFuture<T, Wait> {
|
||||
fn is_terminated(&self) -> bool {
|
||||
self.queue.is_terminated()
|
||||
&& self
|
||||
.response
|
||||
.as_ref()
|
||||
.map(|r| r.is_terminated())
|
||||
.unwrap_or(true)
|
||||
}
|
||||
}
|
||||
|
||||
/// A future which resolves when a message is successfully queued.
|
||||
#[must_use = "futures do nothing unless polled"]
|
||||
pub struct QueueFuture<T: Message> {
|
||||
sender: Sender<T>,
|
||||
msg: Option<Envelope<T>>,
|
||||
}
|
||||
|
||||
impl<T: Message> QueueFuture<T> {
|
||||
pub(crate) fn new(sender: Sender<T>, msg: Envelope<T>) -> Self {
|
||||
Self {
|
||||
sender,
|
||||
msg: Some(msg),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Message> Future for QueueFuture<T> {
|
||||
type Output = Result<(), Error>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.get_mut();
|
||||
if this.msg.is_some() {
|
||||
ready!(this.sender.poll_ready(cx).map_err(|_| Error::Disconnected))?;
|
||||
if let Err(err) = this.sender.try_send(this.msg.take().unwrap()) {
|
||||
match err {
|
||||
ChannelError::Disconnected => return Poll::Ready(Err(Error::Disconnected)),
|
||||
ChannelError::Full(msg) => {
|
||||
this.msg = Some(msg);
|
||||
return Poll::Pending;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Message> FusedFuture for QueueFuture<T> {
|
||||
fn is_terminated(&self) -> bool {
|
||||
self.msg.is_none()
|
||||
}
|
||||
}
|
||||
|
||||
/// A future which returns the response to a message.
|
||||
#[must_use = "futures do nothing unless polled"]
|
||||
pub struct ResponseFuture<T: Message>(pub(crate) OneshotReceiver<T::Return>);
|
||||
|
||||
impl<T: Message> ResponseFuture<T> {
|
||||
/// Returns a new [`ResponseSender`] and [`ResponseFuture`].
|
||||
pub fn new() -> (ResponseSender<T>, Self) {
|
||||
new_response()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Message> Future for ResponseFuture<T> {
|
||||
type Output = Result<T::Return, Error>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
Pin::new(&mut self.get_mut().0)
|
||||
.poll(cx)
|
||||
.map_err(|_| Error::Interrupted)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Message> FusedFuture for ResponseFuture<T> {
|
||||
fn is_terminated(&self) -> bool {
|
||||
self.0.is_terminated()
|
||||
}
|
||||
}
|
||||
@@ -7,37 +7,36 @@
|
||||
#![deny(clippy::all)]
|
||||
|
||||
mod address;
|
||||
mod channel;
|
||||
mod envelope;
|
||||
mod error;
|
||||
pub mod futures;
|
||||
mod mailbox;
|
||||
|
||||
use futures_core::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use std::future::Future;
|
||||
|
||||
pub use address::Address;
|
||||
pub use channel::ResponseSender;
|
||||
pub use envelope::Envelope;
|
||||
pub use error::MessageError;
|
||||
pub use mailbox::{IntoMail, IntoMailbox, Mailbox};
|
||||
|
||||
#[cfg(feature = "futures-mailbox")]
|
||||
pub use address::futures_address::FuturesAddress;
|
||||
#[cfg(feature = "futures-mailbox")]
|
||||
pub use mailbox::futures_mailbox::{mailbox, FuturesMailbox};
|
||||
pub use error::Error;
|
||||
pub use mailbox::{mailbox, unbounded_mailbox, IntoMail, IntoMailbox, Mailbox};
|
||||
|
||||
/// A message type.
|
||||
pub trait Message: Send + 'static {
|
||||
pub trait Message: Send + Unpin + 'static {
|
||||
/// The return value of the message.
|
||||
type Return: Send + 'static;
|
||||
type Return: Send + Unpin + 'static;
|
||||
}
|
||||
|
||||
/// A message which can wrap another type of message.
|
||||
pub trait Wrap<T: Message>: From<T> + Message {
|
||||
/// Unwraps the return value of the message.
|
||||
fn unwrap_return(ret: Self::Return) -> Result<T::Return, MessageError>;
|
||||
fn unwrap_return(ret: Self::Return) -> Result<T::Return, Error>;
|
||||
}
|
||||
|
||||
impl<T: Message> Wrap<T> for T {
|
||||
fn unwrap_return(ret: Self::Return) -> Result<T::Return, MessageError> {
|
||||
fn unwrap_return(ret: Self::Return) -> Result<T::Return, Error> {
|
||||
Ok(ret)
|
||||
}
|
||||
}
|
||||
@@ -61,11 +60,6 @@ pub trait Dispatch<A: Actor>: Message {
|
||||
|
||||
/// An actor.
|
||||
///
|
||||
/// # Message Type
|
||||
///
|
||||
/// Each actor must specify the type of message it can handle. See the [`Message`] trait for
|
||||
/// more information and examples.
|
||||
///
|
||||
/// # Start
|
||||
///
|
||||
/// When an actor is first started, the [`Actor::started`] method will be called. By default this method
|
||||
@@ -75,22 +69,7 @@ pub trait Dispatch<A: Actor>: Message {
|
||||
///
|
||||
/// When an actor receives a stop signal it will stop processing messages and the [`Actor::stopped`] method
|
||||
/// will be called before returning.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```ignore
|
||||
/// # use ludi_core::*;
|
||||
/// struct PingActor;
|
||||
///
|
||||
/// impl Actor for PingActor {
|
||||
/// type Stop = ();
|
||||
///
|
||||
/// async fn stopped(&mut self) -> Self::Stop {
|
||||
/// println!("actor stopped");
|
||||
/// }
|
||||
/// }
|
||||
/// ```
|
||||
pub trait Actor: Send + Sized + 'static {
|
||||
pub trait Actor: Send + Sized {
|
||||
/// The type of value returned when this actor is stopped.
|
||||
type Stop;
|
||||
/// The type of error which may occur during handling.
|
||||
@@ -154,19 +133,6 @@ pub trait Handler<T: Message>: Actor {
|
||||
}
|
||||
}
|
||||
|
||||
/// A controller for an actor.
|
||||
pub trait Controller {
|
||||
/// The type of actor that this controller controls.
|
||||
type Actor: Actor;
|
||||
/// The type of address that this controller uses to send messages to the actor.
|
||||
type Address: Address<Message = Self::Message>;
|
||||
/// The type of message that this controller can send to the actor.
|
||||
type Message: Message + Dispatch<Self::Actor>;
|
||||
|
||||
/// Returns the address of the actor.
|
||||
fn address(&self) -> &Self::Address;
|
||||
}
|
||||
|
||||
/// An actor's execution context.
|
||||
pub struct Context<A: Actor> {
|
||||
stopped: bool,
|
||||
@@ -233,11 +199,11 @@ impl<A: Actor> Context<A> {
|
||||
///
|
||||
/// * `actor` - The actor to run.
|
||||
/// * `mailbox` - The mailbox which will be used to receive messages.
|
||||
pub async fn run<A, M>(actor: &mut A, mailbox: &mut M) -> Result<A::Stop, A::Error>
|
||||
pub async fn run<A, M, T>(actor: &mut A, mailbox: &mut M) -> Result<A::Stop, A::Error>
|
||||
where
|
||||
A: Actor,
|
||||
M: Mailbox,
|
||||
M::Message: Dispatch<A>,
|
||||
M: Stream<Item = Envelope<T>> + Unpin,
|
||||
T: Dispatch<A>,
|
||||
{
|
||||
let mut ctx = Context::default();
|
||||
actor.started(&mut ctx)?;
|
||||
|
||||
@@ -1,48 +1,55 @@
|
||||
use std::{pin::Pin, task::Poll};
|
||||
use std::{
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use futures_core::Stream;
|
||||
use futures_util::StreamExt;
|
||||
|
||||
use crate::{Envelope, Message};
|
||||
use crate::{
|
||||
channel::{new_channel, new_unbounded_channel, Receiver},
|
||||
Address, Envelope, Message,
|
||||
};
|
||||
|
||||
/// A mailbox.
|
||||
///
|
||||
/// A mailbox is an asynchronous stream of messages which can be dispatched to an actor. The counter-part
|
||||
/// to a mailbox is an [`Address`](crate::Address), which is used to send messages.
|
||||
pub trait Mailbox:
|
||||
Stream<Item = Envelope<Self::Message, <Self::Message as Message>::Return>> + Send + Unpin + 'static
|
||||
{
|
||||
/// The type of message that can be sent to this mailbox.
|
||||
type Message: Message;
|
||||
/// Returns a new mailbox and address.
|
||||
pub fn mailbox<T: Message>(capacity: usize) -> (Mailbox<T>, Address<T>) {
|
||||
let (sender, recv) = new_channel(capacity);
|
||||
|
||||
(Mailbox { recv }, Address::new(sender))
|
||||
}
|
||||
|
||||
impl<T, U> Mailbox for T
|
||||
where
|
||||
T: Stream<Item = Envelope<U, <U as Message>::Return>> + Send + Unpin + 'static,
|
||||
U: Message,
|
||||
{
|
||||
type Message = U;
|
||||
/// Returns a new unbounded mailbox and address.
|
||||
pub fn unbounded_mailbox<T: Message>() -> (Mailbox<T>, Address<T>) {
|
||||
let (sender, recv) = new_unbounded_channel();
|
||||
|
||||
(Mailbox { recv }, Address::new(sender))
|
||||
}
|
||||
|
||||
/// A mailbox.
|
||||
pub struct Mailbox<T: Message> {
|
||||
recv: Receiver<T>,
|
||||
}
|
||||
|
||||
impl<T: Message> Stream for Mailbox<T> {
|
||||
type Item = Envelope<T>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
self.recv.poll_next_unpin(cx)
|
||||
}
|
||||
}
|
||||
|
||||
/// An extension trait which converts a stream of messages into a mailbox.
|
||||
pub trait IntoMailbox {
|
||||
/// The type of message that can be sent to the mailbox.
|
||||
type Message: Message;
|
||||
/// The type of mailbox.
|
||||
type IntoMail: Mailbox<Message = Self::Message>;
|
||||
|
||||
pub trait IntoMailbox: Sized {
|
||||
/// Convert self into a mailbox.
|
||||
fn into_mailbox(self) -> Self::IntoMail;
|
||||
fn into_mailbox(self) -> IntoMail<Self>;
|
||||
}
|
||||
|
||||
impl<T> IntoMailbox for T
|
||||
where
|
||||
T: Stream + Send + Unpin + 'static,
|
||||
T: Stream + Unpin,
|
||||
T::Item: Message,
|
||||
{
|
||||
type Message = T::Item;
|
||||
type IntoMail = IntoMail<T>;
|
||||
|
||||
fn into_mailbox(self) -> Self::IntoMail {
|
||||
fn into_mailbox(self) -> IntoMail<Self> {
|
||||
IntoMail(self)
|
||||
}
|
||||
}
|
||||
@@ -71,10 +78,10 @@ impl<T> IntoMail<T> {
|
||||
|
||||
impl<T> Stream for IntoMail<T>
|
||||
where
|
||||
T: Stream + Send + Unpin + 'static,
|
||||
T: Stream + Unpin,
|
||||
T::Item: Message,
|
||||
{
|
||||
type Item = Envelope<T::Item, <T::Item as Message>::Return>;
|
||||
type Item = Envelope<T::Item>;
|
||||
|
||||
fn poll_next(
|
||||
self: Pin<&mut Self>,
|
||||
@@ -83,51 +90,3 @@ where
|
||||
Stream::poll_next(Pin::new(&mut self.get_mut().0), cx).map(|m| m.map(|m| Envelope::new(m)))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "futures-mailbox")]
|
||||
pub(crate) mod futures_mailbox {
|
||||
use super::*;
|
||||
use crate::FuturesAddress;
|
||||
use futures_channel::mpsc;
|
||||
|
||||
/// Returns a new mailbox and its' address.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `capacity` - The number of messages that can be buffered in the mailbox.
|
||||
pub fn mailbox<T>(capacity: usize) -> (FuturesMailbox<T>, FuturesAddress<T>)
|
||||
where
|
||||
T: Message,
|
||||
{
|
||||
FuturesMailbox::new(capacity)
|
||||
}
|
||||
|
||||
/// A MPSC mailbox implemented using channels from the [`futures_channel`](https://crates.io/crates/futures_channel) crate.
|
||||
pub struct FuturesMailbox<T: Message> {
|
||||
recv: mpsc::Receiver<Envelope<T, T::Return>>,
|
||||
}
|
||||
|
||||
impl<T: Message> FuturesMailbox<T> {
|
||||
/// Create a new mailbox, returning the mailbox and its' address.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `buffer` - The number of messages that can be buffered in the mailbox.
|
||||
pub fn new(buffer: usize) -> (Self, FuturesAddress<T>) {
|
||||
let (send, recv) = mpsc::channel(buffer);
|
||||
|
||||
(Self { recv }, FuturesAddress { send })
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Message> Stream for FuturesMailbox<T> {
|
||||
type Item = Envelope<T, T::Return>;
|
||||
|
||||
fn poll_next(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> Poll<Option<Self::Item>> {
|
||||
Pin::new(&mut self.recv).poll_next(cx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -49,3 +49,7 @@ path = "tests/implement_async_trait.rs"
|
||||
[[test]]
|
||||
name = "implement_trait_foreign"
|
||||
path = "tests/implement_trait_foreign.rs"
|
||||
|
||||
[[test]]
|
||||
name = "controller"
|
||||
path = "tests/controller.rs"
|
||||
|
||||
27
ludi-macros-test/tests/controller.rs
Normal file
27
ludi-macros-test/tests/controller.rs
Normal file
@@ -0,0 +1,27 @@
|
||||
#![allow(dead_code)]
|
||||
|
||||
#[derive(ludi::Controller)]
|
||||
pub struct Foo;
|
||||
|
||||
impl ::ludi::Actor for Foo {
|
||||
type Stop = ();
|
||||
type Error = ();
|
||||
|
||||
async fn stopped(&mut self) -> Result<Self::Stop, Self::Error> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(ludi::Controller)]
|
||||
pub struct FooGenerics<T>(T)
|
||||
where
|
||||
T: Send;
|
||||
|
||||
impl<T: Send> ::ludi::Actor for FooGenerics<T> {
|
||||
type Stop = ();
|
||||
type Error = ();
|
||||
|
||||
async fn stopped(&mut self) -> Result<Self::Stop, Self::Error> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -23,22 +23,19 @@ pub(crate) fn impl_controller(input: syn::DeriveInput) -> proc_macro2::TokenStre
|
||||
input.generics.split_for_impl();
|
||||
|
||||
let mut generics = input.generics.clone();
|
||||
generics.params.push(syn::parse_quote!(A));
|
||||
generics.params.push(syn::parse_quote!(CtrlMsg));
|
||||
|
||||
let where_clause = generics.make_where_clause();
|
||||
where_clause
|
||||
.predicates
|
||||
.push(syn::parse_quote!(A: ::ludi::Address));
|
||||
where_clause
|
||||
.predicates
|
||||
.push(syn::parse_quote!(<A as ::ludi::Address>::Message: ::ludi::Dispatch<#actor_ident #actor_ty_generics>));
|
||||
.push(syn::parse_quote!(CtrlMsg: ::ludi::Message));
|
||||
|
||||
let (impl_generics, ty_generics, where_clause) = generics.split_for_impl();
|
||||
|
||||
let fields = if input.generics.params.is_empty() {
|
||||
quote!(addr: A)
|
||||
quote!(addr: ludi::Address<CtrlMsg>)
|
||||
} else {
|
||||
quote!(addr: A, _pd: std::marker::PhantomData #actor_ty_generics)
|
||||
quote!(addr: ludi::Address<CtrlMsg>, _pd: std::marker::PhantomData #actor_ty_generics)
|
||||
};
|
||||
|
||||
let from_fields = if input.generics.params.is_empty() {
|
||||
@@ -53,39 +50,27 @@ pub(crate) fn impl_controller(input: syn::DeriveInput) -> proc_macro2::TokenStre
|
||||
quote!(
|
||||
#[derive(Debug, Clone)]
|
||||
#[doc = #ctrl_doc]
|
||||
#vis struct #ctrl_ident #ty_generics {
|
||||
#vis struct #ctrl_ident #ty_generics where CtrlMsg: ::ludi::Message {
|
||||
#fields
|
||||
}
|
||||
|
||||
impl #actor_impl_generics #actor_ident #actor_ty_generics #actor_where_clause {
|
||||
#[doc = #ctrl_fn_doc]
|
||||
pub fn controller<A>(addr: A) -> #ctrl_ident #ty_generics
|
||||
pub fn controller<CtrlMsg>(addr: ::ludi::Address<CtrlMsg>) -> #ctrl_ident #ty_generics
|
||||
where
|
||||
A: ::ludi::Address,
|
||||
<A as ::ludi::Address>::Message: ::ludi::Dispatch<Self>,
|
||||
CtrlMsg: ::ludi::Message + ::ludi::Dispatch<Self>,
|
||||
{
|
||||
#ctrl_ident ::from(addr)
|
||||
}
|
||||
}
|
||||
|
||||
impl #impl_generics From<A> for #ctrl_ident #ty_generics #actor_where_clause
|
||||
impl #impl_generics From<::ludi::Address<CtrlMsg>> for #ctrl_ident #ty_generics #where_clause
|
||||
{
|
||||
fn from(addr: A) -> Self {
|
||||
fn from(addr: ludi::Address<CtrlMsg>) -> Self {
|
||||
Self {
|
||||
#from_fields
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl #ty_generics ::ludi::Controller for #ctrl_ident #ty_generics #where_clause
|
||||
{
|
||||
type Actor = #actor_ident #actor_ty_generics;
|
||||
type Address = A;
|
||||
type Message = <A as ::ludi::Address>::Message;
|
||||
|
||||
fn address(&self) -> &Self::Address {
|
||||
&self.addr
|
||||
}
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
@@ -237,31 +237,28 @@ impl ItemImpl {
|
||||
|
||||
if let Some(generic_args) = &self.actor_generic_args {
|
||||
let mut generic_args = generic_args.clone();
|
||||
generic_args.args.push(parse_quote!(A));
|
||||
generic_args.args.push(parse_quote!(CtrlMsg));
|
||||
ctrl_path.segments.last_mut().unwrap().arguments =
|
||||
syn::PathArguments::AngleBracketed(generic_args);
|
||||
} else {
|
||||
ctrl_path.segments.last_mut().unwrap().arguments =
|
||||
syn::PathArguments::AngleBracketed(parse_quote!(<A,>));
|
||||
syn::PathArguments::AngleBracketed(parse_quote!(<CtrlMsg,>));
|
||||
}
|
||||
|
||||
let mut generics = self.impl_generics.clone();
|
||||
generics.params.push(parse_quote!(A));
|
||||
generics.params.push(parse_quote!(CtrlMsg));
|
||||
|
||||
let where_clause = generics.make_where_clause();
|
||||
where_clause
|
||||
.predicates
|
||||
.push(parse_quote!(A: Send + Sync + 'static));
|
||||
where_clause
|
||||
.predicates
|
||||
.push(parse_quote!(Self: ::ludi::Controller<Actor = #actor_path>));
|
||||
.push(parse_quote!(CtrlMsg: ::ludi::Message + ludi::Dispatch<#actor_path>));
|
||||
|
||||
if let Some(ImplTrait { trait_path, .. }) = &self.impl_trait {
|
||||
self.methods.iter().for_each(|method| {
|
||||
let struct_path = &method.struct_path;
|
||||
where_clause.predicates.push(
|
||||
parse_quote!(<Self as ::ludi::Controller>::Message: ::ludi::Wrap<#struct_path>),
|
||||
);
|
||||
where_clause
|
||||
.predicates
|
||||
.push(parse_quote!(CtrlMsg: ::ludi::Wrap<#struct_path>));
|
||||
});
|
||||
|
||||
let methods = self.methods.iter().map(|method| method.expand_ctrl(true));
|
||||
@@ -279,9 +276,9 @@ impl ItemImpl {
|
||||
let mut generics = generics.clone();
|
||||
|
||||
let where_clause = generics.make_where_clause();
|
||||
where_clause.predicates.push(
|
||||
parse_quote!(<Self as ::ludi::Controller>::Message: ::ludi::Wrap<#struct_path>),
|
||||
);
|
||||
where_clause
|
||||
.predicates
|
||||
.push(parse_quote!(CtrlMsg: ::ludi::Wrap<#struct_path>));
|
||||
let (impl_generics, _, where_clause) = generics.split_for_impl();
|
||||
|
||||
quote!(
|
||||
|
||||
@@ -405,10 +405,7 @@ impl Method {
|
||||
#(#doc_attrs)*
|
||||
#(#[#attrs])*
|
||||
#vis #ctrl_sig {
|
||||
::ludi::Address::send_await(
|
||||
::ludi::Controller::address(self),
|
||||
#struct_arg
|
||||
).await #err_handler
|
||||
self.addr.send(#struct_arg).await #err_handler
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
@@ -142,10 +142,10 @@ impl ToTokens for Wrap {
|
||||
|
||||
#(
|
||||
impl #impl_generics ::ludi::Wrap<#variant_tys> for #ident #ty_generics #where_clause {
|
||||
fn unwrap_return(ret: Self::Return) -> Result<<#variant_tys as ::ludi::Message>::Return, ::ludi::MessageError> {
|
||||
fn unwrap_return(ret: Self::Return) -> Result<<#variant_tys as ::ludi::Message>::Return, ::ludi::Error> {
|
||||
match ret {
|
||||
Self::Return :: #variant_idents (value) => Ok(value),
|
||||
_ => Err(::ludi::MessageError::Wrapper),
|
||||
_ => Err(::ludi::Error::Wrapper),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,9 +4,8 @@ version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[features]
|
||||
default = ["macros", "futures-mailbox"]
|
||||
default = ["macros"]
|
||||
macros = ["dep:ludi-macros"]
|
||||
futures-mailbox = ["ludi-core/futures-mailbox"]
|
||||
|
||||
[dependencies]
|
||||
ludi-core = { path = "../ludi-core" }
|
||||
|
||||
3
rustfmt.toml
Normal file
3
rustfmt.toml
Normal file
@@ -0,0 +1,3 @@
|
||||
ignore = ["tls-core/", "tls-client/"]
|
||||
|
||||
imports_granularity = "Crate"
|
||||
Reference in New Issue
Block a user