new message substem, still not yet integrated with main networking code

This commit is contained in:
narodnik
2021-03-05 15:44:34 +01:00
parent 7dc7dbd88c
commit be1eac9a57
5 changed files with 259 additions and 14 deletions

View File

@@ -5,12 +5,12 @@ use async_native_tls::TlsAcceptor;
use async_std::sync::Mutex;
use easy_parallel::Parallel;
use http_types::{Request, Response, StatusCode};
use log::*;
use serde_json::json;
use smol::Async;
use std::net::SocketAddr;
use std::net::TcpListener;
use std::sync::Arc;
use log::*;
use sapvi::{net, Result};
@@ -138,6 +138,9 @@ impl RpcInterface {
}
async fn start(executor: Arc<Executor<'_>>, options: ProgramOptions) -> Result<()> {
sapvi::net::message_subscriber::doteste().await;
return Ok(());
let p2p = net::P2p::new(options.network_settings);
let rpc = RpcInterface::new(p2p.clone());

View File

@@ -1,10 +1,18 @@
use std::io;
use log::*;
use std::io::Cursor;
use async_trait::async_trait;
use async_std::sync::Mutex;
use rand::Rng;
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
use crate::net::error::NetResult;
use crate::error::Result;
use crate::net::error::{NetResult, NetError};
use crate::net::messages::{Message, PacketType};
use crate::serial::Decodable;
use crate::serial::Encodable;
pub type MessageSubscriberPtr = Arc<MessageSubscriber>;
@@ -129,3 +137,244 @@ impl MessageSubscriber {
}
}
}
//
//
pub trait Message2: 'static + Decodable + Send + Sync {
fn name() -> &'static str;
fn deserialize();
fn serialize();
}
pub struct MessageSubscription2<M: Message2> {
id: MessageSubscriptionID,
recv_queue: async_channel::Receiver<Arc<M>>,
parent: Arc<Box<dyn MessageDispatcherInterface>>
}
impl<M: Message2> MessageSubscription2<M> {
pub async fn receive(&self) -> Arc<M> {
match self.recv_queue.recv().await {
Ok(message) => {
message
}
Err(err) => {
panic!("MessageSubscription::receive() recv_queue failed! {}", err);
}
}
}
// Must be called manually since async Drop is not possible in Rust
pub async fn unsubscribe(&self) {
self.parent.clone().unsubscribe(self.id).await
}
}
#[async_trait]
trait MessageDispatcherInterface: Sync {
async fn notify(&self, payload: Vec<u8>);
async fn unsubscribe(&self, sub_id: MessageSubscriptionID);
fn as_any(&self) -> &dyn Any;
}
struct MessageDispatcher<M: Message2> {
subs: Mutex<HashMap<MessageSubscriptionID, async_channel::Sender<Arc<M>>>>,
}
impl<M: Message2> MessageDispatcher<M> {
fn new() -> Self {
MessageDispatcher {
subs: Mutex::new(HashMap::new()),
}
}
pub fn random_id() -> MessageSubscriptionID {
let mut rng = rand::thread_rng();
rng.gen()
}
pub async fn subscribe(&self, self_arc: Arc<Box<dyn MessageDispatcherInterface>>) -> MessageSubscription2<M> {
let (sender, recvr) = async_channel::unbounded();
let sub_id = Self::random_id();
self.subs.lock().await.insert(sub_id, sender);
MessageSubscription2 {
id: sub_id,
recv_queue: recvr,
parent: self_arc
}
}
async fn notify_all(&self, message: M) {
let message = Arc::new(message);
let mut garbage_ids = Vec::new();
for (sub_id, sub) in &*self.subs.lock().await {
match sub.send(message.clone()).await {
Ok(()) => {}
Err(_err) => {
// Automatically clean out closed channels
garbage_ids.push(*sub_id);
//panic!("Error returned sending message in notify() call! {}", err);
}
}
}
self.collect_garbage(garbage_ids).await;
}
async fn collect_garbage(&self, ids: Vec<MessageSubscriptionID>) {
let mut subs = self.subs.lock().await;
for id in &ids {
subs.remove(id);
}
}
}
#[async_trait]
impl<M: Message2> MessageDispatcherInterface for MessageDispatcher<M> {
async fn notify(&self, payload: Vec<u8>) {
// deserialize data into type
// send down the pipes
let cursor = Cursor::new(payload);
match M::decode(cursor) {
Ok(message) => {
self.notify_all(message).await
}
Err(err) => {
error!("Unable to decode data. Dropping...: {}", err);
}
}
}
async fn unsubscribe(&self, sub_id: MessageSubscriptionID) {
self.subs.lock().await.remove(&sub_id);
}
fn as_any(&self) -> &dyn Any {
self
}
}
struct MyVersionMessage {
x: u32
}
impl Message2 for MyVersionMessage {
fn name() -> &'static str {
"verver"
}
fn deserialize() {}
fn serialize() {}
}
impl Encodable for MyVersionMessage {
fn encode<S: io::Write>(&self, mut s: S) -> Result<usize> {
let mut len = 0;
len += self.x.encode(&mut s)?;
Ok(len)
}
}
impl Decodable for MyVersionMessage {
fn decode<D: io::Read>(mut d: D) -> Result<Self> {
Ok(Self {
x: Decodable::decode(&mut d)?,
})
}
}
struct MessageSubsystem {
dispatchers: Mutex<HashMap<&'static str, Arc<Box<dyn MessageDispatcherInterface>>>>
}
impl MessageSubsystem {
pub fn new() -> Self {
MessageSubsystem { dispatchers: Mutex::new(HashMap::new()) }
}
pub async fn add_dispatch<M: Message2>(&self) {
self.dispatchers.lock().await.insert(
M::name(),
Arc::new(Box::new(MessageDispatcher::<M>::new())),
);
}
pub async fn subscribe<M: Message2>(&self) -> NetResult<MessageSubscription2<M>> {
let dispatcher = self.dispatchers
.lock()
.await
.get(MyVersionMessage::name())
.cloned();
let sub = match dispatcher {
Some(dispatcher_arc) => {
let dispatcher: &MessageDispatcher<M> = match dispatcher_arc
.as_any()
.downcast_ref::<MessageDispatcher<
M,
>>() {
Some(dispatcher) => dispatcher,
None => panic!("Multiple messages registered with different names"),
};
dispatcher.subscribe(dispatcher_arc.clone()).await
}
None => {
// normall return failure here
// for now panic
return Err(NetError::OperationFailed);
}
};
Ok(sub)
}
pub async fn trigger(&self, name: &str, data: Vec<u8>) {
let dispatcher = self.dispatchers
.lock()
.await
.get(name)
.cloned();
match dispatcher {
Some(dispatcher) => {
dispatcher.notify(data).await;
}
None => {}
}
}
}
pub async fn doteste() {
println!("hello");
let subsystem = MessageSubsystem::new();
subsystem.add_dispatch::<MyVersionMessage>().await;
// subscribe
// 1. get dispatcher
// 2. cast to specific type
// 3. do sub, return sub
let sub = subsystem.subscribe::<MyVersionMessage>().await.unwrap();
let msg = MyVersionMessage { x: 110 };
let mut payload = Vec::new();
msg.encode(&mut payload).unwrap();
// receive message and publish
// 1. based on string, lookup relevant dispatcher interface
// 2. publish data there
subsystem.trigger("verver", payload).await;
// receive
// 1. do a get easy
let msg2 = sub.receive().await;
println!("{}", msg2.x);
}

