Compare commits

...

7 Commits

Author SHA1 Message Date
Georgios Konstantopoulos
be9933d5b8 test(db): add adversarial tests for MDBX_RESERVE zero-copy writes
Adds 11 additional tests covering edge cases and adversarial scenarios:
- Transaction abort discards reserve-path writes
- Rapid successive overwrites to same key
- Interleaved reserve-path and fallback-path (compressable) writes
- Cursor reuse after multiple reserve operations
- Boundary values (zero, max, patterns)
- Write then delete
- Stress test with 1000 entries
- Alternating read/write transactions
- DupSort with 100 duplicates
- Insert duplicate key fails correctly
- Append order enforcement
2026-01-22 23:59:08 +00:00
Georgios Konstantopoulos
77bc9464e2 test(db): add roundtrip tests for MDBX_RESERVE zero-copy writes
Tests verify that:
- B256 values (uncompressable) roundtrip correctly via reserve path
- insert/append/upsert operations work with reserve path
- DupSort append_dup works with reserve path
- Mixed operations across transactions work correctly
2026-01-22 23:56:49 +00:00
Georgios Konstantopoulos
b936984fff perf(db): integrate MDBX_RESERVE for zero-copy writes
For values that implement uncompressable_ref() (like B256, Address),
use MDBX's reserve API to write directly into memory-mapped pages.

This eliminates one memcpy by:
1. Calling cursor.reserve() to get a mutable slice into MDBX's page
2. Copying data directly into the reserved space

For compressable values, falls back to the existing compress + put path.
2026-01-22 23:28:41 +00:00
Georgios Konstantopoulos
d5fd1b41c9 perf(db-api): implement zero-copy EncodeInto for all fixed-size types
- Replace Clone with Copy bound in blanket impl, use sealed marker trait
- Add direct EncodeInto impls for identity-encoding types (Address, B256)
  that write directly from in-memory representation (single memcpy)
- Add EncodeInto for all composite key types:
  - BlockNumberAddress (28 bytes)
  - BlockNumberHashedAddress (40 bytes)
  - AddressStorageKey (52 bytes)
  - ShardedKey<Address> (28 bytes)
  - StorageShardedKey (60 bytes)
  - ChainStateKey (1 byte)
- Add EncodeInto for integer types (u64, u32, u16, u8)
- Expand benchmark to cover B256

This eliminates all intermediate allocations and redundant copies.
For types like Address/B256, the encoded bytes ARE the in-memory bytes,
so we just copy directly from &self without calling encode().
2026-01-22 23:21:07 +00:00
Georgios Konstantopoulos
eff531b720 fix: use dereference instead of clone for Copy type 2026-01-22 23:19:50 +00:00
Georgios Konstantopoulos
941d47d9e6 fix: resolve type inference error in benchmark 2026-01-22 23:19:50 +00:00
Georgios Konstantopoulos
d42d61f94a perf(db-api): add EncodeInto trait for zero-copy encoding 2026-01-22 23:19:50 +00:00
11 changed files with 1004 additions and 34 deletions

1
Cargo.lock generated
View File

@@ -8104,6 +8104,7 @@ dependencies = [
"arbitrary",
"arrayvec",
"bytes",
"codspeed-criterion-compat",
"derive_more",
"metrics",
"modular-bitfield",

View File

@@ -61,6 +61,11 @@ test-fuzz.workspace = true
arbitrary = { workspace = true, features = ["derive"] }
proptest.workspace = true
proptest-arbitrary-interop.workspace = true
criterion.workspace = true
[[bench]]
name = "encode_into"
harness = false
[features]
test-utils = [

View File

@@ -0,0 +1,67 @@
use alloy_primitives::{Address, B256};
use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput};
use reth_db_api::table::{Encode, EncodeInto};
/// Compares `Encode::encode` (by value, returns an array) with `EncodeInto::encode_into`
/// (by reference, writes into a caller-provided buffer) for `Address` and `B256`.
///
/// Every benchmark walks the same pre-generated set of 10000 values, and the group's
/// throughput is declared as that many elements per iteration.
fn bench_encode_methods(c: &mut Criterion) {
    /// Number of values encoded per benchmark iteration.
    const SAMPLE_COUNT: u64 = 10000;

    /// Builds a 32-byte word that is all zero except for `i` stored big-endian in the last
    /// eight bytes.
    fn word(i: u64) -> B256 {
        let mut bytes = [0u8; 32];
        bytes[24..32].copy_from_slice(&i.to_be_bytes());
        B256::from(bytes)
    }

    let addresses: Vec<Address> = (0..SAMPLE_COUNT).map(|i| Address::from_word(word(i))).collect();
    let hashes: Vec<B256> = (0..SAMPLE_COUNT).map(word).collect();

    let mut group = c.benchmark_group("encode");
    group.throughput(Throughput::Elements(SAMPLE_COUNT));

    group.bench_function("Address::encode (returns array, consumes self)", |b| {
        b.iter(|| {
            for addr in &addresses {
                black_box(black_box((*addr).encode()));
            }
        })
    });

    group.bench_function("Address::encode_into (zero-copy, borrows self)", |b| {
        // One scratch buffer reused for every address.
        let mut scratch = [0u8; 20];
        b.iter(|| {
            for addr in &addresses {
                addr.encode_into(&mut scratch);
                black_box(&scratch);
            }
        })
    });

    group.bench_function("B256::encode (returns array, consumes self)", |b| {
        b.iter(|| {
            for hash in &hashes {
                black_box(black_box((*hash).encode()));
            }
        })
    });

    group.bench_function("B256::encode_into (zero-copy, borrows self)", |b| {
        // One scratch buffer reused for every hash.
        let mut scratch = [0u8; 32];
        b.iter(|| {
            for hash in &hashes {
                hash.encode_into(&mut scratch);
                black_box(&scratch);
            }
        })
    });

    group.finish();
}
criterion_group!(benches, bench_encode_methods);
criterion_main!(benches);

View File

@@ -2,10 +2,10 @@
use crate::{
impl_fixed_arbitrary,
table::{Decode, Encode},
table::{Decode, Encode, EncodeInto},
DatabaseError,
};
use alloy_primitives::{Address, BlockNumber, StorageKey};
use alloy_primitives::{Address, BlockNumber, StorageKey, B256};
use serde::{Deserialize, Serialize};
use std::ops::{Bound, Range, RangeBounds, RangeInclusive};
@@ -70,6 +70,19 @@ impl Decode for BlockNumberAddress {
}
}
impl EncodeInto for BlockNumberAddress {
    /// Always 28: an 8-byte block number followed by the 20 address bytes.
    #[inline]
    fn encoded_len(&self) -> usize {
        28
    }

