429: Defer command buffer recycling until allocate r=kvark a=LaylConway

As discussed on Matrix, this change defers command buffer recycling until allocation.
This lets a rendering thread clean up its own command buffers, making sure only the owning thread accesses anything from its command pool.

I have also added a TODO comment about cleaning up the per-thread command pools, which as far as I can tell currently never happens. This was already an issue before, but it is slightly bigger now, since the command buffers parked on an abandoned pool will never be recycled either.

I figured that, for now, fixing that issue is out of scope for this PR, which focuses only on resolving the race condition that causes the validation error.
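To make the intent easier to see without reading the whole diff, here is a minimal, self-contained sketch of the pattern (the names `FakeBuffer`, `Pool`, and `Allocator`, and the plain `u64` submission indices, are simplified stand-ins, not wgpu's actual types):

```rust
use std::collections::HashMap;
use std::sync::Mutex;
use std::thread::{self, ThreadId};

// Simplified stand-in for a backend command buffer.
struct FakeBuffer {
    // Index of the submission this buffer was last used in.
    submitted_in: u64,
}

#[derive(Default)]
struct Pool {
    available: Vec<FakeBuffer>,
    // Buffers handed to the GPU; recycled lazily by the owning thread.
    pending: Vec<FakeBuffer>,
}

impl Pool {
    // Only ever called from `allocate`, so only the owning thread
    // touches the buffers in this pool.
    fn maintain(&mut self, lowest_active_index: u64) {
        for i in (0..self.pending.len()).rev() {
            // Anything submitted before the lowest still-active
            // submission has finished on the GPU.
            if self.pending[i].submitted_in < lowest_active_index {
                let buf = self.pending.swap_remove(i);
                // The real code would reset the buffer here.
                self.available.push(buf);
            }
        }
    }

    fn allocate(&mut self, lowest_active_index: u64) -> FakeBuffer {
        // Recycle completed buffers first, then reuse or create one.
        self.maintain(lowest_active_index);
        self.available
            .pop()
            .unwrap_or(FakeBuffer { submitted_in: 0 })
    }
}

#[derive(Default)]
struct Allocator {
    pools: Mutex<HashMap<ThreadId, Pool>>,
}

impl Allocator {
    fn allocate(&self, lowest_active_index: u64) -> FakeBuffer {
        let mut pools = self.pools.lock().unwrap();
        let pool = pools.entry(thread::current().id()).or_default();
        pool.allocate(lowest_active_index)
    }

    // After submission the buffer is only parked on the pool of the
    // thread that recorded it; no reset happens here anymore.
    fn after_submit(&self, recorded_on: ThreadId, buf: FakeBuffer) {
        let mut pools = self.pools.lock().unwrap();
        pools
            .get_mut(&recorded_on)
            .expect("no pool for recording thread")
            .pending
            .push(buf);
    }
}

fn main() {
    let alloc = Allocator::default();
    let me = thread::current().id();

    let mut buf = alloc.allocate(0);
    buf.submitted_in = 1;
    alloc.after_submit(me, buf);

    // Once submission 1 completes (lowest active index moves past it),
    // the next allocate on this thread recycles the parked buffer.
    let reused = alloc.allocate(2);
    assert_eq!(reused.submitted_in, 1);
}
```

The key point is that `after_submit` only parks the buffer; the reset happens later, inside `allocate`, on the thread that owns the pool.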

This fixes #423

Co-authored-by: Layl <2385329-layl@users.noreply.gitlab.com>
bors[bot] committed via GitHub on 2020-01-02 04:55:21 +00:00.
2 changed files with 52 additions and 49 deletions.


@@ -22,9 +22,37 @@ use std::{collections::HashMap, sync::atomic::Ordering, thread};
 struct CommandPool<B: hal::Backend> {
     raw: B::CommandPool,
     available: Vec<B::CommandBuffer>,
+    pending: Vec<CommandBuffer<B>>,
 }
 
 impl<B: hal::Backend> CommandPool<B> {
+    fn maintain(&mut self, lowest_active_index: SubmissionIndex) {
+        for i in (0 .. self.pending.len()).rev() {
+            let index = self.pending[i]
+                .life_guard
+                .submission_index
+                .load(Ordering::Acquire);
+            if index < lowest_active_index {
+                let cmd_buf = self.pending.swap_remove(i);
+                log::trace!(
+                    "recycling comb submitted in {} when {} is done",
+                    index,
+                    lowest_active_index,
+                );
+                self.recycle(cmd_buf);
+            }
+        }
+    }
+
+    fn recycle(&mut self, cmd_buf: CommandBuffer<B>) {
+        for mut raw in cmd_buf.raw {
+            unsafe {
+                raw.reset(false);
+            }
+            self.available.push(raw);
+        }
+    }
+
     fn allocate(&mut self) -> B::CommandBuffer {
         if self.available.is_empty() {
             let extra = unsafe { self.raw.allocate_vec(20, hal::command::Level::Primary) };
@@ -37,20 +65,9 @@ impl<B: hal::Backend> CommandPool<B> {
 #[derive(Debug)]
 struct Inner<B: hal::Backend> {
+    // TODO: Currently pools from threads that are stopped or no longer call into wgpu will never be
+    // cleaned up.
     pools: HashMap<thread::ThreadId, CommandPool<B>>,
-    pending: Vec<CommandBuffer<B>>,
 }
 
-impl<B: hal::Backend> Inner<B> {
-    fn recycle(&mut self, cmd_buf: CommandBuffer<B>) {
-        let pool = self.pools.get_mut(&cmd_buf.recorded_thread_id).unwrap();
-        for mut raw in cmd_buf.raw {
-            unsafe {
-                raw.reset(false);
-            }
-            pool.available.push(raw);
-        }
-    }
-}
-
 #[derive(Debug)]
@@ -65,6 +82,7 @@ impl<B: GfxBackend> CommandAllocator<B> {
         device_id: Stored<DeviceId>,
         device: &B::Device,
         features: Features,
+        lowest_active_index: SubmissionIndex,
     ) -> CommandBuffer<B> {
         //debug_assert_eq!(device_id.backend(), B::VARIANT);
         let thread_id = thread::current().id();
@@ -79,7 +97,12 @@ impl<B: GfxBackend> CommandAllocator<B> {
             }
             .unwrap(),
             available: Vec::new(),
+            pending: Vec::new(),
         });
+
+        // Recycle completed command buffers
+        pool.maintain(lowest_active_index);
+
         let init = pool.allocate();
 
         CommandBuffer {
@@ -101,7 +124,6 @@ impl<B: hal::Backend> CommandAllocator<B> {
             queue_family,
             inner: Mutex::new(Inner {
                 pools: HashMap::new(),
-                pending: Vec::new(),
             }),
         }
     }
@@ -124,34 +146,19 @@ impl<B: hal::Backend> CommandAllocator<B> {
             .life_guard
             .submission_index
             .store(submit_index, Ordering::Release);
-        self.inner.lock().pending.push(cmd_buf);
-    }
 
-    pub fn maintain(&self, last_done: SubmissionIndex) {
+        // Record this command buffer as pending
         let mut inner = self.inner.lock();
-        for i in (0 .. inner.pending.len()).rev() {
-            let index = inner.pending[i]
-                .life_guard
-                .submission_index
-                .load(Ordering::Acquire);
-            if index <= last_done {
-                let cmd_buf = inner.pending.swap_remove(i);
-                log::trace!(
-                    "recycling comb submitted in {} when {} is done",
-                    index,
-                    last_done
-                );
-                inner.recycle(cmd_buf);
-            }
-        }
+        let pool = inner.pools.get_mut(&cmd_buf.recorded_thread_id).unwrap();
+        pool.pending.push(cmd_buf);
     }
 
     pub fn destroy(self, device: &B::Device) {
         let mut inner = self.inner.lock();
-        while let Some(cmd_buf) = inner.pending.pop() {
-            inner.recycle(cmd_buf);
-        }
         for (_, mut pool) in inner.pools.drain() {
+            while let Some(cmd_buf) = pool.pending.pop() {
+                pool.recycle(cmd_buf);
+            }
             unsafe {
                 pool.raw.free(pool.available);
                 device.destroy_command_pool(pool.raw);


@@ -200,7 +200,7 @@ impl<B: GfxBackend> PendingResources<B> {
         heaps_mutex: &Mutex<Heaps<B>>,
         descriptor_allocator_mutex: &Mutex<DescriptorAllocator<B>>,
         force_wait: bool,
-    ) -> SubmissionIndex {
+    ) {
         if force_wait && !self.active.is_empty() {
             let status = unsafe {
                 device.wait_for_fences(
@@ -219,11 +219,6 @@ impl<B: GfxBackend> PendingResources<B> {
             .iter()
             .position(|a| unsafe { !device.get_fence_status(&a.fence).unwrap() })
             .unwrap_or(self.active.len());
-        let last_done = if done_count != 0 {
-            self.active[done_count - 1].index
-        } else {
-            return 0;
-        };
 
         for a in self.active.drain(.. done_count) {
             log::trace!("Active submission {} is done", a.index);
@@ -260,8 +255,6 @@ impl<B: GfxBackend> PendingResources<B> {
                 },
             }
         }
-
-        last_done
     }
 
     fn triage_referenced<F: AllIdentityFilter>(
@@ -590,7 +583,7 @@ impl<B: GfxBackend> Device<B> {
         pending.triage_referenced(global, &mut *trackers, token);
         pending.triage_mapped(global, token);
         pending.triage_framebuffers(global, &mut *self.framebuffers.lock(), token);
-        let last_done = pending.cleanup(
+        pending.cleanup(
            &self.raw,
            &self.mem_allocator,
            &self.desc_allocator,
@@ -602,10 +595,6 @@ impl<B: GfxBackend> Device<B> {
             self.desc_allocator.lock().cleanup(&self.raw);
         }
 
-        if last_done != 0 {
-            self.com_allocator.maintain(last_done);
-        }
-
         callbacks
     }
@@ -1560,9 +1549,16 @@ impl<F: IdentityFilter<CommandEncoderId>> Global<F> {
             value: device_id,
             ref_count: device.life_guard.ref_count.clone(),
         };
+
+        // The first entry in the active list should have the lowest index
+        let lowest_active_index = device.pending.lock()
+            .active.get(0)
+            .map(|active| active.index)
+            .unwrap_or(0);
+
         let mut comb = device
             .com_allocator
-            .allocate(dev_stored, &device.raw, device.features);
+            .allocate(dev_stored, &device.raw, device.features, lowest_active_index);
         unsafe {
             comb.raw.last_mut().unwrap().begin(
                 hal::command::CommandBufferFlags::ONE_TIME_SUBMIT,