remove stream interface for now

This commit is contained in:
sinuio
2022-05-29 17:20:14 -07:00
parent d8b5375625
commit 95c4997ef4
2 changed files with 130 additions and 151 deletions

View File

@@ -22,7 +22,6 @@ use crate::vecbuf::ChunkVecBuffer;
use async_recursion::async_recursion;
use async_trait::async_trait;
use futures::AsyncWrite;
use std::collections::VecDeque;
use std::convert::TryFrom;
use std::io;
@@ -64,29 +63,6 @@ impl IoState {
}
}
pub struct Writer<'a> {
conn: &'a mut ConnectionCommon
}
impl<'a> AsyncWrite for Writer<'a> {
fn poll_write(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<io::Result<usize>> {
todo!()
}
fn poll_flush(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<io::Result<()>> {
todo!()
}
fn poll_close(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<io::Result<()>> {
todo!()
}
}
/// A structure that implements [`std::io::Read`] for reading plaintext.
pub struct Reader<'a> {
received_plaintext: &'a mut ChunkVecBuffer,
@@ -222,7 +198,7 @@ pub struct ConnectionCommon {
pub(crate) data: ClientConnectionData,
pub(crate) common_state: CommonState,
message_deframer: MessageDeframer,
handshake_joiner: HandshakeJoiner,
handshake_joiner: HandshakeJoiner
}
impl ConnectionCommon {
@@ -252,13 +228,6 @@ impl ConnectionCommon {
}
}
/// Writes plaintext
pub fn writer(&mut self) -> Writer {
Writer {
conn: self
}
}
/// This function uses `io` to complete any outstanding IO for
/// this connection.
///
@@ -505,11 +474,21 @@ impl ConnectionCommon {
Ok(state)
}
pub(crate) async fn send_some_plaintext(&mut self, buf: &[u8]) -> usize {
/// Write buffer into connection
pub async fn write_plaintext(&mut self, buf: &[u8]) -> io::Result<usize> {
if let Ok(st) = &mut self.state {
st.perhaps_write_key_update(&mut self.common_state).await;
}
self.common_state.send_some_plaintext(buf).await
Ok(self.common_state.send_some_plaintext(buf).await)
}
/// Write entire buffer into connection
pub async fn write_all_plaintext(&mut self, buf: &[u8]) -> io::Result<usize> {
let mut pos = 0;
while pos < buf.len() {
pos += self.write_plaintext(&buf[pos..]).await?;
}
Ok(pos)
}
/// Read TLS content from `rd`. This method does internal

View File

@@ -3,144 +3,144 @@ use crate::conn::{ConnectionCommon, SideData};
use std::io::{IoSlice, Read, Result, Write};
use std::ops::{Deref, DerefMut};
/// This type implements `io::Read` and `io::Write`, encapsulating
/// a Connection `C` and an underlying transport `T`, such as a socket.
///
/// This allows you to use a rustls Connection like a normal stream.
#[derive(Debug)]
pub struct Stream<'a, C: 'a + ?Sized, T: 'a + Read + Write + ?Sized> {
/// Our TLS connection
pub conn: &'a mut C,
// /// This type implements `io::Read` and `io::Write`, encapsulating
// /// a Connection `C` and an underlying transport `T`, such as a socket.
// ///
// /// This allows you to use a rustls Connection like a normal stream.
// #[derive(Debug)]
// pub struct Stream<'a, C: 'a + ?Sized, T: 'a + Read + Write + ?Sized> {
// /// Our TLS connection
// pub conn: &'a mut C,
/// The underlying transport, like a socket
pub sock: &'a mut T,
}
// /// The underlying transport, like a socket
// pub sock: &'a mut T,
// }
impl<'a, C, T, S> Stream<'a, C, T>
where
C: 'a + DerefMut + Deref<Target = ConnectionCommon<S>>,
T: 'a + Read + Write,
S: SideData,
{
/// Make a new Stream using the Connection `conn` and socket-like object
/// `sock`. This does not fail and does no IO.
pub fn new(conn: &'a mut C, sock: &'a mut T) -> Self {
Self { conn, sock }
}
// impl<'a, C, T, S> Stream<'a, C, T>
// where
// C: 'a + DerefMut + Deref<Target = ConnectionCommon<S>>,
// T: 'a + Read + Write,
// S: SideData,
// {
// /// Make a new Stream using the Connection `conn` and socket-like object
// /// `sock`. This does not fail and does no IO.
// pub fn new(conn: &'a mut C, sock: &'a mut T) -> Self {
// Self { conn, sock }
// }
/// If we're handshaking, complete all the IO for that.
/// If we have data to write, write it all.
fn complete_prior_io(&mut self) -> Result<()> {
if self.conn.is_handshaking() {
self.conn.complete_io(self.sock)?;
}
// /// If we're handshaking, complete all the IO for that.
// /// If we have data to write, write it all.
// fn complete_prior_io(&mut self) -> Result<()> {
// if self.conn.is_handshaking() {
// self.conn.complete_io(self.sock)?;
// }
if self.conn.wants_write() {
self.conn.complete_io(self.sock)?;
}
// if self.conn.wants_write() {
// self.conn.complete_io(self.sock)?;
// }
Ok(())
}
}
// Ok(())
// }
// }
impl<'a, C, T, S> Read for Stream<'a, C, T>
where
C: 'a + DerefMut + Deref<Target = ConnectionCommon<S>>,
T: 'a + Read + Write,
S: SideData,
{
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
self.complete_prior_io()?;
// impl<'a, C, T, S> Read for Stream<'a, C, T>
// where
// C: 'a + DerefMut + Deref<Target = ConnectionCommon<S>>,
// T: 'a + Read + Write,
// S: SideData,
// {
// fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
// self.complete_prior_io()?;
// We call complete_io() in a loop since a single call may read only
// a partial packet from the underlying transport. A full packet is
// needed to get more plaintext, which we must do if EOF has not been
// hit. Otherwise, we will prematurely signal EOF by returning 0. We
// determine if EOF has actually been hit by checking if 0 bytes were
// read from the underlying transport.
while self.conn.wants_read() {
let at_eof = self.conn.complete_io(self.sock)?.0 == 0;
if at_eof {
if let Ok(io_state) = self.conn.process_new_packets() {
if at_eof && io_state.plaintext_bytes_to_read() == 0 {
return Ok(0);
}
}
break;
}
}
// // We call complete_io() in a loop since a single call may read only
// // a partial packet from the underlying transport. A full packet is
// // needed to get more plaintext, which we must do if EOF has not been
// // hit. Otherwise, we will prematurely signal EOF by returning 0. We
// // determine if EOF has actually been hit by checking if 0 bytes were
// // read from the underlying transport.
// while self.conn.wants_read() {
// let at_eof = self.conn.complete_io(self.sock)?.0 == 0;
// if at_eof {
// if let Ok(io_state) = self.conn.process_new_packets() {
// if at_eof && io_state.plaintext_bytes_to_read() == 0 {
// return Ok(0);
// }
// }
// break;
// }
// }
self.conn.reader().read(buf)
}
// self.conn.reader().read(buf)
// }
#[cfg(read_buf)]
fn read_buf(&mut self, buf: &mut std::io::ReadBuf<'_>) -> Result<()> {
self.complete_prior_io()?;
// #[cfg(read_buf)]
// fn read_buf(&mut self, buf: &mut std::io::ReadBuf<'_>) -> Result<()> {
// self.complete_prior_io()?;
// We call complete_io() in a loop since a single call may read only
// a partial packet from the underlying transport. A full packet is
// needed to get more plaintext, which we must do if EOF has not been
// hit. Otherwise, we will prematurely signal EOF by returning without
// writing anything. We determine if EOF has actually been hit by
// checking if 0 bytes were read from the underlying transport.
while self.conn.wants_read() {
let at_eof = self.conn.complete_io(self.sock)?.0 == 0;
if at_eof {
if let Ok(io_state) = self.conn.process_new_packets() {
if at_eof && io_state.plaintext_bytes_to_read() == 0 {
return Ok(());
}
}
break;
}
}
// // We call complete_io() in a loop since a single call may read only
// // a partial packet from the underlying transport. A full packet is
// // needed to get more plaintext, which we must do if EOF has not been
// // hit. Otherwise, we will prematurely signal EOF by returning without
// // writing anything. We determine if EOF has actually been hit by
// // checking if 0 bytes were read from the underlying transport.
// while self.conn.wants_read() {
// let at_eof = self.conn.complete_io(self.sock)?.0 == 0;
// if at_eof {
// if let Ok(io_state) = self.conn.process_new_packets() {
// if at_eof && io_state.plaintext_bytes_to_read() == 0 {
// return Ok(());
// }
// }
// break;
// }
// }
self.conn.reader().read_buf(buf)
}
}
// self.conn.reader().read_buf(buf)
// }
// }
impl<'a, C, T, S> Write for Stream<'a, C, T>
where
C: 'a + DerefMut + Deref<Target = ConnectionCommon<S>>,
T: 'a + Read + Write,
S: SideData,
{
fn write(&mut self, buf: &[u8]) -> Result<usize> {
self.complete_prior_io()?;
// impl<'a, C, T, S> Write for Stream<'a, C, T>
// where
// C: 'a + DerefMut + Deref<Target = ConnectionCommon<S>>,
// T: 'a + Read + Write,
// S: SideData,
// {
// fn write(&mut self, buf: &[u8]) -> Result<usize> {
// self.complete_prior_io()?;
let len = self.conn.writer().write(buf)?;
// let len = self.conn.writer().write(buf)?;
// Try to write the underlying transport here, but don't let
// any errors mask the fact we've consumed `len` bytes.
// Callers will learn of permanent errors on the next call.
let _ = self.conn.complete_io(self.sock);
// // Try to write the underlying transport here, but don't let
// // any errors mask the fact we've consumed `len` bytes.
// // Callers will learn of permanent errors on the next call.
// let _ = self.conn.complete_io(self.sock);
Ok(len)
}
// Ok(len)
// }
fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result<usize> {
self.complete_prior_io()?;
// fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result<usize> {
// self.complete_prior_io()?;
let len = self.conn.writer().write_vectored(bufs)?;
// let len = self.conn.writer().write_vectored(bufs)?;
// Try to write the underlying transport here, but don't let
// any errors mask the fact we've consumed `len` bytes.
// Callers will learn of permanent errors on the next call.
let _ = self.conn.complete_io(self.sock);
// // Try to write the underlying transport here, but don't let
// // any errors mask the fact we've consumed `len` bytes.
// // Callers will learn of permanent errors on the next call.
// let _ = self.conn.complete_io(self.sock);
Ok(len)
}
// Ok(len)
// }
fn flush(&mut self) -> Result<()> {
self.complete_prior_io()?;
// fn flush(&mut self) -> Result<()> {
// self.complete_prior_io()?;
self.conn.writer().flush()?;
if self.conn.wants_write() {
self.conn.complete_io(self.sock)?;
}
Ok(())
}
}
// self.conn.writer().flush()?;
// if self.conn.wants_write() {
// self.conn.complete_io(self.sock)?;
// }
// Ok(())
// }
// }
/// This type implements `io::Read` and `io::Write`, encapsulating
/// and owning a Connection `C` and an underlying blocking transport