Compare commits

...

1 Commits

Author SHA1 Message Date
Sergei Shulepov
c825c7f7a4 perf(trie): make update_leaves parallelism threshold batch-scoped
Previously min_updates was checked per-subtrie, so small subtries were
always updated inline even when the overall batch was large enough to
benefit from parallelism. This changes the threshold to a batch-level
check so that once the batch is large enough, all subtries are deferred
for parallel processing.

Amp-Thread-ID: https://ampcode.com/threads/T-019ce331-57d8-74b3-9fa3-b58a3345623d
Co-authored-by: Amp <amp@ampcode.com>
2026-03-12 18:12:01 +00:00

View File

@@ -2134,6 +2134,21 @@ impl ArenaParallelSparseTrie {
Self::merge_subtrie_updates(&mut self.buffers.updates, &mut subtrie.buffers.updates);
}
/// Returns true if the current `update_leaves` batch should defer revealed subtries for
/// parallel processing. Will always return false in nostd builds.
const fn is_update_leaves_parallelism_enabled(&self, num_updates: usize) -> bool {
#[cfg(not(feature = "std"))]
{
let _ = num_updates;
false
}
#[cfg(feature = "std")]
{
num_updates >= self.parallelism_thresholds.min_updates
}
}
}
impl SparseTrie for ArenaParallelSparseTrie {
@@ -2763,7 +2778,7 @@ impl SparseTrie for ArenaParallelSparseTrie {
updates.drain().map(|(key, update)| (key, Nibbles::unpack(key), update)).collect();
sorted.sort_unstable_by_key(|entry| entry.1);
let threshold = self.parallelism_thresholds.min_updates;
let parallelize_subtries = self.is_update_leaves_parallelism_enabled(sorted.len());
let mut cursor = mem::take(&mut self.buffers.cursor);
cursor.reset(&self.upper_arena, self.root, Nibbles::default());
@@ -2824,8 +2839,10 @@ impl SparseTrie for ArenaParallelSparseTrie {
let num_subtrie_updates = update_idx - subtrie_start;
if num_subtrie_updates >= threshold {
// Take subtrie for parallel update.
if parallelize_subtries {
// Take subtrie for deferred update. Once the batch-level threshold is
// met, even small subtries are processed together so distributed work
// does not get stranded on the serial path.
trace!(target: TRACE_TARGET, ?subtrie_root_path, num_subtrie_updates, "Taking subtrie for parallel update");
let ArenaSparseNode::Subtrie(subtrie) = mem::replace(
&mut self.upper_arena[child_idx],
@@ -3191,6 +3208,22 @@ mod tests {
changeset
}
#[test]
fn update_leaves_parallelism_threshold_is_batch_scoped() {
let trie = ArenaParallelSparseTrie::default().with_parallelism_thresholds(
ArenaParallelismThresholds {
min_dirty_leaves: 64,
min_revealed_nodes: 16,
min_updates: 8,
min_leaves_for_prune: 128,
},
);
assert!(!trie.is_update_leaves_parallelism_enabled(7));
assert!(trie.is_update_leaves_parallelism_enabled(8));
assert!(trie.is_update_leaves_parallelism_enabled(32));
}
proptest! {
#![proptest_config(ProptestConfig::with_cases(1000))]
#[test]