The motivation for this change is the worry, that streams which do not
consume their buffers in a timely manner prevent `BytesMut` from reusing
their storage space, instead forcing it to allocate more memory.
This PR removes `bytes` completely and instead uses `Vec<u8>`s. The
reclamation of storage is left to the allocator.
While at it we also remove the `BufWriter` from `Io`. Often client code
already uses buffers and always adding another buffer layer seems
misguided. After all clients may always provide `Connection::new` with
a `BufWriter` type.
While `Connection::next_stream` must not be called after `Ok(None)`
or `Err(_)` client code may still attempt to submit control commands.
To ensure those are not pending forever, we close and drain the
control channel so further command submissions return immediately.
We already drop all streams when encountering an error which
prevents them from submitting more stream commands but while we
are at it we also close the stream command channel and drop all
enqueued messages now.
Finally we also rework `Control::close` and `Control::poll_close`
to map a closed channel to an early Ok as it means the connection
is already closed. This explicit mapping seems more appropriate
than returning an `Err(ConnectionError::Closed)`.
The current behaviour is to always immediately send a `WindowUpdate`
frame to the remote when opening an outbound frame. While we do not wait
for an acknowledgement before we continue to send data over the stream,
we could also hold off and set the initial SYN flag in the first data
frame that is being sent over the new stream to the remote. The newly
added option `lazy_open` enables this. This should eventually become the
new default.
Implementation notice:
We currently check if we need to set flags when receiving a command from
the stream to send a frame. We might move this into the `stream` module
and make this decision local to the stream instead of looking up the
stream by ID. This would only require a bit more investigation for the
close case as the stream does currently not send a frame in this case on
which it could set the flag.
Up to now we never replied to opened inbound streams with an ACK flag,
despite the fact that the spec clearly demands this behaviour. While we
continue to ignore any ACK flags we receive, with this change we properly
set an ACK flag if required.
Additionally, this PR makes casting of header and frame types more
principled. Instead of arbitrary casts only casts from `()` to `Data`,
`WindowUpdate` and `Ping` are allowed. We also model a binary sum type and
provide introductions from `T` to `Either<T, U>` and `T` to `Either<U, T>`
but no projections. This allows a more type-safe `StreamCommand` whose
`SendFrame` constructor only accepts `Frame<Either<Data, WindowUpdate>>`
values which is what we expect coming from streams.
When encountering an I/O error with `ErrorKind::ConnectionReset`, map
this to a `ConnectionError::Closed` value. The remote may have closed
the connection and a host implementing a "half-duplex" close sequence
would send us a RST when we send data because we have not noticed the
close yet. By mapping this case to a `ConnectionError::Closed` error,
we effectively treat this case like any other connection close.
The impl. produces `Packet`s which are simply newtype wrappers around
`Bytes` and expose only an `AsRef<[u8]>` interface. The
`futures::stream::Stream` implementation is more efficient than the
`AsyncRead` one as it does not need to copy the bytes.
Instead of reading large bodies in `BLOCKSIZE` steps, ensure that the
buffer is large enough to hold the whole body which may result in less
`poll_read` calls.
Some notable changes:
- `BytesMut::bytes_mut` now returns a `&mut [MaybeUninit<u8>]` as it may
contain uninitialised memory. Since we resize our buffer which implies
initialisation, we can just cast this slice to a `&mut [u8]` and use it
directly.
- `BytesMut::remaining_mut` no longer gives the length of the
`BytesMut::bytes_mut` as the `BufMut` implementation grows the buffer as
needed. So, to figure out if we need to reserve another BLOCKSIZE chunk we
now check how much capacity is left.
- `futures_codec` has been removed from tests as it has not updated to
bytes-0.5. The tests now work directly with the stream as `AsyncRead`
and `AsyncWrite` impl.
Currently we always error when reading 0 bytes, however the correct
thing to do is to signal the end of the stream iff there are no unparsed
bytes in our buffer.
Keep the public API surface minimal. We may at some point replace the
`Mutex` around `Shared` with a futures-aware lock at which point
`Stream::state` would no longer be a synchronous method. As the method
is not used externally yet it is best not to introduce at this point.
Since we nevertheless use this in our tests, smoke.rs has been moved
crate.
- Remove `futures_codec` dependency.
- Replace `header::Codec` with `header::{encode, decode}` functions.
- Add `frame::io` module which contains the `Io` type which decodes
frames, implements `futures::stream::Stream` and also allows writing
frames (buffered) to the socket.
- Remove the `tokio` benchmarks as they are very similar to the
`async-std` ones and allow us to remove one (dev) dependency.
- Add a `use_unitialised_read_buffer` feature to allow reading from the
`AsyncRead` value without having to zero memory first which has a
noticable cost. Since the initializer parts for `AsyncRead` are unstable
we provide this opt-in feature instead.
In order to make `Connection` cancellation-safe we would need to store a
bunch of data in `Self` across yield points. This complicates matters
and requires even more work to make it efficient so for the time being
cancellation is not supported.
We want to make sure that even if users do not drive
`Connection::next_stream` to completion that all `Stream`s are closed
and pending `Waker`s notified when we have reached end of life, so we
close them all in a `Drop` impl.
Also, since we close the socket anyway only when things work out the way
they should be we move the close call to the location where it is
expected to happen, namely when reaching the end of the control command
queue.
The current implementation based on `futures::select!` may miss values
produced by some of the selected futures. Because only one value is
(pseudo-randomly) returned when multiple futures are ready, the
non-selected one may be ignored. This can happen for instance if we
got a new inbound connection stream, which we return to the caller which
causes all futures to be dropped and the other values that were ready to
be dropped too. We need to make sure to always process all produced
values which is what this PR attempts to do.
The problem we face is that we have an unbounded number of `Control`s
which can all issue close commands. We want them all to be informed when
we have done so but we need to make sure that by closing the command
queue we do not generate false positives. The strategy here is to split
the command queue into two, one for streams and one for controls. As
soon as we receive a close (control) command, we close the stream
commands receiver, and only process any pending stream commands while
not processing any more control commands. Only after completing this
process do we resume processing control commands but only to inform all
controls that have issued a command that we are closed.