mirror of
https://github.com/gfx-rs/wgpu.git
synced 2026-04-22 03:02:01 -04:00
[rs] Merge #685
685: Replace futures crate with pollster. r=grovesNL a=parasyte This PR removes all of the `futures` dependencies. `std::future` does not contain a lot of useful helpers available in `futures`. The obvious ones are `join_all`, `FutureExt::map`, and `block_on`. - `join_all` is replaced with a `Vec<T>` and an `async` block. - `FuturesExt::map` in the web backend is replaced by rolling the `map` function into the `MakeSendFuture` type. - `block_on` is provided by `pollster`. The original code using `join_all` ignored the result type yielded by the Future from `map_async`. This code does the same, but makes dropping the result a little more obvious. These should not be troublesome. Figured I would call them out anyway. The last big change is replacing `futures-executor` in the examples with `async-executor`. A new concrete `Spawner` type is used in the example framework instead of an implementation of `futures_task::LocalSpawn`. Fixes #628 Co-authored-by: Jay Oster <jay@kodewerx.org>
This commit is contained in:
@@ -43,7 +43,6 @@ rev = "4ebe1f50b057046e4d4f015eb006330d62f5fe91"
|
||||
|
||||
[dependencies]
|
||||
arrayvec = "0.5"
|
||||
futures = { version = "0.3", default-features = false, features = ["std"] }
|
||||
parking_lot = "0.11"
|
||||
raw-window-handle = "0.3"
|
||||
smallvec = "1"
|
||||
@@ -60,7 +59,10 @@ rand = { version = "0.7.2", features = ["wasm-bindgen"] }
|
||||
bytemuck = { version = "1.4", features = ["derive"] }
|
||||
noise = "0.6"
|
||||
ddsfile = "0.4"
|
||||
futures = { version = "0.3", default-features = false, features = ["std", "executor"] }
|
||||
|
||||
[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies]
|
||||
async-executor = "1.0"
|
||||
pollster = "0.2"
|
||||
|
||||
[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies.wgpu-subscriber]
|
||||
version = "0.1"
|
||||
|
||||
@@ -262,7 +262,7 @@ impl framework::Example for Example {
|
||||
frame: &wgpu::SwapChainTexture,
|
||||
device: &wgpu::Device,
|
||||
queue: &wgpu::Queue,
|
||||
_spawner: &impl futures::task::LocalSpawn,
|
||||
_spawner: &framework::Spawner,
|
||||
) {
|
||||
// create render pass descriptor and its color attachments
|
||||
let color_attachments = [wgpu::RenderPassColorAttachmentDescriptor {
|
||||
|
||||
@@ -195,7 +195,7 @@ fn main() {
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
{
|
||||
wgpu_subscriber::initialize_default_subscriber(None);
|
||||
futures::executor::block_on(run("red.png"));
|
||||
pollster::block_on(run("red.png"));
|
||||
}
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
{
|
||||
@@ -212,7 +212,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn ensure_generated_data_matches_expected() {
|
||||
futures::executor::block_on(assert_generated_data_matches_expected());
|
||||
pollster::block_on(assert_generated_data_matches_expected());
|
||||
}
|
||||
|
||||
async fn assert_generated_data_matches_expected() {
|
||||
|
||||
@@ -384,7 +384,7 @@ impl framework::Example for Example {
|
||||
frame: &wgpu::SwapChainTexture,
|
||||
device: &wgpu::Device,
|
||||
queue: &wgpu::Queue,
|
||||
_spawner: &impl futures::task::LocalSpawn,
|
||||
_spawner: &framework::Spawner,
|
||||
) {
|
||||
let mut encoder =
|
||||
device.create_command_encoder(&wgpu::CommandEncoderDescriptor { label: None });
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use futures::task::LocalSpawn;
|
||||
use std::future::Future;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use std::time::{Duration, Instant};
|
||||
use winit::{
|
||||
@@ -56,7 +56,7 @@ pub trait Example: 'static + Sized {
|
||||
frame: &wgpu::SwapChainTexture,
|
||||
device: &wgpu::Device,
|
||||
queue: &wgpu::Queue,
|
||||
spawner: &impl LocalSpawn,
|
||||
spawner: &Spawner,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -198,30 +198,7 @@ fn start<E: Example>(
|
||||
queue,
|
||||
}: Setup,
|
||||
) {
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
let (mut pool, spawner) = {
|
||||
let local_pool = futures::executor::LocalPool::new();
|
||||
let spawner = local_pool.spawner();
|
||||
(local_pool, spawner)
|
||||
};
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
let spawner = {
|
||||
use futures::{future::LocalFutureObj, task::SpawnError};
|
||||
|
||||
struct WebSpawner {}
|
||||
impl LocalSpawn for WebSpawner {
|
||||
fn spawn_local_obj(
|
||||
&self,
|
||||
future: LocalFutureObj<'static, ()>,
|
||||
) -> Result<(), SpawnError> {
|
||||
Ok(wasm_bindgen_futures::spawn_local(future))
|
||||
}
|
||||
}
|
||||
|
||||
WebSpawner {}
|
||||
};
|
||||
|
||||
let spawner = Spawner::new();
|
||||
let mut sc_desc = wgpu::SwapChainDescriptor {
|
||||
usage: wgpu::TextureUsage::RENDER_ATTACHMENT,
|
||||
format: device.get_swap_chain_preferred_format(),
|
||||
@@ -266,7 +243,7 @@ fn start<E: Example>(
|
||||
);
|
||||
}
|
||||
|
||||
pool.run_until_stalled();
|
||||
spawner.run_until_stalled();
|
||||
}
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
@@ -317,9 +294,47 @@ fn start<E: Example>(
|
||||
});
|
||||
}
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
pub struct Spawner<'a> {
|
||||
executor: async_executor::LocalExecutor<'a>,
|
||||
}
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
impl<'a> Spawner<'a> {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
executor: async_executor::LocalExecutor::new(),
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub fn spawn_local(&self, future: impl Future<Output = ()> + 'a) {
|
||||
self.executor.spawn(future).detach();
|
||||
}
|
||||
|
||||
fn run_until_stalled(&self) {
|
||||
while self.executor.try_tick() {}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
pub struct Spawner {}
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
impl Spawner {
|
||||
fn new() -> Self {
|
||||
Self {}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub fn spawn_local(&self, future: impl Future<Output = ()> + 'static) {
|
||||
wasm_bindgen_futures::spawn_local(future);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
pub fn run<E: Example>(title: &str) {
|
||||
let setup = futures::executor::block_on(setup::<E>(title));
|
||||
let setup = pollster::block_on(setup::<E>(title));
|
||||
start::<E>(setup);
|
||||
}
|
||||
|
||||
|
||||
@@ -183,7 +183,7 @@ fn main() {
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
{
|
||||
wgpu_subscriber::initialize_default_subscriber(None);
|
||||
futures::executor::block_on(run());
|
||||
pollster::block_on(run());
|
||||
}
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
{
|
||||
@@ -193,20 +193,20 @@ fn main() {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[cfg(all(test, not(target_arch = "wasm32")))]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_compute_1() {
|
||||
let input = vec![1, 2, 3, 4];
|
||||
futures::executor::block_on(assert_execute_gpu(input, vec![0, 1, 7, 2]));
|
||||
pollster::block_on(assert_execute_gpu(input, vec![0, 1, 7, 2]));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_compute_2() {
|
||||
let input = vec![5, 23, 10, 9];
|
||||
futures::executor::block_on(assert_execute_gpu(input, vec![5, 15, 6, 19]));
|
||||
pollster::block_on(assert_execute_gpu(input, vec![5, 15, 6, 19]));
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -220,7 +220,7 @@ mod tests {
|
||||
let tx = tx.clone();
|
||||
thread::spawn(move || {
|
||||
let input = vec![100, 100, 100];
|
||||
futures::executor::block_on(assert_execute_gpu(input, vec![25, 25, 25]));
|
||||
pollster::block_on(assert_execute_gpu(input, vec![25, 25, 25]));
|
||||
tx.send(true).unwrap();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -140,7 +140,7 @@ fn main() {
|
||||
{
|
||||
wgpu_subscriber::initialize_default_subscriber(None);
|
||||
// Temporarily avoid srgb formats for the swapchain on the web
|
||||
futures::executor::block_on(run(event_loop, window));
|
||||
pollster::block_on(run(event_loop, window));
|
||||
}
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
{
|
||||
|
||||
@@ -191,7 +191,7 @@ fn main() {
|
||||
|
||||
wgpu_subscriber::initialize_default_subscriber(None);
|
||||
// Temporarily avoid srgb formats for the swapchain on the web
|
||||
futures::executor::block_on(run(event_loop, viewports));
|
||||
pollster::block_on(run(event_loop, viewports));
|
||||
}
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
{
|
||||
|
||||
@@ -14,7 +14,7 @@ fn main() {
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
{
|
||||
wgpu_subscriber::initialize_default_subscriber(None);
|
||||
futures::executor::block_on(run());
|
||||
pollster::block_on(run());
|
||||
}
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
{
|
||||
|
||||
@@ -346,7 +346,7 @@ impl framework::Example for Example {
|
||||
frame: &wgpu::SwapChainTexture,
|
||||
device: &wgpu::Device,
|
||||
queue: &wgpu::Queue,
|
||||
_spawner: &impl futures::task::LocalSpawn,
|
||||
_spawner: &framework::Spawner,
|
||||
) {
|
||||
let mut encoder =
|
||||
device.create_command_encoder(&wgpu::CommandEncoderDescriptor { label: None });
|
||||
|
||||
@@ -231,7 +231,7 @@ impl framework::Example for Example {
|
||||
frame: &wgpu::SwapChainTexture,
|
||||
device: &wgpu::Device,
|
||||
queue: &wgpu::Queue,
|
||||
_spawner: &impl futures::task::LocalSpawn,
|
||||
_spawner: &framework::Spawner,
|
||||
) {
|
||||
if self.rebuild_bundle {
|
||||
self.bundle = Example::create_bundle(
|
||||
|
||||
@@ -710,7 +710,7 @@ impl framework::Example for Example {
|
||||
frame: &wgpu::SwapChainTexture,
|
||||
device: &wgpu::Device,
|
||||
queue: &wgpu::Queue,
|
||||
_spawner: &impl futures::task::LocalSpawn,
|
||||
_spawner: &framework::Spawner,
|
||||
) {
|
||||
// update uniforms
|
||||
for entity in self.entities.iter_mut() {
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
#[path = "../framework.rs"]
|
||||
mod framework;
|
||||
|
||||
use futures::task::{LocalSpawn, LocalSpawnExt};
|
||||
use wgpu::util::DeviceExt;
|
||||
|
||||
const IMAGE_SIZE: u32 = 128;
|
||||
@@ -253,7 +252,7 @@ impl framework::Example for Skybox {
|
||||
frame: &wgpu::SwapChainTexture,
|
||||
device: &wgpu::Device,
|
||||
queue: &wgpu::Queue,
|
||||
spawner: &impl LocalSpawn,
|
||||
spawner: &framework::Spawner,
|
||||
) {
|
||||
let mut encoder =
|
||||
device.create_command_encoder(&wgpu::CommandEncoderDescriptor { label: None });
|
||||
@@ -301,7 +300,7 @@ impl framework::Example for Skybox {
|
||||
queue.submit(std::iter::once(encoder.finish()));
|
||||
|
||||
let belt_future = self.staging_belt.recall();
|
||||
spawner.spawn_local(belt_future).unwrap();
|
||||
spawner.spawn_local(belt_future);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -307,7 +307,7 @@ impl framework::Example for Example {
|
||||
frame: &wgpu::SwapChainTexture,
|
||||
device: &wgpu::Device,
|
||||
queue: &wgpu::Queue,
|
||||
_spawner: &impl futures::task::LocalSpawn,
|
||||
_spawner: &framework::Spawner,
|
||||
) {
|
||||
let mut encoder = device.create_command_encoder(&wgpu::CommandEncoderDescriptor {
|
||||
label: Some("primary"),
|
||||
|
||||
@@ -673,7 +673,7 @@ impl framework::Example for Example {
|
||||
frame: &wgpu::SwapChainTexture,
|
||||
device: &wgpu::Device,
|
||||
queue: &wgpu::Queue,
|
||||
_spawner: &impl futures::task::LocalSpawn,
|
||||
_spawner: &framework::Spawner,
|
||||
) {
|
||||
// Increment frame count regardless of if we draw.
|
||||
self.current_frame += 1;
|
||||
|
||||
@@ -8,11 +8,17 @@ use crate::{
|
||||
};
|
||||
|
||||
use arrayvec::ArrayVec;
|
||||
use futures::future::{ready, Ready};
|
||||
use parking_lot::Mutex;
|
||||
use smallvec::SmallVec;
|
||||
use std::{
|
||||
borrow::Cow::Borrowed, error::Error, fmt, marker::PhantomData, ops::Range, slice, sync::Arc,
|
||||
borrow::Cow::Borrowed,
|
||||
error::Error,
|
||||
fmt,
|
||||
future::{ready, Ready},
|
||||
marker::PhantomData,
|
||||
ops::Range,
|
||||
slice,
|
||||
sync::Arc,
|
||||
};
|
||||
use typed_arena::Arena;
|
||||
|
||||
|
||||
@@ -7,7 +7,6 @@ use crate::{
|
||||
TextureViewDescriptor, TextureViewDimension,
|
||||
};
|
||||
|
||||
use futures::FutureExt;
|
||||
use std::{
|
||||
fmt,
|
||||
future::Future,
|
||||
@@ -55,19 +54,34 @@ pub(crate) struct RenderBundleEncoder(web_sys::GpuRenderBundleEncoder);
|
||||
// This is safe on wasm32 *for now*, but similarly to the unsafe Send impls for the handle type
|
||||
// wrappers, the full story for threading on wasm32 is still unfolding.
|
||||
|
||||
pub(crate) struct MakeSendFuture<F>(F);
|
||||
pub(crate) struct MakeSendFuture<F, M> {
|
||||
future: F,
|
||||
map: M,
|
||||
}
|
||||
|
||||
impl<F: Future> Future for MakeSendFuture<F> {
|
||||
type Output = F::Output;
|
||||
impl<F: Future, M: Fn(F::Output) -> T, T> Future for MakeSendFuture<F, M> {
|
||||
type Output = T;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Self::Output> {
|
||||
// This is safe because we have no Drop implementation to violate the Pin requirements and
|
||||
// do not provide any means of moving the inner future.
|
||||
unsafe { self.map_unchecked_mut(|s| &mut s.0) }.poll(cx)
|
||||
unsafe {
|
||||
let this = self.get_unchecked_mut();
|
||||
match Pin::new_unchecked(&mut this.future).poll(cx) {
|
||||
task::Poll::Ready(value) => task::Poll::Ready((this.map)(value)),
|
||||
task::Poll::Pending => task::Poll::Pending,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
unsafe impl<F> Send for MakeSendFuture<F> {}
|
||||
impl<F, M> MakeSendFuture<F, M> {
|
||||
fn new(future: F, map: M) -> Self {
|
||||
Self { future, map }
|
||||
}
|
||||
}
|
||||
|
||||
unsafe impl<F, M> Send for MakeSendFuture<F, M> {}
|
||||
|
||||
impl crate::ComputePassInner<Context> for ComputePass {
|
||||
fn set_pipeline(&mut self, pipeline: &Sendable<web_sys::GpuComputePipeline>) {
|
||||
@@ -800,7 +814,6 @@ fn map_map_mode(mode: crate::MapMode) -> u32 {
|
||||
}
|
||||
|
||||
type JsFutureResult = Result<wasm_bindgen::JsValue, wasm_bindgen::JsValue>;
|
||||
type FutureMap<T> = futures::future::Map<wasm_bindgen_futures::JsFuture, fn(JsFutureResult) -> T>;
|
||||
|
||||
fn future_request_adapter(result: JsFutureResult) -> Option<Sendable<web_sys::GpuAdapter>> {
|
||||
match result {
|
||||
@@ -850,11 +863,18 @@ impl crate::Context for Context {
|
||||
|
||||
type SwapChainOutputDetail = SwapChainOutputDetail;
|
||||
|
||||
type RequestAdapterFuture = MakeSendFuture<FutureMap<Option<Self::AdapterId>>>;
|
||||
type RequestDeviceFuture = MakeSendFuture<
|
||||
FutureMap<Result<(Self::DeviceId, Self::QueueId), crate::RequestDeviceError>>,
|
||||
type RequestAdapterFuture = MakeSendFuture<
|
||||
wasm_bindgen_futures::JsFuture,
|
||||
fn(JsFutureResult) -> Option<Self::AdapterId>,
|
||||
>;
|
||||
type RequestDeviceFuture = MakeSendFuture<
|
||||
wasm_bindgen_futures::JsFuture,
|
||||
fn(JsFutureResult) -> Result<(Self::DeviceId, Self::QueueId), crate::RequestDeviceError>,
|
||||
>;
|
||||
type MapAsyncFuture = MakeSendFuture<
|
||||
wasm_bindgen_futures::JsFuture,
|
||||
fn(JsFutureResult) -> Result<(), crate::BufferAsyncError>,
|
||||
>;
|
||||
type MapAsyncFuture = MakeSendFuture<FutureMap<Result<(), crate::BufferAsyncError>>>;
|
||||
|
||||
fn init(_backends: wgt::BackendBit) -> Self {
|
||||
Context(web_sys::window().unwrap().navigator().gpu())
|
||||
@@ -900,8 +920,10 @@ impl crate::Context for Context {
|
||||
};
|
||||
mapped_options.power_preference(mapped_power_preference);
|
||||
let adapter_promise = self.0.request_adapter_with_options(&mapped_options);
|
||||
MakeSendFuture(
|
||||
wasm_bindgen_futures::JsFuture::from(adapter_promise).map(future_request_adapter),
|
||||
|
||||
MakeSendFuture::new(
|
||||
wasm_bindgen_futures::JsFuture::from(adapter_promise),
|
||||
future_request_adapter,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -926,8 +948,9 @@ impl crate::Context for Context {
|
||||
mapped_desc.limits(&mapped_limits);
|
||||
let device_promise = adapter.0.request_device_with_descriptor(&mapped_desc);
|
||||
|
||||
MakeSendFuture(
|
||||
wasm_bindgen_futures::JsFuture::from(device_promise).map(future_request_device),
|
||||
MakeSendFuture::new(
|
||||
wasm_bindgen_futures::JsFuture::from(device_promise),
|
||||
future_request_device,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -1343,7 +1366,10 @@ impl crate::Context for Context {
|
||||
(range.end - range.start) as f64,
|
||||
);
|
||||
|
||||
MakeSendFuture(wasm_bindgen_futures::JsFuture::from(map_promise).map(future_map_async))
|
||||
MakeSendFuture::new(
|
||||
wasm_bindgen_futures::JsFuture::from(map_promise),
|
||||
future_map_async,
|
||||
)
|
||||
}
|
||||
|
||||
fn buffer_get_mapped_range(
|
||||
|
||||
@@ -22,7 +22,6 @@ use std::{
|
||||
thread,
|
||||
};
|
||||
|
||||
use futures::FutureExt as _;
|
||||
use parking_lot::Mutex;
|
||||
|
||||
pub use wgt::{
|
||||
@@ -1312,9 +1311,8 @@ impl Instance {
|
||||
options: &RequestAdapterOptions,
|
||||
) -> impl Future<Output = Option<Adapter>> + Send {
|
||||
let context = Arc::clone(&self.context);
|
||||
self.context
|
||||
.instance_request_adapter(options)
|
||||
.map(|option| option.map(|id| Adapter { context, id }))
|
||||
let adapter = self.context.instance_request_adapter(options);
|
||||
async move { adapter.await.map(|id| Adapter { context, id }) }
|
||||
}
|
||||
|
||||
/// Creates a surface from a raw window handle.
|
||||
@@ -1369,8 +1367,9 @@ impl Adapter {
|
||||
trace_path: Option<&std::path::Path>,
|
||||
) -> impl Future<Output = Result<(Device, Queue), RequestDeviceError>> + Send {
|
||||
let context = Arc::clone(&self.context);
|
||||
Context::adapter_request_device(&*self.context, &self.id, desc, trace_path).map(|result| {
|
||||
result.map(|(device_id, queue_id)| {
|
||||
let device = Context::adapter_request_device(&*self.context, &self.id, desc, trace_path);
|
||||
async move {
|
||||
device.await.map(|(device_id, queue_id)| {
|
||||
(
|
||||
Device {
|
||||
context: Arc::clone(&context),
|
||||
@@ -1382,7 +1381,7 @@ impl Adapter {
|
||||
},
|
||||
)
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// List all features that are supported with this adapter.
|
||||
|
||||
@@ -2,9 +2,42 @@ use crate::{
|
||||
Buffer, BufferAddress, BufferDescriptor, BufferSize, BufferUsage, BufferViewMut,
|
||||
CommandEncoder, Device, MapMode,
|
||||
};
|
||||
use futures::{future::join_all, FutureExt};
|
||||
use std::pin::Pin;
|
||||
use std::task::{self, Poll};
|
||||
use std::{future::Future, sync::mpsc};
|
||||
|
||||
// Given a vector of futures, poll each in parallel until all are ready.
|
||||
struct Join<F> {
|
||||
futures: Vec<Option<F>>,
|
||||
}
|
||||
|
||||
impl<F: Future<Output = ()>> Future for Join<F> {
|
||||
type Output = ();
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Self::Output> {
|
||||
// This is safe because we have no Drop implementation to violate the Pin requirements and
|
||||
// do not provide any means of moving the inner futures.
|
||||
let all_ready = unsafe {
|
||||
// Poll all remaining futures, removing all that are ready
|
||||
self.get_unchecked_mut().futures.iter_mut().all(|opt| {
|
||||
if let Some(future) = opt {
|
||||
if Pin::new_unchecked(future).poll(cx) == Poll::Ready(()) {
|
||||
*opt = None;
|
||||
}
|
||||
}
|
||||
|
||||
opt.is_none()
|
||||
})
|
||||
};
|
||||
|
||||
if all_ready {
|
||||
Poll::Ready(())
|
||||
} else {
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct Chunk {
|
||||
buffer: Buffer,
|
||||
size: BufferAddress,
|
||||
@@ -133,18 +166,24 @@ impl StagingBelt {
|
||||
self.free_chunks.push(chunk);
|
||||
}
|
||||
|
||||
let sender_template = &self.sender;
|
||||
join_all(self.closed_chunks.drain(..).map(|chunk| {
|
||||
let sender = sender_template.clone();
|
||||
chunk
|
||||
.buffer
|
||||
.slice(..)
|
||||
.map_async(MapMode::Write)
|
||||
.inspect(move |_| {
|
||||
let sender = &self.sender;
|
||||
let futures = self
|
||||
.closed_chunks
|
||||
.drain(..)
|
||||
.map(|chunk| {
|
||||
let sender = sender.clone();
|
||||
let async_buffer = chunk.buffer.slice(..).map_async(MapMode::Write);
|
||||
|
||||
Some(async move {
|
||||
// The result is ignored
|
||||
async_buffer.await.ok();
|
||||
|
||||
// The only possible error is the other side disconnecting, which is fine
|
||||
let _ = sender.send(chunk);
|
||||
})
|
||||
}))
|
||||
.map(|_| ())
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
Join { futures }
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user