From 8ee6d80949d8b8e2fea08bfbe7362f57bbd8f8c6 Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Tue, 15 Nov 2022 19:10:11 +0200 Subject: [PATCH] cleanup txindex tests --- crates/stages/src/stages/headers.rs | 151 ++++++++++----------------- crates/stages/src/stages/tx_index.rs | 138 +++++++++--------------- crates/stages/src/util.rs | 45 +++++--- 3 files changed, 129 insertions(+), 205 deletions(-) diff --git a/crates/stages/src/stages/headers.rs b/crates/stages/src/stages/headers.rs index e9e0d09ef0..f8a93e118e 100644 --- a/crates/stages/src/stages/headers.rs +++ b/crates/stages/src/stages/headers.rs @@ -186,7 +186,6 @@ mod tests { stage_test_suite, ExecuteStageTestRunner, UnwindStageTestRunner, PREV_STAGE_ID, }; use assert_matches::assert_matches; - use reth_interfaces::test_utils::{gen_random_header, gen_random_header_range}; use test_runner::HeadersTestRunner; stage_test_suite!(HeadersTestRunner); @@ -195,105 +194,53 @@ mod tests { // Validate that the execution does not fail on timeout async fn execute_timeout() { let mut runner = HeadersTestRunner::default(); - let (stage_progress, previous_stage) = (0, 0); - runner - .seed_execution(ExecInput { - previous_stage: Some((PREV_STAGE_ID, previous_stage)), - stage_progress: Some(stage_progress), - }) - .expect("failed to seed execution"); - - let rx = runner.execute(ExecInput::default()); - runner.after_execution().await.expect("failed to run after execution hook"); + let input = ExecInput::default(); + runner.seed_execution(input).expect("failed to seed execution"); + let rx = runner.execute(input); + runner.consensus.update_tip(H256::from_low_u64_be(1)); + let result = rx.await.unwrap(); assert_matches!( - rx.await.unwrap(), + result, Ok(ExecOutput { done, reached_tip, stage_progress: out_stage_progress }) - if !done && !reached_tip && out_stage_progress == stage_progress + if !done && !reached_tip && out_stage_progress == 0 ); - assert!(runner.validate_execution().is_ok(), "validation failed"); + assert!(runner.validate_execution(input).is_ok(), "validation failed"); } #[tokio::test] - // Validate that all necessary tables are updated after the - // header download with some previous progress. - async fn execute_prev_progress() { - let mut runner = HeadersTestRunner::default(); - let (stage_progress, previous_stage) = (10000, 10241); + // Execute the stage with linear downloader + async fn execute_with_linear_downloader() { + let mut runner = HeadersTestRunner::with_linear_downloader(); + let (stage_progress, previous_stage) = (1000, 1200); let input = ExecInput { previous_stage: Some((PREV_STAGE_ID, previous_stage)), stage_progress: Some(stage_progress), }; runner.seed_execution(input).expect("failed to seed execution"); let rx = runner.execute(input); - runner.after_execution().await.expect("failed to run after execution hook"); + + // skip `after_execution` hook for linear downloader + let headers = runner.context.as_ref().unwrap(); + let tip = headers.last().unwrap(); + runner.consensus.update_tip(tip.hash()); + + let download_result = headers.clone(); + runner + .client + .on_header_request(1, |id, _| { + let response = download_result.clone().into_iter().map(|h| h.unseal()).collect(); + runner.client.send_header_response(id, response) + }) + .await; + assert_matches!( rx.await.unwrap(), Ok(ExecOutput { done, reached_tip, stage_progress }) - if done && reached_tip && stage_progress == stage_progress + if done && reached_tip && stage_progress == tip.number ); - assert!(runner.validate_execution().is_ok(), "validation failed"); + assert!(runner.validate_execution(input).is_ok(), "validation failed"); } - // TODO: - // #[tokio::test] - // // Execute the stage with linear downloader - // async fn execute_with_linear_downloader() { - // let mut runner = HeadersTestRunner::with_linear_downloader(); - // let (stage_progress, previous_stage) = (1000, 1200); - // let input = ExecInput { - // previous_stage: Some((PREV_STAGE_ID, previous_stage)), - // stage_progress: Some(stage_progress), - // }; - // runner.seed_execution(input).expect("failed to seed execution"); - // let rx = runner.execute(input); - - // // skip hook for linear downloader - // let headers = runner.context.as_ref().unwrap(); - // let tip = headers.first().unwrap(); - // runner.consensus.update_tip(tip.hash()); - - // // TODO: - // let mut download_result = headers.clone(); - // download_result.insert(0, headers.last().unwrap().clone()); - // runner - // .client - // .on_header_request(1, |id, _| { - // let response = download_result.clone().into_iter().map(|h| h.unseal()).collect(); - // runner.client.send_header_response(id, response) - // }) - // .await; - - // assert_matches!( - // rx.await.unwrap(), - // Ok(ExecOutput { done, reached_tip, stage_progress }) - // if done && reached_tip && stage_progress == tip.number - // ); - // assert!(runner.validate_execution().is_ok(), "validation failed"); - // } - - // TODO: - // #[tokio::test] - // // Check that unwind can remove headers across gaps - // async fn unwind_db_gaps() { - // let runner = HeadersTestRunner::default(); - // let head = gen_random_header(0, None); - // let first_range = gen_random_header_range(1..20, head.hash()); - // let second_range = gen_random_header_range(50..100, H256::zero()); - // runner.insert_header(&head).expect("failed to insert header"); - // runner - // .insert_headers(first_range.iter().chain(second_range.iter())) - // .expect("failed to insert headers"); - - // let unwind_to = 15; - // let input = UnwindInput { bad_block: None, stage_progress: unwind_to, unwind_to }; - // let rx = runner.unwind(input); - // assert_matches!( - // rx.await.unwrap(), - // Ok(UnwindOutput {stage_progress} ) if stage_progress == unwind_to - // ); - // assert!(runner.validate_unwind(input).is_ok(), "validation failed"); - // } - mod test_runner { use crate::{ stages::headers::HeaderStage, @@ -370,7 +317,7 @@ mod tests { let end = input.previous_stage.map(|(_, num)| num).unwrap_or_default() + 1; if end > start + 1 { let mut headers = gen_random_header_range(start + 1..end, head.hash()); - headers.reverse(); + headers.insert(0, head); self.context = Some(headers); } Ok(()) @@ -381,27 +328,28 @@ mod tests { ) -> Result<(), Box> { let (tip, headers) = match self.context { Some(ref headers) if headers.len() > 1 => { - // headers are in reverse - (headers.first().unwrap().hash(), headers.clone()) + (headers.last().unwrap().hash(), headers.clone()) } _ => (H256::from_low_u64_be(rand::random()), Vec::default()), }; self.consensus.update_tip(tip); - if !headers.is_empty() { - self.client - .send_header_response_delayed( - 0, - headers.iter().cloned().map(|h| h.unseal()).collect(), - 1, - ) - .await; - } + self.client + .send_header_response_delayed( + 0, + headers.iter().cloned().map(|h| h.unseal()).collect(), + 1, + ) + .await; Ok(()) } - fn validate_execution(&self) -> Result<(), Box> { + fn validate_execution( + &self, + _input: ExecInput, + ) -> Result<(), Box> { if let Some(ref headers) = self.context { - headers.iter().try_for_each(|h| self.validate_db_header(&h))?; + // skip head and validate each + headers.iter().skip(1).try_for_each(|h| self.validate_db_header(&h))?; } Ok(()) } @@ -546,11 +494,18 @@ mod tests { _: &ForkchoiceState, ) -> Result, DownloadError> { let stream = self.client.stream_headers().await; - let stream = stream.timeout(Duration::from_secs(3)); + let stream = stream.timeout(Duration::from_secs(1)); match Box::pin(stream).try_next().await { Ok(Some(res)) => { - Ok(res.headers.iter().map(|h| h.clone().seal()).collect::>()) + let mut headers = + res.headers.iter().map(|h| h.clone().seal()).collect::>(); + if !headers.is_empty() { + headers.sort_unstable_by_key(|h| h.number); + headers.remove(0); // remove head from response + headers.reverse(); + } + Ok(headers) } _ => Err(DownloadError::Timeout { request_id: 0 }), } diff --git a/crates/stages/src/stages/tx_index.rs b/crates/stages/src/stages/tx_index.rs index 05b45cb383..808aed6ba5 100644 --- a/crates/stages/src/stages/tx_index.rs +++ b/crates/stages/src/stages/tx_index.rs @@ -89,7 +89,7 @@ mod tests { use super::*; use crate::util::test_utils::{ stage_test_suite, ExecuteStageTestRunner, StageTestDB, StageTestRunner, - UnwindStageTestRunner, PREV_STAGE_ID, + UnwindStageTestRunner, }; use assert_matches::assert_matches; use reth_interfaces::{db::models::BlockNumHash, test_utils::gen_random_header_range}; @@ -97,83 +97,6 @@ mod tests { stage_test_suite!(TxIndexTestRunner); - #[tokio::test] - async fn execute_no_prev_tx_count() { - let runner = TxIndexTestRunner::default(); - let headers = gen_random_header_range(0..10, H256::zero()); - runner - .db() - .map_put::(&headers, |h| (h.number, h.hash())) - .expect("failed to insert"); - - let (head, tail) = (headers.first().unwrap(), headers.last().unwrap()); - let input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, tail.number)), - stage_progress: Some(head.number), - }; - let rx = runner.execute(input); - assert_matches!( - rx.await.unwrap(), - Err(StageError::DatabaseIntegrity(DatabaseIntegrityError::CumulativeTxCount { .. })) - ); - } - - #[tokio::test] - async fn unwind_no_input() { - let runner = TxIndexTestRunner::default(); - let headers = gen_random_header_range(0..10, H256::zero()); - runner - .db() - .transform_append::(&headers, |prev, h| { - ( - BlockNumHash((h.number, h.hash())), - prev.unwrap_or_default() + (rand::random::() as u64), - ) - }) - .expect("failed to insert"); - - let rx = runner.unwind(UnwindInput::default()); - assert_matches!( - rx.await.unwrap(), - Ok(UnwindOutput { stage_progress }) if stage_progress == 0 - ); - runner - .db() - .check_no_entry_above::(0, |h| h.number()) - .expect("failed to check tx count"); - } - - #[tokio::test] - async fn unwind_with_db_gaps() { - let runner = TxIndexTestRunner::default(); - let first_range = gen_random_header_range(0..20, H256::zero()); - let second_range = gen_random_header_range(50..100, H256::zero()); - runner - .db() - .transform_append::( - &first_range.iter().chain(second_range.iter()).collect::>(), - |prev, h| { - ( - BlockNumHash((h.number, h.hash())), - prev.unwrap_or_default() + (rand::random::() as u64), - ) - }, - ) - .expect("failed to insert"); - - let unwind_to = 10; - let input = UnwindInput { unwind_to, ..Default::default() }; - let rx = runner.unwind(input); - assert_matches!( - rx.await.unwrap(), - Ok(UnwindOutput { stage_progress }) if stage_progress == unwind_to - ); - runner - .db() - .check_no_entry_above::(unwind_to, |h| h.number()) - .expect("failed to check tx count"); - } - #[derive(Default)] pub(crate) struct TxIndexTestRunner { db: StageTestDB, @@ -198,24 +121,60 @@ mod tests { ) -> Result<(), Box> { let pivot = input.stage_progress.unwrap_or_default(); let start = pivot.saturating_sub(100); - let end = input.previous_stage.as_ref().map(|(_, num)| *num).unwrap_or_default(); + let mut end = input.previous_stage.as_ref().map(|(_, num)| *num).unwrap_or_default(); + end += 2; // generate 2 additional headers to account for start header lookup let headers = gen_random_header_range(start..end, H256::zero()); - self.db() - .map_put::(&headers, |h| (h.number, h.hash()))?; + + let headers = + headers.into_iter().map(|h| (h, rand::random::())).collect::>(); + + self.db().map_put::(&headers, |(h, _)| { + (h.number, h.hash()) + })?; + self.db().map_put::(&headers, |(h, count)| { + (BlockNumHash((h.number, h.hash())), *count as u16) + })?; + + let slice_up_to = + std::cmp::min(pivot.saturating_sub(start) as usize, headers.len() - 1); self.db().transform_append::( - &headers[..=(pivot as usize)], - |prev, h| { - ( - BlockNumHash((h.number, h.hash())), - prev.unwrap_or_default() + (rand::random::() as u64), - ) + &headers[..=slice_up_to], + |prev, (h, count)| { + (BlockNumHash((h.number, h.hash())), prev.unwrap_or_default() + (*count as u64)) }, )?; + Ok(()) } - fn validate_execution(&self) -> Result<(), Box> { - // TODO: + fn validate_execution( + &self, + input: ExecInput, + ) -> Result<(), Box> { + let db = self.db().container(); + let tx = db.get(); + let (start, end) = ( + input.stage_progress.unwrap_or_default(), + input.previous_stage.as_ref().map(|(_, num)| *num).unwrap_or_default(), + ); + if start >= end { + return Ok(()) + } + + let start_hash = + tx.get::(start)?.expect("no canonical found"); + let mut tx_count_cursor = tx.cursor::()?; + let mut tx_count_walker = tx_count_cursor.walk((start, start_hash).into())?; + let mut count = tx_count_walker.next().unwrap()?.1; + let mut last_num = start; + while let Some(entry) = tx_count_walker.next() { + let (key, db_count) = entry?; + count += tx.get::(key)?.unwrap() as u64; + assert_eq!(db_count, count); + last_num = key.number(); + } + assert_eq!(last_num, end); + Ok(()) } } @@ -226,7 +185,6 @@ mod tests { input: UnwindInput, highest_entry: u64, ) -> Result<(), Box> { - // TODO: accept range let headers = gen_random_header_range(input.unwind_to..highest_entry, H256::zero()); self.db().transform_append::( &headers, diff --git a/crates/stages/src/util.rs b/crates/stages/src/util.rs index a9817964fe..5bed834f0a 100644 --- a/crates/stages/src/util.rs +++ b/crates/stages/src/util.rs @@ -181,6 +181,7 @@ pub(crate) mod test_utils { DBContainer::new(self.db.borrow()).expect("failed to create db container") } + /// Invoke a callback with transaction committing it afterwards fn commit(&self, f: F) -> Result<(), Error> where F: FnOnce(&mut Tx<'_, RW, WriteMap>) -> Result<(), Error>, @@ -192,19 +193,6 @@ pub(crate) mod test_utils { Ok(()) } - /// Put a single value into the table - pub(crate) fn put(&self, k: T::Key, v: T::Value) -> Result<(), Error> { - self.commit(|tx| tx.put::(k, v)) - } - - /// Delete a single value from the table - pub(crate) fn delete(&self, k: T::Key) -> Result<(), Error> { - self.commit(|tx| { - tx.delete::(k, None)?; - Ok(()) - }) - } - /// Map a collection of values and store them in the database. /// This function commits the transaction before exiting. /// @@ -323,7 +311,10 @@ pub(crate) mod test_utils { ) -> Result<(), Box>; /// Validate stage execution - fn validate_execution(&self) -> Result<(), Box>; + fn validate_execution( + &self, + input: ExecInput, + ) -> Result<(), Box>; /// Run [Stage::execute] and return a receiver for the result. fn execute(&self, input: ExecInput) -> oneshot::Receiver> { @@ -383,12 +374,32 @@ pub(crate) mod test_utils { // prev progress missing from the database. async fn execute_empty_db() { let runner = $runner::default(); - let rx = runner.execute(crate::stage::ExecInput::default()); + let input = crate::stage::ExecInput::default(); + let rx = runner.execute(input); assert_matches!( rx.await.unwrap(), Err(crate::error::StageError::DatabaseIntegrity(_)) ); - assert!(runner.validate_execution().is_ok(), "execution validation"); + assert!(runner.validate_execution(input).is_ok(), "execution validation"); + } + + #[tokio::test] + async fn execute_no_progress() { + let stage_progress = 1000; + let mut runner = $runner::default(); + let input = crate::stage::ExecInput { + previous_stage: Some((crate::util::test_utils::PREV_STAGE_ID, stage_progress)), + stage_progress: Some(stage_progress), + }; + runner.seed_execution(input).expect("failed to seed"); + let rx = runner.execute(input); + runner.after_execution().await.expect("failed to run after execution hook"); + assert_matches!( + rx.await.unwrap(), + Ok(ExecOutput { done, reached_tip, stage_progress }) + if done && reached_tip && stage_progress == stage_progress + ); + assert!(runner.validate_execution(input).is_ok(), "execution validation"); } #[tokio::test] @@ -407,7 +418,7 @@ pub(crate) mod test_utils { Ok(ExecOutput { done, reached_tip, stage_progress }) if done && reached_tip && stage_progress == previous_stage ); - assert!(runner.validate_execution().is_ok(), "execution validation"); + assert!(runner.validate_execution(input).is_ok(), "execution validation"); } #[tokio::test]