diff --git a/crates/ethereum/primitives/src/receipt.rs b/crates/ethereum/primitives/src/receipt.rs index 306eae8d75..8172a5f1bd 100644 --- a/crates/ethereum/primitives/src/receipt.rs +++ b/crates/ethereum/primitives/src/receipt.rs @@ -511,9 +511,8 @@ mod compact { total_length += flags.len() + buffer.len(); buf.put_slice(&flags); if zstd { - reth_zstd_compressors::RECEIPT_COMPRESSOR.with(|compressor| { - let compressed = - compressor.borrow_mut().compress(&buffer).expect("Failed to compress."); + reth_zstd_compressors::with_receipt_compressor(|compressor| { + let compressed = compressor.compress(&buffer).expect("Failed to compress."); buf.put(compressed.as_slice()); }); } else { @@ -525,8 +524,7 @@ mod compact { fn from_compact(buf: &[u8], _len: usize) -> (Self, &[u8]) { let (flags, mut buf) = ReceiptFlags::from(buf); if flags.__zstd() != 0 { - reth_zstd_compressors::RECEIPT_DECOMPRESSOR.with(|decompressor| { - let decompressor = &mut decompressor.borrow_mut(); + reth_zstd_compressors::with_receipt_decompressor(|decompressor| { let decompressed = decompressor.decompress(buf); let original_buf = buf; let mut buf: &[u8] = decompressed; diff --git a/crates/ethereum/primitives/src/transaction.rs b/crates/ethereum/primitives/src/transaction.rs index f4de2994e3..45c9075b1d 100644 --- a/crates/ethereum/primitives/src/transaction.rs +++ b/crates/ethereum/primitives/src/transaction.rs @@ -577,19 +577,11 @@ impl reth_codecs::Compact for TransactionSigned { let tx_bits = if zstd_bit { let mut tmp = Vec::with_capacity(256); - if cfg!(feature = "std") { - reth_zstd_compressors::TRANSACTION_COMPRESSOR.with(|compressor| { - let mut compressor = compressor.borrow_mut(); - let tx_bits = self.transaction.to_compact(&mut tmp); - buf.put_slice(&compressor.compress(&tmp).expect("Failed to compress")); - tx_bits as u8 - }) - } else { - let mut compressor = reth_zstd_compressors::create_tx_compressor(); + reth_zstd_compressors::with_tx_compressor(|compressor| { let tx_bits = self.transaction.to_compact(&mut tmp); buf.put_slice(&compressor.compress(&tmp).expect("Failed to compress")); tx_bits as u8 - } + }) } else { self.transaction.to_compact(buf) as u8 }; @@ -611,26 +603,13 @@ impl reth_codecs::Compact for TransactionSigned { let zstd_bit = bitflags >> 3; let (transaction, buf) = if zstd_bit != 0 { - if cfg!(feature = "std") { - reth_zstd_compressors::TRANSACTION_DECOMPRESSOR.with(|decompressor| { - let mut decompressor = decompressor.borrow_mut(); - - // TODO: enforce that zstd is only present at a "top" level type - - let transaction_type = (bitflags & 0b110) >> 1; - let (transaction, _) = - Transaction::from_compact(decompressor.decompress(buf), transaction_type); - - (transaction, buf) - }) - } else { - let mut decompressor = reth_zstd_compressors::create_tx_decompressor(); + reth_zstd_compressors::with_tx_decompressor(|decompressor| { + // TODO: enforce that zstd is only present at a "top" level type let transaction_type = (bitflags & 0b110) >> 1; let (transaction, _) = Transaction::from_compact(decompressor.decompress(buf), transaction_type); - (transaction, buf) - } + }) } else { let transaction_type = bitflags >> 1; Transaction::from_compact(buf, transaction_type) diff --git a/crates/optimism/primitives/src/transaction/signed.rs b/crates/optimism/primitives/src/transaction/signed.rs index 896e62b304..2a83e75cad 100644 --- a/crates/optimism/primitives/src/transaction/signed.rs +++ b/crates/optimism/primitives/src/transaction/signed.rs @@ -435,19 +435,11 @@ impl reth_codecs::Compact for OpTransactionSigned { let tx_bits = if zstd_bit { let mut tmp = Vec::with_capacity(256); - if cfg!(feature = "std") { - reth_zstd_compressors::TRANSACTION_COMPRESSOR.with(|compressor| { - let mut compressor = compressor.borrow_mut(); - let tx_bits = self.transaction.to_compact(&mut tmp); - buf.put_slice(&compressor.compress(&tmp).expect("Failed to compress")); - tx_bits as u8 - }) - } else { - let mut compressor = reth_zstd_compressors::create_tx_compressor(); + reth_zstd_compressors::with_tx_compressor(|compressor| { let tx_bits = self.transaction.to_compact(&mut tmp); buf.put_slice(&compressor.compress(&tmp).expect("Failed to compress")); tx_bits as u8 - } + }) } else { self.transaction.to_compact(buf) as u8 }; @@ -469,29 +461,15 @@ impl reth_codecs::Compact for OpTransactionSigned { let zstd_bit = bitflags >> 3; let (transaction, buf) = if zstd_bit != 0 { - if cfg!(feature = "std") { - reth_zstd_compressors::TRANSACTION_DECOMPRESSOR.with(|decompressor| { - let mut decompressor = decompressor.borrow_mut(); - - // TODO: enforce that zstd is only present at a "top" level type - let transaction_type = (bitflags & 0b110) >> 1; - let (transaction, _) = OpTypedTransaction::from_compact( - decompressor.decompress(buf), - transaction_type, - ); - - (transaction, buf) - }) - } else { - let mut decompressor = reth_zstd_compressors::create_tx_decompressor(); + reth_zstd_compressors::with_tx_decompressor(|decompressor| { + // TODO: enforce that zstd is only present at a "top" level type let transaction_type = (bitflags & 0b110) >> 1; let (transaction, _) = OpTypedTransaction::from_compact( decompressor.decompress(buf), transaction_type, ); - (transaction, buf) - } + }) } else { let transaction_type = bitflags >> 1; OpTypedTransaction::from_compact(buf, transaction_type) diff --git a/crates/storage/codecs/derive/src/compact/generator.rs b/crates/storage/codecs/derive/src/compact/generator.rs index 569cebce22..bdb3d5f8be 100644 --- a/crates/storage/codecs/derive/src/compact/generator.rs +++ b/crates/storage/codecs/derive/src/compact/generator.rs @@ -133,8 +133,7 @@ fn generate_from_compact( let decompressor = zstd.decompressor; quote! { if flags.__zstd() != 0 { - #decompressor.with(|decompressor| { - let decompressor = &mut decompressor.borrow_mut(); + #decompressor(|decompressor| { let decompressed = decompressor.decompress(buf); let mut original_buf = buf; @@ -203,9 +202,7 @@ fn generate_to_compact( let compressor = zstd.compressor; lines.push(quote! { if zstd { - #compressor.with(|compressor| { - let mut compressor = compressor.borrow_mut(); - + #compressor(|compressor| { let compressed = compressor.compress(&buffer).expect("Failed to compress."); buf.put(compressed.as_slice()); }); diff --git a/crates/storage/codecs/src/alloy/optimism.rs b/crates/storage/codecs/src/alloy/optimism.rs index 7a851a5041..4395693978 100644 --- a/crates/storage/codecs/src/alloy/optimism.rs +++ b/crates/storage/codecs/src/alloy/optimism.rs @@ -10,8 +10,8 @@ use reth_codecs_derive::CompactZstd; #[derive(CompactZstd)] #[reth_codecs(crate = "crate")] #[reth_zstd( - compressor = reth_zstd_compressors::RECEIPT_COMPRESSOR, - decompressor = reth_zstd_compressors::RECEIPT_DECOMPRESSOR + compressor = reth_zstd_compressors::with_receipt_compressor, + decompressor = reth_zstd_compressors::with_receipt_decompressor )] struct CompactOpReceipt<'a> { tx_type: OpTxType, diff --git a/crates/storage/codecs/src/alloy/transaction/ethereum.rs b/crates/storage/codecs/src/alloy/transaction/ethereum.rs index 7824f60301..d11c5744e5 100644 --- a/crates/storage/codecs/src/alloy/transaction/ethereum.rs +++ b/crates/storage/codecs/src/alloy/transaction/ethereum.rs @@ -149,20 +149,9 @@ impl CompactEnvelope for self.to_tx_compact(&mut tx_buf); buf.put_slice( - &{ - #[cfg(feature = "std")] - { - reth_zstd_compressors::TRANSACTION_COMPRESSOR.with(|compressor| { - let mut compressor = compressor.borrow_mut(); - compressor.compress(&tx_buf) - }) - } - #[cfg(not(feature = "std"))] - { - let mut compressor = reth_zstd_compressors::create_tx_compressor(); - compressor.compress(&tx_buf) - } - } + &reth_zstd_compressors::with_tx_compressor(|compressor| { + compressor.compress(&tx_buf) + }) .expect("Failed to compress"), ); tx_bits @@ -188,27 +177,12 @@ impl CompactEnvelope for let (signature, buf) = Signature::from_compact(buf, sig_bit); let (transaction, buf) = if zstd_bit != 0 { - #[cfg(feature = "std")] - { - reth_zstd_compressors::TRANSACTION_DECOMPRESSOR.with(|decompressor| { - let mut decompressor = decompressor.borrow_mut(); - let decompressed = decompressor.decompress(buf); - - let (tx_type, tx_buf) = T::TxType::from_compact(decompressed, tx_bits); - let (tx, _) = Self::from_tx_compact(tx_buf, tx_type, signature); - - (tx, buf) - }) - } - #[cfg(not(feature = "std"))] - { - let mut decompressor = reth_zstd_compressors::create_tx_decompressor(); + reth_zstd_compressors::with_tx_decompressor(|decompressor| { let decompressed = decompressor.decompress(buf); let (tx_type, tx_buf) = T::TxType::from_compact(decompressed, tx_bits); let (tx, _) = Self::from_tx_compact(tx_buf, tx_type, signature); - (tx, buf) - } + }) } else { let (tx_type, buf) = T::TxType::from_compact(buf, tx_bits); Self::from_tx_compact(buf, tx_type, signature) diff --git a/crates/storage/zstd-compressors/src/lib.rs b/crates/storage/zstd-compressors/src/lib.rs index 776bb23324..1addc2caa4 100644 --- a/crates/storage/zstd-compressors/src/lib.rs +++ b/crates/storage/zstd-compressors/src/lib.rs @@ -86,6 +86,59 @@ pub fn create_receipt_decompressor() -> ReusableDecompressor { ) } +/// Executes `f` with the thread-local transaction compressor on `std`, otherwise creates a new one. +#[inline] +pub fn with_tx_compressor(f: impl FnOnce(&mut Compressor<'_>) -> R) -> R { + #[cfg(feature = "std")] + { + TRANSACTION_COMPRESSOR.with_borrow_mut(f) + } + #[cfg(not(feature = "std"))] + { + f(&mut create_tx_compressor()) + } +} + +/// Executes `f` with the thread-local transaction decompressor on `std`, otherwise creates a new +/// one. +#[inline] +pub fn with_tx_decompressor(f: impl FnOnce(&mut ReusableDecompressor) -> R) -> R { + #[cfg(feature = "std")] + { + TRANSACTION_DECOMPRESSOR.with_borrow_mut(f) + } + #[cfg(not(feature = "std"))] + { + f(&mut create_tx_decompressor()) + } +} + +/// Executes `f` with the thread-local receipt compressor on `std`, otherwise creates a new one. +#[inline] +pub fn with_receipt_compressor(f: impl FnOnce(&mut Compressor<'_>) -> R) -> R { + #[cfg(feature = "std")] + { + RECEIPT_COMPRESSOR.with_borrow_mut(f) + } + #[cfg(not(feature = "std"))] + { + f(&mut create_receipt_compressor()) + } +} + +/// Executes `f` with the thread-local receipt decompressor on `std`, otherwise creates a new one. +#[inline] +pub fn with_receipt_decompressor(f: impl FnOnce(&mut ReusableDecompressor) -> R) -> R { + #[cfg(feature = "std")] + { + RECEIPT_DECOMPRESSOR.with_borrow_mut(f) + } + #[cfg(not(feature = "std"))] + { + f(&mut create_receipt_decompressor()) + } +} + /// Reusable decompressor that uses its own internal buffer. #[expect(missing_debug_implementations)] pub struct ReusableDecompressor {