mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-29 17:18:08 -05:00
chore: reserve an upper bound in ReusableDecompressor (#7523)
This commit is contained in:
@@ -6,38 +6,42 @@ pub static RECEIPT_DICTIONARY: &[u8] = include_bytes!("./receipt_dictionary.bin"
|
||||
/// Compression/Decompression dictionary for `Transaction`.
|
||||
pub static TRANSACTION_DICTIONARY: &[u8] = include_bytes!("./transaction_dictionary.bin");
|
||||
|
||||
// Reason for using static compressors is that dictionaries can be quite big, and zstd-rs
|
||||
// recommends to use one context/compressor per thread. Thus the usage of `thread_local`.
|
||||
// We use `thread_local` compressors and decompressors because dictionaries can be quite big, and
|
||||
// zstd-rs recommends to use one context/compressor per thread
|
||||
thread_local! {
|
||||
/// Thread Transaction compressor.
|
||||
pub static TRANSACTION_COMPRESSOR: RefCell<Compressor<'static>> = RefCell::new(
|
||||
Compressor::with_dictionary(0, TRANSACTION_DICTIONARY)
|
||||
.expect("Failed to initialize compressor."),
|
||||
.expect("failed to initialize transaction compressor"),
|
||||
);
|
||||
|
||||
/// Thread Transaction decompressor.
|
||||
pub static TRANSACTION_DECOMPRESSOR: RefCell<ReusableDecompressor> = RefCell::new(
|
||||
ReusableDecompressor::new(Decompressor::with_dictionary(TRANSACTION_DICTIONARY).expect("Failed to initialize decompressor."))
|
||||
);
|
||||
pub static TRANSACTION_DECOMPRESSOR: RefCell<ReusableDecompressor> =
|
||||
RefCell::new(ReusableDecompressor::new(
|
||||
Decompressor::with_dictionary(TRANSACTION_DICTIONARY)
|
||||
.expect("failed to initialize transaction decompressor"),
|
||||
));
|
||||
|
||||
/// Thread receipt compressor.
|
||||
pub static RECEIPT_COMPRESSOR: RefCell<Compressor<'static>> = RefCell::new(
|
||||
Compressor::with_dictionary(0, RECEIPT_DICTIONARY)
|
||||
.expect("Failed to initialize compressor."),
|
||||
.expect("failed to initialize receipt compressor"),
|
||||
);
|
||||
|
||||
/// Thread receipt decompressor.
|
||||
pub static RECEIPT_DECOMPRESSOR: RefCell<ReusableDecompressor> = RefCell::new(
|
||||
ReusableDecompressor::new(Decompressor::with_dictionary(RECEIPT_DICTIONARY).expect("Failed to initialize decompressor."))
|
||||
);
|
||||
pub static RECEIPT_DECOMPRESSOR: RefCell<ReusableDecompressor> =
|
||||
RefCell::new(ReusableDecompressor::new(
|
||||
Decompressor::with_dictionary(RECEIPT_DICTIONARY)
|
||||
.expect("failed to initialize receipt decompressor"),
|
||||
));
|
||||
}
|
||||
|
||||
/// Reusable decompressor that uses its own internal buffer.
|
||||
#[allow(missing_debug_implementations)]
|
||||
pub struct ReusableDecompressor {
|
||||
/// zstd decompressor
|
||||
/// The `zstd` decompressor.
|
||||
decompressor: Decompressor<'static>,
|
||||
/// buffer to decompress to.
|
||||
/// The buffer to decompress to.
|
||||
buf: Vec<u8>,
|
||||
}
|
||||
|
||||
@@ -48,17 +52,48 @@ impl ReusableDecompressor {
|
||||
|
||||
/// Decompresses `src` reusing the decompressor and its internal buffer.
|
||||
pub fn decompress(&mut self, src: &[u8]) -> &[u8] {
|
||||
// `decompress_to_buffer` will return an error if the output buffer doesn't have
|
||||
// enough capacity. However we don't actually have information on the required
|
||||
// length. So we hope for the best, and keep trying again with a fairly bigger size
|
||||
// if it fails.
|
||||
// If the decompression fails because the buffer is too small, we try to reserve more space
|
||||
// by getting the upper bound and retry the decompression.
|
||||
let mut reserved_upper_bound = false;
|
||||
while let Err(err) = self.decompressor.decompress_to_buffer(src, &mut self.buf) {
|
||||
let err = err.to_string();
|
||||
if !err.contains("Destination buffer is too small") {
|
||||
panic!("Failed to decompress: {err}");
|
||||
panic!("Failed to decompress {} bytes: {err}", src.len());
|
||||
}
|
||||
self.buf.reserve(self.buf.capacity() + 24_000);
|
||||
|
||||
let additional = 'b: {
|
||||
// Try to get the upper bound of the decompression for the given source.
|
||||
// Do this only once as it might be expensive and will be the same for the same
|
||||
// source.
|
||||
if !reserved_upper_bound {
|
||||
reserved_upper_bound = true;
|
||||
if let Some(upper_bound) = Decompressor::upper_bound(src) {
|
||||
if let Some(additional) = upper_bound.checked_sub(self.buf.capacity()) {
|
||||
break 'b additional;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Otherwise, double the capacity of the buffer.
|
||||
// This should normally not be reached as the upper bound should be enough.
|
||||
self.buf.capacity() + 24_000
|
||||
};
|
||||
self.reserve(additional, src.len());
|
||||
}
|
||||
|
||||
// `decompress_to_buffer` sets the length of the vector to the number of bytes written, so
|
||||
// we can safely return it as a slice.
|
||||
&self.buf
|
||||
}
|
||||
|
||||
#[track_caller]
|
||||
fn reserve(&mut self, additional: usize, src_len: usize) {
|
||||
if let Err(e) = self.buf.try_reserve(additional) {
|
||||
panic!(
|
||||
"failed to allocate to {existing} + {additional} bytes \
|
||||
for the decompression of {src_len} bytes: {e}",
|
||||
existing = self.buf.capacity(),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -958,7 +958,7 @@ impl Compact for TransactionSignedNoHash {
|
||||
let zstd_bit = bitflags >> 3;
|
||||
let (transaction, buf) = if zstd_bit != 0 {
|
||||
TRANSACTION_DECOMPRESSOR.with(|decompressor| {
|
||||
let decompressor = &mut decompressor.borrow_mut();
|
||||
let mut decompressor = decompressor.borrow_mut();
|
||||
|
||||
// TODO: enforce that zstd is only present at a "top" level type
|
||||
|
||||
|
||||
Reference in New Issue
Block a user