From cf8513baf553bae736952d5ac6bfc31ba2246faa Mon Sep 17 00:00:00 2001 From: Dzmitry Malyshau Date: Wed, 6 May 2020 11:15:08 -0400 Subject: [PATCH] Proper maintenance of the command pools --- wgpu-core/src/command/allocator.rs | 101 +++++++++++++++++++---------- wgpu-core/src/device/mod.rs | 7 +- 2 files changed, 70 insertions(+), 38 deletions(-) diff --git a/wgpu-core/src/command/allocator.rs b/wgpu-core/src/command/allocator.rs index 1d89b38623..5a060e1344 100644 --- a/wgpu-core/src/command/allocator.rs +++ b/wgpu-core/src/command/allocator.rs @@ -13,9 +13,12 @@ use parking_lot::Mutex; use std::{collections::HashMap, sync::atomic::Ordering, thread}; +const GROW_AMOUNT: usize = 20; + #[derive(Debug)] struct CommandPool { raw: B::CommandPool, + total: usize, available: Vec, pending: Vec>, } @@ -50,9 +53,13 @@ impl CommandPool { fn allocate(&mut self) -> B::CommandBuffer { if self.available.is_empty() { + self.total += GROW_AMOUNT; unsafe { - self.raw - .allocate(20, hal::command::Level::Primary, &mut self.available) + self.raw.allocate( + GROW_AMOUNT, + hal::command::Level::Primary, + &mut self.available, + ) }; } self.available.pop().unwrap() @@ -61,8 +68,6 @@ impl CommandPool { #[derive(Debug)] struct Inner { - // TODO: Currently pools from threads that are stopped or no longer call into wgpu will never be - // cleaned up. pools: HashMap>, } @@ -79,29 +84,29 @@ impl CommandAllocator { device: &B::Device, limits: wgt::Limits, private_features: PrivateFeatures, - lowest_active_index: SubmissionIndex, #[cfg(feature = "trace")] enable_tracing: bool, ) -> CommandBuffer { //debug_assert_eq!(device_id.backend(), B::VARIANT); let thread_id = thread::current().id(); let mut inner = self.inner.lock(); - let pool = inner.pools.entry(thread_id).or_insert_with(|| CommandPool { - raw: unsafe { - device.create_command_pool( - self.queue_family, - hal::pool::CommandPoolCreateFlags::RESET_INDIVIDUAL, - ) - } - .unwrap(), - available: Vec::new(), - pending: Vec::new(), - }); - - // Recycle completed command buffers - pool.maintain(lowest_active_index); - - let init = pool.allocate(); + let init = inner + .pools + .entry(thread_id) + .or_insert_with(|| CommandPool { + raw: unsafe { + log::info!("Starting on thread {:?}", thread_id); + device.create_command_pool( + self.queue_family, + hal::pool::CommandPoolCreateFlags::RESET_INDIVIDUAL, + ) + } + .unwrap(), + total: 0, + available: Vec::new(), + pending: Vec::new(), + }) + .allocate(); CommandBuffer { raw: vec![init], @@ -135,22 +140,17 @@ impl CommandAllocator { pub fn extend(&self, cmd_buf: &CommandBuffer) -> B::CommandBuffer { let mut inner = self.inner.lock(); - let pool = inner.pools.get_mut(&cmd_buf.recorded_thread_id).unwrap(); - - if pool.available.is_empty() { - unsafe { - pool.raw - .allocate(20, hal::command::Level::Primary, &mut pool.available) - }; - } - - pool.available.pop().unwrap() + inner + .pools + .get_mut(&cmd_buf.recorded_thread_id) + .unwrap() + .allocate() } pub fn discard(&self, mut cmd_buf: CommandBuffer) { cmd_buf.trackers.clear(); - self.inner - .lock() + let mut inner = self.inner.lock(); + inner .pools .get_mut(&cmd_buf.recorded_thread_id) .unwrap() @@ -166,8 +166,32 @@ impl CommandAllocator { // Record this command buffer as pending let mut inner = self.inner.lock(); - let pool = inner.pools.get_mut(&cmd_buf.recorded_thread_id).unwrap(); - pool.pending.push(cmd_buf); + inner + .pools + .get_mut(&cmd_buf.recorded_thread_id) + .unwrap() + .pending + .push(cmd_buf); + } + + pub fn maintain(&self, device: &B::Device, lowest_active_index: SubmissionIndex) { + let mut inner = self.inner.lock(); + let mut remove_threads = Vec::new(); + for (thread_id, pool) in inner.pools.iter_mut() { + pool.maintain(lowest_active_index); + if pool.total == pool.available.len() { + assert!(pool.pending.is_empty()); + remove_threads.push(*thread_id); + } + } + for thread_id in remove_threads { + log::info!("Removing from thread {:?}", thread_id); + let mut pool = inner.pools.remove(&thread_id).unwrap(); + unsafe { + pool.raw.free(pool.available); + device.destroy_command_pool(pool.raw); + } + } } pub fn destroy(self, device: &B::Device) { @@ -176,6 +200,13 @@ impl CommandAllocator { while let Some(cmd_buf) = pool.pending.pop() { pool.recycle(cmd_buf); } + if pool.total != pool.available.len() { + log::error!( + "Some command buffers are still recorded, only tracking {} / {}", + pool.available.len(), + pool.total + ); + } unsafe { pool.raw.free(pool.available); device.destroy_command_pool(pool.raw); diff --git a/wgpu-core/src/device/mod.rs b/wgpu-core/src/device/mod.rs index ba7d955d3d..387cb789e3 100644 --- a/wgpu-core/src/device/mod.rs +++ b/wgpu-core/src/device/mod.rs @@ -301,6 +301,9 @@ impl Device { let _last_done = life_tracker.triage_submissions(&self.raw, force_wait); let callbacks = life_tracker.handle_mapping(global, &self.raw, &self.trackers, token); life_tracker.cleanup(&self.raw, &self.mem_allocator, &self.desc_allocator); + + self.com_allocator + .maintain(&self.raw, life_tracker.lowest_active_submission()); callbacks } @@ -1569,17 +1572,15 @@ impl Global { ref_count: device.life_guard.add_ref(), }; - let lowest_active_index = device.lock_life(&mut token).lowest_active_submission(); - let mut command_buffer = device.com_allocator.allocate( dev_stored, &device.raw, device.limits.clone(), device.private_features, - lowest_active_index, #[cfg(feature = "trace")] device.trace.is_some(), ); + unsafe { let raw_command_buffer = command_buffer.raw.last_mut().unwrap(); if !desc.label.is_null() {