From 78f33bcf67ffa3f7c50e559365416a713eb2f7bf Mon Sep 17 00:00:00 2001 From: Jay Oster Date: Sat, 19 Dec 2020 14:48:34 -0800 Subject: [PATCH] [rs] Replace futures with pollster This PR removes all of the `futures` dependencies. `std::future` does not contain a lot of useful helpers available in futures. The obvious ones are `join_all`, `FutureExt::map`, and `block_on`. * `join_all` is replaced with a `Vec` and async blocks. * `FuturesExt::map` in the web backend is replaced by rolling the `map` function into the `MakeSendFuture` type. * `block_on` is provided by `pollster`. The original code using `join_all` ignored the result type yielded by the Future from `map_async`. This code does the same, but makes dropping the result a little more obvious. These should not be troublesome. Figured I would call them out anyway. The last big change is replacing `futures-executor` in the examples with `async-executor`. A new concrete `Spawner` type is used in the example framework instead of an implementation of `futures_task::LocalSpawn`. --- wgpu/Cargo.toml | 6 ++- wgpu/examples/boids/main.rs | 2 +- wgpu/examples/capture/main.rs | 4 +- wgpu/examples/cube/main.rs | 2 +- wgpu/examples/framework.rs | 71 +++++++++++++++++----------- wgpu/examples/hello-compute/main.rs | 10 ++-- wgpu/examples/hello-triangle/main.rs | 2 +- wgpu/examples/hello-windows/main.rs | 2 +- wgpu/examples/hello/main.rs | 2 +- wgpu/examples/mipmap/main.rs | 2 +- wgpu/examples/msaa-line/main.rs | 2 +- wgpu/examples/shadow/main.rs | 2 +- wgpu/examples/skybox/main.rs | 5 +- wgpu/examples/texture-arrays/main.rs | 2 +- wgpu/examples/water/main.rs | 2 +- wgpu/src/backend/direct.rs | 10 +++- wgpu/src/backend/web.rs | 58 ++++++++++++++++------- wgpu/src/lib.rs | 13 +++-- wgpu/src/util/belt.rs | 61 +++++++++++++++++++----- 19 files changed, 172 insertions(+), 86 deletions(-) diff --git a/wgpu/Cargo.toml b/wgpu/Cargo.toml index d4501b9333..bdf48206f0 100644 --- a/wgpu/Cargo.toml +++ b/wgpu/Cargo.toml @@ -43,7 +43,6 @@ rev = "4ebe1f50b057046e4d4f015eb006330d62f5fe91" [dependencies] arrayvec = "0.5" -futures = { version = "0.3", default-features = false, features = ["std"] } parking_lot = "0.11" raw-window-handle = "0.3" smallvec = "1" @@ -60,7 +59,10 @@ rand = { version = "0.7.2", features = ["wasm-bindgen"] } bytemuck = { version = "1.4", features = ["derive"] } noise = "0.6" ddsfile = "0.4" -futures = { version = "0.3", default-features = false, features = ["std", "executor"] } + +[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] +async-executor = "1.0" +pollster = "0.2" [target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies.wgpu-subscriber] version = "0.1" diff --git a/wgpu/examples/boids/main.rs b/wgpu/examples/boids/main.rs index 81e9e21526..b6917b86c9 100644 --- a/wgpu/examples/boids/main.rs +++ b/wgpu/examples/boids/main.rs @@ -262,7 +262,7 @@ impl framework::Example for Example { frame: &wgpu::SwapChainTexture, device: &wgpu::Device, queue: &wgpu::Queue, - _spawner: &impl futures::task::LocalSpawn, + _spawner: &framework::Spawner, ) { // create render pass descriptor and its color attachments let color_attachments = [wgpu::RenderPassColorAttachmentDescriptor { diff --git a/wgpu/examples/capture/main.rs b/wgpu/examples/capture/main.rs index 90e17116ad..71157c202b 100644 --- a/wgpu/examples/capture/main.rs +++ b/wgpu/examples/capture/main.rs @@ -195,7 +195,7 @@ fn main() { #[cfg(not(target_arch = "wasm32"))] { wgpu_subscriber::initialize_default_subscriber(None); - futures::executor::block_on(run("red.png")); + pollster::block_on(run("red.png")); } #[cfg(target_arch = "wasm32")] { @@ -212,7 +212,7 @@ mod tests { #[test] fn ensure_generated_data_matches_expected() { - futures::executor::block_on(assert_generated_data_matches_expected()); + pollster::block_on(assert_generated_data_matches_expected()); } async fn assert_generated_data_matches_expected() { diff --git a/wgpu/examples/cube/main.rs b/wgpu/examples/cube/main.rs index 48471a3231..981256b4ee 100644 --- a/wgpu/examples/cube/main.rs +++ b/wgpu/examples/cube/main.rs @@ -384,7 +384,7 @@ impl framework::Example for Example { frame: &wgpu::SwapChainTexture, device: &wgpu::Device, queue: &wgpu::Queue, - _spawner: &impl futures::task::LocalSpawn, + _spawner: &framework::Spawner, ) { let mut encoder = device.create_command_encoder(&wgpu::CommandEncoderDescriptor { label: None }); diff --git a/wgpu/examples/framework.rs b/wgpu/examples/framework.rs index 3f0004d94a..263dbbeeee 100644 --- a/wgpu/examples/framework.rs +++ b/wgpu/examples/framework.rs @@ -1,4 +1,4 @@ -use futures::task::LocalSpawn; +use std::future::Future; #[cfg(not(target_arch = "wasm32"))] use std::time::{Duration, Instant}; use winit::{ @@ -56,7 +56,7 @@ pub trait Example: 'static + Sized { frame: &wgpu::SwapChainTexture, device: &wgpu::Device, queue: &wgpu::Queue, - spawner: &impl LocalSpawn, + spawner: &Spawner, ); } @@ -198,30 +198,7 @@ fn start( queue, }: Setup, ) { - #[cfg(not(target_arch = "wasm32"))] - let (mut pool, spawner) = { - let local_pool = futures::executor::LocalPool::new(); - let spawner = local_pool.spawner(); - (local_pool, spawner) - }; - - #[cfg(target_arch = "wasm32")] - let spawner = { - use futures::{future::LocalFutureObj, task::SpawnError}; - - struct WebSpawner {} - impl LocalSpawn for WebSpawner { - fn spawn_local_obj( - &self, - future: LocalFutureObj<'static, ()>, - ) -> Result<(), SpawnError> { - Ok(wasm_bindgen_futures::spawn_local(future)) - } - } - - WebSpawner {} - }; - + let spawner = Spawner::new(); let mut sc_desc = wgpu::SwapChainDescriptor { usage: wgpu::TextureUsage::RENDER_ATTACHMENT, format: device.get_swap_chain_preferred_format(), @@ -266,7 +243,7 @@ fn start( ); } - pool.run_until_stalled(); + spawner.run_until_stalled(); } #[cfg(target_arch = "wasm32")] @@ -317,9 +294,47 @@ fn start( }); } +#[cfg(not(target_arch = "wasm32"))] +pub struct Spawner<'a> { + executor: async_executor::LocalExecutor<'a>, +} + +#[cfg(not(target_arch = "wasm32"))] +impl<'a> Spawner<'a> { + fn new() -> Self { + Self { + executor: async_executor::LocalExecutor::new(), + } + } + + #[allow(dead_code)] + pub fn spawn_local(&self, future: impl Future + 'a) { + self.executor.spawn(future).detach(); + } + + fn run_until_stalled(&self) { + while self.executor.try_tick() {} + } +} + +#[cfg(target_arch = "wasm32")] +pub struct Spawner {} + +#[cfg(target_arch = "wasm32")] +impl Spawner { + fn new() -> Self { + Self {} + } + + #[allow(dead_code)] + pub fn spawn_local(&self, future: impl Future + 'static) { + wasm_bindgen_futures::spawn_local(future); + } +} + #[cfg(not(target_arch = "wasm32"))] pub fn run(title: &str) { - let setup = futures::executor::block_on(setup::(title)); + let setup = pollster::block_on(setup::(title)); start::(setup); } diff --git a/wgpu/examples/hello-compute/main.rs b/wgpu/examples/hello-compute/main.rs index df532782d2..d08d995319 100644 --- a/wgpu/examples/hello-compute/main.rs +++ b/wgpu/examples/hello-compute/main.rs @@ -183,7 +183,7 @@ fn main() { #[cfg(not(target_arch = "wasm32"))] { wgpu_subscriber::initialize_default_subscriber(None); - futures::executor::block_on(run()); + pollster::block_on(run()); } #[cfg(target_arch = "wasm32")] { @@ -193,20 +193,20 @@ fn main() { } } -#[cfg(test)] +#[cfg(all(test, not(target_arch = "wasm32")))] mod tests { use super::*; #[test] fn test_compute_1() { let input = vec![1, 2, 3, 4]; - futures::executor::block_on(assert_execute_gpu(input, vec![0, 1, 7, 2])); + pollster::block_on(assert_execute_gpu(input, vec![0, 1, 7, 2])); } #[test] fn test_compute_2() { let input = vec![5, 23, 10, 9]; - futures::executor::block_on(assert_execute_gpu(input, vec![5, 15, 6, 19])); + pollster::block_on(assert_execute_gpu(input, vec![5, 15, 6, 19])); } #[test] @@ -220,7 +220,7 @@ mod tests { let tx = tx.clone(); thread::spawn(move || { let input = vec![100, 100, 100]; - futures::executor::block_on(assert_execute_gpu(input, vec![25, 25, 25])); + pollster::block_on(assert_execute_gpu(input, vec![25, 25, 25])); tx.send(true).unwrap(); }); } diff --git a/wgpu/examples/hello-triangle/main.rs b/wgpu/examples/hello-triangle/main.rs index c9daeee933..da1b1680b2 100644 --- a/wgpu/examples/hello-triangle/main.rs +++ b/wgpu/examples/hello-triangle/main.rs @@ -140,7 +140,7 @@ fn main() { { wgpu_subscriber::initialize_default_subscriber(None); // Temporarily avoid srgb formats for the swapchain on the web - futures::executor::block_on(run(event_loop, window)); + pollster::block_on(run(event_loop, window)); } #[cfg(target_arch = "wasm32")] { diff --git a/wgpu/examples/hello-windows/main.rs b/wgpu/examples/hello-windows/main.rs index 08e8f0e417..105562511c 100644 --- a/wgpu/examples/hello-windows/main.rs +++ b/wgpu/examples/hello-windows/main.rs @@ -191,7 +191,7 @@ fn main() { wgpu_subscriber::initialize_default_subscriber(None); // Temporarily avoid srgb formats for the swapchain on the web - futures::executor::block_on(run(event_loop, viewports)); + pollster::block_on(run(event_loop, viewports)); } #[cfg(target_arch = "wasm32")] { diff --git a/wgpu/examples/hello/main.rs b/wgpu/examples/hello/main.rs index a58e568732..705017be27 100644 --- a/wgpu/examples/hello/main.rs +++ b/wgpu/examples/hello/main.rs @@ -14,7 +14,7 @@ fn main() { #[cfg(not(target_arch = "wasm32"))] { wgpu_subscriber::initialize_default_subscriber(None); - futures::executor::block_on(run()); + pollster::block_on(run()); } #[cfg(target_arch = "wasm32")] { diff --git a/wgpu/examples/mipmap/main.rs b/wgpu/examples/mipmap/main.rs index 692bc485d7..311ff86ac4 100644 --- a/wgpu/examples/mipmap/main.rs +++ b/wgpu/examples/mipmap/main.rs @@ -346,7 +346,7 @@ impl framework::Example for Example { frame: &wgpu::SwapChainTexture, device: &wgpu::Device, queue: &wgpu::Queue, - _spawner: &impl futures::task::LocalSpawn, + _spawner: &framework::Spawner, ) { let mut encoder = device.create_command_encoder(&wgpu::CommandEncoderDescriptor { label: None }); diff --git a/wgpu/examples/msaa-line/main.rs b/wgpu/examples/msaa-line/main.rs index 04e382f4f7..d2c4ef43c2 100644 --- a/wgpu/examples/msaa-line/main.rs +++ b/wgpu/examples/msaa-line/main.rs @@ -231,7 +231,7 @@ impl framework::Example for Example { frame: &wgpu::SwapChainTexture, device: &wgpu::Device, queue: &wgpu::Queue, - _spawner: &impl futures::task::LocalSpawn, + _spawner: &framework::Spawner, ) { if self.rebuild_bundle { self.bundle = Example::create_bundle( diff --git a/wgpu/examples/shadow/main.rs b/wgpu/examples/shadow/main.rs index 8c28624049..262112c142 100644 --- a/wgpu/examples/shadow/main.rs +++ b/wgpu/examples/shadow/main.rs @@ -710,7 +710,7 @@ impl framework::Example for Example { frame: &wgpu::SwapChainTexture, device: &wgpu::Device, queue: &wgpu::Queue, - _spawner: &impl futures::task::LocalSpawn, + _spawner: &framework::Spawner, ) { // update uniforms for entity in self.entities.iter_mut() { diff --git a/wgpu/examples/skybox/main.rs b/wgpu/examples/skybox/main.rs index fdbd829d36..74a47a256c 100644 --- a/wgpu/examples/skybox/main.rs +++ b/wgpu/examples/skybox/main.rs @@ -1,7 +1,6 @@ #[path = "../framework.rs"] mod framework; -use futures::task::{LocalSpawn, LocalSpawnExt}; use wgpu::util::DeviceExt; const IMAGE_SIZE: u32 = 128; @@ -253,7 +252,7 @@ impl framework::Example for Skybox { frame: &wgpu::SwapChainTexture, device: &wgpu::Device, queue: &wgpu::Queue, - spawner: &impl LocalSpawn, + spawner: &framework::Spawner, ) { let mut encoder = device.create_command_encoder(&wgpu::CommandEncoderDescriptor { label: None }); @@ -301,7 +300,7 @@ impl framework::Example for Skybox { queue.submit(std::iter::once(encoder.finish())); let belt_future = self.staging_belt.recall(); - spawner.spawn_local(belt_future).unwrap(); + spawner.spawn_local(belt_future); } } diff --git a/wgpu/examples/texture-arrays/main.rs b/wgpu/examples/texture-arrays/main.rs index 89185fc504..27e0cc2745 100644 --- a/wgpu/examples/texture-arrays/main.rs +++ b/wgpu/examples/texture-arrays/main.rs @@ -307,7 +307,7 @@ impl framework::Example for Example { frame: &wgpu::SwapChainTexture, device: &wgpu::Device, queue: &wgpu::Queue, - _spawner: &impl futures::task::LocalSpawn, + _spawner: &framework::Spawner, ) { let mut encoder = device.create_command_encoder(&wgpu::CommandEncoderDescriptor { label: Some("primary"), diff --git a/wgpu/examples/water/main.rs b/wgpu/examples/water/main.rs index df4dbbd13a..b72d4986ad 100644 --- a/wgpu/examples/water/main.rs +++ b/wgpu/examples/water/main.rs @@ -673,7 +673,7 @@ impl framework::Example for Example { frame: &wgpu::SwapChainTexture, device: &wgpu::Device, queue: &wgpu::Queue, - _spawner: &impl futures::task::LocalSpawn, + _spawner: &framework::Spawner, ) { // Increment frame count regardless of if we draw. self.current_frame += 1; diff --git a/wgpu/src/backend/direct.rs b/wgpu/src/backend/direct.rs index bf9b965524..f373dbcae8 100644 --- a/wgpu/src/backend/direct.rs +++ b/wgpu/src/backend/direct.rs @@ -8,11 +8,17 @@ use crate::{ }; use arrayvec::ArrayVec; -use futures::future::{ready, Ready}; use parking_lot::Mutex; use smallvec::SmallVec; use std::{ - borrow::Cow::Borrowed, error::Error, fmt, marker::PhantomData, ops::Range, slice, sync::Arc, + borrow::Cow::Borrowed, + error::Error, + fmt, + future::{ready, Ready}, + marker::PhantomData, + ops::Range, + slice, + sync::Arc, }; use typed_arena::Arena; diff --git a/wgpu/src/backend/web.rs b/wgpu/src/backend/web.rs index 8333575f6b..494643b2a1 100644 --- a/wgpu/src/backend/web.rs +++ b/wgpu/src/backend/web.rs @@ -7,7 +7,6 @@ use crate::{ TextureViewDescriptor, TextureViewDimension, }; -use futures::FutureExt; use std::{ fmt, future::Future, @@ -55,19 +54,34 @@ pub(crate) struct RenderBundleEncoder(web_sys::GpuRenderBundleEncoder); // This is safe on wasm32 *for now*, but similarly to the unsafe Send impls for the handle type // wrappers, the full story for threading on wasm32 is still unfolding. -pub(crate) struct MakeSendFuture(F); +pub(crate) struct MakeSendFuture { + future: F, + map: M, +} -impl Future for MakeSendFuture { - type Output = F::Output; +impl T, T> Future for MakeSendFuture { + type Output = T; fn poll(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll { // This is safe because we have no Drop implementation to violate the Pin requirements and // do not provide any means of moving the inner future. - unsafe { self.map_unchecked_mut(|s| &mut s.0) }.poll(cx) + unsafe { + let this = self.get_unchecked_mut(); + match Pin::new_unchecked(&mut this.future).poll(cx) { + task::Poll::Ready(value) => task::Poll::Ready((this.map)(value)), + task::Poll::Pending => task::Poll::Pending, + } + } } } -unsafe impl Send for MakeSendFuture {} +impl MakeSendFuture { + fn new(future: F, map: M) -> Self { + Self { future, map } + } +} + +unsafe impl Send for MakeSendFuture {} impl crate::ComputePassInner for ComputePass { fn set_pipeline(&mut self, pipeline: &Sendable) { @@ -800,7 +814,6 @@ fn map_map_mode(mode: crate::MapMode) -> u32 { } type JsFutureResult = Result; -type FutureMap = futures::future::Map T>; fn future_request_adapter(result: JsFutureResult) -> Option> { match result { @@ -850,11 +863,18 @@ impl crate::Context for Context { type SwapChainOutputDetail = SwapChainOutputDetail; - type RequestAdapterFuture = MakeSendFuture>>; - type RequestDeviceFuture = MakeSendFuture< - FutureMap>, + type RequestAdapterFuture = MakeSendFuture< + wasm_bindgen_futures::JsFuture, + fn(JsFutureResult) -> Option, + >; + type RequestDeviceFuture = MakeSendFuture< + wasm_bindgen_futures::JsFuture, + fn(JsFutureResult) -> Result<(Self::DeviceId, Self::QueueId), crate::RequestDeviceError>, + >; + type MapAsyncFuture = MakeSendFuture< + wasm_bindgen_futures::JsFuture, + fn(JsFutureResult) -> Result<(), crate::BufferAsyncError>, >; - type MapAsyncFuture = MakeSendFuture>>; fn init(_backends: wgt::BackendBit) -> Self { Context(web_sys::window().unwrap().navigator().gpu()) @@ -900,8 +920,10 @@ impl crate::Context for Context { }; mapped_options.power_preference(mapped_power_preference); let adapter_promise = self.0.request_adapter_with_options(&mapped_options); - MakeSendFuture( - wasm_bindgen_futures::JsFuture::from(adapter_promise).map(future_request_adapter), + + MakeSendFuture::new( + wasm_bindgen_futures::JsFuture::from(adapter_promise), + future_request_adapter, ) } @@ -926,8 +948,9 @@ impl crate::Context for Context { mapped_desc.limits(&mapped_limits); let device_promise = adapter.0.request_device_with_descriptor(&mapped_desc); - MakeSendFuture( - wasm_bindgen_futures::JsFuture::from(device_promise).map(future_request_device), + MakeSendFuture::new( + wasm_bindgen_futures::JsFuture::from(device_promise), + future_request_device, ) } @@ -1343,7 +1366,10 @@ impl crate::Context for Context { (range.end - range.start) as f64, ); - MakeSendFuture(wasm_bindgen_futures::JsFuture::from(map_promise).map(future_map_async)) + MakeSendFuture::new( + wasm_bindgen_futures::JsFuture::from(map_promise), + future_map_async, + ) } fn buffer_get_mapped_range( diff --git a/wgpu/src/lib.rs b/wgpu/src/lib.rs index 1e7d293e60..287ff2e1c7 100644 --- a/wgpu/src/lib.rs +++ b/wgpu/src/lib.rs @@ -22,7 +22,6 @@ use std::{ thread, }; -use futures::FutureExt as _; use parking_lot::Mutex; pub use wgt::{ @@ -1312,9 +1311,8 @@ impl Instance { options: &RequestAdapterOptions, ) -> impl Future> + Send { let context = Arc::clone(&self.context); - self.context - .instance_request_adapter(options) - .map(|option| option.map(|id| Adapter { context, id })) + let adapter = self.context.instance_request_adapter(options); + async move { adapter.await.map(|id| Adapter { context, id }) } } /// Creates a surface from a raw window handle. @@ -1369,8 +1367,9 @@ impl Adapter { trace_path: Option<&std::path::Path>, ) -> impl Future> + Send { let context = Arc::clone(&self.context); - Context::adapter_request_device(&*self.context, &self.id, desc, trace_path).map(|result| { - result.map(|(device_id, queue_id)| { + let device = Context::adapter_request_device(&*self.context, &self.id, desc, trace_path); + async move { + device.await.map(|(device_id, queue_id)| { ( Device { context: Arc::clone(&context), @@ -1382,7 +1381,7 @@ impl Adapter { }, ) }) - }) + } } /// List all features that are supported with this adapter. diff --git a/wgpu/src/util/belt.rs b/wgpu/src/util/belt.rs index 251d6483b7..abaf03d844 100644 --- a/wgpu/src/util/belt.rs +++ b/wgpu/src/util/belt.rs @@ -2,9 +2,42 @@ use crate::{ Buffer, BufferAddress, BufferDescriptor, BufferSize, BufferUsage, BufferViewMut, CommandEncoder, Device, MapMode, }; -use futures::{future::join_all, FutureExt}; +use std::pin::Pin; +use std::task::{self, Poll}; use std::{future::Future, sync::mpsc}; +// Given a vector of futures, poll each in parallel until all are ready. +struct Join { + futures: Vec>, +} + +impl> Future for Join { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll { + // This is safe because we have no Drop implementation to violate the Pin requirements and + // do not provide any means of moving the inner futures. + let all_ready = unsafe { + // Poll all remaining futures, removing all that are ready + self.get_unchecked_mut().futures.iter_mut().all(|opt| { + if let Some(future) = opt { + if Pin::new_unchecked(future).poll(cx) == Poll::Ready(()) { + *opt = None; + } + } + + opt.is_none() + }) + }; + + if all_ready { + Poll::Ready(()) + } else { + Poll::Pending + } + } +} + struct Chunk { buffer: Buffer, size: BufferAddress, @@ -133,18 +166,24 @@ impl StagingBelt { self.free_chunks.push(chunk); } - let sender_template = &self.sender; - join_all(self.closed_chunks.drain(..).map(|chunk| { - let sender = sender_template.clone(); - chunk - .buffer - .slice(..) - .map_async(MapMode::Write) - .inspect(move |_| { + let sender = &self.sender; + let futures = self + .closed_chunks + .drain(..) + .map(|chunk| { + let sender = sender.clone(); + let async_buffer = chunk.buffer.slice(..).map_async(MapMode::Write); + + Some(async move { + // The result is ignored + async_buffer.await.ok(); + // The only possible error is the other side disconnecting, which is fine let _ = sender.send(chunk); }) - })) - .map(|_| ()) + }) + .collect::>(); + + Join { futures } } }