From dd6742e8b7564b702fc1a870d41fb90ef0fd32b0 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Wed, 6 Mar 2024 17:19:25 +0000 Subject: [PATCH] test(stages): fix deadlock with multiple static file writers (#7011) --- crates/stages/src/test_utils/test_db.rs | 85 ++++++++++++++----------- 1 file changed, 49 insertions(+), 36 deletions(-) diff --git a/crates/stages/src/test_utils/test_db.rs b/crates/stages/src/test_utils/test_db.rs index 5885529bf8..5946f4b2ec 100644 --- a/crates/stages/src/test_utils/test_db.rs +++ b/crates/stages/src/test_utils/test_db.rs @@ -204,53 +204,66 @@ impl TestStageDB { { let provider = self.factory.static_file_provider(); - let mut txs_writer = storage_kind.is_static().then(|| { - provider.latest_writer(reth_primitives::StaticFileSegment::Transactions).unwrap() - }); - - let mut headers_writer = - provider.latest_writer(reth_primitives::StaticFileSegment::Headers)?; let tx = self.factory.provider_rw().unwrap().into_tx(); - let mut next_tx_num = storage_kind.tx_offset(); - blocks.into_iter().try_for_each(|block| { - Self::insert_header(Some(&mut headers_writer), &tx, &block.header, U256::ZERO)?; - // Insert into body tables. - let block_body_indices = StoredBlockBodyIndices { - first_tx_num: next_tx_num, - tx_count: block.body.len() as u64, - }; + let blocks = blocks.into_iter().collect::>(); - if !block.body.is_empty() { - tx.put::( - block_body_indices.last_tx_num(), - block.number, - )?; - } - tx.put::(block.number, block_body_indices)?; + { + let mut headers_writer = + provider.latest_writer(reth_primitives::StaticFileSegment::Headers)?; - let res = block.body.iter().try_for_each(|body_tx| { - if let Some(txs_writer) = &mut txs_writer { - txs_writer.append_transaction(next_tx_num, body_tx.clone().into())?; - } else { - tx.put::(next_tx_num, body_tx.clone().into())? - } - next_tx_num += 1; - Ok::<(), ProviderError>(()) + blocks.iter().try_for_each(|block| { + Self::insert_header(Some(&mut headers_writer), &tx, &block.header, U256::ZERO) + })?; + + headers_writer.commit()?; + } + + { + let mut txs_writer = storage_kind.is_static().then(|| { + provider.latest_writer(reth_primitives::StaticFileSegment::Transactions).unwrap() }); + blocks.into_iter().try_for_each(|block| { + // Insert into body tables. + let block_body_indices = StoredBlockBodyIndices { + first_tx_num: next_tx_num, + tx_count: block.body.len() as u64, + }; + + if !block.body.is_empty() { + tx.put::( + block_body_indices.last_tx_num(), + block.number, + )?; + } + tx.put::(block.number, block_body_indices)?; + + let res = block.body.iter().try_for_each(|body_tx| { + if let Some(txs_writer) = &mut txs_writer { + txs_writer.append_transaction(next_tx_num, body_tx.clone().into())?; + } else { + tx.put::(next_tx_num, body_tx.clone().into())? + } + next_tx_num += 1; + Ok::<(), ProviderError>(()) + }); + + if let Some(txs_writer) = &mut txs_writer { + txs_writer.increment_block(reth_primitives::StaticFileSegment::Transactions)?; + } + res + })?; + if let Some(txs_writer) = &mut txs_writer { - txs_writer.increment_block(reth_primitives::StaticFileSegment::Transactions)?; + txs_writer.commit()?; } - res - })?; + } tx.commit()?; - if let Some(txs_writer) = &mut txs_writer { - txs_writer.commit()?; - } - headers_writer.commit() + + Ok(()) } pub fn insert_tx_hash_numbers(&self, tx_hash_numbers: I) -> ProviderResult<()>