Compare commits

...

10 Commits

Author SHA1 Message Date
Georgios Konstantopoulos
9bd7d2e698 feat(snap-sync): add retry with exponential backoff for snap requests
Snap requests can fail due to timeouts when peers are slow to respond
or not yet connected. Add retry logic (up to 10 attempts) with
exponential backoff to all snap protocol requests.

Amp-Thread-ID: https://ampcode.com/threads/T-019c67ba-663f-742b-84bc-dcb07606544d
Co-authored-by: Amp <amp@ampcode.com>
2026-02-16 11:09:26 -08:00
Georgios Konstantopoulos
c682cf8d84 fix(snap-sync): fetch pivot header from network when not in DB
When using --debug.tip with snap sync, fall back to fetching the
pivot header from the network if it's not available locally. This
allows snap sync to work from a fresh datadir without requiring
a separate header sync pass first.

Amp-Thread-ID: https://ampcode.com/threads/T-019c67ba-663f-742b-84bc-dcb07606544d
Co-authored-by: Amp <amp@ampcode.com>
2026-02-16 11:05:37 -08:00
Georgios Konstantopoulos
239adcce22 fix(snap-sync): use last_block_number for pivot selection
best_block_number tracks execution progress, not header availability.
Use last_block_number to find the highest available header in static
files for snap sync pivot selection.

Co-authored-by: Georgios <georgios@paradigm.xyz>
Amp-Thread-ID: https://ampcode.com/threads/T-019c67ba-663f-742b-84bc-dcb07606544d
Co-authored-by: Amp <amp@ampcode.com>
2026-02-16 10:59:23 -08:00
Georgios Konstantopoulos
0b054ad6b2 fix(snap-sync): advertise snap/1 capability and fix message routing
The root cause of peers ignoring GetAccountRange requests was that
snap/1 was never advertised in the Hello message. Without the snap
capability in the handshake, peers don't know we support snap and
silently drop messages in the snap ID range.

Changes:
- Add Protocol::snap_1() for snap/1 capability (8 message types)
- Include snap/1 in default Hello message capabilities
- Route unhandled satellite messages to primary (for snap messages
  that arrive via the multiplexer when no satellite handler exists)
- Treat eth71+ message IDs (GetBlockAccessLists, BlockAccessLists)
  as raw capability messages on older versions instead of erroring,
  since those IDs overlap with snap in the multiplexed ID space

Amp-Thread-ID: https://ampcode.com/threads/T-019c6340-6d8c-7362-9c45-dde842a8cf20
Co-authored-by: Amp <amp@ampcode.com>
2026-02-16 10:35:07 -08:00
Georgios Konstantopoulos
0f7e0b583e feat(snap-sync): add state root verification and auto pivot selection
- Add state root verification phase: compute trie root from downloaded
  HashedAccounts/HashedStorages via DatabaseStateRoot, compare to pivot
- Add StateRootVerification error variant
- Auto-select pivot from best synced block when --debug.tip not provided
- Fall back to --debug.tip if explicitly set

Co-authored-by: Georgios <georgios@paradigm.xyz>
Amp-Thread-ID: https://ampcode.com/threads/T-019c6223-1ccb-736b-aedc-b42a3c21a161
Co-authored-by: Amp <amp@ampcode.com>
2026-02-16 10:35:07 -08:00
Georgios Konstantopoulos
ababdae2e2 feat(snap-sync): fix CI, add SnapClient to FullNetwork, add tests
- Add SnapClient impl for NoopFullBlockClient (empty responses)
- Add SnapClient bound to FullNetwork trait's Client associated type
- Remove explicit SnapClient bound from EngineNodeLauncher LaunchNode impl
- Fix all clippy warnings in snap-sync, network, and network-api crates
- Add 9 tests: downloader (decode_slim_account), server (account_range,
  byte_codes, storage_ranges), progress (new, phase_default)

Co-authored-by: Georgios <georgios@paradigm.xyz>
Amp-Thread-ID: https://ampcode.com/threads/T-019c6223-1ccb-736b-aedc-b42a3c21a161
Co-authored-by: Amp <amp@ampcode.com>
2026-02-16 10:35:07 -08:00
Georgios Konstantopoulos
cdc6f6beaf feat(snap-sync): add server, CLI flag, and engine integration
- Add SnapRequestHandler (snap server) that responds to incoming
  GetAccountRange/GetStorageRanges/GetByteCodes/GetTrieNodes by
  reading from HashedAccounts/HashedStorages/Bytecodes tables
- Add --debug.snap-sync CLI flag to enable snap sync mode
- Wire snap sync into engine launch: reads pivot from --debug.tip,
  fetches header from DB, runs SnapSyncDownloader before pipeline
- Full reth binary compiles clean

Co-authored-by: Georgios Konstantopoulos <georgios@gakonst.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019c61d2-4850-7206-892c-aa3549f8a939
Co-authored-by: Amp <amp@ampcode.com>
2026-02-16 10:35:07 -08:00
Georgios Konstantopoulos
ee4af55457 feat(snap-sync): add response handling and snap sync task
- Handle incoming snap protocol responses in ActiveSession by
  intercepting raw capability messages in the snap ID range,
  decoding them, and resolving inflight PeerRequest channels
- Add run_snap_sync() async entrypoint for launching snap sync
- Add SnapSyncConfig with enabled flag

Co-authored-by: Georgios Konstantopoulos <georgios@gakonst.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019c61d2-4850-7206-892c-aa3549f8a939
Co-authored-by: Amp <amp@ampcode.com>
2026-02-16 10:35:06 -08:00
Georgios Konstantopoulos
5b9c469b83 feat(net): wire snap protocol requests into network layer
- Add snap request variants (GetAccountRange, GetStorageRanges,
  GetByteCodes, GetTrieNodes) to PeerRequest enum
- Implement SnapClient trait on FetchClient, routing requests through
  the existing DownloadRequest channel
- Add SnapRequest variant to FetchAction for StateFetcher dispatch
- Add InflightSnapRequest bridging between session and caller oneshot
- Handle snap request encoding in ActiveSession via RawCapabilityMessage
  with proper message ID multiplexing (offset by eth message count)
- NetworkState dispatches snap requests directly to peer sessions

Co-authored-by: Georgios Konstantopoulos <georgios@gakonst.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019c61d2-4850-7206-892c-aa3549f8a939
Co-authored-by: Amp <amp@ampcode.com>
2026-02-16 10:35:06 -08:00
Georgios Konstantopoulos
b498a41d54 feat(snap-sync): add snap sync crate with downloader skeleton
Adds the reth-snap-sync crate with:
- SnapSyncDownloader: orchestrates multi-phase state download
  - Phase 1: Account ranges via GetAccountRange
  - Phase 2: Storage slots via GetStorageRanges
  - Phase 3: Bytecodes via GetByteCodes
  - Phase 4: State root verification (stub)
- SnapProgress: resumable progress tracking
- SnapSyncError: error types
- Metrics: accounts/storage/bytecodes download tracking

Writes directly to HashedAccounts/HashedStorages/Bytecodes tables
so existing pipeline hashing+merkle stages can verify state root.

Co-authored-by: Georgios Konstantopoulos <georgios@gakonst.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019c61d2-4850-7206-892c-aa3549f8a939
Co-authored-by: Amp <amp@ampcode.com>
2026-02-16 10:35:06 -08:00
26 changed files with 2185 additions and 44 deletions

33
Cargo.lock generated
View File

