From be1eac9a57765b8ffa2035c324a3ee226a74c562 Mon Sep 17 00:00:00 2001 From: narodnik Date: Fri, 5 Mar 2021 15:44:34 +0100 Subject: [PATCH] new message substem, still not yet integrated with main networking code --- src/bin/dfi.rs | 5 +- src/net/message_subscriber.rs | 251 ++++++++++++++++++++++++++- src/net/p2p.rs | 7 +- src/net/sessions/outbound_session.rs | 8 +- src/net/sessions/seed_session.rs | 2 +- 5 files changed, 259 insertions(+), 14 deletions(-) diff --git a/src/bin/dfi.rs b/src/bin/dfi.rs index acc3c70ba..4915e17cc 100644 --- a/src/bin/dfi.rs +++ b/src/bin/dfi.rs @@ -5,12 +5,12 @@ use async_native_tls::TlsAcceptor; use async_std::sync::Mutex; use easy_parallel::Parallel; use http_types::{Request, Response, StatusCode}; +use log::*; use serde_json::json; use smol::Async; use std::net::SocketAddr; use std::net::TcpListener; use std::sync::Arc; -use log::*; use sapvi::{net, Result}; @@ -138,6 +138,9 @@ impl RpcInterface { } async fn start(executor: Arc>, options: ProgramOptions) -> Result<()> { + sapvi::net::message_subscriber::doteste().await; + return Ok(()); + let p2p = net::P2p::new(options.network_settings); let rpc = RpcInterface::new(p2p.clone()); diff --git a/src/net/message_subscriber.rs b/src/net/message_subscriber.rs index 4119fd492..91bc2b323 100644 --- a/src/net/message_subscriber.rs +++ b/src/net/message_subscriber.rs @@ -1,10 +1,18 @@ +use std::io; +use log::*; +use std::io::Cursor; +use async_trait::async_trait; use async_std::sync::Mutex; use rand::Rng; +use std::any::Any; use std::collections::HashMap; use std::sync::Arc; -use crate::net::error::NetResult; +use crate::error::Result; +use crate::net::error::{NetResult, NetError}; use crate::net::messages::{Message, PacketType}; +use crate::serial::Decodable; +use crate::serial::Encodable; pub type MessageSubscriberPtr = Arc; @@ -129,3 +137,244 @@ impl MessageSubscriber { } } } + +// +// + +pub trait Message2: 'static + Decodable + Send + Sync { + fn name() -> &'static str; + + fn deserialize(); + fn serialize(); +} + +pub struct MessageSubscription2 { + id: MessageSubscriptionID, + recv_queue: async_channel::Receiver>, + parent: Arc> +} + +impl MessageSubscription2 { + pub async fn receive(&self) -> Arc { + match self.recv_queue.recv().await { + Ok(message) => { + message + } + Err(err) => { + panic!("MessageSubscription::receive() recv_queue failed! {}", err); + } + } + } + + // Must be called manually since async Drop is not possible in Rust + pub async fn unsubscribe(&self) { + self.parent.clone().unsubscribe(self.id).await + } +} + +#[async_trait] +trait MessageDispatcherInterface: Sync { + async fn notify(&self, payload: Vec); + + async fn unsubscribe(&self, sub_id: MessageSubscriptionID); + + fn as_any(&self) -> &dyn Any; +} + +struct MessageDispatcher { + subs: Mutex>>>, +} + +impl MessageDispatcher { + fn new() -> Self { + MessageDispatcher { + subs: Mutex::new(HashMap::new()), + } + } + + pub fn random_id() -> MessageSubscriptionID { + let mut rng = rand::thread_rng(); + rng.gen() + } + + pub async fn subscribe(&self, self_arc: Arc>) -> MessageSubscription2 { + let (sender, recvr) = async_channel::unbounded(); + let sub_id = Self::random_id(); + self.subs.lock().await.insert(sub_id, sender); + + MessageSubscription2 { + id: sub_id, + recv_queue: recvr, + parent: self_arc + } + } + + async fn notify_all(&self, message: M) { + let message = Arc::new(message); + let mut garbage_ids = Vec::new(); + + for (sub_id, sub) in &*self.subs.lock().await { + match sub.send(message.clone()).await { + Ok(()) => {} + Err(_err) => { + // Automatically clean out closed channels + garbage_ids.push(*sub_id); + //panic!("Error returned sending message in notify() call! {}", err); + } + } + } + + self.collect_garbage(garbage_ids).await; + } + + async fn collect_garbage(&self, ids: Vec) { + let mut subs = self.subs.lock().await; + for id in &ids { + subs.remove(id); + } + } +} + +#[async_trait] +impl MessageDispatcherInterface for MessageDispatcher { + async fn notify(&self, payload: Vec) { + // deserialize data into type + // send down the pipes + let cursor = Cursor::new(payload); + match M::decode(cursor) { + Ok(message) => { + self.notify_all(message).await + } + Err(err) => { + error!("Unable to decode data. Dropping...: {}", err); + } + } + } + + async fn unsubscribe(&self, sub_id: MessageSubscriptionID) { + self.subs.lock().await.remove(&sub_id); + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +struct MyVersionMessage { + x: u32 +} + +impl Message2 for MyVersionMessage { + fn name() -> &'static str { + "verver" + } + + fn deserialize() {} + fn serialize() {} +} + +impl Encodable for MyVersionMessage { + fn encode(&self, mut s: S) -> Result { + let mut len = 0; + len += self.x.encode(&mut s)?; + Ok(len) + } +} + +impl Decodable for MyVersionMessage { + fn decode(mut d: D) -> Result { + Ok(Self { + x: Decodable::decode(&mut d)?, + }) + } +} + +struct MessageSubsystem { + dispatchers: Mutex>>> +} + +impl MessageSubsystem { + pub fn new() -> Self { + MessageSubsystem { dispatchers: Mutex::new(HashMap::new()) } + } + + pub async fn add_dispatch(&self) { + self.dispatchers.lock().await.insert( + M::name(), + Arc::new(Box::new(MessageDispatcher::::new())), + ); + } + + pub async fn subscribe(&self) -> NetResult> { + let dispatcher = self.dispatchers + .lock() + .await + .get(MyVersionMessage::name()) + .cloned(); + + let sub = match dispatcher { + Some(dispatcher_arc) => { + let dispatcher: &MessageDispatcher = match dispatcher_arc + .as_any() + .downcast_ref::>() { + Some(dispatcher) => dispatcher, + None => panic!("Multiple messages registered with different names"), + }; + + dispatcher.subscribe(dispatcher_arc.clone()).await + } + None => { + // normall return failure here + // for now panic + return Err(NetError::OperationFailed); + } + }; + + Ok(sub) + } + + pub async fn trigger(&self, name: &str, data: Vec) { + let dispatcher = self.dispatchers + .lock() + .await + .get(name) + .cloned(); + + match dispatcher { + Some(dispatcher) => { + dispatcher.notify(data).await; + } + None => {} + } + } +} + +pub async fn doteste() { + println!("hello"); + + let subsystem = MessageSubsystem::new(); + subsystem.add_dispatch::().await; + + // subscribe + // 1. get dispatcher + // 2. cast to specific type + // 3. do sub, return sub + let sub = subsystem.subscribe::().await.unwrap(); + + let msg = MyVersionMessage { x: 110 }; + let mut payload = Vec::new(); + msg.encode(&mut payload).unwrap(); + + // receive message and publish + // 1. based on string, lookup relevant dispatcher interface + // 2. publish data there + subsystem.trigger("verver", payload).await; + + // receive + // 1. do a get easy + let msg2 = sub.receive().await; + println!("{}", msg2.x); +} + diff --git a/src/net/p2p.rs b/src/net/p2p.rs index 32bc7b17a..c5cf6f376 100644 --- a/src/net/p2p.rs +++ b/src/net/p2p.rs @@ -1,7 +1,7 @@ use async_executor::Executor; use async_std::sync::Mutex; use log::*; -use std::collections::{HashSet, HashMap}; +use std::collections::{HashMap, HashSet}; use std::net::SocketAddr; use std::sync::Arc; @@ -80,10 +80,7 @@ impl P2p { .insert(channel.address(), channel); } pub async fn remove(&self, channel: ChannelPtr) { - self.channels - .lock() - .await - .remove(&channel.address()); + self.channels.lock().await.remove(&channel.address()); } pub async fn exists(&self, addr: &SocketAddr) -> bool { diff --git a/src/net/sessions/outbound_session.rs b/src/net/sessions/outbound_session.rs index 68b711cc1..5a2295519 100644 --- a/src/net/sessions/outbound_session.rs +++ b/src/net/sessions/outbound_session.rs @@ -131,13 +131,9 @@ impl OutboundSession { fn addr_is_inbound(addr: &SocketAddr, inbound_addr: &Option) -> bool { match inbound_addr { - Some(inbound_addr) => { - inbound_addr == addr - } + Some(inbound_addr) => inbound_addr == addr, // No inbound listening address configured - None => { - false - } + None => false, } } diff --git a/src/net/sessions/seed_session.rs b/src/net/sessions/seed_session.rs index b0a05f3ca..ecd307b62 100644 --- a/src/net/sessions/seed_session.rs +++ b/src/net/sessions/seed_session.rs @@ -7,8 +7,8 @@ use std::sync::{Arc, Weak}; use crate::net::error::{NetError, NetResult}; use crate::net::protocols::{ProtocolPing, ProtocolSeed}; use crate::net::sessions::Session; -use crate::net::{ChannelPtr, Connector, HostsPtr, P2p, SettingsPtr}; use crate::net::utility::sleep; +use crate::net::{ChannelPtr, Connector, HostsPtr, P2p, SettingsPtr}; pub struct SeedSession { p2p: Weak,