mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
7 Commits
main
...
yk/perf-en
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
be9933d5b8 | ||
|
|
77bc9464e2 | ||
|
|
b936984fff | ||
|
|
d5fd1b41c9 | ||
|
|
eff531b720 | ||
|
|
941d47d9e6 | ||
|
|
d42d61f94a |
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -8104,6 +8104,7 @@ dependencies = [
|
||||
"arbitrary",
|
||||
"arrayvec",
|
||||
"bytes",
|
||||
"codspeed-criterion-compat",
|
||||
"derive_more",
|
||||
"metrics",
|
||||
"modular-bitfield",
|
||||
|
||||
@@ -61,6 +61,11 @@ test-fuzz.workspace = true
|
||||
arbitrary = { workspace = true, features = ["derive"] }
|
||||
proptest.workspace = true
|
||||
proptest-arbitrary-interop.workspace = true
|
||||
criterion.workspace = true
|
||||
|
||||
[[bench]]
|
||||
name = "encode_into"
|
||||
harness = false
|
||||
|
||||
[features]
|
||||
test-utils = [
|
||||
|
||||
67
crates/storage/db-api/benches/encode_into.rs
Normal file
67
crates/storage/db-api/benches/encode_into.rs
Normal file
@@ -0,0 +1,67 @@
|
||||
use alloy_primitives::{Address, B256};
|
||||
use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput};
|
||||
use reth_db_api::table::{Encode, EncodeInto};
|
||||
|
||||
fn bench_encode_methods(c: &mut Criterion) {
|
||||
let mut group = c.benchmark_group("encode");
|
||||
group.throughput(Throughput::Elements(10000));
|
||||
|
||||
let addresses: Vec<Address> = (0..10000u64)
|
||||
.map(|i| {
|
||||
let mut bytes = [0u8; 32];
|
||||
bytes[24..32].copy_from_slice(&i.to_be_bytes());
|
||||
Address::from_word(B256::from(bytes))
|
||||
})
|
||||
.collect();
|
||||
|
||||
let hashes: Vec<B256> = (0..10000u64)
|
||||
.map(|i| {
|
||||
let mut bytes = [0u8; 32];
|
||||
bytes[24..32].copy_from_slice(&i.to_be_bytes());
|
||||
B256::from(bytes)
|
||||
})
|
||||
.collect();
|
||||
|
||||
group.bench_function("Address::encode (returns array, consumes self)", |b| {
|
||||
b.iter(|| {
|
||||
for addr in &addresses {
|
||||
let encoded = black_box((*addr).encode());
|
||||
black_box(encoded);
|
||||
}
|
||||
})
|
||||
});
|
||||
|
||||
group.bench_function("Address::encode_into (zero-copy, borrows self)", |b| {
|
||||
let mut buf = [0u8; 20];
|
||||
b.iter(|| {
|
||||
for addr in &addresses {
|
||||
addr.encode_into(&mut buf);
|
||||
black_box(&buf);
|
||||
}
|
||||
})
|
||||
});
|
||||
|
||||
group.bench_function("B256::encode (returns array, consumes self)", |b| {
|
||||
b.iter(|| {
|
||||
for hash in &hashes {
|
||||
let encoded = black_box((*hash).encode());
|
||||
black_box(encoded);
|
||||
}
|
||||
})
|
||||
});
|
||||
|
||||
group.bench_function("B256::encode_into (zero-copy, borrows self)", |b| {
|
||||
let mut buf = [0u8; 32];
|
||||
b.iter(|| {
|
||||
for hash in &hashes {
|
||||
hash.encode_into(&mut buf);
|
||||
black_box(&buf);
|
||||
}
|
||||
})
|
||||
});
|
||||
|
||||
group.finish();
|
||||
}
|
||||
|
||||
criterion_group!(benches, bench_encode_methods);
|
||||
criterion_main!(benches);
|
||||
@@ -2,10 +2,10 @@
|
||||
|
||||
use crate::{
|
||||
impl_fixed_arbitrary,
|
||||
table::{Decode, Encode},
|
||||
table::{Decode, Encode, EncodeInto},
|
||||
DatabaseError,
|
||||
};
|
||||
use alloy_primitives::{Address, BlockNumber, StorageKey};
|
||||
use alloy_primitives::{Address, BlockNumber, StorageKey, B256};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::ops::{Bound, Range, RangeBounds, RangeInclusive};
|
||||
|
||||
@@ -70,6 +70,19 @@ impl Decode for BlockNumberAddress {
|
||||
}
|
||||
}
|
||||
|
||||
impl EncodeInto for BlockNumberAddress {
|
||||
#[inline]
|
||||
fn encoded_len(&self) -> usize {
|
||||
28
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn encode_into(&self, buf: &mut [u8]) {
|
||||
buf[..8].copy_from_slice(&self.0 .0.to_be_bytes());
|
||||
buf[8..28].copy_from_slice(self.0 .1.as_slice());
|
||||
}
|
||||
}
|
||||
|
||||
/// A [`RangeBounds`] over a range of [`BlockNumberAddress`]s. Used to conveniently convert from a
|
||||
/// range of [`BlockNumber`]s.
|
||||
#[derive(Debug)]
|
||||
@@ -108,6 +121,55 @@ impl<R: RangeBounds<BlockNumber>> From<R> for BlockNumberAddressRange {
|
||||
}
|
||||
}
|
||||
|
||||
/// [`BlockNumber`] concatenated with [`B256`] (hashed address).
|
||||
///
|
||||
/// Since it's used as a key, it isn't compressed when encoding it.
|
||||
#[derive(
|
||||
Debug, Default, Copy, Clone, PartialEq, Eq, Serialize, Deserialize, Ord, PartialOrd, Hash,
|
||||
)]
|
||||
pub struct BlockNumberHashedAddress(pub (BlockNumber, B256));
|
||||
|
||||
impl From<(BlockNumber, B256)> for BlockNumberHashedAddress {
|
||||
fn from(tpl: (BlockNumber, B256)) -> Self {
|
||||
Self(tpl)
|
||||
}
|
||||
}
|
||||
|
||||
impl Encode for BlockNumberHashedAddress {
|
||||
type Encoded = [u8; 40];
|
||||
|
||||
fn encode(self) -> Self::Encoded {
|
||||
let block_number = self.0 .0;
|
||||
let hashed_address = self.0 .1;
|
||||
|
||||
let mut buf = [0u8; 40];
|
||||
|
||||
buf[..8].copy_from_slice(&block_number.to_be_bytes());
|
||||
buf[8..].copy_from_slice(hashed_address.as_slice());
|
||||
buf
|
||||
}
|
||||
}
|
||||
|
||||
impl Decode for BlockNumberHashedAddress {
|
||||
fn decode(value: &[u8]) -> Result<Self, DatabaseError> {
|
||||
let num = u64::from_be_bytes(value[..8].try_into().map_err(|_| DatabaseError::Decode)?);
|
||||
let hash = B256::from_slice(&value[8..]);
|
||||
Ok(Self((num, hash)))
|
||||
}
|
||||
}
|
||||
|
||||
impl EncodeInto for BlockNumberHashedAddress {
|
||||
#[inline]
|
||||
fn encoded_len(&self) -> usize {
|
||||
40
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn encode_into(&self, buf: &mut [u8]) {
|
||||
buf[..8].copy_from_slice(&self.0 .0.to_be_bytes());
|
||||
buf[8..40].copy_from_slice(self.0 .1.as_slice());
|
||||
}
|
||||
}
|
||||
/// [`Address`] concatenated with [`StorageKey`]. Used by `reth_etl` and history stages.
|
||||
///
|
||||
/// Since it's used as a key, it isn't compressed when encoding it.
|
||||
@@ -139,7 +201,24 @@ impl Decode for AddressStorageKey {
|
||||
}
|
||||
}
|
||||
|
||||
impl_fixed_arbitrary!((BlockNumberAddress, 28), (AddressStorageKey, 52));
|
||||
impl EncodeInto for AddressStorageKey {
|
||||
#[inline]
|
||||
fn encoded_len(&self) -> usize {
|
||||
52
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn encode_into(&self, buf: &mut [u8]) {
|
||||
buf[..20].copy_from_slice(self.0 .0.as_slice());
|
||||
buf[20..52].copy_from_slice(self.0 .1.as_slice());
|
||||
}
|
||||
}
|
||||
|
||||
impl_fixed_arbitrary!(
|
||||
(BlockNumberAddress, 28),
|
||||
(BlockNumberHashedAddress, 40),
|
||||
(AddressStorageKey, 52)
|
||||
);
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
//! Implements data structures specific to the database
|
||||
|
||||
use crate::{
|
||||
table::{Compress, Decode, Decompress, Encode},
|
||||
table::{Compress, Decode, Decompress, Encode, EncodeInto},
|
||||
DatabaseError,
|
||||
};
|
||||
use alloy_consensus::Header;
|
||||
@@ -32,7 +32,7 @@ pub use reth_db_models::{
|
||||
};
|
||||
pub use sharded_key::ShardedKey;
|
||||
|
||||
/// Macro that implements [`Encode`] and [`Decode`] for uint types.
|
||||
/// Macro that implements [`Encode`], [`Decode`], and [`EncodeInto`] for uint types.
|
||||
macro_rules! impl_uints {
|
||||
($($name:tt),+) => {
|
||||
$(
|
||||
@@ -53,6 +53,18 @@ macro_rules! impl_uints {
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl EncodeInto for $name {
|
||||
#[inline]
|
||||
fn encoded_len(&self) -> usize {
|
||||
std::mem::size_of::<$name>()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn encode_into(&self, buf: &mut [u8]) {
|
||||
buf[..std::mem::size_of::<$name>()].copy_from_slice(&self.to_be_bytes());
|
||||
}
|
||||
}
|
||||
)+
|
||||
};
|
||||
}
|
||||
@@ -91,6 +103,18 @@ impl Decode for Address {
|
||||
}
|
||||
}
|
||||
|
||||
impl EncodeInto for Address {
|
||||
#[inline]
|
||||
fn encoded_len(&self) -> usize {
|
||||
20
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn encode_into(&self, buf: &mut [u8]) {
|
||||
buf[..20].copy_from_slice(self.as_ref());
|
||||
}
|
||||
}
|
||||
|
||||
impl Encode for B256 {
|
||||
type Encoded = [u8; 32];
|
||||
|
||||
@@ -105,6 +129,18 @@ impl Decode for B256 {
|
||||
}
|
||||
}
|
||||
|
||||
impl EncodeInto for B256 {
|
||||
#[inline]
|
||||
fn encoded_len(&self) -> usize {
|
||||
32
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn encode_into(&self, buf: &mut [u8]) {
|
||||
buf[..32].copy_from_slice(self.as_ref());
|
||||
}
|
||||
}
|
||||
|
||||
impl Encode for String {
|
||||
type Encoded = Vec<u8>;
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
//! Sharded key
|
||||
use crate::{
|
||||
table::{Decode, Encode},
|
||||
table::{Decode, Encode, EncodeInto},
|
||||
DatabaseError,
|
||||
};
|
||||
use alloy_primitives::{Address, BlockNumber};
|
||||
@@ -77,6 +77,19 @@ impl Decode for ShardedKey<Address> {
|
||||
}
|
||||
}
|
||||
|
||||
impl EncodeInto for ShardedKey<Address> {
|
||||
#[inline]
|
||||
fn encoded_len(&self) -> usize {
|
||||
28
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn encode_into(&self, buf: &mut [u8]) {
|
||||
buf[..20].copy_from_slice(self.key.as_slice());
|
||||
buf[20..28].copy_from_slice(&self.highest_block_number.to_be_bytes());
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
//! Storage sharded key
|
||||
use crate::{
|
||||
table::{Decode, Encode},
|
||||
table::{Decode, Encode, EncodeInto},
|
||||
DatabaseError,
|
||||
};
|
||||
use alloy_primitives::{Address, BlockNumber, B256};
|
||||
@@ -91,6 +91,20 @@ impl Decode for StorageShardedKey {
|
||||
}
|
||||
}
|
||||
|
||||
impl EncodeInto for StorageShardedKey {
|
||||
#[inline]
|
||||
fn encoded_len(&self) -> usize {
|
||||
60
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn encode_into(&self, buf: &mut [u8]) {
|
||||
buf[..20].copy_from_slice(self.address.as_slice());
|
||||
buf[20..52].copy_from_slice(self.sharded_key.key.as_slice());
|
||||
buf[52..60].copy_from_slice(&self.sharded_key.highest_block_number.to_be_bytes());
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -96,6 +96,53 @@ pub trait Decode: Send + Sync + Sized + Debug {
|
||||
}
|
||||
}
|
||||
|
||||
/// Trait for zero-copy encoding directly into a provided buffer.
|
||||
///
|
||||
/// This enables MDBX_RESERVE-style writes where we write directly into
|
||||
/// MDBX's memory-mapped pages, avoiding intermediate allocations.
|
||||
///
|
||||
/// For types where the in-memory representation equals the encoded bytes
|
||||
/// (e.g., `Address`, `B256`), implement this trait directly to avoid
|
||||
/// any intermediate copies. For other fixed-size types, use the blanket
|
||||
/// impl via [`EncodeIntoViaEncode`].
|
||||
pub trait EncodeInto: Send + Sync + Sized + Debug {
|
||||
/// Returns the exact number of bytes needed to encode this value.
|
||||
fn encoded_len(&self) -> usize;
|
||||
|
||||
/// Encodes `self` into the provided buffer.
|
||||
///
|
||||
/// # Panics
|
||||
/// May panic if `buf.len() < self.encoded_len()`.
|
||||
fn encode_into(&self, buf: &mut [u8]);
|
||||
}
|
||||
|
||||
mod sealed {
|
||||
pub trait Sealed {}
|
||||
}
|
||||
|
||||
/// Marker trait for types that should use the `Encode` + copy fallback for `EncodeInto`.
|
||||
///
|
||||
/// Implement this for types where cloning is acceptable or unavoidable.
|
||||
/// For hot-path types with identity encoding (where in-memory bytes equal encoded bytes),
|
||||
/// implement `EncodeInto` directly instead.
|
||||
pub trait EncodeIntoViaEncode: sealed::Sealed + Send + Sync + Sized + Debug {}
|
||||
|
||||
/// Blanket impl for types that opt into the `Encode` + copy fallback.
|
||||
impl<T, const N: usize> EncodeInto for T
|
||||
where
|
||||
T: Encode<Encoded = [u8; N]> + EncodeIntoViaEncode + Copy,
|
||||
{
|
||||
#[inline]
|
||||
fn encoded_len(&self) -> usize {
|
||||
N
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn encode_into(&self, buf: &mut [u8]) {
|
||||
buf[..N].copy_from_slice((*self).encode().as_ref());
|
||||
}
|
||||
}
|
||||
|
||||
/// Generic trait that enforces the database key to implement [`Encode`] and [`Decode`].
|
||||
pub trait Key: Encode + Decode + Ord + Clone + Serialize + for<'a> Deserialize<'a> {}
|
||||
|
||||
|
||||
@@ -24,7 +24,7 @@ use crate::{
|
||||
AccountBeforeTx, ClientVersion, CompactU256, IntegerList, ShardedKey,
|
||||
StoredBlockBodyIndices, StoredBlockWithdrawals,
|
||||
},
|
||||
table::{Decode, DupSort, Encode, Table, TableInfo},
|
||||
table::{Decode, DupSort, Encode, EncodeInto, Table, TableInfo},
|
||||
};
|
||||
use alloy_consensus::Header;
|
||||
use alloy_primitives::{Address, BlockHash, BlockNumber, TxHash, TxNumber, B256};
|
||||
@@ -566,6 +566,21 @@ impl Decode for ChainStateKey {
|
||||
}
|
||||
}
|
||||
|
||||
impl EncodeInto for ChainStateKey {
|
||||
#[inline]
|
||||
fn encoded_len(&self) -> usize {
|
||||
1
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn encode_into(&self, buf: &mut [u8]) {
|
||||
buf[0] = match self {
|
||||
Self::LastFinalizedBlock => 0,
|
||||
Self::LastSafeBlock => 1,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// Alias types.
|
||||
|
||||
/// List with transaction numbers.
|
||||
|
||||
@@ -74,19 +74,9 @@ where
|
||||
res.map_err(|e| DatabaseError::Read(e.into()))?.map(decoder::<T>).transpose()
|
||||
}
|
||||
|
||||
/// Some types don't support compression (eg. B256), and we don't want to be copying them to the
|
||||
/// allocated buffer when we can just use their reference.
|
||||
macro_rules! compress_to_buf_or_ref {
|
||||
($self:expr, $value:expr) => {
|
||||
if let Some(value) = $value.uncompressable_ref() {
|
||||
Some(value)
|
||||
} else {
|
||||
$self.buf.clear();
|
||||
$value.compress_to_buf(&mut $self.buf);
|
||||
None
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
impl<K: TransactionKind, T: Table> DbCursorRO<T> for Cursor<K, T> {
|
||||
fn first(&mut self) -> PairResult<T> {
|
||||
@@ -256,13 +246,46 @@ impl<T: Table> DbCursorRW<T> for Cursor<RW, T> {
|
||||
/// found, before calling `upsert`.
|
||||
fn upsert(&mut self, key: T::Key, value: &T::Value) -> Result<(), DatabaseError> {
|
||||
let key = key.encode();
|
||||
let value = compress_to_buf_or_ref!(self, value);
|
||||
|
||||
// Check if we can use zero-copy reserve path
|
||||
if let Some(value_ref) = value.uncompressable_ref() {
|
||||
let value_len = value_ref.len();
|
||||
return self.execute_with_operation_metric(
|
||||
Operation::CursorUpsert,
|
||||
Some(value_len),
|
||||
|this| {
|
||||
// SAFETY: We fill the reserved buffer immediately and don't use it after
|
||||
unsafe {
|
||||
this.inner
|
||||
.reserve(key.as_ref(), value_len, WriteFlags::UPSERT)
|
||||
.map(|reserved| {
|
||||
reserved.copy_from_slice(value_ref);
|
||||
})
|
||||
.map_err(|e| {
|
||||
DatabaseWriteError {
|
||||
info: e.into(),
|
||||
operation: DatabaseWriteOperation::CursorUpsert,
|
||||
table_name: T::NAME,
|
||||
key: key.into_vec(),
|
||||
}
|
||||
.into()
|
||||
})
|
||||
}
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
// Fallback: compress to buffer then put
|
||||
self.buf.clear();
|
||||
value.compress_to_buf(&mut self.buf);
|
||||
let value_len = self.buf.len();
|
||||
|
||||
self.execute_with_operation_metric(
|
||||
Operation::CursorUpsert,
|
||||
Some(value.unwrap_or(&self.buf).len()),
|
||||
Some(value_len),
|
||||
|this| {
|
||||
this.inner
|
||||
.put(key.as_ref(), value.unwrap_or(&this.buf), WriteFlags::UPSERT)
|
||||
.put(key.as_ref(), &this.buf, WriteFlags::UPSERT)
|
||||
.map_err(|e| {
|
||||
DatabaseWriteError {
|
||||
info: e.into(),
|
||||
@@ -278,13 +301,43 @@ impl<T: Table> DbCursorRW<T> for Cursor<RW, T> {
|
||||
|
||||
fn insert(&mut self, key: T::Key, value: &T::Value) -> Result<(), DatabaseError> {
|
||||
let key = key.encode();
|
||||
let value = compress_to_buf_or_ref!(self, value);
|
||||
|
||||
if let Some(value_ref) = value.uncompressable_ref() {
|
||||
let value_len = value_ref.len();
|
||||
return self.execute_with_operation_metric(
|
||||
Operation::CursorInsert,
|
||||
Some(value_len),
|
||||
|this| {
|
||||
unsafe {
|
||||
this.inner
|
||||
.reserve(key.as_ref(), value_len, WriteFlags::NO_OVERWRITE)
|
||||
.map(|reserved| {
|
||||
reserved.copy_from_slice(value_ref);
|
||||
})
|
||||
.map_err(|e| {
|
||||
DatabaseWriteError {
|
||||
info: e.into(),
|
||||
operation: DatabaseWriteOperation::CursorInsert,
|
||||
table_name: T::NAME,
|
||||
key: key.into_vec(),
|
||||
}
|
||||
.into()
|
||||
})
|
||||
}
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
self.buf.clear();
|
||||
value.compress_to_buf(&mut self.buf);
|
||||
let value_len = self.buf.len();
|
||||
|
||||
self.execute_with_operation_metric(
|
||||
Operation::CursorInsert,
|
||||
Some(value.unwrap_or(&self.buf).len()),
|
||||
Some(value_len),
|
||||
|this| {
|
||||
this.inner
|
||||
.put(key.as_ref(), value.unwrap_or(&this.buf), WriteFlags::NO_OVERWRITE)
|
||||
.put(key.as_ref(), &this.buf, WriteFlags::NO_OVERWRITE)
|
||||
.map_err(|e| {
|
||||
DatabaseWriteError {
|
||||
info: e.into(),
|
||||
@@ -302,13 +355,43 @@ impl<T: Table> DbCursorRW<T> for Cursor<RW, T> {
|
||||
/// will fail if the inserted key is less than the last table key
|
||||
fn append(&mut self, key: T::Key, value: &T::Value) -> Result<(), DatabaseError> {
|
||||
let key = key.encode();
|
||||
let value = compress_to_buf_or_ref!(self, value);
|
||||
|
||||
if let Some(value_ref) = value.uncompressable_ref() {
|
||||
let value_len = value_ref.len();
|
||||
return self.execute_with_operation_metric(
|
||||
Operation::CursorAppend,
|
||||
Some(value_len),
|
||||
|this| {
|
||||
unsafe {
|
||||
this.inner
|
||||
.reserve(key.as_ref(), value_len, WriteFlags::APPEND)
|
||||
.map(|reserved| {
|
||||
reserved.copy_from_slice(value_ref);
|
||||
})
|
||||
.map_err(|e| {
|
||||
DatabaseWriteError {
|
||||
info: e.into(),
|
||||
operation: DatabaseWriteOperation::CursorAppend,
|
||||
table_name: T::NAME,
|
||||
key: key.into_vec(),
|
||||
}
|
||||
.into()
|
||||
})
|
||||
}
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
self.buf.clear();
|
||||
value.compress_to_buf(&mut self.buf);
|
||||
let value_len = self.buf.len();
|
||||
|
||||
self.execute_with_operation_metric(
|
||||
Operation::CursorAppend,
|
||||
Some(value.unwrap_or(&self.buf).len()),
|
||||
Some(value_len),
|
||||
|this| {
|
||||
this.inner
|
||||
.put(key.as_ref(), value.unwrap_or(&this.buf), WriteFlags::APPEND)
|
||||
.put(key.as_ref(), &this.buf, WriteFlags::APPEND)
|
||||
.map_err(|e| {
|
||||
DatabaseWriteError {
|
||||
info: e.into(),
|
||||
@@ -338,13 +421,43 @@ impl<T: DupSort> DbDupCursorRW<T> for Cursor<RW, T> {
|
||||
|
||||
fn append_dup(&mut self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> {
|
||||
let key = key.encode();
|
||||
let value = compress_to_buf_or_ref!(self, value);
|
||||
|
||||
if let Some(value_ref) = value.uncompressable_ref() {
|
||||
let value_len = value_ref.len();
|
||||
return self.execute_with_operation_metric(
|
||||
Operation::CursorAppendDup,
|
||||
Some(value_len),
|
||||
|this| {
|
||||
unsafe {
|
||||
this.inner
|
||||
.reserve(key.as_ref(), value_len, WriteFlags::APPEND_DUP)
|
||||
.map(|reserved| {
|
||||
reserved.copy_from_slice(value_ref);
|
||||
})
|
||||
.map_err(|e| {
|
||||
DatabaseWriteError {
|
||||
info: e.into(),
|
||||
operation: DatabaseWriteOperation::CursorAppendDup,
|
||||
table_name: T::NAME,
|
||||
key: key.into_vec(),
|
||||
}
|
||||
.into()
|
||||
})
|
||||
}
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
self.buf.clear();
|
||||
value.compress_to_buf(&mut self.buf);
|
||||
let value_len = self.buf.len();
|
||||
|
||||
self.execute_with_operation_metric(
|
||||
Operation::CursorAppendDup,
|
||||
Some(value.unwrap_or(&self.buf).len()),
|
||||
Some(value_len),
|
||||
|this| {
|
||||
this.inner
|
||||
.put(key.as_ref(), value.unwrap_or(&this.buf), WriteFlags::APPEND_DUP)
|
||||
.put(key.as_ref(), &this.buf, WriteFlags::APPEND_DUP)
|
||||
.map_err(|e| {
|
||||
DatabaseWriteError {
|
||||
info: e.into(),
|
||||
@@ -368,7 +481,7 @@ mod tests {
|
||||
};
|
||||
use alloy_primitives::{address, Address, B256, U256};
|
||||
use reth_db_api::{
|
||||
cursor::{DbCursorRO, DbDupCursorRW},
|
||||
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
|
||||
models::{BlockNumberAddress, ClientVersion},
|
||||
table::TableImporter,
|
||||
transaction::{DbTx, DbTxMut},
|
||||
@@ -464,4 +577,546 @@ mod tests {
|
||||
assert_eq!(copied_value, expected_value);
|
||||
}
|
||||
}
|
||||
|
||||
/// Tests that the zero-copy reserve path works correctly for B256 values.
|
||||
/// B256 implements `uncompressable_ref()` returning Some, so it uses the reserve path.
|
||||
#[test]
|
||||
fn test_reserve_path_b256_roundtrip() {
|
||||
use crate::tables::CanonicalHeaders;
|
||||
|
||||
let db = create_test_db();
|
||||
let tx = db.tx_mut().unwrap();
|
||||
|
||||
// B256 values use the reserve path (uncompressable_ref returns Some)
|
||||
let test_data: Vec<(u64, B256)> = vec![
|
||||
(0, B256::ZERO),
|
||||
(1, B256::with_last_byte(1)),
|
||||
(100, B256::repeat_byte(0xab)),
|
||||
(u64::MAX, B256::repeat_byte(0xff)),
|
||||
];
|
||||
|
||||
// Write using upsert (uses reserve path for B256)
|
||||
{
|
||||
let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
|
||||
for (key, value) in &test_data {
|
||||
cursor.upsert(*key, value).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
// Read back and verify
|
||||
{
|
||||
let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
|
||||
for (expected_key, expected_value) in &test_data {
|
||||
let (key, value) = cursor.seek_exact(*expected_key).unwrap().unwrap();
|
||||
assert_eq!(key, *expected_key);
|
||||
assert_eq!(value, *expected_value);
|
||||
}
|
||||
}
|
||||
|
||||
tx.commit().unwrap();
|
||||
|
||||
// Verify again after commit
|
||||
let tx = db.tx().unwrap();
|
||||
let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
|
||||
let all_entries: Vec<_> = cursor.walk(None).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
|
||||
assert_eq!(all_entries.len(), test_data.len());
|
||||
for ((key, value), (expected_key, expected_value)) in all_entries.iter().zip(test_data.iter()) {
|
||||
assert_eq!(key, expected_key);
|
||||
assert_eq!(value, expected_value);
|
||||
}
|
||||
}
|
||||
|
||||
/// Tests insert operation with reserve path
|
||||
#[test]
|
||||
fn test_reserve_path_insert_roundtrip() {
|
||||
use crate::tables::CanonicalHeaders;
|
||||
|
||||
let db = create_test_db();
|
||||
let tx = db.tx_mut().unwrap();
|
||||
|
||||
let hash1 = B256::repeat_byte(0x11);
|
||||
let hash2 = B256::repeat_byte(0x22);
|
||||
|
||||
{
|
||||
let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
|
||||
cursor.insert(1, &hash1).unwrap();
|
||||
cursor.insert(2, &hash2).unwrap();
|
||||
|
||||
// Insert with existing key should fail
|
||||
assert!(cursor.insert(1, &hash2).is_err());
|
||||
}
|
||||
|
||||
// Verify
|
||||
{
|
||||
let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
|
||||
let (k1, v1) = cursor.seek_exact(1).unwrap().unwrap();
|
||||
assert_eq!(k1, 1);
|
||||
assert_eq!(v1, hash1);
|
||||
|
||||
let (k2, v2) = cursor.seek_exact(2).unwrap().unwrap();
|
||||
assert_eq!(k2, 2);
|
||||
assert_eq!(v2, hash2);
|
||||
}
|
||||
}
|
||||
|
||||
/// Tests append operation with reserve path
|
||||
#[test]
|
||||
fn test_reserve_path_append_roundtrip() {
|
||||
use crate::tables::CanonicalHeaders;
|
||||
|
||||
let db = create_test_db();
|
||||
let tx = db.tx_mut().unwrap();
|
||||
|
||||
let hashes: Vec<B256> = (0..10u8).map(|i| B256::repeat_byte(i)).collect();
|
||||
|
||||
{
|
||||
let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
|
||||
for (i, hash) in hashes.iter().enumerate() {
|
||||
cursor.append(i as u64, hash).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
// Verify all entries
|
||||
{
|
||||
let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
|
||||
let entries: Vec<_> = cursor.walk(None).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
|
||||
assert_eq!(entries.len(), hashes.len());
|
||||
for (i, (key, value)) in entries.iter().enumerate() {
|
||||
assert_eq!(*key, i as u64);
|
||||
assert_eq!(*value, hashes[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Tests that DupSort append_dup works with reserve path
|
||||
#[test]
|
||||
fn test_reserve_path_append_dup_roundtrip() {
|
||||
let db = create_test_db();
|
||||
let tx = db.tx_mut().unwrap();
|
||||
|
||||
let addr = address!("0000000000000000000000000000000000000001");
|
||||
let entries = vec![
|
||||
StorageEntry { key: B256::with_last_byte(1), value: U256::from(100) },
|
||||
StorageEntry { key: B256::with_last_byte(2), value: U256::from(200) },
|
||||
StorageEntry { key: B256::with_last_byte(3), value: U256::from(300) },
|
||||
];
|
||||
|
||||
{
|
||||
let mut cursor = tx.cursor_dup_write::<StorageChangeSets>().unwrap();
|
||||
for entry in &entries {
|
||||
cursor.append_dup(BlockNumberAddress((1, addr)), *entry).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
// Verify
|
||||
{
|
||||
let mut cursor = tx.cursor_dup_read::<StorageChangeSets>().unwrap();
|
||||
let results: Vec<_> = cursor
|
||||
.walk_dup(Some(BlockNumberAddress((1, addr))), None)
|
||||
.unwrap()
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(results.len(), entries.len());
|
||||
for ((_, value), expected) in results.iter().zip(entries.iter()) {
|
||||
assert_eq!(value, expected);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Tests mixed operations to ensure reserve and non-reserve paths work together
|
||||
#[test]
|
||||
fn test_reserve_path_mixed_operations() {
|
||||
use crate::tables::CanonicalHeaders;
|
||||
|
||||
let db = create_test_db();
|
||||
|
||||
// First transaction: upsert
|
||||
{
|
||||
let tx = db.tx_mut().unwrap();
|
||||
let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
|
||||
cursor.upsert(1, &B256::repeat_byte(0x11)).unwrap();
|
||||
cursor.upsert(2, &B256::repeat_byte(0x22)).unwrap();
|
||||
tx.commit().unwrap();
|
||||
}
|
||||
|
||||
// Second transaction: upsert to update existing
|
||||
{
|
||||
let tx = db.tx_mut().unwrap();
|
||||
let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
|
||||
cursor.upsert(1, &B256::repeat_byte(0xaa)).unwrap(); // Update
|
||||
cursor.upsert(3, &B256::repeat_byte(0x33)).unwrap(); // New
|
||||
tx.commit().unwrap();
|
||||
}
|
||||
|
||||
// Verify final state
|
||||
{
|
||||
let tx = db.tx().unwrap();
|
||||
let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
|
||||
|
||||
let (_, v1) = cursor.seek_exact(1).unwrap().unwrap();
|
||||
assert_eq!(v1, B256::repeat_byte(0xaa)); // Updated
|
||||
|
||||
let (_, v2) = cursor.seek_exact(2).unwrap().unwrap();
|
||||
assert_eq!(v2, B256::repeat_byte(0x22)); // Unchanged
|
||||
|
||||
let (_, v3) = cursor.seek_exact(3).unwrap().unwrap();
|
||||
assert_eq!(v3, B256::repeat_byte(0x33)); // New
|
||||
}
|
||||
}
|
||||
|
||||
// ==================== ADVERSARIAL / EDGE CASE TESTS ====================
|
||||
|
||||
/// Test that transaction abort properly discards reserve-path writes
|
||||
#[test]
|
||||
fn test_reserve_path_transaction_abort() {
|
||||
use crate::tables::CanonicalHeaders;
|
||||
|
||||
let db = create_test_db();
|
||||
|
||||
// Write some initial data
|
||||
{
|
||||
let tx = db.tx_mut().unwrap();
|
||||
let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
|
||||
cursor.upsert(1, &B256::repeat_byte(0x11)).unwrap();
|
||||
tx.commit().unwrap();
|
||||
}
|
||||
|
||||
// Start a transaction, write via reserve path, then DROP (abort)
|
||||
{
|
||||
let tx = db.tx_mut().unwrap();
|
||||
let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
|
||||
cursor.upsert(1, &B256::repeat_byte(0xff)).unwrap(); // Overwrite
|
||||
cursor.upsert(2, &B256::repeat_byte(0x22)).unwrap(); // New
|
||||
// Drop tx without commit - should abort
|
||||
drop(cursor);
|
||||
drop(tx);
|
||||
}
|
||||
|
||||
// Verify original data is unchanged
|
||||
{
|
||||
let tx = db.tx().unwrap();
|
||||
let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
|
||||
|
||||
let (_, v1) = cursor.seek_exact(1).unwrap().unwrap();
|
||||
assert_eq!(v1, B256::repeat_byte(0x11)); // Original, not 0xff
|
||||
|
||||
assert!(cursor.seek_exact(2).unwrap().is_none()); // Should not exist
|
||||
}
|
||||
}
|
||||
|
||||
/// Test rapid successive writes to same key via reserve path
|
||||
#[test]
|
||||
fn test_reserve_path_rapid_overwrites() {
|
||||
use crate::tables::CanonicalHeaders;
|
||||
|
||||
let db = create_test_db();
|
||||
let tx = db.tx_mut().unwrap();
|
||||
|
||||
{
|
||||
let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
|
||||
|
||||
// Rapidly overwrite the same key many times
|
||||
for i in 0..100u8 {
|
||||
cursor.upsert(1, &B256::repeat_byte(i)).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
// Verify final value
|
||||
{
|
||||
let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
|
||||
let (_, value) = cursor.seek_exact(1).unwrap().unwrap();
|
||||
assert_eq!(value, B256::repeat_byte(99)); // Last write wins
|
||||
}
|
||||
}
|
||||
|
||||
/// Test interleaved reserve-path and fallback-path writes
|
||||
#[test]
|
||||
fn test_reserve_path_interleaved_with_compressable() {
|
||||
use crate::tables::PlainAccountState;
|
||||
use reth_primitives_traits::Account;
|
||||
|
||||
let db = create_test_db();
|
||||
let tx = db.tx_mut().unwrap();
|
||||
|
||||
// Account uses Compact compression (fallback path)
|
||||
// This tests that both paths work correctly when interleaved
|
||||
let addr1 = Address::repeat_byte(0x11);
|
||||
let addr2 = Address::repeat_byte(0x22);
|
||||
let account1 = Account { nonce: 1, balance: U256::from(100), bytecode_hash: None };
|
||||
let account2 = Account { nonce: 2, balance: U256::from(200), bytecode_hash: Some(B256::repeat_byte(0xab)) };
|
||||
|
||||
{
|
||||
let mut cursor = tx.cursor_write::<PlainAccountState>().unwrap();
|
||||
cursor.upsert(addr1, &account1).unwrap();
|
||||
cursor.upsert(addr2, &account2).unwrap();
|
||||
}
|
||||
|
||||
// Verify
|
||||
{
|
||||
let mut cursor = tx.cursor_read::<PlainAccountState>().unwrap();
|
||||
let (_, a1) = cursor.seek_exact(addr1).unwrap().unwrap();
|
||||
assert_eq!(a1.nonce, 1);
|
||||
assert_eq!(a1.balance, U256::from(100));
|
||||
|
||||
let (_, a2) = cursor.seek_exact(addr2).unwrap().unwrap();
|
||||
assert_eq!(a2.nonce, 2);
|
||||
assert_eq!(a2.bytecode_hash, Some(B256::repeat_byte(0xab)));
|
||||
}
|
||||
}
|
||||
|
||||
/// Test cursor reuse after multiple reserve-path operations
|
||||
#[test]
|
||||
fn test_reserve_path_cursor_reuse() {
|
||||
use crate::tables::CanonicalHeaders;
|
||||
|
||||
let db = create_test_db();
|
||||
let tx = db.tx_mut().unwrap();
|
||||
|
||||
let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
|
||||
|
||||
// Write, read, write, read pattern
|
||||
cursor.upsert(1, &B256::repeat_byte(0x11)).unwrap();
|
||||
|
||||
// Read back using same cursor
|
||||
let (k, v) = cursor.seek_exact(1).unwrap().unwrap();
|
||||
assert_eq!(k, 1);
|
||||
assert_eq!(v, B256::repeat_byte(0x11));
|
||||
|
||||
// Write again
|
||||
cursor.upsert(2, &B256::repeat_byte(0x22)).unwrap();
|
||||
|
||||
// Overwrite first
|
||||
cursor.upsert(1, &B256::repeat_byte(0xaa)).unwrap();
|
||||
|
||||
// Read all
|
||||
let entries: Vec<_> = cursor.walk(None).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
|
||||
assert_eq!(entries.len(), 2);
|
||||
assert_eq!(entries[0], (1, B256::repeat_byte(0xaa)));
|
||||
assert_eq!(entries[1], (2, B256::repeat_byte(0x22)));
|
||||
}
|
||||
|
||||
/// Test boundary values for B256
|
||||
#[test]
|
||||
fn test_reserve_path_boundary_values() {
|
||||
use crate::tables::CanonicalHeaders;
|
||||
|
||||
let db = create_test_db();
|
||||
let tx = db.tx_mut().unwrap();
|
||||
|
||||
let boundary_values = vec![
|
||||
B256::ZERO,
|
||||
B256::repeat_byte(0xff), // All 1s
|
||||
B256::with_last_byte(1),
|
||||
B256::with_last_byte(0xff),
|
||||
// Pattern that might cause issues if bytes are misaligned
|
||||
B256::from_slice(&[
|
||||
0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07,
|
||||
0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f,
|
||||
0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17,
|
||||
0x18, 0x19, 0x1a, 0x1b, 0x1c, 0x1d, 0x1e, 0x1f,
|
||||
]),
|
||||
];
|
||||
|
||||
{
|
||||
let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
|
||||
for (i, value) in boundary_values.iter().enumerate() {
|
||||
cursor.upsert(i as u64, value).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
// Verify each boundary value
|
||||
{
|
||||
let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
|
||||
for (i, expected) in boundary_values.iter().enumerate() {
|
||||
let (_, value) = cursor.seek_exact(i as u64).unwrap().unwrap();
|
||||
assert_eq!(&value, expected, "Mismatch at index {i}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Test that delete after reserve-path write works correctly
|
||||
#[test]
|
||||
fn test_reserve_path_write_then_delete() {
|
||||
use crate::tables::CanonicalHeaders;
|
||||
|
||||
let db = create_test_db();
|
||||
|
||||
// Write via reserve path
|
||||
{
|
||||
let tx = db.tx_mut().unwrap();
|
||||
let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
|
||||
cursor.upsert(1, &B256::repeat_byte(0x11)).unwrap();
|
||||
cursor.upsert(2, &B256::repeat_byte(0x22)).unwrap();
|
||||
cursor.upsert(3, &B256::repeat_byte(0x33)).unwrap();
|
||||
tx.commit().unwrap();
|
||||
}
|
||||
|
||||
// Delete middle entry
|
||||
{
|
||||
let tx = db.tx_mut().unwrap();
|
||||
let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
|
||||
cursor.seek_exact(2).unwrap();
|
||||
cursor.delete_current().unwrap();
|
||||
tx.commit().unwrap();
|
||||
}
|
||||
|
||||
// Verify
|
||||
{
|
||||
let tx = db.tx().unwrap();
|
||||
let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
|
||||
|
||||
assert!(cursor.seek_exact(1).unwrap().is_some());
|
||||
assert!(cursor.seek_exact(2).unwrap().is_none()); // Deleted
|
||||
assert!(cursor.seek_exact(3).unwrap().is_some());
|
||||
}
|
||||
}
|
||||
|
||||
/// Test large number of entries to stress the reserve path
|
||||
#[test]
|
||||
fn test_reserve_path_stress_many_entries() {
|
||||
use crate::tables::CanonicalHeaders;
|
||||
|
||||
let db = create_test_db();
|
||||
let num_entries = 1000u64;
|
||||
|
||||
// Write many entries
|
||||
{
|
||||
let tx = db.tx_mut().unwrap();
|
||||
let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
|
||||
|
||||
for i in 0..num_entries {
|
||||
let hash = B256::from(U256::from(i));
|
||||
cursor.append(i, &hash).unwrap();
|
||||
}
|
||||
tx.commit().unwrap();
|
||||
}
|
||||
|
||||
// Verify all entries
|
||||
{
|
||||
let tx = db.tx().unwrap();
|
||||
let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
|
||||
|
||||
let entries: Vec<_> = cursor.walk(None).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
|
||||
assert_eq!(entries.len(), num_entries as usize);
|
||||
|
||||
for (i, (key, value)) in entries.iter().enumerate() {
|
||||
assert_eq!(*key, i as u64);
|
||||
assert_eq!(*value, B256::from(U256::from(i as u64)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Test concurrent-like access pattern (alternating read/write transactions)
|
||||
#[test]
|
||||
fn test_reserve_path_alternating_transactions() {
|
||||
use crate::tables::CanonicalHeaders;
|
||||
|
||||
let db = create_test_db();
|
||||
|
||||
for round in 0..10u8 {
|
||||
// Write transaction
|
||||
{
|
||||
let tx = db.tx_mut().unwrap();
|
||||
let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
|
||||
cursor.upsert(round as u64, &B256::repeat_byte(round)).unwrap();
|
||||
tx.commit().unwrap();
|
||||
}
|
||||
|
||||
// Read transaction - verify all entries up to this round
|
||||
{
|
||||
let tx = db.tx().unwrap();
|
||||
let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
|
||||
|
||||
for i in 0..=round {
|
||||
let (_, value) = cursor.seek_exact(i as u64).unwrap().unwrap();
|
||||
assert_eq!(value, B256::repeat_byte(i));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Test DupSort with many duplicates via reserve path
|
||||
#[test]
|
||||
fn test_reserve_path_dupsort_many_duplicates() {
|
||||
let db = create_test_db();
|
||||
let tx = db.tx_mut().unwrap();
|
||||
|
||||
let addr = address!("0000000000000000000000000000000000000001");
|
||||
let num_dups = 100;
|
||||
|
||||
// Write many duplicates for same key
|
||||
{
|
||||
let mut cursor = tx.cursor_dup_write::<StorageChangeSets>().unwrap();
|
||||
for i in 0..num_dups {
|
||||
let entry = StorageEntry {
|
||||
key: B256::from(U256::from(i)),
|
||||
value: U256::from(i * 100),
|
||||
};
|
||||
cursor.append_dup(BlockNumberAddress((1, addr)), entry).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
// Verify all duplicates
|
||||
{
|
||||
let mut cursor = tx.cursor_dup_read::<StorageChangeSets>().unwrap();
|
||||
let results: Vec<_> = cursor
|
||||
.walk_dup(Some(BlockNumberAddress((1, addr))), None)
|
||||
.unwrap()
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(results.len(), num_dups);
|
||||
for (i, (_, entry)) in results.iter().enumerate() {
|
||||
assert_eq!(entry.key, B256::from(U256::from(i as u64)));
|
||||
assert_eq!(entry.value, U256::from(i as u64 * 100));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Test that insert fails correctly with reserve path when key exists
|
||||
#[test]
|
||||
fn test_reserve_path_insert_duplicate_key_fails() {
|
||||
use crate::tables::CanonicalHeaders;
|
||||
|
||||
let db = create_test_db();
|
||||
let tx = db.tx_mut().unwrap();
|
||||
|
||||
{
|
||||
let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
|
||||
|
||||
// First insert succeeds
|
||||
cursor.insert(1, &B256::repeat_byte(0x11)).unwrap();
|
||||
|
||||
// Second insert with same key should fail
|
||||
let result = cursor.insert(1, &B256::repeat_byte(0x22));
|
||||
assert!(result.is_err());
|
||||
|
||||
// Original value should be unchanged
|
||||
let (_, value) = cursor.seek_exact(1).unwrap().unwrap();
|
||||
assert_eq!(value, B256::repeat_byte(0x11));
|
||||
}
|
||||
}
|
||||
|
||||
/// Test append order enforcement with reserve path
|
||||
#[test]
|
||||
fn test_reserve_path_append_order_enforced() {
|
||||
use crate::tables::CanonicalHeaders;
|
||||
|
||||
let db = create_test_db();
|
||||
let tx = db.tx_mut().unwrap();
|
||||
|
||||
{
|
||||
let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
|
||||
|
||||
// Append in order works
|
||||
cursor.append(1, &B256::repeat_byte(0x11)).unwrap();
|
||||
cursor.append(2, &B256::repeat_byte(0x22)).unwrap();
|
||||
cursor.append(3, &B256::repeat_byte(0x33)).unwrap();
|
||||
|
||||
// Append out of order should fail (key 2 < last key 3)
|
||||
let result = cursor.append(2, &B256::repeat_byte(0x44));
|
||||
assert!(result.is_err());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -447,6 +447,44 @@ impl Cursor<RW> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Reserves space for a key/data pair in the database and returns a mutable slice
|
||||
/// to the reserved space for the value. The caller must fill the entire buffer before
|
||||
/// the next database operation.
|
||||
///
|
||||
/// This enables zero-copy writes by allowing the caller to write directly into
|
||||
/// MDBX's memory-mapped pages.
|
||||
///
|
||||
/// # Safety
|
||||
///
|
||||
/// The caller must ensure that:
|
||||
/// - The returned buffer is completely filled before any other cursor/transaction operation
|
||||
/// - The buffer is not used after the cursor is moved or the transaction ends
|
||||
#[allow(clippy::mut_from_ref)]
|
||||
pub unsafe fn reserve(&mut self, key: &[u8], len: usize, flags: WriteFlags) -> Result<&mut [u8]> {
|
||||
let key_val: ffi::MDBX_val =
|
||||
ffi::MDBX_val { iov_len: key.len(), iov_base: key.as_ptr() as *mut c_void };
|
||||
let mut data_val: ffi::MDBX_val =
|
||||
ffi::MDBX_val { iov_len: len, iov_base: ptr::null_mut::<c_void>() };
|
||||
// SAFETY: We're calling mdbx_cursor_put with MDBX_RESERVE which returns a pointer
|
||||
// to reserved space. The FFI calls are unsafe by nature.
|
||||
unsafe {
|
||||
mdbx_result(
|
||||
self.txn.txn_execute(|_| {
|
||||
ffi::mdbx_cursor_put(
|
||||
self.cursor,
|
||||
&key_val,
|
||||
&mut data_val,
|
||||
flags.bits() | ffi::MDBX_RESERVE,
|
||||
)
|
||||
})?,
|
||||
)?;
|
||||
Ok(std::slice::from_raw_parts_mut(
|
||||
data_val.iov_base as *mut u8,
|
||||
data_val.iov_len,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
/// Deletes the current key/data pair.
|
||||
///
|
||||
/// ### Flags
|
||||
|
||||
Reference in New Issue
Block a user