@@ -9267,6 +9267,7 @@ dependencies = [
"reth-rpc-engine-api",
"reth-rpc-eth-types",
"reth-rpc-layer",
"reth-snap-sync",
"reth-stages",
"reth-static-file",
"reth-tasks",
@@ -10127,6 +10128,38 @@ dependencies = [
"strum",
]
[[package]]
name = "reth-snap-sync"
version = "1.11.0"
dependencies = [
"alloy-consensus",
"alloy-primitives",
"alloy-rlp",
"alloy-trie",
"derive_more",
"futures",
"metrics",
"rand 0.9.2",
"reth-db-api",
"reth-eth-wire-types",
"reth-execution-errors",
"reth-metrics",
"reth-network-p2p",
"reth-network-peers",
"reth-primitives-traits",
"reth-provider",
"reth-storage-api",
"reth-storage-errors",
"reth-tasks",
"reth-tracing",
"reth-trie",
"reth-trie-db",
"thiserror 2.0.18",
"tokio",
"tokio-stream",
"tracing",
]
[[package]]
name = "reth-stages"
version = "1.11.0"

View File

@@ -55,6 +55,7 @@ members = [
"crates/net/discv5/",
"crates/net/dns/",
"crates/net/downloaders/",
"crates/net/snap-sync/",
"crates/net/ecies/",
"crates/net/eth-wire-types",
"crates/net/eth-wire/",
@@ -343,6 +344,7 @@ reth-discv4 = { path = "crates/net/discv4" }
reth-discv5 = { path = "crates/net/discv5" }
reth-dns-discovery = { path = "crates/net/dns" }
reth-downloaders = { path = "crates/net/downloaders" }
reth-snap-sync = { path = "crates/net/snap-sync" }
reth-e2e-test-utils = { path = "crates/e2e-test-utils" }
reth-ecies = { path = "crates/net/ecies" }
reth-engine-local = { path = "crates/engine/local" }

View File

@@ -170,15 +170,29 @@ impl<N: NetworkPrimitives> ProtocolMessage<N> {
}
EthMessageID::GetBlockAccessLists => {
if version < EthVersion::Eth71 {
return Err(MessageError::Invalid(version, EthMessageID::GetBlockAccessLists))
// Beyond the max ID for this version — treat as raw capability message
// (e.g. a snap protocol message in the multiplexed ID space).
let raw_payload = Bytes::copy_from_slice(buf);
buf.advance(raw_payload.len());
EthMessage::Other(RawCapabilityMessage::new(
message_type.to_u8() as usize,
raw_payload.into(),
))
} else {
EthMessage::GetBlockAccessLists(RequestPair::decode(buf)?)
}
EthMessage::GetBlockAccessLists(RequestPair::decode(buf)?)
}
EthMessageID::BlockAccessLists => {
if version < EthVersion::Eth71 {
return Err(MessageError::Invalid(version, EthMessageID::BlockAccessLists))
let raw_payload = Bytes::copy_from_slice(buf);
buf.advance(raw_payload.len());
EthMessage::Other(RawCapabilityMessage::new(
message_type.to_u8() as usize,
raw_payload.into(),
))
} else {
EthMessage::BlockAccessLists(RequestPair::decode(buf)?)
}
EthMessage::BlockAccessLists(RequestPair::decode(buf)?)
}
EthMessageID::Other(_) => {
let raw_payload = Bytes::copy_from_slice(buf);
@@ -829,6 +843,9 @@ mod tests {
#[test]
fn test_bal_message_version_gating() {
// On versions < Eth71, GetBlockAccessLists and BlockAccessLists IDs are treated as
// raw capability messages (Other) since they fall beyond the eth range and may
// belong to another sub-protocol (e.g. snap).
let get_block_access_lists =
EthMessage::<EthNetworkPrimitives>::GetBlockAccessLists(RequestPair {
request_id: 1337,
@@ -842,10 +859,7 @@ mod tests {
EthVersion::Eth70,
&mut &buf[..],
);
assert!(matches!(
msg,
Err(MessageError::Invalid(EthVersion::Eth70, EthMessageID::GetBlockAccessLists))
));
assert!(matches!(msg, Ok(ProtocolMessage { message: EthMessage::Other(_), .. })));
let block_access_lists =
EthMessage::<EthNetworkPrimitives>::BlockAccessLists(RequestPair {
@@ -860,10 +874,7 @@ mod tests {
EthVersion::Eth70,
&mut &buf[..],
);
assert!(matches!(
msg,
Err(MessageError::Invalid(EthVersion::Eth70, EthMessageID::BlockAccessLists))
));
assert!(matches!(msg, Ok(ProtocolMessage { message: EthMessage::Other(_), .. })));
}
#[test]

View File

@@ -205,7 +205,10 @@ impl HelloMessageBuilder {
protocol_version: protocol_version.unwrap_or_default(),
client_version: client_version.unwrap_or_else(|| RETH_CLIENT_VERSION.to_string()),
protocols: protocols.unwrap_or_else(|| {
EthVersion::ALL_VERSIONS.iter().copied().map(Into::into).collect()
let mut protos: Vec<Protocol> =
EthVersion::ALL_VERSIONS.iter().copied().map(Into::into).collect();
protos.push(Protocol::snap_1());
protos
}),
port: port.unwrap_or(DEFAULT_TCP_PORT),
id,

View File

@@ -610,12 +610,20 @@ where
let _ = this.primary.to_primary.send(msg);
} else {
// delegate to installed satellite if any
let mut handled = false;
for proto in &this.inner.protocols {
if proto.shared_cap == *cap {
proto.send_raw(msg);
proto.send_raw(msg.clone());
handled = true;
break
}
}
if !handled {
// No satellite handler for this capability (e.g. snap/1
// handled inline). Route to primary so the session can
// handle it via RawCapabilityMessage.
let _ = this.primary.to_primary.send(msg);
}
}
} else {
return Poll::Ready(Some(Err(P2PStreamError::UnknownReservedMessageId(

View File

@@ -45,6 +45,13 @@ impl Protocol {
Self::eth(EthVersion::Eth68)
}
/// Returns the `snap/1` capability.
///
/// The snap protocol defines 8 message types (0x00..0x07).
pub const fn snap_1() -> Self {
Self::new(Capability::new_static("snap", 1), 8)
}
/// Consumes the type and returns a tuple of the [Capability] and number of messages.
#[inline]
pub(crate) fn split(self) -> (Capability, u8) {

View File

@@ -1,14 +1,20 @@
//! API related to listening for network events.
use reth_eth_wire_types::{
message::RequestPair, BlockAccessLists, BlockBodies, BlockHeaders, Capabilities,
DisconnectReason, EthMessage, EthNetworkPrimitives, EthVersion, GetBlockAccessLists,
GetBlockBodies, GetBlockHeaders, GetNodeData, GetPooledTransactions, GetReceipts,
GetReceipts70, NetworkPrimitives, NodeData, PooledTransactions, Receipts, Receipts69,
Receipts70, UnifiedStatus,
message::RequestPair,
snap::{
GetAccountRangeMessage, GetByteCodesMessage, GetStorageRangesMessage, GetTrieNodesMessage,
},
BlockAccessLists, BlockBodies, BlockHeaders, Capabilities, DisconnectReason, EthMessage,
EthNetworkPrimitives, EthVersion, GetBlockAccessLists, GetBlockBodies, GetBlockHeaders,
GetNodeData, GetPooledTransactions, GetReceipts, GetReceipts70, NetworkPrimitives, NodeData,
PooledTransactions, Receipts, Receipts69, Receipts70, UnifiedStatus,
};
use reth_ethereum_forks::ForkId;
use reth_network_p2p::error::{RequestError, RequestResult};
use reth_network_p2p::{
error::{RequestError, RequestResult},
snap::client::SnapResponse,
};
use reth_network_peers::{NodeRecord, PeerId};
use reth_network_types::{PeerAddr, PeerKind};
use reth_tokio_util::EventStream;
@@ -262,6 +268,42 @@ pub enum PeerRequest<N: NetworkPrimitives = EthNetworkPrimitives> {
/// The channel to send the response for block access lists.
response: oneshot::Sender<RequestResult<BlockAccessLists>>,
},
/// Requests an account range from the peer (snap protocol).
///
/// The response should be sent through the channel.
GetAccountRange {
/// The request for an account range.
request: GetAccountRangeMessage,
/// The channel to send the response for the account range.
response: oneshot::Sender<RequestResult<SnapResponse>>,
},
/// Requests storage ranges from the peer (snap protocol).
///
/// The response should be sent through the channel.
GetStorageRanges {
/// The request for storage ranges.
request: GetStorageRangesMessage,
/// The channel to send the response for storage ranges.
response: oneshot::Sender<RequestResult<SnapResponse>>,
},
/// Requests bytecodes from the peer (snap protocol).
///
/// The response should be sent through the channel.
GetByteCodes {
/// The request for bytecodes.
request: GetByteCodesMessage,
/// The channel to send the response for bytecodes.
response: oneshot::Sender<RequestResult<SnapResponse>>,
},
/// Requests trie nodes from the peer (snap protocol).
///
/// The response should be sent through the channel.
GetTrieNodes {
/// The request for trie nodes.
request: GetTrieNodesMessage,
/// The channel to send the response for trie nodes.
response: oneshot::Sender<RequestResult<SnapResponse>>,
},
}
// === impl PeerRequest ===
@@ -283,6 +325,10 @@ impl<N: NetworkPrimitives> PeerRequest<N> {
Self::GetReceipts69 { response, .. } => response.send(Err(err)).ok(),
Self::GetReceipts70 { response, .. } => response.send(Err(err)).ok(),
Self::GetBlockAccessLists { response, .. } => response.send(Err(err)).ok(),
Self::GetAccountRange { response, .. } |
Self::GetStorageRanges { response, .. } |
Self::GetByteCodes { response, .. } |
Self::GetTrieNodes { response, .. } => response.send(Err(err)).ok(),
};
}
@@ -295,7 +341,24 @@ impl<N: NetworkPrimitives> PeerRequest<N> {
}
}
/// Returns the [`EthMessage`] for this type
/// Returns `true` if this is a snap protocol request.
pub const fn is_snap_request(&self) -> bool {
matches!(
self,
Self::GetAccountRange { .. } |
Self::GetStorageRanges { .. } |
Self::GetByteCodes { .. } |
Self::GetTrieNodes { .. }
)
}
/// Returns the [`EthMessage`] for this type.
///
/// # Panics
///
/// Panics if called on a snap protocol request variant. Use [`Self::is_snap_request`] to
/// check before calling this method. Snap requests are handled separately in the session
/// layer.
pub fn create_request_message(&self, request_id: u64) -> EthMessage<N> {
match self {
Self::GetBlockHeaders { request, .. } => {
@@ -325,6 +388,12 @@ impl<N: NetworkPrimitives> PeerRequest<N> {
message: request.clone(),
})
}
Self::GetAccountRange { .. } |
Self::GetStorageRanges { .. } |
Self::GetByteCodes { .. } |
Self::GetTrieNodes { .. } => {
panic!("snap protocol requests cannot be converted to EthMessage, handle them separately via is_snap_request()")
}
}
}

View File

@@ -38,7 +38,7 @@ use reth_eth_wire_types::{
capability::Capabilities, Capability, DisconnectReason, EthVersion, NetworkPrimitives,
UnifiedStatus,
};
use reth_network_p2p::sync::NetworkSyncUpdater;
use reth_network_p2p::{snap::client::SnapClient, sync::NetworkSyncUpdater};
use reth_network_peers::NodeRecord;
use std::{future::Future, net::SocketAddr, sync::Arc, time::Instant};
@@ -48,7 +48,7 @@ pub type PeerId = alloy_primitives::B512;
/// Helper trait that unifies network API needed to launch node.
pub trait FullNetwork:
BlockDownloaderProvider<
Client: BlockClient<Block = <Self::Primitives as NetworkPrimitives>::Block>,
Client: BlockClient<Block = <Self::Primitives as NetworkPrimitives>::Block> + SnapClient,
> + NetworkSyncUpdater
+ NetworkInfo
+ NetworkEventListenerProvider
@@ -62,7 +62,8 @@ pub trait FullNetwork:
impl<T> FullNetwork for T where
T: BlockDownloaderProvider<
Client: BlockClient<Block = <Self::Primitives as NetworkPrimitives>::Block>,
Client: BlockClient<Block = <Self::Primitives as NetworkPrimitives>::Block>
+ SnapClient,
> + NetworkSyncUpdater
+ NetworkInfo
+ NetworkEventListenerProvider

View File

@@ -4,6 +4,9 @@ use crate::{fetch::DownloadRequest, flattened_response::FlattenedResponse};
use alloy_primitives::B256;
use futures::{future, future::Either};
use reth_eth_wire::{EthNetworkPrimitives, NetworkPrimitives};
use reth_eth_wire_types::snap::{
GetAccountRangeMessage, GetByteCodesMessage, GetStorageRangesMessage, GetTrieNodesMessage,
};
use reth_network_api::test_utils::PeersHandle;
use reth_network_p2p::{
bodies::client::{BodiesClient, BodiesFut},
@@ -11,6 +14,7 @@ use reth_network_p2p::{
error::{PeerRequestResult, RequestError},
headers::client::{HeadersClient, HeadersRequest},
priority::Priority,
snap::client::{SnapClient, SnapResponse},
BlockClient,
};
use reth_network_peers::PeerId;
@@ -105,3 +109,92 @@ impl<N: NetworkPrimitives> BodiesClient for FetchClient<N> {
impl<N: NetworkPrimitives> BlockClient for FetchClient<N> {
type Block = N::Block;
}
type SnapClientFuture = Either<
FlattenedResponse<PeerRequestResult<SnapResponse>>,
future::Ready<PeerRequestResult<SnapResponse>>,
>;
impl<N: NetworkPrimitives> SnapClient for FetchClient<N> {
type Output = SnapClientFuture;
fn get_account_range_with_priority(
&self,
request: GetAccountRangeMessage,
priority: Priority,
) -> Self::Output {
let (response, rx) = oneshot::channel();
if self
.request_tx
.send(DownloadRequest::GetAccountRange { request, response, priority })
.is_ok()
{
Either::Left(FlattenedResponse::from(rx))
} else {
Either::Right(future::err(RequestError::ChannelClosed))
}
}
fn get_storage_ranges(&self, request: GetStorageRangesMessage) -> Self::Output {
self.get_storage_ranges_with_priority(request, Priority::Normal)
}
fn get_storage_ranges_with_priority(
&self,
request: GetStorageRangesMessage,
priority: Priority,
) -> Self::Output {
let (response, rx) = oneshot::channel();
if self
.request_tx
.send(DownloadRequest::GetStorageRanges { request, response, priority })
.is_ok()
{
Either::Left(FlattenedResponse::from(rx))
} else {
Either::Right(future::err(RequestError::ChannelClosed))
}
}
fn get_byte_codes(&self, request: GetByteCodesMessage) -> Self::Output {
self.get_byte_codes_with_priority(request, Priority::Normal)
}
fn get_byte_codes_with_priority(
&self,
request: GetByteCodesMessage,
priority: Priority,
) -> Self::Output {
let (response, rx) = oneshot::channel();
if self
.request_tx
.send(DownloadRequest::GetByteCodes { request, response, priority })
.is_ok()
{
Either::Left(FlattenedResponse::from(rx))
} else {
Either::Right(future::err(RequestError::ChannelClosed))
}
}
fn get_trie_nodes(&self, request: GetTrieNodesMessage) -> Self::Output {
self.get_trie_nodes_with_priority(request, Priority::Normal)
}
fn get_trie_nodes_with_priority(
&self,
request: GetTrieNodesMessage,
priority: Priority,
) -> Self::Output {
let (response, rx) = oneshot::channel();
if self
.request_tx
.send(DownloadRequest::GetTrieNodes { request, response, priority })
.is_ok()
{
Either::Left(FlattenedResponse::from(rx))
} else {
Either::Right(future::err(RequestError::ChannelClosed))
}
}
}

View File

@@ -10,17 +10,22 @@ use futures::StreamExt;
use reth_eth_wire::{
Capabilities, EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, NetworkPrimitives,
};
use reth_network_api::test_utils::PeersHandle;
use reth_eth_wire_types::snap::{
GetAccountRangeMessage, GetByteCodesMessage, GetStorageRangesMessage, GetTrieNodesMessage,
};
use reth_network_api::{test_utils::PeersHandle, PeerRequest};
use reth_network_p2p::{
error::{EthResponseValidator, PeerRequestResult, RequestError, RequestResult},
headers::client::HeadersRequest,
priority::Priority,
snap::client::SnapResponse,
};
use reth_network_peers::PeerId;
use reth_network_types::ReputationChangeKind;
use std::{
collections::{HashMap, VecDeque},
ops::RangeInclusive,
pin::Pin,
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc,
@@ -33,6 +38,17 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
type InflightHeadersRequest<H> = Request<HeadersRequest, PeerRequestResult<Vec<H>>>;
type InflightBodiesRequest<B> = Request<(), PeerRequestResult<Vec<B>>>;
/// Tracks an inflight snap request, bridging the session's response back to the download caller.
#[derive(Debug)]
struct InflightSnapRequest {
/// The peer that's handling this request
peer_id: PeerId,
/// The channel to send the final response (with peer id) to the download caller
response: oneshot::Sender<PeerRequestResult<SnapResponse>>,
/// The receiver for the session's response
rx: oneshot::Receiver<RequestResult<SnapResponse>>,
}
/// Manages data fetching operations.
///
/// This type is hooked into the staged sync pipeline and delegates download request to available
@@ -45,6 +61,8 @@ pub struct StateFetcher<N: NetworkPrimitives = EthNetworkPrimitives> {
inflight_headers_requests: HashMap<PeerId, InflightHeadersRequest<N::BlockHeader>>,
/// Currently active [`GetBlockBodies`] requests
inflight_bodies_requests: HashMap<PeerId, InflightBodiesRequest<N::BlockBody>>,
/// Currently active snap protocol requests
inflight_snap_requests: Vec<InflightSnapRequest>,
/// The list of _available_ peers for requests.
peers: HashMap<PeerId, Peer>,
/// The handle to the peers manager
@@ -67,6 +85,7 @@ impl<N: NetworkPrimitives> StateFetcher<N> {
Self {
inflight_headers_requests: Default::default(),
inflight_bodies_requests: Default::default(),
inflight_snap_requests: Vec::new(),
peers: Default::default(),
peers_handle,
num_active_peers,
@@ -114,6 +133,16 @@ impl<N: NetworkPrimitives> StateFetcher<N> {
if let Some(req) = self.inflight_bodies_requests.remove(peer) {
let _ = req.response.send(Err(RequestError::ConnectionDropped));
}
// Cancel inflight snap requests for this peer
let mut i = 0;
while i < self.inflight_snap_requests.len() {
if &self.inflight_snap_requests[i].peer_id == peer {
let req = self.inflight_snap_requests.swap_remove(i);
let _ = req.response.send(Err(RequestError::ConnectionDropped));
} else {
i += 1;
}
}
}
/// Updates the block information for the peer.
@@ -171,7 +200,7 @@ impl<N: NetworkPrimitives> StateFetcher<N> {
}
/// Returns the next action to return
fn poll_action(&mut self) -> PollAction {
fn poll_action(&mut self) -> PollAction<N> {
// we only check and not pop here since we don't know yet whether a peer is available.
if self.queued_requests.is_empty() {
return PollAction::NoRequests
@@ -184,13 +213,39 @@ impl<N: NetworkPrimitives> StateFetcher<N> {
return PollAction::NoPeersAvailable
};
// Snap requests bypass the block request path and are dispatched directly
if request.is_snap_request() {
let snap_request = self.prepare_snap_request(peer_id, request);
return PollAction::Ready(FetchAction::SnapRequest { peer_id, request: snap_request })
}
let request = self.prepare_block_request(peer_id, request);
PollAction::Ready(FetchAction::BlockRequest { peer_id, request })
}
/// Advance the state the syncer
pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<FetchAction> {
pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<FetchAction<N>> {
// Poll inflight snap requests and forward responses
let mut i = 0;
while i < self.inflight_snap_requests.len() {
match Pin::new(&mut self.inflight_snap_requests[i].rx).poll(cx) {
Poll::Ready(result) => {
let req = self.inflight_snap_requests.swap_remove(i);
let resp = match result {
Ok(Ok(snap_resp)) => Ok((req.peer_id, snap_resp).into()),
Ok(Err(err)) => Err(err),
Err(_) => Err(RequestError::ChannelClosed),
};
let _ = req.response.send(resp);
self.on_snap_response(req.peer_id);
}
Poll::Pending => {
i += 1;
}
}
}
// drain buffered actions first
loop {
let no_peers_available = match self.poll_action() {
@@ -256,6 +311,58 @@ impl<N: NetworkPrimitives> StateFetcher<N> {
self.inflight_bodies_requests.insert(peer_id, inflight);
BlockRequest::GetBlockBodies(GetBlockBodies(request))
}
DownloadRequest::GetAccountRange { .. } |
DownloadRequest::GetStorageRanges { .. } |
DownloadRequest::GetByteCodes { .. } |
DownloadRequest::GetTrieNodes { .. } => {
unreachable!("snap requests are handled via prepare_snap_request")
}
}
}
/// Handles a new snap request to a peer.
///
/// Converts the download request into a [`PeerRequest`] for dispatch to the peer session.
/// The `DownloadRequest`'s response channel is stored as an inflight snap request so the
/// response can be forwarded back (with the peer id) once the session replies.
///
/// Caution: this assumes the peer exists and is idle
fn prepare_snap_request(&mut self, peer_id: PeerId, req: DownloadRequest<N>) -> PeerRequest<N> {
if let Some(peer) = self.peers.get_mut(&peer_id) {
peer.state = req.peer_state();
}
match req {
DownloadRequest::GetAccountRange { request, response, .. } => {
let (tx, rx) = oneshot::channel();
self.inflight_snap_requests.push(InflightSnapRequest { peer_id, response, rx });
PeerRequest::GetAccountRange { request, response: tx }
}
DownloadRequest::GetStorageRanges { request, response, .. } => {
let (tx, rx) = oneshot::channel();
self.inflight_snap_requests.push(InflightSnapRequest { peer_id, response, rx });
PeerRequest::GetStorageRanges { request, response: tx }
}
DownloadRequest::GetByteCodes { request, response, .. } => {
let (tx, rx) = oneshot::channel();
self.inflight_snap_requests.push(InflightSnapRequest { peer_id, response, rx });
PeerRequest::GetByteCodes { request, response: tx }
}
DownloadRequest::GetTrieNodes { request, response, .. } => {
let (tx, rx) = oneshot::channel();
self.inflight_snap_requests.push(InflightSnapRequest { peer_id, response, rx });
PeerRequest::GetTrieNodes { request, response: tx }
}
_ => unreachable!("only snap requests should be passed to prepare_snap_request"),
}
}
/// Called when a snap response is received.
///
/// Marks the peer as idle so it can accept new requests.
fn on_snap_response(&mut self, peer_id: PeerId) {
if let Some(peer) = self.peers.get_mut(&peer_id) {
peer.state.on_request_finished();
}
}
@@ -341,8 +448,8 @@ impl<N: NetworkPrimitives> StateFetcher<N> {
}
/// The outcome of [`StateFetcher::poll_action`]
enum PollAction {
Ready(FetchAction),
enum PollAction<N: NetworkPrimitives = EthNetworkPrimitives> {
Ready(FetchAction<N>),
NoRequests,
NoPeersAvailable,
}
@@ -453,6 +560,8 @@ enum PeerState {
GetBlockHeaders,
/// Peer is handling a `GetBlockBodies` request.
GetBlockBodies,
/// Peer is handling a snap protocol request.
SnapRequest,
/// Peer session is about to close
Closing,
}
@@ -491,6 +600,7 @@ struct Request<Req, Resp> {
/// Requests that can be sent to the Syncer from a [`FetchClient`]
#[derive(Debug)]
#[allow(clippy::enum_variant_names)]
pub(crate) enum DownloadRequest<N: NetworkPrimitives> {
/// Download the requested headers and send response through channel
GetBlockHeaders {
@@ -505,6 +615,30 @@ pub(crate) enum DownloadRequest<N: NetworkPrimitives> {
priority: Priority,
range_hint: Option<RangeInclusive<u64>>,
},
/// Request an account range via snap protocol
GetAccountRange {
request: GetAccountRangeMessage,
response: oneshot::Sender<PeerRequestResult<SnapResponse>>,
priority: Priority,
},
/// Request storage ranges via snap protocol
GetStorageRanges {
request: GetStorageRangesMessage,
response: oneshot::Sender<PeerRequestResult<SnapResponse>>,
priority: Priority,
},
/// Request bytecodes via snap protocol
GetByteCodes {
request: GetByteCodesMessage,
response: oneshot::Sender<PeerRequestResult<SnapResponse>>,
priority: Priority,
},
/// Request trie nodes via snap protocol
GetTrieNodes {
request: GetTrieNodesMessage,
response: oneshot::Sender<PeerRequestResult<SnapResponse>>,
priority: Priority,
},
}
// === impl DownloadRequest ===
@@ -515,15 +649,22 @@ impl<N: NetworkPrimitives> DownloadRequest<N> {
match self {
Self::GetBlockHeaders { .. } => PeerState::GetBlockHeaders,
Self::GetBlockBodies { .. } => PeerState::GetBlockBodies,
Self::GetAccountRange { .. } |
Self::GetStorageRanges { .. } |
Self::GetByteCodes { .. } |
Self::GetTrieNodes { .. } => PeerState::SnapRequest,
}
}
/// Returns the requested priority of this request
const fn get_priority(&self) -> &Priority {
match self {
Self::GetBlockHeaders { priority, .. } | Self::GetBlockBodies { priority, .. } => {
priority
}
Self::GetBlockHeaders { priority, .. } |
Self::GetBlockBodies { priority, .. } |
Self::GetAccountRange { priority, .. } |
Self::GetStorageRanges { priority, .. } |
Self::GetByteCodes { priority, .. } |
Self::GetTrieNodes { priority, .. } => priority,
}
}
@@ -532,10 +673,25 @@ impl<N: NetworkPrimitives> DownloadRequest<N> {
self.get_priority().is_normal()
}
/// Returns `true` if this is a snap protocol request.
const fn is_snap_request(&self) -> bool {
matches!(
self,
Self::GetAccountRange { .. } |
Self::GetStorageRanges { .. } |
Self::GetByteCodes { .. } |
Self::GetTrieNodes { .. }
)
}
/// Returns the best peer requirements for this request.
fn best_peer_requirements(&self) -> BestPeerRequirements {
match self {
Self::GetBlockHeaders { .. } => BestPeerRequirements::None,
Self::GetBlockHeaders { .. } |
Self::GetAccountRange { .. } |
Self::GetStorageRanges { .. } |
Self::GetByteCodes { .. } |
Self::GetTrieNodes { .. } => BestPeerRequirements::None,
Self::GetBlockBodies { range_hint, .. } => {
if let Some(range) = range_hint {
BestPeerRequirements::FullBlockRange(range.clone())
@@ -548,7 +704,7 @@ impl<N: NetworkPrimitives> DownloadRequest<N> {
}
/// An action the syncer can emit.
pub(crate) enum FetchAction {
pub(crate) enum FetchAction<N: NetworkPrimitives = EthNetworkPrimitives> {
/// Dispatch an eth request to the given peer.
BlockRequest {
/// The targeted recipient for the request
@@ -556,6 +712,13 @@ pub(crate) enum FetchAction {
/// The request to send
request: BlockRequest,
},
/// Dispatch a snap protocol request to the given peer.
SnapRequest {
/// The targeted recipient for the request
peer_id: PeerId,
/// The snap request to send
request: PeerRequest<N>,
},
}
/// Outcome of a processed response.

View File

@@ -565,6 +565,13 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
response,
});
}
PeerRequest::GetAccountRange { .. } |
PeerRequest::GetStorageRanges { .. } |
PeerRequest::GetByteCodes { .. } |
PeerRequest::GetTrieNodes { .. } => {
// Snap protocol requests from peers are not handled here.
// They are handled in the session layer directly.
}
}
}

View File

@@ -316,10 +316,103 @@ impl<N: NetworkPrimitives> ActiveSession<N> {
OnIncomingMessageOutcome::Ok
}
EthMessage::Other(bytes) => self.try_emit_broadcast(PeerMessage::Other(bytes)).into(),
EthMessage::Other(raw) => {
if let Some(outcome) = self.try_handle_snap_response(&raw) {
outcome
} else {
self.try_emit_broadcast(PeerMessage::Other(raw)).into()
}
}
}
}
/// Attempts to decode a raw capability message as a snap protocol response and resolve the
/// matching inflight request.
///
/// Returns `Some` if the message ID falls in the snap range (even if decoding or matching
/// fails), `None` if the message is not a snap message.
fn try_handle_snap_response(
&mut self,
raw: &RawCapabilityMessage,
) -> Option<OnIncomingMessageOutcome<N>> {
use reth_eth_wire_types::{snap::SnapProtocolMessage, EthMessageID};
use reth_network_p2p::snap::client::SnapResponse;
let eth_offset = EthMessageID::message_count(self.conn.version()) as usize;
let snap_count = 8; // snap/1 has 8 message types (0x00..0x07)
// Check if the raw message ID falls in the snap range
if raw.id < eth_offset || raw.id >= eth_offset + snap_count {
return None;
}
let snap_id = (raw.id - eth_offset) as u8;
// Only handle response messages (odd IDs: AccountRange=1, StorageRanges=3, ByteCodes=5,
// TrieNodes=7)
if snap_id.is_multiple_of(2) {
// This is a snap *request* from the remote peer, not a response.
// For now, we don't handle incoming snap requests — let it pass through.
return None;
}
let mut buf = raw.payload.as_ref();
let snap_msg = match SnapProtocolMessage::decode(snap_id, &mut buf) {
Ok(msg) => msg,
Err(err) => {
debug!(target: "net::session", %err, ?snap_id, remote_peer_id=?self.remote_peer_id, "failed to decode snap response");
self.on_bad_message();
return Some(OnIncomingMessageOutcome::Ok);
}
};
// Extract request_id and build the SnapResponse
let (request_id, expected_variant, snap_response) = match snap_msg {
SnapProtocolMessage::AccountRange(msg) => {
(msg.request_id, "GetAccountRange", SnapResponse::AccountRange(msg))
}
SnapProtocolMessage::StorageRanges(msg) => {
(msg.request_id, "GetStorageRanges", SnapResponse::StorageRanges(msg))
}
SnapProtocolMessage::ByteCodes(msg) => {
(msg.request_id, "GetByteCodes", SnapResponse::ByteCodes(msg))
}
SnapProtocolMessage::TrieNodes(msg) => {
(msg.request_id, "GetTrieNodes", SnapResponse::TrieNodes(msg))
}
_ => {
// Not a response message — shouldn't happen given the odd-ID check above
return None;
}
};
if let Some(req) = self.inflight_requests.remove(&request_id) {
match req.request {
RequestState::Waiting(
PeerRequest::GetAccountRange { response, .. } |
PeerRequest::GetStorageRanges { response, .. } |
PeerRequest::GetByteCodes { response, .. } |
PeerRequest::GetTrieNodes { response, .. },
) => {
trace!(peer_id=?self.remote_peer_id, ?request_id, %expected_variant, "received snap response from peer");
let _ = response.send(Ok(snap_response));
self.update_request_timeout(req.timestamp, Instant::now());
}
RequestState::Waiting(request) => {
request.send_bad_response();
}
RequestState::TimedOut => {
self.update_request_timeout(req.timestamp, Instant::now());
}
}
} else {
trace!(peer_id=?self.remote_peer_id, ?request_id, "received snap response to unknown request");
self.on_bad_message();
}
Some(OnIncomingMessageOutcome::Ok)
}
/// Handle an internal peer request that will be sent to the remote.
fn on_internal_peer_request(&mut self, request: PeerRequest<N>, deadline: Instant) {
let version = self.conn.version();
@@ -337,15 +430,67 @@ impl<N: NetworkPrimitives> ActiveSession<N> {
let request_id = self.next_id();
trace!(?request, peer_id=?self.remote_peer_id, ?request_id, "sending request to peer");
let msg = request.create_request_message(request_id).map_versioned(version);
self.queued_outgoing.push_back(msg.into());
let req = InflightRequest {
request: RequestState::Waiting(request),
timestamp: Instant::now(),
deadline,
if request.is_snap_request() {
// Snap requests are encoded as raw capability messages with adjusted message IDs.
// The snap message ID is offset by the eth message count for multiplexing.
if let Some(raw_msg) = self.encode_snap_request(&request, request_id) {
self.queued_outgoing.push_back(OutgoingMessage::Raw(raw_msg));
let req = InflightRequest {
request: RequestState::Waiting(request),
timestamp: Instant::now(),
deadline,
};
self.inflight_requests.insert(request_id, req);
} else {
request.send_err_response(RequestError::UnsupportedCapability);
}
} else {
let msg = request.create_request_message(request_id).map_versioned(version);
self.queued_outgoing.push_back(msg.into());
let req = InflightRequest {
request: RequestState::Waiting(request),
timestamp: Instant::now(),
deadline,
};
self.inflight_requests.insert(request_id, req);
}
}
/// Encodes a snap protocol request as a [`RawCapabilityMessage`].
fn encode_snap_request(
&self,
request: &PeerRequest<N>,
_request_id: u64,
) -> Option<RawCapabilityMessage> {
use reth_eth_wire_types::{snap::SnapProtocolMessage, EthMessageID};
let snap_msg = match request {
PeerRequest::GetAccountRange { request, .. } => {
SnapProtocolMessage::GetAccountRange(request.clone())
}
PeerRequest::GetStorageRanges { request, .. } => {
SnapProtocolMessage::GetStorageRanges(request.clone())
}
PeerRequest::GetByteCodes { request, .. } => {
SnapProtocolMessage::GetByteCodes(request.clone())
}
PeerRequest::GetTrieNodes { request, .. } => {
SnapProtocolMessage::GetTrieNodes(request.clone())
}
_ => return None,
};
self.inflight_requests.insert(request_id, req);
let encoded = snap_msg.encode();
// The first byte is the snap message ID, which needs to be offset
// by the eth protocol message count for proper multiplexing.
let snap_id = encoded[0];
let adjusted_id = snap_id + EthMessageID::message_count(self.conn.version());
let mut payload = Vec::with_capacity(encoded.len() - 1);
payload.extend_from_slice(&encoded[1..]);
Some(RawCapabilityMessage::new(adjusted_id as usize, payload.into()))
}
#[inline]

View File

@@ -403,6 +403,16 @@ impl<N: NetworkPrimitives> NetworkState<N> {
}
}
/// Sends a snap request directly to the peer's session.
///
/// Unlike block requests, snap requests don't need response tracking here because
/// the [`StateFetcher`] bridges the response back to the caller internally.
fn handle_snap_request(&mut self, peer: PeerId, request: PeerRequest<N>) {
if let Some(ref mut peer) = self.active_peers.get_mut(&peer) {
let _ = peer.request_tx.to_session_tx.try_send(request);
}
}
/// Handle the outcome of processed response, for example directly queue another request.
fn on_block_response_outcome(&mut self, outcome: BlockResponseOutcome) {
match outcome {
@@ -453,6 +463,9 @@ impl<N: NetworkPrimitives> NetworkState<N> {
FetchAction::BlockRequest { peer_id, request } => {
self.handle_block_request(peer_id, request)
}
FetchAction::SnapRequest { peer_id, request } => {
self.handle_snap_request(peer_id, request)
}
}
}

View File

@@ -5,13 +5,20 @@ use crate::{
error::PeerRequestResult,
headers::client::{HeadersClient, SingleHeaderRequest},
priority::Priority,
snap::client::{SnapClient, SnapResponse},
BlockClient,
};
use alloy_consensus::BlockHeader;
use alloy_primitives::{Sealable, B256};
use core::marker::PhantomData;
use reth_consensus::Consensus;
use reth_eth_wire_types::{EthNetworkPrimitives, HeadersDirection, NetworkPrimitives};
use reth_eth_wire_types::{
snap::{
AccountRangeMessage, ByteCodesMessage, GetAccountRangeMessage, GetByteCodesMessage,
GetStorageRangesMessage, GetTrieNodesMessage, StorageRangesMessage, TrieNodesMessage,
},
EthNetworkPrimitives, HeadersDirection, NetworkPrimitives,
};
use reth_network_peers::{PeerId, WithPeerId};
use reth_primitives_traits::{SealedBlock, SealedHeader};
use std::{
@@ -740,6 +747,83 @@ where
type Block = Net::Block;
}
impl<Net> SnapClient for NoopFullBlockClient<Net>
where
Net: NetworkPrimitives,
{
type Output = futures::future::Ready<PeerRequestResult<SnapResponse>>;
fn get_account_range_with_priority(
&self,
request: GetAccountRangeMessage,
_priority: Priority,
) -> Self::Output {
futures::future::ready(Ok(WithPeerId::new(
PeerId::random(),
SnapResponse::AccountRange(AccountRangeMessage {
request_id: request.request_id,
accounts: vec![],
proof: vec![],
}),
)))
}
fn get_storage_ranges(&self, request: GetStorageRangesMessage) -> Self::Output {
self.get_storage_ranges_with_priority(request, Priority::Normal)
}
fn get_storage_ranges_with_priority(
&self,
request: GetStorageRangesMessage,
_priority: Priority,
) -> Self::Output {
futures::future::ready(Ok(WithPeerId::new(
PeerId::random(),
SnapResponse::StorageRanges(StorageRangesMessage {
request_id: request.request_id,
slots: vec![],
proof: vec![],
}),
)))
}
fn get_byte_codes(&self, request: GetByteCodesMessage) -> Self::Output {
self.get_byte_codes_with_priority(request, Priority::Normal)
}
fn get_byte_codes_with_priority(
&self,
request: GetByteCodesMessage,
_priority: Priority,
) -> Self::Output {
futures::future::ready(Ok(WithPeerId::new(
PeerId::random(),
SnapResponse::ByteCodes(ByteCodesMessage {
request_id: request.request_id,
codes: vec![],
}),
)))
}
fn get_trie_nodes(&self, request: GetTrieNodesMessage) -> Self::Output {
self.get_trie_nodes_with_priority(request, Priority::Normal)
}
fn get_trie_nodes_with_priority(
&self,
request: GetTrieNodesMessage,
_priority: Priority,
) -> Self::Output {
futures::future::ready(Ok(WithPeerId::new(
PeerId::random(),
SnapResponse::TrieNodes(TrieNodesMessage {
request_id: request.request_id,
nodes: vec![],
}),
)))
}
}
impl<Net> Default for NoopFullBlockClient<Net> {
fn default() -> Self {
Self(PhantomData::<Net>)

View File

@@ -0,0 +1,56 @@
[package]
name = "reth-snap-sync"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
description = "Snap sync protocol implementation for reth"
[lints]
workspace = true
[dependencies]
# reth
reth-eth-wire-types.workspace = true
reth-network-p2p.workspace = true
reth-network-peers.workspace = true
reth-primitives-traits.workspace = true
reth-storage-api.workspace = true
reth-storage-errors.workspace = true
reth-db-api.workspace = true
reth-provider.workspace = true
reth-tasks.workspace = true
reth-tracing.workspace = true
reth-metrics.workspace = true
reth-trie.workspace = true
reth-trie-db.workspace = true
reth-execution-errors.workspace = true
# misc (non-workspace)
rand = "0.9"
# ethereum
alloy-primitives.workspace = true
alloy-rlp.workspace = true
alloy-consensus.workspace = true
alloy-trie.workspace = true
# async
futures.workspace = true
tokio = { workspace = true, features = ["sync", "time", "macros"] }
tokio-stream.workspace = true
# misc
tracing.workspace = true
thiserror.workspace = true
metrics.workspace = true
derive_more.workspace = true
[dev-dependencies]
reth-provider = { workspace = true, features = ["test-utils"] }
tokio = { workspace = true, features = ["rt-multi-thread"] }
[features]
default = []

View File

@@ -0,0 +1,8 @@
//! Configuration for snap sync.
/// Configuration for snap sync.
#[derive(Debug, Clone, Default)]
pub struct SnapSyncConfig {
/// Whether snap sync is enabled.
pub enabled: bool,
}

View File

@@ -0,0 +1,647 @@
//! Snap sync downloader.
//!
//! Orchestrates the multi-phase snap sync process:
//! 1. Download account ranges via `GetAccountRange`
//! 2. Download storage slots via `GetStorageRanges`
//! 3. Download bytecodes via `GetByteCodes`
//! 4. Verify state root against pivot block
use crate::{
error::SnapSyncError,
metrics::SnapSyncMetrics,
progress::{SnapPhase, SnapProgress},
};
use alloy_consensus::constants::KECCAK_EMPTY;
use alloy_primitives::{keccak256, Bytes, B256, U256};
use alloy_rlp::Decodable;
use alloy_trie::TrieAccount;
use reth_db_api::{
cursor::DbCursorRO,
tables,
transaction::{DbTx, DbTxMut},
};
use reth_eth_wire_types::snap::{
GetAccountRangeMessage, GetByteCodesMessage, GetStorageRangesMessage,
};
use reth_network_p2p::snap::client::{SnapClient, SnapResponse};
use reth_primitives_traits::{Account, StorageEntry};
use reth_storage_api::{DBProvider, DatabaseProviderFactory};
use reth_trie::StateRoot;
use reth_trie_db::DatabaseStateRoot;
use std::collections::HashSet;
use tokio::sync::watch;
use tracing::{debug, info, trace, warn};
/// Maximum response size in bytes for snap requests (512 KB).
const MAX_RESPONSE_BYTES: u64 = 512 * 1024;
/// Number of accounts to accumulate before flushing to DB.
const ACCOUNT_WRITE_BATCH_SIZE: usize = 10_000;
/// Number of storage slots to accumulate before flushing to DB.
const STORAGE_WRITE_BATCH_SIZE: usize = 50_000;
/// Number of bytecodes to request in a single batch.
const BYTECODE_BATCH_SIZE: usize = 64;
/// Hash representing the maximum key (all 0xFF).
const HASH_MAX: B256 = B256::repeat_byte(0xFF);
/// The snap sync downloader that orchestrates state download from peers.
#[derive(Debug)]
pub struct SnapSyncDownloader<C, F> {
/// The snap-capable network client.
client: C,
/// Database provider factory for writing state.
provider_factory: F,
/// Current sync progress.
progress: SnapProgress,
/// Metrics.
metrics: SnapSyncMetrics,
/// Cancellation signal.
cancel_rx: watch::Receiver<bool>,
}
impl<C, F> SnapSyncDownloader<C, F>
where
C: SnapClient + Clone + Send + Sync + 'static,
F: DatabaseProviderFactory + Send + Sync + 'static,
<F as DatabaseProviderFactory>::ProviderRW: DBProvider<Tx: DbTxMut + DbTx> + Send,
{
/// Creates a new snap sync downloader.
pub fn new(
client: C,
provider_factory: F,
pivot_hash: B256,
pivot_number: u64,
state_root: B256,
cancel_rx: watch::Receiver<bool>,
) -> Self {
Self {
client,
provider_factory,
progress: SnapProgress::new(pivot_hash, pivot_number, state_root),
metrics: SnapSyncMetrics::default(),
cancel_rx,
}
}
/// Runs the snap sync to completion.
///
/// Returns `Ok(())` if state was successfully downloaded and verified,
/// or an error if sync failed.
pub async fn run(&mut self) -> Result<(), SnapSyncError> {
info!(
target: "snap_sync",
pivot_hash = %self.progress.pivot_hash,
pivot_number = self.progress.pivot_number,
state_root = %self.progress.state_root,
"starting snap sync"
);
// Phase 1: Download accounts
self.progress.phase = SnapPhase::Accounts;
self.metrics.phase.set(1.0);
let storage_accounts = self.download_accounts().await?;
info!(
target: "snap_sync",
accounts = self.progress.accounts_downloaded,
storage_accounts = storage_accounts.len(),
"account download complete"
);
// Phase 2: Download storage slots
self.progress.phase = SnapPhase::Storages;
self.metrics.phase.set(2.0);
let code_hashes = self.download_storages(&storage_accounts).await?;
info!(
target: "snap_sync",
slots = self.progress.storage_slots_downloaded,
"storage download complete"
);
// Phase 3: Download bytecodes
self.progress.phase = SnapPhase::Bytecodes;
self.metrics.phase.set(3.0);
self.download_bytecodes(code_hashes).await?;
info!(
target: "snap_sync",
bytecodes = self.progress.bytecodes_downloaded,
"bytecode download complete"
);
// Phase 4: Verification (hashing + merkle root)
self.progress.phase = SnapPhase::Verification;
self.metrics.phase.set(4.0);
info!(target: "snap_sync", "verifying state root against pivot block");
self.verify_state_root()?;
self.progress.phase = SnapPhase::Done;
self.metrics.phase.set(5.0);
Ok(())
}
/// Returns the current progress.
pub const fn progress(&self) -> &SnapProgress {
&self.progress
}
/// Checks if cancellation was requested.
fn is_cancelled(&self) -> bool {
*self.cancel_rx.borrow()
}
// ========================================================================
// Phase 4: Verification
// ========================================================================
/// Computes the state root from the downloaded state and verifies it against
/// the pivot block's expected state root.
fn verify_state_root(&self) -> Result<(), SnapSyncError> {
info!(target: "snap_sync", "computing state root from downloaded state");
let provider =
self.provider_factory.database_provider_rw().map_err(SnapSyncError::Provider)?;
let computed_root = StateRoot::from_tx(provider.tx_ref())
.root()
.map_err(|e| SnapSyncError::StateRootVerification(e.to_string()))?;
if computed_root != self.progress.state_root {
return Err(SnapSyncError::StateRootMismatch {
expected: self.progress.state_root,
got: computed_root,
});
}
info!(target: "snap_sync", %computed_root, "state root verified successfully");
Ok(())
}
// ========================================================================
// Phase 1: Account download
// ========================================================================
/// Downloads all accounts from the state trie via `GetAccountRange` requests.
///
/// Returns a list of `(address_hash, storage_root)` for accounts that have
/// non-empty storage (`storage_root` != `EMPTY_TRIE_HASH`).
async fn download_accounts(&mut self) -> Result<Vec<(B256, B256)>, SnapSyncError> {
let state_root = self.progress.state_root;
let mut cursor = self.progress.account_cursor;
let mut storage_accounts: Vec<(B256, B256)> = Vec::new();
// Batch buffer for DB writes
let mut account_batch: Vec<(B256, Account, B256)> = Vec::new();
loop {
if self.is_cancelled() {
return Err(SnapSyncError::Cancelled);
}
let response = retry_snap_request(|| {
let req = GetAccountRangeMessage {
request_id: rand_request_id(),
root_hash: state_root,
starting_hash: cursor,
limit_hash: HASH_MAX,
response_bytes: MAX_RESPONSE_BYTES,
};
self.client.get_account_range(req)
}, &self.metrics, &self.cancel_rx)
.await?;
let msg = match response.into_data() {
SnapResponse::AccountRange(msg) => msg,
_ => {
return Err(SnapSyncError::InvalidAccountRange(
"unexpected response type".into(),
));
}
};
if msg.accounts.is_empty() {
// Empty response means we've reached the end or peer doesn't serve this root
debug!(target: "snap_sync", %cursor, "received empty account range, finishing");
break;
}
// Process each account
for account_data in &msg.accounts {
let trie_account = decode_slim_account(&account_data.body)?;
let account = Account {
nonce: trie_account.nonce,
balance: trie_account.balance,
bytecode_hash: if trie_account.code_hash == KECCAK_EMPTY {
None
} else {
Some(trie_account.code_hash)
},
};
// Track accounts with storage
let empty_root = alloy_trie::EMPTY_ROOT_HASH;
if trie_account.storage_root != empty_root {
storage_accounts.push((account_data.hash, trie_account.storage_root));
}
account_batch.push((account_data.hash, account, trie_account.storage_root));
self.progress.accounts_downloaded += 1;
}
// Update cursor to continue after the last account
cursor = increment_hash(msg.accounts.last().unwrap().hash);
// Flush batch if large enough
if account_batch.len() >= ACCOUNT_WRITE_BATCH_SIZE {
self.write_accounts(&account_batch)?;
account_batch.clear();
}
self.metrics.accounts_downloaded.set(self.progress.accounts_downloaded as f64);
self.progress.account_cursor = cursor;
trace!(
target: "snap_sync",
accounts = self.progress.accounts_downloaded,
%cursor,
"account download progress"
);
// If cursor wrapped around to zero, we've covered the full range
if cursor == B256::ZERO {
break;
}
}
// Flush remaining
if !account_batch.is_empty() {
self.write_accounts(&account_batch)?;
}
Ok(storage_accounts)
}
/// Writes a batch of accounts to the database.
///
/// Account hashes are used as keys since snap protocol returns hashed addresses.
/// The address→hash mapping will be resolved during the hashing stage.
fn write_accounts(&self, batch: &[(B256, Account, B256)]) -> Result<(), SnapSyncError> {
// For snap sync, we write to HashedAccounts table since snap returns hashed keys.
// The pipeline's hashing stage normally computes this from PlainAccountState,
// but for snap we go directly to the hashed form.
let provider =
self.provider_factory.database_provider_rw().map_err(SnapSyncError::Provider)?;
let tx = provider.tx_ref();
for (hash, account, _storage_root) in batch {
tx.put::<tables::HashedAccounts>(*hash, *account)?;
}
provider.commit()?;
Ok(())
}
// ========================================================================
// Phase 2: Storage download
// ========================================================================
/// Downloads storage slots for all accounts with non-empty storage roots.
///
/// Returns the set of code hashes encountered during account processing.
async fn download_storages(
&mut self,
storage_accounts: &[(B256, B256)],
) -> Result<HashSet<B256>, SnapSyncError> {
let state_root = self.progress.state_root;
let code_hashes: HashSet<B256> = HashSet::new();
// Process storage accounts in chunks
let mut slot_batch: Vec<(B256, B256, U256)> = Vec::new();
for (account_hash, _storage_root) in storage_accounts {
if self.is_cancelled() {
return Err(SnapSyncError::Cancelled);
}
let mut slot_cursor = B256::ZERO;
loop {
let account_hash_val = *account_hash;
let response = retry_snap_request(|| {
let req = GetStorageRangesMessage {
request_id: rand_request_id(),
root_hash: state_root,
account_hashes: vec![account_hash_val],
starting_hash: slot_cursor,
limit_hash: HASH_MAX,
response_bytes: MAX_RESPONSE_BYTES,
};
self.client.get_storage_ranges(req)
}, &self.metrics, &self.cancel_rx)
.await?;
let msg = match response.into_data() {
SnapResponse::StorageRanges(msg) => msg,
_ => {
return Err(SnapSyncError::InvalidStorageRange(
"unexpected response type".into(),
));
}
};
if msg.slots.is_empty() || msg.slots[0].is_empty() {
break;
}
let slots = &msg.slots[0];
for slot in slots {
let value = U256::from_be_slice(&slot.data);
slot_batch.push((*account_hash, slot.hash, value));
self.progress.storage_slots_downloaded += 1;
}
// Update cursor
slot_cursor = increment_hash(slots.last().unwrap().hash);
// Flush if needed
if slot_batch.len() >= STORAGE_WRITE_BATCH_SIZE {
self.write_storage_slots(&slot_batch)?;
slot_batch.clear();
}
self.metrics
.storage_slots_downloaded
.set(self.progress.storage_slots_downloaded as f64);
// If no proof or we've reached the end of this account's storage
if msg.proof.is_empty() || slot_cursor == B256::ZERO {
break;
}
}
}
// Flush remaining
if !slot_batch.is_empty() {
self.write_storage_slots(&slot_batch)?;
}
Ok(code_hashes)
}
/// Writes a batch of storage slots to the database.
fn write_storage_slots(&self, batch: &[(B256, B256, U256)]) -> Result<(), SnapSyncError> {
let provider =
self.provider_factory.database_provider_rw().map_err(SnapSyncError::Provider)?;
let tx = provider.tx_ref();
for (account_hash, slot_hash, value) in batch {
tx.put::<tables::HashedStorages>(
*account_hash,
StorageEntry { key: *slot_hash, value: *value },
)?;
}
provider.commit()?;
Ok(())
}
// ========================================================================
// Phase 3: Bytecode download
// ========================================================================
/// Downloads contract bytecodes by their code hashes.
async fn download_bytecodes(
&mut self,
code_hashes: HashSet<B256>,
) -> Result<(), SnapSyncError> {
// Collect code hashes from accounts we've already written
let mut all_code_hashes: Vec<B256> = self.collect_code_hashes()?;
all_code_hashes.extend(code_hashes);
all_code_hashes.sort();
all_code_hashes.dedup();
// Remove KECCAK_EMPTY since that means no code
all_code_hashes.retain(|h| *h != KECCAK_EMPTY);
self.progress.bytecodes_total = all_code_hashes.len() as u64;
info!(
target: "snap_sync",
total = all_code_hashes.len(),
"starting bytecode download"
);
// Download in batches
for chunk in all_code_hashes.chunks(BYTECODE_BATCH_SIZE) {
if self.is_cancelled() {
return Err(SnapSyncError::Cancelled);
}
let hashes = chunk.to_vec();
let response = retry_snap_request(|| {
let req = GetByteCodesMessage {
request_id: rand_request_id(),
hashes: hashes.clone(),
response_bytes: MAX_RESPONSE_BYTES,
};
self.client.get_byte_codes(req)
}, &self.metrics, &self.cancel_rx)
.await?;
let msg = match response.into_data() {
SnapResponse::ByteCodes(msg) => msg,
_ => {
return Err(SnapSyncError::InvalidBytecode("unexpected response type".into()));
}
};
self.write_bytecodes(chunk, &msg.codes)?;
self.progress.bytecodes_downloaded += msg.codes.len() as u64;
self.metrics.bytecodes_downloaded.set(self.progress.bytecodes_downloaded as f64);
}
Ok(())
}
/// Collects code hashes from accounts already written to DB.
fn collect_code_hashes(&self) -> Result<Vec<B256>, SnapSyncError> {
let provider =
self.provider_factory.database_provider_rw().map_err(SnapSyncError::Provider)?;
let tx = provider.tx_ref();
let mut cursor = tx.cursor_read::<tables::HashedAccounts>()?;
let mut code_hashes = Vec::new();
let mut entry = cursor.first()?;
while let Some((_, account)) = entry {
if let Some(hash) = account.bytecode_hash &&
hash != KECCAK_EMPTY
{
code_hashes.push(hash);
}
entry = cursor.next()?;
}
Ok(code_hashes)
}
/// Writes bytecodes to the database, verifying hash integrity.
fn write_bytecodes(
&self,
expected_hashes: &[B256],
codes: &[Bytes],
) -> Result<(), SnapSyncError> {
let provider =
self.provider_factory.database_provider_rw().map_err(SnapSyncError::Provider)?;
let tx = provider.tx_ref();
for (i, code) in codes.iter().enumerate() {
let hash = keccak256(code);
if i < expected_hashes.len() && hash != expected_hashes[i] {
warn!(
target: "snap_sync",
expected = %expected_hashes[i],
got = %hash,
"bytecode hash mismatch, skipping"
);
continue;
}
let bytecode = reth_primitives_traits::Bytecode::new_raw(code.clone());
tx.put::<tables::Bytecodes>(hash, bytecode)?;
}
provider.commit()?;
Ok(())
}
}
/// Decodes a "slim" account body from the snap protocol into a `TrieAccount`.
///
/// Slim format is `RLP([nonce, balance, storage_root, code_hash])` but with
/// empty `storage_root` and `code_hash` omitted.
fn decode_slim_account(data: &Bytes) -> Result<TrieAccount, SnapSyncError> {
// The snap protocol uses "slim" encoding where empty values are omitted.
// We need to decode the RLP and fill in defaults for missing fields.
let account = TrieAccount::decode(&mut data.as_ref())?;
Ok(account)
}
/// Increments a hash by 1. Returns `B256::ZERO` on overflow (wraps around).
fn increment_hash(hash: B256) -> B256 {
let mut bytes = hash.0;
for i in (0..32).rev() {
if bytes[i] < 0xFF {
bytes[i] += 1;
return B256::from(bytes);
}
bytes[i] = 0;
}
B256::ZERO
}
/// Generates a random request ID.
fn rand_request_id() -> u64 {
rand::random()
}
/// Maximum number of retries for a snap request before giving up.
const MAX_SNAP_RETRIES: u32 = 10;
/// Retries a snap request with exponential backoff on failure.
async fn retry_snap_request<F, Fut, T>(
mut make_request: F,
metrics: &SnapSyncMetrics,
cancel_rx: &watch::Receiver<bool>,
) -> Result<T, SnapSyncError>
where
F: FnMut() -> Fut,
Fut: std::future::Future<Output = Result<T, reth_network_p2p::error::RequestError>>,
{
let mut attempts = 0u32;
loop {
if *cancel_rx.borrow() {
return Err(SnapSyncError::Cancelled);
}
match make_request().await {
Ok(resp) => return Ok(resp),
Err(err) => {
attempts += 1;
metrics.request_failures.increment(1);
if attempts >= MAX_SNAP_RETRIES {
return Err(SnapSyncError::Request(err));
}
let delay = std::time::Duration::from_secs(1 << attempts.min(5));
warn!(
target: "snap_sync",
%err,
attempts,
"snap request failed, retrying in {:?}",
delay
);
tokio::time::sleep(delay).await;
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use alloy_rlp::Encodable;
#[test]
fn test_increment_hash() {
let zero = B256::ZERO;
let one = increment_hash(zero);
assert_eq!(one.0[31], 1);
let max = B256::repeat_byte(0xFF);
let wrapped = increment_hash(max);
assert_eq!(wrapped, B256::ZERO);
let mut mid = B256::ZERO;
mid.0[31] = 0xFF;
let next = increment_hash(mid);
assert_eq!(next.0[30], 1);
assert_eq!(next.0[31], 0);
}
#[test]
fn test_decode_slim_account() {
let trie_account = TrieAccount {
nonce: 42,
balance: U256::from(1000),
storage_root: alloy_trie::EMPTY_ROOT_HASH,
code_hash: KECCAK_EMPTY,
};
let mut buf = Vec::new();
trie_account.encode(&mut buf);
let decoded = decode_slim_account(&Bytes::from(buf)).unwrap();
assert_eq!(decoded.nonce, 42);
assert_eq!(decoded.balance, U256::from(1000));
assert_eq!(decoded.storage_root, alloy_trie::EMPTY_ROOT_HASH);
assert_eq!(decoded.code_hash, KECCAK_EMPTY);
}
#[test]
fn test_decode_slim_account_empty() {
let trie_account = TrieAccount {
nonce: 0,
balance: U256::ZERO,
storage_root: alloy_trie::EMPTY_ROOT_HASH,
code_hash: KECCAK_EMPTY,
};
let mut buf = Vec::new();
trie_account.encode(&mut buf);
let decoded = decode_slim_account(&Bytes::from(buf)).unwrap();
assert_eq!(decoded.nonce, 0);
assert_eq!(decoded.balance, U256::ZERO);
assert_eq!(decoded.storage_root, alloy_trie::EMPTY_ROOT_HASH);
assert_eq!(decoded.code_hash, KECCAK_EMPTY);
}
}

View File

@@ -0,0 +1,52 @@
//! Snap sync error types.
use alloy_primitives::B256;
use reth_db_api::DatabaseError;
use reth_network_p2p::error::RequestError;
use reth_storage_errors::provider::ProviderError;
/// Errors that can occur during snap sync.
#[derive(Debug, thiserror::Error)]
pub enum SnapSyncError {
/// The computed state root does not match the pivot block's state root.
#[error("state root mismatch: expected {expected}, got {got}")]
StateRootMismatch {
/// Expected state root from pivot header.
expected: B256,
/// Computed state root after snap sync.
got: B256,
},
/// A peer returned an invalid or inconsistent account range.
#[error("invalid account range response: {0}")]
InvalidAccountRange(String),
/// A peer returned an invalid or inconsistent storage range.
#[error("invalid storage range response: {0}")]
InvalidStorageRange(String),
/// A peer returned invalid bytecodes.
#[error("invalid bytecode response: {0}")]
InvalidBytecode(String),
/// No peers available that support the snap protocol.
#[error("no snap-capable peers available")]
NoPeers,
/// Network request failed.
#[error("network request failed: {0}")]
Request(#[from] RequestError),
/// Database/provider error.
#[error("provider error: {0}")]
Provider(#[from] ProviderError),
/// Pivot block not found.
#[error("pivot block {0} not found")]
PivotNotFound(B256),
/// RLP decoding error.
#[error("rlp decode error: {0}")]
RlpDecode(#[from] alloy_rlp::Error),
/// Database error.
#[error("database error: {0}")]
Database(#[from] DatabaseError),
/// State root verification failed.
#[error("state root verification error: {0}")]
StateRootVerification(String),
/// Snap sync was cancelled.
#[error("snap sync cancelled")]
Cancelled,
}

View File

@@ -0,0 +1,26 @@
//! Snap sync protocol implementation for reth.
//!
//! Downloads state from peers via the [snap protocol](https://github.com/ethereum/devp2p/blob/master/caps/snap.md)
//! instead of executing all historical blocks. The sync proceeds in phases:
//!
//! 1. **Account download**: Fetch all account leaves via `GetAccountRange`
//! 2. **Storage download**: Fetch storage slots for accounts with non-empty storage roots
//! 3. **Bytecode download**: Fetch contract bytecodes by code hash
//! 4. **State root verification**: Build hashed state + merkle trie, verify against pivot block
//!
//! After snap sync completes, normal execution resumes from the pivot block onward.
pub mod config;
pub mod downloader;
pub mod error;
pub mod metrics;
pub mod progress;
pub mod server;
pub mod task;
pub use config::SnapSyncConfig;
pub use downloader::SnapSyncDownloader;
pub use error::SnapSyncError;
pub use progress::{SnapPhase, SnapProgress};
pub use server::{IncomingSnapRequest, SnapRequestHandler};
pub use task::run_snap_sync;

View File

@@ -0,0 +1,21 @@
//! Snap sync metrics.
use reth_metrics::{
metrics::{Counter, Gauge},
Metrics,
};
#[derive(Metrics)]
#[metrics(scope = "snap_sync")]
pub(crate) struct SnapSyncMetrics {
/// Number of accounts downloaded.
pub(crate) accounts_downloaded: Gauge,
/// Number of storage slots downloaded.
pub(crate) storage_slots_downloaded: Gauge,
/// Number of bytecodes downloaded.
pub(crate) bytecodes_downloaded: Gauge,
/// Current phase (0=idle, 1=accounts, 2=storages, 3=bytecodes, 4=verify, 5=done).
pub(crate) phase: Gauge,
/// Total peer request failures.
pub(crate) request_failures: Counter,
}

View File

@@ -0,0 +1,87 @@
//! Snap sync progress tracking.
//!
//! Tracks the current phase and cursor positions to support resumability.
use alloy_primitives::{Address, B256};
/// Current phase of snap sync.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SnapPhase {
/// Not started yet.
#[default]
Idle,
/// Downloading account ranges.
Accounts,
/// Downloading storage slots for accounts with non-empty storage roots.
Storages,
/// Downloading contract bytecodes.
Bytecodes,
/// Building hashed state and verifying merkle root.
Verification,
/// Snap sync completed successfully.
Done,
}
/// Tracks snap sync progress for resumability.
#[derive(Debug, Clone, Default)]
pub struct SnapProgress {
/// The pivot block hash.
pub pivot_hash: B256,
/// The pivot block number.
pub pivot_number: u64,
/// The pivot block's state root.
pub state_root: B256,
/// Current sync phase.
pub phase: SnapPhase,
/// Account download cursor: next account hash to fetch.
pub account_cursor: B256,
/// Number of accounts downloaded so far.
pub accounts_downloaded: u64,
/// Storage download cursor: current account address being fetched.
pub storage_account_cursor: Option<Address>,
/// Storage slot cursor within the current account.
pub storage_slot_cursor: B256,
/// Number of storage slots downloaded so far.
pub storage_slots_downloaded: u64,
/// Number of bytecodes downloaded so far.
pub bytecodes_downloaded: u64,
/// Total number of bytecodes to download.
pub bytecodes_total: u64,
}
impl SnapProgress {
/// Creates a new progress tracker for the given pivot.
pub fn new(pivot_hash: B256, pivot_number: u64, state_root: B256) -> Self {
Self {
pivot_hash,
pivot_number,
state_root,
phase: SnapPhase::Accounts,
..Default::default()
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_progress_new() {
let hash = B256::repeat_byte(0x01);
let state_root = B256::repeat_byte(0x02);
let progress = SnapProgress::new(hash, 100, state_root);
assert_eq!(progress.pivot_hash, hash);
assert_eq!(progress.pivot_number, 100);
assert_eq!(progress.state_root, state_root);
assert_eq!(progress.phase, SnapPhase::Accounts);
assert_eq!(progress.accounts_downloaded, 0);
assert_eq!(progress.storage_slots_downloaded, 0);
assert_eq!(progress.bytecodes_downloaded, 0);
}
#[test]
fn test_phase_default() {
assert_eq!(SnapPhase::default(), SnapPhase::Idle);
}
}

View File

@@ -0,0 +1,480 @@
//! Snap sync request handler (server-side).
//!
//! Handles incoming snap protocol requests from peers, serving account ranges,
//! storage ranges, bytecodes, and trie nodes from the local database.
//!
//! Modeled after [`EthRequestHandler`](reth_network::eth_requests::EthRequestHandler).
use alloy_consensus::constants::KECCAK_EMPTY;
use alloy_primitives::Bytes;
use alloy_rlp::Encodable;
use alloy_trie::EMPTY_ROOT_HASH;
use futures::StreamExt;
use reth_db_api::{
cursor::{DbCursorRO, DbDupCursorRO},
tables,
transaction::DbTx,
};
use reth_eth_wire_types::snap::*;
use reth_network_p2p::error::RequestResult;
use reth_network_peers::PeerId;
use reth_primitives_traits::Account;
use reth_storage_api::{DBProvider, DatabaseProviderFactory};
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tokio::sync::oneshot;
use tracing::{debug, trace};
/// Maximum number of accounts to serve per request.
const MAX_ACCOUNTS_SERVE: usize = 1024;
/// Maximum number of storage slots to serve per account per request.
const MAX_STORAGE_SERVE: usize = 1024;
/// Maximum number of bytecodes to serve per request.
const MAX_BYTECODES_SERVE: usize = 1024;
/// Maximum response size (2MB, matching eth limit).
const SOFT_RESPONSE_LIMIT: usize = 2 * 1024 * 1024;
/// Handles incoming snap protocol requests from peers.
///
/// This is spawned as a background service and polled to process requests.
#[derive(Debug)]
#[must_use = "Handler does nothing unless polled."]
pub struct SnapRequestHandler<F> {
/// Provider factory for DB access.
provider_factory: F,
/// Incoming snap requests.
incoming_requests: tokio_stream::wrappers::ReceiverStream<IncomingSnapRequest>,
}
impl<F> SnapRequestHandler<F> {
/// Create a new instance.
pub fn new(
provider_factory: F,
incoming: tokio::sync::mpsc::Receiver<IncomingSnapRequest>,
) -> Self {
Self {
provider_factory,
incoming_requests: tokio_stream::wrappers::ReceiverStream::new(incoming),
}
}
}
impl<F> SnapRequestHandler<F>
where
F: DatabaseProviderFactory,
{
/// Handle a `GetAccountRange` request.
fn on_account_range_request(
&self,
peer_id: PeerId,
request: GetAccountRangeMessage,
response: oneshot::Sender<RequestResult<AccountRangeMessage>>,
) {
trace!(target: "net::snap", ?peer_id, ?request.starting_hash, ?request.limit_hash, "Received GetAccountRange");
let mut accounts = Vec::new();
let Ok(provider) = self.provider_factory.database_provider_ro() else {
let _ = response.send(Ok(AccountRangeMessage {
request_id: request.request_id,
accounts,
proof: vec![],
}));
return;
};
let tx = provider.tx_ref();
let Ok(mut cursor) = tx.cursor_read::<tables::HashedAccounts>() else {
let _ = response.send(Ok(AccountRangeMessage {
request_id: request.request_id,
accounts,
proof: vec![],
}));
return;
};
let limit = (request.response_bytes as usize).min(SOFT_RESPONSE_LIMIT);
let mut total_bytes = 0usize;
if let Ok(walker) = cursor.walk(Some(request.starting_hash)) {
for entry in walker {
let Ok((hash, account)) = entry else { break };
if hash > request.limit_hash {
break;
}
let slim = encode_account(&account);
total_bytes += 32 + slim.len();
accounts.push(AccountData { hash, body: slim });
if accounts.len() >= MAX_ACCOUNTS_SERVE || total_bytes >= limit {
break;
}
}
}
trace!(target: "net::snap", ?peer_id, num_accounts = accounts.len(), total_bytes, "Serving GetAccountRange");
let _ = response.send(Ok(AccountRangeMessage {
request_id: request.request_id,
accounts,
// TODO: add merkle proofs
proof: vec![],
}));
}
/// Handle a `GetStorageRanges` request.
fn on_storage_ranges_request(
&self,
peer_id: PeerId,
request: GetStorageRangesMessage,
response: oneshot::Sender<RequestResult<StorageRangesMessage>>,
) {
trace!(target: "net::snap", ?peer_id, num_accounts = request.account_hashes.len(), "Received GetStorageRanges");
let mut all_slots = Vec::new();
let Ok(provider) = self.provider_factory.database_provider_ro() else {
let _ = response.send(Ok(StorageRangesMessage {
request_id: request.request_id,
slots: all_slots,
proof: vec![],
}));
return;
};
let tx = provider.tx_ref();
let Ok(mut cursor) = tx.cursor_dup_read::<tables::HashedStorages>() else {
let _ = response.send(Ok(StorageRangesMessage {
request_id: request.request_id,
slots: all_slots,
proof: vec![],
}));
return;
};
let limit = (request.response_bytes as usize).min(SOFT_RESPONSE_LIMIT);
let mut total_bytes = 0usize;
for (i, account_hash) in request.account_hashes.iter().enumerate() {
let mut account_slots = Vec::new();
// For the first account, use the request's starting_hash.
// For subsequent accounts, start from the beginning.
let start = if i == 0 { request.starting_hash } else { Default::default() };
if let Ok(walker) = cursor.walk_dup(Some(*account_hash), Some(start)) {
for entry in walker {
let Ok((_, storage_entry)) = entry else { break };
if storage_entry.key > request.limit_hash {
break;
}
let mut value_buf = Vec::new();
storage_entry.value.encode(&mut value_buf);
total_bytes += 32 + value_buf.len();
account_slots.push(StorageData {
hash: storage_entry.key,
data: Bytes::from(value_buf),
});
if account_slots.len() >= MAX_STORAGE_SERVE || total_bytes >= limit {
break;
}
}
}
all_slots.push(account_slots);
if total_bytes >= limit {
break;
}
}
trace!(target: "net::snap", ?peer_id, num_accounts = all_slots.len(), total_bytes, "Serving GetStorageRanges");
let _ = response.send(Ok(StorageRangesMessage {
request_id: request.request_id,
slots: all_slots,
// TODO: add boundary proofs for partial ranges
proof: vec![],
}));
}
/// Handle a `GetByteCodes` request.
fn on_byte_codes_request(
&self,
peer_id: PeerId,
request: GetByteCodesMessage,
response: oneshot::Sender<RequestResult<ByteCodesMessage>>,
) {
trace!(target: "net::snap", ?peer_id, num_hashes = request.hashes.len(), "Received GetByteCodes");
let mut codes = Vec::new();
let Ok(provider) = self.provider_factory.database_provider_ro() else {
let _ = response.send(Ok(ByteCodesMessage { request_id: request.request_id, codes }));
return;
};
let tx = provider.tx_ref();
let limit = (request.response_bytes as usize).min(SOFT_RESPONSE_LIMIT);
let mut total_bytes = 0usize;
for hash in &request.hashes {
if *hash == KECCAK_EMPTY {
continue;
}
let Ok(Some(bytecode)) = tx.get::<tables::Bytecodes>(*hash) else {
continue;
};
let raw = bytecode.original_bytes();
total_bytes += raw.len();
codes.push(raw);
if codes.len() >= MAX_BYTECODES_SERVE || total_bytes >= limit {
break;
}
}
trace!(target: "net::snap", ?peer_id, num_codes = codes.len(), total_bytes, "Serving GetByteCodes");
let _ = response.send(Ok(ByteCodesMessage { request_id: request.request_id, codes }));
}
/// Handle a `GetTrieNodes` request.
fn on_trie_nodes_request(
&self,
peer_id: PeerId,
request: GetTrieNodesMessage,
response: oneshot::Sender<RequestResult<TrieNodesMessage>>,
) {
debug!(target: "net::snap", ?peer_id, num_paths = request.paths.len(), "Received GetTrieNodes (stub)");
// TODO: implement trie node lookups from AccountsTrie / StoragesTrie tables
let _ =
response.send(Ok(TrieNodesMessage { request_id: request.request_id, nodes: vec![] }));
}
}
/// Encode an [`Account`] into slim RLP format (as `TrieAccount`).
///
/// Accounts in the snap protocol are exchanged as RLP-encoded `TrieAccount`.
/// Since we don't know the true storage root when reading from `HashedAccounts`,
/// we use `EMPTY_ROOT_HASH` as a placeholder.
fn encode_account(account: &Account) -> Bytes {
let trie_account = account.into_trie_account(EMPTY_ROOT_HASH);
let mut buf = Vec::new();
trie_account.encode(&mut buf);
Bytes::from(buf)
}
/// Incoming snap request variants delegated by the network.
#[derive(Debug)]
pub enum IncomingSnapRequest {
/// Request for an account range.
GetAccountRange {
/// The peer that sent the request.
peer_id: PeerId,
/// The request payload.
request: GetAccountRangeMessage,
/// Channel to send the response.
response: oneshot::Sender<RequestResult<AccountRangeMessage>>,
},
/// Request for storage slot ranges.
GetStorageRanges {
/// The peer that sent the request.
peer_id: PeerId,
/// The request payload.
request: GetStorageRangesMessage,
/// Channel to send the response.
response: oneshot::Sender<RequestResult<StorageRangesMessage>>,
},
/// Request for contract bytecodes.
GetByteCodes {
/// The peer that sent the request.
peer_id: PeerId,
/// The request payload.
request: GetByteCodesMessage,
/// Channel to send the response.
response: oneshot::Sender<RequestResult<ByteCodesMessage>>,
},
/// Request for trie nodes.
GetTrieNodes {
/// The peer that sent the request.
peer_id: PeerId,
/// The request payload.
request: GetTrieNodesMessage,
/// Channel to send the response.
response: oneshot::Sender<RequestResult<TrieNodesMessage>>,
},
}
impl<F> Future for SnapRequestHandler<F>
where
F: DatabaseProviderFactory + Unpin,
{
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
loop {
match this.incoming_requests.poll_next_unpin(cx) {
Poll::Ready(Some(incoming)) => match incoming {
IncomingSnapRequest::GetAccountRange { peer_id, request, response } => {
this.on_account_range_request(peer_id, request, response);
}
IncomingSnapRequest::GetStorageRanges { peer_id, request, response } => {
this.on_storage_ranges_request(peer_id, request, response);
}
IncomingSnapRequest::GetByteCodes { peer_id, request, response } => {
this.on_byte_codes_request(peer_id, request, response);
}
IncomingSnapRequest::GetTrieNodes { peer_id, request, response } => {
this.on_trie_nodes_request(peer_id, request, response);
}
},
Poll::Ready(None) => return Poll::Ready(()),
Poll::Pending => return Poll::Pending,
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use alloy_primitives::{keccak256, B256, U256};
use alloy_rlp::Decodable;
use reth_db_api::transaction::DbTxMut;
use reth_provider::test_utils::create_test_provider_factory;
use reth_storage_api::{DBProvider, DatabaseProviderFactory};
use tokio::sync::mpsc;
#[test]
fn test_encode_account_roundtrip() {
let account = Account {
nonce: 10,
balance: U256::from(500),
bytecode_hash: Some(B256::repeat_byte(0xAB)),
};
let encoded = encode_account(&account);
assert!(!encoded.is_empty());
let decoded = alloy_trie::TrieAccount::decode(&mut encoded.as_ref()).unwrap();
assert_eq!(decoded.nonce, 10);
assert_eq!(decoded.balance, U256::from(500));
assert_eq!(decoded.code_hash, B256::repeat_byte(0xAB));
}
#[tokio::test]
async fn test_account_range_empty_db() {
let factory = create_test_provider_factory();
let (tx, rx) = mpsc::channel(10);
let handler = SnapRequestHandler::new(factory, rx);
let handle = tokio::spawn(handler);
let (resp_tx, resp_rx) = oneshot::channel();
tx.send(IncomingSnapRequest::GetAccountRange {
peer_id: PeerId::random(),
request: GetAccountRangeMessage {
request_id: 1,
root_hash: B256::ZERO,
starting_hash: B256::ZERO,
limit_hash: B256::repeat_byte(0xFF),
response_bytes: 512 * 1024,
},
response: resp_tx,
})
.await
.unwrap();
let result = resp_rx.await.unwrap().unwrap();
assert!(result.accounts.is_empty());
drop(tx);
handle.await.unwrap();
}
#[tokio::test]
async fn test_byte_codes_request() {
let factory = create_test_provider_factory();
// Write bytecodes into the DB
let code = Bytes::from(vec![0x60, 0x00, 0x60, 0x00, 0xFD]); // PUSH0 PUSH0 REVERT
let code_hash = keccak256(&code);
{
let provider = factory.database_provider_rw().unwrap();
let bytecode = reth_primitives_traits::Bytecode::new_raw(code.clone());
provider.tx_ref().put::<tables::Bytecodes>(code_hash, bytecode).unwrap();
provider.commit().unwrap();
}
let (tx, rx) = mpsc::channel(10);
let handler = SnapRequestHandler::new(factory, rx);
let handle = tokio::spawn(handler);
let (resp_tx, resp_rx) = oneshot::channel();
tx.send(IncomingSnapRequest::GetByteCodes {
peer_id: PeerId::random(),
request: GetByteCodesMessage {
request_id: 2,
hashes: vec![code_hash],
response_bytes: 512 * 1024,
},
response: resp_tx,
})
.await
.unwrap();
let result = resp_rx.await.unwrap().unwrap();
assert_eq!(result.codes.len(), 1);
assert_eq!(result.codes[0], code);
drop(tx);
handle.await.unwrap();
}
#[tokio::test]
async fn test_storage_ranges_empty() {
let factory = create_test_provider_factory();
let (tx, rx) = mpsc::channel(10);
let handler = SnapRequestHandler::new(factory, rx);
let handle = tokio::spawn(handler);
let (resp_tx, resp_rx) = oneshot::channel();
tx.send(IncomingSnapRequest::GetStorageRanges {
peer_id: PeerId::random(),
request: GetStorageRangesMessage {
request_id: 3,
root_hash: B256::ZERO,
account_hashes: vec![B256::repeat_byte(0x01)],
starting_hash: B256::ZERO,
limit_hash: B256::repeat_byte(0xFF),
response_bytes: 512 * 1024,
},
response: resp_tx,
})
.await
.unwrap();
let result = resp_rx.await.unwrap().unwrap();
// One entry per requested account, but the inner vec is empty since no storage exists
assert_eq!(result.slots.len(), 1);
assert!(result.slots[0].is_empty());
drop(tx);
handle.await.unwrap();
}
}

View File

@@ -0,0 +1,46 @@
//! Snap sync task for integration with the node builder.
//!
//! Provides a standalone async function that runs snap sync to completion,
//! suitable for spawning as a background task before the pipeline starts.
use crate::{downloader::SnapSyncDownloader, error::SnapSyncError};
use alloy_primitives::B256;
use reth_db_api::transaction::{DbTx, DbTxMut};
use reth_network_p2p::snap::client::SnapClient;
use reth_storage_api::{DBProvider, DatabaseProviderFactory};
use tracing::info;
/// Runs snap sync to completion, returning the pivot block number on success.
///
/// This is the main entry point for integrating snap sync with the node builder.
/// It creates a [`SnapSyncDownloader`], runs all phases (account download, storage
/// download, bytecode download, state root verification), and returns the pivot
/// block number that the pipeline should resume from.
pub async fn run_snap_sync<C, F>(
client: C,
provider_factory: F,
pivot_hash: B256,
pivot_number: u64,
state_root: B256,
) -> Result<u64, SnapSyncError>
where
C: SnapClient + Clone + Send + Sync + 'static,
F: DatabaseProviderFactory + Send + Sync + 'static,
<F as DatabaseProviderFactory>::ProviderRW: DBProvider<Tx: DbTxMut + DbTx> + Send,
{
let (_, cancel_rx) = tokio::sync::watch::channel(false);
let mut downloader = SnapSyncDownloader::new(
client,
provider_factory,
pivot_hash,
pivot_number,
state_root,
cancel_rx,
);
info!(target: "snap::sync", pivot_number, %pivot_hash, "Starting snap sync");
downloader.run().await?;
info!(target: "snap::sync", pivot_number, "Snap sync completed successfully");
Ok(pivot_number)
}

View File

@@ -54,6 +54,7 @@ reth-tokio-util.workspace = true
reth-tracing.workspace = true
reth-transaction-pool.workspace = true
reth-trie-db = { workspace = true, features = ["metrics"] }
reth-snap-sync.workspace = true
reth-basic-payload-builder.workspace = true
reth-node-ethstats.workspace = true

View File

@@ -32,7 +32,7 @@ use reth_node_core::{
use reth_node_events::node;
use reth_provider::{
providers::{BlockchainProvider, NodeTypesForProvider},
BlockNumReader, StorageSettingsCache,
BlockNumReader, HeaderProvider, StorageSettingsCache,
};
use reth_tasks::TaskExecutor;
use reth_tokio_util::EventSender;
@@ -163,6 +163,78 @@ impl EngineNodeLauncher {
// The new engine writes directly to static files. This ensures that they're up to the tip.
pipeline.move_to_static_files()?;
// Run snap sync if enabled
if node_config.debug.snap_sync {
info!(target: "reth::cli", "Snap sync mode enabled");
let (pivot_hash, pivot_header) = if let Some(tip) = node_config.debug.tip {
info!(target: "reth::cli", %tip, "Using explicit pivot from --debug.tip");
let header = match ctx.blockchain_db().header(tip) {
Ok(Some(h)) => h,
_ => {
info!(target: "reth::cli", %tip, "Pivot header not in DB, fetching from network");
let sealed = node_config
.fetch_tip_from_network(network_client.clone(), tip.into())
.await;
sealed.into_header()
}
};
(tip, header)
} else {
info!(target: "reth::cli", "Selecting snap sync pivot from last available header");
let last_number = ctx
.blockchain_db()
.last_block_number()
.map_err(|e| eyre::eyre!("failed to get last block number: {e}"))?;
if last_number == 0 {
return Err(eyre::eyre!(
"No blocks synced yet. Snap sync needs headers synced first. \
Either run header sync first or provide --debug.tip explicitly."
));
}
let sealed = ctx
.blockchain_db()
.sealed_header(last_number)
.map_err(|e| eyre::eyre!("failed to get sealed header: {e}"))?
.ok_or_else(|| eyre::eyre!("header at block {last_number} not found"))?;
let hash = sealed.hash();
info!(target: "reth::cli", last_number, %hash, "Using last available header as snap sync pivot");
(hash, sealed.into_header())
};
let pivot_number = pivot_header.number();
let state_root = pivot_header.state_root();
info!(
target: "reth::cli",
%pivot_hash,
pivot_number,
%state_root,
"Starting snap sync to pivot block"
);
let snap_result = reth_snap_sync::run_snap_sync(
network_client.clone(),
ctx.provider_factory().clone(),
pivot_hash,
pivot_number,
state_root,
)
.await;
match snap_result {
Ok(block_number) => {
info!(target: "reth::cli", block_number, "Snap sync completed successfully");
}
Err(e) => {
return Err(eyre::eyre!("Snap sync failed: {e}"));
}
}
}
let pipeline_events = pipeline.events();
let mut pruner_builder = ctx.pruner_builder();

View File

@@ -102,6 +102,11 @@ pub struct DebugArgs {
#[arg(long = "ethstats", help_heading = "Debug")]
pub ethstats: Option<String>,
/// Enable snap sync mode. Downloads state from peers via snap protocol
/// instead of executing all historical blocks.
#[arg(long = "debug.snap-sync", help_heading = "Debug")]
pub snap_sync: bool,
/// Set the node to idle state when the backfill is not running.
///
/// This makes the `eth_syncing` RPC return "Idle" when the node has just started or finished
@@ -126,6 +131,7 @@ impl Default for DebugArgs {
invalid_block_hook: Some(InvalidBlockSelection::default()),
healthy_node_rpc_url: None,
ethstats: None,
snap_sync: false,
startup_sync_state_idle: false,
}
}