diff --git a/crates/stages/src/pipeline.rs b/crates/stages/src/pipeline.rs index 4861507539..0d84293e44 100644 --- a/crates/stages/src/pipeline.rs +++ b/crates/stages/src/pipeline.rs @@ -229,14 +229,14 @@ impl Pipeline { }; // Unwind stages in reverse order of priority (i.e. higher priority = first) - let mut db = Transaction::new(db)?; + let mut tx = Transaction::new(db)?; for (_, QueuedStage { stage, .. }) in unwind_pipeline.iter_mut() { let stage_id = stage.id(); let span = info_span!("Unwinding", stage = %stage_id); let _enter = span.enter(); - let mut stage_progress = stage_id.get_progress(db.deref())?.unwrap_or_default(); + let mut stage_progress = stage_id.get_progress(tx.deref())?.unwrap_or_default(); if stage_progress < to { debug!(from = %stage_progress, %to, "Unwind point too far for stage"); self.events_sender.send(PipelineEvent::Skipped { stage_id }).await?; @@ -248,11 +248,11 @@ impl Pipeline { let input = UnwindInput { stage_progress, unwind_to: to, bad_block }; self.events_sender.send(PipelineEvent::Unwinding { stage_id, input }).await?; - let output = stage.unwind(&mut db, input).await; + let output = stage.unwind(&mut tx, input).await; match output { Ok(unwind_output) => { stage_progress = unwind_output.stage_progress; - stage_id.save_progress(db.deref(), stage_progress)?; + stage_id.save_progress(tx.deref(), stage_progress)?; self.events_sender .send(PipelineEvent::Unwound { stage_id, result: unwind_output }) @@ -266,7 +266,7 @@ impl Pipeline { } } - db.commit()?; + tx.commit()?; Ok(()) } } @@ -305,9 +305,9 @@ impl QueuedStage { } loop { - let mut db = Transaction::new(db)?; + let mut tx = Transaction::new(db)?; - let prev_progress = stage_id.get_progress(db.deref())?; + let prev_progress = stage_id.get_progress(tx.deref())?; let stage_reached_max_block = prev_progress .zip(state.max_block) @@ -332,7 +332,7 @@ impl QueuedStage { match self .stage - .execute(&mut db, ExecInput { previous_stage, stage_progress: prev_progress }) + .execute(&mut tx, ExecInput { previous_stage, stage_progress: prev_progress }) .await { Ok(out @ ExecOutput { stage_progress, done, reached_tip }) => { @@ -343,7 +343,7 @@ impl QueuedStage { %done, "Stage made progress" ); - stage_id.save_progress(db.deref(), stage_progress)?; + stage_id.save_progress(tx.deref(), stage_progress)?; state .events_sender @@ -351,7 +351,7 @@ impl QueuedStage { .await?; // TODO: Make the commit interval configurable - db.commit()?; + tx.commit()?; state.record_progress_outliers(stage_progress); state.set_reached_tip(reached_tip); diff --git a/crates/stages/src/stage.rs b/crates/stages/src/stage.rs index 473fdccbd7..7ba1e03eec 100644 --- a/crates/stages/src/stage.rs +++ b/crates/stages/src/stage.rs @@ -71,14 +71,14 @@ pub trait Stage: Send + Sync { /// Execute the stage. async fn execute( &mut self, - db: &mut Transaction<'_, DB>, + tx: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result; /// Unwind the stage. async fn unwind( &mut self, - db: &mut Transaction<'_, DB>, + tx: &mut Transaction<'_, DB>, input: UnwindInput, ) -> Result>; } diff --git a/crates/stages/src/stages/bodies.rs b/crates/stages/src/stages/bodies.rs index f8dcbaa6ac..d4d598ae5d 100644 --- a/crates/stages/src/stages/bodies.rs +++ b/crates/stages/src/stages/bodies.rs @@ -75,7 +75,7 @@ impl Stage for BodyStage, + tx: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { let previous_stage_progress = input.previous_stage_progress(); @@ -96,19 +96,19 @@ impl Stage for BodyStage(db, starting_block, target)?; + let bodies_to_download = self.bodies_to_download::(tx, starting_block, target)?; // Cursors used to write bodies, ommers and transactions - let mut body_cursor = db.cursor_mut::()?; - let mut ommers_cursor = db.cursor_mut::()?; - let mut tx_cursor = db.cursor_mut::()?; + let mut body_cursor = tx.cursor_mut::()?; + let mut ommers_cursor = tx.cursor_mut::()?; + let mut tx_cursor = tx.cursor_mut::()?; // Cursors used to write state transition mapping - let mut block_transition_cursor = db.cursor_mut::()?; - let mut tx_transition_cursor = db.cursor_mut::()?; + let mut block_transition_cursor = tx.cursor_mut::()?; + let mut tx_transition_cursor = tx.cursor_mut::()?; // Get id for the first transaction in the block - let (mut current_tx_id, mut transition_id) = db.get_next_block_ids(starting_block)?; + let (mut current_tx_id, mut transition_id) = tx.get_next_block_ids(starting_block)?; // NOTE(onbjerg): The stream needs to live here otherwise it will just create a new iterator // on every iteration of the while loop -_- @@ -156,7 +156,7 @@ impl Stage for BodyStage(transaction.hash(), current_tx_id)?; + tx.put::(transaction.hash(), current_tx_id)?; // Append the transaction tx_cursor.append(current_tx_id, transaction)?; tx_transition_cursor.append(current_tx_id, transition_id)?; @@ -196,17 +196,17 @@ impl Stage for BodyStage, + tx: &mut Transaction<'_, DB>, input: UnwindInput, ) -> Result> { // Cursors to unwind bodies, ommers, transactions and tx hash to number - let mut body_cursor = db.cursor_mut::()?; - let mut ommers_cursor = db.cursor_mut::()?; - let mut transaction_cursor = db.cursor_mut::()?; - let mut tx_hash_number_cursor = db.cursor_mut::()?; + let mut body_cursor = tx.cursor_mut::()?; + let mut ommers_cursor = tx.cursor_mut::()?; + let mut transaction_cursor = tx.cursor_mut::()?; + let mut tx_hash_number_cursor = tx.cursor_mut::()?; // Cursors to unwind transitions - let mut block_transition_cursor = db.cursor_mut::()?; - let mut tx_transition_cursor = db.cursor_mut::()?; + let mut block_transition_cursor = tx.cursor_mut::()?; + let mut tx_transition_cursor = tx.cursor_mut::()?; // let mut entry = tx_count_cursor.last()?; let mut entry = body_cursor.last()?; @@ -471,7 +471,7 @@ mod tests { // Delete a transaction runner - .db() + .tx() .commit(|tx| { let mut tx_cursor = tx.cursor_mut::()?; let (_, transaction) = tx_cursor.last()?.expect("Could not read last transaction"); @@ -579,7 +579,7 @@ mod tests { pub(crate) struct BodyTestRunner { pub(crate) consensus: Arc, responses: HashMap>, - db: TestTransaction, + tx: TestTransaction, batch_size: u64, } @@ -588,7 +588,7 @@ mod tests { Self { consensus: Arc::new(TestConsensus::default()), responses: HashMap::default(), - db: TestTransaction::default(), + tx: TestTransaction::default(), batch_size: 1000, } } @@ -610,8 +610,8 @@ mod tests { impl StageTestRunner for BodyTestRunner { type S = BodyStage; - fn db(&self) -> &TestTransaction { - &self.db + fn tx(&self) -> &TestTransaction { + &self.tx } fn stage(&self) -> Self::S { @@ -631,10 +631,10 @@ mod tests { let start = input.stage_progress.unwrap_or_default(); let end = input.previous_stage_progress() + 1; let blocks = random_block_range(start..end, GENESIS_HASH); - self.db.insert_headers(blocks.iter().map(|block| &block.header))?; + self.tx.insert_headers(blocks.iter().map(|block| &block.header))?; if let Some(progress) = blocks.first() { // Insert last progress data - self.db.commit(|tx| { + self.tx.commit(|tx| { let key = (progress.number, progress.hash()).into(); let body = StoredBlockBody { start_tx_id: 0, @@ -678,24 +678,24 @@ mod tests { impl UnwindStageTestRunner for BodyTestRunner { fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> { - self.db.check_no_entry_above::(input.unwind_to, |key| { + self.tx.check_no_entry_above::(input.unwind_to, |key| { key.number() })?; - self.db.check_no_entry_above::(input.unwind_to, |key| { + self.tx.check_no_entry_above::(input.unwind_to, |key| { key.number() })?; - self.db.check_no_entry_above::( + self.tx.check_no_entry_above::( input.unwind_to, |key| key.number(), )?; if let Some(last_tx_id) = self.get_last_tx_id()? { - self.db + self.tx .check_no_entry_above::(last_tx_id, |key| key)?; - self.db.check_no_entry_above::( + self.tx.check_no_entry_above::( last_tx_id, |key| key, )?; - self.db.check_no_entry_above_by_value::( + self.tx.check_no_entry_above_by_value::( last_tx_id, |value| value, )?; @@ -707,7 +707,7 @@ mod tests { impl BodyTestRunner { /// Get the last available tx id if any pub(crate) fn get_last_tx_id(&self) -> Result, TestRunnerError> { - let last_body = self.db.query(|tx| { + let last_body = self.tx.query(|tx| { let v = tx.cursor::()?.last()?; Ok(v) })?; @@ -725,7 +725,7 @@ mod tests { prev_progress: BlockNumber, highest_block: BlockNumber, ) -> Result<(), TestRunnerError> { - self.db.query(|tx| { + self.tx.query(|tx| { // Acquire cursors on body related tables let mut bodies_cursor = tx.cursor::()?; let mut ommers_cursor = tx.cursor::()?; diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index 9f3caf72c2..092153b36d 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -80,7 +80,7 @@ impl Stage for ExecutionStage { /// Execute the stage async fn execute( &mut self, - db: &mut Transaction<'_, DB>, + tx: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { // none and zero are same as for genesis block (zeroed block) we are making assumption to @@ -89,15 +89,15 @@ impl Stage for ExecutionStage { let start_block = last_block + 1; // Get next canonical block hashes to execute. - let mut canonicals = db.cursor::()?; + let mut canonicals = tx.cursor::()?; // Get header with canonical hashes. - let mut headers = db.cursor::()?; + let mut headers = tx.cursor::()?; // Get bodies with canonical hashes. - let mut bodies_cursor = db.cursor::()?; + let mut bodies_cursor = tx.cursor::()?; // Get transaction of the block that we are executing. - let mut tx = db.cursor::()?; + let mut tx_cursor = tx.cursor::()?; // Skip sender recovery and load signer from database. - let mut tx_sender = db.cursor::()?; + let mut tx_sender = tx.cursor::()?; // get canonical blocks (num,hash) let canonical_batch = canonicals @@ -134,7 +134,7 @@ impl Stage for ExecutionStage { let num = header.number; tracing::trace!(target: "stages::execution", ?num, "Execute block num."); // iterate over all transactions - let mut tx_walker = tx.walk(body.start_tx_id)?; + let mut tx_walker = tx_cursor.walk(body.start_tx_id)?; let mut transactions = Vec::with_capacity(body.tx_count as usize); // get next N transactions. for index in body.tx_id_range() { @@ -170,7 +170,7 @@ impl Stage for ExecutionStage { .collect(); // for now use default eth config - let state_provider = SubState::new(State::new(StateProviderImplRefLatest::new(&**db))); + let state_provider = SubState::new(State::new(StateProviderImplRefLatest::new(&**tx))); let change_set = std::thread::scope(|scope| { let handle = std::thread::Builder::new() @@ -194,7 +194,7 @@ impl Stage for ExecutionStage { } // Get last tx count so that we can know amount of transaction in the block. - let mut current_transition_id = db.get_block_transition_by_num(last_block)? + 1; + let mut current_transition_id = tx.get_block_transition_by_num(last_block)? + 1; // apply changes to plain database. for results in block_change_patches.into_iter() { @@ -205,12 +205,12 @@ impl Stage for ExecutionStage { let AccountChangeSet { account, wipe_storage, storage } = account_change_set; // apply account change to db. Updates AccountChangeSet and PlainAccountState // tables. - account.apply_to_db(&**db, address, current_transition_id)?; + account.apply_to_db(&**tx, address, current_transition_id)?; // wipe storage if wipe_storage { // TODO insert all changes to StorageChangeSet - db.delete::(address, None)?; + tx.delete::(address, None)?; } // insert storage changeset let storage_id = TransitionIdAddress((current_transition_id, address)); @@ -219,18 +219,18 @@ impl Stage for ExecutionStage { key.to_big_endian(&mut hkey.0); // insert into StorageChangeSet - db.put::( + tx.put::( storage_id.clone(), StorageEntry { key: hkey, value: old_value }, )?; if new_value.is_zero() { - db.delete::( + tx.delete::( address, Some(StorageEntry { key: hkey, value: old_value }), )?; } else { - db.put::( + tx.put::( address, StorageEntry { key: hkey, value: new_value }, )?; @@ -242,7 +242,7 @@ impl Stage for ExecutionStage { for (hash, bytecode) in result.new_bytecodes.into_iter() { // make different types of bytecode. Checked and maybe even analyzed (needs to // be packed). Currently save only raw bytes. - db.put::(hash, bytecode.bytes()[..bytecode.len()].to_vec())?; + tx.put::(hash, bytecode.bytes()[..bytecode.len()].to_vec())?; // NOTE: bytecode bytes are not inserted in change set and it stand in saparate // table @@ -254,7 +254,7 @@ impl Stage for ExecutionStage { if let Some(block_reward_changeset) = results.block_reward { // we are sure that block reward index is present. for (address, changeset) in block_reward_changeset.into_iter() { - changeset.apply_to_db(&**db, address, current_transition_id)?; + changeset.apply_to_db(&**tx, address, current_transition_id)?; } current_transition_id += 1; } @@ -268,17 +268,17 @@ impl Stage for ExecutionStage { /// Unwind the stage. async fn unwind( &mut self, - db: &mut Transaction<'_, DB>, + tx: &mut Transaction<'_, DB>, input: UnwindInput, ) -> Result> { // Acquire changeset cursors - let mut account_changeset = db.cursor_dup_mut::()?; - let mut storage_changeset = db.cursor_dup_mut::()?; + let mut account_changeset = tx.cursor_dup_mut::()?; + let mut storage_changeset = tx.cursor_dup_mut::()?; - let from_transition = db.get_block_transition_by_num(input.stage_progress)?; + let from_transition = tx.get_block_transition_by_num(input.stage_progress)?; let to_transition = if input.unwind_to != 0 { - db.get_block_transition_by_num(input.unwind_to - 1)? + tx.get_block_transition_by_num(input.unwind_to - 1)? } else { 0 }; @@ -308,9 +308,9 @@ impl Stage for ExecutionStage { for (_, changeset) in account_changeset_batch.into_iter().rev() { // TODO refactor in db fn called tx.aplly_account_changeset if let Some(account_info) = changeset.info { - db.put::(changeset.address, account_info)?; + tx.put::(changeset.address, account_info)?; } else { - db.delete::(changeset.address, None)?; + tx.delete::(changeset.address, None)?; } } @@ -328,10 +328,10 @@ impl Stage for ExecutionStage { // revert all changes to PlainStorage for (key, storage) in storage_chageset_batch.into_iter().rev() { let address = key.address(); - db.put::(address, storage.clone())?; + tx.put::(address, storage.clone())?; if storage.value == U256::zero() { // delete value that is zero - db.delete::(address, Some(storage))?; + tx.delete::(address, Some(storage))?; } } @@ -373,7 +373,7 @@ mod tests { // TODO cleanup the setup after https://github.com/paradigmxyz/reth/issues/332 // is merged as it has similar framework let state_db = create_test_db::(EnvKind::RW); - let mut db = Transaction::new(state_db.as_ref()).unwrap(); + let mut tx = Transaction::new(state_db.as_ref()).unwrap(); let input = ExecInput { previous_stage: None, /// The progress of this stage the last time it was executed. @@ -383,37 +383,39 @@ mod tests { let genesis = BlockLocked::decode(&mut genesis_rlp).unwrap(); let mut block_rlp = hex!("f90262f901f9a075c371ba45999d87f4542326910a11af515897aebce5265d3f6acd1f1161f82fa01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa098f2dcd87c8ae4083e7017a05456c14eea4b1db2032126e27b3b1563d57d7cc0a08151d548273f6683169524b66ca9fe338b9ce42bc3540046c828fd939ae23bcba03f4e5c2ec5b2170b711d97ee755c160457bb58d8daa338e835ec02ae6860bbabb901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000018502540be40082a8798203e800a00000000000000000000000000000000000000000000000000000000000000000880000000000000000f863f861800a8405f5e10094100000000000000000000000000000000000000080801ba07e09e26678ed4fac08a249ebe8ed680bf9051a5e14ad223e4b2b9d26e0208f37a05f6e3f188e3e6eab7d7d3b6568f5eac7d687b08d307d3154ccd8c87b4630509bc0").as_slice(); let block = BlockLocked::decode(&mut block_rlp).unwrap(); - insert_canonical_block(db.deref_mut(), &genesis, true).unwrap(); - insert_canonical_block(db.deref_mut(), &block, true).unwrap(); - db.commit().unwrap(); + insert_canonical_block(tx.deref_mut(), &genesis, true).unwrap(); + insert_canonical_block(tx.deref_mut(), &block, true).unwrap(); + tx.commit().unwrap(); // insert pre state - let tx = db.deref_mut(); + let db_tx = tx.deref_mut(); let acc1 = H160(hex!("1000000000000000000000000000000000000000")); let acc2 = H160(hex!("a94f5374fce5edbc8e2a8697c15331677e6ebf0b")); let code = hex!("5a465a905090036002900360015500"); let balance = U256::from(0x3635c9adc5dea00000u128); let code_hash = keccak256(code); - tx.put::( - acc1, - Account { nonce: 0, balance: 0.into(), bytecode_hash: Some(code_hash) }, - ) - .unwrap(); - tx.put::( - acc2, - Account { nonce: 0, balance, bytecode_hash: None }, - ) - .unwrap(); - tx.put::(code_hash, code.to_vec()).unwrap(); - db.commit().unwrap(); + db_tx + .put::( + acc1, + Account { nonce: 0, balance: 0.into(), bytecode_hash: Some(code_hash) }, + ) + .unwrap(); + db_tx + .put::( + acc2, + Account { nonce: 0, balance, bytecode_hash: None }, + ) + .unwrap(); + db_tx.put::(code_hash, code.to_vec()).unwrap(); + tx.commit().unwrap(); // execute let mut execution_stage = ExecutionStage::default(); execution_stage.config.spec_upgrades = SpecUpgrades::new_berlin_activated(); - let output = execution_stage.execute(&mut db, input).await.unwrap(); - db.commit().unwrap(); + let output = execution_stage.execute(&mut tx, input).await.unwrap(); + tx.commit().unwrap(); assert_eq!(output, ExecOutput { stage_progress: 1, done: true, reached_tip: true }); - let tx = db.deref_mut(); + let tx = tx.deref_mut(); // check post state let account1 = H160(hex!("1000000000000000000000000000000000000000")); let account1_info = @@ -456,7 +458,7 @@ mod tests { // is merged as it has similar framework let state_db = create_test_db::(EnvKind::RW); - let mut db = Transaction::new(state_db.as_ref()).unwrap(); + let mut tx = Transaction::new(state_db.as_ref()).unwrap(); let input = ExecInput { previous_stage: None, /// The progress of this stage the last time it was executed. @@ -466,56 +468,56 @@ mod tests { let genesis = BlockLocked::decode(&mut genesis_rlp).unwrap(); let mut block_rlp = hex!("f90262f901f9a075c371ba45999d87f4542326910a11af515897aebce5265d3f6acd1f1161f82fa01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa098f2dcd87c8ae4083e7017a05456c14eea4b1db2032126e27b3b1563d57d7cc0a08151d548273f6683169524b66ca9fe338b9ce42bc3540046c828fd939ae23bcba03f4e5c2ec5b2170b711d97ee755c160457bb58d8daa338e835ec02ae6860bbabb901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000018502540be40082a8798203e800a00000000000000000000000000000000000000000000000000000000000000000880000000000000000f863f861800a8405f5e10094100000000000000000000000000000000000000080801ba07e09e26678ed4fac08a249ebe8ed680bf9051a5e14ad223e4b2b9d26e0208f37a05f6e3f188e3e6eab7d7d3b6568f5eac7d687b08d307d3154ccd8c87b4630509bc0").as_slice(); let block = BlockLocked::decode(&mut block_rlp).unwrap(); - insert_canonical_block(db.deref_mut(), &genesis, true).unwrap(); - insert_canonical_block(db.deref_mut(), &block, true).unwrap(); - db.commit().unwrap(); + insert_canonical_block(tx.deref_mut(), &genesis, true).unwrap(); + insert_canonical_block(tx.deref_mut(), &block, true).unwrap(); + tx.commit().unwrap(); // variables let code = hex!("5a465a905090036002900360015500"); let balance = U256::from(0x3635c9adc5dea00000u128); let code_hash = keccak256(code); // pre state - let tx = db.deref_mut(); + let db_tx = tx.deref_mut(); let acc1 = H160(hex!("1000000000000000000000000000000000000000")); let acc1_info = Account { nonce: 0, balance: 0.into(), bytecode_hash: Some(code_hash) }; let acc2 = H160(hex!("a94f5374fce5edbc8e2a8697c15331677e6ebf0b")); let acc2_info = Account { nonce: 0, balance, bytecode_hash: None }; - tx.put::(acc1, acc1_info).unwrap(); - tx.put::(acc2, acc2_info).unwrap(); - tx.put::(code_hash, code.to_vec()).unwrap(); - db.commit().unwrap(); + db_tx.put::(acc1, acc1_info).unwrap(); + db_tx.put::(acc2, acc2_info).unwrap(); + db_tx.put::(code_hash, code.to_vec()).unwrap(); + tx.commit().unwrap(); // execute let mut execution_stage = ExecutionStage::default(); execution_stage.config.spec_upgrades = SpecUpgrades::new_berlin_activated(); - let _ = execution_stage.execute(&mut db, input).await.unwrap(); - db.commit().unwrap(); + let _ = execution_stage.execute(&mut tx, input).await.unwrap(); + tx.commit().unwrap(); let o = ExecutionStage::default() - .unwind(&mut db, UnwindInput { stage_progress: 1, unwind_to: 0, bad_block: None }) + .unwind(&mut tx, UnwindInput { stage_progress: 1, unwind_to: 0, bad_block: None }) .await .unwrap(); assert_eq!(o, UnwindOutput { stage_progress: 0 }); // assert unwind stage - let tx = db.deref(); + let db_tx = tx.deref(); assert_eq!( - tx.get::(acc1), + db_tx.get::(acc1), Ok(Some(acc1_info)), "Pre changed of a account" ); assert_eq!( - tx.get::(acc2), + db_tx.get::(acc2), Ok(Some(acc2_info)), "Post changed of a account" ); let miner_acc = H160(hex!("2adc25665018aa1fe0e6bc666dac8fc2697ff9ba")); assert_eq!( - tx.get::(miner_acc), + db_tx.get::(miner_acc), Ok(None), "Third account should be unwinded" ); diff --git a/crates/stages/src/stages/headers.rs b/crates/stages/src/stages/headers.rs index 99f1c39637..3256c65abc 100644 --- a/crates/stages/src/stages/headers.rs +++ b/crates/stages/src/stages/headers.rs @@ -68,14 +68,14 @@ impl, + tx: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { let stage_progress = input.stage_progress.unwrap_or_default(); - self.update_head::(db, stage_progress).await?; + self.update_head::(tx, stage_progress).await?; // Lookup the head and tip of the sync range - let (head, tip) = self.get_head_and_tip(db, stage_progress).await?; + let (head, tip) = self.get_head_and_tip(tx, stage_progress).await?; debug!( target: "sync::stages::headers", "Syncing from tip {:?} to head {:?}", @@ -101,8 +101,8 @@ impl(db, res).await?.unwrap_or_default(); - db.commit()?; + self.write_headers::(tx, res).await?.unwrap_or_default(); + tx.commit()?; current_progress = current_progress.max(write_progress); } Err(e) => match e { @@ -133,10 +133,10 @@ impl(db, &head)?; + self.write_td::(tx, &head)?; let stage_progress = current_progress.max( - db.cursor::()? + tx.cursor::()? .last()? .map(|(num, _)| num) .unwrap_or_default(), @@ -148,16 +148,16 @@ impl, + tx: &mut Transaction<'_, DB>, input: UnwindInput, ) -> Result> { // TODO: handle bad block - db.unwind_table_by_walker::( + tx.unwind_table_by_walker::( input.unwind_to + 1, )?; - db.unwind_table_by_num::(input.unwind_to)?; - db.unwind_table_by_num_hash::(input.unwind_to)?; - db.unwind_table_by_num_hash::(input.unwind_to)?; + tx.unwind_table_by_num::(input.unwind_to)?; + tx.unwind_table_by_num_hash::(input.unwind_to)?; + tx.unwind_table_by_num_hash::(input.unwind_to)?; Ok(UnwindOutput { stage_progress: input.unwind_to }) } } @@ -167,11 +167,11 @@ impl { async fn update_head( &self, - db: &Transaction<'_, DB>, + tx: &Transaction<'_, DB>, height: BlockNumber, ) -> Result<(), StageError> { - let block_key = db.get_block_numhash(height)?; - let td: U256 = *db + let block_key = tx.get_block_numhash(height)?; + let td: U256 = *tx .get::(block_key)? .ok_or(DatabaseIntegrityError::TotalDifficulty { number: height })?; // TODO: This should happen in the last stage @@ -182,12 +182,12 @@ impl /// Get the head and tip of the range we need to sync async fn get_head_and_tip( &self, - db: &Transaction<'_, DB>, + tx: &Transaction<'_, DB>, stage_progress: u64, ) -> Result<(SealedHeader, H256), StageError> { // Create a cursor over canonical header hashes - let mut cursor = db.cursor::()?; - let mut header_cursor = db.cursor::()?; + let mut cursor = tx.cursor::()?; + let mut header_cursor = tx.cursor::()?; // Get head hash and reposition the cursor let (head_num, head_hash) = cursor @@ -250,11 +250,11 @@ impl /// Write downloaded headers to the database async fn write_headers( &self, - db: &Transaction<'_, DB>, + tx: &Transaction<'_, DB>, headers: Vec, ) -> Result, StageError> { - let mut cursor_header = db.cursor_mut::()?; - let mut cursor_canonical = db.cursor_mut::()?; + let mut cursor_header = tx.cursor_mut::()?; + let mut cursor_canonical = tx.cursor_mut::()?; let mut latest = None; // Since the headers were returned in descending order, @@ -270,7 +270,7 @@ impl latest = Some(header.number); // NOTE: HeaderNumbers are not sorted and can't be inserted with cursor. - db.put::(block_hash, header.number)?; + tx.put::(block_hash, header.number)?; cursor_header.insert(key, header)?; cursor_canonical.insert(key.number(), key.hash())?; } @@ -281,11 +281,11 @@ impl /// Iterate over inserted headers and write td entries fn write_td( &self, - db: &Transaction<'_, DB>, + tx: &Transaction<'_, DB>, head: &SealedHeader, ) -> Result<(), StageError> { // Acquire cursor over total difficulty table - let mut cursor_td = db.cursor_mut::()?; + let mut cursor_td = tx.cursor_mut::()?; // Get latest total difficulty let last_entry = cursor_td @@ -294,10 +294,10 @@ impl let mut td: U256 = last_entry.1.into(); // Start at first inserted block during this iteration - let start_key = db.get_block_numhash(head.number + 1)?; + let start_key = tx.get_block_numhash(head.number + 1)?; // Walk over newly inserted headers, update & insert td - for entry in db.cursor::()?.walk(start_key)? { + for entry in tx.cursor::()?.walk(start_key)? { let (key, header) = entry?; td += header.difficulty; cursor_td.append(key, td.into())?; @@ -413,7 +413,7 @@ mod tests { #[tokio::test] async fn head_and_tip_lookup() { let runner = HeadersTestRunner::default(); - let db = runner.db().inner(); + let tx = runner.tx().inner(); let stage = runner.stage(); let consensus_tip = H256::random(); @@ -427,38 +427,38 @@ mod tests { // Empty database assert_matches!( - stage.get_head_and_tip(&db, stage_progress).await, + stage.get_head_and_tip(&tx, stage_progress).await, Err(StageError::DatabaseIntegrity(DatabaseIntegrityError::CanonicalHeader { number })) if number == stage_progress ); // Checkpoint and no gap - db.put::(head.number, head.hash()) + tx.put::(head.number, head.hash()) .expect("falied to write canonical"); - db.put::(head.num_hash().into(), head.clone().unseal()) + tx.put::(head.num_hash().into(), head.clone().unseal()) .expect("failed to write header"); assert_matches!( - stage.get_head_and_tip(&db, stage_progress).await, + stage.get_head_and_tip(&tx, stage_progress).await, Ok((h, t)) if h == head && t == consensus_tip ); // Checkpoint and gap - db.put::(gap_tip.number, gap_tip.hash()) + tx.put::(gap_tip.number, gap_tip.hash()) .expect("falied to write canonical"); - db.put::(gap_tip.num_hash().into(), gap_tip.clone().unseal()) + tx.put::(gap_tip.num_hash().into(), gap_tip.clone().unseal()) .expect("failed to write header"); assert_matches!( - stage.get_head_and_tip(&db, stage_progress).await, + stage.get_head_and_tip(&tx, stage_progress).await, Ok((h, t)) if h == head && t == gap_tip.parent_hash ); // Checkpoint and gap closed - db.put::(gap_fill.number, gap_fill.hash()) + tx.put::(gap_fill.number, gap_fill.hash()) .expect("falied to write canonical"); - db.put::(gap_fill.num_hash().into(), gap_fill.clone().unseal()) + tx.put::(gap_fill.num_hash().into(), gap_fill.clone().unseal()) .expect("failed to write header"); assert_matches!( - stage.get_head_and_tip(&db, stage_progress).await, + stage.get_head_and_tip(&tx, stage_progress).await, Err(StageError::StageProgress(progress)) if progress == stage_progress ); } @@ -489,7 +489,7 @@ mod tests { pub(crate) client: Arc, downloader: Arc, network_handle: TestStatusUpdater, - db: TestTransaction, + tx: TestTransaction, } impl Default for HeadersTestRunner { @@ -501,7 +501,7 @@ mod tests { consensus: consensus.clone(), downloader: Arc::new(TestHeaderDownloader::new(client, consensus, 1000)), network_handle: TestStatusUpdater::default(), - db: TestTransaction::default(), + tx: TestTransaction::default(), } } } @@ -509,8 +509,8 @@ mod tests { impl StageTestRunner for HeadersTestRunner { type S = HeaderStage, TestConsensus, TestHeadersClient, TestStatusUpdater>; - fn db(&self) -> &TestTransaction { - &self.db + fn tx(&self) -> &TestTransaction { + &self.tx } fn stage(&self) -> Self::S { @@ -531,7 +531,7 @@ mod tests { fn seed_execution(&mut self, input: ExecInput) -> Result { let start = input.stage_progress.unwrap_or_default(); let head = random_header(start, None); - self.db.insert_headers(std::iter::once(&head))?; + self.tx.insert_headers(std::iter::once(&head))?; // use previous progress as seed size let end = input.previous_stage.map(|(_, num)| num).unwrap_or_default() + 1; @@ -554,7 +554,7 @@ mod tests { let initial_stage_progress = input.stage_progress.unwrap_or_default(); match output { Some(output) if output.stage_progress > initial_stage_progress => { - self.db.query(|tx| { + self.tx.query(|tx| { for block_num in (initial_stage_progress..output.stage_progress).rev() { // look up the header hash let hash = tx @@ -597,7 +597,7 @@ mod tests { headers.last().unwrap().hash() } else { let tip = random_header(0, None); - self.db.insert_headers(std::iter::once(&tip))?; + self.tx.insert_headers(std::iter::once(&tip))?; tip.hash() }; self.consensus.update_tip(tip); @@ -624,7 +624,7 @@ mod tests { consensus, downloader, network_handle: TestStatusUpdater::default(), - db: TestTransaction::default(), + tx: TestTransaction::default(), } } } @@ -634,11 +634,11 @@ mod tests { &self, block: BlockNumber, ) -> Result<(), TestRunnerError> { - self.db + self.tx .check_no_entry_above_by_value::(block, |val| val)?; - self.db.check_no_entry_above::(block, |key| key)?; - self.db.check_no_entry_above::(block, |key| key.number())?; - self.db.check_no_entry_above::(block, |key| key.number())?; + self.tx.check_no_entry_above::(block, |key| key)?; + self.tx.check_no_entry_above::(block, |key| key.number())?; + self.tx.check_no_entry_above::(block, |key| key.number())?; Ok(()) } } diff --git a/crates/stages/src/stages/senders.rs b/crates/stages/src/stages/senders.rs index aa18c0d9be..8bec55f13e 100644 --- a/crates/stages/src/stages/senders.rs +++ b/crates/stages/src/stages/senders.rs @@ -55,7 +55,7 @@ impl Stage for SendersStage { /// the [`TxSenders`][reth_interfaces::db::tables::TxSenders] table. async fn execute( &mut self, - db: &mut Transaction<'_, DB>, + tx: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { let stage_progress = input.stage_progress.unwrap_or_default(); @@ -67,10 +67,10 @@ impl Stage for SendersStage { } // Look up the start index for the transaction range - let start_tx_index = db.get_block_body_by_num(stage_progress + 1)?.start_tx_id; + let start_tx_index = tx.get_block_body_by_num(stage_progress + 1)?.start_tx_id; // Look up the end index for transaction range (inclusive) - let end_tx_index = db.get_block_body_by_num(max_block_num)?.last_tx_index(); + let end_tx_index = tx.get_block_body_by_num(max_block_num)?.last_tx_index(); // No transactions to walk over if start_tx_index > end_tx_index { @@ -78,10 +78,10 @@ impl Stage for SendersStage { } // Acquire the cursor for inserting elements - let mut senders_cursor = db.cursor_mut::()?; + let mut senders_cursor = tx.cursor_mut::()?; // Acquire the cursor over the transactions - let mut tx_cursor = db.cursor::()?; + let mut tx_cursor = tx.cursor::()?; // Walk the transactions from start to end index (inclusive) let entries = tx_cursor .walk(start_tx_index)? @@ -112,12 +112,12 @@ impl Stage for SendersStage { /// Unwind the stage. async fn unwind( &mut self, - db: &mut Transaction<'_, DB>, + tx: &mut Transaction<'_, DB>, input: UnwindInput, ) -> Result> { // Lookup latest tx id that we should unwind to - let latest_tx_id = db.get_block_body_by_num(input.unwind_to)?.last_tx_index(); - db.unwind_table_by_num::(latest_tx_id)?; + let latest_tx_id = tx.get_block_body_by_num(input.unwind_to)?.last_tx_index(); + tx.unwind_table_by_num::(latest_tx_id)?; Ok(UnwindOutput { stage_progress: input.unwind_to }) } } @@ -176,13 +176,13 @@ mod tests { } struct SendersTestRunner { - db: TestTransaction, + tx: TestTransaction, threshold: u64, } impl Default for SendersTestRunner { fn default() -> Self { - Self { threshold: 1000, db: TestTransaction::default() } + Self { threshold: 1000, tx: TestTransaction::default() } } } @@ -195,8 +195,8 @@ mod tests { impl StageTestRunner for SendersTestRunner { type S = SendersStage; - fn db(&self) -> &TestTransaction { - &self.db + fn tx(&self) -> &TestTransaction { + &self.tx } fn stage(&self) -> Self::S { @@ -213,7 +213,7 @@ mod tests { let blocks = random_block_range(stage_progress..end, H256::zero()); - self.db.commit(|tx| { + self.tx.commit(|tx| { let mut current_tx_id = 0; blocks.iter().try_for_each(|b| { let txs = b.body.clone(); @@ -251,7 +251,7 @@ mod tests { output: Option, ) -> Result<(), TestRunnerError> { if let Some(output) = output { - self.db.query(|tx| { + self.tx.query(|tx| { let start_block = input.stage_progress.unwrap_or_default() + 1; let end_block = output.stage_progress; @@ -292,15 +292,15 @@ mod tests { impl SendersTestRunner { fn check_no_senders_by_block(&self, block: BlockNumber) -> Result<(), TestRunnerError> { - let body_result = self.db.inner().get_block_body_by_num(block); + let body_result = self.tx.inner().get_block_body_by_num(block); match body_result { Ok(body) => self - .db + .tx .check_no_entry_above::(body.last_tx_index(), |key| { key })?, Err(_) => { - assert!(self.db.table_is_empty::()?); + assert!(self.tx.table_is_empty::()?); } }; diff --git a/crates/stages/src/test_utils/runner.rs b/crates/stages/src/test_utils/runner.rs index f5a75b8ef3..39250ba161 100644 --- a/crates/stages/src/test_utils/runner.rs +++ b/crates/stages/src/test_utils/runner.rs @@ -19,7 +19,7 @@ pub(crate) trait StageTestRunner { type S: Stage> + 'static; /// Return a reference to the database. - fn db(&self) -> &TestTransaction; + fn tx(&self) -> &TestTransaction; /// Return an instance of a Stage. fn stage(&self) -> Self::S; @@ -42,7 +42,7 @@ pub(crate) trait ExecuteStageTestRunner: StageTestRunner { /// Run [Stage::execute] and return a receiver for the result. fn execute(&self, input: ExecInput) -> oneshot::Receiver> { let (tx, rx) = oneshot::channel(); - let (db, mut stage) = (self.db().inner_raw(), self.stage()); + let (db, mut stage) = (self.tx().inner_raw(), self.stage()); tokio::spawn(async move { let mut db = Transaction::new(db.borrow()).expect("failed to create db container"); let result = stage.execute(&mut db, input).await; @@ -69,7 +69,7 @@ pub(crate) trait UnwindStageTestRunner: StageTestRunner { input: UnwindInput, ) -> Result> { let (tx, rx) = oneshot::channel(); - let (db, mut stage) = (self.db().inner_raw(), self.stage()); + let (db, mut stage) = (self.tx().inner_raw(), self.stage()); tokio::spawn(async move { let mut db = Transaction::new(db.borrow()).expect("failed to create db container"); let result = stage.unwind(&mut db, input).await; diff --git a/crates/stages/src/test_utils/test_db.rs b/crates/stages/src/test_utils/test_db.rs index 05e29431a2..349da4068f 100644 --- a/crates/stages/src/test_utils/test_db.rs +++ b/crates/stages/src/test_utils/test_db.rs @@ -12,33 +12,33 @@ use std::{borrow::Borrow, sync::Arc}; use crate::db::Transaction; -/// The [StageTestDB] is used as an internal +/// The [TestTransaction] is used as an internal /// database for testing stage implementation. /// /// ```rust -/// let db = StageTestDB::default(); -/// stage.execute(&mut db.container(), input); +/// let tx = TestTransaction::default(); +/// stage.execute(&mut tx.container(), input); /// ``` pub(crate) struct TestTransaction { - db: Arc>, + tx: Arc>, } impl Default for TestTransaction { - /// Create a new instance of [StageTestDB] + /// Create a new instance of [TestTransaction] fn default() -> Self { - Self { db: create_test_db::(EnvKind::RW) } + Self { tx: create_test_db::(EnvKind::RW) } } } impl TestTransaction { /// Return a database wrapped in [Transaction]. pub(crate) fn inner(&self) -> Transaction<'_, Env> { - Transaction::new(self.db.borrow()).expect("failed to create db container") + Transaction::new(self.tx.borrow()).expect("failed to create db container") } /// Get a pointer to an internal database. pub(crate) fn inner_raw(&self) -> Arc> { - self.db.clone() + self.tx.clone() } /// Invoke a callback with transaction committing it afterwards @@ -46,9 +46,9 @@ impl TestTransaction { where F: FnOnce(&mut Tx<'_, RW, WriteMap>) -> Result<(), DbError>, { - let mut db = self.inner(); - f(&mut db)?; - db.commit()?; + let mut tx = self.inner(); + f(&mut tx)?; + tx.commit()?; Ok(()) } @@ -72,8 +72,8 @@ impl TestTransaction { /// This function commits the transaction before exiting. /// /// ```rust - /// let db = StageTestDB::default(); - /// db.map_put::(&items, |item| item)?; + /// let tx = TestTransaction::default(); + /// tx.map_put::(&items, |item| item)?; /// ``` #[allow(dead_code)] pub(crate) fn map_put(&self, values: &[S], mut map: F) -> Result<(), DbError> @@ -96,8 +96,8 @@ impl TestTransaction { /// This function commits the transaction before exiting. /// /// ```rust - /// let db = StageTestDB::default(); - /// db.transform_append::(&items, |prev, item| prev.unwrap_or_default() + item)?; + /// let tx = TestTransaction::default(); + /// tx.transform_append::(&items, |prev, item| prev.unwrap_or_default() + item)?; /// ``` #[allow(dead_code)] pub(crate) fn transform_append(