    /// Writes the big-endian block number into `buf[..8]` and the address into `buf[8..28]`.
    ///
    /// Panics if `buf` is shorter than 28 bytes.
    #[inline]
    fn encode_into(&self, buf: &mut [u8]) {
        let (block_number, address) = &self.0;
        buf[..8].copy_from_slice(&block_number.to_be_bytes());
        buf[8..28].copy_from_slice(address.as_slice());
    }
}
/// A [`RangeBounds`] over a range of [`BlockNumberAddress`]s. Used to conveniently convert from a
/// range of [`BlockNumber`]s.
#[derive(Debug)]
@@ -108,6 +121,55 @@ impl<R: RangeBounds<BlockNumber>> From<R> for BlockNumberAddressRange {
}
}
/// [`BlockNumber`] concatenated with [`B256`] (hashed address).
///
/// Since it's used as a key, it isn't compressed when encoding it.
///
/// The encoded form is 40 bytes: the block number as 8 big-endian bytes, followed by the
/// 32 bytes of the hashed address.
#[derive(
Debug, Default, Copy, Clone, PartialEq, Eq, Serialize, Deserialize, Ord, PartialOrd, Hash,
)]
pub struct BlockNumberHashedAddress(pub (BlockNumber, B256));
impl From<(BlockNumber, B256)> for BlockNumberHashedAddress {
    /// Wraps a `(block number, hashed address)` pair without changing it.
    fn from(pair: (BlockNumber, B256)) -> Self {
        Self(pair)
    }
}
impl Encode for BlockNumberHashedAddress {
    type Encoded = [u8; 40];

    /// Returns the 8-byte big-endian block number followed by the 32 hash bytes.
    fn encode(self) -> Self::Encoded {
        let Self((block_number, hashed_address)) = self;

        let mut encoded = [0u8; 40];
        let (number_bytes, hash_bytes) = encoded.split_at_mut(8);
        number_bytes.copy_from_slice(&block_number.to_be_bytes());
        hash_bytes.copy_from_slice(hashed_address.as_slice());
        encoded
    }
}
impl Decode for BlockNumberHashedAddress {
    /// Decodes a key from its 40-byte form: 8 big-endian block-number bytes followed by
    /// the 32 hash bytes.
    ///
    /// # Errors
    ///
    /// Returns [`DatabaseError::Decode`] when `value` is not exactly 40 bytes long. The length
    /// is checked up front because both slicing `value[..8]` and `B256::from_slice` would
    /// otherwise panic on malformed input instead of reporting an error.
    fn decode(value: &[u8]) -> Result<Self, DatabaseError> {
        if value.len() != 40 {
            return Err(DatabaseError::Decode)
        }
        let (number_bytes, hash_bytes) = value.split_at(8);
        let num = u64::from_be_bytes(number_bytes.try_into().map_err(|_| DatabaseError::Decode)?);
        let hash = B256::from_slice(hash_bytes);
        Ok(Self((num, hash)))
    }
}
impl EncodeInto for BlockNumberHashedAddress {
    /// Always 40: an 8-byte block number followed by the 32 hash bytes.
    #[inline]
    fn encoded_len(&self) -> usize {
        40
    }

    /// Writes the big-endian block number into `buf[..8]` and the hash into `buf[8..40]`.
    ///
    /// Panics if `buf` is shorter than 40 bytes.
    #[inline]
    fn encode_into(&self, buf: &mut [u8]) {
        let (block_number, hashed_address) = &self.0;
        buf[..8].copy_from_slice(&block_number.to_be_bytes());
        buf[8..40].copy_from_slice(hashed_address.as_slice());
    }
}
/// [`Address`] concatenated with [`StorageKey`]. Used by `reth_etl` and history stages.
///
/// Since it's used as a key, it isn't compressed when encoding it.
@@ -139,7 +201,24 @@ impl Decode for AddressStorageKey {
}
}
impl_fixed_arbitrary!((BlockNumberAddress, 28), (AddressStorageKey, 52));
impl EncodeInto for AddressStorageKey {
    /// Always 52: the 20 address bytes followed by the 32 storage-key bytes.
    #[inline]
    fn encoded_len(&self) -> usize {
        52
    }

    /// Writes the address into `buf[..20]` and the storage key into `buf[20..52]`.
    ///
    /// Panics if `buf` is shorter than 52 bytes.
    #[inline]
    fn encode_into(&self, buf: &mut [u8]) {
        let (address, storage_key) = &self.0;
        buf[..20].copy_from_slice(address.as_slice());
        buf[20..52].copy_from_slice(storage_key.as_slice());
    }
}
impl_fixed_arbitrary!(
(BlockNumberAddress, 28),
(BlockNumberHashedAddress, 40),
(AddressStorageKey, 52)
);
#[cfg(test)]
mod tests {

View File

@@ -1,7 +1,7 @@
//! Implements data structures specific to the database
use crate::{
table::{Compress, Decode, Decompress, Encode},
table::{Compress, Decode, Decompress, Encode, EncodeInto},
DatabaseError,
};
use alloy_consensus::Header;
@@ -32,7 +32,7 @@ pub use reth_db_models::{
};
pub use sharded_key::ShardedKey;
/// Macro that implements [`Encode`] and [`Decode`] for uint types.
/// Macro that implements [`Encode`], [`Decode`], and [`EncodeInto`] for uint types.
macro_rules! impl_uints {
($($name:tt),+) => {
$(
@@ -53,6 +53,18 @@ macro_rules! impl_uints {
)
}
}
impl EncodeInto for $name {
    /// The byte width of the integer type.
    #[inline]
    fn encoded_len(&self) -> usize {
        std::mem::size_of::<Self>()
    }

    /// Writes the big-endian bytes of `self` to the front of `buf`.
    ///
    /// Panics if `buf` is shorter than the integer's byte width.
    #[inline]
    fn encode_into(&self, buf: &mut [u8]) {
        let be_bytes = self.to_be_bytes();
        buf[..be_bytes.len()].copy_from_slice(&be_bytes);
    }
}
)+
};
}
@@ -91,6 +103,18 @@ impl Decode for Address {
}
}
impl EncodeInto for Address {
    /// Always 20, the number of bytes in an address.
    #[inline]
    fn encoded_len(&self) -> usize {
        20
    }

