Compare commits

..

1 Commits

Author SHA1 Message Date
Dan Cline
173bcf455a wip: vibed trie cache 2025-12-18 20:50:54 -05:00
11 changed files with 82 additions and 208 deletions

9
Cargo.lock generated
View File

@@ -3986,6 +3986,14 @@ dependencies = [
"windows-sys 0.60.2",
]
[[package]]
name = "fixed-cache"
version = "0.1.2"
source = "git+https://github.com/danipopes/fixed-cache#2324460d2b7c43f7665c37f21e6b3aab6298a1f0"
dependencies = [
"equivalent",
]
[[package]]
name = "fixed-hash"
version = "0.8.0"
@@ -11029,6 +11037,7 @@ dependencies = [
"assert_matches",
"auto_impl",
"codspeed-criterion-compat",
"fixed-cache",
"itertools 0.14.0",
"metrics",
"pretty_assertions",

View File

@@ -482,6 +482,7 @@ revm-interpreter = { version = "31.1.0", default-features = false }
revm-database-interface = { version = "8.0.5", default-features = false }
op-revm = { version = "14.1.0", default-features = false }
revm-inspectors = "0.33.2"
fixed-cache = { git = "https://github.com/danipopes/fixed-cache" }
# eth
alloy-chains = { version = "0.2.5", default-features = false }

View File

@@ -12936,125 +12936,6 @@ int mdbx_txn_renew(MDBX_txn *txn) {
return LOG_IFERR(rc);
}
int mdbx_txn_clone(const MDBX_txn *src, MDBX_txn **dest) {
if (unlikely(!dest))
return LOG_IFERR(MDBX_EINVAL);
*dest = nullptr;
int rc = check_txn(src, MDBX_TXN_BLOCKED);
if (unlikely(rc != MDBX_SUCCESS))
return LOG_IFERR(rc);
if (unlikely((src->flags & MDBX_TXN_RDONLY) == 0))
return LOG_IFERR(MDBX_EINVAL);
MDBX_env *const env = src->env;
rc = check_env(env, true);
if (unlikely(rc != MDBX_SUCCESS))
return LOG_IFERR(rc);
if (unlikely((env->flags & MDBX_NOSTICKYTHREADS) == 0))
return LOG_IFERR(MDBX_TXN_OVERLAPPING);
MDBX_txn *txn = nullptr;
const intptr_t bitmap_bytes =
#if MDBX_ENABLE_DBI_SPARSE
ceil_powerof2(env->max_dbi, CHAR_BIT * sizeof(txn->dbi_sparse[0])) / CHAR_BIT;
#else
0;
#endif /* MDBX_ENABLE_DBI_SPARSE */
STATIC_ASSERT(sizeof(txn->tw) > sizeof(txn->to));
const size_t base = sizeof(MDBX_txn) - sizeof(txn->tw) + sizeof(txn->to);
const size_t size = base + (size_t)bitmap_bytes + env->max_dbi * sizeof(txn->dbi_seqs[0]) +
env->max_dbi * (sizeof(txn->dbs[0]) + sizeof(txn->cursors[0]) + sizeof(txn->dbi_state[0]));
txn = osal_malloc(size);
if (unlikely(txn == nullptr))
return LOG_IFERR(MDBX_ENOMEM);
#if MDBX_DEBUG
memset(txn, 0xCD, size);
VALGRIND_MAKE_MEM_UNDEFINED(txn, size);
#endif /* MDBX_DEBUG */
MDBX_ANALYSIS_ASSUME(size > base);
memset(txn, 0, (MDBX_GOOFY_MSVC_STATIC_ANALYZER && base > size) ? size : base);
txn->dbs = ptr_disp(txn, base);
txn->cursors = ptr_disp(txn->dbs, env->max_dbi * sizeof(txn->dbs[0]));
#if MDBX_DEBUG
txn->cursors[FREE_DBI] = nullptr; /* avoid SIGSEGV in an assertion later */
#endif
txn->dbi_state = ptr_disp(txn, size - env->max_dbi * sizeof(txn->dbi_state[0]));
txn->dbi_seqs = ptr_disp(txn->cursors, env->max_dbi * sizeof(txn->cursors[0]));
#if MDBX_ENABLE_DBI_SPARSE
txn->dbi_sparse = ptr_disp(txn->dbi_state, -bitmap_bytes);
#endif /* MDBX_ENABLE_DBI_SPARSE */
txn->env = env;
txn->flags = src->flags & ~txn_state_flags;
txn->parent = nullptr;
txn->nested = nullptr;
txn->txnid = src->txnid;
txn->front_txnid = src->front_txnid;
txn->geo = src->geo;
txn->canary = src->canary;
txn->owner = (env->flags & MDBX_NOSTICKYTHREADS) ? 0 : osal_thread_self();
if (unlikely(src->n_dbi > env->max_dbi)) {
rc = MDBX_CORRUPTED;
goto bailout;
}
txn->n_dbi = src->n_dbi;
memset(txn->cursors, 0, env->max_dbi * sizeof(txn->cursors[0]));
memset(txn->dbi_state, 0, env->max_dbi * sizeof(txn->dbi_state[0]));
memset(txn->dbi_seqs, 0, env->max_dbi * sizeof(txn->dbi_seqs[0]));
#if MDBX_ENABLE_DBI_SPARSE
if (bitmap_bytes)
memset(txn->dbi_sparse, 0, bitmap_bytes);
#endif /* MDBX_ENABLE_DBI_SPARSE */
memcpy(txn->dbs, src->dbs, txn->n_dbi * sizeof(txn->dbs[0]));
memcpy(txn->dbi_state, src->dbi_state, txn->n_dbi * sizeof(txn->dbi_state[0]));
memcpy(txn->dbi_seqs, src->dbi_seqs, txn->n_dbi * sizeof(txn->dbi_seqs[0]));
#if MDBX_ENABLE_DBI_SPARSE
if (bitmap_bytes)
memcpy(txn->dbi_sparse, src->dbi_sparse, bitmap_bytes);
#endif /* MDBX_ENABLE_DBI_SPARSE */
txn->to.reader = nullptr;
if (env->lck_mmap.lck) {
bsr_t brs = mvcc_bind_slot(env);
if (unlikely(brs.err != MDBX_SUCCESS)) {
rc = brs.err;
goto bailout;
}
txn->to.reader = brs.rslot;
safe64_reset(&txn->to.reader->txnid, true);
if (src->to.reader) {
atomic_store32(&txn->to.reader->snapshot_pages_used,
atomic_load32(&src->to.reader->snapshot_pages_used, mo_Relaxed), mo_Relaxed);
atomic_store64(&txn->to.reader->snapshot_pages_retired,
atomic_load64(&src->to.reader->snapshot_pages_retired, mo_Relaxed), mo_Relaxed);
} else {
atomic_store32(&txn->to.reader->snapshot_pages_used, src->geo.first_unallocated, mo_Relaxed);
atomic_store64(&txn->to.reader->snapshot_pages_retired, 0, mo_Relaxed);
}
safe64_write(&txn->to.reader->txnid, src->txnid);
atomic_store32(&env->lck->rdt_refresh_flag, true, mo_AcquireRelease);
}
txn->signature = txn_signature;
txn->userctx = nullptr;
*dest = txn;
DEBUG("clone txn %" PRIaTXN "r %p from %p on env %p", txn->txnid, (void *)txn, (void *)src, (void *)env);
return MDBX_SUCCESS;
bailout:
osal_free(txn);
return LOG_IFERR(rc);
}
int mdbx_txn_set_userctx(MDBX_txn *txn, void *ctx) {
int rc = check_txn(txn, MDBX_TXN_FINISHED);
if (unlikely(rc != MDBX_SUCCESS))

View File

@@ -3882,35 +3882,6 @@ MDBX_NOTHROW_PURE_FUNCTION LIBMDBX_API void *mdbx_env_get_userctx(const MDBX_env
LIBMDBX_API int mdbx_txn_begin_ex(MDBX_env *env, MDBX_txn *parent, MDBX_txn_flags_t flags, MDBX_txn **txn,
void *context);
/** \brief Clone a read-only transaction snapshot.
* \ingroup c_transactions
*
* Creates a new read-only transaction that uses the same MVCC snapshot as
* the \p src transaction. This allows parallel read operations across threads
* without re-opening a read transaction and re-fetching state.
*
* \note This function requires \ref MDBX_NOSTICKYTHREADS (aka MDBX_NOTLS)
* to be enabled for the environment. Otherwise it will return
* \ref MDBX_TXN_OVERLAPPING.
*
* \note The \p src transaction must be an active read-only transaction.
*
* \note The \p src transaction and the cloned transaction must not be used
* concurrently from multiple threads. Each transaction and its cursors must
* be confined to a single thread at a time.
*
* \param [in] src A read-only transaction handle returned by
* \ref mdbx_txn_begin_ex() or \ref mdbx_txn_begin().
* \param [out] dest Address where the cloned \ref MDBX_txn handle will be
* stored. Must not be NULL.
*
* \returns A non-zero error value on failure and 0 on success.
* \retval MDBX_EINVAL Invalid arguments or \p src is not read-only.
* \retval MDBX_TXN_OVERLAPPING \ref MDBX_NOSTICKYTHREADS is not enabled.
* \retval MDBX_READERS_FULL Reader lock table is full.
* \retval MDBX_ENOMEM Out of memory. */
LIBMDBX_API int mdbx_txn_clone(const MDBX_txn *src, MDBX_txn **dest);
/** \brief Create a transaction for use with the environment.
* \ingroup c_transactions
*

View File

@@ -483,20 +483,6 @@ impl Transaction<RW> {
}
impl Transaction<RO> {
/// Clones this read-only transaction, preserving the same MVCC snapshot.
///
/// This requires the environment to be opened with `MDBX_NOSTICKYTHREADS` (aka `MDBX_NOTLS`).
/// The cloned transaction must not be used concurrently with this transaction from multiple
/// threads.
pub fn clone_snapshot(&self) -> Result<Self> {
let cloned = self.txn_execute(|txn| {
let mut cloned: *mut ffi::MDBX_txn = ptr::null_mut();
mdbx_result(unsafe { ffi::mdbx_txn_clone(txn, &mut cloned) }).map(|_| cloned)
})??;
Ok(Self::new_from_ptr(self.env().clone(), cloned))
}
/// Closes the database handle.
///
/// # Safety

View File

@@ -373,35 +373,3 @@ fn test_stat_dupsort() {
assert_eq!(stat.entries(), 8);
}
}
#[test]
fn test_txn_clone_snapshot() {
let dir = tempdir().unwrap();
let env = Environment::builder().open(dir.path()).unwrap();
{
let txn = env.begin_rw_txn().unwrap();
let db = txn.open_db(None).unwrap();
txn.put(db.dbi(), b"k", b"v1", WriteFlags::empty()).unwrap();
txn.commit().unwrap();
}
let ro = env.begin_ro_txn().unwrap();
let clone = ro.clone_snapshot().unwrap();
{
let txn = env.begin_rw_txn().unwrap();
let db = txn.open_db(None).unwrap();
txn.put(db.dbi(), b"k", b"v2", WriteFlags::empty()).unwrap();
txn.commit().unwrap();
}
let db = ro.open_db(None).unwrap();
assert_eq!(ro.get::<[u8; 2]>(db.dbi(), b"k").unwrap(), Some(*b"v1"));
let db = clone.open_db(None).unwrap();
assert_eq!(clone.get::<[u8; 2]>(db.dbi(), b"k").unwrap(), Some(*b"v1"));
let ro2 = env.begin_ro_txn().unwrap();
let db = ro2.open_db(None).unwrap();
assert_eq!(ro2.get::<[u8; 2]>(db.dbi(), b"k").unwrap(), Some(*b"v2"));
}

View File

@@ -4,7 +4,7 @@ use alloy_primitives::{
map::{Entry, HashMap},
B256,
};
use alloy_rlp::Decodable;
use alloy_rlp::{Decodable, Encodable};
use alloy_trie::{BranchNodeCompact, TrieMask, EMPTY_ROOT_HASH};
use reth_execution_errors::{SparseTrieErrorKind, SparseTrieResult};
use reth_trie_common::{
@@ -14,8 +14,8 @@ use reth_trie_common::{
};
use reth_trie_sparse::{
provider::{RevealedNode, TrieNodeProvider},
LeafLookup, LeafLookupError, RlpNodeStackItem, SparseNode, SparseNodeType, SparseTrieInterface,
SparseTrieUpdates,
rlp_node_from_rlp_cached, LeafLookup, LeafLookupError, RlpNodeStackItem, SparseNode,
SparseNodeType, SparseTrieInterface, SparseTrieUpdates,
};
use smallvec::SmallVec;
use std::cmp::{Ord, Ordering, PartialOrd};
@@ -2206,7 +2206,8 @@ impl SparseSubtrieInner {
// Encode the leaf node and update its hash
let value = self.values.get(&path).unwrap();
self.buffers.rlp_buf.clear();
let rlp_node = LeafNodeRef { key, value }.rlp(&mut self.buffers.rlp_buf);
LeafNodeRef { key, value }.encode(&mut self.buffers.rlp_buf);
let rlp_node = rlp_node_from_rlp_cached(&self.buffers.rlp_buf);
*hash = rlp_node.as_hash();
(rlp_node, SparseNodeType::Leaf)
}
@@ -2229,8 +2230,8 @@ impl SparseSubtrieInner {
let RlpNodeStackItem { path: _, rlp_node: child, node_type: child_node_type } =
self.buffers.rlp_node_stack.pop().unwrap();
self.buffers.rlp_buf.clear();
let rlp_node =
ExtensionNodeRef::new(key, &child).rlp(&mut self.buffers.rlp_buf);
ExtensionNodeRef::new(key, &child).encode(&mut self.buffers.rlp_buf);
let rlp_node = rlp_node_from_rlp_cached(&self.buffers.rlp_buf);
*hash = rlp_node.as_hash();
let store_in_db_trie_value = child_node_type.store_in_db_trie();
@@ -2387,7 +2388,8 @@ impl SparseSubtrieInner {
self.buffers.rlp_buf.clear();
let branch_node_ref =
BranchNodeRef::new(&self.buffers.branch_value_stack_buf, *state_mask);
let rlp_node = branch_node_ref.rlp(&mut self.buffers.rlp_buf);
branch_node_ref.encode(&mut self.buffers.rlp_buf);
let rlp_node = rlp_node_from_rlp_cached(&self.buffers.rlp_buf);
*hash = rlp_node.as_hash();
// Save a branch node update only if it's not a root node, and we need to

View File

@@ -27,6 +27,7 @@ alloy-rlp.workspace = true
auto_impl.workspace = true
rayon = { workspace = true, optional = true }
smallvec = { workspace = true, features = ["const_new"] }
fixed-cache = { workspace = true, optional = true }
# metrics
reth-metrics = { workspace = true, optional = true }
@@ -59,6 +60,7 @@ std = [
"alloy-primitives/std",
"alloy-rlp/std",
"alloy-trie/std",
"dep:fixed-cache",
"reth-execution-errors/std",
"reth-primitives-traits/std",
"reth-storage-api/std",

View File

@@ -0,0 +1,47 @@
use alloy_primitives::{keccak256, B256};
use reth_trie_common::RlpNode;
#[cfg(feature = "std")]
use fixed_cache::Cache;
#[cfg(feature = "std")]
use std::sync::OnceLock;
// ~1M entries targets ~128 MiB total usage (key vecs + hashes + bucket overhead) while providing
// good reuse for repeated trie node RLPs during sparse trie hashing.
const TRIE_NODE_HASH_CACHE_SIZE: usize = 1 << 20;
#[cfg(feature = "std")]
fn trie_node_hash_cache() -> &'static Cache<Vec<u8>, B256> {
static CACHE: OnceLock<Cache<Vec<u8>, B256>> = OnceLock::new();
CACHE.get_or_init(|| Cache::new(TRIE_NODE_HASH_CACHE_SIZE, Default::default()))
}
/// Hashes an RLP-encoded trie node with a fixed-size cache.
pub fn hash_trie_node_cached(rlp: &[u8]) -> B256 {
#[cfg(feature = "std")]
{
let cache = trie_node_hash_cache();
if let Some(hash) = cache.get(rlp) {
return hash;
}
let hash = keccak256(rlp);
cache.insert(rlp.to_vec(), hash);
return hash;
}
#[cfg(not(feature = "std"))]
{
keccak256(rlp)
}
}
/// Returns `rlp(node)` for short encodings or `rlp(keccak(rlp(node)))` using the cache.
pub fn rlp_node_from_rlp_cached(rlp: &[u8]) -> RlpNode {
if rlp.len() < 32 {
RlpNode::from_raw(rlp).expect("RLP node length already checked")
} else {
let hash = hash_trie_node_cached(rlp);
RlpNode::word_rlp(&hash)
}
}

View File

@@ -16,6 +16,9 @@ pub use traits::*;
pub mod provider;
mod hash_cache;
pub use hash_cache::{hash_trie_node_cached, rlp_node_from_rlp_cached};
#[cfg(feature = "metrics")]
mod metrics;

View File

@@ -1,6 +1,7 @@
use crate::{
hash_trie_node_cached,
provider::{RevealedNode, TrieNodeProvider},
LeafLookup, LeafLookupError, SparseTrieInterface, SparseTrieUpdates,
rlp_node_from_rlp_cached, LeafLookup, LeafLookupError, SparseTrieInterface, SparseTrieUpdates,
};
use alloc::{
borrow::Cow,
@@ -11,11 +12,11 @@ use alloc::{
vec::Vec,
};
use alloy_primitives::{
hex, keccak256,
hex,
map::{Entry, HashMap, HashSet},
B256,
};
use alloy_rlp::Decodable;
use alloy_rlp::{Decodable, Encodable};
use reth_execution_errors::{SparseTrieErrorKind, SparseTrieResult};
use reth_trie_common::{
prefix_set::{PrefixSet, PrefixSetMut},
@@ -928,7 +929,7 @@ impl SparseTrieInterface for SerialSparseTrie {
if let Some(root_hash) = rlp_node.as_hash() {
root_hash
} else {
keccak256(rlp_node)
hash_trie_node_cached(rlp_node.as_ref())
}
}
@@ -1532,7 +1533,8 @@ impl SerialSparseTrie {
} else {
let value = self.values.get(&path).unwrap();
rlp_buf.clear();
let rlp_node = LeafNodeRef { key, value }.rlp(rlp_buf);
LeafNodeRef { key, value }.encode(rlp_buf);
let rlp_node = rlp_node_from_rlp_cached(rlp_buf);
*hash = rlp_node.as_hash();
(rlp_node, SparseNodeType::Leaf)
}
@@ -1554,7 +1556,8 @@ impl SerialSparseTrie {
node_type: child_node_type,
} = buffers.rlp_node_stack.pop().unwrap();
rlp_buf.clear();
let rlp_node = ExtensionNodeRef::new(key, &child).rlp(rlp_buf);
ExtensionNodeRef::new(key, &child).encode(rlp_buf);
let rlp_node = rlp_node_from_rlp_cached(rlp_buf);
*hash = rlp_node.as_hash();
let store_in_db_trie_value = child_node_type.store_in_db_trie();
@@ -1708,7 +1711,8 @@ impl SerialSparseTrie {
rlp_buf.clear();
let branch_node_ref =
BranchNodeRef::new(&buffers.branch_value_stack_buf, *state_mask);
let rlp_node = branch_node_ref.rlp(rlp_buf);
branch_node_ref.encode(rlp_buf);
let rlp_node = rlp_node_from_rlp_cached(rlp_buf);
*hash = rlp_node.as_hash();
// Save a branch node update only if it's not a root node, and we need to