diff --git a/crates/primitives/src/compression/mod.rs b/crates/primitives/src/compression/mod.rs index 0f0ae7a6b3..200b6bc436 100644 --- a/crates/primitives/src/compression/mod.rs +++ b/crates/primitives/src/compression/mod.rs @@ -6,38 +6,42 @@ pub static RECEIPT_DICTIONARY: &[u8] = include_bytes!("./receipt_dictionary.bin" /// Compression/Decompression dictionary for `Transaction`. pub static TRANSACTION_DICTIONARY: &[u8] = include_bytes!("./transaction_dictionary.bin"); -// Reason for using static compressors is that dictionaries can be quite big, and zstd-rs -// recommends to use one context/compressor per thread. Thus the usage of `thread_local`. +// We use `thread_local` compressors and decompressors because dictionaries can be quite big, and +// zstd-rs recommends to use one context/compressor per thread thread_local! { /// Thread Transaction compressor. pub static TRANSACTION_COMPRESSOR: RefCell> = RefCell::new( Compressor::with_dictionary(0, TRANSACTION_DICTIONARY) - .expect("Failed to initialize compressor."), + .expect("failed to initialize transaction compressor"), ); /// Thread Transaction decompressor. - pub static TRANSACTION_DECOMPRESSOR: RefCell = RefCell::new( - ReusableDecompressor::new(Decompressor::with_dictionary(TRANSACTION_DICTIONARY).expect("Failed to initialize decompressor.")) - ); + pub static TRANSACTION_DECOMPRESSOR: RefCell = + RefCell::new(ReusableDecompressor::new( + Decompressor::with_dictionary(TRANSACTION_DICTIONARY) + .expect("failed to initialize transaction decompressor"), + )); /// Thread receipt compressor. pub static RECEIPT_COMPRESSOR: RefCell> = RefCell::new( Compressor::with_dictionary(0, RECEIPT_DICTIONARY) - .expect("Failed to initialize compressor."), + .expect("failed to initialize receipt compressor"), ); /// Thread receipt decompressor. - pub static RECEIPT_DECOMPRESSOR: RefCell = RefCell::new( - ReusableDecompressor::new(Decompressor::with_dictionary(RECEIPT_DICTIONARY).expect("Failed to initialize decompressor.")) -); + pub static RECEIPT_DECOMPRESSOR: RefCell = + RefCell::new(ReusableDecompressor::new( + Decompressor::with_dictionary(RECEIPT_DICTIONARY) + .expect("failed to initialize receipt decompressor"), + )); } /// Reusable decompressor that uses its own internal buffer. #[allow(missing_debug_implementations)] pub struct ReusableDecompressor { - /// zstd decompressor + /// The `zstd` decompressor. decompressor: Decompressor<'static>, - /// buffer to decompress to. + /// The buffer to decompress to. buf: Vec, } @@ -48,17 +52,48 @@ impl ReusableDecompressor { /// Decompresses `src` reusing the decompressor and its internal buffer. pub fn decompress(&mut self, src: &[u8]) -> &[u8] { - // `decompress_to_buffer` will return an error if the output buffer doesn't have - // enough capacity. However we don't actually have information on the required - // length. So we hope for the best, and keep trying again with a fairly bigger size - // if it fails. + // If the decompression fails because the buffer is too small, we try to reserve more space + // by getting the upper bound and retry the decompression. + let mut reserved_upper_bound = false; while let Err(err) = self.decompressor.decompress_to_buffer(src, &mut self.buf) { let err = err.to_string(); if !err.contains("Destination buffer is too small") { - panic!("Failed to decompress: {err}"); + panic!("Failed to decompress {} bytes: {err}", src.len()); } - self.buf.reserve(self.buf.capacity() + 24_000); + + let additional = 'b: { + // Try to get the upper bound of the decompression for the given source. + // Do this only once as it might be expensive and will be the same for the same + // source. + if !reserved_upper_bound { + reserved_upper_bound = true; + if let Some(upper_bound) = Decompressor::upper_bound(src) { + if let Some(additional) = upper_bound.checked_sub(self.buf.capacity()) { + break 'b additional; + } + } + } + + // Otherwise, double the capacity of the buffer. + // This should normally not be reached as the upper bound should be enough. + self.buf.capacity() + 24_000 + }; + self.reserve(additional, src.len()); } + + // `decompress_to_buffer` sets the length of the vector to the number of bytes written, so + // we can safely return it as a slice. &self.buf } + + #[track_caller] + fn reserve(&mut self, additional: usize, src_len: usize) { + if let Err(e) = self.buf.try_reserve(additional) { + panic!( + "failed to allocate to {existing} + {additional} bytes \ + for the decompression of {src_len} bytes: {e}", + existing = self.buf.capacity(), + ); + } + } } diff --git a/crates/primitives/src/transaction/mod.rs b/crates/primitives/src/transaction/mod.rs index 817271ae33..5e9890fbf9 100644 --- a/crates/primitives/src/transaction/mod.rs +++ b/crates/primitives/src/transaction/mod.rs @@ -958,7 +958,7 @@ impl Compact for TransactionSignedNoHash { let zstd_bit = bitflags >> 3; let (transaction, buf) = if zstd_bit != 0 { TRANSACTION_DECOMPRESSOR.with(|decompressor| { - let decompressor = &mut decompressor.borrow_mut(); + let mut decompressor = decompressor.borrow_mut(); // TODO: enforce that zstd is only present at a "top" level type