    /// Copies the address bytes unchanged into `buf[..20]`.
    ///
    /// Panics if `buf` is shorter than 20 bytes.
    #[inline]
    fn encode_into(&self, buf: &mut [u8]) {
        let dst = &mut buf[..20];
        dst.copy_from_slice(self.as_slice());
    }
}
impl Encode for B256 {
type Encoded = [u8; 32];
@@ -105,6 +129,18 @@ impl Decode for B256 {
}
}
impl EncodeInto for B256 {
    /// Always 32, the number of bytes in the hash.
    #[inline]
    fn encoded_len(&self) -> usize {
        32
    }

    /// Copies the hash bytes unchanged into `buf[..32]`.
    ///
    /// Panics if `buf` is shorter than 32 bytes.
    #[inline]
    fn encode_into(&self, buf: &mut [u8]) {
        let dst = &mut buf[..32];
        dst.copy_from_slice(self.as_slice());
    }
}
impl Encode for String {
type Encoded = Vec<u8>;

View File

@@ -1,6 +1,6 @@
//! Sharded key
use crate::{
table::{Decode, Encode},
table::{Decode, Encode, EncodeInto},
DatabaseError,
};
use alloy_primitives::{Address, BlockNumber};
@@ -77,6 +77,19 @@ impl Decode for ShardedKey<Address> {
}
}
impl EncodeInto for ShardedKey<Address> {
    /// Always 28: the 20 address bytes followed by an 8-byte block number.
    #[inline]
    fn encoded_len(&self) -> usize {
        28
    }

    /// Writes the address into `buf[..20]` and the big-endian highest block number into
    /// `buf[20..28]`.
    ///
    /// Panics if `buf` is shorter than 28 bytes.
    #[inline]
    fn encode_into(&self, buf: &mut [u8]) {
        let address_bytes = self.key.as_slice();
        let block_bytes = self.highest_block_number.to_be_bytes();
        buf[..20].copy_from_slice(address_bytes);
        buf[20..28].copy_from_slice(&block_bytes);
    }
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -1,6 +1,6 @@
//! Storage sharded key
use crate::{
table::{Decode, Encode},
table::{Decode, Encode, EncodeInto},
DatabaseError,
};
use alloy_primitives::{Address, BlockNumber, B256};
@@ -91,6 +91,20 @@ impl Decode for StorageShardedKey {
}
}
impl EncodeInto for StorageShardedKey {
    /// Always 60: 20 address bytes, 32 storage-key bytes, then an 8-byte block number.
    #[inline]
    fn encoded_len(&self) -> usize {
        60
    }

    /// Writes the address into `buf[..20]`, the storage key into `buf[20..52]` and the
    /// big-endian highest block number into `buf[52..60]`.
    ///
    /// Panics if `buf` is shorter than 60 bytes.
    #[inline]
    fn encode_into(&self, buf: &mut [u8]) {
        let shard = &self.sharded_key;
        let block_bytes = shard.highest_block_number.to_be_bytes();
        buf[..20].copy_from_slice(self.address.as_slice());
        buf[20..52].copy_from_slice(shard.key.as_slice());
        buf[52..60].copy_from_slice(&block_bytes);
    }
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -96,6 +96,53 @@ pub trait Decode: Send + Sync + Sized + Debug {
}
}
/// Trait for zero-copy encoding directly into a provided buffer.
///
/// This enables MDBX_RESERVE-style writes where we write directly into
/// MDBX's memory-mapped pages, avoiding intermediate allocations.
///
/// For types where the in-memory representation equals the encoded bytes
/// (e.g., `Address`, `B256`), implement this trait directly to avoid
/// any intermediate copies. For other fixed-size types, use the blanket
/// impl via [`EncodeIntoViaEncode`].
///
/// Implementations are expected to write exactly [`encoded_len`](Self::encoded_len) bytes,
/// starting at the beginning of the buffer, and to leave any remaining bytes untouched.
pub trait EncodeInto: Send + Sync + Sized + Debug {
/// Returns the exact number of bytes needed to encode this value.
///
/// Callers use this to size the destination buffer before calling
/// [`encode_into`](Self::encode_into).
fn encoded_len(&self) -> usize;
/// Encodes `self` into the provided buffer.
///
/// Only the first `self.encoded_len()` bytes of `buf` are written.
///
/// # Panics
/// May panic if `buf.len() < self.encoded_len()`.
fn encode_into(&self, buf: &mut [u8]);
}
// Private module: `Sealed` is a supertrait of `EncodeIntoViaEncode`, so only code that can
// name `sealed::Sealed` is able to implement that marker trait.
mod sealed {
/// Supertrait used to restrict who can implement `EncodeIntoViaEncode`.
pub trait Sealed {}
}
/// Marker trait for types that should use the `Encode` + copy fallback for `EncodeInto`.
///
/// Implement this for types where copying the value is acceptable or unavoidable: the
/// blanket `EncodeInto` impl requires `Copy`, copies `self`, calls `Encode::encode` on the
/// copy and then copies the resulting array into the destination buffer.
/// For hot-path types with identity encoding (where in-memory bytes equal encoded bytes),
/// implement `EncodeInto` directly instead.
///
/// The trait is sealed through `sealed::Sealed`, so a type must implement that supertrait
/// before it can opt in.
// NOTE(review): no `Sealed` impl is visible in this change, so the fallback looks unused so
// far; confirm whether that is intended.
pub trait EncodeIntoViaEncode: sealed::Sealed + Send + Sync + Sized + Debug {}
/// Fallback `EncodeInto` for `Copy` types that opted in through [`EncodeIntoViaEncode`] and
/// whose [`Encode`] output is a fixed-size byte array.
impl<T, const N: usize> EncodeInto for T
where
    T: Encode<Encoded = [u8; N]> + EncodeIntoViaEncode + Copy,
{
    /// The length of the fixed-size array produced by `Encode::encode`.
    #[inline]
    fn encoded_len(&self) -> usize {
        N
    }

    /// Encodes a copy of `self` and copies the resulting array into `buf[..N]`.
    ///
    /// Panics if `buf` is shorter than `N` bytes.
    #[inline]
    fn encode_into(&self, buf: &mut [u8]) {
        let dst = &mut buf[..N];
        let encoded: [u8; N] = (*self).encode();
        dst.copy_from_slice(&encoded);
    }
}
/// Generic trait that enforces the database key to implement [`Encode`] and [`Decode`].
pub trait Key: Encode + Decode + Ord + Clone + Serialize + for<'a> Deserialize<'a> {}

