Improve handling of erroring command buffers on submission

This commit is contained in:
Dzmitry Malyshau
2021-05-20 15:35:14 -04:00
parent 314ed4cce5
commit e9d2f402f1
5 changed files with 178 additions and 168 deletions

View File

@@ -2,7 +2,7 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
use super::CommandBuffer;
use super::{CommandBuffer, CommandEncoderStatus};
use crate::{
device::DeviceError, hub::GfxBackend, id::DeviceId, track::TrackerSet, FastHashMap,
PrivateFeatures, Stored, SubmissionIndex,
@@ -126,7 +126,7 @@ impl<B: GfxBackend> CommandAllocator<B> {
Ok(CommandBuffer {
raw: vec![pool.allocate()],
is_recording: true,
status: CommandEncoderStatus::Recording,
recorded_thread_id: thread_id,
device_id,
trackers: TrackerSet::new(B::VARIANT),

View File

@@ -6,7 +6,8 @@ use crate::{
binding_model::{BindError, BindGroup, PushConstantUploadError},
command::{
bind::Binder, end_pipeline_statistics_query, BasePass, BasePassRef, CommandBuffer,
CommandEncoderError, MapPassErr, PassErrorScope, QueryUseError, StateChange,
CommandEncoderError, CommandEncoderStatus, MapPassErr, PassErrorScope, QueryUseError,
StateChange,
},
hub::{GfxBackend, Global, GlobalIdentityHandlerFactory, Storage, Token},
id,
@@ -266,6 +267,8 @@ impl<G: GlobalIdentityHandlerFactory> Global<G> {
let (mut cmd_buf_guard, mut token) = hub.command_buffers.write(&mut token);
let cmd_buf =
CommandBuffer::get_encoder_mut(&mut *cmd_buf_guard, encoder_id).map_pass_err(scope)?;
// will be reset to true if recording is done without errors
cmd_buf.status = CommandEncoderStatus::Error;
let raw = cmd_buf.raw.last_mut().unwrap();
#[cfg(feature = "trace")]
@@ -657,6 +660,7 @@ impl<G: GlobalIdentityHandlerFactory> Global<G> {
raw.end_debug_marker();
}
}
cmd_buf.status = CommandEncoderStatus::Recording;
Ok(())
}

View File

@@ -38,10 +38,17 @@ use std::thread::ThreadId;
const PUSH_CONSTANT_CLEAR_ARRAY: &[u32] = &[0_u32; 64];
#[derive(Debug)]
enum CommandEncoderStatus {
Recording,
Finished,
Error,
}
#[derive(Debug)]
pub struct CommandBuffer<B: hal::Backend> {
pub(crate) raw: Vec<B::CommandBuffer>,
is_recording: bool,
status: CommandEncoderStatus,
recorded_thread_id: ThreadId,
pub(crate) device_id: Stored<id::DeviceId>,
pub(crate) trackers: TrackerSet,
@@ -63,12 +70,22 @@ impl<B: GfxBackend> CommandBuffer<B> {
id: id::CommandEncoderId,
) -> Result<&mut Self, CommandEncoderError> {
match storage.get_mut(id) {
Ok(cmd_buf) if cmd_buf.is_recording => Ok(cmd_buf),
Ok(_) => Err(CommandEncoderError::NotRecording),
Ok(cmd_buf) => match cmd_buf.status {
CommandEncoderStatus::Recording => Ok(cmd_buf),
CommandEncoderStatus::Finished => Err(CommandEncoderError::NotRecording),
CommandEncoderStatus::Error => Err(CommandEncoderError::Invalid),
},
Err(_) => Err(CommandEncoderError::Invalid),
}
}
pub fn is_finished(&self) -> bool {
match self.status {
CommandEncoderStatus::Finished => true,
_ => false,
}
}
pub(crate) fn insert_barriers(
raw: &mut B::CommandBuffer,
base: &mut TrackerSet,
@@ -209,7 +226,7 @@ impl<G: GlobalIdentityHandlerFactory> Global<G> {
let error = match CommandBuffer::get_encoder_mut(&mut *cmd_buf_guard, encoder_id) {
Ok(cmd_buf) => {
cmd_buf.is_recording = false;
cmd_buf.status = CommandEncoderStatus::Finished;
// stop tracking the swapchain image, if used
for sc_id in cmd_buf.used_swap_chains.iter() {
let view_id = swap_chain_guard[sc_id.value]

View File

@@ -6,8 +6,9 @@ use crate::{
binding_model::BindError,
command::{
bind::Binder, end_pipeline_statistics_query, BasePass, BasePassRef, CommandBuffer,
CommandEncoderError, DrawError, ExecutionError, MapPassErr, PassErrorScope, QueryResetMap,
QueryUseError, RenderCommand, RenderCommandError, StateChange,
CommandEncoderError, CommandEncoderStatus, DrawError, ExecutionError, MapPassErr,
PassErrorScope, QueryResetMap, QueryUseError, RenderCommand, RenderCommandError,
StateChange,
},
conv,
device::{
@@ -1042,12 +1043,14 @@ impl<G: GlobalIdentityHandlerFactory> Global<G> {
let (device_guard, mut token) = hub.devices.read(&mut token);
let (cmd_buf_raw, trackers, used_swapchain, query_reset_state) = {
let (cmd_buf_raw, trackers, query_reset_state) = {
// read-only lock guard
let (mut cmb_guard, mut token) = hub.command_buffers.write(&mut token);
let cmd_buf =
CommandBuffer::get_encoder_mut(&mut *cmb_guard, encoder_id).map_pass_err(scope)?;
// will be reset to true if recording is done without errors
cmd_buf.status = CommandEncoderStatus::Error;
cmd_buf.has_labels |= base.label.is_some();
#[cfg(feature = "trace")]
if let Some(ref mut list) = cmd_buf.commands {
@@ -1958,7 +1961,9 @@ impl<G: GlobalIdentityHandlerFactory> Global<G> {
}
let (trackers, used_swapchain) = info.finish(&*texture_guard).map_pass_err(scope)?;
(raw, trackers, used_swapchain, query_reset_state)
cmd_buf.status = CommandEncoderStatus::Recording;
cmd_buf.used_swap_chains.extend(used_swapchain);
(raw, trackers, query_reset_state)
};
let (mut cmb_guard, mut token) = hub.command_buffers.write(&mut token);
@@ -1968,7 +1973,6 @@ impl<G: GlobalIdentityHandlerFactory> Global<G> {
let cmd_buf =
CommandBuffer::get_encoder_mut(&mut *cmb_guard, encoder_id).map_pass_err(scope)?;
cmd_buf.used_swap_chains.extend(used_swapchain);
let last_cmd_buf = cmd_buf.raw.last_mut().unwrap();
query_reset_state

View File

@@ -11,10 +11,10 @@ use crate::{
},
conv,
device::{alloc, DeviceError, WaitIdleError},
hub::{GfxBackend, Global, GlobalIdentityHandlerFactory, Token},
hub::{GfxBackend, Global, GlobalIdentityHandlerFactory, Storage, Token},
id,
memory_init_tracker::MemoryInitKind,
resource::{BufferAccessError, BufferMapState, BufferUse, TextureUse},
memory_init_tracker::{MemoryInitKind, MemoryInitTrackerAction},
resource::{Buffer, BufferAccessError, BufferMapState, BufferUse, TextureUse},
FastHashMap, FastHashSet,
};
@@ -94,18 +94,55 @@ impl<B: hal::Backend> PendingWrites<B> {
cmd_buf
})
}
fn borrow_cmd_buf(&mut self, cmd_allocator: &CommandAllocator<B>) -> &mut B::CommandBuffer {
if self.command_buffer.is_none() {
let mut cmdbuf = cmd_allocator.allocate_internal();
unsafe {
cmdbuf.begin_primary(hal::command::CommandBufferFlags::ONE_TIME_SUBMIT);
}
self.command_buffer = Some(cmdbuf);
}
self.command_buffer.as_mut().unwrap()
}
}
#[derive(Default)]
struct RequiredBufferInits {
map: FastHashMap<id::BufferId, Vec<Range<wgt::BufferAddress>>>,
}
impl RequiredBufferInits {
fn add<B: hal::Backend>(
&mut self,
buffer_memory_init_actions: &[MemoryInitTrackerAction<id::BufferId>],
buffer_guard: &mut Storage<Buffer<B>, id::BufferId>,
) -> Result<(), QueueSubmitError> {
for buffer_use in buffer_memory_init_actions.iter() {
let buffer = buffer_guard
.get_mut(buffer_use.id)
.map_err(|_| QueueSubmitError::DestroyedBuffer(buffer_use.id))?;
let uninitialized_ranges = buffer.initialization_status.drain(buffer_use.range.clone());
match buffer_use.kind {
MemoryInitKind::ImplicitlyInitialized => {
uninitialized_ranges.for_each(drop);
}
MemoryInitKind::NeedsInitializedMemory => {
self.map
.entry(buffer_use.id)
.or_default()
.extend(uninitialized_ranges);
}
}
}
Ok(())
}
}
impl<B: hal::Backend> super::Device<B> {
pub fn borrow_pending_writes(&mut self) -> &mut B::CommandBuffer {
if self.pending_writes.command_buffer.is_none() {
let mut cmdbuf = self.cmd_allocator.allocate_internal();
unsafe {
cmdbuf.begin_primary(hal::command::CommandBufferFlags::ONE_TIME_SUBMIT);
}
self.pending_writes.command_buffer = Some(cmdbuf);
}
self.pending_writes.command_buffer.as_mut().unwrap()
self.pending_writes.borrow_cmd_buf(&self.cmd_allocator)
}
fn prepare_stage(&mut self, size: wgt::BufferAddress) -> Result<StagingData<B>, DeviceError> {
@@ -151,6 +188,70 @@ impl<B: hal::Backend> super::Device<B> {
cmdbuf,
})
}
fn initialize_buffer_memory(
&mut self,
mut required_buffer_inits: RequiredBufferInits,
buffer_guard: &mut Storage<Buffer<B>, id::BufferId>,
) -> Result<(), QueueSubmitError> {
self.pending_writes
.dst_buffers
.extend(required_buffer_inits.map.keys());
let cmd_buf = self.pending_writes.borrow_cmd_buf(&self.cmd_allocator);
let mut trackers = self.trackers.lock();
for (buffer_id, mut ranges) in required_buffer_inits.map.drain() {
// Collapse touching ranges. We can't do this any earlier since we only now gathered ranges from several different command buffers!
ranges.sort_by(|a, b| a.start.cmp(&b.start));
for i in (1..ranges.len()).rev() {
assert!(ranges[i - 1].end <= ranges[i].start); // The memory init tracker made sure of this!
if ranges[i].start == ranges[i - 1].end {
ranges[i - 1].end = ranges[i].end;
ranges.swap_remove(i); // Ordering not important at this point
}
}
// Don't do use_replace since the buffer may already no longer have a ref_count.
// However, we *know* that it is currently in use, so the tracker must already know about it.
let transition = trackers.buffers.change_replace_tracked(
id::Valid(buffer_id),
(),
BufferUse::COPY_DST,
);
let buffer = buffer_guard.get(buffer_id).unwrap();
let &(ref buffer_raw, _) = buffer
.raw
.as_ref()
.ok_or(QueueSubmitError::DestroyedBuffer(buffer_id))?;
unsafe {
cmd_buf.pipeline_barrier(
super::all_buffer_stages()..hal::pso::PipelineStage::TRANSFER,
hal::memory::Dependencies::empty(),
transition.map(|pending| pending.into_hal(buffer)),
);
}
for range in ranges {
let size = range.end - range.start;
assert!(range.start % 4 == 0, "Buffer {:?} has an uninitialized range with a start not aligned to 4 (start was {})", buffer, range.start);
assert!(size % 4 == 0, "Buffer {:?} has an uninitialized range with a size not aligned to 4 (size was {})", buffer, size);
unsafe {
cmd_buf.fill_buffer(
buffer_raw,
hal::buffer::SubRange {
offset: range.start,
size: Some(size),
},
0,
);
}
}
}
Ok(())
}
}
#[derive(Clone, Debug, Error)]
@@ -169,8 +270,6 @@ pub enum QueueWriteError {
pub enum QueueSubmitError {
#[error(transparent)]
Queue(#[from] DeviceError),
#[error("command buffer {0:?} is invalid")]
InvalidCommandBuffer(id::CommandBufferId),
#[error("buffer {0:?} is destroyed")]
DestroyedBuffer(id::BufferId),
#[error("texture {0:?} is destroyed")]
@@ -487,135 +586,6 @@ impl<G: GlobalIdentityHandlerFactory> Global<G> {
Ok(())
}
// Enacts all zero initializations required by the given command buffers
// Required commands are appended to device.pending_writes
fn initialize_used_uninitialized_memory<B: GfxBackend>(
&self,
queue_id: id::QueueId,
command_buffer_ids: &[id::CommandBufferId],
) -> Result<(), QueueSubmitError> {
if command_buffer_ids.is_empty() {
return Ok(());
}
let hub = B::hub(self);
let mut token = Token::root();
let mut required_buffer_inits = {
let (command_buffer_guard, mut token) = hub.command_buffers.read(&mut token);
let mut required_buffer_inits: FastHashMap<
id::BufferId,
Vec<Range<wgt::BufferAddress>>,
> = FastHashMap::default();
for &cmb_id in command_buffer_ids {
let cmdbuf = command_buffer_guard
.get(cmb_id)
.map_err(|_| QueueSubmitError::InvalidCommandBuffer(cmb_id))?;
if cmdbuf.buffer_memory_init_actions.is_empty() {
continue;
}
let (mut buffer_guard, _) = hub.buffers.write(&mut token);
for buffer_use in cmdbuf.buffer_memory_init_actions.iter() {
let buffer = buffer_guard
.get_mut(buffer_use.id)
.map_err(|_| QueueSubmitError::DestroyedBuffer(buffer_use.id))?;
let uninitialized_ranges =
buffer.initialization_status.drain(buffer_use.range.clone());
match buffer_use.kind {
MemoryInitKind::ImplicitlyInitialized => {
uninitialized_ranges.for_each(drop);
}
MemoryInitKind::NeedsInitializedMemory => {
required_buffer_inits
.entry(buffer_use.id)
.or_default()
.extend(uninitialized_ranges);
}
}
}
}
required_buffer_inits
};
// Memory init is expected to be rare (means user relies on default zero!), so most of the time we early here!
if required_buffer_inits.is_empty() {
return Ok(());
}
let (mut device_guard, mut token) = hub.devices.write(&mut token);
let (buffer_guard, _) = hub.buffers.read(&mut token);
let device = device_guard
.get_mut(queue_id)
.map_err(|_| DeviceError::Invalid)?;
device
.pending_writes
.dst_buffers
.extend(required_buffer_inits.keys());
device.borrow_pending_writes(); // Call ensures there is a pending_writes cmdbuffer, but using the reference returned would make the borrow checker unhappy!
let pending_writes_cmd_buf = device.pending_writes.command_buffer.as_mut().unwrap();
let mut trackers = device.trackers.lock();
for (buffer_id, mut ranges) in required_buffer_inits.drain() {
// Collapse touching ranges. We can't do this any earlier since we only now gathered ranges from several different command buffers!
ranges.sort_by(|a, b| a.start.cmp(&b.start));
for i in (1..ranges.len()).rev() {
assert!(ranges[i - 1].end <= ranges[i].start); // The memory init tracker made sure of this!
if ranges[i].start == ranges[i - 1].end {
ranges[i - 1].end = ranges[i].end;
ranges.swap_remove(i); // Ordering not important at this point
}
}
// Don't do use_replace since the buffer may already no longer have a ref_count.
// However, we *know* that it is currently in use, so the tracker must already know about it.
let transition = trackers.buffers.change_replace_tracked(
id::Valid(buffer_id),
(),
BufferUse::COPY_DST,
);
let buffer = buffer_guard
.get(buffer_id)
.map_err(|_| QueueSubmitError::DestroyedBuffer(buffer_id))?;
let &(ref buffer_raw, _) = buffer
.raw
.as_ref()
.ok_or(QueueSubmitError::DestroyedBuffer(buffer_id))?;
unsafe {
pending_writes_cmd_buf.pipeline_barrier(
super::all_buffer_stages()..hal::pso::PipelineStage::TRANSFER,
hal::memory::Dependencies::empty(),
transition.map(|pending| pending.into_hal(buffer)),
);
}
for range in ranges {
let size = range.end - range.start;
assert!(range.start % 4 == 0, "Buffer {:?} has an uninitialized range with a start not aligned to 4 (start was {})", buffer, range.start);
assert!(size % 4 == 0, "Buffer {:?} has an uninitialized range with a size not aligned to 4 (size was {})", buffer, size);
unsafe {
pending_writes_cmd_buf.fill_buffer(
buffer_raw,
hal::buffer::SubRange {
offset: range.start,
size: Some(size),
},
0,
);
}
}
}
Ok(())
}
pub fn queue_submit<B: GfxBackend>(
&self,
queue_id: id::QueueId,
@@ -623,8 +593,6 @@ impl<G: GlobalIdentityHandlerFactory> Global<G> {
) -> Result<(), QueueSubmitError> {
profiling::scope!("submit", "Queue");
self.initialize_used_uninitialized_memory::<B>(queue_id, command_buffer_ids)?;
let hub = B::hub(self);
let mut token = Token::root();
@@ -656,6 +624,7 @@ impl<G: GlobalIdentityHandlerFactory> Global<G> {
let (texture_view_guard, mut token) = hub.texture_views.read(&mut token);
let (sampler_guard, _) = hub.samplers.read(&mut token);
let mut required_buffer_inits = RequiredBufferInits::default();
//Note: locking the trackers has to be done after the storages
let mut trackers = device.trackers.lock();
@@ -665,9 +634,10 @@ impl<G: GlobalIdentityHandlerFactory> Global<G> {
// finish all the command buffers first
for &cmb_id in command_buffer_ids {
let cmdbuf = command_buffer_guard
.get_mut(cmb_id)
.map_err(|_| QueueSubmitError::InvalidCommandBuffer(cmb_id))?;
let cmdbuf = match command_buffer_guard.get_mut(cmb_id) {
Ok(cmdbuf) => cmdbuf,
Err(_) => continue,
};
#[cfg(feature = "trace")]
if let Some(ref trace) = device.trace {
trace.lock().add(Action::Submit(
@@ -675,6 +645,14 @@ impl<G: GlobalIdentityHandlerFactory> Global<G> {
cmdbuf.commands.take().unwrap(),
));
}
if !cmdbuf.is_finished() {
continue;
}
required_buffer_inits
.add(&cmdbuf.buffer_memory_init_actions, &mut *buffer_guard)?;
// optimize the tracked states
cmdbuf.trackers.optimize();
for sc_id in cmdbuf.used_swap_chains.drain(..) {
let sc = &mut swap_chain_guard[sc_id.value];
@@ -689,9 +667,6 @@ impl<G: GlobalIdentityHandlerFactory> Global<G> {
}
}
// optimize the tracked states
cmdbuf.trackers.optimize();
// update submission IDs
for id in cmdbuf.trackers.buffers.used() {
let buffer = &mut buffer_guard[id];
@@ -774,6 +749,10 @@ impl<G: GlobalIdentityHandlerFactory> Global<G> {
}
log::trace!("Device after submission {}: {:#?}", submit_index, trackers);
drop(trackers);
if !required_buffer_inits.map.is_empty() {
device.initialize_buffer_memory(required_buffer_inits, &mut *buffer_guard)?;
}
}
// now prepare the GPU submission
@@ -781,18 +760,24 @@ impl<G: GlobalIdentityHandlerFactory> Global<G> {
.raw
.create_fence(false)
.or(Err(DeviceError::OutOfMemory))?;
let command_buffers = pending_write_command_buffer.as_ref().into_iter().chain(
command_buffer_ids.iter().flat_map(|&cmd_buf_id| {
command_buffer_guard.get(cmd_buf_id).unwrap().raw.iter()
}),
);
let signal_semaphores = signal_swapchain_semaphores
.into_iter()
.map(|sc_id| &swap_chain_guard[sc_id].semaphore);
//Note: we could technically avoid the heap Vec here
let mut command_buffers = Vec::new();
command_buffers.extend(pending_write_command_buffer.as_ref());
for &cmd_buf_id in command_buffer_ids.iter() {
match command_buffer_guard.get(cmd_buf_id) {
Ok(cmd_buf) if cmd_buf.is_finished() => {
command_buffers.extend(cmd_buf.raw.iter());
}
_ => {}
}
}
unsafe {
device.queue_group.queues[0].submit(
command_buffers,
command_buffers.into_iter(),
iter::empty(),
signal_semaphores,
Some(&mut fence),