View File

@@ -1,7 +1,7 @@
use async_executor::Executor;
use async_std::sync::Mutex;
use log::*;
use std::collections::{HashSet, HashMap};
use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;
use std::sync::Arc;
@@ -80,10 +80,7 @@ impl P2p {
.insert(channel.address(), channel);
}
pub async fn remove(&self, channel: ChannelPtr) {
self.channels
.lock()
.await
.remove(&channel.address());
self.channels.lock().await.remove(&channel.address());
}
pub async fn exists(&self, addr: &SocketAddr) -> bool {

View File

@@ -131,13 +131,9 @@ impl OutboundSession {
fn addr_is_inbound(addr: &SocketAddr, inbound_addr: &Option<SocketAddr>) -> bool {
match inbound_addr {
Some(inbound_addr) => {
inbound_addr == addr
}
Some(inbound_addr) => inbound_addr == addr,
// No inbound listening address configured
None => {
false
}
None => false,
}
}

View File

@@ -7,8 +7,8 @@ use std::sync::{Arc, Weak};
use crate::net::error::{NetError, NetResult};
use crate::net::protocols::{ProtocolPing, ProtocolSeed};
use crate::net::sessions::Session;
use crate::net::{ChannelPtr, Connector, HostsPtr, P2p, SettingsPtr};
use crate::net::utility::sleep;
use crate::net::{ChannelPtr, Connector, HostsPtr, P2p, SettingsPtr};
pub struct SeedSession {
p2p: Weak<P2p>,