mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-28 00:28:20 -05:00
fix: do not truncate blocks to persist (#9986)
This commit is contained in:
@@ -549,9 +549,13 @@ where
|
||||
|
||||
if self.should_persist() && !self.persistence_state.in_progress() {
|
||||
let blocks_to_persist = self.get_canonical_blocks_to_persist();
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.persistence.save_blocks(blocks_to_persist, tx);
|
||||
self.persistence_state.start(rx);
|
||||
if !blocks_to_persist.is_empty() {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.persistence.save_blocks(blocks_to_persist, tx);
|
||||
self.persistence_state.start(rx);
|
||||
} else {
|
||||
warn!(target: "engine", "Returned empty set of blocks to persist");
|
||||
}
|
||||
}
|
||||
|
||||
if self.persistence_state.in_progress() {
|
||||
@@ -799,31 +803,34 @@ where
|
||||
self.config.persistence_threshold()
|
||||
}
|
||||
|
||||
/// Returns a batch of consecutive canonical blocks to persist. The expected order is
|
||||
/// oldest -> newest.
|
||||
/// Returns a batch of consecutive canonical blocks to persist in the range
|
||||
/// `(last_persisted_number .. canonical_head - threshold]` . The expected
|
||||
/// order is oldest -> newest.
|
||||
fn get_canonical_blocks_to_persist(&self) -> Vec<ExecutedBlock> {
|
||||
let mut blocks_to_persist = Vec::new();
|
||||
let mut current_hash = self.state.tree_state.canonical_block_hash();
|
||||
let last_persisted_number = self.persistence_state.last_persisted_block_number;
|
||||
|
||||
let canonical_head_number = self.state.tree_state.canonical_block_number();
|
||||
|
||||
let target_number =
|
||||
canonical_head_number.saturating_sub(self.config.persistence_threshold());
|
||||
|
||||
while let Some(block) = self.state.tree_state.blocks_by_hash.get(¤t_hash) {
|
||||
if block.block.number <= last_persisted_number {
|
||||
break;
|
||||
}
|
||||
|
||||
blocks_to_persist.push(block.clone());
|
||||
if block.block.number <= target_number {
|
||||
blocks_to_persist.push(block.clone());
|
||||
}
|
||||
|
||||
current_hash = block.block.parent_hash;
|
||||
}
|
||||
|
||||
// reverse the order so that the oldest block comes first
|
||||
blocks_to_persist.reverse();
|
||||
|
||||
// limit the number of blocks to persist if it exceeds the threshold
|
||||
let threshold = self.config.persistence_threshold() as usize;
|
||||
if blocks_to_persist.len() > threshold {
|
||||
blocks_to_persist.truncate(threshold);
|
||||
}
|
||||
|
||||
blocks_to_persist
|
||||
}
|
||||
|
||||
@@ -2007,10 +2014,10 @@ mod tests {
|
||||
let mut test_block_builder =
|
||||
TestBlockBuilder::default().with_chain_spec((*chain_spec).clone());
|
||||
|
||||
// we need more than PERSISTENCE_THRESHOLD blocks to trigger the
|
||||
// persistence task.
|
||||
// we need more than tree_config.persistence_threshold() +1 blocks to
|
||||
// trigger the persistence task.
|
||||
let blocks: Vec<_> = test_block_builder
|
||||
.get_executed_blocks(1..tree_config.persistence_threshold() + 1)
|
||||
.get_executed_blocks(1..tree_config.persistence_threshold() + 2)
|
||||
.collect();
|
||||
let test_harness = TestHarness::new(chain_spec).with_blocks(blocks.clone());
|
||||
std::thread::Builder::new()
|
||||
@@ -2022,11 +2029,13 @@ mod tests {
|
||||
test_harness.to_tree_tx.send(FromEngine::DownloadedBlocks(vec![])).unwrap();
|
||||
|
||||
let received_action =
|
||||
test_harness.action_rx.recv().expect("Failed to receive saved blocks");
|
||||
test_harness.action_rx.recv().expect("Failed to receive save blocks action");
|
||||
if let PersistenceAction::SaveBlocks((saved_blocks, _)) = received_action {
|
||||
// only PERSISTENCE_THRESHOLD will be persisted
|
||||
assert_eq!(saved_blocks.len() as u64, tree_config.persistence_threshold());
|
||||
assert_eq!(saved_blocks, blocks);
|
||||
// only blocks.len() - tree_config.persistence_threshold() will be
|
||||
// persisted
|
||||
let expected_persist_len = blocks.len() - tree_config.persistence_threshold() as usize;
|
||||
assert_eq!(saved_blocks.len(), expected_persist_len);
|
||||
assert_eq!(saved_blocks, blocks[..expected_persist_len]);
|
||||
} else {
|
||||
panic!("unexpected action received {received_action:?}");
|
||||
}
|
||||
@@ -2305,48 +2314,55 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_get_blocks_to_persist() {
|
||||
async fn test_get_canonical_blocks_to_persist() {
|
||||
let chain_spec = MAINNET.clone();
|
||||
let mut test_harness = TestHarness::new(chain_spec);
|
||||
let mut test_block_builder = TestBlockBuilder::default();
|
||||
|
||||
let blocks: Vec<_> = test_block_builder.get_executed_blocks(0..10).collect();
|
||||
let canonical_head_number = 9;
|
||||
let blocks: Vec<_> =
|
||||
test_block_builder.get_executed_blocks(0..canonical_head_number + 1).collect();
|
||||
test_harness = test_harness.with_blocks(blocks.clone());
|
||||
|
||||
test_harness.tree.persistence_state.last_persisted_block_number = 3;
|
||||
let last_persisted_block_number = 3;
|
||||
test_harness.tree.persistence_state.last_persisted_block_number =
|
||||
last_persisted_block_number;
|
||||
|
||||
test_harness.tree.config = TreeConfig::default().with_persistence_threshold(5);
|
||||
let persistence_threshold = 4;
|
||||
test_harness.tree.config =
|
||||
TreeConfig::default().with_persistence_threshold(persistence_threshold);
|
||||
|
||||
let blocks_to_persist = test_harness.tree.get_canonical_blocks_to_persist();
|
||||
|
||||
assert_eq!(blocks_to_persist.len(), 5);
|
||||
assert_eq!(blocks_to_persist[0].block.number, 4);
|
||||
assert_eq!(blocks_to_persist[4].block.number, 8);
|
||||
let expected_blocks_to_persist_length: usize =
|
||||
(canonical_head_number - persistence_threshold - last_persisted_block_number)
|
||||
.try_into()
|
||||
.unwrap();
|
||||
|
||||
for i in 0..4 {
|
||||
assert_eq!(
|
||||
blocks_to_persist[i].block.hash(),
|
||||
blocks_to_persist[i + 1].block.parent_hash
|
||||
);
|
||||
assert_eq!(blocks_to_persist.len(), expected_blocks_to_persist_length);
|
||||
for (i, item) in
|
||||
blocks_to_persist.iter().enumerate().take(expected_blocks_to_persist_length)
|
||||
{
|
||||
assert_eq!(item.block.number, last_persisted_block_number + i as u64 + 1);
|
||||
}
|
||||
|
||||
// make sure only canonical blocks are included
|
||||
let fork_block = test_block_builder.get_executed_block_with_number(7, B256::random());
|
||||
let fork_block = test_block_builder.get_executed_block_with_number(4, B256::random());
|
||||
let fork_block_hash = fork_block.block.hash();
|
||||
test_harness.tree.state.tree_state.insert_executed(fork_block);
|
||||
|
||||
assert!(test_harness.tree.state.tree_state.block_by_hash(fork_block_hash).is_some());
|
||||
|
||||
let blocks_to_persist = test_harness.tree.get_canonical_blocks_to_persist();
|
||||
assert_eq!(blocks_to_persist.len(), 5);
|
||||
assert_eq!(blocks_to_persist.len(), expected_blocks_to_persist_length);
|
||||
|
||||
// check that the fork block is not included in the blocks to persist
|
||||
assert!(!blocks_to_persist.iter().any(|b| b.block.hash() == fork_block_hash));
|
||||
|
||||
// check that the original block 7 is still included
|
||||
// check that the original block 4 is still included
|
||||
assert!(blocks_to_persist
|
||||
.iter()
|
||||
.any(|b| b.block.number == 7 && b.block.hash() == blocks[7].block.hash()));
|
||||
.any(|b| b.block.number == 4 && b.block.hash() == blocks[4].block.hash()));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
Reference in New Issue
Block a user