perf: create only one trie cursor per trie type per merkle execute (#1855)

This commit is contained in:
joshieDo
2023-03-20 23:33:58 +08:00
committed by GitHub
parent a7a469302e
commit a015744138

View File

@@ -1,11 +1,12 @@
use cita_trie::{PatriciaTrie, Trie};
use hasher::HasherKeccak;
use parking_lot::Mutex;
use reth_codecs::Compact;
use reth_db::{
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO},
models::{AccountBeforeTx, TransitionIdAddress},
tables,
transaction::{DbTx, DbTxMut},
transaction::{DbTx, DbTxMut, DbTxMutGAT},
};
use reth_primitives::{
keccak256, proofs::EMPTY_ROOT, Account, Address, ProofCheckpoint, StorageEntry,
@@ -45,9 +46,12 @@ pub enum TrieError {
UnexpectedCheckpoint,
}
type AccountsTrieCursor<'tx, TX> =
Arc<Mutex<<TX as DbTxMutGAT<'tx>>::CursorMut<tables::AccountsTrie>>>;
/// Database wrapper implementing HashDB trait, with a read-write transaction.
pub struct HashDatabaseMut<'tx, TX> {
tx: &'tx TX,
pub struct HashDatabaseMut<'tx, TX: DbTxMutGAT<'tx>> {
accounts_trie_cursor: AccountsTrieCursor<'tx, TX>,
}
impl<'tx, 'db, TX> cita_trie::DB for HashDatabaseMut<'tx, TX>
@@ -57,7 +61,7 @@ where
type Error = TrieError;
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
Ok(self.tx.get::<tables::AccountsTrie>(H256::from_slice(key))?)
Ok(self.accounts_trie_cursor.lock().seek_exact(H256::from_slice(key))?.map(|(_, v)| v))
}
fn contains(&self, key: &[u8]) -> Result<bool, Self::Error> {
@@ -70,7 +74,7 @@ where
// Insert a batch of data into the cache.
fn insert_batch(&self, keys: Vec<Vec<u8>>, values: Vec<Vec<u8>>) -> Result<(), Self::Error> {
let mut cursor = self.tx.cursor_write::<tables::AccountsTrie>()?;
let mut cursor = self.accounts_trie_cursor.lock();
for (key, value) in keys.into_iter().zip(values.into_iter()) {
cursor.upsert(H256::from_slice(key.as_slice()), value)?;
}
@@ -78,7 +82,7 @@ where
}
fn remove_batch(&self, keys: &[Vec<u8>]) -> Result<(), Self::Error> {
let mut cursor = self.tx.cursor_write::<tables::AccountsTrie>()?;
let mut cursor = self.accounts_trie_cursor.lock();
for key in keys {
if cursor.seek_exact(H256::from_slice(key.as_slice()))?.is_some() {
cursor.delete_current()?;
@@ -102,26 +106,35 @@ where
{
/// Instantiates a new Database for the accounts trie, with an empty root
pub fn new(tx: &'tx TX) -> Result<Self, TrieError> {
let mut accounts_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
let root = EMPTY_ROOT;
if tx.get::<tables::AccountsTrie>(root)?.is_none() {
tx.put::<tables::AccountsTrie>(root, [EMPTY_STRING_CODE].to_vec())?;
if accounts_trie_cursor.seek_exact(root)?.is_none() {
accounts_trie_cursor.upsert(root, [EMPTY_STRING_CODE].to_vec())?;
}
Ok(Self { tx })
Ok(Self { accounts_trie_cursor: Arc::new(Mutex::new(accounts_trie_cursor)) })
}
/// Instantiates a new Database for the accounts trie, with an existing root
pub fn from_root(tx: &'tx TX, root: H256) -> Result<Self, TrieError> {
let mut accounts_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
if root == EMPTY_ROOT {
return Self::new(tx)
}
tx.get::<tables::AccountsTrie>(root)?.ok_or(TrieError::MissingAccountRoot(root))?;
Ok(Self { tx })
accounts_trie_cursor.seek_exact(root)?.ok_or(TrieError::MissingAccountRoot(root))?;
Ok(Self { accounts_trie_cursor: Arc::new(Mutex::new(accounts_trie_cursor)) })
}
}
type StoragesTrieCursor<'tx, TX> =
Arc<Mutex<<TX as DbTxMutGAT<'tx>>::DupCursorMut<tables::StoragesTrie>>>;
/// Database wrapper implementing HashDB trait, with a read-write transaction.
pub struct DupHashDatabaseMut<'tx, TX> {
tx: &'tx TX,
pub struct DupHashDatabaseMut<'tx, TX: DbTxMutGAT<'tx>> {
storages_trie_cursor: StoragesTrieCursor<'tx, TX>,
key: H256,
}
@@ -132,9 +145,10 @@ where
type Error = TrieError;
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
let mut cursor = self.tx.cursor_dup_read::<tables::StoragesTrie>()?;
let subkey = H256::from_slice(key);
Ok(cursor
Ok(self
.storages_trie_cursor
.lock()
.seek_by_key_subkey(self.key, subkey)?
.filter(|entry| entry.hash == subkey)
.map(|entry| entry.node))
@@ -150,7 +164,7 @@ where
/// Insert a batch of data into the cache.
fn insert_batch(&self, keys: Vec<Vec<u8>>, values: Vec<Vec<u8>>) -> Result<(), Self::Error> {
let mut cursor = self.tx.cursor_dup_write::<tables::StoragesTrie>()?;
let mut cursor = self.storages_trie_cursor.lock();
for (key, node) in keys.into_iter().zip(values.into_iter()) {
let hash = H256::from_slice(key.as_slice());
if cursor.seek_by_key_subkey(self.key, hash)?.filter(|e| e.hash == hash).is_some() {
@@ -162,7 +176,7 @@ where
}
fn remove_batch(&self, keys: &[Vec<u8>]) -> Result<(), Self::Error> {
let mut cursor = self.tx.cursor_dup_write::<tables::StoragesTrie>()?;
let mut cursor = self.storages_trie_cursor.lock();
for key in keys {
let hash = H256::from_slice(key.as_slice());
if cursor.seek_by_key_subkey(self.key, hash)?.filter(|e| e.hash == hash).is_some() {
@@ -186,28 +200,38 @@ where
TX: DbTxMut<'db> + DbTx<'db> + Send + Sync,
{
/// Instantiates a new Database for the storage trie, with an empty root
pub fn new(tx: &'tx TX, key: H256) -> Result<Self, TrieError> {
pub fn new(
storages_trie_cursor: StoragesTrieCursor<'tx, TX>,
key: H256,
) -> Result<Self, TrieError> {
let root = EMPTY_ROOT;
let mut cursor = tx.cursor_dup_write::<tables::StoragesTrie>()?;
if cursor.seek_by_key_subkey(key, root)?.filter(|entry| entry.hash == root).is_none() {
tx.put::<tables::StoragesTrie>(
key,
StorageTrieEntry { hash: root, node: [EMPTY_STRING_CODE].to_vec() },
)?;
{
let mut cursor = storages_trie_cursor.lock();
if cursor.seek_by_key_subkey(key, root)?.filter(|entry| entry.hash == root).is_none() {
cursor.upsert(
key,
StorageTrieEntry { hash: root, node: [EMPTY_STRING_CODE].to_vec() },
)?;
}
}
Ok(Self { tx, key })
Ok(Self { storages_trie_cursor, key })
}
/// Instantiates a new Database for the storage trie, with an existing root
pub fn from_root(tx: &'tx TX, key: H256, root: H256) -> Result<Self, TrieError> {
pub fn from_root(
storages_trie_cursor: StoragesTrieCursor<'tx, TX>,
key: H256,
root: H256,
) -> Result<Self, TrieError> {
if root == EMPTY_ROOT {
return Self::new(tx, key)
return Self::new(storages_trie_cursor, key)
}
tx.cursor_dup_read::<tables::StoragesTrie>()?
storages_trie_cursor
.lock()
.seek_by_key_subkey(key, root)?
.filter(|entry| entry.hash == root)
.ok_or(TrieError::MissingStorageRoot(root))?;
Ok(Self { tx, key })
Ok(Self { storages_trie_cursor, key })
}
}
@@ -422,11 +446,14 @@ where
};
let mut accounts_cursor = self.tx.cursor_read::<tables::HashedAccount>()?;
let storage_trie_cursor =
Arc::new(Mutex::new(self.tx.cursor_dup_write::<tables::StoragesTrie>()?));
let mut walker = accounts_cursor.walk(checkpoint.hashed_address.take())?;
while let Some((hashed_address, account)) = walker.next().transpose()? {
match self.calculate_storage_root(
hashed_address,
storage_trie_cursor.clone(),
checkpoint.storage_key.take(),
checkpoint.storage_root.take(),
)? {
@@ -464,6 +491,7 @@ where
fn calculate_storage_root(
&mut self,
address: H256,
storage_trie_cursor: StoragesTrieCursor<'tx, TX>,
next_storage: Option<H256>,
previous_root: Option<H256>,
) -> Result<TrieProgress, TrieError> {
@@ -475,7 +503,7 @@ where
storage_cursor.seek_by_key_subkey(address, entry)?.filter(|e| e.key == entry),
PatriciaTrie::from(
Arc::new(DupHashDatabaseMut::from_root(
self.tx,
storage_trie_cursor,
address,
previous_root.expect("is some"),
)?),
@@ -486,7 +514,10 @@ where
} else {
(
storage_cursor.seek_by_key_subkey(address, H256::zero())?,
PatriciaTrie::new(Arc::new(DupHashDatabaseMut::new(self.tx, address)?), hasher),
PatriciaTrie::new(
Arc::new(DupHashDatabaseMut::new(storage_trie_cursor, address)?),
hasher,
),
)
};
@@ -542,6 +573,8 @@ where
)?;
let mut accounts_cursor = self.tx.cursor_read::<tables::HashedAccount>()?;
let storage_trie_cursor =
Arc::new(Mutex::new(self.tx.cursor_dup_write::<tables::StoragesTrie>()?));
for (hashed_address, changed_storages) in changed_accounts {
let res = if let Some(account) = trie.get(hashed_address.as_slice())? {
@@ -551,12 +584,14 @@ where
self.update_storage_root(
checkpoint.storage_root.take().unwrap_or(storage_root),
hashed_address,
storage_trie_cursor.clone(),
changed_storages,
checkpoint.storage_key.take(),
)?
} else {
self.calculate_storage_root(
hashed_address,
storage_trie_cursor.clone(),
checkpoint.storage_key.take(),
checkpoint.storage_root.take(),
)?
@@ -602,12 +637,13 @@ where
&mut self,
previous_root: H256,
address: H256,
storage_trie_cursor: StoragesTrieCursor<'tx, TX>,
changed_storages: BTreeSet<H256>,
next_storage: Option<H256>,
) -> Result<TrieProgress, TrieError> {
let mut hashed_storage_cursor = self.tx.cursor_dup_read::<tables::HashedStorage>()?;
let mut trie = PatriciaTrie::new(
Arc::new(DupHashDatabaseMut::from_root(self.tx, address, previous_root)?),
Arc::new(DupHashDatabaseMut::from_root(storage_trie_cursor, address, previous_root)?),
Arc::new(HasherKeccak::new()),
);
@@ -955,8 +991,10 @@ mod tests {
(k, out)
});
let expected = H256(sec_trie_root::<KeccakHasher, _, _, _>(encoded_storage).0);
let storage_trie_cursor =
Arc::new(Mutex::new(trie.tx.cursor_dup_write::<tables::StoragesTrie>().unwrap()));
assert_matches!(
trie.calculate_storage_root(hashed_address, None, None),
trie.calculate_storage_root(hashed_address, storage_trie_cursor, None, None),
Ok(got) if got.root().unwrap() == expected
);
}