chore: complete StageDB -> Transaction renaming (#497)

This commit is contained in:
0xYami
2022-12-16 23:16:31 +01:00
committed by GitHub
parent 4da574df84
commit 0a4bf67c32
8 changed files with 190 additions and 188 deletions

View File

@@ -229,14 +229,14 @@ impl<DB: Database> Pipeline<DB> {
};
// Unwind stages in reverse order of priority (i.e. higher priority = first)
let mut db = Transaction::new(db)?;
let mut tx = Transaction::new(db)?;
for (_, QueuedStage { stage, .. }) in unwind_pipeline.iter_mut() {
let stage_id = stage.id();
let span = info_span!("Unwinding", stage = %stage_id);
let _enter = span.enter();
let mut stage_progress = stage_id.get_progress(db.deref())?.unwrap_or_default();
let mut stage_progress = stage_id.get_progress(tx.deref())?.unwrap_or_default();
if stage_progress < to {
debug!(from = %stage_progress, %to, "Unwind point too far for stage");
self.events_sender.send(PipelineEvent::Skipped { stage_id }).await?;
@@ -248,11 +248,11 @@ impl<DB: Database> Pipeline<DB> {
let input = UnwindInput { stage_progress, unwind_to: to, bad_block };
self.events_sender.send(PipelineEvent::Unwinding { stage_id, input }).await?;
let output = stage.unwind(&mut db, input).await;
let output = stage.unwind(&mut tx, input).await;
match output {
Ok(unwind_output) => {
stage_progress = unwind_output.stage_progress;
stage_id.save_progress(db.deref(), stage_progress)?;
stage_id.save_progress(tx.deref(), stage_progress)?;
self.events_sender
.send(PipelineEvent::Unwound { stage_id, result: unwind_output })
@@ -266,7 +266,7 @@ impl<DB: Database> Pipeline<DB> {
}
}
db.commit()?;
tx.commit()?;
Ok(())
}
}
@@ -305,9 +305,9 @@ impl<DB: Database> QueuedStage<DB> {
}
loop {
let mut db = Transaction::new(db)?;
let mut tx = Transaction::new(db)?;
let prev_progress = stage_id.get_progress(db.deref())?;
let prev_progress = stage_id.get_progress(tx.deref())?;
let stage_reached_max_block = prev_progress
.zip(state.max_block)
@@ -332,7 +332,7 @@ impl<DB: Database> QueuedStage<DB> {
match self
.stage
.execute(&mut db, ExecInput { previous_stage, stage_progress: prev_progress })
.execute(&mut tx, ExecInput { previous_stage, stage_progress: prev_progress })
.await
{
Ok(out @ ExecOutput { stage_progress, done, reached_tip }) => {
@@ -343,7 +343,7 @@ impl<DB: Database> QueuedStage<DB> {
%done,
"Stage made progress"
);
stage_id.save_progress(db.deref(), stage_progress)?;
stage_id.save_progress(tx.deref(), stage_progress)?;
state
.events_sender
@@ -351,7 +351,7 @@ impl<DB: Database> QueuedStage<DB> {
.await?;
// TODO: Make the commit interval configurable
db.commit()?;
tx.commit()?;
state.record_progress_outliers(stage_progress);
state.set_reached_tip(reached_tip);

View File

@@ -71,14 +71,14 @@ pub trait Stage<DB: Database>: Send + Sync {
/// Execute the stage.
async fn execute(
&mut self,
db: &mut Transaction<'_, DB>,
tx: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError>;
/// Unwind the stage.
async fn unwind(
&mut self,
db: &mut Transaction<'_, DB>,
tx: &mut Transaction<'_, DB>,
input: UnwindInput,
) -> Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>>;
}

View File

@@ -75,7 +75,7 @@ impl<DB: Database, D: BodyDownloader, C: Consensus> Stage<DB> for BodyStage<D, C
/// header, limited by the stage's batch size.
async fn execute(
&mut self,
db: &mut Transaction<'_, DB>,
tx: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
let previous_stage_progress = input.previous_stage_progress();
@@ -96,19 +96,19 @@ impl<DB: Database, D: BodyDownloader, C: Consensus> Stage<DB> for BodyStage<D, C
return Ok(ExecOutput { stage_progress, reached_tip: true, done: true })
}
let bodies_to_download = self.bodies_to_download::<DB>(db, starting_block, target)?;
let bodies_to_download = self.bodies_to_download::<DB>(tx, starting_block, target)?;
// Cursors used to write bodies, ommers and transactions
let mut body_cursor = db.cursor_mut::<tables::BlockBodies>()?;
let mut ommers_cursor = db.cursor_mut::<tables::BlockOmmers>()?;
let mut tx_cursor = db.cursor_mut::<tables::Transactions>()?;
let mut body_cursor = tx.cursor_mut::<tables::BlockBodies>()?;
let mut ommers_cursor = tx.cursor_mut::<tables::BlockOmmers>()?;
let mut tx_cursor = tx.cursor_mut::<tables::Transactions>()?;
// Cursors used to write state transition mapping
let mut block_transition_cursor = db.cursor_mut::<tables::BlockTransitionIndex>()?;
let mut tx_transition_cursor = db.cursor_mut::<tables::TxTransitionIndex>()?;
let mut block_transition_cursor = tx.cursor_mut::<tables::BlockTransitionIndex>()?;
let mut tx_transition_cursor = tx.cursor_mut::<tables::TxTransitionIndex>()?;
// Get id for the first transaction in the block
let (mut current_tx_id, mut transition_id) = db.get_next_block_ids(starting_block)?;
let (mut current_tx_id, mut transition_id) = tx.get_next_block_ids(starting_block)?;
// NOTE(onbjerg): The stream needs to live here otherwise it will just create a new iterator
// on every iteration of the while loop -_-
@@ -156,7 +156,7 @@ impl<DB: Database, D: BodyDownloader, C: Consensus> Stage<DB> for BodyStage<D, C
// Write transactions
for transaction in block.body {
// Insert the transaction hash to number mapping
db.put::<tables::TxHashNumber>(transaction.hash(), current_tx_id)?;
tx.put::<tables::TxHashNumber>(transaction.hash(), current_tx_id)?;
// Append the transaction
tx_cursor.append(current_tx_id, transaction)?;
tx_transition_cursor.append(current_tx_id, transition_id)?;
@@ -196,17 +196,17 @@ impl<DB: Database, D: BodyDownloader, C: Consensus> Stage<DB> for BodyStage<D, C
/// Unwind the stage.
async fn unwind(
&mut self,
db: &mut Transaction<'_, DB>,
tx: &mut Transaction<'_, DB>,
input: UnwindInput,
) -> Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>> {
// Cursors to unwind bodies, ommers, transactions and tx hash to number
let mut body_cursor = db.cursor_mut::<tables::BlockBodies>()?;
let mut ommers_cursor = db.cursor_mut::<tables::BlockOmmers>()?;
let mut transaction_cursor = db.cursor_mut::<tables::Transactions>()?;
let mut tx_hash_number_cursor = db.cursor_mut::<tables::TxHashNumber>()?;
let mut body_cursor = tx.cursor_mut::<tables::BlockBodies>()?;
let mut ommers_cursor = tx.cursor_mut::<tables::BlockOmmers>()?;
let mut transaction_cursor = tx.cursor_mut::<tables::Transactions>()?;
let mut tx_hash_number_cursor = tx.cursor_mut::<tables::TxHashNumber>()?;
// Cursors to unwind transitions
let mut block_transition_cursor = db.cursor_mut::<tables::BlockTransitionIndex>()?;
let mut tx_transition_cursor = db.cursor_mut::<tables::TxTransitionIndex>()?;
let mut block_transition_cursor = tx.cursor_mut::<tables::BlockTransitionIndex>()?;
let mut tx_transition_cursor = tx.cursor_mut::<tables::TxTransitionIndex>()?;
// let mut entry = tx_count_cursor.last()?;
let mut entry = body_cursor.last()?;
@@ -471,7 +471,7 @@ mod tests {
// Delete a transaction
runner
.db()
.tx()
.commit(|tx| {
let mut tx_cursor = tx.cursor_mut::<tables::Transactions>()?;
let (_, transaction) = tx_cursor.last()?.expect("Could not read last transaction");
@@ -579,7 +579,7 @@ mod tests {
pub(crate) struct BodyTestRunner {
pub(crate) consensus: Arc<TestConsensus>,
responses: HashMap<H256, DownloadResult<BlockBody>>,
db: TestTransaction,
tx: TestTransaction,
batch_size: u64,
}
@@ -588,7 +588,7 @@ mod tests {
Self {
consensus: Arc::new(TestConsensus::default()),
responses: HashMap::default(),
db: TestTransaction::default(),
tx: TestTransaction::default(),
batch_size: 1000,
}
}
@@ -610,8 +610,8 @@ mod tests {
impl StageTestRunner for BodyTestRunner {
type S = BodyStage<TestBodyDownloader, TestConsensus>;
fn db(&self) -> &TestTransaction {
&self.db
fn tx(&self) -> &TestTransaction {
&self.tx
}
fn stage(&self) -> Self::S {
@@ -631,10 +631,10 @@ mod tests {
let start = input.stage_progress.unwrap_or_default();
let end = input.previous_stage_progress() + 1;
let blocks = random_block_range(start..end, GENESIS_HASH);
self.db.insert_headers(blocks.iter().map(|block| &block.header))?;
self.tx.insert_headers(blocks.iter().map(|block| &block.header))?;
if let Some(progress) = blocks.first() {
// Insert last progress data
self.db.commit(|tx| {
self.tx.commit(|tx| {
let key = (progress.number, progress.hash()).into();
let body = StoredBlockBody {
start_tx_id: 0,
@@ -678,24 +678,24 @@ mod tests {
impl UnwindStageTestRunner for BodyTestRunner {
fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
self.db.check_no_entry_above::<tables::BlockBodies, _>(input.unwind_to, |key| {
self.tx.check_no_entry_above::<tables::BlockBodies, _>(input.unwind_to, |key| {
key.number()
})?;
self.db.check_no_entry_above::<tables::BlockOmmers, _>(input.unwind_to, |key| {
self.tx.check_no_entry_above::<tables::BlockOmmers, _>(input.unwind_to, |key| {
key.number()
})?;
self.db.check_no_entry_above::<tables::BlockTransitionIndex, _>(
self.tx.check_no_entry_above::<tables::BlockTransitionIndex, _>(
input.unwind_to,
|key| key.number(),
)?;
if let Some(last_tx_id) = self.get_last_tx_id()? {
self.db
self.tx
.check_no_entry_above::<tables::Transactions, _>(last_tx_id, |key| key)?;
self.db.check_no_entry_above::<tables::TxTransitionIndex, _>(
self.tx.check_no_entry_above::<tables::TxTransitionIndex, _>(
last_tx_id,
|key| key,
)?;
self.db.check_no_entry_above_by_value::<tables::TxHashNumber, _>(
self.tx.check_no_entry_above_by_value::<tables::TxHashNumber, _>(
last_tx_id,
|value| value,
)?;
@@ -707,7 +707,7 @@ mod tests {
impl BodyTestRunner {
/// Get the last available tx id if any
pub(crate) fn get_last_tx_id(&self) -> Result<Option<TxNumber>, TestRunnerError> {
let last_body = self.db.query(|tx| {
let last_body = self.tx.query(|tx| {
let v = tx.cursor::<tables::BlockBodies>()?.last()?;
Ok(v)
})?;
@@ -725,7 +725,7 @@ mod tests {
prev_progress: BlockNumber,
highest_block: BlockNumber,
) -> Result<(), TestRunnerError> {
self.db.query(|tx| {
self.tx.query(|tx| {
// Acquire cursors on body related tables
let mut bodies_cursor = tx.cursor::<tables::BlockBodies>()?;
let mut ommers_cursor = tx.cursor::<tables::BlockOmmers>()?;

View File

@@ -80,7 +80,7 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
/// Execute the stage
async fn execute(
&mut self,
db: &mut Transaction<'_, DB>,
tx: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
// none and zero are same as for genesis block (zeroed block) we are making assumption to
@@ -89,15 +89,15 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
let start_block = last_block + 1;
// Get next canonical block hashes to execute.
let mut canonicals = db.cursor::<tables::CanonicalHeaders>()?;
let mut canonicals = tx.cursor::<tables::CanonicalHeaders>()?;
// Get header with canonical hashes.
let mut headers = db.cursor::<tables::Headers>()?;
let mut headers = tx.cursor::<tables::Headers>()?;
// Get bodies with canonical hashes.
let mut bodies_cursor = db.cursor::<tables::BlockBodies>()?;
let mut bodies_cursor = tx.cursor::<tables::BlockBodies>()?;
// Get transaction of the block that we are executing.
let mut tx = db.cursor::<tables::Transactions>()?;
let mut tx_cursor = tx.cursor::<tables::Transactions>()?;
// Skip sender recovery and load signer from database.
let mut tx_sender = db.cursor::<tables::TxSenders>()?;
let mut tx_sender = tx.cursor::<tables::TxSenders>()?;
// get canonical blocks (num,hash)
let canonical_batch = canonicals
@@ -134,7 +134,7 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
let num = header.number;
tracing::trace!(target: "stages::execution", ?num, "Execute block num.");
// iterate over all transactions
let mut tx_walker = tx.walk(body.start_tx_id)?;
let mut tx_walker = tx_cursor.walk(body.start_tx_id)?;
let mut transactions = Vec::with_capacity(body.tx_count as usize);
// get next N transactions.
for index in body.tx_id_range() {
@@ -170,7 +170,7 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
.collect();
// for now use default eth config
let state_provider = SubState::new(State::new(StateProviderImplRefLatest::new(&**db)));
let state_provider = SubState::new(State::new(StateProviderImplRefLatest::new(&**tx)));
let change_set = std::thread::scope(|scope| {
let handle = std::thread::Builder::new()
@@ -194,7 +194,7 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
}
// Get last tx count so that we can know amount of transaction in the block.
let mut current_transition_id = db.get_block_transition_by_num(last_block)? + 1;
let mut current_transition_id = tx.get_block_transition_by_num(last_block)? + 1;
// apply changes to plain database.
for results in block_change_patches.into_iter() {
@@ -205,12 +205,12 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
let AccountChangeSet { account, wipe_storage, storage } = account_change_set;
// apply account change to db. Updates AccountChangeSet and PlainAccountState
// tables.
account.apply_to_db(&**db, address, current_transition_id)?;
account.apply_to_db(&**tx, address, current_transition_id)?;
// wipe storage
if wipe_storage {
// TODO insert all changes to StorageChangeSet
db.delete::<tables::PlainStorageState>(address, None)?;
tx.delete::<tables::PlainStorageState>(address, None)?;
}
// insert storage changeset
let storage_id = TransitionIdAddress((current_transition_id, address));
@@ -219,18 +219,18 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
key.to_big_endian(&mut hkey.0);
// insert into StorageChangeSet
db.put::<tables::StorageChangeSet>(
tx.put::<tables::StorageChangeSet>(
storage_id.clone(),
StorageEntry { key: hkey, value: old_value },
)?;
if new_value.is_zero() {
db.delete::<tables::PlainStorageState>(
tx.delete::<tables::PlainStorageState>(
address,
Some(StorageEntry { key: hkey, value: old_value }),
)?;
} else {
db.put::<tables::PlainStorageState>(
tx.put::<tables::PlainStorageState>(
address,
StorageEntry { key: hkey, value: new_value },
)?;
@@ -242,7 +242,7 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
for (hash, bytecode) in result.new_bytecodes.into_iter() {
// make different types of bytecode. Checked and maybe even analyzed (needs to
// be packed). Currently save only raw bytes.
db.put::<tables::Bytecodes>(hash, bytecode.bytes()[..bytecode.len()].to_vec())?;
tx.put::<tables::Bytecodes>(hash, bytecode.bytes()[..bytecode.len()].to_vec())?;
// NOTE: bytecode bytes are not inserted in change set and it stand in saparate
// table
@@ -254,7 +254,7 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
if let Some(block_reward_changeset) = results.block_reward {
// we are sure that block reward index is present.
for (address, changeset) in block_reward_changeset.into_iter() {
changeset.apply_to_db(&**db, address, current_transition_id)?;
changeset.apply_to_db(&**tx, address, current_transition_id)?;
}
current_transition_id += 1;
}
@@ -268,17 +268,17 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
/// Unwind the stage.
async fn unwind(
&mut self,
db: &mut Transaction<'_, DB>,
tx: &mut Transaction<'_, DB>,
input: UnwindInput,
) -> Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>> {
// Acquire changeset cursors
let mut account_changeset = db.cursor_dup_mut::<tables::AccountChangeSet>()?;
let mut storage_changeset = db.cursor_dup_mut::<tables::StorageChangeSet>()?;
let mut account_changeset = tx.cursor_dup_mut::<tables::AccountChangeSet>()?;
let mut storage_changeset = tx.cursor_dup_mut::<tables::StorageChangeSet>()?;
let from_transition = db.get_block_transition_by_num(input.stage_progress)?;
let from_transition = tx.get_block_transition_by_num(input.stage_progress)?;
let to_transition = if input.unwind_to != 0 {
db.get_block_transition_by_num(input.unwind_to - 1)?
tx.get_block_transition_by_num(input.unwind_to - 1)?
} else {
0
};
@@ -308,9 +308,9 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
for (_, changeset) in account_changeset_batch.into_iter().rev() {
// TODO refactor in db fn called tx.aplly_account_changeset
if let Some(account_info) = changeset.info {
db.put::<tables::PlainAccountState>(changeset.address, account_info)?;
tx.put::<tables::PlainAccountState>(changeset.address, account_info)?;
} else {
db.delete::<tables::PlainAccountState>(changeset.address, None)?;
tx.delete::<tables::PlainAccountState>(changeset.address, None)?;
}
}
@@ -328,10 +328,10 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
// revert all changes to PlainStorage
for (key, storage) in storage_chageset_batch.into_iter().rev() {
let address = key.address();
db.put::<tables::PlainStorageState>(address, storage.clone())?;
tx.put::<tables::PlainStorageState>(address, storage.clone())?;
if storage.value == U256::zero() {
// delete value that is zero
db.delete::<tables::PlainStorageState>(address, Some(storage))?;
tx.delete::<tables::PlainStorageState>(address, Some(storage))?;
}
}
@@ -373,7 +373,7 @@ mod tests {
// TODO cleanup the setup after https://github.com/paradigmxyz/reth/issues/332
// is merged as it has similar framework
let state_db = create_test_db::<WriteMap>(EnvKind::RW);
let mut db = Transaction::new(state_db.as_ref()).unwrap();
let mut tx = Transaction::new(state_db.as_ref()).unwrap();
let input = ExecInput {
previous_stage: None,
/// The progress of this stage the last time it was executed.
@@ -383,37 +383,39 @@ mod tests {
let genesis = BlockLocked::decode(&mut genesis_rlp).unwrap();
let mut block_rlp = hex!("f90262f901f9a075c371ba45999d87f4542326910a11af515897aebce5265d3f6acd1f1161f82fa01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa098f2dcd87c8ae4083e7017a05456c14eea4b1db2032126e27b3b1563d57d7cc0a08151d548273f6683169524b66ca9fe338b9ce42bc3540046c828fd939ae23bcba03f4e5c2ec5b2170b711d97ee755c160457bb58d8daa338e835ec02ae6860bbabb901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000018502540be40082a8798203e800a00000000000000000000000000000000000000000000000000000000000000000880000000000000000f863f861800a8405f5e10094100000000000000000000000000000000000000080801ba07e09e26678ed4fac08a249ebe8ed680bf9051a5e14ad223e4b2b9d26e0208f37a05f6e3f188e3e6eab7d7d3b6568f5eac7d687b08d307d3154ccd8c87b4630509bc0").as_slice();
let block = BlockLocked::decode(&mut block_rlp).unwrap();
insert_canonical_block(db.deref_mut(), &genesis, true).unwrap();
insert_canonical_block(db.deref_mut(), &block, true).unwrap();
db.commit().unwrap();
insert_canonical_block(tx.deref_mut(), &genesis, true).unwrap();
insert_canonical_block(tx.deref_mut(), &block, true).unwrap();
tx.commit().unwrap();
// insert pre state
let tx = db.deref_mut();
let db_tx = tx.deref_mut();
let acc1 = H160(hex!("1000000000000000000000000000000000000000"));
let acc2 = H160(hex!("a94f5374fce5edbc8e2a8697c15331677e6ebf0b"));
let code = hex!("5a465a905090036002900360015500");
let balance = U256::from(0x3635c9adc5dea00000u128);
let code_hash = keccak256(code);
tx.put::<tables::PlainAccountState>(
acc1,
Account { nonce: 0, balance: 0.into(), bytecode_hash: Some(code_hash) },
)
.unwrap();
tx.put::<tables::PlainAccountState>(
acc2,
Account { nonce: 0, balance, bytecode_hash: None },
)
.unwrap();
tx.put::<tables::Bytecodes>(code_hash, code.to_vec()).unwrap();
db.commit().unwrap();
db_tx
.put::<tables::PlainAccountState>(
acc1,
Account { nonce: 0, balance: 0.into(), bytecode_hash: Some(code_hash) },
)
.unwrap();
db_tx
.put::<tables::PlainAccountState>(
acc2,
Account { nonce: 0, balance, bytecode_hash: None },
)
.unwrap();
db_tx.put::<tables::Bytecodes>(code_hash, code.to_vec()).unwrap();
tx.commit().unwrap();
// execute
let mut execution_stage = ExecutionStage::default();
execution_stage.config.spec_upgrades = SpecUpgrades::new_berlin_activated();
let output = execution_stage.execute(&mut db, input).await.unwrap();
db.commit().unwrap();
let output = execution_stage.execute(&mut tx, input).await.unwrap();
tx.commit().unwrap();
assert_eq!(output, ExecOutput { stage_progress: 1, done: true, reached_tip: true });
let tx = db.deref_mut();
let tx = tx.deref_mut();
// check post state
let account1 = H160(hex!("1000000000000000000000000000000000000000"));
let account1_info =
@@ -456,7 +458,7 @@ mod tests {
// is merged as it has similar framework
let state_db = create_test_db::<WriteMap>(EnvKind::RW);
let mut db = Transaction::new(state_db.as_ref()).unwrap();
let mut tx = Transaction::new(state_db.as_ref()).unwrap();
let input = ExecInput {
previous_stage: None,
/// The progress of this stage the last time it was executed.
@@ -466,56 +468,56 @@ mod tests {
let genesis = BlockLocked::decode(&mut genesis_rlp).unwrap();
let mut block_rlp = hex!("f90262f901f9a075c371ba45999d87f4542326910a11af515897aebce5265d3f6acd1f1161f82fa01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa098f2dcd87c8ae4083e7017a05456c14eea4b1db2032126e27b3b1563d57d7cc0a08151d548273f6683169524b66ca9fe338b9ce42bc3540046c828fd939ae23bcba03f4e5c2ec5b2170b711d97ee755c160457bb58d8daa338e835ec02ae6860bbabb901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000018502540be40082a8798203e800a00000000000000000000000000000000000000000000000000000000000000000880000000000000000f863f861800a8405f5e10094100000000000000000000000000000000000000080801ba07e09e26678ed4fac08a249ebe8ed680bf9051a5e14ad223e4b2b9d26e0208f37a05f6e3f188e3e6eab7d7d3b6568f5eac7d687b08d307d3154ccd8c87b4630509bc0").as_slice();
let block = BlockLocked::decode(&mut block_rlp).unwrap();
insert_canonical_block(db.deref_mut(), &genesis, true).unwrap();
insert_canonical_block(db.deref_mut(), &block, true).unwrap();
db.commit().unwrap();
insert_canonical_block(tx.deref_mut(), &genesis, true).unwrap();
insert_canonical_block(tx.deref_mut(), &block, true).unwrap();
tx.commit().unwrap();
// variables
let code = hex!("5a465a905090036002900360015500");
let balance = U256::from(0x3635c9adc5dea00000u128);
let code_hash = keccak256(code);
// pre state
let tx = db.deref_mut();
let db_tx = tx.deref_mut();
let acc1 = H160(hex!("1000000000000000000000000000000000000000"));
let acc1_info = Account { nonce: 0, balance: 0.into(), bytecode_hash: Some(code_hash) };
let acc2 = H160(hex!("a94f5374fce5edbc8e2a8697c15331677e6ebf0b"));
let acc2_info = Account { nonce: 0, balance, bytecode_hash: None };
tx.put::<tables::PlainAccountState>(acc1, acc1_info).unwrap();
tx.put::<tables::PlainAccountState>(acc2, acc2_info).unwrap();
tx.put::<tables::Bytecodes>(code_hash, code.to_vec()).unwrap();
db.commit().unwrap();
db_tx.put::<tables::PlainAccountState>(acc1, acc1_info).unwrap();
db_tx.put::<tables::PlainAccountState>(acc2, acc2_info).unwrap();
db_tx.put::<tables::Bytecodes>(code_hash, code.to_vec()).unwrap();
tx.commit().unwrap();
// execute
let mut execution_stage = ExecutionStage::default();
execution_stage.config.spec_upgrades = SpecUpgrades::new_berlin_activated();
let _ = execution_stage.execute(&mut db, input).await.unwrap();
db.commit().unwrap();
let _ = execution_stage.execute(&mut tx, input).await.unwrap();
tx.commit().unwrap();
let o = ExecutionStage::default()
.unwind(&mut db, UnwindInput { stage_progress: 1, unwind_to: 0, bad_block: None })
.unwind(&mut tx, UnwindInput { stage_progress: 1, unwind_to: 0, bad_block: None })
.await
.unwrap();
assert_eq!(o, UnwindOutput { stage_progress: 0 });
// assert unwind stage
let tx = db.deref();
let db_tx = tx.deref();
assert_eq!(
tx.get::<tables::PlainAccountState>(acc1),
db_tx.get::<tables::PlainAccountState>(acc1),
Ok(Some(acc1_info)),
"Pre changed of a account"
);
assert_eq!(
tx.get::<tables::PlainAccountState>(acc2),
db_tx.get::<tables::PlainAccountState>(acc2),
Ok(Some(acc2_info)),
"Post changed of a account"
);
let miner_acc = H160(hex!("2adc25665018aa1fe0e6bc666dac8fc2697ff9ba"));
assert_eq!(
tx.get::<tables::PlainAccountState>(miner_acc),
db_tx.get::<tables::PlainAccountState>(miner_acc),
Ok(None),
"Third account should be unwinded"
);

View File

@@ -68,14 +68,14 @@ impl<DB: Database, D: HeaderDownloader, C: Consensus, H: HeadersClient, S: Statu
/// starting from the tip
async fn execute(
&mut self,
db: &mut Transaction<'_, DB>,
tx: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
let stage_progress = input.stage_progress.unwrap_or_default();
self.update_head::<DB>(db, stage_progress).await?;
self.update_head::<DB>(tx, stage_progress).await?;
// Lookup the head and tip of the sync range
let (head, tip) = self.get_head_and_tip(db, stage_progress).await?;
let (head, tip) = self.get_head_and_tip(tx, stage_progress).await?;
debug!(
target: "sync::stages::headers",
"Syncing from tip {:?} to head {:?}",
@@ -101,8 +101,8 @@ impl<DB: Database, D: HeaderDownloader, C: Consensus, H: HeadersClient, S: Statu
// Perform basic response validation
self.validate_header_response(&res)?;
let write_progress =
self.write_headers::<DB>(db, res).await?.unwrap_or_default();
db.commit()?;
self.write_headers::<DB>(tx, res).await?.unwrap_or_default();
tx.commit()?;
current_progress = current_progress.max(write_progress);
}
Err(e) => match e {
@@ -133,10 +133,10 @@ impl<DB: Database, D: HeaderDownloader, C: Consensus, H: HeadersClient, S: Statu
}
// Write total difficulty values after all headers have been inserted
self.write_td::<DB>(db, &head)?;
self.write_td::<DB>(tx, &head)?;
let stage_progress = current_progress.max(
db.cursor::<tables::CanonicalHeaders>()?
tx.cursor::<tables::CanonicalHeaders>()?
.last()?
.map(|(num, _)| num)
.unwrap_or_default(),
@@ -148,16 +148,16 @@ impl<DB: Database, D: HeaderDownloader, C: Consensus, H: HeadersClient, S: Statu
/// Unwind the stage.
async fn unwind(
&mut self,
db: &mut Transaction<'_, DB>,
tx: &mut Transaction<'_, DB>,
input: UnwindInput,
) -> Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>> {
// TODO: handle bad block
db.unwind_table_by_walker::<tables::CanonicalHeaders, tables::HeaderNumbers>(
tx.unwind_table_by_walker::<tables::CanonicalHeaders, tables::HeaderNumbers>(
input.unwind_to + 1,
)?;
db.unwind_table_by_num::<tables::CanonicalHeaders>(input.unwind_to)?;
db.unwind_table_by_num_hash::<tables::Headers>(input.unwind_to)?;
db.unwind_table_by_num_hash::<tables::HeaderTD>(input.unwind_to)?;
tx.unwind_table_by_num::<tables::CanonicalHeaders>(input.unwind_to)?;
tx.unwind_table_by_num_hash::<tables::Headers>(input.unwind_to)?;
tx.unwind_table_by_num_hash::<tables::HeaderTD>(input.unwind_to)?;
Ok(UnwindOutput { stage_progress: input.unwind_to })
}
}
@@ -167,11 +167,11 @@ impl<D: HeaderDownloader, C: Consensus, H: HeadersClient, S: StatusUpdater>
{
async fn update_head<DB: Database>(
&self,
db: &Transaction<'_, DB>,
tx: &Transaction<'_, DB>,
height: BlockNumber,
) -> Result<(), StageError> {
let block_key = db.get_block_numhash(height)?;
let td: U256 = *db
let block_key = tx.get_block_numhash(height)?;
let td: U256 = *tx
.get::<tables::HeaderTD>(block_key)?
.ok_or(DatabaseIntegrityError::TotalDifficulty { number: height })?;
// TODO: This should happen in the last stage
@@ -182,12 +182,12 @@ impl<D: HeaderDownloader, C: Consensus, H: HeadersClient, S: StatusUpdater>
/// Get the head and tip of the range we need to sync
async fn get_head_and_tip<DB: Database>(
&self,
db: &Transaction<'_, DB>,
tx: &Transaction<'_, DB>,
stage_progress: u64,
) -> Result<(SealedHeader, H256), StageError> {
// Create a cursor over canonical header hashes
let mut cursor = db.cursor::<tables::CanonicalHeaders>()?;
let mut header_cursor = db.cursor::<tables::Headers>()?;
let mut cursor = tx.cursor::<tables::CanonicalHeaders>()?;
let mut header_cursor = tx.cursor::<tables::Headers>()?;
// Get head hash and reposition the cursor
let (head_num, head_hash) = cursor
@@ -250,11 +250,11 @@ impl<D: HeaderDownloader, C: Consensus, H: HeadersClient, S: StatusUpdater>
/// Write downloaded headers to the database
async fn write_headers<DB: Database>(
&self,
db: &Transaction<'_, DB>,
tx: &Transaction<'_, DB>,
headers: Vec<SealedHeader>,
) -> Result<Option<BlockNumber>, StageError> {
let mut cursor_header = db.cursor_mut::<tables::Headers>()?;
let mut cursor_canonical = db.cursor_mut::<tables::CanonicalHeaders>()?;
let mut cursor_header = tx.cursor_mut::<tables::Headers>()?;
let mut cursor_canonical = tx.cursor_mut::<tables::CanonicalHeaders>()?;
let mut latest = None;
// Since the headers were returned in descending order,
@@ -270,7 +270,7 @@ impl<D: HeaderDownloader, C: Consensus, H: HeadersClient, S: StatusUpdater>
latest = Some(header.number);
// NOTE: HeaderNumbers are not sorted and can't be inserted with cursor.
db.put::<tables::HeaderNumbers>(block_hash, header.number)?;
tx.put::<tables::HeaderNumbers>(block_hash, header.number)?;
cursor_header.insert(key, header)?;
cursor_canonical.insert(key.number(), key.hash())?;
}
@@ -281,11 +281,11 @@ impl<D: HeaderDownloader, C: Consensus, H: HeadersClient, S: StatusUpdater>
/// Iterate over inserted headers and write td entries
fn write_td<DB: Database>(
&self,
db: &Transaction<'_, DB>,
tx: &Transaction<'_, DB>,
head: &SealedHeader,
) -> Result<(), StageError> {
// Acquire cursor over total difficulty table
let mut cursor_td = db.cursor_mut::<tables::HeaderTD>()?;
let mut cursor_td = tx.cursor_mut::<tables::HeaderTD>()?;
// Get latest total difficulty
let last_entry = cursor_td
@@ -294,10 +294,10 @@ impl<D: HeaderDownloader, C: Consensus, H: HeadersClient, S: StatusUpdater>
let mut td: U256 = last_entry.1.into();
// Start at first inserted block during this iteration
let start_key = db.get_block_numhash(head.number + 1)?;
let start_key = tx.get_block_numhash(head.number + 1)?;
// Walk over newly inserted headers, update & insert td
for entry in db.cursor::<tables::Headers>()?.walk(start_key)? {
for entry in tx.cursor::<tables::Headers>()?.walk(start_key)? {
let (key, header) = entry?;
td += header.difficulty;
cursor_td.append(key, td.into())?;
@@ -413,7 +413,7 @@ mod tests {
#[tokio::test]
async fn head_and_tip_lookup() {
let runner = HeadersTestRunner::default();
let db = runner.db().inner();
let tx = runner.tx().inner();
let stage = runner.stage();
let consensus_tip = H256::random();
@@ -427,38 +427,38 @@ mod tests {
// Empty database
assert_matches!(
stage.get_head_and_tip(&db, stage_progress).await,
stage.get_head_and_tip(&tx, stage_progress).await,
Err(StageError::DatabaseIntegrity(DatabaseIntegrityError::CanonicalHeader { number }))
if number == stage_progress
);
// Checkpoint and no gap
db.put::<tables::CanonicalHeaders>(head.number, head.hash())
tx.put::<tables::CanonicalHeaders>(head.number, head.hash())
.expect("falied to write canonical");
db.put::<tables::Headers>(head.num_hash().into(), head.clone().unseal())
tx.put::<tables::Headers>(head.num_hash().into(), head.clone().unseal())
.expect("failed to write header");
assert_matches!(
stage.get_head_and_tip(&db, stage_progress).await,
stage.get_head_and_tip(&tx, stage_progress).await,
Ok((h, t)) if h == head && t == consensus_tip
);
// Checkpoint and gap
db.put::<tables::CanonicalHeaders>(gap_tip.number, gap_tip.hash())
tx.put::<tables::CanonicalHeaders>(gap_tip.number, gap_tip.hash())
.expect("falied to write canonical");
db.put::<tables::Headers>(gap_tip.num_hash().into(), gap_tip.clone().unseal())
tx.put::<tables::Headers>(gap_tip.num_hash().into(), gap_tip.clone().unseal())
.expect("failed to write header");
assert_matches!(
stage.get_head_and_tip(&db, stage_progress).await,
stage.get_head_and_tip(&tx, stage_progress).await,
Ok((h, t)) if h == head && t == gap_tip.parent_hash
);
// Checkpoint and gap closed
db.put::<tables::CanonicalHeaders>(gap_fill.number, gap_fill.hash())
tx.put::<tables::CanonicalHeaders>(gap_fill.number, gap_fill.hash())
.expect("falied to write canonical");
db.put::<tables::Headers>(gap_fill.num_hash().into(), gap_fill.clone().unseal())
tx.put::<tables::Headers>(gap_fill.num_hash().into(), gap_fill.clone().unseal())
.expect("failed to write header");
assert_matches!(
stage.get_head_and_tip(&db, stage_progress).await,
stage.get_head_and_tip(&tx, stage_progress).await,
Err(StageError::StageProgress(progress)) if progress == stage_progress
);
}
@@ -489,7 +489,7 @@ mod tests {
pub(crate) client: Arc<TestHeadersClient>,
downloader: Arc<D>,
network_handle: TestStatusUpdater,
db: TestTransaction,
tx: TestTransaction,
}
impl Default for HeadersTestRunner<TestHeaderDownloader> {
@@ -501,7 +501,7 @@ mod tests {
consensus: consensus.clone(),
downloader: Arc::new(TestHeaderDownloader::new(client, consensus, 1000)),
network_handle: TestStatusUpdater::default(),
db: TestTransaction::default(),
tx: TestTransaction::default(),
}
}
}
@@ -509,8 +509,8 @@ mod tests {
impl<D: HeaderDownloader + 'static> StageTestRunner for HeadersTestRunner<D> {
type S = HeaderStage<Arc<D>, TestConsensus, TestHeadersClient, TestStatusUpdater>;
fn db(&self) -> &TestTransaction {
&self.db
fn tx(&self) -> &TestTransaction {
&self.tx
}
fn stage(&self) -> Self::S {
@@ -531,7 +531,7 @@ mod tests {
fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
let start = input.stage_progress.unwrap_or_default();
let head = random_header(start, None);
self.db.insert_headers(std::iter::once(&head))?;
self.tx.insert_headers(std::iter::once(&head))?;
// use previous progress as seed size
let end = input.previous_stage.map(|(_, num)| num).unwrap_or_default() + 1;
@@ -554,7 +554,7 @@ mod tests {
let initial_stage_progress = input.stage_progress.unwrap_or_default();
match output {
Some(output) if output.stage_progress > initial_stage_progress => {
self.db.query(|tx| {
self.tx.query(|tx| {
for block_num in (initial_stage_progress..output.stage_progress).rev() {
// look up the header hash
let hash = tx
@@ -597,7 +597,7 @@ mod tests {
headers.last().unwrap().hash()
} else {
let tip = random_header(0, None);
self.db.insert_headers(std::iter::once(&tip))?;
self.tx.insert_headers(std::iter::once(&tip))?;
tip.hash()
};
self.consensus.update_tip(tip);
@@ -624,7 +624,7 @@ mod tests {
consensus,
downloader,
network_handle: TestStatusUpdater::default(),
db: TestTransaction::default(),
tx: TestTransaction::default(),
}
}
}
@@ -634,11 +634,11 @@ mod tests {
&self,
block: BlockNumber,
) -> Result<(), TestRunnerError> {
self.db
self.tx
.check_no_entry_above_by_value::<tables::HeaderNumbers, _>(block, |val| val)?;
self.db.check_no_entry_above::<tables::CanonicalHeaders, _>(block, |key| key)?;
self.db.check_no_entry_above::<tables::Headers, _>(block, |key| key.number())?;
self.db.check_no_entry_above::<tables::HeaderTD, _>(block, |key| key.number())?;
self.tx.check_no_entry_above::<tables::CanonicalHeaders, _>(block, |key| key)?;
self.tx.check_no_entry_above::<tables::Headers, _>(block, |key| key.number())?;
self.tx.check_no_entry_above::<tables::HeaderTD, _>(block, |key| key.number())?;
Ok(())
}
}

View File

@@ -55,7 +55,7 @@ impl<DB: Database> Stage<DB> for SendersStage {
/// the [`TxSenders`][reth_interfaces::db::tables::TxSenders] table.
async fn execute(
&mut self,
db: &mut Transaction<'_, DB>,
tx: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
let stage_progress = input.stage_progress.unwrap_or_default();
@@ -67,10 +67,10 @@ impl<DB: Database> Stage<DB> for SendersStage {
}
// Look up the start index for the transaction range
let start_tx_index = db.get_block_body_by_num(stage_progress + 1)?.start_tx_id;
let start_tx_index = tx.get_block_body_by_num(stage_progress + 1)?.start_tx_id;
// Look up the end index for transaction range (inclusive)
let end_tx_index = db.get_block_body_by_num(max_block_num)?.last_tx_index();
let end_tx_index = tx.get_block_body_by_num(max_block_num)?.last_tx_index();
// No transactions to walk over
if start_tx_index > end_tx_index {
@@ -78,10 +78,10 @@ impl<DB: Database> Stage<DB> for SendersStage {
}
// Acquire the cursor for inserting elements
let mut senders_cursor = db.cursor_mut::<tables::TxSenders>()?;
let mut senders_cursor = tx.cursor_mut::<tables::TxSenders>()?;
// Acquire the cursor over the transactions
let mut tx_cursor = db.cursor::<tables::Transactions>()?;
let mut tx_cursor = tx.cursor::<tables::Transactions>()?;
// Walk the transactions from start to end index (inclusive)
let entries = tx_cursor
.walk(start_tx_index)?
@@ -112,12 +112,12 @@ impl<DB: Database> Stage<DB> for SendersStage {
/// Unwind the stage.
async fn unwind(
&mut self,
db: &mut Transaction<'_, DB>,
tx: &mut Transaction<'_, DB>,
input: UnwindInput,
) -> Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>> {
// Lookup latest tx id that we should unwind to
let latest_tx_id = db.get_block_body_by_num(input.unwind_to)?.last_tx_index();
db.unwind_table_by_num::<tables::TxSenders>(latest_tx_id)?;
let latest_tx_id = tx.get_block_body_by_num(input.unwind_to)?.last_tx_index();
tx.unwind_table_by_num::<tables::TxSenders>(latest_tx_id)?;
Ok(UnwindOutput { stage_progress: input.unwind_to })
}
}
@@ -176,13 +176,13 @@ mod tests {
}
struct SendersTestRunner {
db: TestTransaction,
tx: TestTransaction,
threshold: u64,
}
impl Default for SendersTestRunner {
fn default() -> Self {
Self { threshold: 1000, db: TestTransaction::default() }
Self { threshold: 1000, tx: TestTransaction::default() }
}
}
@@ -195,8 +195,8 @@ mod tests {
impl StageTestRunner for SendersTestRunner {
type S = SendersStage;
fn db(&self) -> &TestTransaction {
&self.db
fn tx(&self) -> &TestTransaction {
&self.tx
}
fn stage(&self) -> Self::S {
@@ -213,7 +213,7 @@ mod tests {
let blocks = random_block_range(stage_progress..end, H256::zero());
self.db.commit(|tx| {
self.tx.commit(|tx| {
let mut current_tx_id = 0;
blocks.iter().try_for_each(|b| {
let txs = b.body.clone();
@@ -251,7 +251,7 @@ mod tests {
output: Option<ExecOutput>,
) -> Result<(), TestRunnerError> {
if let Some(output) = output {
self.db.query(|tx| {
self.tx.query(|tx| {
let start_block = input.stage_progress.unwrap_or_default() + 1;
let end_block = output.stage_progress;
@@ -292,15 +292,15 @@ mod tests {
impl SendersTestRunner {
fn check_no_senders_by_block(&self, block: BlockNumber) -> Result<(), TestRunnerError> {
let body_result = self.db.inner().get_block_body_by_num(block);
let body_result = self.tx.inner().get_block_body_by_num(block);
match body_result {
Ok(body) => self
.db
.tx
.check_no_entry_above::<tables::TxSenders, _>(body.last_tx_index(), |key| {
key
})?,
Err(_) => {
assert!(self.db.table_is_empty::<tables::TxSenders>()?);
assert!(self.tx.table_is_empty::<tables::TxSenders>()?);
}
};

View File

@@ -19,7 +19,7 @@ pub(crate) trait StageTestRunner {
type S: Stage<Env<WriteMap>> + 'static;
/// Return a reference to the database.
fn db(&self) -> &TestTransaction;
fn tx(&self) -> &TestTransaction;
/// Return an instance of a Stage.
fn stage(&self) -> Self::S;
@@ -42,7 +42,7 @@ pub(crate) trait ExecuteStageTestRunner: StageTestRunner {
/// Run [Stage::execute] and return a receiver for the result.
fn execute(&self, input: ExecInput) -> oneshot::Receiver<Result<ExecOutput, StageError>> {
let (tx, rx) = oneshot::channel();
let (db, mut stage) = (self.db().inner_raw(), self.stage());
let (db, mut stage) = (self.tx().inner_raw(), self.stage());
tokio::spawn(async move {
let mut db = Transaction::new(db.borrow()).expect("failed to create db container");
let result = stage.execute(&mut db, input).await;
@@ -69,7 +69,7 @@ pub(crate) trait UnwindStageTestRunner: StageTestRunner {
input: UnwindInput,
) -> Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>> {
let (tx, rx) = oneshot::channel();
let (db, mut stage) = (self.db().inner_raw(), self.stage());
let (db, mut stage) = (self.tx().inner_raw(), self.stage());
tokio::spawn(async move {
let mut db = Transaction::new(db.borrow()).expect("failed to create db container");
let result = stage.unwind(&mut db, input).await;

View File

@@ -12,33 +12,33 @@ use std::{borrow::Borrow, sync::Arc};
use crate::db::Transaction;
/// The [StageTestDB] is used as an internal
/// The [TestTransaction] is used as an internal
/// database for testing stage implementation.
///
/// ```rust
/// let db = StageTestDB::default();
/// stage.execute(&mut db.container(), input);
/// let tx = TestTransaction::default();
/// stage.execute(&mut tx.container(), input);
/// ```
pub(crate) struct TestTransaction {
db: Arc<Env<WriteMap>>,
tx: Arc<Env<WriteMap>>,
}
impl Default for TestTransaction {
/// Create a new instance of [StageTestDB]
/// Create a new instance of [TestTransaction]
fn default() -> Self {
Self { db: create_test_db::<WriteMap>(EnvKind::RW) }
Self { tx: create_test_db::<WriteMap>(EnvKind::RW) }
}
}
impl TestTransaction {
/// Return a database wrapped in [Transaction].
pub(crate) fn inner(&self) -> Transaction<'_, Env<WriteMap>> {
Transaction::new(self.db.borrow()).expect("failed to create db container")
Transaction::new(self.tx.borrow()).expect("failed to create db container")
}
/// Get a pointer to an internal database.
pub(crate) fn inner_raw(&self) -> Arc<Env<WriteMap>> {
self.db.clone()
self.tx.clone()
}
/// Invoke a callback with transaction committing it afterwards
@@ -46,9 +46,9 @@ impl TestTransaction {
where
F: FnOnce(&mut Tx<'_, RW, WriteMap>) -> Result<(), DbError>,
{
let mut db = self.inner();
f(&mut db)?;
db.commit()?;
let mut tx = self.inner();
f(&mut tx)?;
tx.commit()?;
Ok(())
}
@@ -72,8 +72,8 @@ impl TestTransaction {
/// This function commits the transaction before exiting.
///
/// ```rust
/// let db = StageTestDB::default();
/// db.map_put::<Table, _, _>(&items, |item| item)?;
/// let tx = TestTransaction::default();
/// tx.map_put::<Table, _, _>(&items, |item| item)?;
/// ```
#[allow(dead_code)]
pub(crate) fn map_put<T, S, F>(&self, values: &[S], mut map: F) -> Result<(), DbError>
@@ -96,8 +96,8 @@ impl TestTransaction {
/// This function commits the transaction before exiting.
///
/// ```rust
/// let db = StageTestDB::default();
/// db.transform_append::<Table, _, _>(&items, |prev, item| prev.unwrap_or_default() + item)?;
/// let tx = TestTransaction::default();
/// tx.transform_append::<Table, _, _>(&items, |prev, item| prev.unwrap_or_default() + item)?;
/// ```
#[allow(dead_code)]
pub(crate) fn transform_append<T, S, F>(