WIP: debugging...

This commit is contained in:
th4s
2025-10-16 13:31:01 +02:00
parent 847d923453
commit 341eb24d3c
3 changed files with 52 additions and 66 deletions

View File

@@ -204,7 +204,7 @@ impl SimplexStream {
fn new_unsplit(max_buf_size: usize) -> SimplexStream { fn new_unsplit(max_buf_size: usize) -> SimplexStream {
SimplexStream { SimplexStream {
max_buf_size, max_buf_size,
buffer: BytesMut::zeroed(max_buf_size), buffer: BytesMut::new(),
} }
} }

View File

@@ -75,7 +75,7 @@ impl ConnectionFuture {
futures::select! { futures::select! {
conn = &mut inner_fut => break conn, conn = &mut inner_fut => break conn,
default => { default => {
if let Some(waker) = waker { if let Some(waker) = waker.take() {
waker.wake(); waker.wake();
} }
} }
@@ -107,14 +107,15 @@ impl ConnectionFuture {
let (client_async_socket, client_async_handle) = futures_plex::duplex(BUF_SIZE); let (client_async_socket, client_async_handle) = futures_plex::duplex(BUF_SIZE);
let (client_handle, server_handle, mut inner_fut) = InnerFuture::new(client_conn); let (client_handle, server_handle, mut inner_fut) = InnerFuture::new(client_conn);
let mut client_buffer1 = [0_u8; BUF_SIZE]; let mut client_buffer_incoming = [0_u8; BUF_SIZE];
let mut client_buffer2 = [0_u8; BUF_SIZE]; let mut client_buffer_outgoing = [0_u8; BUF_SIZE];
let mut server_buffer1 = [0_u8; BUF_SIZE]; let mut server_buffer_incoming = [0_u8; BUF_SIZE];
let mut server_buffer2 = [0_u8; BUF_SIZE]; let mut server_buffer_outgoing = [0_u8; BUF_SIZE];
let (mut client_async_read, mut client_async_write) = client_async_handle.split(); let (mut client_async_read, mut client_async_write) = client_async_handle.split();
let (mut server_async_read, mut server_async_write) = socket.split();
let (mut client_inner_read, mut client_inner_write) = client_handle.split(); let (mut client_inner_read, mut client_inner_write) = client_handle.split();
let (mut server_async_read, mut server_async_write) = socket.split();
let (mut server_inner_read, mut server_inner_write) = server_handle.split(); let (mut server_inner_read, mut server_inner_write) = server_handle.split();
let future = ConnectionFuture { let future = ConnectionFuture {
@@ -131,10 +132,12 @@ impl ConnectionFuture {
if client_read_op.is_terminated() { if client_read_op.is_terminated() {
client_read_op = Box::pin( client_read_op = Box::pin(
async { async {
println!("attempting to read from outer client socket");
let read_count = let read_count =
client_async_read.read(&mut client_buffer1).await?; client_async_read.read(&mut client_buffer_incoming).await?;
println!("read {} bytes from outer client socket", read_count); println!("read {} bytes from outer client socket", read_count);
client_inner_write.write_all(&client_buffer1[..read_count])?; let write_count = client_inner_write.write(&client_buffer_incoming[..read_count])?;
println!("forwarded {} bytes from outer client socket", write_count);
Ok::<_, std::io::Error>(()) Ok::<_, std::io::Error>(())
} }
.fuse(), .fuse(),
@@ -143,15 +146,18 @@ impl ConnectionFuture {
if client_write_op.is_terminated() { if client_write_op.is_terminated() {
client_write_op = Box::pin( client_write_op = Box::pin(
async { async {
let read_count = client_inner_read.read(&mut client_buffer2)?; println!("attempting to write into outer client socket");
println!( let read_count = client_inner_read.read(&mut client_buffer_outgoing)?;
"writing {} bytes into outer client socket", if read_count > 0 {
read_count let write_count = client_async_write
); .write(&client_buffer_outgoing[..read_count])
client_async_write .await?;
.write_all(&client_buffer2[..read_count]) println!(
.await?; "writing {} bytes into outer client socket",
client_async_write.flush().await?; write_count
);
client_async_write.flush().await?;
}
Ok::<_, std::io::Error>(()) Ok::<_, std::io::Error>(())
} }
.fuse(), .fuse(),
@@ -160,15 +166,12 @@ impl ConnectionFuture {
if server_read_op.is_terminated() { if server_read_op.is_terminated() {
server_read_op = Box::pin( server_read_op = Box::pin(
async { async {
println!("attempting to read from outer server socket");
let read_count = let read_count =
server_async_read.read(&mut server_buffer1).await?; server_async_read.read(&mut server_buffer_incoming).await?;
println!("read {} bytes from outer server socket", read_count); println!("read {} bytes from outer server socket", read_count);
if read_count > 0 { let write_count = server_inner_write.write(&server_buffer_incoming[..read_count])?;
server_inner_write println!("forwarded {} bytes from outer server socket", write_count);
.write_all(&server_buffer1[..read_count])?;
} else {
server_inner_write.close();
}
Ok::<_, std::io::Error>(()) Ok::<_, std::io::Error>(())
} }
.fuse(), .fuse(),
@@ -177,15 +180,19 @@ impl ConnectionFuture {
if server_write_op.is_terminated() { if server_write_op.is_terminated() {
server_write_op = Box::pin( server_write_op = Box::pin(
async { async {
let read_count = server_inner_read.read(&mut server_buffer2)?; println!("attempting to write into outer server socket");
let write_count = server_async_write let read_count = server_inner_read.read(&mut server_buffer_outgoing)?;
.write(&server_buffer2[..read_count]) println!("buffer content: {:x?}", &server_buffer_outgoing[..read_count]);
.await?; if read_count > 0 {
println!( let write_count = server_async_write
"writing {} bytes into outer server socket", .write(&server_buffer_outgoing[..read_count])
write_count .await?;
); println!(
server_async_write.flush().await?; "writing {} bytes into outer server socket",
write_count
);
server_async_write.flush().await?;
}
Ok::<_, std::io::Error>(()) Ok::<_, std::io::Error>(())
} }
.fuse(), .fuse(),
@@ -194,13 +201,13 @@ impl ConnectionFuture {
futures::select! { futures::select! {
conn = &mut inner_fut => break conn, conn = &mut inner_fut => break conn,
_ = &mut client_read_op => (), client_read_result = &mut client_read_op => client_read_result.unwrap(),
_ = &mut client_write_op => (), client_write_result = &mut client_write_op => client_write_result.unwrap(),
_ = &mut server_read_op => (), server_read_result = &mut server_read_op => server_read_result.unwrap(),
_ = &mut server_write_op => (), server_write_result = &mut server_write_op => server_write_result.unwrap(),
default => { default => {
if let Some(waker) = waker { if let Some(waker) = waker.take() {
waker.wake_by_ref(); waker.wake();
} }
} }
} }
@@ -319,10 +326,8 @@ impl Future for InnerFuture {
} => match fut.as_mut().poll(cx) { } => match fut.as_mut().poll(cx) {
Poll::Ready(client) => { Poll::Ready(client) => {
let (client, notify) = client?; let (client, notify) = client?;
let rx_tls_buf = [0u8; BUF_SIZE];
let rx_buf = [0u8; BUF_SIZE];
let tx_tls_buf = [0u8; BUF_SIZE]; let rx_buf = [0u8; BUF_SIZE];
let tx_buf = [0u8; BUF_SIZE]; let tx_buf = [0u8; BUF_SIZE];
let handshake_done = false; let handshake_done = false;
@@ -334,9 +339,7 @@ impl Future for InnerFuture {
client_handle, client_handle,
server_handle, server_handle,
notify, notify,
rx_tls_buf,
rx_buf, rx_buf,
tx_tls_buf,
tx_buf, tx_buf,
handshake_done, handshake_done,
client_closed, client_closed,
@@ -397,9 +400,7 @@ struct PollingLoop {
client_handle: DuplexStream, client_handle: DuplexStream,
server_handle: DuplexStream, server_handle: DuplexStream,
notify: BackendNotify, notify: BackendNotify,
rx_tls_buf: [u8; BUF_SIZE],
rx_buf: [u8; BUF_SIZE], rx_buf: [u8; BUF_SIZE],
tx_tls_buf: [u8; BUF_SIZE],
tx_buf: [u8; BUF_SIZE], tx_buf: [u8; BUF_SIZE],
handshake_done: bool, handshake_done: bool,
client_closed: bool, client_closed: bool,
@@ -412,8 +413,7 @@ impl PollingLoop {
if self.client.wants_write() && !self.client_closed { if self.client.wants_write() && !self.client_closed {
trace!("client wants to write"); trace!("client wants to write");
while self.client.wants_write() { while self.client.wants_write() {
let sent_tls = self.client.write_tls(&mut self.tx_tls_buf.as_mut_slice())?; let sent_tls = self.client.write_tls(&mut self.server_handle)?;
self.server_handle.write_all(&self.tx_tls_buf[..sent_tls])?;
trace!("sent {} tls bytes to server_handle", sent_tls); trace!("sent {} tls bytes to server_handle", sent_tls);
println!("poll loop: sent {} tls bytes to server_handle", sent_tls); println!("poll loop: sent {} tls bytes to server_handle", sent_tls);
} }
@@ -423,7 +423,7 @@ impl PollingLoop {
// Forward received plaintext to `client_handle`. // Forward received plaintext to `client_handle`.
while !self.client.plaintext_is_empty() { while !self.client.plaintext_is_empty() {
let read = self.client.read_plaintext(&mut self.rx_buf)?; let read = self.client.read_plaintext(&mut self.rx_buf)?;
self.client_handle.write_all(&self.rx_buf[..read])?; self.client_handle.write(&self.rx_buf[..read])?;
trace!("sent {} clear bytes to client_handle", read); trace!("sent {} clear bytes to client_handle", read);
trace!("poll loop: sent {} clear bytes to client_handle", read); trace!("poll loop: sent {} clear bytes to client_handle", read);
} }
@@ -456,25 +456,13 @@ impl PollingLoop {
} }
// Reads TLS data from the server and writes it into the client. // Reads TLS data from the server and writes it into the client.
let mut processed = 0; let read_tls = self.client.read_tls(&mut self.server_handle)?;
let read_tls = self.server_handle.read(&mut self.rx_tls_buf)?;
trace!("received {} tls bytes from server_handle", read_tls); trace!("received {} tls bytes from server_handle", read_tls);
println!( println!(
"poll loop: received {} tls bytes from server_handle", "poll loop: received {} tls bytes from server_handle",
read_tls read_tls
); );
loop { self.client.process_new_packets().await?;
let new_processed = self.client.read_tls(&mut &self.rx_tls_buf[..read_tls])?;
processed += new_processed;
self.client.process_new_packets().await?;
trace!("processed {} tls bytes from server", processed);
println!("poll loop: processed {} tls bytes from server", processed);
if processed >= read_tls {
break;
}
}
select_biased! { select_biased! {
// Waits for a notification from the backend that it is ready to decrypt data. // Waits for a notification from the backend that it is ready to decrypt data.

View File

@@ -117,9 +117,7 @@ async fn test_hyper_ok() {
.body(Full::<Bytes>::new("hello".into())) .body(Full::<Bytes>::new("hello".into()))
.unwrap(); .unwrap();
println!("now trying to write request...");
let response = request_sender.send_request(request).await.unwrap(); let response = request_sender.send_request(request).await.unwrap();
println!("wrote request and got response.");
assert!(response.status() == StatusCode::OK); assert!(response.status() == StatusCode::OK);