diff --git a/crates/storage/provider/src/trie/mod.rs b/crates/storage/provider/src/trie/mod.rs index 8965d6da7d..e0128245bc 100644 --- a/crates/storage/provider/src/trie/mod.rs +++ b/crates/storage/provider/src/trie/mod.rs @@ -1,11 +1,12 @@ use cita_trie::{PatriciaTrie, Trie}; use hasher::HasherKeccak; +use parking_lot::Mutex; use reth_codecs::Compact; use reth_db::{ cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO}, models::{AccountBeforeTx, TransitionIdAddress}, tables, - transaction::{DbTx, DbTxMut}, + transaction::{DbTx, DbTxMut, DbTxMutGAT}, }; use reth_primitives::{ keccak256, proofs::EMPTY_ROOT, Account, Address, ProofCheckpoint, StorageEntry, @@ -45,9 +46,12 @@ pub enum TrieError { UnexpectedCheckpoint, } +type AccountsTrieCursor<'tx, TX> = + Arc>::CursorMut>>; + /// Database wrapper implementing HashDB trait, with a read-write transaction. -pub struct HashDatabaseMut<'tx, TX> { - tx: &'tx TX, +pub struct HashDatabaseMut<'tx, TX: DbTxMutGAT<'tx>> { + accounts_trie_cursor: AccountsTrieCursor<'tx, TX>, } impl<'tx, 'db, TX> cita_trie::DB for HashDatabaseMut<'tx, TX> @@ -57,7 +61,7 @@ where type Error = TrieError; fn get(&self, key: &[u8]) -> Result>, Self::Error> { - Ok(self.tx.get::(H256::from_slice(key))?) + Ok(self.accounts_trie_cursor.lock().seek_exact(H256::from_slice(key))?.map(|(_, v)| v)) } fn contains(&self, key: &[u8]) -> Result { @@ -70,7 +74,7 @@ where // Insert a batch of data into the cache. fn insert_batch(&self, keys: Vec>, values: Vec>) -> Result<(), Self::Error> { - let mut cursor = self.tx.cursor_write::()?; + let mut cursor = self.accounts_trie_cursor.lock(); for (key, value) in keys.into_iter().zip(values.into_iter()) { cursor.upsert(H256::from_slice(key.as_slice()), value)?; } @@ -78,7 +82,7 @@ where } fn remove_batch(&self, keys: &[Vec]) -> Result<(), Self::Error> { - let mut cursor = self.tx.cursor_write::()?; + let mut cursor = self.accounts_trie_cursor.lock(); for key in keys { if cursor.seek_exact(H256::from_slice(key.as_slice()))?.is_some() { cursor.delete_current()?; @@ -102,26 +106,35 @@ where { /// Instantiates a new Database for the accounts trie, with an empty root pub fn new(tx: &'tx TX) -> Result { + let mut accounts_trie_cursor = tx.cursor_write::()?; + let root = EMPTY_ROOT; - if tx.get::(root)?.is_none() { - tx.put::(root, [EMPTY_STRING_CODE].to_vec())?; + if accounts_trie_cursor.seek_exact(root)?.is_none() { + accounts_trie_cursor.upsert(root, [EMPTY_STRING_CODE].to_vec())?; } - Ok(Self { tx }) + + Ok(Self { accounts_trie_cursor: Arc::new(Mutex::new(accounts_trie_cursor)) }) } /// Instantiates a new Database for the accounts trie, with an existing root pub fn from_root(tx: &'tx TX, root: H256) -> Result { + let mut accounts_trie_cursor = tx.cursor_write::()?; + if root == EMPTY_ROOT { return Self::new(tx) } - tx.get::(root)?.ok_or(TrieError::MissingAccountRoot(root))?; - Ok(Self { tx }) + accounts_trie_cursor.seek_exact(root)?.ok_or(TrieError::MissingAccountRoot(root))?; + + Ok(Self { accounts_trie_cursor: Arc::new(Mutex::new(accounts_trie_cursor)) }) } } +type StoragesTrieCursor<'tx, TX> = + Arc>::DupCursorMut>>; + /// Database wrapper implementing HashDB trait, with a read-write transaction. -pub struct DupHashDatabaseMut<'tx, TX> { - tx: &'tx TX, +pub struct DupHashDatabaseMut<'tx, TX: DbTxMutGAT<'tx>> { + storages_trie_cursor: StoragesTrieCursor<'tx, TX>, key: H256, } @@ -132,9 +145,10 @@ where type Error = TrieError; fn get(&self, key: &[u8]) -> Result>, Self::Error> { - let mut cursor = self.tx.cursor_dup_read::()?; let subkey = H256::from_slice(key); - Ok(cursor + Ok(self + .storages_trie_cursor + .lock() .seek_by_key_subkey(self.key, subkey)? .filter(|entry| entry.hash == subkey) .map(|entry| entry.node)) @@ -150,7 +164,7 @@ where /// Insert a batch of data into the cache. fn insert_batch(&self, keys: Vec>, values: Vec>) -> Result<(), Self::Error> { - let mut cursor = self.tx.cursor_dup_write::()?; + let mut cursor = self.storages_trie_cursor.lock(); for (key, node) in keys.into_iter().zip(values.into_iter()) { let hash = H256::from_slice(key.as_slice()); if cursor.seek_by_key_subkey(self.key, hash)?.filter(|e| e.hash == hash).is_some() { @@ -162,7 +176,7 @@ where } fn remove_batch(&self, keys: &[Vec]) -> Result<(), Self::Error> { - let mut cursor = self.tx.cursor_dup_write::()?; + let mut cursor = self.storages_trie_cursor.lock(); for key in keys { let hash = H256::from_slice(key.as_slice()); if cursor.seek_by_key_subkey(self.key, hash)?.filter(|e| e.hash == hash).is_some() { @@ -186,28 +200,38 @@ where TX: DbTxMut<'db> + DbTx<'db> + Send + Sync, { /// Instantiates a new Database for the storage trie, with an empty root - pub fn new(tx: &'tx TX, key: H256) -> Result { + pub fn new( + storages_trie_cursor: StoragesTrieCursor<'tx, TX>, + key: H256, + ) -> Result { let root = EMPTY_ROOT; - let mut cursor = tx.cursor_dup_write::()?; - if cursor.seek_by_key_subkey(key, root)?.filter(|entry| entry.hash == root).is_none() { - tx.put::( - key, - StorageTrieEntry { hash: root, node: [EMPTY_STRING_CODE].to_vec() }, - )?; + { + let mut cursor = storages_trie_cursor.lock(); + if cursor.seek_by_key_subkey(key, root)?.filter(|entry| entry.hash == root).is_none() { + cursor.upsert( + key, + StorageTrieEntry { hash: root, node: [EMPTY_STRING_CODE].to_vec() }, + )?; + } } - Ok(Self { tx, key }) + Ok(Self { storages_trie_cursor, key }) } /// Instantiates a new Database for the storage trie, with an existing root - pub fn from_root(tx: &'tx TX, key: H256, root: H256) -> Result { + pub fn from_root( + storages_trie_cursor: StoragesTrieCursor<'tx, TX>, + key: H256, + root: H256, + ) -> Result { if root == EMPTY_ROOT { - return Self::new(tx, key) + return Self::new(storages_trie_cursor, key) } - tx.cursor_dup_read::()? + storages_trie_cursor + .lock() .seek_by_key_subkey(key, root)? .filter(|entry| entry.hash == root) .ok_or(TrieError::MissingStorageRoot(root))?; - Ok(Self { tx, key }) + Ok(Self { storages_trie_cursor, key }) } } @@ -422,11 +446,14 @@ where }; let mut accounts_cursor = self.tx.cursor_read::()?; + let storage_trie_cursor = + Arc::new(Mutex::new(self.tx.cursor_dup_write::()?)); let mut walker = accounts_cursor.walk(checkpoint.hashed_address.take())?; while let Some((hashed_address, account)) = walker.next().transpose()? { match self.calculate_storage_root( hashed_address, + storage_trie_cursor.clone(), checkpoint.storage_key.take(), checkpoint.storage_root.take(), )? { @@ -464,6 +491,7 @@ where fn calculate_storage_root( &mut self, address: H256, + storage_trie_cursor: StoragesTrieCursor<'tx, TX>, next_storage: Option, previous_root: Option, ) -> Result { @@ -475,7 +503,7 @@ where storage_cursor.seek_by_key_subkey(address, entry)?.filter(|e| e.key == entry), PatriciaTrie::from( Arc::new(DupHashDatabaseMut::from_root( - self.tx, + storage_trie_cursor, address, previous_root.expect("is some"), )?), @@ -486,7 +514,10 @@ where } else { ( storage_cursor.seek_by_key_subkey(address, H256::zero())?, - PatriciaTrie::new(Arc::new(DupHashDatabaseMut::new(self.tx, address)?), hasher), + PatriciaTrie::new( + Arc::new(DupHashDatabaseMut::new(storage_trie_cursor, address)?), + hasher, + ), ) }; @@ -542,6 +573,8 @@ where )?; let mut accounts_cursor = self.tx.cursor_read::()?; + let storage_trie_cursor = + Arc::new(Mutex::new(self.tx.cursor_dup_write::()?)); for (hashed_address, changed_storages) in changed_accounts { let res = if let Some(account) = trie.get(hashed_address.as_slice())? { @@ -551,12 +584,14 @@ where self.update_storage_root( checkpoint.storage_root.take().unwrap_or(storage_root), hashed_address, + storage_trie_cursor.clone(), changed_storages, checkpoint.storage_key.take(), )? } else { self.calculate_storage_root( hashed_address, + storage_trie_cursor.clone(), checkpoint.storage_key.take(), checkpoint.storage_root.take(), )? @@ -602,12 +637,13 @@ where &mut self, previous_root: H256, address: H256, + storage_trie_cursor: StoragesTrieCursor<'tx, TX>, changed_storages: BTreeSet, next_storage: Option, ) -> Result { let mut hashed_storage_cursor = self.tx.cursor_dup_read::()?; let mut trie = PatriciaTrie::new( - Arc::new(DupHashDatabaseMut::from_root(self.tx, address, previous_root)?), + Arc::new(DupHashDatabaseMut::from_root(storage_trie_cursor, address, previous_root)?), Arc::new(HasherKeccak::new()), ); @@ -955,8 +991,10 @@ mod tests { (k, out) }); let expected = H256(sec_trie_root::(encoded_storage).0); + let storage_trie_cursor = + Arc::new(Mutex::new(trie.tx.cursor_dup_write::().unwrap())); assert_matches!( - trie.calculate_storage_root(hashed_address, None, None), + trie.calculate_storage_root(hashed_address, storage_trie_cursor, None, None), Ok(got) if got.root().unwrap() == expected ); }