Compare commits

...

41 Commits

Author SHA1 Message Date
Brian Picciano
c07e228412 fix(provider): clamp partial trie unwind during reorgs
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019dd7d9-2cea-745d-a1ea-e8965237cf20
Co-authored-by: Amp <amp@ampcode.com>
2026-04-29 07:14:10 +00:00
Brian Picciano
4b4a1b80d8 fix(trie): mask storage entries in disjoint merges
Only drop an entire storage entry when the masking batch wipes or deletes it.
Otherwise, filter overlapping storage slots and storage trie nodes individually to preserve the rest of the account state.

Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019dd592-8e45-756e-9e7c-7c9c98f8687c
Co-authored-by: Amp <amp@ampcode.com>
2026-04-28 19:41:52 +00:00
Brian Picciano
5ab335d04e refactor(provider): drive save_blocks from plan steps
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019dd4f6-c7f3-7649-884e-55217d141526
Co-authored-by: Amp <amp@ampcode.com>
2026-04-28 16:57:41 +00:00
Brian Picciano
baf6ef9778 Revert "fix(provider): preserve masked persistence frontier state"
This reverts commit e71cf3040b.
2026-04-28 16:55:26 +00:00
Brian Picciano
e71cf3040b fix(provider): preserve masked persistence frontier state
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019dd4f6-c7f3-7649-884e-55217d141526
Co-authored-by: Amp <amp@ampcode.com>
2026-04-28 16:50:08 +00:00
Brian Picciano
adf2930e84 refactor(engine): split partial persistence frontiers
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019dd442-503a-7710-90ee-60264449117e
Co-authored-by: Amp <amp@ampcode.com>
2026-04-28 14:25:06 +00:00
Brian Picciano
d9d3f69557 fix(engine): wire deferred trie persistence config
Expose --engine.deferred-trie-blocks and make threshold-driven persistence advance the full persisted region before leaving the deferred trie tail. This keeps the trie and non-trie frontiers aligned with the configured in-memory buffer and deferred-trie window.

Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019dcf65-4035-724f-b1ce-ebd1250af9f1
Co-authored-by: Amp <amp@ampcode.com>
2026-04-27 15:29:10 +00:00
Brian Picciano
8248aa29d1 chore: merge origin/main
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019dcefd-7813-7693-9eca-9c8ad3da5c5b
Co-authored-by: Amp <amp@ampcode.com>
2026-04-27 14:25:47 +00:00
Brian Picciano
0d19b17bf3 fix(provider): preserve partial trie frontier across overlay and unwind
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019dcefd-7813-7693-9eca-9c8ad3da5c5b
Co-authored-by: Amp <amp@ampcode.com>
2026-04-27 14:03:31 +00:00
Brian Picciano
5d4019049a fix(provider): stop masking trie writes with in-memory suffix
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019dce00-c0f1-7665-a244-00f02292fc3c
Co-authored-by: Amp <amp@ampcode.com>
2026-04-27 08:52:28 +00:00
Brian Picciano
743d42ff6d fix(provider): anchor overlay state providers to trie frontier
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019dcdca-2c60-711c-b1b8-7a6001a950fd
Co-authored-by: Amp <amp@ampcode.com>
2026-04-27 08:04:06 +00:00
Brian Picciano
843b5a826a merge(engine): bring lazy overlay refactor into partial persistence
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019dcdca-2c60-711c-b1b8-7a6001a950fd
Co-authored-by: Amp <amp@ampcode.com>
2026-04-27 08:03:56 +00:00
Brian Picciano
b6eec2e684 refactor(provider): require overlay builder anchor hash
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019dbf08-3c9d-736b-9b47-3070f2bf2a54
Co-authored-by: Amp <amp@ampcode.com>
2026-04-24 15:28:57 +00:00
Brian Picciano
31d0c7852d fix(ci): clean bench checkouts and lock cargo builds
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019dbee7-f576-769d-923c-dfdfe0a40858
Co-authored-by: Amp <amp@ampcode.com>
2026-04-24 10:06:10 +00:00
Brian Picciano
b60758ef73 fix(trie): remove unused parallel test dependency
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019dbc30-68d0-76b8-a32c-e1173122ca48
Co-authored-by: Amp <amp@ampcode.com>
2026-04-24 09:32:47 +00:00
Brian Picciano
6e8dbe34a4 Merge branch 'main' into mediocregopher/lazyoverlay-refactor
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019dbc30-68d0-76b8-a32c-e1173122ca48
Co-authored-by: Amp <amp@ampcode.com>
2026-04-23 21:29:57 +00:00
Brian Picciano
c4d0949a23 style(engine): format sparse trie overlay test
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019db57a-7f08-7761-9a50-27a2a5c8f917
Co-authored-by: Amp <amp@ampcode.com>
2026-04-22 14:14:11 +00:00
Brian Picciano
d5169eda88 fix(engine): update sparse trie overlay factory test
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019db57a-7f08-7761-9a50-27a2a5c8f917
Co-authored-by: Amp <amp@ampcode.com>
2026-04-22 14:11:03 +00:00
Brian Picciano
87b5240ec1 Merge branch 'main' into mediocregopher/lazyoverlay-refactor
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019db57a-7f08-7761-9a50-27a2a5c8f917
Co-authored-by: Amp <amp@ampcode.com>
2026-04-22 13:59:13 +00:00
Brian Picciano
ffb0587b19 fix(provider): satisfy overlay lint checks
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019dafbb-3571-73ef-ae67-43c242e2bf23
Co-authored-by: Amp <amp@ampcode.com>
2026-04-21 11:53:34 +00:00
Brian Picciano
45db5e0b5d fix(trie): initialize test overlay anchors
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019dafbb-3571-73ef-ae67-43c242e2bf23
Co-authored-by: Amp <amp@ampcode.com>
2026-04-21 11:36:34 +00:00
Brian Picciano
7db14d095d fix(engine): anchor state-root test overlay factory
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019dafbb-3571-73ef-ae67-43c242e2bf23
Co-authored-by: Amp <amp@ampcode.com>
2026-04-21 11:22:09 +00:00
Brian Picciano
134a7f364b fix(provider): pass overlay anchors via constructor
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019daba4-3598-758d-8771-cb5db784db81
Co-authored-by: Amp <amp@ampcode.com>
2026-04-21 11:08:27 +00:00
Brian Picciano
d92ad5aa34 fix(provider): anchor overlay state providers by hash
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019dafa5-05ee-739e-a0b6-037cc30e4a0e
Co-authored-by: Amp <amp@ampcode.com>
2026-04-21 10:53:23 +00:00
Brian Picciano
b5ad0018a3 refactor(provider): thread explicit requested anchors
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019daba4-3598-758d-8771-cb5db784db81
Co-authored-by: Amp <amp@ampcode.com>
2026-04-20 16:36:09 +00:00
Brian Picciano
5036eb59fb refactor(provider): infer overlay anchors from sources
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019daba4-3598-758d-8771-cb5db784db81
Co-authored-by: Amp <amp@ampcode.com>
2026-04-20 16:23:57 +00:00
Brian Picciano
812e479b69 refactor(provider): separate overlay anchors from revert state
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019daba4-3598-758d-8771-cb5db784db81
Co-authored-by: Amp <amp@ampcode.com>
2026-04-20 16:10:42 +00:00
Brian Picciano
5041d55bc3 refactor(provider): resolve lazy overlay anchors at use time
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019dab6f-7cee-755e-9f9c-309ae0b8517c
Co-authored-by: Amp <amp@ampcode.com>
2026-04-20 15:46:58 +00:00
Brian Picciano
ebfaa6f4c5 refactor(chain-state): cache lazy overlays by anchor
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019dab6f-7cee-755e-9f9c-309ae0b8517c
Co-authored-by: Amp <amp@ampcode.com>
2026-04-20 15:17:35 +00:00
Brian Picciano
cacb69aca9 refactor(chain-state): address lazy overlay review feedback
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019dab40-7d65-709b-90d6-98965c5c6a65
Co-authored-by: Amp <amp@ampcode.com>
2026-04-20 15:01:54 +00:00
Brian Picciano
be4e8cd017 refactor(chain-state): derive lazy overlay anchor from blocks
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019dab40-7d65-709b-90d6-98965c5c6a65
Co-authored-by: Amp <amp@ampcode.com>
2026-04-20 14:51:24 +00:00
Brian Picciano
c757a310e1 feat(engine): add dual-frontier persistence planning
Co-Authored-By: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
2026-04-17 12:04:12 +00:00
Brian Picciano
ca20cc13ef fix(engine): retain partial trie suffix after persistence
Return finish-stage partial trie progress from the persistence thread and keep blocks above that trie boundary resident in memory after persistence completes.

This preserves the old prune-through-tip behavior when the partial trie matches the persisted tip or is absent.

Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019d9ab4-1eab-713f-9d5d-71903b7bc724
Co-authored-by: Amp <amp@ampcode.com>
2026-04-17 10:02:17 +00:00
Brian Picciano
9e38dde3e0 fix(provider): persist partial trie finish checkpoint
Make masked save_blocks persistence track partial trie progress in the Finish checkpoint and switch the trie-masking API to a masked-suffix start index so the final block is always covered by the mask when partial trie persistence is used.

Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019d9a2b-716e-774d-8db7-b0308fa96a23
Co-authored-by: Amp <amp@ampcode.com>
2026-04-17 09:05:04 +00:00
Brian Picciano
ca352832b8 fix(engine): raise persistence defaults
Raise the default persistence threshold to 10 and derive the default backpressure threshold from the persistence and in-memory thresholds. Enable alloy getrandom for reth-engine-primitives tests so the existing B256::random() test coverage keeps compiling.

Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019d9a2b-716e-774d-8db7-b0308fa96a23
Co-authored-by: Amp <amp@ampcode.com>
2026-04-17 08:32:16 +00:00
Brian Picciano
84b520560b docs(provider): explain save_blocks trie masking range
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019d9728-846b-7418-8a69-ddbba65ce656
Co-authored-by: Amp <amp@ampcode.com>
2026-04-16 21:26:24 +00:00
Brian Picciano
e2bd518097 refactor(provider): simplify save_blocks trie masking API
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019d9728-846b-7418-8a69-ddbba65ce656
Co-authored-by: Amp <amp@ampcode.com>
2026-04-16 21:25:21 +00:00
Brian Picciano
761acad803 fix(node): unwind startup to partial trie checkpoint
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019d968b-970d-735e-844c-b83ff245ce05
Co-authored-by: Amp <amp@ampcode.com>
2026-04-16 16:33:52 +00:00
Brian Picciano
b97544a05e feat(stages): move partial trie progress into finish checkpoint
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019d968b-970d-735e-844c-b83ff245ce05
Co-authored-by: Amp <amp@ampcode.com>
2026-04-16 14:10:51 +00:00
Brian Picciano
037828f6aa feat(stages): add partial finish checkpoint field
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019d9639-935f-73a4-ba50-7682a7b9aca0
Co-authored-by: Amp <amp@ampcode.com>
2026-04-16 12:55:43 +00:00
Brian Picciano
9883b7140e feat(trie): add disjoint_by_keys for sorted overlays
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019d9639-935f-73a4-ba50-7682a7b9aca0
Co-authored-by: Amp <amp@ampcode.com>
2026-04-16 12:38:41 +00:00
33 changed files with 2706 additions and 431 deletions

View File

