test(stages): fix deadlock with multiple static file writers (#7011)

This commit is contained in:
Alexey Shekhirin
2024-03-06 17:19:25 +00:00
committed by GitHub
parent d681dbf129
commit dd6742e8b7

View File

@@ -204,53 +204,66 @@ impl TestStageDB {
{
let provider = self.factory.static_file_provider();
let mut txs_writer = storage_kind.is_static().then(|| {
provider.latest_writer(reth_primitives::StaticFileSegment::Transactions).unwrap()
});
let mut headers_writer =
provider.latest_writer(reth_primitives::StaticFileSegment::Headers)?;
let tx = self.factory.provider_rw().unwrap().into_tx();
let mut next_tx_num = storage_kind.tx_offset();
blocks.into_iter().try_for_each(|block| {
Self::insert_header(Some(&mut headers_writer), &tx, &block.header, U256::ZERO)?;
// Insert into body tables.
let block_body_indices = StoredBlockBodyIndices {
first_tx_num: next_tx_num,
tx_count: block.body.len() as u64,
};
let blocks = blocks.into_iter().collect::<Vec<_>>();
if !block.body.is_empty() {
tx.put::<tables::TransactionBlocks>(
block_body_indices.last_tx_num(),
block.number,
)?;
}
tx.put::<tables::BlockBodyIndices>(block.number, block_body_indices)?;
{
let mut headers_writer =
provider.latest_writer(reth_primitives::StaticFileSegment::Headers)?;
let res = block.body.iter().try_for_each(|body_tx| {
if let Some(txs_writer) = &mut txs_writer {
txs_writer.append_transaction(next_tx_num, body_tx.clone().into())?;
} else {
tx.put::<tables::Transactions>(next_tx_num, body_tx.clone().into())?
}
next_tx_num += 1;
Ok::<(), ProviderError>(())
blocks.iter().try_for_each(|block| {
Self::insert_header(Some(&mut headers_writer), &tx, &block.header, U256::ZERO)
})?;
headers_writer.commit()?;
}
{
let mut txs_writer = storage_kind.is_static().then(|| {
provider.latest_writer(reth_primitives::StaticFileSegment::Transactions).unwrap()
});
blocks.into_iter().try_for_each(|block| {
// Insert into body tables.
let block_body_indices = StoredBlockBodyIndices {
first_tx_num: next_tx_num,
tx_count: block.body.len() as u64,
};
if !block.body.is_empty() {
tx.put::<tables::TransactionBlocks>(
block_body_indices.last_tx_num(),
block.number,
)?;
}
tx.put::<tables::BlockBodyIndices>(block.number, block_body_indices)?;
let res = block.body.iter().try_for_each(|body_tx| {
if let Some(txs_writer) = &mut txs_writer {
txs_writer.append_transaction(next_tx_num, body_tx.clone().into())?;
} else {
tx.put::<tables::Transactions>(next_tx_num, body_tx.clone().into())?
}
next_tx_num += 1;
Ok::<(), ProviderError>(())
});
if let Some(txs_writer) = &mut txs_writer {
txs_writer.increment_block(reth_primitives::StaticFileSegment::Transactions)?;
}
res
})?;
if let Some(txs_writer) = &mut txs_writer {
txs_writer.increment_block(reth_primitives::StaticFileSegment::Transactions)?;
txs_writer.commit()?;
}
res
})?;
}
tx.commit()?;
if let Some(txs_writer) = &mut txs_writer {
txs_writer.commit()?;
}
headers_writer.commit()
Ok(())
}
pub fn insert_tx_hash_numbers<I>(&self, tx_hash_numbers: I) -> ProviderResult<()>