refactor: add with_* compressor utility methods (#21680)

This commit is contained in:
DaniPopes
2026-02-01 21:43:25 +01:00
committed by GitHub
parent 60c3bef1e8
commit 5ef32726db
7 changed files with 75 additions and 96 deletions

View File

@@ -511,9 +511,8 @@ mod compact {
total_length += flags.len() + buffer.len();
buf.put_slice(&flags);
if zstd {
reth_zstd_compressors::RECEIPT_COMPRESSOR.with(|compressor| {
let compressed =
compressor.borrow_mut().compress(&buffer).expect("Failed to compress.");
reth_zstd_compressors::with_receipt_compressor(|compressor| {
let compressed = compressor.compress(&buffer).expect("Failed to compress.");
buf.put(compressed.as_slice());
});
} else {
@@ -525,8 +524,7 @@ mod compact {
fn from_compact(buf: &[u8], _len: usize) -> (Self, &[u8]) {
let (flags, mut buf) = ReceiptFlags::from(buf);
if flags.__zstd() != 0 {
reth_zstd_compressors::RECEIPT_DECOMPRESSOR.with(|decompressor| {
let decompressor = &mut decompressor.borrow_mut();
reth_zstd_compressors::with_receipt_decompressor(|decompressor| {
let decompressed = decompressor.decompress(buf);
let original_buf = buf;
let mut buf: &[u8] = decompressed;

View File

@@ -577,19 +577,11 @@ impl reth_codecs::Compact for TransactionSigned {
let tx_bits = if zstd_bit {
let mut tmp = Vec::with_capacity(256);
if cfg!(feature = "std") {
reth_zstd_compressors::TRANSACTION_COMPRESSOR.with(|compressor| {
let mut compressor = compressor.borrow_mut();
let tx_bits = self.transaction.to_compact(&mut tmp);
buf.put_slice(&compressor.compress(&tmp).expect("Failed to compress"));
tx_bits as u8
})
} else {
let mut compressor = reth_zstd_compressors::create_tx_compressor();
reth_zstd_compressors::with_tx_compressor(|compressor| {
let tx_bits = self.transaction.to_compact(&mut tmp);
buf.put_slice(&compressor.compress(&tmp).expect("Failed to compress"));
tx_bits as u8
}
})
} else {
self.transaction.to_compact(buf) as u8
};
@@ -611,26 +603,13 @@ impl reth_codecs::Compact for TransactionSigned {
let zstd_bit = bitflags >> 3;
let (transaction, buf) = if zstd_bit != 0 {
if cfg!(feature = "std") {
reth_zstd_compressors::TRANSACTION_DECOMPRESSOR.with(|decompressor| {
let mut decompressor = decompressor.borrow_mut();
// TODO: enforce that zstd is only present at a "top" level type
let transaction_type = (bitflags & 0b110) >> 1;
let (transaction, _) =
Transaction::from_compact(decompressor.decompress(buf), transaction_type);
(transaction, buf)
})
} else {
let mut decompressor = reth_zstd_compressors::create_tx_decompressor();
reth_zstd_compressors::with_tx_decompressor(|decompressor| {
// TODO: enforce that zstd is only present at a "top" level type
let transaction_type = (bitflags & 0b110) >> 1;
let (transaction, _) =
Transaction::from_compact(decompressor.decompress(buf), transaction_type);
(transaction, buf)
}
})
} else {
let transaction_type = bitflags >> 1;
Transaction::from_compact(buf, transaction_type)

View File

@@ -435,19 +435,11 @@ impl reth_codecs::Compact for OpTransactionSigned {
let tx_bits = if zstd_bit {
let mut tmp = Vec::with_capacity(256);
if cfg!(feature = "std") {
reth_zstd_compressors::TRANSACTION_COMPRESSOR.with(|compressor| {
let mut compressor = compressor.borrow_mut();
let tx_bits = self.transaction.to_compact(&mut tmp);
buf.put_slice(&compressor.compress(&tmp).expect("Failed to compress"));
tx_bits as u8
})
} else {
let mut compressor = reth_zstd_compressors::create_tx_compressor();
reth_zstd_compressors::with_tx_compressor(|compressor| {
let tx_bits = self.transaction.to_compact(&mut tmp);
buf.put_slice(&compressor.compress(&tmp).expect("Failed to compress"));
tx_bits as u8
}
})
} else {
self.transaction.to_compact(buf) as u8
};
@@ -469,29 +461,15 @@ impl reth_codecs::Compact for OpTransactionSigned {
let zstd_bit = bitflags >> 3;
let (transaction, buf) = if zstd_bit != 0 {
if cfg!(feature = "std") {
reth_zstd_compressors::TRANSACTION_DECOMPRESSOR.with(|decompressor| {
let mut decompressor = decompressor.borrow_mut();
// TODO: enforce that zstd is only present at a "top" level type
let transaction_type = (bitflags & 0b110) >> 1;
let (transaction, _) = OpTypedTransaction::from_compact(
decompressor.decompress(buf),
transaction_type,
);
(transaction, buf)
})
} else {
let mut decompressor = reth_zstd_compressors::create_tx_decompressor();
reth_zstd_compressors::with_tx_decompressor(|decompressor| {
// TODO: enforce that zstd is only present at a "top" level type
let transaction_type = (bitflags & 0b110) >> 1;
let (transaction, _) = OpTypedTransaction::from_compact(
decompressor.decompress(buf),
transaction_type,
);
(transaction, buf)
}
})
} else {
let transaction_type = bitflags >> 1;
OpTypedTransaction::from_compact(buf, transaction_type)

View File

@@ -133,8 +133,7 @@ fn generate_from_compact(
let decompressor = zstd.decompressor;
quote! {
if flags.__zstd() != 0 {
#decompressor.with(|decompressor| {
let decompressor = &mut decompressor.borrow_mut();
#decompressor(|decompressor| {
let decompressed = decompressor.decompress(buf);
let mut original_buf = buf;
@@ -203,9 +202,7 @@ fn generate_to_compact(
let compressor = zstd.compressor;
lines.push(quote! {
if zstd {
#compressor.with(|compressor| {
let mut compressor = compressor.borrow_mut();
#compressor(|compressor| {
let compressed = compressor.compress(&buffer).expect("Failed to compress.");
buf.put(compressed.as_slice());
});

View File

@@ -10,8 +10,8 @@ use reth_codecs_derive::CompactZstd;
#[derive(CompactZstd)]
#[reth_codecs(crate = "crate")]
#[reth_zstd(
compressor = reth_zstd_compressors::RECEIPT_COMPRESSOR,
decompressor = reth_zstd_compressors::RECEIPT_DECOMPRESSOR
compressor = reth_zstd_compressors::with_receipt_compressor,
decompressor = reth_zstd_compressors::with_receipt_decompressor
)]
struct CompactOpReceipt<'a> {
tx_type: OpTxType,

View File

@@ -149,20 +149,9 @@ impl<T: Envelope + ToTxCompact + Transaction + Send + Sync> CompactEnvelope for
self.to_tx_compact(&mut tx_buf);
buf.put_slice(
&{
#[cfg(feature = "std")]
{
reth_zstd_compressors::TRANSACTION_COMPRESSOR.with(|compressor| {
let mut compressor = compressor.borrow_mut();
compressor.compress(&tx_buf)
})
}
#[cfg(not(feature = "std"))]
{
let mut compressor = reth_zstd_compressors::create_tx_compressor();
compressor.compress(&tx_buf)
}
}
&reth_zstd_compressors::with_tx_compressor(|compressor| {
compressor.compress(&tx_buf)
})
.expect("Failed to compress"),
);
tx_bits
@@ -188,27 +177,12 @@ impl<T: Envelope + ToTxCompact + Transaction + Send + Sync> CompactEnvelope for
let (signature, buf) = Signature::from_compact(buf, sig_bit);
let (transaction, buf) = if zstd_bit != 0 {
#[cfg(feature = "std")]
{
reth_zstd_compressors::TRANSACTION_DECOMPRESSOR.with(|decompressor| {
let mut decompressor = decompressor.borrow_mut();
let decompressed = decompressor.decompress(buf);
let (tx_type, tx_buf) = T::TxType::from_compact(decompressed, tx_bits);
let (tx, _) = Self::from_tx_compact(tx_buf, tx_type, signature);
(tx, buf)
})
}
#[cfg(not(feature = "std"))]
{
let mut decompressor = reth_zstd_compressors::create_tx_decompressor();
reth_zstd_compressors::with_tx_decompressor(|decompressor| {
let decompressed = decompressor.decompress(buf);
let (tx_type, tx_buf) = T::TxType::from_compact(decompressed, tx_bits);
let (tx, _) = Self::from_tx_compact(tx_buf, tx_type, signature);
(tx, buf)
}
})
} else {
let (tx_type, buf) = T::TxType::from_compact(buf, tx_bits);
Self::from_tx_compact(buf, tx_type, signature)

View File

@@ -86,6 +86,59 @@ pub fn create_receipt_decompressor() -> ReusableDecompressor {
)
}
/// Executes `f` with the thread-local transaction compressor on `std`, otherwise creates a new one.
#[inline]
pub fn with_tx_compressor<R>(f: impl FnOnce(&mut Compressor<'_>) -> R) -> R {
#[cfg(feature = "std")]
{
TRANSACTION_COMPRESSOR.with_borrow_mut(f)
}
#[cfg(not(feature = "std"))]
{
f(&mut create_tx_compressor())
}
}
/// Executes `f` with the thread-local transaction decompressor on `std`, otherwise creates a new
/// one.
#[inline]
pub fn with_tx_decompressor<R>(f: impl FnOnce(&mut ReusableDecompressor) -> R) -> R {
#[cfg(feature = "std")]
{
TRANSACTION_DECOMPRESSOR.with_borrow_mut(f)
}
#[cfg(not(feature = "std"))]
{
f(&mut create_tx_decompressor())
}
}
/// Executes `f` with the thread-local receipt compressor on `std`, otherwise creates a new one.
#[inline]
pub fn with_receipt_compressor<R>(f: impl FnOnce(&mut Compressor<'_>) -> R) -> R {
#[cfg(feature = "std")]
{
RECEIPT_COMPRESSOR.with_borrow_mut(f)
}
#[cfg(not(feature = "std"))]
{
f(&mut create_receipt_compressor())
}
}
/// Executes `f` with the thread-local receipt decompressor on `std`, otherwise creates a new one.
#[inline]
pub fn with_receipt_decompressor<R>(f: impl FnOnce(&mut ReusableDecompressor) -> R) -> R {
#[cfg(feature = "std")]
{
RECEIPT_DECOMPRESSOR.with_borrow_mut(f)
}
#[cfg(not(feature = "std"))]
{
f(&mut create_receipt_decompressor())
}
}
/// Reusable decompressor that uses its own internal buffer.
#[expect(missing_debug_implementations)]
pub struct ReusableDecompressor {