@@ -39,7 +39,7 @@ Both `new-payload-fcu` and `new-payload-only` support `--rpc-block-fetch-retries
to control how many times block fetches are retried after an RPC failure. The default is `10`.
Use `--rpc-block-fetch-retries forever` to keep retrying indefinitely.
When using `--wait-for-persistence`, the benchmark waits after every `(threshold + 1)` blocks, where the threshold defaults to the engine's persistence threshold (2). This can be customized with `--persistence-threshold <N>`.
When using `--wait-for-persistence`, the benchmark waits after every `(threshold + 1)` blocks, where the threshold defaults to the engine's persistence threshold. This can be customized with `--persistence-threshold <N>`.
By default, the WebSocket URL for persistence subscriptions is derived from `--engine-rpc-url` (converting to ws:// on port 8546). Use `--ws-rpc-url` to override this.

View File

@@ -67,9 +67,8 @@ pub struct Command {
/// Engine persistence threshold used for deciding when to wait for persistence.
///
/// The benchmark waits after every `(threshold + 1)` blocks. By default this
/// matches the engine's `DEFAULT_PERSISTENCE_THRESHOLD` (2), so waits occur
/// at blocks 3, 6, 9, etc.
/// The benchmark waits after every `(threshold + 1)` blocks.
/// By default this matches the engine's `DEFAULT_PERSISTENCE_THRESHOLD`.
#[arg(
long = "persistence-threshold",
value_name = "PERSISTENCE_THRESHOLD",

View File

@@ -320,6 +320,19 @@ impl<N: NodePrimitives> CanonicalInMemoryState<N> {
/// This will update the links between blocks and remove all blocks that are [..
/// `persisted_height`].
pub fn remove_persisted_blocks(&self, persisted_num_hash: BlockNumHash) {
self.remove_persisted_blocks_until(persisted_num_hash, persisted_num_hash.number);
}
/// Removes blocks from the in-memory state through `remove_until` while still reporting the
/// provided block as the persisted tip.
///
/// This is used when block bodies/plain state have been persisted further than trie data, so a
/// suffix still needs to remain in memory for trie-backed operations.
pub fn remove_persisted_blocks_until(
&self,
persisted_num_hash: BlockNumHash,
remove_until: BlockNumber,
) {
self.set_persisted(persisted_num_hash);
// if the persisted hash is not in the canonical in memory state, do nothing, because it
// means canonical blocks were not actually persisted.
@@ -337,16 +350,15 @@ impl<N: NodePrimitives> CanonicalInMemoryState<N> {
let mut numbers = self.inner.in_memory_state.numbers.write();
let mut blocks = self.inner.in_memory_state.blocks.write();
let BlockNumHash { number: persisted_height, hash: _ } = persisted_num_hash;
let remove_until = remove_until.min(persisted_num_hash.number);
// clear all numbers
numbers.clear();
// drain all blocks and only keep the ones that are not persisted (below the persisted
// height)
// Drain all blocks and keep only the suffix that still has to stay in memory.
let mut old_blocks = blocks
.drain()
.filter(|(_, b)| b.block_ref().recovered_block().number() > persisted_height)
.filter(|(_, b)| b.block_ref().recovered_block().number() > remove_until)
.map(|(_, b)| b.block.clone())
.collect::<Vec<_>>();

View File

@@ -374,7 +374,7 @@ async fn test_setup_builder_with_custom_tree_config() -> Result<()> {
PayloadAttributes::default()
})
.with_tree_config_modifier(|config| {
config.with_persistence_threshold(0).with_memory_block_buffer_target(5)
config.with_persistence_threshold(6).with_memory_block_buffer_target(5)
})
.build()
.await?;

View File

@@ -189,7 +189,7 @@ async fn test_rocksdb_transaction_queries() -> Result<()> {
test_attributes_generator,
)
.with_storage_v2()
.with_tree_config_modifier(|config| config.with_persistence_threshold(0))
.with_tree_config_modifier(|config| config.with_persistence_threshold(1))
.build()
.await?;
@@ -200,7 +200,7 @@ async fn test_rocksdb_transaction_queries() -> Result<()> {
let signer = wallets[0].clone();
let client = nodes[0].rpc_client().expect("RPC client should be available");
let raw_tx = TransactionTestContext::transfer_tx_bytes(chain_id, signer).await;
let raw_tx = TransactionTestContext::transfer_tx_bytes(chain_id, signer.clone()).await;
let tx_hash = nodes[0].rpc.inject_tx(raw_tx).await?;
// Wait for tx to enter pending pool before mining
@@ -209,6 +209,14 @@ async fn test_rocksdb_transaction_queries() -> Result<()> {
let payload = nodes[0].advance_block().await?;
assert_eq!(payload.block().number(), 1);
let flush_tx =
TransactionTestContext::transfer_tx_bytes_with_nonce(chain_id, signer.clone(), 1).await;
let flush_tx_hash = nodes[0].rpc.inject_tx(flush_tx).await?;
wait_for_pending_tx(&client, flush_tx_hash).await;
let flush_payload = nodes[0].advance_block().await?;
assert_eq!(flush_payload.block().number(), 2);
// Query each transaction by hash
let tx: Option<Transaction> = client.request("eth_getTransactionByHash", [tx_hash]).await?;
let tx = tx.expect("Transaction should be found");
@@ -256,7 +264,7 @@ async fn test_rocksdb_multi_tx_same_block() -> Result<()> {
test_attributes_generator,
)
.with_storage_v2()
.with_tree_config_modifier(|config| config.with_persistence_threshold(0))
.with_tree_config_modifier(|config| config.with_persistence_threshold(1))
.build()
.await?;
@@ -283,6 +291,14 @@ async fn test_rocksdb_multi_tx_same_block() -> Result<()> {
let payload = nodes[0].advance_block().await?;
assert_eq!(payload.block().number(), 1);
let flush_tx =
TransactionTestContext::transfer_tx_bytes_with_nonce(chain_id, signer.clone(), 3).await;
let flush_tx_hash = nodes[0].rpc.inject_tx(flush_tx).await?;
wait_for_pending_tx(&client, flush_tx_hash).await;
let flush_payload = nodes[0].advance_block().await?;
assert_eq!(flush_payload.block().number(), 2);
// Verify block contains all 3 txs
let block: Option<alloy_rpc_types_eth::Block> =
client.request("eth_getBlockByNumber", ("0x1", true)).await?;
@@ -324,7 +340,7 @@ async fn test_rocksdb_txs_across_blocks() -> Result<()> {
test_attributes_generator,
)
.with_storage_v2()
.with_tree_config_modifier(|config| config.with_persistence_threshold(0))
.with_tree_config_modifier(|config| config.with_persistence_threshold(1))
.build()
.await?;
@@ -409,7 +425,7 @@ async fn test_rocksdb_pending_tx_not_in_storage() -> Result<()> {
test_attributes_generator,
)
.with_storage_v2()
.with_tree_config_modifier(|config| config.with_persistence_threshold(0))
.with_tree_config_modifier(|config| config.with_persistence_threshold(1))
.build()
.await?;
@@ -417,7 +433,7 @@ async fn test_rocksdb_pending_tx_not_in_storage() -> Result<()> {
let signer = wallets[0].clone();
// Inject tx but do NOT mine
let raw_tx = TransactionTestContext::transfer_tx_bytes(chain_id, signer).await;
let raw_tx = TransactionTestContext::transfer_tx_bytes(chain_id, signer.clone()).await;
let tx_hash = nodes[0].rpc.inject_tx(raw_tx).await?;
// Verify tx is in pending pool via RPC
@@ -442,6 +458,14 @@ async fn test_rocksdb_pending_tx_not_in_storage() -> Result<()> {
let payload = nodes[0].advance_block().await?;
assert_eq!(payload.block().number(), 1);
let flush_tx =
TransactionTestContext::transfer_tx_bytes_with_nonce(chain_id, signer.clone(), 1).await;
let flush_tx_hash = nodes[0].rpc.inject_tx(flush_tx).await?;
wait_for_pending_tx(&client, flush_tx_hash).await;
let flush_payload = nodes[0].advance_block().await?;
assert_eq!(flush_payload.block().number(), 2);
// Poll until tx appears in RocksDB
let tx_number = poll_tx_in_rocksdb(&nodes[0].inner.provider, tx_hash).await;
assert_eq!(tx_number, 0, "First tx should have tx_number 0");
@@ -473,7 +497,7 @@ async fn test_rocksdb_reorg_unwind() -> Result<()> {
test_attributes_generator,
)
.with_storage_v2()
.with_tree_config_modifier(|config| config.with_persistence_threshold(0))
.with_tree_config_modifier(|config| config.with_persistence_threshold(1))
.build()
.await?;
@@ -495,10 +519,6 @@ async fn test_rocksdb_reorg_unwind() -> Result<()> {
let block1_hash = payload1.block().hash();
assert_eq!(payload1.block().number(), 1);
// Poll until tx1 appears in RocksDB (ensures persistence happened)
let tx_number1 = poll_tx_in_rocksdb(&nodes[0].inner.provider, tx_hash1).await;
assert_eq!(tx_number1, 0, "First tx should have tx_number 0");
// Mine block 2 with transaction from signer1 (nonce 1)
let raw_tx2 =
TransactionTestContext::transfer_tx_bytes_with_nonce(chain_id, signer1.clone(), 1).await;
@@ -508,6 +528,10 @@ async fn test_rocksdb_reorg_unwind() -> Result<()> {
let payload2 = nodes[0].advance_block().await?;
assert_eq!(payload2.block().number(), 2);
// The second block triggers the first persistence cycle, which flushes both block 1 and 2.
let tx_number1 = poll_tx_in_rocksdb(&nodes[0].inner.provider, tx_hash1).await;
assert_eq!(tx_number1, 0, "First tx should have tx_number 0");
// Poll until tx2 appears in RocksDB
let tx_number2 = poll_tx_in_rocksdb(&nodes[0].inner.provider, tx_hash2).await;
assert_eq!(tx_number2, 1, "Second tx should have tx_number 1");
@@ -521,6 +545,14 @@ async fn test_rocksdb_reorg_unwind() -> Result<()> {
let payload3 = nodes[0].advance_block().await?;
assert_eq!(payload3.block().number(), 3);
let flush_tx =
TransactionTestContext::transfer_tx_bytes_with_nonce(chain_id, signer1.clone(), 3).await;
let flush_tx_hash = nodes[0].rpc.inject_tx(flush_tx).await?;
wait_for_pending_tx(&client, flush_tx_hash).await;
let flush_payload = nodes[0].advance_block().await?;
assert_eq!(flush_payload.block().number(), 4);
// Poll until tx3 appears in RocksDB
let tx_number3 = poll_tx_in_rocksdb(&nodes[0].inner.provider, tx_hash3).await;
assert_eq!(tx_number3, 2, "Third tx should have tx_number 2");
@@ -532,7 +564,7 @@ async fn test_rocksdb_reorg_unwind() -> Result<()> {
let alt_tx_hash = nodes[0].rpc.inject_tx(raw_alt_tx).await?;
wait_for_pending_tx(&client, alt_tx_hash).await;
// Build an alternate payload (this builds on top of the current head, i.e., block 3)
// Build an alternate payload on top of the current flushed head.
// But we want to reorg back to block 1, so we'll use the payload and then FCU to it
let alt_payload = nodes[0].new_payload().await?;
let alt_block_hash = nodes[0].submit_payload(alt_payload.clone()).await?;
@@ -550,8 +582,8 @@ async fn test_rocksdb_reorg_unwind() -> Result<()> {
let latest: Option<alloy_rpc_types_eth::Block> =
client.request("eth_getBlockByNumber", ("latest", false)).await?;
let latest = latest.expect("Latest block should exist");
// The alt block is at height 4 (on top of block 3)
assert!(latest.header.number >= 3, "Should be at height >= 3 after operation");
// The alt block is built on top of the flushed canonical head.
assert!(latest.header.number >= 4, "Should be at height >= 4 after operation");
// tx1 from block 1 should still be there
let tx1: Option<Transaction> = client.request("eth_getTransactionByHash", [tx_hash1]).await?;
@@ -596,7 +628,7 @@ async fn test_rocksdb_historical_account_queries() -> Result<()> {
test_attributes_generator,
)
.with_storage_v2()
.with_tree_config_modifier(|config| config.with_persistence_threshold(0))
.with_tree_config_modifier(|config| config.with_persistence_threshold(1))
.build()
.await?;
@@ -621,8 +653,6 @@ async fn test_rocksdb_historical_account_queries() -> Result<()> {
let payload1 = nodes[0].advance_block().await?;
assert_eq!(payload1.block().number(), 1);
poll_tx_in_rocksdb(&nodes[0].inner.provider, tx_hash1).await;
// Record state after block 1
let balance_at_1: U256 = client.request("eth_getBalance", (sender, "0x1")).await?;
let nonce_at_1: U256 = client.request("eth_getTransactionCount", (sender, "0x1")).await?;
@@ -637,8 +667,6 @@ async fn test_rocksdb_historical_account_queries() -> Result<()> {
let payload2 = nodes[0].advance_block().await?;
assert_eq!(payload2.block().number(), 2);
poll_tx_in_rocksdb(&nodes[0].inner.provider, tx_hash2).await;
let balance_at_2: U256 = client.request("eth_getBalance", (sender, "0x2")).await?;
let nonce_at_2: U256 = client.request("eth_getTransactionCount", (sender, "0x2")).await?;
assert!(balance_at_2 < balance_at_1, "Balance should decrease further after second tx");
@@ -652,18 +680,14 @@ async fn test_rocksdb_historical_account_queries() -> Result<()> {
let payload3 = nodes[0].advance_block().await?;
assert_eq!(payload3.block().number(), 3);
poll_tx_in_rocksdb(&nodes[0].inner.provider, tx_hash3).await;
let balance_at_3: U256 = client.request("eth_getBalance", (sender, "0x3")).await?;
let nonce_at_3: U256 = client.request("eth_getTransactionCount", (sender, "0x3")).await?;
assert!(balance_at_3 < balance_at_2, "Balance should decrease further after third tx");
assert_eq!(nonce_at_3, U256::from(3), "Nonce should be 3 after third tx");
// Mine additional blocks to push blocks 1-3 out of the in-memory overlay.
// With persistence_threshold=0 and memory_block_buffer_target=0, each new block
// triggers persistence up to `head` followed by in-memory eviction. Mining several
// more blocks ensures the engine loop has completed at least one full
// persist-then-evict cycle covering blocks 1-3.
// With a persistence threshold of 1, every second block triggers a flush, so a few extra
// blocks are enough to durably persist and evict the earlier history we want to query.
// Each block needs a transaction because the payload builder requires non-empty payloads.
for nonce in 3..8u64 {
let raw_tx =
@@ -673,6 +697,7 @@ async fn test_rocksdb_historical_account_queries() -> Result<()> {
wait_for_pending_tx(&client, tx_hash).await;
nodes[0].advance_block().await?;
}
poll_tx_in_rocksdb(&nodes[0].inner.provider, tx_hash3).await;
// Allow the engine loop to process the persistence completions
tokio::time::sleep(Duration::from_millis(500)).await;
@@ -743,7 +768,7 @@ async fn test_rocksdb_account_history_pruning() -> Result<()> {
test_attributes_generator,
)
.with_storage_v2()
.with_tree_config_modifier(|config| config.with_persistence_threshold(0))
.with_tree_config_modifier(|config| config.with_persistence_threshold(1))
.with_node_config_modifier(|mut config| {
config.pruning.account_history_distance = Some(PRUNE_DISTANCE);
config.pruning.minimum_distance = Some(PRUNE_DISTANCE);
@@ -840,7 +865,7 @@ async fn test_rocksdb_storage_history_pruning() -> Result<()> {
test_attributes_generator,
)
.with_storage_v2()
.with_tree_config_modifier(|config| config.with_persistence_threshold(0))
.with_tree_config_modifier(|config| config.with_persistence_threshold(1))
.with_node_config_modifier(|mut config| {
config.pruning.storage_history_distance = Some(PRUNE_DISTANCE);
config.pruning.minimum_distance = Some(PRUNE_DISTANCE);
@@ -912,10 +937,6 @@ async fn test_rocksdb_storage_history_pruning() -> Result<()> {
let payload1 = nodes[0].advance_block().await?;
assert_eq!(payload1.block().number(), 1);
poll_tx_in_rocksdb(&nodes[0].inner.provider, deploy_hash).await;
// Let the persistence cycle complete before the next block (same cadence as the loop below)
tokio::time::sleep(Duration::from_millis(300)).await;
// Get the deployed contract address from the receipt
let receipt: Option<TransactionReceipt> =
@@ -965,6 +986,10 @@ async fn test_rocksdb_storage_history_pruning() -> Result<()> {
assert_eq!(payload.block().number(), block_num);
last_tx_hash = tx_hash;
if nonce == 1 {
poll_tx_in_rocksdb(&nodes[0].inner.provider, deploy_hash).await;
}
// Let the persistence cycle complete before the next block
tokio::time::sleep(Duration::from_millis(300)).await;
}

View File

@@ -37,6 +37,9 @@ auto_impl.workspace = true
serde.workspace = true
thiserror.workspace = true
[dev-dependencies]
alloy-primitives = { workspace = true, features = ["getrandom"] }
[features]
default = ["std"]
trie-debug = []

View File

@@ -6,12 +6,33 @@ use core::time::Duration;
/// Triggers persistence when the number of canonical blocks in memory exceeds this threshold.
pub const DEFAULT_PERSISTENCE_THRESHOLD: u64 = 2;
/// Maximum canonical-minus-persisted gap before engine API processing is stalled.
pub const DEFAULT_PERSISTENCE_BACKPRESSURE_THRESHOLD: u64 = 16;
/// Maximum number of consecutive canonical blocks whose non-trie outputs may be persisted ahead
/// of trie persistence.
pub const DEFAULT_DEFERRED_TRIE_BLOCKS: u64 = 0;
/// How close to the canonical head we persist blocks.
pub const DEFAULT_MEMORY_BLOCK_BUFFER_TARGET: u64 = 0;
/// Derives the default canonical-minus-persisted gap that triggers backpressure.
pub const fn default_persistence_backpressure_threshold(
persistence_threshold: u64,
memory_block_buffer_target: u64,
) -> u64 {
let threshold = 2 * (persistence_threshold + memory_block_buffer_target);
if threshold < 16 {
16
} else {
threshold
}
}
/// Maximum canonical-minus-persisted gap before engine API processing is stalled.
pub const DEFAULT_PERSISTENCE_BACKPRESSURE_THRESHOLD: u64 =
default_persistence_backpressure_threshold(
DEFAULT_PERSISTENCE_THRESHOLD,
DEFAULT_MEMORY_BLOCK_BUFFER_TARGET,
);
/// The size of proof targets chunk to spawn in one multiproof calculation.
pub const DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE: usize = 5;
@@ -60,6 +81,17 @@ const fn assert_backpressure_threshold_invariant(
);
}
const fn assert_state_masking_invariant(
persistence_threshold: u64,
num_state_masking_blocks: u64,
memory_block_buffer_target: u64,
) {
debug_assert!(
num_state_masking_blocks + memory_block_buffer_target < persistence_threshold,
"num_state_masking_blocks + memory_block_buffer_target must be less than persistence_threshold",
);
}
const fn default_cross_block_cache_size() -> usize {
if cfg!(test) {
1024 * 1024 // 1 MB in tests
@@ -93,6 +125,9 @@ pub struct TreeConfig {
/// Maximum number of blocks to be kept only in memory without triggering
/// persistence.
persistence_threshold: u64,
/// Number of persisted blocks whose state/trie writes are masked instead of being durably
/// written in the current cycle.
num_state_masking_blocks: u64,
/// How close to the canonical head we persist blocks. Represents the ideal
/// number of most recent blocks to keep in memory for quick access and reorgs.
///
@@ -204,14 +239,24 @@ pub struct TreeConfig {
impl Default for TreeConfig {
fn default() -> Self {
let persistence_backpressure_threshold = default_persistence_backpressure_threshold(
DEFAULT_PERSISTENCE_THRESHOLD,
DEFAULT_MEMORY_BLOCK_BUFFER_TARGET,
);
assert_backpressure_threshold_invariant(
DEFAULT_PERSISTENCE_THRESHOLD,
DEFAULT_PERSISTENCE_BACKPRESSURE_THRESHOLD,
persistence_backpressure_threshold,
);
assert_state_masking_invariant(
DEFAULT_PERSISTENCE_THRESHOLD,
DEFAULT_DEFERRED_TRIE_BLOCKS,
DEFAULT_MEMORY_BLOCK_BUFFER_TARGET,
);
Self {
persistence_threshold: DEFAULT_PERSISTENCE_THRESHOLD,
num_state_masking_blocks: DEFAULT_DEFERRED_TRIE_BLOCKS,
memory_block_buffer_target: DEFAULT_MEMORY_BLOCK_BUFFER_TARGET,
persistence_backpressure_threshold: DEFAULT_PERSISTENCE_BACKPRESSURE_THRESHOLD,
persistence_backpressure_threshold,
block_buffer_limit: DEFAULT_BLOCK_BUFFER_LIMIT,
max_invalid_header_cache_length: DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH,
invalid_header_hit_eviction_threshold: DEFAULT_INVALID_HEADER_HIT_EVICTION_THRESHOLD,
@@ -253,6 +298,7 @@ impl TreeConfig {
#[expect(clippy::too_many_arguments)]
pub const fn new(
persistence_threshold: u64,
num_state_masking_blocks: u64,
memory_block_buffer_target: u64,
persistence_backpressure_threshold: u64,
block_buffer_limit: u32,
@@ -285,8 +331,14 @@ impl TreeConfig {
persistence_threshold,
persistence_backpressure_threshold,
);
assert_state_masking_invariant(
persistence_threshold,
num_state_masking_blocks,
memory_block_buffer_target,
);
Self {
persistence_threshold,
num_state_masking_blocks,
memory_block_buffer_target,
persistence_backpressure_threshold,
block_buffer_limit,
@@ -329,6 +381,11 @@ impl TreeConfig {
self.persistence_threshold
}
/// Return the number of persisted blocks whose state/trie writes are masked.
pub const fn num_state_masking_blocks(&self) -> u64 {
self.num_state_masking_blocks
}
/// Return the memory block buffer target.
pub const fn memory_block_buffer_target(&self) -> u64 {
self.memory_block_buffer_target
@@ -447,6 +504,22 @@ impl TreeConfig {
self.persistence_threshold,
self.persistence_backpressure_threshold,
);
assert_state_masking_invariant(
self.persistence_threshold,
self.num_state_masking_blocks,
self.memory_block_buffer_target,
);
self
}
/// Setter for the number of persisted blocks whose state/trie writes are masked.
pub const fn with_num_state_masking_blocks(mut self, num_state_masking_blocks: u64) -> Self {
self.num_state_masking_blocks = num_state_masking_blocks;
assert_state_masking_invariant(
self.persistence_threshold,
self.num_state_masking_blocks,
self.memory_block_buffer_target,
);
self
}
@@ -456,6 +529,11 @@ impl TreeConfig {
memory_block_buffer_target: u64,
) -> Self {
self.memory_block_buffer_target = memory_block_buffer_target;
assert_state_masking_invariant(
self.persistence_threshold,
self.num_state_masking_blocks,
self.memory_block_buffer_target,
);
self
}
@@ -765,7 +843,26 @@ impl TreeConfig {
#[cfg(test)]
mod tests {
use super::TreeConfig;
use super::{
default_persistence_backpressure_threshold, TreeConfig, DEFAULT_DEFERRED_TRIE_BLOCKS,
DEFAULT_MEMORY_BLOCK_BUFFER_TARGET, DEFAULT_PERSISTENCE_THRESHOLD,
};
#[test]
fn default_thresholds_use_derived_backpressure_threshold() {
let config = TreeConfig::default();
assert_eq!(config.persistence_threshold(), DEFAULT_PERSISTENCE_THRESHOLD);
assert_eq!(config.num_state_masking_blocks(), DEFAULT_DEFERRED_TRIE_BLOCKS);
assert_eq!(config.memory_block_buffer_target(), DEFAULT_MEMORY_BLOCK_BUFFER_TARGET);
assert_eq!(
config.persistence_backpressure_threshold(),
default_persistence_backpressure_threshold(
DEFAULT_PERSISTENCE_THRESHOLD,
DEFAULT_MEMORY_BLOCK_BUFFER_TARGET,
)
);
}
#[test]
#[should_panic(
@@ -776,4 +873,15 @@ mod tests {
.with_persistence_threshold(4)
.with_persistence_backpressure_threshold(4);
}
#[test]
#[should_panic(
expected = "num_state_masking_blocks + memory_block_buffer_target must be less than persistence_threshold"
)]
fn rejects_state_masking_window_at_or_above_persistence_threshold() {
let _ = TreeConfig::default()
.with_persistence_threshold(4)
.with_num_state_masking_blocks(2)
.with_memory_block_buffer_target(2);
}
}

View File

@@ -1,16 +1,16 @@
use crate::metrics::PersistenceMetrics;
use alloy_eips::BlockNumHash;
use crossbeam_channel::Sender as CrossbeamSender;
use reth_chain_state::ExecutedBlock;
use reth_errors::ProviderError;
use reth_ethereum_primitives::EthPrimitives;
use reth_primitives_traits::{FastInstant as Instant, NodePrimitives};
use reth_provider::{
providers::ProviderNodeTypes, BlockExecutionWriter, BlockHashReader, ChainStateBlockWriter,
DBProvider, DatabaseProviderFactory, ProviderFactory, SaveBlocksMode,
DBProvider, DatabaseProviderFactory, ProviderFactory, SaveBlocksMode, SaveBlocksPlan,
StageCheckpointReader,
};
use reth_prune::{PrunerError, PrunerWithFactory};
use reth_stages_api::{MetricEvent, MetricEventsSender};
use reth_stages_api::{MetricEvent, MetricEventsSender, StageId};
use reth_tasks::spawn_os_thread;
use std::{
sync::{
@@ -26,8 +26,13 @@ use tracing::{debug, error, instrument};
/// Unified result of any persistence operation.
#[derive(Debug)]
pub struct PersistenceResult {
/// The last block that was persisted, if any.
/// The highest block whose non-state/trie outputs are persisted, if any.
pub last_block: Option<BlockNumHash>,
/// The highest block whose state/trie data is fully persisted, if known.
///
/// When this lags behind [`Self::last_block`], callers must retain the suffix
/// above it in memory so trie-backed operations can still unwind from that point.
pub last_state_trie_block: Option<u64>,
/// The commit duration, only available for save-blocks operations.
pub commit_duration: Option<Duration>,
}
@@ -96,14 +101,14 @@ where
while let Ok(action) = self.incoming.recv() {
match action {
PersistenceAction::RemoveBlocksAbove(new_tip_num, sender) => {
let last_block = self.on_remove_blocks_above(new_tip_num)?;
let result = self.on_remove_blocks_above(new_tip_num)?;
// send new sync metrics based on removed blocks
let _ =
self.sync_metrics_tx.send(MetricEvent::SyncHeight { height: new_tip_num });
let _ = sender.send(PersistenceResult { last_block, commit_duration: None });
let _ = sender.send(result);
}
PersistenceAction::SaveBlocks(blocks, sender) => {
let result = self.on_save_blocks(blocks)?;
PersistenceAction::SaveBlocks(plan, sender) => {
let result = self.on_save_blocks(plan)?;
let result_number = result.last_block.map(|b| b.number);
let _ = sender.send(result);
@@ -130,28 +135,40 @@ where
fn on_remove_blocks_above(
&self,
new_tip_num: u64,
) -> Result<Option<BlockNumHash>, PersistenceError> {
) -> Result<PersistenceResult, PersistenceError> {
debug!(target: "engine::persistence", ?new_tip_num, "Removing blocks");
let start_time = Instant::now();
let provider_rw = self.provider.database_provider_rw()?;
let new_tip_hash = provider_rw.block_hash(new_tip_num)?;
provider_rw.remove_block_and_execution_above(new_tip_num)?;
let last_state_trie_block =
provider_rw.get_stage_checkpoint(StageId::Finish)?.map(|checkpoint| {
checkpoint
.finish_stage_checkpoint()
.and_then(|finish| finish.partial_state_trie)
.unwrap_or(checkpoint.block_number)
});
provider_rw.commit()?;
debug!(target: "engine::persistence", ?new_tip_num, ?new_tip_hash, "Removed blocks from disk");
self.metrics.remove_blocks_above_duration_seconds.record(start_time.elapsed());
Ok(new_tip_hash.map(|hash| BlockNumHash { hash, number: new_tip_num }))
Ok(PersistenceResult {
last_block: new_tip_hash.map(|hash| BlockNumHash { hash, number: new_tip_num }),
last_state_trie_block,
commit_duration: None,
})
}
#[instrument(level = "debug", target = "engine::persistence", skip_all, fields(block_count = blocks.len()))]
#[instrument(level = "debug", target = "engine::persistence", skip_all, fields(block_count = plan.blocks.len()))]
fn on_save_blocks(
&mut self,
blocks: Vec<ExecutedBlock<N::Primitives>>,
plan: SaveBlocksPlan<N::Primitives>,
) -> Result<PersistenceResult, PersistenceError> {
let first_block = blocks.first().map(|b| b.recovered_block.num_hash());
let last_block = blocks.last().map(|b| b.recovered_block.num_hash());
let block_count = blocks.len();
let first_block = plan.blocks.first().map(|block| block.recovered_block().num_hash());
let last_block = plan.last_block();
let block_count = plan.blocks.len();
let mut last_state_trie_block = None;
let pending_finalized = self.pending_finalized_block.take();
let pending_safe = self.pending_safe_block.take();
@@ -160,19 +177,27 @@ where
let start_time = Instant::now();
if let Some(last) = last_block {
if let Some(last_block) = last_block {
let provider_rw = self.provider.database_provider_rw()?;
provider_rw.save_blocks(blocks, SaveBlocksMode::Full)?;
provider_rw.save_blocks(&plan, SaveBlocksMode::Full)?;
last_state_trie_block = provider_rw
.get_stage_checkpoint(StageId::Finish)?
.and_then(|checkpoint| {
checkpoint
.finish_stage_checkpoint()
.and_then(|finish| finish.partial_state_trie)
})
.or(Some(last_block.number));
if let Some(finalized) = pending_finalized {
provider_rw.save_finalized_block_number(finalized.min(last.number))?;
if finalized > last.number {
provider_rw.save_finalized_block_number(finalized.min(last_block.number))?;
if finalized > last_block.number {
self.pending_finalized_block = Some(finalized);
}
}
if let Some(safe) = pending_safe {
provider_rw.save_safe_block_number(safe.min(last.number))?;
if safe > last.number {
provider_rw.save_safe_block_number(safe.min(last_block.number))?;
if safe > last_block.number {
self.pending_safe_block = Some(safe);
}
}
@@ -185,13 +210,13 @@ where
//
// The pruner reads the indices from rocksdb, filters it, and writes to indices, so it
// must be able to read anything written by save_blocks.
if self.pruner.is_pruning_needed(last.number) {
debug!(target: "engine::persistence", block_num=?last.number, "Running pruner");
if self.pruner.is_pruning_needed(last_block.number) {
debug!(target: "engine::persistence", block_num=?last_block.number, "Running pruner");
let prune_start = Instant::now();
let provider_rw = self.provider.database_provider_rw()?;
let _ = self.pruner.run_with_provider(&provider_rw, last.number)?;
let _ = self.pruner.run_with_provider(&provider_rw, last_block.number)?;
provider_rw.commit()?;
debug!(target: "engine::persistence", tip=?last.number, "Finished pruning after saving blocks");
debug!(target: "engine::persistence", tip=?last_block.number, "Finished pruning after saving blocks");
self.metrics.prune_before_duration_seconds.record(prune_start.elapsed());
}
}
@@ -200,7 +225,7 @@ where
self.metrics.save_blocks_batch_size.record(block_count as f64);
self.metrics.save_blocks_duration_seconds.record(elapsed);
Ok(PersistenceResult { last_block, commit_duration: Some(elapsed) })
Ok(PersistenceResult { last_block, last_state_trie_block, commit_duration: Some(elapsed) })
}
}
@@ -222,9 +247,10 @@ pub enum PersistenceAction<N: NodePrimitives = EthPrimitives> {
/// The section of tree state that should be persisted. These blocks are expected in order of
/// increasing block number.
///
/// First, header, transaction, and receipt-related data should be written to static files.
/// Then the execution history-related data will be written to the database.
SaveBlocks(Vec<ExecutedBlock<N>>, CrossbeamSender<PersistenceResult>),
/// First, header, transaction, and receipt-related data should be written to static files for
/// the deferred trie region. Then the execution history-related data will be written to the
/// database, while trie catchup is persisted for the prefix.
SaveBlocks(SaveBlocksPlan<N>, CrossbeamSender<PersistenceResult>),
/// Removes block data above the given block number from the database.
///
@@ -308,10 +334,10 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
/// If there are no blocks to persist, then `None` is sent in the sender.
pub fn save_blocks(
&self,
blocks: Vec<ExecutedBlock<T>>,
plan: SaveBlocksPlan<T>,
tx: CrossbeamSender<PersistenceResult>,
) -> Result<(), SendError<PersistenceAction<T>>> {
self.send_action(PersistenceAction::SaveBlocks(blocks, tx))
self.send_action(PersistenceAction::SaveBlocks(plan, tx))
}
/// Queues the finalized block number to be persisted on disk.
@@ -375,12 +401,12 @@ impl Drop for ServiceGuard {
mod tests {
use super::*;
use alloy_primitives::{B256, U256};
use reth_chain_state::test_utils::TestBlockBuilder;
use reth_chain_state::{test_utils::TestBlockBuilder, ExecutedBlock};
use reth_exex_types::FinishedExExHeight;
use reth_provider::{
providers::{ProviderFactoryBuilder, ReadOnlyConfig},
test_utils::{create_test_provider_factory, MockNodeTypes},
AccountReader, ChainSpecProvider, HeaderProvider, StorageSettingsCache,
AccountReader, ChainSpecProvider, HeaderProvider, SaveBlocksPlanStep, StorageSettingsCache,
TryIntoHistoricalStateProvider,
};
use reth_prune::Pruner;
@@ -389,6 +415,13 @@ mod tests {
fn default_persistence_handle() -> PersistenceHandle<EthPrimitives> {
let provider = create_test_provider_factory();
persistence_handle(provider)
}
fn persistence_handle<N>(provider: ProviderFactory<N>) -> PersistenceHandle<EthPrimitives>
where
N: ProviderNodeTypes<Primitives = EthPrimitives>,
{
let (_finished_exex_height_tx, finished_exex_height_rx) =
tokio::sync::watch::channel(FinishedExExHeight::NoExExs);
@@ -399,18 +432,31 @@ mod tests {
PersistenceHandle::<EthPrimitives>::spawn_service(provider, pruner, sync_metrics_tx)
}
fn full_save_plan(blocks: Vec<ExecutedBlock<EthPrimitives>>) -> SaveBlocksPlan<EthPrimitives> {
let full_range = 0..blocks.len();
SaveBlocksPlan::new(
blocks,
vec![SaveBlocksPlanStep::new(
full_range.clone(),
Some(full_range.end..full_range.end),
true,
)],
)
}
#[test]
fn test_save_blocks_empty() {
reth_tracing::init_test_tracing();
let handle = default_persistence_handle();
let blocks = vec![];
let blocks = full_save_plan(vec![]);
let (tx, rx) = crossbeam_channel::bounded(1);
handle.save_blocks(blocks, tx).unwrap();
let result = rx.recv().unwrap();
assert!(result.last_block.is_none());
assert!(result.last_state_trie_block.is_none());
}
#[test]
@@ -423,14 +469,16 @@ mod tests {
test_block_builder.get_executed_block_with_number(block_number, B256::random());
let block_hash = executed.recovered_block().hash();
let blocks = vec![executed];
let blocks = full_save_plan(vec![executed]);
let (tx, rx) = crossbeam_channel::bounded(1);
handle.save_blocks(blocks, tx).unwrap();
let result = rx.recv_timeout(std::time::Duration::from_secs(10)).expect("test timed out");
assert_eq!(block_hash, result.last_block.unwrap().hash);
let last_block = result.last_block.unwrap();
assert_eq!(block_hash, last_block.hash);
assert_eq!(result.last_state_trie_block, Some(last_block.number));
}
#[test]
@@ -443,9 +491,11 @@ mod tests {
let last_hash = blocks.last().unwrap().recovered_block().hash();
let (tx, rx) = crossbeam_channel::bounded(1);
handle.save_blocks(blocks, tx).unwrap();
handle.save_blocks(full_save_plan(blocks), tx).unwrap();
let result = rx.recv().unwrap();
assert_eq!(last_hash, result.last_block.unwrap().hash);
let last_block = result.last_block.unwrap();
assert_eq!(last_hash, last_block.hash);
assert_eq!(result.last_state_trie_block, Some(last_block.number));
}
#[test]
@@ -460,13 +510,57 @@ mod tests {
let last_hash = blocks.last().unwrap().recovered_block().hash();
let (tx, rx) = crossbeam_channel::bounded(1);
handle.save_blocks(blocks, tx).unwrap();
handle.save_blocks(full_save_plan(blocks), tx).unwrap();
let result = rx.recv().unwrap();
assert_eq!(last_hash, result.last_block.unwrap().hash);
let last_block = result.last_block.unwrap();
assert_eq!(last_hash, last_block.hash);
assert_eq!(result.last_state_trie_block, Some(last_block.number));
}
}
#[test]
fn test_remove_blocks_above_preserves_partial_state_trie() {
reth_tracing::init_test_tracing();
let provider = create_test_provider_factory();
let mut test_block_builder = TestBlockBuilder::eth().with_state();
let blocks = test_block_builder.get_executed_blocks(0..4).collect::<Vec<_>>();
let provider_rw = provider.database_provider_rw().unwrap();
provider_rw
.save_blocks(
&SaveBlocksPlan::new(
blocks,
vec![
SaveBlocksPlanStep::new(0..2, Some(2..4), true),
SaveBlocksPlanStep::new(2..4, None, true),
],
),
SaveBlocksMode::Full,
)
.unwrap();
provider_rw.commit().unwrap();
let handle = persistence_handle(provider.clone());
let (tx, rx) = crossbeam_channel::bounded(1);
handle.remove_blocks_above(2, tx).unwrap();
let result = rx.recv_timeout(std::time::Duration::from_secs(10)).expect("test timed out");
let last_block = result.last_block.unwrap();
assert_eq!(last_block.number, 2);
assert_eq!(result.last_state_trie_block, Some(1));
let finish_checkpoint =
provider.provider().unwrap().get_stage_checkpoint(StageId::Finish).unwrap().unwrap();
assert_eq!(finish_checkpoint.block_number, 2);
assert_eq!(
finish_checkpoint.finish_stage_checkpoint().unwrap().partial_state_trie,
Some(1)
);
}
/// Verifies that committing `save_blocks` history before running the pruner
/// prevents the pruner from overwriting new entries.
///
@@ -555,7 +649,7 @@ mod tests {
{
let provider_rw = provider_factory.database_provider_rw().unwrap();
provider_rw.save_blocks(blocks_a, SaveBlocksMode::Full).unwrap();
provider_rw.save_blocks(&full_save_plan(blocks_a), SaveBlocksMode::Full).unwrap();
provider_rw.commit().unwrap();
}
@@ -612,7 +706,12 @@ mod tests {
provider_rw.commit().unwrap();
let provider_rw = pf.database_provider_rw().unwrap();
provider_rw.save_blocks(vec![block_b2], SaveBlocksMode::Full).unwrap();
provider_rw
.save_blocks(
&full_save_plan(std::slice::from_ref(&block_b2).to_vec()),
SaveBlocksMode::Full,
)
.unwrap();
provider_rw.commit().unwrap();
});

View File

@@ -30,9 +30,9 @@ use reth_primitives_traits::{
};
use reth_provider::{
BlockExecutionOutput, BlockExecutionResult, BlockReader, ChangeSetReader,
DatabaseProviderFactory, HashedPostStateProvider, ProviderError, StageCheckpointReader,
StateProviderBox, StateProviderFactory, StateReader, StorageChangeSetReader,
StorageSettingsCache, TransactionVariant,
DatabaseProviderFactory, HashedPostStateProvider, ProviderError, SaveBlocksPlan,
SaveBlocksPlanStep, StageCheckpointReader, StateProviderBox, StateProviderFactory, StateReader,
StorageChangeSetReader, StorageSettingsCache, TransactionVariant,
};
use reth_revm::database::StateProviderDatabase;
use reth_stages_api::ControlFlow;
@@ -433,6 +433,7 @@ where
let persistence_state = PersistenceState {
last_persisted_block: BlockNumHash::new(best_block_number, header.hash()),
last_state_trie_persisted_block: BlockNumHash::new(best_block_number, header.hash()),
rx: None,
};
@@ -1350,7 +1351,7 @@ where
/// Helper method to remove blocks and set the persistence state. This ensures we keep track of
/// the current persistence action while we're removing blocks.
fn remove_blocks(&mut self, new_tip_num: u64) {
debug!(target: "engine::tree", ?new_tip_num, last_persisted_block_number=?self.persistence_state.last_persisted_block.number, "Removing blocks using persistence task");
debug!(target: "engine::tree", ?new_tip_num, last_persisted_block=?self.persistence_state.last_persisted_block.number, "Removing blocks using persistence task");
if new_tip_num < self.persistence_state.last_persisted_block.number {
debug!(target: "engine::tree", ?new_tip_num, "Starting remove blocks job");
let (tx, rx) = crossbeam_channel::bounded(1);
@@ -1361,24 +1362,25 @@ where
/// Helper method to save blocks and set the persistence state. This ensures we keep track of
/// the current persistence action while we're saving blocks.
fn persist_blocks(&mut self, blocks_to_persist: Vec<ExecutedBlock<N>>) {
if blocks_to_persist.is_empty() {
fn persist_blocks(&mut self, plan: SaveBlocksPlan<N>) {
if plan.is_empty() {
debug!(target: "engine::tree", "Returned empty set of blocks to persist");
return
}
// NOTE: checked non-empty above
let highest_num_hash = blocks_to_persist
.iter()
.max_by_key(|block| block.recovered_block().number())
.map(|b| b.recovered_block().num_hash())
.expect("Checked non-empty persisting blocks");
let last_block = plan.last_block().expect("checked non-empty persisting blocks");
debug!(target: "engine::tree", count=blocks_to_persist.len(), blocks = ?blocks_to_persist.iter().map(|block| block.recovered_block().num_hash()).collect::<Vec<_>>(), "Persisting blocks");
debug!(
target: "engine::tree",
count = plan.blocks.len(),
steps = ?plan.steps,
blocks = ?plan.blocks.iter().map(|block| block.recovered_block().num_hash()).collect::<Vec<_>>(),
"Persisting blocks"
);
let (tx, rx) = crossbeam_channel::bounded(1);
let _ = self.persistence.save_blocks(blocks_to_persist, tx);
let _ = self.persistence.save_blocks(plan, tx);
self.persistence_state.start_save(highest_num_hash, rx);
self.persistence_state.start_save(last_block, rx);
}
/// Triggers new persistence actions if no persistence task is currently in progress.
@@ -1390,9 +1392,8 @@ where
if let Some(new_tip_num) = self.find_disk_reorg()? {
self.remove_blocks(new_tip_num)
} else if self.should_persist() {
let blocks_to_persist =
self.get_canonical_blocks_to_persist(PersistTarget::Threshold)?;
self.persist_blocks(blocks_to_persist);
let plan = self.get_save_blocks_plan(PersistTarget::Threshold)?;
self.persist_blocks(plan);
}
}
@@ -1423,15 +1424,15 @@ where
self.on_persistence_complete(result, start_time)?;
}
let blocks_to_persist = self.get_canonical_blocks_to_persist(PersistTarget::Head)?;
let plan = self.get_save_blocks_plan(PersistTarget::Head)?;
if blocks_to_persist.is_empty() {
if plan.is_empty() {
debug!(target: "engine::tree", "persistence complete, signaling termination");
return Ok(())
}
debug!(target: "engine::tree", count = blocks_to_persist.len(), "persisting remaining blocks before shutdown");
self.persist_blocks(blocks_to_persist);
debug!(target: "engine::tree", count = plan.blocks.len(), "persisting remaining blocks before shutdown");
self.persist_blocks(plan);
}
}
@@ -1467,25 +1468,25 @@ where
) -> Result<(), AdvancePersistenceError> {
self.metrics.engine.persistence_duration.record(start_time.elapsed());
let commit_duration = result.commit_duration;
let Some(BlockNumHash {
hash: last_persisted_block_hash,
number: last_persisted_block_number,
}) = result.last_block
let PersistenceResult { last_block, last_state_trie_block, commit_duration } = result;
let Some(BlockNumHash { hash: last_block_hash, number: last_block_number }) = last_block
else {
// if this happened, then we persisted no blocks because we sent an empty vec of blocks
warn!(target: "engine::tree", "Persistence task completed but did not persist any blocks");
return Ok(())
};
debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, elapsed=?start_time.elapsed(), "Finished persisting, calling finish");
self.persistence_state.finish(last_persisted_block_hash, last_persisted_block_number);
let last_block = BlockNumHash::new(last_block_number, last_block_hash);
let last_state_trie_persisted_block =
self.last_state_trie_persisted_block(last_block, last_state_trie_block)?;
debug!(target: "engine::tree", ?last_block_hash, ?last_block_number, last_state_trie_persisted_block = last_state_trie_persisted_block.number, elapsed=?start_time.elapsed(), "Finished persisting, calling finish");
self.persistence_state.finish(last_block, last_state_trie_persisted_block);
// Evict trie changesets for blocks below the eviction threshold.
// Keep at least CHANGESET_CACHE_RETENTION_BLOCKS from the persisted tip, and also respect
// the finalized block if set.
let min_threshold =
last_persisted_block_number.saturating_sub(CHANGESET_CACHE_RETENTION_BLOCKS);
let min_threshold = last_block_number.saturating_sub(CHANGESET_CACHE_RETENTION_BLOCKS);
let eviction_threshold =
if let Some(finalized) = self.canonical_in_memory_state.get_finalized_num_hash() {
// Use the minimum of finalized block and retention threshold to be conservative
@@ -1496,7 +1497,7 @@ where
};
debug!(
target: "engine::tree",
last_persisted = last_persisted_block_number,
last_persisted_block = last_block_number,
finalized_number = ?self.canonical_in_memory_state.get_finalized_num_hash().map(|f| f.number),
eviction_threshold,
"Evicting changesets below threshold"
@@ -1506,7 +1507,7 @@ where
// Invalidate cached overlay since the anchor has changed
self.state.tree_state.invalidate_cached_overlay();
self.on_new_persisted_block()?;
self.on_new_persisted_block(last_state_trie_persisted_block)?;
// Re-prepare overlay for the current canonical head with the new anchor.
// Spawn a background task to trigger computation so it's ready when the next payload
@@ -1517,11 +1518,39 @@ where
});
}
self.purge_timing_stats(last_persisted_block_number, commit_duration);
self.purge_timing_stats(last_block_number, commit_duration);
Ok(())
}
/// Returns the highest block that can be dropped from memory after persistence completes.
fn last_state_trie_persisted_block(
&self,
last_block: BlockNumHash,
last_state_trie_block: Option<u64>,
) -> ProviderResult<BlockNumHash> {
let Some(last_state_trie_block) = last_state_trie_block else { return Ok(last_block) };
debug_assert!(
last_state_trie_block <= last_block.number,
"state/trie frontier cannot exceed the last persisted block"
);
if last_state_trie_block >= last_block.number {
return Ok(last_block)
}
let hash = self
.canonical_in_memory_state
.hash_by_number(last_state_trie_block)
.map(Ok)
.unwrap_or_else(|| {
self.provider
.block_hash(last_state_trie_block)?
.ok_or_else(|| ProviderError::HeaderNotFound(last_state_trie_block.into()))
})?;
Ok(BlockNumHash::new(last_state_trie_block, hash))
}
/// Handles a message from the engine.
///
/// Returns `ControlFlow::Break(())` if the engine should terminate.
@@ -1825,7 +1854,7 @@ where
// update the tracked chain height, after backfill sync both the canonical height and
// persisted height are the same
self.state.tree_state.set_canonical_head(new_head.num_hash());
self.persistence_state.finish(new_head.hash(), new_head.number());
self.persistence_state.finish(new_head.num_hash(), new_head.num_hash());
// update the tracked canonical head
self.canonical_in_memory_state.set_canonical_head(new_head);
@@ -2033,62 +2062,96 @@ where
self.config.persistence_threshold()
}
/// Returns a batch of consecutive canonical blocks to persist in the range
/// `(last_persisted_number .. target]`. The expected order is oldest -> newest.
fn get_canonical_blocks_to_persist(
/// Returns the save plan for the next persistence cycle.
fn get_save_blocks_plan(
&self,
target: PersistTarget,
) -> Result<Vec<ExecutedBlock<N>>, AdvancePersistenceError> {
) -> Result<SaveBlocksPlan<N>, AdvancePersistenceError> {
// We will calculate the state root using the database, so we need to be sure there are no
// changes
debug_assert!(!self.persistence_state.in_progress());
let mut blocks_to_persist = Vec::new();
let mut blocks = Vec::new();
let mut current_hash = self.state.tree_state.canonical_block_hash();
let last_persisted_number = self.persistence_state.last_persisted_block.number;
let last_state_trie_persisted_block_number =
self.persistence_state.last_state_trie_persisted_block.number;
let last_persisted_block_number = self.persistence_state.last_persisted_block.number;
let canonical_head_number = self.state.tree_state.canonical_block_number();
let target_number = match target {
PersistTarget::Head => canonical_head_number,
let last_block_target_number = match target {
PersistTarget::Threshold => {
canonical_head_number.saturating_sub(self.config.memory_block_buffer_target())
}
PersistTarget::Head => canonical_head_number,
};
debug!(
target: "engine::tree",
?current_hash,
?last_persisted_number,
?last_state_trie_persisted_block_number,
?last_persisted_block_number,
?canonical_head_number,
?target_number,
"Returning canonical blocks to persist"
target = ?target,
"Returning save plan"
);
while let Some(block) = self.state.tree_state.blocks_by_hash.get(&current_hash) {
if block.recovered_block().number() <= last_persisted_number {
if block.recovered_block().number() <= last_state_trie_persisted_block_number {
break;
}
if block.recovered_block().number() <= target_number {
blocks_to_persist.push(block.clone());
if block.recovered_block().number() <= last_block_target_number {
blocks.push(block.clone());
}
current_hash = block.recovered_block().parent_hash();
}
// Reverse the order so that the oldest block comes first
blocks_to_persist.reverse();
blocks.reverse();
Ok(blocks_to_persist)
let trie_catchup_block_count = last_persisted_block_number
.saturating_sub(last_state_trie_persisted_block_number)
.min(blocks.len() as u64) as usize;
let persist_rest_block_count = blocks.len().saturating_sub(trie_catchup_block_count);
let state_masking_block_count =
persist_rest_block_count.min(self.config.num_state_masking_blocks() as usize);
let full_persist_block_count = persist_rest_block_count - state_masking_block_count;
let full_persist_start = trie_catchup_block_count;
let state_masking_start = full_persist_start + full_persist_block_count;
let state_masking_range = state_masking_start..blocks.len();
let mut steps = Vec::new();
if trie_catchup_block_count > 0 {
steps.push(SaveBlocksPlanStep::new(
0..trie_catchup_block_count,
Some(state_masking_range.clone()),
false,
));
}
if full_persist_block_count > 0 {
steps.push(SaveBlocksPlanStep::new(
full_persist_start..state_masking_start,
Some(state_masking_range.clone()),
true,
));
}
if state_masking_block_count > 0 {
steps.push(SaveBlocksPlanStep::new(state_masking_range, None, true));
}
Ok(SaveBlocksPlan::new(blocks, steps))
}
/// This clears the blocks from the in-memory tree state that have been persisted to the
/// database.
/// This clears the blocks from the in-memory tree state that no longer need to stay resident
/// after persistence completes.
///
/// This also updates the canonical in-memory state to reflect the newest persisted block
/// height.
/// This also updates the canonical in-memory state to reflect the newest persisted block tip,
/// even if trie persistence only advanced through an earlier block.
///
/// Assumes that `finish` has been called on the `persistence_state` at least once
fn on_new_persisted_block(&mut self) -> ProviderResult<()> {
fn on_new_persisted_block(
&mut self,
in_memory_persisted_block: BlockNumHash,
) -> ProviderResult<()> {
// If we have an on-disk reorg, we need to handle it first before touching the in-memory
// state.
if let Some(remove_above) = self.find_disk_reorg()? {
@@ -2097,11 +2160,11 @@ where
}
let finalized = self.state.forkchoice_state_tracker.last_valid_finalized();
self.remove_before(self.persistence_state.last_persisted_block, finalized)?;
self.canonical_in_memory_state.remove_persisted_blocks(BlockNumHash {
number: self.persistence_state.last_persisted_block.number,
hash: self.persistence_state.last_persisted_block.hash,
});
self.remove_before(in_memory_persisted_block, finalized)?;
self.canonical_in_memory_state.remove_persisted_blocks_until(
self.persistence_state.last_persisted_block,
in_memory_persisted_block.number,
);
Ok(())
}

View File

@@ -22,7 +22,6 @@
use crate::persistence::PersistenceResult;
use alloy_eips::BlockNumHash;
use alloy_primitives::B256;
use crossbeam_channel::Receiver as CrossbeamReceiver;
use reth_primitives_traits::FastInstant as Instant;
use tracing::trace;
@@ -30,10 +29,12 @@ use tracing::trace;
/// The state of the persistence task.
#[derive(Debug)]
pub struct PersistenceState {
/// Hash and number of the last block persisted.
/// Hash and number of the highest block whose non-state/trie outputs are persisted.
///
/// This tracks the chain height that is persisted on disk
/// This tracks the highest canonical block with durable block/static-file/plain-state data.
pub(crate) last_persisted_block: BlockNumHash,
/// Hash and number of the highest block whose state/trie outputs are persisted.
pub(crate) last_state_trie_persisted_block: BlockNumHash,
/// Receiver end of channel where the result of the persistence task will be
/// sent when done. A None value means there's no persistence task in progress.
pub(crate) rx:
@@ -76,13 +77,18 @@ impl PersistenceState {
/// Sets state for a finished persistence task.
pub(crate) fn finish(
&mut self,
last_persisted_block_hash: B256,
last_persisted_block_number: u64,
last_persisted_block: BlockNumHash,
last_state_trie_persisted_block: BlockNumHash,
) {
trace!(target: "engine::tree", block= %last_persisted_block_number, hash=%last_persisted_block_hash, "updating persistence state");
trace!(
target: "engine::tree",
last_persisted_block = %last_persisted_block.number,
last_state_trie_persisted_block = %last_state_trie_persisted_block.number,
"updating persistence state"
);
self.rx = None;
self.last_persisted_block =
BlockNumHash::new(last_persisted_block_number, last_persisted_block_hash);
self.last_persisted_block = last_persisted_block;
self.last_state_trie_persisted_block = last_state_trie_persisted_block;
}
}

View File

@@ -222,7 +222,11 @@ impl TestHarness {
engine_api_tree_state,
canonical_in_memory_state,
persistence_handle,
PersistenceState { last_persisted_block: BlockNumHash::default(), rx: None },
PersistenceState {
last_persisted_block: BlockNumHash::default(),
last_state_trie_persisted_block: BlockNumHash::default(),
rx: None,
},
payload_builder,
tree_config,
EngineApiKind::Ethereum,
@@ -360,6 +364,17 @@ impl TestHarness {
}
}
type ExpectedPlanStep = (std::ops::Range<usize>, Option<std::ops::Range<usize>>, bool);
fn assert_plan_steps(plan: &SaveBlocksPlan<EthPrimitives>, expected: &[ExpectedPlanStep]) {
assert_eq!(plan.steps.len(), expected.len());
for (step, (block_range, masking_range, persist_rest)) in plan.steps.iter().zip(expected) {
assert_eq!(&step.block_range, block_range);
assert_eq!(&step.state_trie_masking_range, masking_range);
assert_eq!(step.persist_rest, *persist_rest);
}
}
/// Simplified test metrics for validation calls
#[derive(Debug, Default)]
struct TestMetrics {
@@ -554,12 +569,16 @@ async fn test_tree_persist_blocks() {
let received_action =
test_harness.action_rx.recv().expect("Failed to receive save blocks action");
if let PersistenceAction::SaveBlocks(saved_blocks, _) = received_action {
if let PersistenceAction::SaveBlocks(plan, _) = received_action {
// only blocks.len() - tree_config.memory_block_buffer_target() will be
// persisted
let expected_persist_len = blocks.len() - tree_config.memory_block_buffer_target() as usize;
assert_eq!(saved_blocks.len(), expected_persist_len);
assert_eq!(saved_blocks, blocks[..expected_persist_len]);
assert_eq!(plan.blocks.len(), expected_persist_len);
assert_eq!(plan.blocks, blocks[..expected_persist_len]);
assert_plan_steps(
&plan,
&[(0..expected_persist_len, Some(expected_persist_len..expected_persist_len), true)],
);
} else {
panic!("unexpected action received {received_action:?}");
}
@@ -704,8 +723,8 @@ fn test_backpressure_waits_for_persistence_before_reading_incoming() {
test_harness.tree.config = test_harness
.tree
.config
.with_persistence_threshold(0)
.with_persistence_backpressure_threshold(1);
.with_persistence_threshold(1)
.with_persistence_backpressure_threshold(2);
let (persist_tx, persist_rx) = crossbeam_channel::bounded(1);
let persisted = blocks.last().unwrap().recovered_block().num_hash();
@@ -736,6 +755,7 @@ fn test_backpressure_waits_for_persistence_before_reading_incoming() {
persist_tx
.send(PersistenceResult {
last_block: Some(persisted),
last_state_trie_block: Some(persisted.number),
commit_duration: Some(Duration::ZERO),
})
.unwrap();
@@ -770,10 +790,10 @@ async fn test_tree_state_on_new_head_reorg() {
reth_tracing::init_test_tracing();
let chain_spec = MAINNET.clone();
// Set persistence_threshold to 1
// Keep a single block in memory while still leaving room for the persistence threshold.
let mut test_harness = TestHarness::new(chain_spec);
test_harness.tree.config =
test_harness.tree.config.with_persistence_threshold(1).with_memory_block_buffer_target(1);
test_harness.tree.config.with_persistence_threshold(2).with_memory_block_buffer_target(1);
let mut test_block_builder = TestBlockBuilder::eth();
let blocks: Vec<_> = test_block_builder.get_executed_blocks(1..6).collect();
@@ -824,15 +844,16 @@ async fn test_tree_state_on_new_head_reorg() {
// get rid of the prev action
let received_action = test_harness.action_rx.recv().unwrap();
let PersistenceAction::SaveBlocks(saved_blocks, sender) = received_action else {
let PersistenceAction::SaveBlocks(plan, sender) = received_action else {
panic!("received wrong action");
};
assert_eq!(saved_blocks, vec![blocks[0].clone(), blocks[1].clone()]);
assert_eq!(plan.blocks, vec![blocks[0].clone(), blocks[1].clone()]);
// send the response so we can advance again
sender
.send(PersistenceResult {
last_block: Some(blocks[1].recovered_block().num_hash()),
last_state_trie_block: Some(blocks[1].recovered_block().number()),
commit_duration: Some(Duration::ZERO),
})
.unwrap();
@@ -968,8 +989,10 @@ async fn test_get_canonical_blocks_to_persist() {
test_harness = test_harness.with_blocks(blocks.clone());
let last_persisted_block_number = 3;
test_harness.tree.persistence_state.last_persisted_block =
let last_persisted_block =
blocks[last_persisted_block_number as usize].recovered_block.num_hash();
test_harness.tree.persistence_state.last_persisted_block = last_persisted_block;
test_harness.tree.persistence_state.last_state_trie_persisted_block = last_persisted_block;
let persistence_threshold = 4;
let memory_block_buffer_target = 3;
@@ -977,16 +1000,15 @@ async fn test_get_canonical_blocks_to_persist() {
.with_persistence_threshold(persistence_threshold)
.with_memory_block_buffer_target(memory_block_buffer_target);
let blocks_to_persist =
test_harness.tree.get_canonical_blocks_to_persist(PersistTarget::Threshold).unwrap();
let plan = test_harness.tree.get_save_blocks_plan(PersistTarget::Threshold).unwrap();
let expected_blocks_to_persist_length: usize =
(canonical_head_number - memory_block_buffer_target - last_persisted_block_number)
.try_into()
.unwrap();
assert_eq!(blocks_to_persist.len(), expected_blocks_to_persist_length);
for (i, item) in blocks_to_persist.iter().enumerate().take(expected_blocks_to_persist_length) {
assert_eq!(plan.blocks.len(), expected_blocks_to_persist_length);
for (i, item) in plan.blocks.iter().enumerate().take(expected_blocks_to_persist_length) {
assert_eq!(item.recovered_block().number, last_persisted_block_number + i as u64 + 1);
}
@@ -997,15 +1019,14 @@ async fn test_get_canonical_blocks_to_persist() {
assert!(test_harness.tree.state.tree_state.sealed_header_by_hash(&fork_block_hash).is_some());
let blocks_to_persist =
test_harness.tree.get_canonical_blocks_to_persist(PersistTarget::Threshold).unwrap();
assert_eq!(blocks_to_persist.len(), expected_blocks_to_persist_length);
let plan = test_harness.tree.get_save_blocks_plan(PersistTarget::Threshold).unwrap();
assert_eq!(plan.blocks.len(), expected_blocks_to_persist_length);
// check that the fork block is not included in the blocks to persist
assert!(!blocks_to_persist.iter().any(|b| b.recovered_block().hash() == fork_block_hash));
assert!(!plan.blocks.iter().any(|b| b.recovered_block().hash() == fork_block_hash));
// check that the original block 4 is still included
assert!(blocks_to_persist.iter().any(|b| b.recovered_block().number == 4 &&
assert!(plan.blocks.iter().any(|b| b.recovered_block().number == 4 &&
b.recovered_block().hash() == blocks[4].recovered_block().hash()));
// check that if we advance persistence, the persistence action is the correct value
@@ -1013,11 +1034,193 @@ async fn test_get_canonical_blocks_to_persist() {
assert_eq!(
test_harness.tree.persistence_state.current_action().cloned(),
Some(CurrentPersistenceAction::SavingBlocks {
highest: blocks_to_persist.last().unwrap().recovered_block().num_hash()
highest: plan.blocks.last().unwrap().recovered_block().num_hash()
})
);
}
#[test]
fn test_get_save_blocks_plan_with_deferred_trie_blocks() {
let chain_spec = MAINNET.clone();
let mut test_harness = TestHarness::new(chain_spec);
let mut test_block_builder = TestBlockBuilder::eth();
let blocks: Vec<_> = test_block_builder.get_executed_blocks(0..7).collect();
test_harness = test_harness.with_blocks(blocks.clone());
test_harness.tree.persistence_state.last_state_trie_persisted_block =
blocks[1].recovered_block().num_hash();
test_harness.tree.persistence_state.last_persisted_block =
blocks[3].recovered_block().num_hash();
test_harness.tree.config = TreeConfig::default()
.with_persistence_threshold(4)
.with_memory_block_buffer_target(1)
.with_num_state_masking_blocks(2);
let plan = test_harness.tree.get_save_blocks_plan(PersistTarget::Threshold).unwrap();
assert_plan_steps(&plan, &[(0..2, Some(2..4), false), (2..4, None, true)]);
assert_eq!(plan.blocks.len(), 4);
assert_eq!(
plan.blocks.iter().map(|block| block.recovered_block().number()).collect::<Vec<_>>(),
vec![2, 3, 4, 5]
);
assert_eq!(plan.last_block(), Some(blocks[5].recovered_block().num_hash()));
}
#[test]
fn test_get_save_blocks_plan_persists_full_region_before_deferred_tail() {
let chain_spec = MAINNET.clone();
let mut test_harness = TestHarness::new(chain_spec);
let mut test_block_builder = TestBlockBuilder::eth();
let blocks: Vec<_> = test_block_builder.get_executed_blocks(0..31).collect();
test_harness = test_harness.with_blocks(blocks.clone());
test_harness.tree.persistence_state.last_state_trie_persisted_block =
blocks[12].recovered_block().num_hash();
test_harness.tree.persistence_state.last_persisted_block =
blocks[15].recovered_block().num_hash();
test_harness.tree.config = TreeConfig::default()
.with_persistence_threshold(5)
.with_memory_block_buffer_target(2)
.with_num_state_masking_blocks(2);
let plan = test_harness.tree.get_save_blocks_plan(PersistTarget::Threshold).unwrap();
assert_plan_steps(
&plan,
&[(0..3, Some(14..16), false), (3..14, Some(14..16), true), (14..16, None, true)],
);
assert_eq!(plan.blocks.len(), 16);
assert_eq!(
plan.blocks.iter().map(|block| block.recovered_block().number()).collect::<Vec<_>>(),
(13..=28).collect::<Vec<_>>()
);
assert_eq!(plan.last_block(), Some(blocks[28].recovered_block().num_hash()));
}
#[test]
fn test_on_persistence_complete_retains_blocks_above_partial_state_trie() {
let chain_spec = MAINNET.clone();
let mut test_harness = TestHarness::new(chain_spec);
let mut test_block_builder = TestBlockBuilder::eth();
let blocks: Vec<_> = test_block_builder.get_executed_blocks(0..7).collect();
test_harness = test_harness.with_blocks(blocks.clone());
test_harness.tree.persistence_state.last_persisted_block =
blocks[1].recovered_block().num_hash();
test_harness.tree.persistence_state.last_state_trie_persisted_block =
blocks[1].recovered_block().num_hash();
let persisted_tip = blocks[5].recovered_block().num_hash();
let last_state_trie_block = blocks[3].recovered_block().number();
test_harness
.tree
.on_persistence_complete(
PersistenceResult {
last_block: Some(persisted_tip),
last_state_trie_block: Some(last_state_trie_block),
commit_duration: Some(Duration::ZERO),
},
Instant::now(),
)
.unwrap();
assert_eq!(test_harness.tree.persistence_state.last_persisted_block, persisted_tip);
assert_eq!(
test_harness.tree.persistence_state.last_state_trie_persisted_block,
blocks[3].recovered_block().num_hash()
);
assert_eq!(
test_harness.tree.canonical_in_memory_state.get_persisted_num_hash(),
Some(persisted_tip)
);
for block in &blocks[..=last_state_trie_block as usize] {
assert!(test_harness
.tree
.state
.tree_state
.executed_block_by_hash(block.recovered_block().hash())
.is_none());
assert!(test_harness
.tree
.canonical_in_memory_state
.state_by_number(block.recovered_block().number())
.is_none());
}
for block in &blocks[last_state_trie_block as usize + 1..] {
assert!(test_harness
.tree
.state
.tree_state
.executed_block_by_hash(block.recovered_block().hash())
.is_some());
assert!(test_harness
.tree
.canonical_in_memory_state
.state_by_number(block.recovered_block().number())
.is_some());
}
}
#[test]
fn test_on_persistence_complete_without_partial_state_trie_prunes_through_tip() {
let chain_spec = MAINNET.clone();
let mut test_harness = TestHarness::new(chain_spec);
let mut test_block_builder = TestBlockBuilder::eth();
let blocks: Vec<_> = test_block_builder.get_executed_blocks(0..7).collect();
test_harness = test_harness.with_blocks(blocks.clone());
test_harness.tree.persistence_state.last_persisted_block =
blocks[1].recovered_block().num_hash();
test_harness.tree.persistence_state.last_state_trie_persisted_block =
blocks[1].recovered_block().num_hash();
let persisted_tip = blocks[5].recovered_block().num_hash();
test_harness
.tree
.on_persistence_complete(
PersistenceResult {
last_block: Some(persisted_tip),
last_state_trie_block: None,
commit_duration: Some(Duration::ZERO),
},
Instant::now(),
)
.unwrap();
for block in &blocks[..=persisted_tip.number as usize] {
assert!(test_harness
.tree
.state
.tree_state
.executed_block_by_hash(block.recovered_block().hash())
.is_none());
assert!(test_harness
.tree
.canonical_in_memory_state
.state_by_number(block.recovered_block().number())
.is_none());
}
for block in &blocks[persisted_tip.number as usize + 1..] {
assert!(test_harness
.tree
.state
.tree_state
.executed_block_by_hash(block.recovered_block().hash())
.is_some());
assert!(test_harness
.tree
.canonical_in_memory_state
.state_by_number(block.recovered_block().number())
.is_some());
}
}
#[tokio::test]
async fn test_engine_tree_fcu_missing_head() {
let chain_spec = MAINNET.clone();
@@ -2112,15 +2315,18 @@ mod forkchoice_updated_tests {
break;
}
if let Ok(PersistenceAction::SaveBlocks(saved_blocks, sender)) =
if let Ok(PersistenceAction::SaveBlocks(plan, sender)) =
action_rx.recv_timeout(std::time::Duration::from_millis(100))
{
if let Some(last) = saved_blocks.last() {
if let Some(last) = plan.last_block() {
last_persisted_number = last.number;
} else if let Some(last) = plan.blocks.last() {
last_persisted_number = last.recovered_block().number;
}
sender
.send(PersistenceResult {
last_block: saved_blocks.last().map(|b| b.recovered_block().num_hash()),
last_block: plan.last_block(),
last_state_trie_block: plan.last_block().map(|tip| tip.number),
commit_duration: Some(Duration::ZERO),
})
.unwrap();

View File

@@ -66,8 +66,8 @@ use reth_node_metrics::{
};
use reth_provider::{
providers::{NodeTypesForProvider, ProviderNodeTypes, RocksDBProvider, StaticFileProvider},
BlockHashReader, BlockNumReader, ProviderError, ProviderFactory, ProviderResult,
RocksDBProviderFactory, StageCheckpointReader, StaticFileProviderBuilder,
BlockHashReader, BlockNumReader, DatabaseProviderFactory, ProviderError, ProviderFactory,
ProviderResult, RocksDBProviderFactory, StageCheckpointReader, StaticFileProviderBuilder,
StaticFileProviderFactory,
};
use reth_prune::{PruneModes, PrunerBuilder};
@@ -75,7 +75,7 @@ use reth_rpc_builder::config::RethRpcServerConfig;
use reth_rpc_layer::JwtSecret;
use reth_stages::{
sets::DefaultStages, stages::EraImportSource, MetricEvent, PipelineBuilder, PipelineTarget,
StageId,
StageCheckpoint, StageId,
};
use reth_static_file::StaticFileProducer;
use reth_tasks::TaskExecutor;
@@ -518,19 +518,26 @@ where
// the unwind targets for each storage layer if inconsistencies are
// found.
let (rocksdb_unwind, static_file_unwind) = factory.check_consistency()?;
let partial_trie_unwind = partial_trie_unwind_target(
factory.database_provider_ro()?.get_stage_checkpoint(StageId::Finish)?,
);
// Take the minimum block number to ensure all storage layers are consistent.
let unwind_target = [rocksdb_unwind, static_file_unwind].into_iter().flatten().min();
let unwind_target =
[rocksdb_unwind, static_file_unwind, partial_trie_unwind].into_iter().flatten().min();
if let Some(unwind_block) = unwind_target {
let inconsistency_source = [
rocksdb_unwind.map(|_| "RocksDB"),
static_file_unwind.map(|_| "static file"),
partial_trie_unwind.map(|_| "partial state trie"),
]
.into_iter()
.flatten()
.collect::<Vec<_>>()
.join(" and ");
// Highly unlikely to happen, and given its destructive nature, it's better to panic
// instead. Unwinding to 0 would leave MDBX with a huge free list size.
let inconsistency_source = match (rocksdb_unwind, static_file_unwind) {
(Some(_), Some(_)) => "RocksDB and static file",
(Some(_), None) => "RocksDB",
(None, Some(_)) => "static file",
(None, None) => unreachable!(),
};
assert_ne!(
unwind_block, 0,
"A {} inconsistency was found that would trigger an unwind to block 0",
@@ -1269,11 +1276,19 @@ pub fn metrics_hooks<N: NodeTypesWithDB>(provider_factory: &ProviderFactory<N>)
.build()
}
fn partial_trie_unwind_target(finish_checkpoint: Option<StageCheckpoint>) -> Option<BlockNumber> {
let finish_checkpoint = finish_checkpoint?;
let partial_state_trie = finish_checkpoint.finish_stage_checkpoint()?.partial_state_trie?;
(partial_state_trie != finish_checkpoint.block_number).then_some(partial_state_trie)
}
#[cfg(test)]
mod tests {
use super::{LaunchContext, NodeConfig};
use super::{partial_trie_unwind_target, LaunchContext, NodeConfig};
use reth_config::Config;
use reth_node_core::args::PruningArgs;
use reth_stages::{FinishCheckpoint, StageCheckpoint};
const EXTENSION: &str = "toml";
@@ -1325,4 +1340,24 @@ mod tests {
assert_eq!(reth_config, loaded_config);
})
}
#[test]
fn partial_trie_unwind_target_uses_partial_finish_checkpoint() {
let finish_checkpoint = StageCheckpoint::new(42)
.with_finish_stage_checkpoint(FinishCheckpoint { partial_state_trie: Some(21) });
assert_eq!(partial_trie_unwind_target(Some(finish_checkpoint)), Some(21));
}
#[test]
fn partial_trie_unwind_target_ignores_matching_or_missing_partial_checkpoint() {
let matching_finish_checkpoint = StageCheckpoint::new(42)
.with_finish_stage_checkpoint(FinishCheckpoint { partial_state_trie: Some(42) });
let missing_partial_finish_checkpoint = StageCheckpoint::new(42)
.with_finish_stage_checkpoint(FinishCheckpoint { partial_state_trie: None });
assert_eq!(partial_trie_unwind_target(Some(matching_finish_checkpoint)), None);
assert_eq!(partial_trie_unwind_target(Some(missing_partial_finish_checkpoint)), None);
assert_eq!(partial_trie_unwind_target(None), None);
}
}

View File

@@ -4,9 +4,9 @@ use clap::{builder::Resettable, Args};
use eyre::ensure;
use reth_cli_util::{parse_duration_from_secs_or_ms, parsers::format_duration_as_secs_or_ms};
use reth_engine_primitives::{
TreeConfig, DEFAULT_INVALID_HEADER_HIT_EVICTION_THRESHOLD, DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE,
DEFAULT_PERSISTENCE_BACKPRESSURE_THRESHOLD, DEFAULT_SPARSE_TRIE_MAX_HOT_ACCOUNTS,
DEFAULT_SPARSE_TRIE_MAX_HOT_SLOTS,
default_persistence_backpressure_threshold, TreeConfig, DEFAULT_DEFERRED_TRIE_BLOCKS,
DEFAULT_INVALID_HEADER_HIT_EVICTION_THRESHOLD, DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE,
DEFAULT_SPARSE_TRIE_MAX_HOT_ACCOUNTS, DEFAULT_SPARSE_TRIE_MAX_HOT_SLOTS,
};
use std::{sync::OnceLock, time::Duration};
@@ -24,7 +24,8 @@ static ENGINE_DEFAULTS: OnceLock<DefaultEngineValues> = OnceLock::new();
#[derive(Debug, Clone)]
pub struct DefaultEngineValues {
persistence_threshold: u64,
persistence_backpressure_threshold: u64,
persistence_backpressure_threshold: Option<u64>,
deferred_trie_blocks: u64,
memory_block_buffer_target: u64,
invalid_header_hit_eviction_threshold: u8,
legacy_state_root_task_enabled: bool,
@@ -73,9 +74,26 @@ impl DefaultEngineValues {
self
}
/// Get the default persistence backpressure threshold.
pub const fn persistence_backpressure_threshold(&self) -> u64 {
match self.persistence_backpressure_threshold {
Some(v) => v,
None => default_persistence_backpressure_threshold(
self.persistence_threshold,
self.memory_block_buffer_target,
),
}
}
/// Set the default persistence backpressure threshold
pub const fn with_persistence_backpressure_threshold(mut self, v: u64) -> Self {
self.persistence_backpressure_threshold = v;
self.persistence_backpressure_threshold = Some(v);
self
}
/// Set the default deferred trie block target
pub const fn with_deferred_trie_blocks(mut self, v: u64) -> Self {
self.deferred_trie_blocks = v;
self
}
@@ -261,7 +279,8 @@ impl Default for DefaultEngineValues {
fn default() -> Self {
Self {
persistence_threshold: DEFAULT_PERSISTENCE_THRESHOLD,
persistence_backpressure_threshold: DEFAULT_PERSISTENCE_BACKPRESSURE_THRESHOLD,
persistence_backpressure_threshold: None,
deferred_trie_blocks: DEFAULT_DEFERRED_TRIE_BLOCKS,
memory_block_buffer_target: DEFAULT_MEMORY_BLOCK_BUFFER_TARGET,
invalid_header_hit_eviction_threshold: DEFAULT_INVALID_HEADER_HIT_EVICTION_THRESHOLD,
legacy_state_root_task_enabled: false,
@@ -311,9 +330,14 @@ pub struct EngineArgs {
/// Configure the maximum canonical-minus-persisted gap before engine API processing stalls.
///
/// This value must be greater than `--engine.persistence-threshold`.
#[arg(long = "engine.persistence-backpressure-threshold", default_value_t = DefaultEngineValues::get_global().persistence_backpressure_threshold)]
#[arg(long = "engine.persistence-backpressure-threshold", default_value_t = DefaultEngineValues::get_global().persistence_backpressure_threshold())]
pub persistence_backpressure_threshold: u64,
/// Configure how many of the blocks being persisted should only mask state/trie writes instead
/// of durably persisting their state/trie updates in the current cycle.
#[arg(long = "engine.deferred-trie-blocks", default_value_t = DefaultEngineValues::get_global().deferred_trie_blocks)]
pub deferred_trie_blocks: u64,
/// Configure the target number of blocks to keep in memory.
#[arg(long = "engine.memory-block-buffer-target", default_value_t = DefaultEngineValues::get_global().memory_block_buffer_target)]
pub memory_block_buffer_target: u64,
@@ -546,6 +570,7 @@ impl Default for EngineArgs {
let DefaultEngineValues {
persistence_threshold,
persistence_backpressure_threshold,
deferred_trie_blocks,
memory_block_buffer_target,
invalid_header_hit_eviction_threshold,
legacy_state_root_task_enabled,
@@ -578,7 +603,15 @@ impl Default for EngineArgs {
} = DefaultEngineValues::get_global().clone();
Self {
persistence_threshold,
persistence_backpressure_threshold,
persistence_backpressure_threshold: persistence_backpressure_threshold.unwrap_or_else(
|| {
default_persistence_backpressure_threshold(
persistence_threshold,
memory_block_buffer_target,
)
},
),
deferred_trie_blocks,
memory_block_buffer_target,
invalid_header_hit_eviction_threshold,
legacy_state_root_task_enabled,
@@ -630,6 +663,13 @@ impl EngineArgs {
self.persistence_backpressure_threshold,
self.persistence_threshold
);
ensure!(
self.deferred_trie_blocks + self.memory_block_buffer_target < self.persistence_threshold,
"--engine.deferred-trie-blocks ({}) + --engine.memory-block-buffer-target ({}) must be less than --engine.persistence-threshold ({})",
self.deferred_trie_blocks,
self.memory_block_buffer_target,
self.persistence_threshold,
);
Ok(())
}
@@ -638,6 +678,7 @@ impl EngineArgs {
let config = TreeConfig::default()
.with_persistence_threshold(self.persistence_threshold)
.with_persistence_backpressure_threshold(self.persistence_backpressure_threshold)
.with_num_state_masking_blocks(self.deferred_trie_blocks)
.with_memory_block_buffer_target(self.memory_block_buffer_target)
.with_invalid_header_hit_eviction_threshold(self.invalid_header_hit_eviction_threshold)
.with_legacy_state_root(self.legacy_state_root_task_enabled)
@@ -695,12 +736,48 @@ mod tests {
assert_eq!(args, default_args);
}
#[test]
fn default_engine_values_derive_backpressure_threshold() {
let defaults = DefaultEngineValues::default()
.with_persistence_threshold(10)
.with_memory_block_buffer_target(3);
assert_eq!(defaults.persistence_backpressure_threshold(), 26);
}
#[test]
fn explicit_backpressure_default_override_is_preserved() {
let defaults = DefaultEngineValues::default()
.with_persistence_backpressure_threshold(99)
.with_persistence_threshold(10)
.with_memory_block_buffer_target(3);
assert_eq!(defaults.persistence_backpressure_threshold(), 99);
}
#[test]
fn engine_args_default_thresholds_match_expected_defaults() {
let args = EngineArgs::default();
assert_eq!(args.persistence_threshold, DEFAULT_PERSISTENCE_THRESHOLD);
assert_eq!(args.deferred_trie_blocks, DEFAULT_DEFERRED_TRIE_BLOCKS);
assert_eq!(args.memory_block_buffer_target, DEFAULT_MEMORY_BLOCK_BUFFER_TARGET);
assert_eq!(
args.persistence_backpressure_threshold,
default_persistence_backpressure_threshold(
args.persistence_threshold,
args.memory_block_buffer_target,
)
);
}
#[test]
#[allow(deprecated)]
fn engine_args() {
let args = EngineArgs {
persistence_threshold: 100,
persistence_backpressure_threshold: 101,
deferred_trie_blocks: 25,
memory_block_buffer_target: 50,
invalid_header_hit_eviction_threshold: 7,
legacy_state_root_task_enabled: true,
@@ -745,6 +822,8 @@ mod tests {
"100",
"--engine.persistence-backpressure-threshold",
"101",
"--engine.deferred-trie-blocks",
"25",
"--engine.memory-block-buffer-target",
"50",
"--engine.invalid-header-cache-hit-eviction-threshold",
@@ -788,6 +867,21 @@ mod tests {
assert_eq!(parsed_args, args);
}
#[test]
fn test_parse_deferred_trie_blocks() {
let args = CommandParser::<EngineArgs>::parse_from([
"reth",
"--engine.persistence-threshold",
"8",
"--engine.deferred-trie-blocks",
"7",
])
.args;
assert_eq!(args.deferred_trie_blocks, 7);
assert_eq!(args.tree_config().num_state_masking_blocks(), 7);
}
#[test]
fn validate_rejects_invalid_backpressure_threshold() {
let args = EngineArgs {
@@ -801,6 +895,21 @@ mod tests {
assert!(err.contains("engine.persistence-threshold"));
}
#[test]
fn validate_rejects_state_masking_window_at_or_above_threshold() {
let args = EngineArgs {
persistence_threshold: 4,
deferred_trie_blocks: 2,
memory_block_buffer_target: 2,
..EngineArgs::default()
};
let err = args.validate().unwrap_err().to_string();
assert!(err.contains("engine.deferred-trie-blocks"));
assert!(err.contains("engine.memory-block-buffer-target"));
assert!(err.contains("engine.persistence-threshold"));
}
#[test]
fn test_parse_slow_block_threshold() {
// Test default value (None - disabled)

View File

@@ -295,7 +295,8 @@ mod tests {
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed, // 1 seeded block body + batch size
total // seeded headers
}))
})),
..
}, done: false }) if block_number < 200 &&
processed == batch_size + 1 && total == previous_stage + 1
);
@@ -333,7 +334,8 @@ mod tests {
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed,
total
}))
})),
..
},
done: true
}) if processed + 1 == total && total == previous_stage + 1
@@ -370,7 +372,8 @@ mod tests {
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed,
total
}))
})),
..
}, done: false }) if block_number >= 10 &&
processed - 1 == batch_size && total == previous_stage + 1
);
@@ -391,7 +394,8 @@ mod tests {
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed,
total
}))
})),
..
}, done: true }) if block_number > first_run_checkpoint.block_number &&
processed + 1 == total && total == previous_stage + 1
);
@@ -432,7 +436,8 @@ mod tests {
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed,
total
}))
})),
..
}, done: true }) if block_number == previous_stage &&
processed + 1 == total && total == previous_stage + 1
);
@@ -460,7 +465,8 @@ mod tests {
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed: 1,
total
}))
})),
..
}}) if total == previous_stage + 1
);

View File

@@ -298,7 +298,7 @@ mod tests {
assert_matches!(
output,
Ok(ExecOutput {
checkpoint: StageCheckpoint { block_number, stage_checkpoint: None },
checkpoint: StageCheckpoint { block_number, stage_checkpoint: None, .. },
done: false
}) if block_number == era_cap
);
@@ -318,7 +318,7 @@ mod tests {
assert_matches!(
output,
Ok(ExecOutput {
checkpoint: StageCheckpoint { block_number, stage_checkpoint: None },
checkpoint: StageCheckpoint { block_number, stage_checkpoint: None, .. },
done: true
}) if block_number == target
);

View File

@@ -1015,7 +1015,8 @@ mod tests {
processed,
total
}
}))
})),
..
},
done: true
} if processed == total && total == block.gas_used);
@@ -1170,7 +1171,8 @@ mod tests {
processed: 0,
total
}
}))
})),
..
}
} if total == block.gas_used);

View File

@@ -397,6 +397,7 @@ mod tests {
},
..
})),
..
},
done: true,
}) if block_number == previous_stage &&

View File

@@ -594,7 +594,8 @@ mod tests {
processed,
total,
}
}))
})),
..
}, done: true }) if block_number == tip.number &&
from == checkpoint && to == previous_stage &&
// -1 because we don't need to download the local head
@@ -666,7 +667,8 @@ mod tests {
processed,
total,
}
}))
})),
..
}, done: true }) if block_number == tip.number &&
from == checkpoint && to == previous_stage &&
// -1 because we don't need to download the local head

View File

@@ -502,7 +502,8 @@ mod tests {
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed,
total
}))
})),
..
},
done: true
}) if block_number == previous_stage && processed == total &&
@@ -542,7 +543,8 @@ mod tests {
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed,
total
}))
})),
..
},
done: true
}) if block_number == previous_stage && processed == total &&
@@ -584,7 +586,8 @@ mod tests {
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed,
total
}))
})),
..
},
done: true
}) if block_number == previous_stage && processed == total &&

View File

@@ -527,7 +527,8 @@ mod tests {
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed: 1,
total: 1
}))
})),
..
}, done: true }) if block_number == previous_stage
);

View File

@@ -337,12 +337,12 @@ mod tests {
result,
Ok(ExecOutput {
checkpoint: StageCheckpoint {
block_number,
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed,
total
}))
}, done: true }) if block_number == previous_stage && processed == total &&
block_number,
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed,
total
}))
}, done: true }) if block_number == previous_stage && processed == total &&
total == runner.db.count_entries::<tables::Transactions>().unwrap() as u64
);
@@ -383,12 +383,12 @@ mod tests {
result,
Ok(ExecOutput {
checkpoint: StageCheckpoint {
block_number,
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed,
total
}))
}, done: true }) if block_number == previous_stage && processed == total &&
block_number,
stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
processed,
total
}))
}, done: true }) if block_number == previous_stage && processed == total &&
total == runner.db.count_entries::<tables::Transactions>().unwrap() as u64
);

View File

@@ -379,6 +379,9 @@ pub struct StageCheckpoint {
pub stage_checkpoint: Option<StageUnitCheckpoint>,
}
#[cfg(any(test, feature = "reth-codec"))]
reth_codecs::impl_compression_for_compact!(StageCheckpoint);
impl StageCheckpoint {
/// Creates a new [`StageCheckpoint`] with only `block_number` set.
pub fn new(block_number: BlockNumber) -> Self {
@@ -431,13 +434,21 @@ impl StageCheckpoint {
progress: entities,
..
}) => Some(entities),
StageUnitCheckpoint::MerkleChangeSets(_) => None,
StageUnitCheckpoint::MerkleChangeSets(_) | StageUnitCheckpoint::Finish(_) => None,
}
}
}
#[cfg(any(test, feature = "reth-codec"))]
reth_codecs::impl_compression_for_compact!(StageCheckpoint);
/// Saves the progress of the Finish stage.
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq)]
#[cfg_attr(any(test, feature = "test-utils"), derive(arbitrary::Arbitrary))]
#[cfg_attr(any(test, feature = "reth-codec"), derive(reth_codecs::Compact))]
#[cfg_attr(any(test, feature = "reth-codec"), reth_codecs::add_arbitrary_tests(compact))]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct FinishCheckpoint {
/// The highest block with a partially persisted state and trie.
pub partial_state_trie: Option<BlockNumber>,
}
// TODO(alexey): add a merkle checkpoint. Currently it's hard because [`MerkleCheckpoint`]
// is not a Copy type.
@@ -465,6 +476,8 @@ pub enum StageUnitCheckpoint {
/// Note: This variant is only kept for backward compatibility with the Compact codec.
/// The `MerkleChangeSets` stage has been removed.
MerkleChangeSets(MerkleChangeSetsCheckpoint),
/// Saves the progress of the Finish stage.
Finish(FinishCheckpoint),
}
impl StageUnitCheckpoint {
@@ -573,6 +586,15 @@ stage_unit_checkpoints!(
index_history_stage_checkpoint,
/// Sets the stage checkpoint to index history.
with_index_history_stage_checkpoint
),
(
6,
Finish,
FinishCheckpoint,
/// Returns the finish stage checkpoint, if any.
finish_stage_checkpoint,
/// Sets the stage checkpoint to finish.
with_finish_stage_checkpoint
)
);
@@ -664,4 +686,15 @@ mod tests {
let (decoded, _) = MerkleCheckpoint::from_compact(&buf, encoded);
assert_eq!(decoded, checkpoint);
}
#[test]
fn finish_checkpoint_roundtrip() {
let checkpoint = StageCheckpoint::new(42)
.with_finish_stage_checkpoint(FinishCheckpoint { partial_state_trie: Some(21) });
let mut buf = Vec::new();
let encoded = checkpoint.to_compact(&mut buf);
let (decoded, _) = StageCheckpoint::from_compact(&buf, encoded);
assert_eq!(decoded, checkpoint);
}
}

View File

@@ -18,7 +18,7 @@ pub use id::StageId;
mod checkpoints;
pub use checkpoints::{
AccountHashingCheckpoint, CheckpointBlockRange, EntitiesCheckpoint, ExecutionCheckpoint,
HeadersCheckpoint, IndexHistoryCheckpoint, MerkleCheckpoint, StageCheckpoint,
FinishCheckpoint, HeadersCheckpoint, IndexHistoryCheckpoint, MerkleCheckpoint, StageCheckpoint,
StageUnitCheckpoint, StorageHashingCheckpoint, StorageRootMerkleCheckpoint,
};

View File

@@ -24,8 +24,8 @@ pub mod providers;
pub use providers::{
DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, HistoricalStateProvider,
HistoricalStateProviderRef, LatestStateProvider, LatestStateProviderRef, ProviderFactory,
PruneShardOutcome, PrunedIndices, SaveBlocksMode, StaticFileAccess, StaticFileProviderBuilder,
StaticFileWriteCtx, StaticFileWriter,
PruneShardOutcome, PrunedIndices, SaveBlocksMode, SaveBlocksPlan, SaveBlocksPlanStep,
StaticFileAccess, StaticFileProviderBuilder, StaticFileWriteCtx, StaticFileWriter,
};
pub mod changeset_walker;

View File

@@ -790,7 +790,8 @@ mod tests {
create_test_provider_factory, create_test_provider_factory_with_chain_spec,
MockNodeTypesWithDB,
},
BlockWriter, CanonChainTracker, ProviderFactory, SaveBlocksMode,
BlockWriter, CanonChainTracker, ProviderFactory, SaveBlocksMode, SaveBlocksPlan,
SaveBlocksPlanStep,
};
use alloy_eips::{BlockHashOrNumber, BlockNumHash, BlockNumberOrTag};
use alloy_primitives::{BlockNumber, TxNumber, B256};
@@ -1007,7 +1008,15 @@ mod tests {
// Push to disk
let provider_rw = hook_provider.database_provider_rw().unwrap();
provider_rw.save_blocks(vec![lowest_memory_block], SaveBlocksMode::Full).unwrap();
provider_rw
.save_blocks(
&SaveBlocksPlan::new(
vec![lowest_memory_block],
vec![SaveBlocksPlanStep::new(0..1, Some(1..1), true)],
),
SaveBlocksMode::Full,
)
.unwrap();
provider_rw.commit().unwrap();
// Remove from memory

View File

@@ -51,6 +51,9 @@ pub use provider::{
CommitOrder, DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, SaveBlocksMode,
};
mod save_blocks;
pub use save_blocks::{SaveBlocksPlan, SaveBlocksPlanStep};
use super::ProviderNodeTypes;
use reth_trie::KeccakKeyHasher;

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,90 @@
use alloy_eips::BlockNumHash;
use reth_chain_state::ExecutedBlock;
use reth_ethereum_primitives::EthPrimitives;
use reth_primitives_traits::NodePrimitives;
use std::ops::Range;
/// A single persistence step over a contiguous region of [`SaveBlocksPlan::blocks`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SaveBlocksPlanStep {
/// Range of [`SaveBlocksPlan::blocks`] covered by this step.
pub block_range: Range<usize>,
/// Optional range of blocks whose state/trie updates should be used to mask this step's
/// durable state/trie writes.
///
/// `Some(empty_range)` means persist state/trie without any masking. `None` means skip
/// durable state/trie persistence for this step.
pub state_trie_masking_range: Option<Range<usize>>,
/// Whether to persist non-state/trie data for this step.
pub persist_rest: bool,
}
impl SaveBlocksPlanStep {
/// Creates a new persistence step.
pub const fn new(
block_range: Range<usize>,
state_trie_masking_range: Option<Range<usize>>,
persist_rest: bool,
) -> Self {
Self { block_range, state_trie_masking_range, persist_rest }
}
/// Returns `true` if this step persists state/trie data.
pub const fn persists_state_trie(&self) -> bool {
self.state_trie_masking_range.is_some()
}
}
/// Plan for a single `save_blocks` persistence cycle.
#[derive(Debug, Clone)]
pub struct SaveBlocksPlan<N: NodePrimitives = EthPrimitives> {
/// Canonical blocks covered by this plan.
pub blocks: Vec<ExecutedBlock<N>>,
/// Ordered persistence steps over [`Self::blocks`].
pub steps: Vec<SaveBlocksPlanStep>,
}
impl<N: NodePrimitives> SaveBlocksPlan<N> {
/// Creates a new save plan.
pub const fn new(blocks: Vec<ExecutedBlock<N>>, steps: Vec<SaveBlocksPlanStep>) -> Self {
Self { blocks, steps }
}
/// Returns `true` if the plan contains no blocks to persist.
pub fn is_empty(&self) -> bool {
self.last_block().is_none()
}
/// Returns the highest block covered by this plan.
pub fn last_block(&self) -> Option<BlockNumHash> {
let last_index =
self.steps.iter().rev().find_map(|step| step.block_range.end.checked_sub(1))?;
self.blocks.get(last_index).map(|block| block.recovered_block().num_hash())
}
/// Returns the highest block whose state/trie data is durably persisted by this plan.
pub fn last_state_trie_block(&self) -> Option<BlockNumHash> {
let last_index = self
.steps
.iter()
.rev()
.find(|step| step.persists_state_trie())?
.block_range
.end
.checked_sub(1)?;
self.blocks.get(last_index).map(|block| block.recovered_block().num_hash())
}
/// Returns the contiguous range of blocks whose non-state/trie outputs are persisted.
pub fn persist_rest_range(&self) -> Option<Range<usize>> {
let mut ranges =
self.steps.iter().filter(|step| step.persist_rest).map(|step| &step.block_range);
let first = ranges.next()?.clone();
let merged = ranges.fold(first, |mut merged, range| {
debug_assert_eq!(merged.end, range.start, "persist_rest steps must be contiguous");
merged.end = range.end;
merged
});
Some(merged)
}
}

View File

@@ -212,24 +212,33 @@ impl<N: NodePrimitives> OverlayBuilder<N> {
.ok_or(ProviderError::BlockHashNotFound(self.anchor_hash))
}
/// Returns the block which is at the tip of the DB, i.e. the block which the state tables of
/// the DB are currently synced to.
fn get_db_tip_block<Provider>(&self, provider: &Provider) -> ProviderResult<BlockNumHash>
/// Returns the highest blocks whose state/trie data and non-state/trie data are durably
/// available in the database.
fn get_db_tip_blocks<Provider>(
&self,
provider: &Provider,
) -> ProviderResult<(BlockNumHash, BlockNumHash)>
where
Provider: StageCheckpointReader + BlockNumReader,
{
let block_number = provider
.get_stage_checkpoint(StageId::Finish)?
.as_ref()
.map(|chk| chk.block_number)
.ok_or_else(|| ProviderError::InsufficientChangesets {
requested: 0,
available: 0..=0,
})?;
let hash = provider
let checkpoint = provider.get_stage_checkpoint(StageId::Finish)?.ok_or_else(|| {
ProviderError::InsufficientChangesets { requested: 0, available: 0..=0 }
})?;
let block_number = checkpoint
.finish_stage_checkpoint()
.and_then(|finish| finish.partial_state_trie)
.unwrap_or(checkpoint.block_number);
let state_trie_tip_hash = provider
.convert_number(block_number.into())?
.ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?;
Ok(BlockNumHash::new(block_number, hash))
let finish_tip_number = checkpoint.block_number;
let finish_tip_hash = provider
.convert_number(finish_tip_number.into())?
.ok_or_else(|| ProviderError::HeaderNotFound(finish_tip_number.into()))?;
Ok((
BlockNumHash::new(block_number, state_trie_tip_hash),
BlockNumHash::new(finish_tip_number, finish_tip_hash),
))
}
/// Returns whether or not it is required to collect reverts, and validates that there are
@@ -240,22 +249,16 @@ impl<N: NodePrimitives> OverlayBuilder<N> {
fn reverts_required<Provider>(
&self,
provider: &Provider,
db_tip_block: BlockNumHash,
state_trie_tip_block: BlockNumHash,
finish_tip_block: BlockNumHash,
) -> ProviderResult<Option<RangeInclusive<BlockNumber>>>
where
Provider: BlockNumReader + PruneCheckpointReader,
{
// If the anchor is the DB tip then there won't be any reverts necessary.
if db_tip_block.hash == self.anchor_hash {
return Ok(None)
}
// If the DB tip has moved forward into the `LazyOverlay` then we still don't need to
// revert, the `LazyOverlay` will generate a new in-memory overlay using only the relevant
// blocks data.
if let Some(OverlaySource::Lazy(lazy_overlay)) = &self.overlay_source &&
lazy_overlay.has_anchor_hash(db_tip_block.hash)
{
// If the anchor is the current durable state/trie frontier then there won't be any
// reverts
// necessary.
if state_trie_tip_block.hash == self.anchor_hash {
return Ok(None)
}
@@ -270,7 +273,7 @@ impl<N: NodePrimitives> OverlayBuilder<N> {
.map(|block_number| block_number + 1)
.unwrap_or_default();
let available_range = lower_bound..=db_tip_block.number;
let available_range = lower_bound..=finish_tip_block.number;
// Check if the requested block is within the available range
if !available_range.contains(&anchor_number) {
@@ -280,20 +283,29 @@ impl<N: NodePrimitives> OverlayBuilder<N> {
});
}
Ok(Some(anchor_number + 1..=db_tip_block.number))
if anchor_number > state_trie_tip_block.number {
return Err(ProviderError::InsufficientChangesets {
requested: anchor_number,
available: lower_bound..=state_trie_tip_block.number,
})
}
Ok(Some(anchor_number + 1..=finish_tip_block.number))
}
/// Calculates a new [`Overlay`] given a transaction and the current db tip.
/// Calculates a new [`Overlay`] given a transaction and the current durable state/trie
/// frontier.
#[instrument(
level = "debug",
target = "providers::state::overlay",
skip_all,
fields(?db_tip_block, anchor_hash = ?self.anchor_hash)
fields(?state_trie_tip_block, ?finish_tip_block, anchor_hash = ?self.anchor_hash)
)]
fn calculate_overlay<Provider>(
&self,
provider: &Provider,
db_tip_block: BlockNumHash,
state_trie_tip_block: BlockNumHash,
finish_tip_block: BlockNumHash,
) -> ProviderResult<Overlay>
where
Provider: ChangeSetReader
@@ -314,7 +326,7 @@ impl<N: NodePrimitives> OverlayBuilder<N> {
// Collect any reverts which are required to bring the DB view back to the anchor hash.
let (trie_updates, hashed_post_state) = if let Some(revert_blocks) =
self.reverts_required(provider, db_tip_block)?
self.reverts_required(provider, state_trie_tip_block, finish_tip_block)?
{
debug!(
target: "providers::state::overlay",
@@ -383,9 +395,9 @@ impl<N: NodePrimitives> OverlayBuilder<N> {
(trie_updates, hashed_state_updates)
} else {
// If no reverts are needed then we can assume that the db tip is the anchor hash or
// overlaps with the `LazyOverlay`. Use overlays directly.
let (trie_updates, hashed_state) = self.resolve_overlays(db_tip_block.hash)?;
// If no reverts are needed then the requested anchor is exactly the durable
// state/trie frontier. Use overlays directly from that frontier.
let (trie_updates, hashed_state) = self.resolve_overlays(state_trie_tip_block.hash)?;
retrieve_trie_reverts_duration = Duration::ZERO;
retrieve_hashed_state_reverts_duration = Duration::ZERO;
@@ -420,8 +432,8 @@ impl<N: NodePrimitives> OverlayBuilder<N> {
+ BlockNumReader
+ StorageSettingsCache,
{
let db_tip_block = self.get_db_tip_block(provider)?;
self.calculate_overlay(provider, db_tip_block)
let (state_trie_tip_block, finish_tip_block) = self.get_db_tip_blocks(provider)?;
self.calculate_overlay(provider, state_trie_tip_block, finish_tip_block)
}
}
@@ -435,9 +447,11 @@ pub struct OverlayStateProviderFactory<F, N: NodePrimitives = EthPrimitives> {
factory: F,
/// Overlay builder containing the configuration and overlay calculation logic.
overlay_builder: OverlayBuilder<N>,
/// A cache which maps `db_tip -> Overlay`. If the db tip changes during usage of the factory
/// then a new entry will get added to this, but in most cases only one entry is present.
overlay_cache: Arc<DashMap<BlockHash, Overlay>>,
/// A cache which maps `(state_trie_tip_hash, finish_tip_hash) -> Overlay`.
///
/// Under partial persistence the overlay depends on both the durable trie frontier and the
/// fully durable Finish frontier, so both hashes are part of the cache key.
overlay_cache: Arc<DashMap<(BlockHash, BlockHash), Overlay>>,
}
impl<F, N: NodePrimitives> OverlayStateProviderFactory<F, N> {
@@ -470,8 +484,8 @@ impl<F, N: NodePrimitives> OverlayStateProviderFactory<F, N> {
self
}
/// Fetches an [`Overlay`] from the cache based on the current db tip block. If there is no
/// cached value then this calculates the [`Overlay`] and populates the cache.
/// Fetches an [`Overlay`] from the cache based on the current durable frontiers. If there is
/// no cached value then this calculates the [`Overlay`] and populates the cache.
#[instrument(level = "debug", target = "providers::state::overlay", skip_all)]
fn get_overlay<Provider>(&self, provider: &Provider) -> ProviderResult<Overlay>
where
@@ -483,17 +497,19 @@ impl<F, N: NodePrimitives> OverlayStateProviderFactory<F, N> {
+ BlockNumReader
+ StorageSettingsCache,
{
let db_tip_block = self.overlay_builder.get_db_tip_block(provider)?;
let (state_trie_tip_block, finish_tip_block) =
self.overlay_builder.get_db_tip_blocks(provider)?;
let overlay = match self.overlay_cache.entry(db_tip_block.hash) {
dashmap::Entry::Occupied(entry) => entry.get().clone(),
dashmap::Entry::Vacant(entry) => {
self.overlay_builder.metrics.overlay_cache_misses.increment(1);
let overlay = self.overlay_builder.build_overlay(provider)?;
entry.insert(overlay.clone());
overlay
}
};
let overlay =
match self.overlay_cache.entry((state_trie_tip_block.hash, finish_tip_block.hash)) {
dashmap::Entry::Occupied(entry) => entry.get().clone(),
dashmap::Entry::Vacant(entry) => {
self.overlay_builder.metrics.overlay_cache_misses.increment(1);
let overlay = self.overlay_builder.build_overlay(provider)?;
entry.insert(overlay.clone());
overlay
}
};
Ok(overlay)
}
@@ -651,3 +667,191 @@ where
hashed_cursor_factory.hashed_storage_cursor(hashed_address)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
test_utils::create_test_provider_factory, BlockWriter, SaveBlocksMode, SaveBlocksPlan,
SaveBlocksPlanStep,
};
use alloy_primitives::{B256, U256};
use reth_chain_state::{test_utils::TestBlockBuilder, ComputedTrieData, ExecutedBlock};
use reth_primitives_traits::Account;
use reth_stages_types::{FinishCheckpoint, StageCheckpoint};
use reth_storage_api::StageCheckpointWriter;
use reth_trie::{updates::TrieUpdatesSorted, HashedPostState, HashedStorage};
use std::sync::Arc;
fn full_save_plan(
blocks: impl IntoIterator<Item = ExecutedBlock<EthPrimitives>>,
) -> SaveBlocksPlan<EthPrimitives> {
let blocks = blocks.into_iter().collect::<Vec<_>>();
let full_range = 0..blocks.len();
SaveBlocksPlan::new(
blocks,
vec![SaveBlocksPlanStep::new(
full_range.clone(),
Some(full_range.end..full_range.end),
true,
)],
)
}
fn partial_save_plan(
blocks: impl IntoIterator<Item = ExecutedBlock<EthPrimitives>>,
steps: Vec<SaveBlocksPlanStep>,
) -> SaveBlocksPlan<EthPrimitives> {
SaveBlocksPlan::new(blocks.into_iter().collect(), steps)
}
fn with_unique_state(
block: &ExecutedBlock<EthPrimitives>,
id: u8,
) -> ExecutedBlock<EthPrimitives> {
let hashed_address = B256::with_last_byte(id);
let hashed_slot = B256::with_last_byte(id.saturating_add(32));
let hashed_state = HashedPostState::default()
.with_accounts([(hashed_address, Some(Account::default()))])
.with_storages([(
hashed_address,
HashedStorage::from_iter(false, [(hashed_slot, U256::from(id))]),
)])
.into_sorted();
ExecutedBlock::new(
Arc::clone(&block.recovered_block),
Arc::clone(&block.execution_output),
ComputedTrieData::without_trie_input(
Arc::new(hashed_state),
Arc::new(TrieUpdatesSorted::default()),
),
)
}
#[test]
fn build_overlay_uses_partial_trie_frontier_as_lazy_overlay_base() {
let factory = create_test_provider_factory();
let mut block_builder = TestBlockBuilder::eth();
let blocks = block_builder
.get_executed_blocks(0..5)
.enumerate()
.map(|(index, block)| with_unique_state(&block, index as u8 + 1))
.collect::<Vec<_>>();
let state_trie_tip = &blocks[1];
let finish_tip = &blocks[3];
let lazy_overlay_blocks = vec![blocks[4].clone(), blocks[3].clone(), blocks[2].clone()];
let provider_rw = factory.provider_rw().unwrap();
provider_rw.insert_block(blocks[0].recovered_block()).unwrap();
provider_rw.insert_block(state_trie_tip.recovered_block()).unwrap();
provider_rw.insert_block(blocks[2].recovered_block()).unwrap();
provider_rw.insert_block(finish_tip.recovered_block()).unwrap();
provider_rw
.save_stage_checkpoint(
StageId::Finish,
StageCheckpoint::new(finish_tip.block_number()).with_finish_stage_checkpoint(
FinishCheckpoint { partial_state_trie: Some(state_trie_tip.block_number()) },
),
)
.unwrap();
provider_rw.commit().unwrap();
let provider = factory.provider().unwrap();
let overlay = OverlayBuilder::<EthPrimitives>::new(
state_trie_tip.recovered_block().hash(),
ChangesetCache::new(),
)
.with_lazy_overlay(Some(LazyOverlay::new(lazy_overlay_blocks)))
.build_overlay(&provider)
.unwrap();
assert_eq!(overlay.hashed_post_state.accounts.len(), 3);
}
#[test]
fn build_overlay_rejects_anchor_between_state_trie_frontier_and_finish() {
let factory = create_test_provider_factory();
let mut block_builder = TestBlockBuilder::eth().with_state();
let genesis = block_builder.get_executed_blocks(0..1).next().unwrap();
let blocks = block_builder.get_executed_blocks(1..4).collect::<Vec<_>>();
let provider_rw = factory.provider_rw().unwrap();
provider_rw
.save_blocks(
&full_save_plan(std::slice::from_ref(&genesis).to_vec()),
SaveBlocksMode::Full,
)
.unwrap();
provider_rw.commit().unwrap();
let provider_rw = factory.provider_rw().unwrap();
provider_rw
.save_blocks(
&partial_save_plan(
blocks.clone(),
vec![
SaveBlocksPlanStep::new(0..1, Some(1..3), true),
SaveBlocksPlanStep::new(1..3, None, true),
],
),
SaveBlocksMode::Full,
)
.unwrap();
provider_rw.commit().unwrap();
let provider = factory.provider().unwrap();
let anchor = blocks[1].recovered_block().hash();
let err = OverlayBuilder::<EthPrimitives>::new(anchor, ChangesetCache::new())
.with_lazy_overlay(Some(LazyOverlay::new(vec![blocks[2].clone()])))
.build_overlay(&provider)
.unwrap_err();
assert!(matches!(err, ProviderError::InsufficientChangesets { .. }));
}
#[test]
fn build_overlay_rejects_finish_anchor_without_trie_bridge() {
let factory = create_test_provider_factory();
let mut block_builder = TestBlockBuilder::eth().with_state();
let genesis = block_builder.get_executed_blocks(0..1).next().unwrap();
let blocks = block_builder.get_executed_blocks(1..4).collect::<Vec<_>>();
let provider_rw = factory.provider_rw().unwrap();
provider_rw
.save_blocks(
&full_save_plan(std::slice::from_ref(&genesis).to_vec()),
SaveBlocksMode::Full,
)
.unwrap();
provider_rw.commit().unwrap();
let provider_rw = factory.provider_rw().unwrap();
provider_rw
.save_blocks(
&partial_save_plan(
blocks.clone(),
vec![
SaveBlocksPlanStep::new(0..1, Some(1..3), true),
SaveBlocksPlanStep::new(1..3, None, true),
],
),
SaveBlocksMode::Full,
)
.unwrap();
provider_rw.commit().unwrap();
let provider = factory.provider().unwrap();
let finish_anchor = blocks[2].recovered_block().hash();
let err = OverlayBuilder::<EthPrimitives>::new(finish_anchor, ChangesetCache::new())
.with_lazy_overlay(None)
.build_overlay(&provider)
.unwrap_err();
assert!(matches!(err, ProviderError::InsufficientChangesets { .. }));
}
}

View File

@@ -3,7 +3,7 @@ use core::ops::Not;
use crate::{
added_removed_keys::MultiAddedRemovedKeys,
prefix_set::{PrefixSetMut, TriePrefixSetsMut},
utils::{extend_sorted_vec, kway_merge_sorted},
utils::{extend_sorted_vec, kway_merge_disjoint_sorted, kway_merge_sorted},
KeyHasher, MultiProofTargets, Nibbles,
};
use alloc::{borrow::Cow, vec::Vec};
@@ -691,6 +691,100 @@ impl HashedPostStateSorted {
Self { accounts, storages }
}
/// Merges the batch and removes any overlapping keys present in the mask.
///
/// Account keys are masked at the top level, while storage entries are only masked at the slot
/// level unless the mask wipes the entire storage. For duplicate keys in the batch, later
/// items take precedence over earlier ones. The order of the mask does not matter.
pub fn disjointed_merge_batch<'a>(batch: Vec<&'a Self>, mask: Vec<&'a Self>) -> Self {
let accounts = kway_merge_disjoint_sorted(
batch.iter().map(|item| item.accounts.len()).sum(),
batch.iter().rev().map(|item| item.accounts.as_slice()),
mask.iter().map(|item| item.accounts.as_slice()),
);
struct StorageAcc<'a> {
wiped: bool,
sealed: bool,
slot_count: usize,
slices: Vec<&'a [(B256, U256)]>,
}
#[derive(Default)]
struct StorageMaskAcc<'a> {
wiped: bool,
slices: Vec<&'a [(B256, U256)]>,
}
let mut storages = B256Map::with_capacity_and_hasher(
batch.iter().map(|item| item.storages.len()).sum(),
Default::default(),
);
for item in batch.iter().rev() {
for (hashed_address, storage) in &item.storages {
let entry = storages.entry(*hashed_address).or_insert_with(|| StorageAcc {
wiped: false,
sealed: false,
slot_count: 0,
slices: Vec::new(),
});
if entry.sealed {
continue;
}
entry.slices.push(storage.storage_slots.as_slice());
entry.slot_count += storage.storage_slots.len();
if storage.wiped {
entry.wiped = true;
entry.sealed = true;
}
}
}
let mut storage_masks: B256Map<StorageMaskAcc<'a>> = B256Map::with_capacity_and_hasher(
mask.iter().map(|item| item.storages.len()).sum(),
Default::default(),
);
for item in mask {
for (hashed_address, storage) in &item.storages {
let entry = storage_masks.entry(*hashed_address).or_default();
if entry.wiped {
continue;
}
if storage.wiped {
entry.wiped = true;
entry.slices.clear();
} else {
entry.slices.push(storage.storage_slots.as_slice());
}
}
}
let storages = storages
.into_iter()
.filter_map(|(hashed_address, entry)| {
let storage_slots = match storage_masks.get(&hashed_address) {
Some(mask_entry) if mask_entry.wiped => return None,
Some(mask_entry) => kway_merge_disjoint_sorted(
entry.slot_count,
entry.slices,
mask_entry.slices.iter().copied(),
),
None => kway_merge_sorted(entry.slices),
};
(!storage_slots.is_empty() || entry.wiped).then_some((
hashed_address,
HashedStorageSorted { wiped: entry.wiped, storage_slots },
))
})
.collect();
Self { accounts, storages }
}
/// Clears all accounts and storage data.
pub fn clear(&mut self) {
self.accounts.clear();
@@ -1534,6 +1628,152 @@ mod tests {
assert_eq!(state.accounts.get(&addr1), Some(&None));
}
#[test]
fn test_hashed_post_state_sorted_disjointed_merge_batch() {
fn account(nonce: u64) -> Account {
Account { nonce, balance: U256::ZERO, bytecode_hash: None }
}
let kept_account = B256::with_last_byte(1);
let removed_account = B256::with_last_byte(2);
let kept_storage = B256::with_last_byte(3);
let removed_storage = B256::with_last_byte(4);
let slot1 = B256::with_last_byte(11);
let slot2 = B256::with_last_byte(12);
let older = HashedPostStateSorted::new(
vec![(kept_account, Some(account(1))), (removed_account, Some(account(10)))],
B256Map::from_iter([
(
kept_storage,
HashedStorageSorted {
wiped: false,
storage_slots: vec![(slot1, U256::from(1))],
},
),
(
removed_storage,
HashedStorageSorted {
wiped: false,
storage_slots: vec![(slot1, U256::from(2))],
},
),
]),
);
let newer = HashedPostStateSorted::new(
vec![(kept_account, Some(account(2)))],
B256Map::from_iter([(
kept_storage,
HashedStorageSorted {
wiped: false,
storage_slots: vec![(slot1, U256::from(3)), (slot2, U256::from(4))],
},
)]),
);
let remove_a = HashedPostStateSorted::new(
vec![(removed_account, None)],
B256Map::from_iter([
(
kept_storage,
HashedStorageSorted { wiped: false, storage_slots: vec![(slot2, U256::ZERO)] },
),
(removed_storage, HashedStorageSorted { wiped: true, storage_slots: vec![] }),
]),
);
let remove_b = HashedPostStateSorted::new(
vec![(B256::with_last_byte(255), Some(account(99)))],
B256Map::default(),
);
let result = HashedPostStateSorted::disjointed_merge_batch(
vec![&older, &newer],
vec![&remove_b, &remove_a],
);
assert_eq!(result.accounts, vec![(kept_account, Some(account(2)))]);
assert_eq!(result.storages.len(), 1);
assert_eq!(
result.storages.get(&kept_storage),
Some(&HashedStorageSorted {
wiped: false,
storage_slots: vec![(slot1, U256::from(3))],
})
);
assert!(!result.storages.contains_key(&removed_storage));
}
#[test]
fn test_hashed_post_state_sorted_disjointed_merge_batch_removes_overlapping_batch_key() {
fn account(nonce: u64) -> Account {
Account { nonce, balance: U256::ZERO, bytecode_hash: None }
}
let overlapping_account = B256::with_last_byte(21);
let overlapping_storage = B256::with_last_byte(22);
let slot = B256::with_last_byte(23);
let older = HashedPostStateSorted::new(
vec![(overlapping_account, Some(account(1)))],
B256Map::from_iter([(
overlapping_storage,
HashedStorageSorted { wiped: false, storage_slots: vec![(slot, U256::from(1))] },
)]),
);
let newer = HashedPostStateSorted::new(
vec![(overlapping_account, Some(account(2)))],
B256Map::from_iter([(
overlapping_storage,
HashedStorageSorted { wiped: false, storage_slots: vec![(slot, U256::from(2))] },
)]),
);
let remove = HashedPostStateSorted::new(
vec![(overlapping_account, None)],
B256Map::from_iter([(
overlapping_storage,
HashedStorageSorted { wiped: true, storage_slots: vec![] },
)]),
);
let result =
HashedPostStateSorted::disjointed_merge_batch(vec![&older, &newer], vec![&remove]);
assert!(result.accounts.is_empty());
assert!(result.storages.is_empty());
}
#[test]
fn test_hashed_post_state_sorted_disjointed_merge_batch_ignores_empty_storage_mask() {
let storage = B256::with_last_byte(31);
let slot = B256::with_last_byte(32);
let batch = HashedPostStateSorted::new(
vec![],
B256Map::from_iter([(
storage,
HashedStorageSorted { wiped: false, storage_slots: vec![(slot, U256::from(1))] },
)]),
);
let mask = HashedPostStateSorted::new(
vec![],
B256Map::from_iter([(
storage,
HashedStorageSorted { wiped: false, storage_slots: vec![] },
)]),
);
let result = HashedPostStateSorted::disjointed_merge_batch(vec![&batch], vec![&mask]);
assert_eq!(
result.storages.get(&storage),
Some(&HashedStorageSorted { wiped: false, storage_slots: vec![(slot, U256::from(1))] })
);
}
/// Test non-wiped storage merges both zero and non-zero valued slots
#[test]
fn test_hashed_storage_extend_from_sorted_non_wiped() {

View File

@@ -1,5 +1,5 @@
use crate::{
utils::{extend_sorted_vec, kway_merge_sorted},
utils::{extend_sorted_vec, kway_merge_disjoint_sorted, kway_merge_sorted},
BranchNodeCompact, HashBuilder, Nibbles,
};
use alloc::{
@@ -710,6 +710,101 @@ impl TrieUpdatesSorted {
Self { account_nodes, storage_tries }
}
/// Merges the batch and removes any overlapping keys present in the mask.
///
/// Account trie nodes are masked at the top level, while storage trie entries are only masked
/// at the node level unless the mask deletes the entire storage trie. For duplicate keys in
/// the batch, later items take precedence over earlier ones. The order of the mask does not
/// matter.
pub fn disjointed_merge_batch<'a>(batch: Vec<&'a Self>, mask: Vec<&'a Self>) -> Self {
let account_nodes = kway_merge_disjoint_sorted(
batch.iter().map(|item| item.account_nodes.len()).sum(),
batch.iter().rev().map(|item| item.account_nodes.as_slice()),
mask.iter().map(|item| item.account_nodes.as_slice()),
);
struct StorageAcc<'a> {
is_deleted: bool,
sealed: bool,
node_count: usize,
slices: Vec<&'a [(Nibbles, Option<BranchNodeCompact>)]>,
}
#[derive(Default)]
struct StorageMaskAcc<'a> {
is_deleted: bool,
slices: Vec<&'a [(Nibbles, Option<BranchNodeCompact>)]>,
}
let mut storage_tries = B256Map::with_capacity_and_hasher(
batch.iter().map(|item| item.storage_tries.len()).sum(),
Default::default(),
);
for item in batch.iter().rev() {
for (hashed_address, storage_trie) in &item.storage_tries {
let entry = storage_tries.entry(*hashed_address).or_insert_with(|| StorageAcc {
is_deleted: false,
sealed: false,
node_count: 0,
slices: Vec::new(),
});
if entry.sealed {
continue;
}
entry.slices.push(storage_trie.storage_nodes.as_slice());
entry.node_count += storage_trie.storage_nodes.len();
if storage_trie.is_deleted {
entry.is_deleted = true;
entry.sealed = true;
}
}
}
let mut storage_masks: B256Map<StorageMaskAcc<'a>> = B256Map::with_capacity_and_hasher(
mask.iter().map(|item| item.storage_tries.len()).sum(),
Default::default(),
);
for item in mask {
for (hashed_address, storage_trie) in &item.storage_tries {
let entry = storage_masks.entry(*hashed_address).or_default();
if entry.is_deleted {
continue;
}
if storage_trie.is_deleted {
entry.is_deleted = true;
entry.slices.clear();
} else {
entry.slices.push(storage_trie.storage_nodes.as_slice());
}
}
}
let storage_tries = storage_tries
.into_iter()
.filter_map(|(hashed_address, entry)| {
let storage_nodes = match storage_masks.get(&hashed_address) {
Some(mask_entry) if mask_entry.is_deleted => return None,
Some(mask_entry) => kway_merge_disjoint_sorted(
entry.node_count,
entry.slices,
mask_entry.slices.iter().copied(),
),
None => kway_merge_sorted(entry.slices),
};
(!storage_nodes.is_empty() || entry.is_deleted).then_some((
hashed_address,
StorageTrieUpdatesSorted { is_deleted: entry.is_deleted, storage_nodes },
))
})
.collect();
Self::new(account_nodes, storage_tries)
}
}
impl AsRef<Self> for TrieUpdatesSorted {
@@ -977,6 +1072,158 @@ mod tests {
assert_eq!(storage3.storage_nodes[1].0, Nibbles::from_nibbles_unchecked([0x07]));
}
#[test]
fn test_trie_updates_sorted_disjointed_merge_batch() {
let kept_node = Nibbles::from_nibbles_unchecked([0x01]);
let removed_node = Nibbles::from_nibbles_unchecked([0x02]);
let kept_storage = B256::from([3; 32]);
let removed_storage = B256::from([4; 32]);
let slot1 = Nibbles::from_nibbles_unchecked([0x0a]);
let slot2 = Nibbles::from_nibbles_unchecked([0x0b]);
let older = TrieUpdatesSorted::new(
vec![(kept_node, Some(BranchNodeCompact::default())), (removed_node, None)],
B256Map::from_iter([
(
kept_storage,
StorageTrieUpdatesSorted {
is_deleted: false,
storage_nodes: vec![(slot1, None)],
},
),
(
removed_storage,
StorageTrieUpdatesSorted {
is_deleted: false,
storage_nodes: vec![(slot1, Some(BranchNodeCompact::default()))],
},
),
]),
);
let newer = TrieUpdatesSorted::new(
vec![(kept_node, None)],
B256Map::from_iter([(
kept_storage,
StorageTrieUpdatesSorted {
is_deleted: false,
storage_nodes: vec![(slot1, Some(BranchNodeCompact::default())), (slot2, None)],
},
)]),
);
let remove_a = TrieUpdatesSorted::new(
vec![(removed_node, Some(BranchNodeCompact::default()))],
B256Map::from_iter([
(
kept_storage,
StorageTrieUpdatesSorted {
is_deleted: false,
storage_nodes: vec![(slot2, Some(BranchNodeCompact::default()))],
},
),
(
removed_storage,
StorageTrieUpdatesSorted { is_deleted: true, storage_nodes: vec![] },
),
]),
);
let remove_b = TrieUpdatesSorted::new(
vec![(Nibbles::from_nibbles_unchecked([0x0f]), Some(BranchNodeCompact::default()))],
B256Map::default(),
);
let result = TrieUpdatesSorted::disjointed_merge_batch(
vec![&older, &newer],
vec![&remove_b, &remove_a],
);
assert_eq!(result.account_nodes, vec![(kept_node, None)]);
assert_eq!(result.storage_tries.len(), 1);
assert_eq!(
result.storage_tries.get(&kept_storage),
Some(&StorageTrieUpdatesSorted {
is_deleted: false,
storage_nodes: vec![(slot1, Some(BranchNodeCompact::default()))],
})
);
assert!(!result.storage_tries.contains_key(&removed_storage));
}
#[test]
fn test_trie_updates_sorted_disjointed_merge_batch_removes_overlapping_batch_key() {
let overlapping_node = Nibbles::from_nibbles_unchecked([0x03]);
let overlapping_storage = B256::from([5; 32]);
let slot = Nibbles::from_nibbles_unchecked([0x0c]);
let older = TrieUpdatesSorted::new(
vec![(overlapping_node, Some(BranchNodeCompact::default()))],
B256Map::from_iter([(
overlapping_storage,
StorageTrieUpdatesSorted {
is_deleted: false,
storage_nodes: vec![(slot, Some(BranchNodeCompact::default()))],
},
)]),
);
let newer = TrieUpdatesSorted::new(
vec![(overlapping_node, None)],
B256Map::from_iter([(
overlapping_storage,
StorageTrieUpdatesSorted { is_deleted: false, storage_nodes: vec![(slot, None)] },
)]),
);
let remove = TrieUpdatesSorted::new(
vec![(overlapping_node, Some(BranchNodeCompact::default()))],
B256Map::from_iter([(
overlapping_storage,
StorageTrieUpdatesSorted { is_deleted: true, storage_nodes: vec![] },
)]),
);
let result = TrieUpdatesSorted::disjointed_merge_batch(vec![&older, &newer], vec![&remove]);
assert!(result.account_nodes.is_empty());
assert!(result.storage_tries.is_empty());
}
#[test]
fn test_trie_updates_sorted_disjointed_merge_batch_ignores_empty_storage_mask() {
let storage = B256::from([6; 32]);
let slot = Nibbles::from_nibbles_unchecked([0x0d]);
let batch = TrieUpdatesSorted::new(
vec![],
B256Map::from_iter([(
storage,
StorageTrieUpdatesSorted {
is_deleted: false,
storage_nodes: vec![(slot, Some(BranchNodeCompact::default()))],
},
)]),
);
let mask = TrieUpdatesSorted::new(
vec![],
B256Map::from_iter([(
storage,
StorageTrieUpdatesSorted { is_deleted: false, storage_nodes: vec![] },
)]),
);
let result = TrieUpdatesSorted::disjointed_merge_batch(vec![&batch], vec![&mask]);
assert_eq!(
result.storage_tries.get(&storage),
Some(&StorageTrieUpdatesSorted {
is_deleted: false,
storage_nodes: vec![(slot, Some(BranchNodeCompact::default()))],
})
);
}
/// Test extending with storage tries adds both nodes and removed nodes correctly
#[test]
fn test_trie_updates_extend_from_sorted_with_storage_tries() {

View File

@@ -26,6 +26,51 @@ where
.collect()
}
/// Merge sorted left slices into a sorted `Vec`, excluding keys present in any right slice.
///
/// Callers pass left slices in priority order (index 0 = highest priority), so the first
/// left slice's value for a key takes precedence over later slices. Right slice order is ignored;
/// the right-hand side only contributes keys to exclude.
pub(crate) fn kway_merge_disjoint_sorted<'a, K, V>(
capacity: usize,
left_slices: impl IntoIterator<Item = &'a [(K, V)]>,
right_slices: impl IntoIterator<Item = &'a [(K, V)]>,
) -> Vec<(K, V)>
where
K: Ord + Clone + 'a,
V: Clone + 'a,
{
let mut right_keys = right_slices
.into_iter()
.filter(|s| !s.is_empty())
.map(|s| s.iter().map(|(k, _)| k))
.kmerge()
.dedup()
.peekable();
let mut out = Vec::with_capacity(capacity);
for (_, key, value) in left_slices
.into_iter()
.filter(|s| !s.is_empty())
.enumerate()
.map(|(i, s)| s.iter().map(move |(k, v)| (i, k, v)))
.kmerge_by(|(i1, k1, _), (i2, k2, _)| (k1, i1) < (k2, i2))
.dedup_by(|(_, k1, _), (_, k2, _)| *k1 == *k2)
{
while right_keys.peek().is_some_and(|right_key| *right_key < key) {
right_keys.next();
}
if right_keys.peek().is_some_and(|right_key| *right_key == key) {
continue;
}
out.push((key.clone(), value.clone()));
}
out
}
/// Extend a sorted vector with another sorted vector using 2 pointer merge.
/// Values from `other` take precedence for duplicate keys.
pub(crate) fn extend_sorted_vec<K, V>(target: &mut Vec<(K, V)>, other: &[(K, V)])
@@ -183,4 +228,20 @@ mod tests {
let result: Vec<(i32, &str)> = kway_merge_sorted(Vec::<&[(i32, &str)]>::new());
assert!(result.is_empty());
}
#[test]
fn test_kway_merge_disjoint_sorted() {
let left_old = vec![(1, "old"), (2, "drop"), (4, "keep")];
let left_new = vec![(1, "new"), (3, "new_only")];
let right_a = vec![(2, "ignored"), (5, "ignored")];
let right_b = vec![(3, "ignored")];
let result = kway_merge_disjoint_sorted(
left_old.len() + left_new.len(),
[left_new.as_slice(), left_old.as_slice()],
[right_a.as_slice(), right_b.as_slice()],
);
assert_eq!(result, vec![(1, "new"), (4, "keep")]);
}
}

View File

@@ -950,6 +950,11 @@ Engine:
[default: 16]
--engine.deferred-trie-blocks <DEFERRED_TRIE_BLOCKS>
Configure how many of the blocks being persisted should only mask state/trie writes instead of durably persisting their state/trie updates in the current cycle
[default: 0]
--engine.memory-block-buffer-target <MEMORY_BLOCK_BUFFER_TARGET>
Configure the target number of blocks to keep in memory