View File

@@ -24,7 +24,7 @@ use crate::{
AccountBeforeTx, ClientVersion, CompactU256, IntegerList, ShardedKey,
StoredBlockBodyIndices, StoredBlockWithdrawals,
},
table::{Decode, DupSort, Encode, Table, TableInfo},
table::{Decode, DupSort, Encode, EncodeInto, Table, TableInfo},
};
use alloy_consensus::Header;
use alloy_primitives::{Address, BlockHash, BlockNumber, TxHash, TxNumber, B256};
@@ -566,6 +566,21 @@ impl Decode for ChainStateKey {
}
}
impl EncodeInto for ChainStateKey {
    /// Always 1: the key is encoded as a single tag byte.
    #[inline]
    fn encoded_len(&self) -> usize {
        1
    }

    /// Stores the variant's tag byte in `buf[0]`.
    ///
    /// Panics if `buf` is empty.
    #[inline]
    fn encode_into(&self, buf: &mut [u8]) {
        let tag: u8 = match self {
            Self::LastFinalizedBlock => 0,
            Self::LastSafeBlock => 1,
        };
        buf[0] = tag;
    }
}
// Alias types.
/// List with transaction numbers.

View File

@@ -74,19 +74,9 @@ where
res.map_err(|e| DatabaseError::Read(e.into()))?.map(decoder::<T>).transpose()
}
/// Some types don't support compression (eg. B256), and we don't want to be copying them to the
/// allocated buffer when we can just use their reference.
///
/// Evaluates to `Some(bytes)` when `$value.uncompressable_ref()` yields a reference. Otherwise
/// it clears `$self.buf`, compresses `$value` into it and evaluates to `None`, in which case
/// the caller reads the bytes from `$self.buf`.
macro_rules! compress_to_buf_or_ref {
($self:expr, $value:expr) => {
// Fast path: the value exposes its bytes directly, so no copy into the buffer is needed.
if let Some(value) = $value.uncompressable_ref() {
Some(value)
} else {
// Slow path: reuse the cursor's scratch buffer for the compressed bytes.
$self.buf.clear();
$value.compress_to_buf(&mut $self.buf);
None
}
};
}
impl<K: TransactionKind, T: Table> DbCursorRO<T> for Cursor<K, T> {
fn first(&mut self) -> PairResult<T> {
@@ -256,13 +246,46 @@ impl<T: Table> DbCursorRW<T> for Cursor<RW, T> {
/// found, before calling `upsert`.
fn upsert(&mut self, key: T::Key, value: &T::Value) -> Result<(), DatabaseError> {
let key = key.encode();
let value = compress_to_buf_or_ref!(self, value);
// Check if we can use zero-copy reserve path
if let Some(value_ref) = value.uncompressable_ref() {
let value_len = value_ref.len();
return self.execute_with_operation_metric(
Operation::CursorUpsert,
Some(value_len),
|this| {
// SAFETY: We fill the reserved buffer immediately and don't use it after
unsafe {
this.inner
.reserve(key.as_ref(), value_len, WriteFlags::UPSERT)
.map(|reserved| {
reserved.copy_from_slice(value_ref);
})
.map_err(|e| {
DatabaseWriteError {
info: e.into(),
operation: DatabaseWriteOperation::CursorUpsert,
table_name: T::NAME,
key: key.into_vec(),
}
.into()
})
}
},
);
}
// Fallback: compress to buffer then put
self.buf.clear();
value.compress_to_buf(&mut self.buf);
let value_len = self.buf.len();
self.execute_with_operation_metric(
Operation::CursorUpsert,
Some(value.unwrap_or(&self.buf).len()),
Some(value_len),
|this| {
this.inner
.put(key.as_ref(), value.unwrap_or(&this.buf), WriteFlags::UPSERT)
.put(key.as_ref(), &this.buf, WriteFlags::UPSERT)
.map_err(|e| {
DatabaseWriteError {
info: e.into(),
@@ -278,13 +301,43 @@ impl<T: Table> DbCursorRW<T> for Cursor<RW, T> {
fn insert(&mut self, key: T::Key, value: &T::Value) -> Result<(), DatabaseError> {
let key = key.encode();
let value = compress_to_buf_or_ref!(self, value);
if let Some(value_ref) = value.uncompressable_ref() {
let value_len = value_ref.len();
return self.execute_with_operation_metric(
Operation::CursorInsert,
Some(value_len),
|this| {
unsafe {
this.inner
.reserve(key.as_ref(), value_len, WriteFlags::NO_OVERWRITE)
.map(|reserved| {
reserved.copy_from_slice(value_ref);
})
.map_err(|e| {
DatabaseWriteError {
info: e.into(),
operation: DatabaseWriteOperation::CursorInsert,
table_name: T::NAME,
key: key.into_vec(),
}
.into()
})
}
},
);
}
self.buf.clear();
value.compress_to_buf(&mut self.buf);
let value_len = self.buf.len();
self.execute_with_operation_metric(
Operation::CursorInsert,
Some(value.unwrap_or(&self.buf).len()),
Some(value_len),
|this| {
this.inner
.put(key.as_ref(), value.unwrap_or(&this.buf), WriteFlags::NO_OVERWRITE)
.put(key.as_ref(), &this.buf, WriteFlags::NO_OVERWRITE)
.map_err(|e| {
DatabaseWriteError {
info: e.into(),
@@ -302,13 +355,43 @@ impl<T: Table> DbCursorRW<T> for Cursor<RW, T> {
/// will fail if the inserted key is less than the last table key
fn append(&mut self, key: T::Key, value: &T::Value) -> Result<(), DatabaseError> {
let key = key.encode();
let value = compress_to_buf_or_ref!(self, value);
if let Some(value_ref) = value.uncompressable_ref() {
let value_len = value_ref.len();
return self.execute_with_operation_metric(
Operation::CursorAppend,
Some(value_len),
|this| {
unsafe {
this.inner
.reserve(key.as_ref(), value_len, WriteFlags::APPEND)
.map(|reserved| {
reserved.copy_from_slice(value_ref);
})
.map_err(|e| {
DatabaseWriteError {
info: e.into(),
operation: DatabaseWriteOperation::CursorAppend,
table_name: T::NAME,
key: key.into_vec(),
}
.into()
})
}
},
);
}
self.buf.clear();
value.compress_to_buf(&mut self.buf);
let value_len = self.buf.len();
self.execute_with_operation_metric(
Operation::CursorAppend,
Some(value.unwrap_or(&self.buf).len()),
Some(value_len),
|this| {
this.inner
.put(key.as_ref(), value.unwrap_or(&this.buf), WriteFlags::APPEND)
.put(key.as_ref(), &this.buf, WriteFlags::APPEND)
.map_err(|e| {
DatabaseWriteError {
info: e.into(),
@@ -338,13 +421,43 @@ impl<T: DupSort> DbDupCursorRW<T> for Cursor<RW, T> {
fn append_dup(&mut self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> {
let key = key.encode();
let value = compress_to_buf_or_ref!(self, value);
if let Some(value_ref) = value.uncompressable_ref() {
let value_len = value_ref.len();
return self.execute_with_operation_metric(
Operation::CursorAppendDup,
Some(value_len),
|this| {
unsafe {
this.inner
.reserve(key.as_ref(), value_len, WriteFlags::APPEND_DUP)
.map(|reserved| {
reserved.copy_from_slice(value_ref);
})
.map_err(|e| {
DatabaseWriteError {
info: e.into(),
operation: DatabaseWriteOperation::CursorAppendDup,
table_name: T::NAME,
key: key.into_vec(),
}
.into()
})
}
},
);
}
self.buf.clear();
value.compress_to_buf(&mut self.buf);
let value_len = self.buf.len();
self.execute_with_operation_metric(
Operation::CursorAppendDup,
Some(value.unwrap_or(&self.buf).len()),
Some(value_len),
|this| {
this.inner
.put(key.as_ref(), value.unwrap_or(&this.buf), WriteFlags::APPEND_DUP)
.put(key.as_ref(), &this.buf, WriteFlags::APPEND_DUP)
.map_err(|e| {
DatabaseWriteError {
info: e.into(),
@@ -368,7 +481,7 @@ mod tests {
};
use alloy_primitives::{address, Address, B256, U256};
use reth_db_api::{
cursor::{DbCursorRO, DbDupCursorRW},
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
models::{BlockNumberAddress, ClientVersion},
table::TableImporter,
transaction::{DbTx, DbTxMut},
@@ -464,4 +577,546 @@ mod tests {
assert_eq!(copied_value, expected_value);
}
}
/// Round-trips `B256` values through `upsert` on `CanonicalHeaders`, checking them inside the
/// writing transaction and again after commit.
///
/// `B256` implements `uncompressable_ref()` returning `Some`, so these writes take the
/// zero-copy reserve path.
#[test]
fn test_reserve_path_b256_roundtrip() {
    use crate::tables::CanonicalHeaders;

    // Keys are listed in ascending order so they match the order of a full table walk.
    let test_data: Vec<(u64, B256)> = vec![
        (0, B256::ZERO),
        (1, B256::with_last_byte(1)),
        (100, B256::repeat_byte(0xab)),
        (u64::MAX, B256::repeat_byte(0xff)),
    ];

    let db = create_test_db();
    let tx = db.tx_mut().unwrap();

    // Write every pair through `upsert`.
    let mut writer = tx.cursor_write::<CanonicalHeaders>().unwrap();
    for (key, value) in &test_data {
        writer.upsert(*key, value).unwrap();
    }
    drop(writer);

    // Point lookups inside the still-open write transaction.
    let mut reader = tx.cursor_read::<CanonicalHeaders>().unwrap();
    for &(expected_key, expected_value) in &test_data {
        let (key, value) = reader.seek_exact(expected_key).unwrap().unwrap();
        assert_eq!(key, expected_key);
        assert_eq!(value, expected_value);
    }
    drop(reader);

    tx.commit().unwrap();

    // Full walk in a fresh read transaction after the commit.
    let tx = db.tx().unwrap();
    let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
    let all_entries = cursor.walk(None).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
    assert_eq!(all_entries.len(), test_data.len());
    for (actual, expected) in all_entries.iter().zip(&test_data) {
        assert_eq!(actual, expected);
    }
}
/// Inserts two `B256` values, checks that re-inserting an existing key is rejected, and reads
/// both values back.
#[test]
fn test_reserve_path_insert_roundtrip() {
    use crate::tables::CanonicalHeaders;

    let db = create_test_db();
    let tx = db.tx_mut().unwrap();
    let (first, second) = (B256::repeat_byte(0x11), B256::repeat_byte(0x22));

    let mut writer = tx.cursor_write::<CanonicalHeaders>().unwrap();
    writer.insert(1, &first).unwrap();
    writer.insert(2, &second).unwrap();
    // Key 1 already exists, so a second insert must be refused.
    assert!(writer.insert(1, &second).is_err());
    drop(writer);

    let mut reader = tx.cursor_read::<CanonicalHeaders>().unwrap();
    for (key, expected) in [(1u64, first), (2u64, second)] {
        let (found_key, found_value) = reader.seek_exact(key).unwrap().unwrap();
        assert_eq!(found_key, key);
        assert_eq!(found_value, expected);
    }
}
/// Appends ten `B256` values under ascending keys and walks the table to read them back.
#[test]
fn test_reserve_path_append_roundtrip() {
    use crate::tables::CanonicalHeaders;

    let db = create_test_db();
    let tx = db.tx_mut().unwrap();
    let hashes: Vec<B256> = (0..10u8).map(B256::repeat_byte).collect();

    // Keys are the positions 0, 1, 2, ... of the hashes.
    let mut writer = tx.cursor_write::<CanonicalHeaders>().unwrap();
    for (key, hash) in (0u64..).zip(&hashes) {
        writer.append(key, hash).unwrap();
    }
    drop(writer);

    let mut reader = tx.cursor_read::<CanonicalHeaders>().unwrap();
    let entries = reader.walk(None).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
    assert_eq!(entries.len(), hashes.len());
    for ((key, value), (expected_key, expected_hash)) in entries.iter().zip((0u64..).zip(&hashes))
    {
        assert_eq!(*key, expected_key);
        assert_eq!(value, expected_hash);
    }
}
/// Round-trips three `StorageEntry` duplicates through `append_dup` on `StorageChangeSets`.
// NOTE(review): whether `StorageEntry` actually takes the reserve path depends on its
// `uncompressable_ref()`, which is not visible here — confirm.
#[test]
fn test_reserve_path_append_dup_roundtrip() {
    let db = create_test_db();
    let tx = db.tx_mut().unwrap();

    let addr = address!("0000000000000000000000000000000000000001");
    let written = [
        StorageEntry { key: B256::with_last_byte(1), value: U256::from(100) },
        StorageEntry { key: B256::with_last_byte(2), value: U256::from(200) },
        StorageEntry { key: B256::with_last_byte(3), value: U256::from(300) },
    ];

    let mut writer = tx.cursor_dup_write::<StorageChangeSets>().unwrap();
    for entry in &written {
        writer.append_dup(BlockNumberAddress((1, addr)), *entry).unwrap();
    }
    drop(writer);

    let mut reader = tx.cursor_dup_read::<StorageChangeSets>().unwrap();
    let walker = reader.walk_dup(Some(BlockNumberAddress((1, addr))), None).unwrap();
    let results = walker.collect::<Result<Vec<_>, _>>().unwrap();

    assert_eq!(results.len(), written.len());
    for ((_, value), expected) in results.iter().zip(written.iter()) {
        assert_eq!(value, expected);
    }
}
/// Upserts across two committed transactions (fresh keys, then an overwrite plus a new key)
/// and checks the final state from a read transaction.
#[test]
fn test_reserve_path_mixed_operations() {
    use crate::tables::CanonicalHeaders;

    let db = create_test_db();

    // Transaction 1: create keys 1 and 2.
    {
        let tx = db.tx_mut().unwrap();
        let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
        for (key, byte) in [(1u64, 0x11u8), (2, 0x22)] {
            cursor.upsert(key, &B256::repeat_byte(byte)).unwrap();
        }
        tx.commit().unwrap();
    }

    // Transaction 2: overwrite key 1 and create key 3.
    {
        let tx = db.tx_mut().unwrap();
        let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
        for (key, byte) in [(1u64, 0xaau8), (3, 0x33)] {
            cursor.upsert(key, &B256::repeat_byte(byte)).unwrap();
        }
        tx.commit().unwrap();
    }

    // Key 1 holds the update, key 2 is untouched, key 3 is new.
    {
        let tx = db.tx().unwrap();
        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
        for (key, byte) in [(1u64, 0xaau8), (2, 0x22), (3, 0x33)] {
            let (_, stored) = cursor.seek_exact(key).unwrap().unwrap();
            assert_eq!(stored, B256::repeat_byte(byte));
        }
    }
}
// ==================== ADVERSARIAL / EDGE CASE TESTS ====================
/// Checks that writes made in a transaction that is dropped without commit are discarded.
#[test]
fn test_reserve_path_transaction_abort() {
    use crate::tables::CanonicalHeaders;

    let db = create_test_db();
    let committed = B256::repeat_byte(0x11);

    // Seed a single committed entry.
    {
        let tx = db.tx_mut().unwrap();
        let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
        cursor.upsert(1, &committed).unwrap();
        tx.commit().unwrap();
    }

    // Overwrite key 1 and add key 2, then drop the transaction instead of committing it.
    {
        let tx = db.tx_mut().unwrap();
        let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
        for (key, byte) in [(1u64, 0xffu8), (2, 0x22)] {
            cursor.upsert(key, &B256::repeat_byte(byte)).unwrap();
        }
        drop(cursor);
        drop(tx);
    }

    // The committed value survives and the uncommitted key never appears.
    {
        let tx = db.tx().unwrap();
        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
        let (_, stored) = cursor.seek_exact(1).unwrap().unwrap();
        assert_eq!(stored, committed);
        assert!(cursor.seek_exact(2).unwrap().is_none());
    }
}
/// Overwrites one key a hundred times in a row and checks that the last write is the one
/// that is read back.
#[test]
fn test_reserve_path_rapid_overwrites() {
    use crate::tables::CanonicalHeaders;

    let db = create_test_db();
    let tx = db.tx_mut().unwrap();

    let mut writer = tx.cursor_write::<CanonicalHeaders>().unwrap();
    (0..100u8).for_each(|byte| writer.upsert(1, &B256::repeat_byte(byte)).unwrap());
    drop(writer);

    let mut reader = tx.cursor_read::<CanonicalHeaders>().unwrap();
    let (_, stored) = reader.seek_exact(1).unwrap().unwrap();
    // 99 is the final byte written by the loop above.
    assert_eq!(stored, B256::repeat_byte(99));
}
/// Test interleaved reserve-path and fallback-path writes
///
/// `B256` values written to `CanonicalHeaders` take the reserve path, while `Account` values
/// written to `PlainAccountState` are Compact-compressed and take the fallback path. The two
/// kinds of write alternate inside a single transaction and everything is read back afterwards.
#[test]
fn test_reserve_path_interleaved_with_compressable() {
    use crate::tables::{CanonicalHeaders, PlainAccountState};
    use reth_primitives_traits::Account;

    let db = create_test_db();
    let tx = db.tx_mut().unwrap();

    let addr1 = Address::repeat_byte(0x11);
    let addr2 = Address::repeat_byte(0x22);
    let account1 = Account { nonce: 1, balance: U256::from(100), bytecode_hash: None };
    let account2 = Account {
        nonce: 2,
        balance: U256::from(200),
        bytecode_hash: Some(B256::repeat_byte(0xab)),
    };
    let hash1 = B256::repeat_byte(0x01);
    let hash2 = B256::repeat_byte(0x02);

    // Alternate between the two tables so both write paths run back to back.
    {
        let mut headers = tx.cursor_write::<CanonicalHeaders>().unwrap();
        let mut accounts = tx.cursor_write::<PlainAccountState>().unwrap();
        headers.upsert(1, &hash1).unwrap();
        accounts.upsert(addr1, &account1).unwrap();
        headers.upsert(2, &hash2).unwrap();
        accounts.upsert(addr2, &account2).unwrap();
    }

    // Verify the fallback-path values.
    {
        let mut cursor = tx.cursor_read::<PlainAccountState>().unwrap();
        let (_, a1) = cursor.seek_exact(addr1).unwrap().unwrap();
        assert_eq!(a1.nonce, 1);
        assert_eq!(a1.balance, U256::from(100));
        assert_eq!(a1.bytecode_hash, None);
        let (_, a2) = cursor.seek_exact(addr2).unwrap().unwrap();
        assert_eq!(a2.nonce, 2);
        assert_eq!(a2.balance, U256::from(200));
        assert_eq!(a2.bytecode_hash, Some(B256::repeat_byte(0xab)));
    }

    // Verify the reserve-path values.
    {
        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
        let (_, h1) = cursor.seek_exact(1).unwrap().unwrap();
        assert_eq!(h1, hash1);
        let (_, h2) = cursor.seek_exact(2).unwrap().unwrap();
        assert_eq!(h2, hash2);
    }
}
/// Uses a single write cursor for alternating writes and reads, then walks the whole table.
#[test]
fn test_reserve_path_cursor_reuse() {
    use crate::tables::CanonicalHeaders;

    let db = create_test_db();
    let tx = db.tx_mut().unwrap();
    let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();

    // First write, read straight back through the same cursor.
    cursor.upsert(1, &B256::repeat_byte(0x11)).unwrap();
    let (found_key, found_value) = cursor.seek_exact(1).unwrap().unwrap();
    assert_eq!(found_key, 1);
    assert_eq!(found_value, B256::repeat_byte(0x11));

    // Add a second key, then overwrite the first one.
    cursor.upsert(2, &B256::repeat_byte(0x22)).unwrap();
    cursor.upsert(1, &B256::repeat_byte(0xaa)).unwrap();

    // The walk must yield exactly the two keys with their latest values.
    let entries = cursor.walk(None).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
    assert_eq!(entries.len(), 2);
    let expected = [(1, B256::repeat_byte(0xaa)), (2, B256::repeat_byte(0x22))];
    for (actual, wanted) in entries.iter().zip(expected.iter()) {
        assert_eq!(actual, wanted);
    }
}
/// Round-trips a handful of edge-case `B256` values: all zeros, all ones, single low bytes and
/// a counting byte pattern.
#[test]
fn test_reserve_path_boundary_values() {
    use crate::tables::CanonicalHeaders;

    let db = create_test_db();
    let tx = db.tx_mut().unwrap();

    let boundary_values = [
        B256::ZERO,
        B256::repeat_byte(0xff),
        B256::with_last_byte(1),
        B256::with_last_byte(0xff),
        // Bytes 0x00, 0x01, ..., 0x1f in order: any shifted or reordered copy would show up.
        // The index is always below 32, so the cast to `u8` is lossless.
        B256::from(std::array::from_fn::<u8, 32, _>(|i| i as u8)),
    ];

    // Each value is stored under its position in the list.
    let mut writer = tx.cursor_write::<CanonicalHeaders>().unwrap();
    for (key, value) in (0u64..).zip(&boundary_values) {
        writer.upsert(key, value).unwrap();
    }
    drop(writer);

    let mut reader = tx.cursor_read::<CanonicalHeaders>().unwrap();
    for (i, expected) in boundary_values.iter().enumerate() {
        let (_, value) = reader.seek_exact(i as u64).unwrap().unwrap();
        assert_eq!(&value, expected, "Mismatch at index {i}");
    }
}
/// Test that delete after reserve-path write works correctly
///
/// Three entries are committed, the middle one is deleted in a second transaction, and a
/// third transaction checks that only the middle entry is gone and the others keep their
/// original values.
#[test]
fn test_reserve_path_write_then_delete() {
    use crate::tables::CanonicalHeaders;

    let db = create_test_db();

    // Write via reserve path
    {
        let tx = db.tx_mut().unwrap();
        let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
        cursor.upsert(1, &B256::repeat_byte(0x11)).unwrap();
        cursor.upsert(2, &B256::repeat_byte(0x22)).unwrap();
        cursor.upsert(3, &B256::repeat_byte(0x33)).unwrap();
        tx.commit().unwrap();
    }

    // Delete middle entry
    {
        let tx = db.tx_mut().unwrap();
        let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
        // Make sure the cursor really sits on key 2 before deleting the current entry;
        // otherwise a failed seek would go unnoticed.
        assert_eq!(cursor.seek_exact(2).unwrap(), Some((2, B256::repeat_byte(0x22))));
        cursor.delete_current().unwrap();
        tx.commit().unwrap();
    }

    // Verify: neighbours are intact (value included), the middle key is gone.
    {
        let tx = db.tx().unwrap();
        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
        assert_eq!(cursor.seek_exact(1).unwrap(), Some((1, B256::repeat_byte(0x11))));
        assert!(cursor.seek_exact(2).unwrap().is_none()); // Deleted
        assert_eq!(cursor.seek_exact(3).unwrap(), Some((3, B256::repeat_byte(0x33))));
    }
}
/// Appends 1000 entries in one transaction and verifies all of them with a full walk after
/// commit.
#[test]
fn test_reserve_path_stress_many_entries() {
    use crate::tables::CanonicalHeaders;

    let db = create_test_db();
    let num_entries = 1000u64;

    // Entry `i` maps key `i` to the 32-byte big-endian form of `i`.
    let expected: Vec<(u64, B256)> =
        (0..num_entries).map(|i| (i, B256::from(U256::from(i)))).collect();

    {
        let tx = db.tx_mut().unwrap();
        let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
        for (key, hash) in &expected {
            cursor.append(*key, hash).unwrap();
        }
        tx.commit().unwrap();
    }

    {
        let tx = db.tx().unwrap();
        let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
        let entries = cursor.walk(None).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
        assert_eq!(entries.len(), num_entries as usize);
        for ((key, value), (expected_key, expected_value)) in entries.iter().zip(&expected) {
            assert_eq!(key, expected_key);
            assert_eq!(value, expected_value);
        }
    }
}
/// Alternates ten times between a committed write transaction and a read transaction that
/// re-checks every entry written so far.
#[test]
fn test_reserve_path_alternating_transactions() {
    use crate::tables::CanonicalHeaders;

    let db = create_test_db();

    for round in 0..10u8 {
        // Write and commit the entry for this round.
        {
            let tx = db.tx_mut().unwrap();
            let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
            cursor.upsert(u64::from(round), &B256::repeat_byte(round)).unwrap();
            tx.commit().unwrap();
        }

        // Every round up to and including the current one must be readable.
        {
            let tx = db.tx().unwrap();
            let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
            for earlier in 0..=round {
                let (_, stored) = cursor.seek_exact(u64::from(earlier)).unwrap().unwrap();
                assert_eq!(stored, B256::repeat_byte(earlier));
            }
        }
    }
}
/// Appends 100 duplicates under one `StorageChangeSets` key and reads them all back in order.
// NOTE(review): whether `StorageEntry` actually takes the reserve path depends on its
// `uncompressable_ref()`, which is not visible here — confirm.
#[test]
fn test_reserve_path_dupsort_many_duplicates() {
    let db = create_test_db();
    let tx = db.tx_mut().unwrap();

    let addr = address!("0000000000000000000000000000000000000001");
    let num_dups: usize = 100;
    // Duplicate `i` has subkey `i` and value `i * 100`.
    let entry_at =
        |i: usize| StorageEntry { key: B256::from(U256::from(i)), value: U256::from(i * 100) };

    let mut writer = tx.cursor_dup_write::<StorageChangeSets>().unwrap();
    for i in 0..num_dups {
        writer.append_dup(BlockNumberAddress((1, addr)), entry_at(i)).unwrap();
    }
    drop(writer);

    let mut reader = tx.cursor_dup_read::<StorageChangeSets>().unwrap();
    let walker = reader.walk_dup(Some(BlockNumberAddress((1, addr))), None).unwrap();
    let results = walker.collect::<Result<Vec<_>, _>>().unwrap();

    assert_eq!(results.len(), num_dups);
    for (i, (_, entry)) in results.iter().enumerate() {
        let expected = entry_at(i);
        assert_eq!(entry.key, expected.key);
        assert_eq!(entry.value, expected.value);
    }
}
/// Checks that inserting under an existing key returns an error and leaves the stored value
/// as it was.
#[test]
fn test_reserve_path_insert_duplicate_key_fails() {
    use crate::tables::CanonicalHeaders;

    let db = create_test_db();
    let tx = db.tx_mut().unwrap();
    let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();

    let original = B256::repeat_byte(0x11);
    cursor.insert(1, &original).unwrap();

    // Same key, different value: must be rejected.
    assert!(cursor.insert(1, &B256::repeat_byte(0x22)).is_err());

    // The rejected insert must not have touched the stored value.
    let (_, stored) = cursor.seek_exact(1).unwrap().unwrap();
    assert_eq!(stored, original);
}
/// Checks that `append` accepts ascending keys and rejects a key below the last one appended.
#[test]
fn test_reserve_path_append_order_enforced() {
    use crate::tables::CanonicalHeaders;

    let db = create_test_db();
    let tx = db.tx_mut().unwrap();
    let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();

    // Ascending keys are accepted.
    for (key, byte) in [(1u64, 0x11u8), (2, 0x22), (3, 0x33)] {
        cursor.append(key, &B256::repeat_byte(byte)).unwrap();
    }

    // Key 2 is lower than the last appended key 3, so this append must fail.
    let out_of_order = cursor.append(2, &B256::repeat_byte(0x44));
    assert!(out_of_order.is_err());
}
}

View File

@@ -447,6 +447,44 @@ impl Cursor<RW> {
Ok(())
}
/// Reserves space for a key/data pair in the database and returns a mutable slice
/// to the reserved space for the value. The caller must fill the entire buffer before
/// the next database operation.
///
/// This enables zero-copy writes by allowing the caller to write directly into
/// MDBX's memory-mapped pages.
///
/// `key` is the key to store, `len` is the number of value bytes to reserve and `flags` are
/// the write flags; `MDBX_RESERVE` is always added to them. The returned slice has the
/// length MDBX reports for the reserved region. If MDBX hands back a null data pointer,
/// an empty slice is returned instead of building a slice from a null pointer.
///
/// # Errors
///
/// Returns an error if the transaction cannot be accessed or if `mdbx_cursor_put` reports
/// a failure.
///
/// # Safety
///
/// The caller must ensure that:
/// - The returned buffer is completely filled before any other cursor/transaction operation
/// - The buffer is not used after the cursor is moved or the transaction ends
// NOTE(review): libmdbx documents `MDBX_RESERVE` as not usable on tables opened with
// `MDBX_DUPSORT` — confirm that callers never reach this with a dup-sorted table.
pub unsafe fn reserve(&mut self, key: &[u8], len: usize, flags: WriteFlags) -> Result<&mut [u8]> {
    let key_val: ffi::MDBX_val =
        ffi::MDBX_val { iov_len: key.len(), iov_base: key.as_ptr() as *mut c_void };
    // With MDBX_RESERVE only the requested length goes in; MDBX fills in the pointer.
    let mut data_val: ffi::MDBX_val =
        ffi::MDBX_val { iov_len: len, iov_base: ptr::null_mut::<c_void>() };

    // SAFETY: `key_val` points at `key`, which outlives the call, and `data_val` is a valid
    // out-parameter. On success MDBX points `data_val` at `iov_len` reserved, writable bytes.
    // The returned slice borrows `self` mutably, so the cursor cannot be used while it is
    // alive; the remaining obligations are the caller's (see `# Safety`).
    unsafe {
        mdbx_result(
            self.txn.txn_execute(|_| {
                ffi::mdbx_cursor_put(
                    self.cursor,
                    &key_val,
                    &mut data_val,
                    flags.bits() | ffi::MDBX_RESERVE,
                )
            })?,
        )?;

        // `slice::from_raw_parts_mut` requires a non-null pointer even for empty slices.
        if data_val.iov_base.is_null() {
            return Ok(&mut [])
        }

        Ok(std::slice::from_raw_parts_mut(data_val.iov_base as *mut u8, data_val.iov_len))
    }
}
/// Deletes the current key/data pair.
///
/// ### Flags