diff --git a/wgpu-core/src/command/allocator.rs b/wgpu-core/src/command/allocator.rs
index 1a7983c5ce..a460579138 100644
--- a/wgpu-core/src/command/allocator.rs
+++ b/wgpu-core/src/command/allocator.rs
@@ -67,6 +67,7 @@ struct Inner<B: hal::Backend> {
 #[derive(Debug)]
 pub struct CommandAllocator<B: hal::Backend> {
     queue_family: hal::queue::QueueFamilyId,
+    internal_thread_id: thread::ThreadId,
     inner: Mutex<Inner<B>>,
 }
 
@@ -124,6 +125,7 @@ impl<B: hal::Backend> CommandAllocator<B> {
     pub fn new(queue_family: hal::queue::QueueFamilyId) -> Self {
         CommandAllocator {
             queue_family,
+            internal_thread_id: thread::current().id(),
             inner: Mutex::new(Inner {
                 pools: HashMap::new(),
             }),
@@ -136,7 +138,7 @@ impl<B: hal::Backend> CommandAllocator<B> {
     }
 
     pub fn allocate_internal(&self) -> B::CommandBuffer {
-        self.allocate_for_thread_id(thread::current().id())
+        self.allocate_for_thread_id(self.internal_thread_id)
     }
 
     pub fn extend(&self, cmd_buf: &CommandBuffer<B>) -> B::CommandBuffer {
diff --git a/wgpu-core/src/device/mod.rs b/wgpu-core/src/device/mod.rs
index e6e41b00e4..996d278f12 100644
--- a/wgpu-core/src/device/mod.rs
+++ b/wgpu-core/src/device/mod.rs
@@ -17,11 +17,9 @@ use gfx_memory::{Block, Heaps};
 use hal::{
     command::CommandBuffer as _,
     device::Device as _,
-    queue::CommandQueue as _,
     window::{PresentationSurface as _, Surface as _},
 };
 use parking_lot::{Mutex, MutexGuard};
-use smallvec::SmallVec;
 use wgt::{BufferAddress, InputStepMode, TextureDimension, TextureFormat, BIND_BUFFER_ALIGNMENT};
 
 use std::{
@@ -32,8 +30,10 @@
 use spirv_headers::ExecutionModel;
 
 mod life;
+mod queue;
 #[cfg(any(feature = "trace", feature = "replay"))]
 pub mod trace;
+
 #[cfg(feature = "trace")]
 use trace::{Action, Trace};
 
@@ -1684,297 +1684,6 @@ impl<G: GlobalIdentityHandlerFactory> Global<G> {
         self.command_encoder_destroy::<B>(command_buffer_id)
     }
 
-    pub fn queue_write_buffer<B: GfxBackend>(
-        &self,
-        queue_id: id::QueueId,
-        data: &[u8],
-        buffer_id: id::BufferId,
-        buffer_offset: BufferAddress,
-    ) {
-        let hub = B::hub(self);
-        let mut token = Token::root();
-        let (mut device_guard, mut token) = hub.devices.write(&mut token);
-        let device = &mut device_guard[queue_id];
-        let (buffer_guard, _) = hub.buffers.read(&mut token);
-
-        #[cfg(feature = "trace")]
-        match device.trace {
-            Some(ref trace) => {
-                let mut trace = trace.lock();
-                let data_path = trace.make_binary("bin", data);
-                trace.add(trace::Action::WriteBuffer {
-                    id: buffer_id,
-                    data: data_path,
-                    range: buffer_offset..buffer_offset + data.len() as BufferAddress,
-                });
-            }
-            None => {}
-        }
-
-        let mut trackers = device.trackers.lock();
-        let (dst, transition) = trackers.buffers.use_replace(
-            &*buffer_guard,
-            buffer_id,
-            (),
-            resource::BufferUse::COPY_DST,
-        );
-        assert!(
-            dst.usage.contains(wgt::BufferUsage::COPY_DST),
-            "Write buffer usage {:?} must contain usage flag DST_SRC",
-            dst.usage
-        );
-
-        let submit_index = 1 + device.life_guard.submission_index.load(Ordering::Relaxed);
-        if !dst.life_guard.use_at(submit_index) {
-            device.temp_suspected.buffers.push(buffer_id);
-        }
-
-        let src_raw = unsafe {
-            let mut buf = device
-                .raw
-                .create_buffer(
-                    data.len() as BufferAddress,
-                    hal::buffer::Usage::TRANSFER_SRC,
-                )
-                .unwrap();
-            device.raw.set_buffer_name(&mut buf, "");
-            buf
-        };
-        //TODO: do we need to transition into HOST_WRITE access first?
-        let requirements = unsafe { device.raw.get_buffer_requirements(&src_raw) };
-
-        let mut memory = device
-            .mem_allocator
-            .lock()
-            .allocate(
-                &device.raw,
-                requirements.type_mask as u32,
-                gfx_memory::MemoryUsage::Staging { read_back: false },
-                gfx_memory::Kind::Linear,
-                requirements.size,
-                requirements.alignment,
-            )
-            .unwrap();
-
-        let mut mapped = memory.map(&device.raw, hal::memory::Segment::ALL).unwrap();
-        unsafe { mapped.write(&device.raw, hal::memory::Segment::ALL) }
-            .unwrap()
-            .slice
-            .copy_from_slice(data);
-
-        let mut comb = match device.pending_write_command_buffer.take() {
-            Some(comb) => comb,
-            None => {
-                let mut comb = device.com_allocator.allocate_internal();
-                unsafe {
-                    comb.begin_primary(hal::command::CommandBufferFlags::ONE_TIME_SUBMIT);
-                }
-                comb
-            }
-        };
-        let region = hal::command::BufferCopy {
-            src: 0,
-            dst: buffer_offset,
-            size: data.len() as _,
-        };
-        unsafe {
-            comb.pipeline_barrier(
-                all_buffer_stages()..all_buffer_stages(),
-                hal::memory::Dependencies::empty(),
-                iter::once(hal::memory::Barrier::Buffer {
-                    states: hal::buffer::Access::HOST_WRITE..hal::buffer::Access::TRANSFER_READ,
-                    target: &src_raw,
-                    range: hal::buffer::SubRange::WHOLE,
-                    families: None,
-                })
-                .chain(transition.map(|pending| pending.into_hal(dst))),
-            );
-            comb.copy_buffer(&src_raw, &dst.raw, iter::once(region));
-        }
-        device.pending_write_command_buffer = Some(comb);
-    }
-
-    pub fn queue_submit<B: GfxBackend>(
-        &self,
-        queue_id: id::QueueId,
-        command_buffer_ids: &[id::CommandBufferId],
-    ) {
-        let hub = B::hub(self);
-
-        let (submit_index, fence) = {
-            let mut token = Token::root();
-            let (mut device_guard, mut token) = hub.devices.write(&mut token);
-            let device = &mut device_guard[queue_id];
-            let pending_write_command_buffer = device.pending_write_command_buffer.take();
-            device.temp_suspected.clear();
-
-            let submit_index = 1 + device
-                .life_guard
-                .submission_index
-                .fetch_add(1, Ordering::Relaxed);
-
-            let (mut swap_chain_guard, mut token) = hub.swap_chains.write(&mut token);
-            let (mut command_buffer_guard, mut token) = hub.command_buffers.write(&mut token);
-            let (bind_group_guard, mut token) = hub.bind_groups.read(&mut token);
-            let (compute_pipe_guard, mut token) = hub.compute_pipelines.read(&mut token);
-            let (render_pipe_guard, mut token) = hub.render_pipelines.read(&mut token);
-            let (mut buffer_guard, mut token) = hub.buffers.write(&mut token);
-            let (texture_guard, mut token) = hub.textures.read(&mut token);
-            let (texture_view_guard, mut token) = hub.texture_views.read(&mut token);
-            let (sampler_guard, _) = hub.samplers.read(&mut token);
-
-            //Note: locking the trackers has to be done after the storages
-            let mut signal_swapchain_semaphores = SmallVec::<[_; 1]>::new();
-            let mut trackers = device.trackers.lock();
-
-            //TODO: if multiple command buffers are submitted, we can re-use the last
-            // native command buffer of the previous chain instead of always creating
-            // a temporary one, since the chains are not finished.
-
-            // finish all the command buffers first
-            for &cmb_id in command_buffer_ids {
-                let comb = &mut command_buffer_guard[cmb_id];
-                #[cfg(feature = "trace")]
-                match device.trace {
-                    Some(ref trace) => trace
-                        .lock()
-                        .add(Action::Submit(submit_index, comb.commands.take().unwrap())),
-                    None => (),
-                };
-
-                if let Some((sc_id, fbo)) = comb.used_swap_chain.take() {
-                    let sc = &mut swap_chain_guard[sc_id.value];
-                    assert!(sc.acquired_view_id.is_some(),
-                        "SwapChainOutput for {:?} was dropped before the respective command buffer {:?} got submitted!",
-                        sc_id.value, cmb_id);
-                    if sc.acquired_framebuffers.is_empty() {
-                        signal_swapchain_semaphores.push(sc_id.value);
-                    }
-                    sc.acquired_framebuffers.push(fbo);
-                }
-
-                // optimize the tracked states
-                comb.trackers.optimize();
-
-                // update submission IDs
-                for id in comb.trackers.buffers.used() {
-                    if let resource::BufferMapState::Waiting(_) = buffer_guard[id].map_state {
-                        panic!("Buffer has a pending mapping.");
-                    }
-                    if !buffer_guard[id].life_guard.use_at(submit_index) {
-                        if let resource::BufferMapState::Active { .. } = buffer_guard[id].map_state
-                        {
-                            log::warn!("Dropped buffer has a pending mapping.");
-                            unmap_buffer(&device.raw, &mut buffer_guard[id]);
-                        }
-                        device.temp_suspected.buffers.push(id);
-                    }
-                }
-                for id in comb.trackers.textures.used() {
-                    if !texture_guard[id].life_guard.use_at(submit_index) {
-                        device.temp_suspected.textures.push(id);
-                    }
-                }
-                for id in comb.trackers.views.used() {
-                    if !texture_view_guard[id].life_guard.use_at(submit_index) {
-                        device.temp_suspected.texture_views.push(id);
-                    }
-                }
-                for id in comb.trackers.bind_groups.used() {
-                    if !bind_group_guard[id].life_guard.use_at(submit_index) {
-                        device.temp_suspected.bind_groups.push(id);
-                    }
-                }
-                for id in comb.trackers.samplers.used() {
-                    if !sampler_guard[id].life_guard.use_at(submit_index) {
-                        device.temp_suspected.samplers.push(id);
-                    }
-                }
-                for id in comb.trackers.compute_pipes.used() {
-                    if !compute_pipe_guard[id].life_guard.use_at(submit_index) {
-                        device.temp_suspected.compute_pipelines.push(id);
-                    }
-                }
-                for id in comb.trackers.render_pipes.used() {
-                    if !render_pipe_guard[id].life_guard.use_at(submit_index) {
-                        device.temp_suspected.render_pipelines.push(id);
-                    }
-                }
-
-                // execute resource transitions
-                let mut transit = device.com_allocator.extend(comb);
-                unsafe {
-                    // the last buffer was open, closing now
-                    comb.raw.last_mut().unwrap().finish();
-                    transit.begin_primary(hal::command::CommandBufferFlags::ONE_TIME_SUBMIT);
-                }
-                log::trace!("Stitching command buffer {:?} before submission", cmb_id);
-                command::CommandBuffer::insert_barriers(
-                    &mut transit,
-                    &mut *trackers,
-                    &comb.trackers,
-                    &*buffer_guard,
-                    &*texture_guard,
-                );
-                unsafe {
-                    transit.finish();
-                }
-                comb.raw.insert(0, transit);
-            }
-
-            log::debug!("Device after submission {}: {:#?}", submit_index, trackers);
-
-            // now prepare the GPU submission
-            let fence = device.raw.create_fence(false).unwrap();
-            let submission = hal::queue::Submission {
-                command_buffers: pending_write_command_buffer.as_ref().into_iter().chain(
-                    command_buffer_ids
-                        .iter()
-                        .flat_map(|&cmb_id| &command_buffer_guard[cmb_id].raw),
-                ),
-                wait_semaphores: Vec::new(),
-                signal_semaphores: signal_swapchain_semaphores
-                    .into_iter()
-                    .map(|sc_id| &swap_chain_guard[sc_id].semaphore),
-            };
-
-            unsafe {
-                device.queue_group.queues[0].submit(submission, Some(&fence));
-            }
-
-            if let Some(comb_raw) = pending_write_command_buffer {
-                device
-                    .com_allocator
-                    .after_submit_internal(comb_raw, submit_index);
-            }
-
-            (submit_index, fence)
-        };
-
-        // No need for write access to the device from here on out
-        let callbacks = {
-            let mut token = Token::root();
-            let (device_guard, mut token) = hub.devices.read(&mut token);
-            let device = &device_guard[queue_id];
-
-            let callbacks = device.maintain(self, false, &mut token);
-            device.lock_life(&mut token).track_submission(
-                submit_index,
-                fence,
-                &device.temp_suspected,
-            );
-
-            // finally, return the command buffers to the allocator
-            for &cmb_id in command_buffer_ids {
-                let (cmd_buf, _) = hub.command_buffers.unregister(cmb_id, &mut token);
-                device.com_allocator.after_submit(cmd_buf, submit_index);
-            }
-
-            callbacks
-        };
-
-        fire_map_callbacks(callbacks);
-    }
-
     pub fn device_create_render_pipeline<B: GfxBackend>(
         &self,
         device_id: id::DeviceId,
diff --git a/wgpu-core/src/device/queue.rs b/wgpu-core/src/device/queue.rs
new file mode 100644
index 0000000000..897463b03f
--- /dev/null
+++ b/wgpu-core/src/device/queue.rs
@@ -0,0 +1,302 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#[cfg(feature = "trace")]
+use super::trace::{self, Action};
+use crate::{
+    command::CommandBuffer,
+    hub::{GfxBackend, Global, GlobalIdentityHandlerFactory, Token},
+    id,
+    resource::{BufferMapState, BufferUse},
+};
+use gfx_memory::Block;
+use hal::{command::CommandBuffer as _, device::Device as _, queue::CommandQueue as _};
+use smallvec::SmallVec;
+use std::{iter, sync::atomic::Ordering};
+
+impl<G: GlobalIdentityHandlerFactory> Global<G> {
+    pub fn queue_write_buffer<B: GfxBackend>(
+        &self,
+        queue_id: id::QueueId,
+        data: &[u8],
+        buffer_id: id::BufferId,
+        buffer_offset: wgt::BufferAddress,
+    ) {
+        let hub = B::hub(self);
+        let mut token = Token::root();
+        let (mut device_guard, mut token) = hub.devices.write(&mut token);
+        let device = &mut device_guard[queue_id];
+        let (buffer_guard, _) = hub.buffers.read(&mut token);
+
+        #[cfg(feature = "trace")]
+        match device.trace {
+            Some(ref trace) => {
+                let mut trace = trace.lock();
+                let data_path = trace.make_binary("bin", data);
+                trace.add(trace::Action::WriteBuffer {
+                    id: buffer_id,
+                    data: data_path,
+                    range: buffer_offset..buffer_offset + data.len() as wgt::BufferAddress,
+                });
+            }
+            None => {}
+        }
+
+        let mut trackers = device.trackers.lock();
+        let (dst, transition) =
+            trackers
+                .buffers
+                .use_replace(&*buffer_guard, buffer_id, (), BufferUse::COPY_DST);
+        assert!(
+            dst.usage.contains(wgt::BufferUsage::COPY_DST),
+            "Write buffer usage {:?} must contain usage flag COPY_DST",
+            dst.usage
+        );
+
+        let last_submit_index = device.life_guard.submission_index.load(Ordering::Relaxed);
+        dst.life_guard.use_at(last_submit_index + 1);
+
+        let src_raw = unsafe {
+            let mut buf = device
+                .raw
+                .create_buffer(
+                    data.len() as wgt::BufferAddress,
+                    hal::buffer::Usage::TRANSFER_SRC,
+                )
+                .unwrap();
+            device.raw.set_buffer_name(&mut buf, "");
+            buf
+        };
+        //TODO: do we need to transition into HOST_WRITE access first?
+        let requirements = unsafe { device.raw.get_buffer_requirements(&src_raw) };
+
+        let mut memory = device
+            .mem_allocator
+            .lock()
+            .allocate(
+                &device.raw,
+                requirements.type_mask as u32,
+                gfx_memory::MemoryUsage::Staging { read_back: false },
+                gfx_memory::Kind::Linear,
+                requirements.size,
+                requirements.alignment,
+            )
+            .unwrap();
+
+        let mut mapped = memory.map(&device.raw, hal::memory::Segment::ALL).unwrap();
+        unsafe { mapped.write(&device.raw, hal::memory::Segment::ALL) }
+            .unwrap()
+            .slice
+            .copy_from_slice(data);
+
+        let mut comb = match device.pending_write_command_buffer.take() {
+            Some(comb) => comb,
+            None => {
+                let mut comb = device.com_allocator.allocate_internal();
+                unsafe {
+                    comb.begin_primary(hal::command::CommandBufferFlags::ONE_TIME_SUBMIT);
+                }
+                comb
+            }
+        };
+        let region = hal::command::BufferCopy {
+            src: 0,
+            dst: buffer_offset,
+            size: data.len() as _,
+        };
+        unsafe {
+            comb.pipeline_barrier(
+                super::all_buffer_stages()..hal::pso::PipelineStage::TRANSFER,
+                hal::memory::Dependencies::empty(),
+                iter::once(hal::memory::Barrier::Buffer {
+                    states: hal::buffer::Access::HOST_WRITE..hal::buffer::Access::TRANSFER_READ,
+                    target: &src_raw,
+                    range: hal::buffer::SubRange::WHOLE,
+                    families: None,
+                })
+                .chain(transition.map(|pending| pending.into_hal(dst))),
+            );
+            comb.copy_buffer(&src_raw, &dst.raw, iter::once(region));
+        }
+        device.pending_write_command_buffer = Some(comb);
+    }
+
+    pub fn queue_submit<B: GfxBackend>(
+        &self,
+        queue_id: id::QueueId,
+        command_buffer_ids: &[id::CommandBufferId],
+    ) {
+        let hub = B::hub(self);
+
+        let (submit_index, fence) = {
+            let mut token = Token::root();
+            let (mut device_guard, mut token) = hub.devices.write(&mut token);
+            let device = &mut device_guard[queue_id];
+            let pending_write_command_buffer = device.pending_write_command_buffer.take();
+            device.temp_suspected.clear();
+
+            let submit_index = 1 + device
+                .life_guard
+                .submission_index
+                .fetch_add(1, Ordering::Relaxed);
+
+            let (mut swap_chain_guard, mut token) = hub.swap_chains.write(&mut token);
+            let (mut command_buffer_guard, mut token) = hub.command_buffers.write(&mut token);
+            let (bind_group_guard, mut token) = hub.bind_groups.read(&mut token);
+            let (compute_pipe_guard, mut token) = hub.compute_pipelines.read(&mut token);
+            let (render_pipe_guard, mut token) = hub.render_pipelines.read(&mut token);
+            let (mut buffer_guard, mut token) = hub.buffers.write(&mut token);
+            let (texture_guard, mut token) = hub.textures.read(&mut token);
+            let (texture_view_guard, mut token) = hub.texture_views.read(&mut token);
+            let (sampler_guard, _) = hub.samplers.read(&mut token);
+
+            //Note: locking the trackers has to be done after the storages
+            let mut signal_swapchain_semaphores = SmallVec::<[_; 1]>::new();
+            let mut trackers = device.trackers.lock();
+
+            //TODO: if multiple command buffers are submitted, we can re-use the last
+            // native command buffer of the previous chain instead of always creating
+            // a temporary one, since the chains are not finished.
+
+            // finish all the command buffers first
+            for &cmb_id in command_buffer_ids {
+                let comb = &mut command_buffer_guard[cmb_id];
+                #[cfg(feature = "trace")]
+                match device.trace {
+                    Some(ref trace) => trace
+                        .lock()
+                        .add(Action::Submit(submit_index, comb.commands.take().unwrap())),
+                    None => (),
+                };
+
+                if let Some((sc_id, fbo)) = comb.used_swap_chain.take() {
+                    let sc = &mut swap_chain_guard[sc_id.value];
+                    assert!(sc.acquired_view_id.is_some(),
+                        "SwapChainOutput for {:?} was dropped before the respective command buffer {:?} got submitted!",
+                        sc_id.value, cmb_id);
+                    if sc.acquired_framebuffers.is_empty() {
+                        signal_swapchain_semaphores.push(sc_id.value);
+                    }
+                    sc.acquired_framebuffers.push(fbo);
+                }
+
+                // optimize the tracked states
+                comb.trackers.optimize();
+
+                // update submission IDs
+                for id in comb.trackers.buffers.used() {
+                    if let BufferMapState::Waiting(_) = buffer_guard[id].map_state {
+                        panic!("Buffer has a pending mapping.");
+                    }
+                    if !buffer_guard[id].life_guard.use_at(submit_index) {
+                        if let BufferMapState::Active { .. } = buffer_guard[id].map_state {
+                            log::warn!("Dropped buffer has a pending mapping.");
+                            super::unmap_buffer(&device.raw, &mut buffer_guard[id]);
+                        }
+                        device.temp_suspected.buffers.push(id);
+                    }
+                }
+                for id in comb.trackers.textures.used() {
+                    if !texture_guard[id].life_guard.use_at(submit_index) {
+                        device.temp_suspected.textures.push(id);
+                    }
+                }
+                for id in comb.trackers.views.used() {
+                    if !texture_view_guard[id].life_guard.use_at(submit_index) {
+                        device.temp_suspected.texture_views.push(id);
+                    }
+                }
+                for id in comb.trackers.bind_groups.used() {
+                    if !bind_group_guard[id].life_guard.use_at(submit_index) {
+                        device.temp_suspected.bind_groups.push(id);
+                    }
+                }
+                for id in comb.trackers.samplers.used() {
+                    if !sampler_guard[id].life_guard.use_at(submit_index) {
+                        device.temp_suspected.samplers.push(id);
+                    }
+                }
+                for id in comb.trackers.compute_pipes.used() {
+                    if !compute_pipe_guard[id].life_guard.use_at(submit_index) {
+                        device.temp_suspected.compute_pipelines.push(id);
+                    }
+                }
+                for id in comb.trackers.render_pipes.used() {
+                    if !render_pipe_guard[id].life_guard.use_at(submit_index) {
+                        device.temp_suspected.render_pipelines.push(id);
+                    }
+                }
+
+                // execute resource transitions
+                let mut transit = device.com_allocator.extend(comb);
+                unsafe {
+                    // the last buffer was open, closing now
+                    comb.raw.last_mut().unwrap().finish();
+                    transit.begin_primary(hal::command::CommandBufferFlags::ONE_TIME_SUBMIT);
+                }
+                log::trace!("Stitching command buffer {:?} before submission", cmb_id);
+                CommandBuffer::insert_barriers(
+                    &mut transit,
+                    &mut *trackers,
+                    &comb.trackers,
+                    &*buffer_guard,
+                    &*texture_guard,
+                );
+                unsafe {
+                    transit.finish();
+                }
+                comb.raw.insert(0, transit);
+            }
+
+            log::debug!("Device after submission {}: {:#?}", submit_index, trackers);
+
+            // now prepare the GPU submission
+            let fence = device.raw.create_fence(false).unwrap();
+            let submission = hal::queue::Submission {
+                command_buffers: pending_write_command_buffer.as_ref().into_iter().chain(
+                    command_buffer_ids
+                        .iter()
+                        .flat_map(|&cmb_id| &command_buffer_guard[cmb_id].raw),
+                ),
+                wait_semaphores: Vec::new(),
+                signal_semaphores: signal_swapchain_semaphores
+                    .into_iter()
+                    .map(|sc_id| &swap_chain_guard[sc_id].semaphore),
+            };
+
+            unsafe {
+                device.queue_group.queues[0].submit(submission, Some(&fence));
+            }
+
+            if let Some(comb_raw) = pending_write_command_buffer {
+                device
+                    .com_allocator
+                    .after_submit_internal(comb_raw, submit_index);
+            }
+            (submit_index, fence)
+        };
+
+        // No need for write access to the device from here on out
+        let callbacks = {
+            let mut token = Token::root();
+            let (device_guard, mut token) = hub.devices.read(&mut token);
+            let device = &device_guard[queue_id];
+
+            let callbacks = device.maintain(self, false, &mut token);
+            device.lock_life(&mut token).track_submission(
+                submit_index,
+                fence,
+                &device.temp_suspected,
+            );
+
+            // finally, return the command buffers to the allocator
+            for &cmb_id in command_buffer_ids {
+                let (cmd_buf, _) = hub.command_buffers.unregister(cmb_id, &mut token);
+                device.com_allocator.after_submit(cmd_buf, submit_index);
+            }
+
+            callbacks
+        };
+
+        super::fire_map_callbacks(callbacks);
+    }
+}
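The key scheme in the new `queue.rs` is that `queue_write_buffer` submits nothing by itself: it stages the data into a throwaway `TRANSFER_SRC` buffer and records the copy into `device.pending_write_command_buffer`, and the next `queue_submit` flushes that internal command buffer ahead of the user's command buffers (note the `pending_write_command_buffer.as_ref().into_iter().chain(...)` in the `Submission`). Below is a minimal, self-contained sketch of that coalescing-and-ordering pattern in plain Rust; `Queue`, `CopyCmd`, and the method names are invented for illustration and are not wgpu-core API.

```rust
// Minimal model of the pending-writes scheme above; no gfx-hal involved.

#[derive(Debug)]
struct CopyCmd {
    dst_offset: u64,
    data: Vec<u8>,
}

#[derive(Default)]
struct Queue {
    // Mirrors `device.pending_write_command_buffer`: copies recorded by
    // `write_buffer` accumulate here until the next `submit` drains them.
    pending_writes: Option<Vec<CopyCmd>>,
    submission_index: u64,
}

impl Queue {
    /// Like `queue_write_buffer`: stage the data and record a copy,
    /// re-using the open pending batch if one already exists.
    fn write_buffer(&mut self, dst_offset: u64, data: &[u8]) {
        self.pending_writes
            .get_or_insert_with(Vec::new)
            .push(CopyCmd { dst_offset, data: data.to_vec() });
    }

    /// Like `queue_submit`: flush any pending writes *before* the user's
    /// command buffers, so the copies are visible to them.
    fn submit(&mut self, user_cmd_bufs: &[&str]) -> u64 {
        self.submission_index += 1;
        if let Some(writes) = self.pending_writes.take() {
            println!(
                "submit #{}: {} staged copies run first",
                self.submission_index,
                writes.len()
            );
        }
        for cb in user_cmd_bufs {
            println!("submit #{}: user command buffer {:?}", self.submission_index, cb);
        }
        self.submission_index
    }
}

fn main() {
    let mut queue = Queue::default();
    queue.write_buffer(0, &[1, 2, 3, 4]);
    queue.write_buffer(256, &[5, 6]); // coalesced into the same pending batch
    queue.submit(&["render pass"]);   // staged copies land ahead of the pass
    queue.submit(&["compute pass"]);  // nothing pending this time
}
```

Keeping a single open internal command buffer amortizes allocation across consecutive writes, and draining it at the front of the next submission preserves write-before-use ordering without extra semaphores; the `internal_thread_id` change in `allocator.rs` supports this by pinning internally allocated command buffers to one pool regardless of which thread calls `queue_write_buffer`.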