mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
10 Commits
devnet4
...
georgios/s
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9bd7d2e698 | ||
|
|
c682cf8d84 | ||
|
|
239adcce22 | ||
|
|
0b054ad6b2 | ||
|
|
0f7e0b583e | ||
|
|
ababdae2e2 | ||
|
|
cdc6f6beaf | ||
|
|
ee4af55457 | ||
|
|
5b9c469b83 | ||
|
|
b498a41d54 |
33
Cargo.lock
generated
33
Cargo.lock
generated
@@ -9267,6 +9267,7 @@ dependencies = [
|
||||
"reth-rpc-engine-api",
|
||||
"reth-rpc-eth-types",
|
||||
"reth-rpc-layer",
|
||||
"reth-snap-sync",
|
||||
"reth-stages",
|
||||
"reth-static-file",
|
||||
"reth-tasks",
|
||||
@@ -10127,6 +10128,38 @@ dependencies = [
|
||||
"strum",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "reth-snap-sync"
|
||||
version = "1.11.0"
|
||||
dependencies = [
|
||||
"alloy-consensus",
|
||||
"alloy-primitives",
|
||||
"alloy-rlp",
|
||||
"alloy-trie",
|
||||
"derive_more",
|
||||
"futures",
|
||||
"metrics",
|
||||
"rand 0.9.2",
|
||||
"reth-db-api",
|
||||
"reth-eth-wire-types",
|
||||
"reth-execution-errors",
|
||||
"reth-metrics",
|
||||
"reth-network-p2p",
|
||||
"reth-network-peers",
|
||||
"reth-primitives-traits",
|
||||
"reth-provider",
|
||||
"reth-storage-api",
|
||||
"reth-storage-errors",
|
||||
"reth-tasks",
|
||||
"reth-tracing",
|
||||
"reth-trie",
|
||||
"reth-trie-db",
|
||||
"thiserror 2.0.18",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "reth-stages"
|
||||
version = "1.11.0"
|
||||
|
||||
@@ -55,6 +55,7 @@ members = [
|
||||
"crates/net/discv5/",
|
||||
"crates/net/dns/",
|
||||
"crates/net/downloaders/",
|
||||
"crates/net/snap-sync/",
|
||||
"crates/net/ecies/",
|
||||
"crates/net/eth-wire-types",
|
||||
"crates/net/eth-wire/",
|
||||
@@ -343,6 +344,7 @@ reth-discv4 = { path = "crates/net/discv4" }
|
||||
reth-discv5 = { path = "crates/net/discv5" }
|
||||
reth-dns-discovery = { path = "crates/net/dns" }
|
||||
reth-downloaders = { path = "crates/net/downloaders" }
|
||||
reth-snap-sync = { path = "crates/net/snap-sync" }
|
||||
reth-e2e-test-utils = { path = "crates/e2e-test-utils" }
|
||||
reth-ecies = { path = "crates/net/ecies" }
|
||||
reth-engine-local = { path = "crates/engine/local" }
|
||||
|
||||
@@ -170,15 +170,29 @@ impl<N: NetworkPrimitives> ProtocolMessage<N> {
|
||||
}
|
||||
EthMessageID::GetBlockAccessLists => {
|
||||
if version < EthVersion::Eth71 {
|
||||
return Err(MessageError::Invalid(version, EthMessageID::GetBlockAccessLists))
|
||||
// Beyond the max ID for this version — treat as raw capability message
|
||||
// (e.g. a snap protocol message in the multiplexed ID space).
|
||||
let raw_payload = Bytes::copy_from_slice(buf);
|
||||
buf.advance(raw_payload.len());
|
||||
EthMessage::Other(RawCapabilityMessage::new(
|
||||
message_type.to_u8() as usize,
|
||||
raw_payload.into(),
|
||||
))
|
||||
} else {
|
||||
EthMessage::GetBlockAccessLists(RequestPair::decode(buf)?)
|
||||
}
|
||||
EthMessage::GetBlockAccessLists(RequestPair::decode(buf)?)
|
||||
}
|
||||
EthMessageID::BlockAccessLists => {
|
||||
if version < EthVersion::Eth71 {
|
||||
return Err(MessageError::Invalid(version, EthMessageID::BlockAccessLists))
|
||||
let raw_payload = Bytes::copy_from_slice(buf);
|
||||
buf.advance(raw_payload.len());
|
||||
EthMessage::Other(RawCapabilityMessage::new(
|
||||
message_type.to_u8() as usize,
|
||||
raw_payload.into(),
|
||||
))
|
||||
} else {
|
||||
EthMessage::BlockAccessLists(RequestPair::decode(buf)?)
|
||||
}
|
||||
EthMessage::BlockAccessLists(RequestPair::decode(buf)?)
|
||||
}
|
||||
EthMessageID::Other(_) => {
|
||||
let raw_payload = Bytes::copy_from_slice(buf);
|
||||
@@ -829,6 +843,9 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_bal_message_version_gating() {
|
||||
// On versions < Eth71, GetBlockAccessLists and BlockAccessLists IDs are treated as
|
||||
// raw capability messages (Other) since they fall beyond the eth range and may
|
||||
// belong to another sub-protocol (e.g. snap).
|
||||
let get_block_access_lists =
|
||||
EthMessage::<EthNetworkPrimitives>::GetBlockAccessLists(RequestPair {
|
||||
request_id: 1337,
|
||||
@@ -842,10 +859,7 @@ mod tests {
|
||||
EthVersion::Eth70,
|
||||
&mut &buf[..],
|
||||
);
|
||||
assert!(matches!(
|
||||
msg,
|
||||
Err(MessageError::Invalid(EthVersion::Eth70, EthMessageID::GetBlockAccessLists))
|
||||
));
|
||||
assert!(matches!(msg, Ok(ProtocolMessage { message: EthMessage::Other(_), .. })));
|
||||
|
||||
let block_access_lists =
|
||||
EthMessage::<EthNetworkPrimitives>::BlockAccessLists(RequestPair {
|
||||
@@ -860,10 +874,7 @@ mod tests {
|
||||
EthVersion::Eth70,
|
||||
&mut &buf[..],
|
||||
);
|
||||
assert!(matches!(
|
||||
msg,
|
||||
Err(MessageError::Invalid(EthVersion::Eth70, EthMessageID::BlockAccessLists))
|
||||
));
|
||||
assert!(matches!(msg, Ok(ProtocolMessage { message: EthMessage::Other(_), .. })));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -205,7 +205,10 @@ impl HelloMessageBuilder {
|
||||
protocol_version: protocol_version.unwrap_or_default(),
|
||||
client_version: client_version.unwrap_or_else(|| RETH_CLIENT_VERSION.to_string()),
|
||||
protocols: protocols.unwrap_or_else(|| {
|
||||
EthVersion::ALL_VERSIONS.iter().copied().map(Into::into).collect()
|
||||
let mut protos: Vec<Protocol> =
|
||||
EthVersion::ALL_VERSIONS.iter().copied().map(Into::into).collect();
|
||||
protos.push(Protocol::snap_1());
|
||||
protos
|
||||
}),
|
||||
port: port.unwrap_or(DEFAULT_TCP_PORT),
|
||||
id,
|
||||
|
||||
@@ -610,12 +610,20 @@ where
|
||||
let _ = this.primary.to_primary.send(msg);
|
||||
} else {
|
||||
// delegate to installed satellite if any
|
||||
let mut handled = false;
|
||||
for proto in &this.inner.protocols {
|
||||
if proto.shared_cap == *cap {
|
||||
proto.send_raw(msg);
|
||||
proto.send_raw(msg.clone());
|
||||
handled = true;
|
||||
break
|
||||
}
|
||||
}
|
||||
if !handled {
|
||||
// No satellite handler for this capability (e.g. snap/1
|
||||
// handled inline). Route to primary so the session can
|
||||
// handle it via RawCapabilityMessage.
|
||||
let _ = this.primary.to_primary.send(msg);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return Poll::Ready(Some(Err(P2PStreamError::UnknownReservedMessageId(
|
||||
|
||||
@@ -45,6 +45,13 @@ impl Protocol {
|
||||
Self::eth(EthVersion::Eth68)
|
||||
}
|
||||
|
||||
/// Returns the `snap/1` capability.
|
||||
///
|
||||
/// The snap protocol defines 8 message types (0x00..0x07).
|
||||
pub const fn snap_1() -> Self {
|
||||
Self::new(Capability::new_static("snap", 1), 8)
|
||||
}
|
||||
|
||||
/// Consumes the type and returns a tuple of the [Capability] and number of messages.
|
||||
#[inline]
|
||||
pub(crate) fn split(self) -> (Capability, u8) {
|
||||
|
||||
@@ -1,14 +1,20 @@
|
||||
//! API related to listening for network events.
|
||||
|
||||
use reth_eth_wire_types::{
|
||||
message::RequestPair, BlockAccessLists, BlockBodies, BlockHeaders, Capabilities,
|
||||
DisconnectReason, EthMessage, EthNetworkPrimitives, EthVersion, GetBlockAccessLists,
|
||||
GetBlockBodies, GetBlockHeaders, GetNodeData, GetPooledTransactions, GetReceipts,
|
||||
GetReceipts70, NetworkPrimitives, NodeData, PooledTransactions, Receipts, Receipts69,
|
||||
Receipts70, UnifiedStatus,
|
||||
message::RequestPair,
|
||||
snap::{
|
||||
GetAccountRangeMessage, GetByteCodesMessage, GetStorageRangesMessage, GetTrieNodesMessage,
|
||||
},
|
||||
BlockAccessLists, BlockBodies, BlockHeaders, Capabilities, DisconnectReason, EthMessage,
|
||||
EthNetworkPrimitives, EthVersion, GetBlockAccessLists, GetBlockBodies, GetBlockHeaders,
|
||||
GetNodeData, GetPooledTransactions, GetReceipts, GetReceipts70, NetworkPrimitives, NodeData,
|
||||
PooledTransactions, Receipts, Receipts69, Receipts70, UnifiedStatus,
|
||||
};
|
||||
use reth_ethereum_forks::ForkId;
|
||||
use reth_network_p2p::error::{RequestError, RequestResult};
|
||||
use reth_network_p2p::{
|
||||
error::{RequestError, RequestResult},
|
||||
snap::client::SnapResponse,
|
||||
};
|
||||
use reth_network_peers::{NodeRecord, PeerId};
|
||||
use reth_network_types::{PeerAddr, PeerKind};
|
||||
use reth_tokio_util::EventStream;
|
||||
@@ -262,6 +268,42 @@ pub enum PeerRequest<N: NetworkPrimitives = EthNetworkPrimitives> {
|
||||
/// The channel to send the response for block access lists.
|
||||
response: oneshot::Sender<RequestResult<BlockAccessLists>>,
|
||||
},
|
||||
/// Requests an account range from the peer (snap protocol).
|
||||
///
|
||||
/// The response should be sent through the channel.
|
||||
GetAccountRange {
|
||||
/// The request for an account range.
|
||||
request: GetAccountRangeMessage,
|
||||
/// The channel to send the response for the account range.
|
||||
response: oneshot::Sender<RequestResult<SnapResponse>>,
|
||||
},
|
||||
/// Requests storage ranges from the peer (snap protocol).
|
||||
///
|
||||
/// The response should be sent through the channel.
|
||||
GetStorageRanges {
|
||||
/// The request for storage ranges.
|
||||
request: GetStorageRangesMessage,
|
||||
/// The channel to send the response for storage ranges.
|
||||
response: oneshot::Sender<RequestResult<SnapResponse>>,
|
||||
},
|
||||
/// Requests bytecodes from the peer (snap protocol).
|
||||
///
|
||||
/// The response should be sent through the channel.
|
||||
GetByteCodes {
|
||||
/// The request for bytecodes.
|
||||
request: GetByteCodesMessage,
|
||||
/// The channel to send the response for bytecodes.
|
||||
response: oneshot::Sender<RequestResult<SnapResponse>>,
|
||||
},
|
||||
/// Requests trie nodes from the peer (snap protocol).
|
||||
///
|
||||
/// The response should be sent through the channel.
|
||||
GetTrieNodes {
|
||||
/// The request for trie nodes.
|
||||
request: GetTrieNodesMessage,
|
||||
/// The channel to send the response for trie nodes.
|
||||
response: oneshot::Sender<RequestResult<SnapResponse>>,
|
||||
},
|
||||
}
|
||||
|
||||
// === impl PeerRequest ===
|
||||
@@ -283,6 +325,10 @@ impl<N: NetworkPrimitives> PeerRequest<N> {
|
||||
Self::GetReceipts69 { response, .. } => response.send(Err(err)).ok(),
|
||||
Self::GetReceipts70 { response, .. } => response.send(Err(err)).ok(),
|
||||
Self::GetBlockAccessLists { response, .. } => response.send(Err(err)).ok(),
|
||||
Self::GetAccountRange { response, .. } |
|
||||
Self::GetStorageRanges { response, .. } |
|
||||
Self::GetByteCodes { response, .. } |
|
||||
Self::GetTrieNodes { response, .. } => response.send(Err(err)).ok(),
|
||||
};
|
||||
}
|
||||
|
||||
@@ -295,7 +341,24 @@ impl<N: NetworkPrimitives> PeerRequest<N> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the [`EthMessage`] for this type
|
||||
/// Returns `true` if this is a snap protocol request.
|
||||
pub const fn is_snap_request(&self) -> bool {
|
||||
matches!(
|
||||
self,
|
||||
Self::GetAccountRange { .. } |
|
||||
Self::GetStorageRanges { .. } |
|
||||
Self::GetByteCodes { .. } |
|
||||
Self::GetTrieNodes { .. }
|
||||
)
|
||||
}
|
||||
|
||||
/// Returns the [`EthMessage`] for this type.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if called on a snap protocol request variant. Use [`Self::is_snap_request`] to
|
||||
/// check before calling this method. Snap requests are handled separately in the session
|
||||
/// layer.
|
||||
pub fn create_request_message(&self, request_id: u64) -> EthMessage<N> {
|
||||
match self {
|
||||
Self::GetBlockHeaders { request, .. } => {
|
||||
@@ -325,6 +388,12 @@ impl<N: NetworkPrimitives> PeerRequest<N> {
|
||||
message: request.clone(),
|
||||
})
|
||||
}
|
||||
Self::GetAccountRange { .. } |
|
||||
Self::GetStorageRanges { .. } |
|
||||
Self::GetByteCodes { .. } |
|
||||
Self::GetTrieNodes { .. } => {
|
||||
panic!("snap protocol requests cannot be converted to EthMessage, handle them separately via is_snap_request()")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -38,7 +38,7 @@ use reth_eth_wire_types::{
|
||||
capability::Capabilities, Capability, DisconnectReason, EthVersion, NetworkPrimitives,
|
||||
UnifiedStatus,
|
||||
};
|
||||
use reth_network_p2p::sync::NetworkSyncUpdater;
|
||||
use reth_network_p2p::{snap::client::SnapClient, sync::NetworkSyncUpdater};
|
||||
use reth_network_peers::NodeRecord;
|
||||
use std::{future::Future, net::SocketAddr, sync::Arc, time::Instant};
|
||||
|
||||
@@ -48,7 +48,7 @@ pub type PeerId = alloy_primitives::B512;
|
||||
/// Helper trait that unifies network API needed to launch node.
|
||||
pub trait FullNetwork:
|
||||
BlockDownloaderProvider<
|
||||
Client: BlockClient<Block = <Self::Primitives as NetworkPrimitives>::Block>,
|
||||
Client: BlockClient<Block = <Self::Primitives as NetworkPrimitives>::Block> + SnapClient,
|
||||
> + NetworkSyncUpdater
|
||||
+ NetworkInfo
|
||||
+ NetworkEventListenerProvider
|
||||
@@ -62,7 +62,8 @@ pub trait FullNetwork:
|
||||
|
||||
impl<T> FullNetwork for T where
|
||||
T: BlockDownloaderProvider<
|
||||
Client: BlockClient<Block = <Self::Primitives as NetworkPrimitives>::Block>,
|
||||
Client: BlockClient<Block = <Self::Primitives as NetworkPrimitives>::Block>
|
||||
+ SnapClient,
|
||||
> + NetworkSyncUpdater
|
||||
+ NetworkInfo
|
||||
+ NetworkEventListenerProvider
|
||||
|
||||
@@ -4,6 +4,9 @@ use crate::{fetch::DownloadRequest, flattened_response::FlattenedResponse};
|
||||
use alloy_primitives::B256;
|
||||
use futures::{future, future::Either};
|
||||
use reth_eth_wire::{EthNetworkPrimitives, NetworkPrimitives};
|
||||
use reth_eth_wire_types::snap::{
|
||||
GetAccountRangeMessage, GetByteCodesMessage, GetStorageRangesMessage, GetTrieNodesMessage,
|
||||
};
|
||||
use reth_network_api::test_utils::PeersHandle;
|
||||
use reth_network_p2p::{
|
||||
bodies::client::{BodiesClient, BodiesFut},
|
||||
@@ -11,6 +14,7 @@ use reth_network_p2p::{
|
||||
error::{PeerRequestResult, RequestError},
|
||||
headers::client::{HeadersClient, HeadersRequest},
|
||||
priority::Priority,
|
||||
snap::client::{SnapClient, SnapResponse},
|
||||
BlockClient,
|
||||
};
|
||||
use reth_network_peers::PeerId;
|
||||
@@ -105,3 +109,92 @@ impl<N: NetworkPrimitives> BodiesClient for FetchClient<N> {
|
||||
impl<N: NetworkPrimitives> BlockClient for FetchClient<N> {
|
||||
type Block = N::Block;
|
||||
}
|
||||
|
||||
type SnapClientFuture = Either<
|
||||
FlattenedResponse<PeerRequestResult<SnapResponse>>,
|
||||
future::Ready<PeerRequestResult<SnapResponse>>,
|
||||
>;
|
||||
|
||||
impl<N: NetworkPrimitives> SnapClient for FetchClient<N> {
|
||||
type Output = SnapClientFuture;
|
||||
|
||||
fn get_account_range_with_priority(
|
||||
&self,
|
||||
request: GetAccountRangeMessage,
|
||||
priority: Priority,
|
||||
) -> Self::Output {
|
||||
let (response, rx) = oneshot::channel();
|
||||
if self
|
||||
.request_tx
|
||||
.send(DownloadRequest::GetAccountRange { request, response, priority })
|
||||
.is_ok()
|
||||
{
|
||||
Either::Left(FlattenedResponse::from(rx))
|
||||
} else {
|
||||
Either::Right(future::err(RequestError::ChannelClosed))
|
||||
}
|
||||
}
|
||||
|
||||
fn get_storage_ranges(&self, request: GetStorageRangesMessage) -> Self::Output {
|
||||
self.get_storage_ranges_with_priority(request, Priority::Normal)
|
||||
}
|
||||
|
||||
fn get_storage_ranges_with_priority(
|
||||
&self,
|
||||
request: GetStorageRangesMessage,
|
||||
priority: Priority,
|
||||
) -> Self::Output {
|
||||
let (response, rx) = oneshot::channel();
|
||||
if self
|
||||
.request_tx
|
||||
.send(DownloadRequest::GetStorageRanges { request, response, priority })
|
||||
.is_ok()
|
||||
{
|
||||
Either::Left(FlattenedResponse::from(rx))
|
||||
} else {
|
||||
Either::Right(future::err(RequestError::ChannelClosed))
|
||||
}
|
||||
}
|
||||
|
||||
fn get_byte_codes(&self, request: GetByteCodesMessage) -> Self::Output {
|
||||
self.get_byte_codes_with_priority(request, Priority::Normal)
|
||||
}
|
||||
|
||||
fn get_byte_codes_with_priority(
|
||||
&self,
|
||||
request: GetByteCodesMessage,
|
||||
priority: Priority,
|
||||
) -> Self::Output {
|
||||
let (response, rx) = oneshot::channel();
|
||||
if self
|
||||
.request_tx
|
||||
.send(DownloadRequest::GetByteCodes { request, response, priority })
|
||||
.is_ok()
|
||||
{
|
||||
Either::Left(FlattenedResponse::from(rx))
|
||||
} else {
|
||||
Either::Right(future::err(RequestError::ChannelClosed))
|
||||
}
|
||||
}
|
||||
|
||||
fn get_trie_nodes(&self, request: GetTrieNodesMessage) -> Self::Output {
|
||||
self.get_trie_nodes_with_priority(request, Priority::Normal)
|
||||
}
|
||||
|
||||
fn get_trie_nodes_with_priority(
|
||||
&self,
|
||||
request: GetTrieNodesMessage,
|
||||
priority: Priority,
|
||||
) -> Self::Output {
|
||||
let (response, rx) = oneshot::channel();
|
||||
if self
|
||||
.request_tx
|
||||
.send(DownloadRequest::GetTrieNodes { request, response, priority })
|
||||
.is_ok()
|
||||
{
|
||||
Either::Left(FlattenedResponse::from(rx))
|
||||
} else {
|
||||
Either::Right(future::err(RequestError::ChannelClosed))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,17 +10,22 @@ use futures::StreamExt;
|
||||
use reth_eth_wire::{
|
||||
Capabilities, EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, NetworkPrimitives,
|
||||
};
|
||||
use reth_network_api::test_utils::PeersHandle;
|
||||
use reth_eth_wire_types::snap::{
|
||||
GetAccountRangeMessage, GetByteCodesMessage, GetStorageRangesMessage, GetTrieNodesMessage,
|
||||
};
|
||||
use reth_network_api::{test_utils::PeersHandle, PeerRequest};
|
||||
use reth_network_p2p::{
|
||||
error::{EthResponseValidator, PeerRequestResult, RequestError, RequestResult},
|
||||
headers::client::HeadersRequest,
|
||||
priority::Priority,
|
||||
snap::client::SnapResponse,
|
||||
};
|
||||
use reth_network_peers::PeerId;
|
||||
use reth_network_types::ReputationChangeKind;
|
||||
use std::{
|
||||
collections::{HashMap, VecDeque},
|
||||
ops::RangeInclusive,
|
||||
pin::Pin,
|
||||
sync::{
|
||||
atomic::{AtomicU64, AtomicUsize, Ordering},
|
||||
Arc,
|
||||
@@ -33,6 +38,17 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
|
||||
type InflightHeadersRequest<H> = Request<HeadersRequest, PeerRequestResult<Vec<H>>>;
|
||||
type InflightBodiesRequest<B> = Request<(), PeerRequestResult<Vec<B>>>;
|
||||
|
||||
/// Tracks an inflight snap request, bridging the session's response back to the download caller.
|
||||
#[derive(Debug)]
|
||||
struct InflightSnapRequest {
|
||||
/// The peer that's handling this request
|
||||
peer_id: PeerId,
|
||||
/// The channel to send the final response (with peer id) to the download caller
|
||||
response: oneshot::Sender<PeerRequestResult<SnapResponse>>,
|
||||
/// The receiver for the session's response
|
||||
rx: oneshot::Receiver<RequestResult<SnapResponse>>,
|
||||
}
|
||||
|
||||
/// Manages data fetching operations.
|
||||
///
|
||||
/// This type is hooked into the staged sync pipeline and delegates download request to available
|
||||
@@ -45,6 +61,8 @@ pub struct StateFetcher<N: NetworkPrimitives = EthNetworkPrimitives> {
|
||||
inflight_headers_requests: HashMap<PeerId, InflightHeadersRequest<N::BlockHeader>>,
|
||||
/// Currently active [`GetBlockBodies`] requests
|
||||
inflight_bodies_requests: HashMap<PeerId, InflightBodiesRequest<N::BlockBody>>,
|
||||
/// Currently active snap protocol requests
|
||||
inflight_snap_requests: Vec<InflightSnapRequest>,
|
||||
/// The list of _available_ peers for requests.
|
||||
peers: HashMap<PeerId, Peer>,
|
||||
/// The handle to the peers manager
|
||||
@@ -67,6 +85,7 @@ impl<N: NetworkPrimitives> StateFetcher<N> {
|
||||
Self {
|
||||
inflight_headers_requests: Default::default(),
|
||||
inflight_bodies_requests: Default::default(),
|
||||
inflight_snap_requests: Vec::new(),
|
||||
peers: Default::default(),
|
||||
peers_handle,
|
||||
num_active_peers,
|
||||
@@ -114,6 +133,16 @@ impl<N: NetworkPrimitives> StateFetcher<N> {
|
||||
if let Some(req) = self.inflight_bodies_requests.remove(peer) {
|
||||
let _ = req.response.send(Err(RequestError::ConnectionDropped));
|
||||
}
|
||||
// Cancel inflight snap requests for this peer
|
||||
let mut i = 0;
|
||||
while i < self.inflight_snap_requests.len() {
|
||||
if &self.inflight_snap_requests[i].peer_id == peer {
|
||||
let req = self.inflight_snap_requests.swap_remove(i);
|
||||
let _ = req.response.send(Err(RequestError::ConnectionDropped));
|
||||
} else {
|
||||
i += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Updates the block information for the peer.
|
||||
@@ -171,7 +200,7 @@ impl<N: NetworkPrimitives> StateFetcher<N> {
|
||||
}
|
||||
|
||||
/// Returns the next action to return
|
||||
fn poll_action(&mut self) -> PollAction {
|
||||
fn poll_action(&mut self) -> PollAction<N> {
|
||||
// we only check and not pop here since we don't know yet whether a peer is available.
|
||||
if self.queued_requests.is_empty() {
|
||||
return PollAction::NoRequests
|
||||
@@ -184,13 +213,39 @@ impl<N: NetworkPrimitives> StateFetcher<N> {
|
||||
return PollAction::NoPeersAvailable
|
||||
};
|
||||
|
||||
// Snap requests bypass the block request path and are dispatched directly
|
||||
if request.is_snap_request() {
|
||||
let snap_request = self.prepare_snap_request(peer_id, request);
|
||||
return PollAction::Ready(FetchAction::SnapRequest { peer_id, request: snap_request })
|
||||
}
|
||||
|
||||
let request = self.prepare_block_request(peer_id, request);
|
||||
|
||||
PollAction::Ready(FetchAction::BlockRequest { peer_id, request })
|
||||
}
|
||||
|
||||
/// Advance the state the syncer
|
||||
pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<FetchAction> {
|
||||
pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<FetchAction<N>> {
|
||||
// Poll inflight snap requests and forward responses
|
||||
let mut i = 0;
|
||||
while i < self.inflight_snap_requests.len() {
|
||||
match Pin::new(&mut self.inflight_snap_requests[i].rx).poll(cx) {
|
||||
Poll::Ready(result) => {
|
||||
let req = self.inflight_snap_requests.swap_remove(i);
|
||||
let resp = match result {
|
||||
Ok(Ok(snap_resp)) => Ok((req.peer_id, snap_resp).into()),
|
||||
Ok(Err(err)) => Err(err),
|
||||
Err(_) => Err(RequestError::ChannelClosed),
|
||||
};
|
||||
let _ = req.response.send(resp);
|
||||
self.on_snap_response(req.peer_id);
|
||||
}
|
||||
Poll::Pending => {
|
||||
i += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// drain buffered actions first
|
||||
loop {
|
||||
let no_peers_available = match self.poll_action() {
|
||||
@@ -256,6 +311,58 @@ impl<N: NetworkPrimitives> StateFetcher<N> {
|
||||
self.inflight_bodies_requests.insert(peer_id, inflight);
|
||||
BlockRequest::GetBlockBodies(GetBlockBodies(request))
|
||||
}
|
||||
DownloadRequest::GetAccountRange { .. } |
|
||||
DownloadRequest::GetStorageRanges { .. } |
|
||||
DownloadRequest::GetByteCodes { .. } |
|
||||
DownloadRequest::GetTrieNodes { .. } => {
|
||||
unreachable!("snap requests are handled via prepare_snap_request")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Handles a new snap request to a peer.
|
||||
///
|
||||
/// Converts the download request into a [`PeerRequest`] for dispatch to the peer session.
|
||||
/// The `DownloadRequest`'s response channel is stored as an inflight snap request so the
|
||||
/// response can be forwarded back (with the peer id) once the session replies.
|
||||
///
|
||||
/// Caution: this assumes the peer exists and is idle
|
||||
fn prepare_snap_request(&mut self, peer_id: PeerId, req: DownloadRequest<N>) -> PeerRequest<N> {
|
||||
if let Some(peer) = self.peers.get_mut(&peer_id) {
|
||||
peer.state = req.peer_state();
|
||||
}
|
||||
|
||||
match req {
|
||||
DownloadRequest::GetAccountRange { request, response, .. } => {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.inflight_snap_requests.push(InflightSnapRequest { peer_id, response, rx });
|
||||
PeerRequest::GetAccountRange { request, response: tx }
|
||||
}
|
||||
DownloadRequest::GetStorageRanges { request, response, .. } => {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.inflight_snap_requests.push(InflightSnapRequest { peer_id, response, rx });
|
||||
PeerRequest::GetStorageRanges { request, response: tx }
|
||||
}
|
||||
DownloadRequest::GetByteCodes { request, response, .. } => {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.inflight_snap_requests.push(InflightSnapRequest { peer_id, response, rx });
|
||||
PeerRequest::GetByteCodes { request, response: tx }
|
||||
}
|
||||
DownloadRequest::GetTrieNodes { request, response, .. } => {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.inflight_snap_requests.push(InflightSnapRequest { peer_id, response, rx });
|
||||
PeerRequest::GetTrieNodes { request, response: tx }
|
||||
}
|
||||
_ => unreachable!("only snap requests should be passed to prepare_snap_request"),
|
||||
}
|
||||
}
|
||||
|
||||
/// Called when a snap response is received.
|
||||
///
|
||||
/// Marks the peer as idle so it can accept new requests.
|
||||
fn on_snap_response(&mut self, peer_id: PeerId) {
|
||||
if let Some(peer) = self.peers.get_mut(&peer_id) {
|
||||
peer.state.on_request_finished();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -341,8 +448,8 @@ impl<N: NetworkPrimitives> StateFetcher<N> {
|
||||
}
|
||||
|
||||
/// The outcome of [`StateFetcher::poll_action`]
|
||||
enum PollAction {
|
||||
Ready(FetchAction),
|
||||
enum PollAction<N: NetworkPrimitives = EthNetworkPrimitives> {
|
||||
Ready(FetchAction<N>),
|
||||
NoRequests,
|
||||
NoPeersAvailable,
|
||||
}
|
||||
@@ -453,6 +560,8 @@ enum PeerState {
|
||||
GetBlockHeaders,
|
||||
/// Peer is handling a `GetBlockBodies` request.
|
||||
GetBlockBodies,
|
||||
/// Peer is handling a snap protocol request.
|
||||
SnapRequest,
|
||||
/// Peer session is about to close
|
||||
Closing,
|
||||
}
|
||||
@@ -491,6 +600,7 @@ struct Request<Req, Resp> {
|
||||
|
||||
/// Requests that can be sent to the Syncer from a [`FetchClient`]
|
||||
#[derive(Debug)]
|
||||
#[allow(clippy::enum_variant_names)]
|
||||
pub(crate) enum DownloadRequest<N: NetworkPrimitives> {
|
||||
/// Download the requested headers and send response through channel
|
||||
GetBlockHeaders {
|
||||
@@ -505,6 +615,30 @@ pub(crate) enum DownloadRequest<N: NetworkPrimitives> {
|
||||
priority: Priority,
|
||||
range_hint: Option<RangeInclusive<u64>>,
|
||||
},
|
||||
/// Request an account range via snap protocol
|
||||
GetAccountRange {
|
||||
request: GetAccountRangeMessage,
|
||||
response: oneshot::Sender<PeerRequestResult<SnapResponse>>,
|
||||
priority: Priority,
|
||||
},
|
||||
/// Request storage ranges via snap protocol
|
||||
GetStorageRanges {
|
||||
request: GetStorageRangesMessage,
|
||||
response: oneshot::Sender<PeerRequestResult<SnapResponse>>,
|
||||
priority: Priority,
|
||||
},
|
||||
/// Request bytecodes via snap protocol
|
||||
GetByteCodes {
|
||||
request: GetByteCodesMessage,
|
||||
response: oneshot::Sender<PeerRequestResult<SnapResponse>>,
|
||||
priority: Priority,
|
||||
},
|
||||
/// Request trie nodes via snap protocol
|
||||
GetTrieNodes {
|
||||
request: GetTrieNodesMessage,
|
||||
response: oneshot::Sender<PeerRequestResult<SnapResponse>>,
|
||||
priority: Priority,
|
||||
},
|
||||
}
|
||||
|
||||
// === impl DownloadRequest ===
|
||||
@@ -515,15 +649,22 @@ impl<N: NetworkPrimitives> DownloadRequest<N> {
|
||||
match self {
|
||||
Self::GetBlockHeaders { .. } => PeerState::GetBlockHeaders,
|
||||
Self::GetBlockBodies { .. } => PeerState::GetBlockBodies,
|
||||
Self::GetAccountRange { .. } |
|
||||
Self::GetStorageRanges { .. } |
|
||||
Self::GetByteCodes { .. } |
|
||||
Self::GetTrieNodes { .. } => PeerState::SnapRequest,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the requested priority of this request
|
||||
const fn get_priority(&self) -> &Priority {
|
||||
match self {
|
||||
Self::GetBlockHeaders { priority, .. } | Self::GetBlockBodies { priority, .. } => {
|
||||
priority
|
||||
}
|
||||
Self::GetBlockHeaders { priority, .. } |
|
||||
Self::GetBlockBodies { priority, .. } |
|
||||
Self::GetAccountRange { priority, .. } |
|
||||
Self::GetStorageRanges { priority, .. } |
|
||||
Self::GetByteCodes { priority, .. } |
|
||||
Self::GetTrieNodes { priority, .. } => priority,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -532,10 +673,25 @@ impl<N: NetworkPrimitives> DownloadRequest<N> {
|
||||
self.get_priority().is_normal()
|
||||
}
|
||||
|
||||
/// Returns `true` if this is a snap protocol request.
|
||||
const fn is_snap_request(&self) -> bool {
|
||||
matches!(
|
||||
self,
|
||||
Self::GetAccountRange { .. } |
|
||||
Self::GetStorageRanges { .. } |
|
||||
Self::GetByteCodes { .. } |
|
||||
Self::GetTrieNodes { .. }
|
||||
)
|
||||
}
|
||||
|
||||
/// Returns the best peer requirements for this request.
|
||||
fn best_peer_requirements(&self) -> BestPeerRequirements {
|
||||
match self {
|
||||
Self::GetBlockHeaders { .. } => BestPeerRequirements::None,
|
||||
Self::GetBlockHeaders { .. } |
|
||||
Self::GetAccountRange { .. } |
|
||||
Self::GetStorageRanges { .. } |
|
||||
Self::GetByteCodes { .. } |
|
||||
Self::GetTrieNodes { .. } => BestPeerRequirements::None,
|
||||
Self::GetBlockBodies { range_hint, .. } => {
|
||||
if let Some(range) = range_hint {
|
||||
BestPeerRequirements::FullBlockRange(range.clone())
|
||||
@@ -548,7 +704,7 @@ impl<N: NetworkPrimitives> DownloadRequest<N> {
|
||||
}
|
||||
|
||||
/// An action the syncer can emit.
|
||||
pub(crate) enum FetchAction {
|
||||
pub(crate) enum FetchAction<N: NetworkPrimitives = EthNetworkPrimitives> {
|
||||
/// Dispatch an eth request to the given peer.
|
||||
BlockRequest {
|
||||
/// The targeted recipient for the request
|
||||
@@ -556,6 +712,13 @@ pub(crate) enum FetchAction {
|
||||
/// The request to send
|
||||
request: BlockRequest,
|
||||
},
|
||||
/// Dispatch a snap protocol request to the given peer.
|
||||
SnapRequest {
|
||||
/// The targeted recipient for the request
|
||||
peer_id: PeerId,
|
||||
/// The snap request to send
|
||||
request: PeerRequest<N>,
|
||||
},
|
||||
}
|
||||
|
||||
/// Outcome of a processed response.
|
||||
|
||||
@@ -565,6 +565,13 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
|
||||
response,
|
||||
});
|
||||
}
|
||||
PeerRequest::GetAccountRange { .. } |
|
||||
PeerRequest::GetStorageRanges { .. } |
|
||||
PeerRequest::GetByteCodes { .. } |
|
||||
PeerRequest::GetTrieNodes { .. } => {
|
||||
// Snap protocol requests from peers are not handled here.
|
||||
// They are handled in the session layer directly.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -316,10 +316,103 @@ impl<N: NetworkPrimitives> ActiveSession<N> {
|
||||
|
||||
OnIncomingMessageOutcome::Ok
|
||||
}
|
||||
EthMessage::Other(bytes) => self.try_emit_broadcast(PeerMessage::Other(bytes)).into(),
|
||||
EthMessage::Other(raw) => {
|
||||
if let Some(outcome) = self.try_handle_snap_response(&raw) {
|
||||
outcome
|
||||
} else {
|
||||
self.try_emit_broadcast(PeerMessage::Other(raw)).into()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempts to decode a raw capability message as a snap protocol response and resolve the
|
||||
/// matching inflight request.
|
||||
///
|
||||
/// Returns `Some` if the message ID falls in the snap range (even if decoding or matching
|
||||
/// fails), `None` if the message is not a snap message.
|
||||
fn try_handle_snap_response(
|
||||
&mut self,
|
||||
raw: &RawCapabilityMessage,
|
||||
) -> Option<OnIncomingMessageOutcome<N>> {
|
||||
use reth_eth_wire_types::{snap::SnapProtocolMessage, EthMessageID};
|
||||
use reth_network_p2p::snap::client::SnapResponse;
|
||||
|
||||
let eth_offset = EthMessageID::message_count(self.conn.version()) as usize;
|
||||
let snap_count = 8; // snap/1 has 8 message types (0x00..0x07)
|
||||
|
||||
// Check if the raw message ID falls in the snap range
|
||||
if raw.id < eth_offset || raw.id >= eth_offset + snap_count {
|
||||
return None;
|
||||
}
|
||||
|
||||
let snap_id = (raw.id - eth_offset) as u8;
|
||||
|
||||
// Only handle response messages (odd IDs: AccountRange=1, StorageRanges=3, ByteCodes=5,
|
||||
// TrieNodes=7)
|
||||
if snap_id.is_multiple_of(2) {
|
||||
// This is a snap *request* from the remote peer, not a response.
|
||||
// For now, we don't handle incoming snap requests — let it pass through.
|
||||
return None;
|
||||
}
|
||||
|
||||
let mut buf = raw.payload.as_ref();
|
||||
let snap_msg = match SnapProtocolMessage::decode(snap_id, &mut buf) {
|
||||
Ok(msg) => msg,
|
||||
Err(err) => {
|
||||
debug!(target: "net::session", %err, ?snap_id, remote_peer_id=?self.remote_peer_id, "failed to decode snap response");
|
||||
self.on_bad_message();
|
||||
return Some(OnIncomingMessageOutcome::Ok);
|
||||
}
|
||||
};
|
||||
|
||||
// Extract request_id and build the SnapResponse
|
||||
let (request_id, expected_variant, snap_response) = match snap_msg {
|
||||
SnapProtocolMessage::AccountRange(msg) => {
|
||||
(msg.request_id, "GetAccountRange", SnapResponse::AccountRange(msg))
|
||||
}
|
||||
SnapProtocolMessage::StorageRanges(msg) => {
|
||||
(msg.request_id, "GetStorageRanges", SnapResponse::StorageRanges(msg))
|
||||
}
|
||||
SnapProtocolMessage::ByteCodes(msg) => {
|
||||
(msg.request_id, "GetByteCodes", SnapResponse::ByteCodes(msg))
|
||||
}
|
||||
SnapProtocolMessage::TrieNodes(msg) => {
|
||||
(msg.request_id, "GetTrieNodes", SnapResponse::TrieNodes(msg))
|
||||
}
|
||||
_ => {
|
||||
// Not a response message — shouldn't happen given the odd-ID check above
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(req) = self.inflight_requests.remove(&request_id) {
|
||||
match req.request {
|
||||
RequestState::Waiting(
|
||||
PeerRequest::GetAccountRange { response, .. } |
|
||||
PeerRequest::GetStorageRanges { response, .. } |
|
||||
PeerRequest::GetByteCodes { response, .. } |
|
||||
PeerRequest::GetTrieNodes { response, .. },
|
||||
) => {
|
||||
trace!(peer_id=?self.remote_peer_id, ?request_id, %expected_variant, "received snap response from peer");
|
||||
let _ = response.send(Ok(snap_response));
|
||||
self.update_request_timeout(req.timestamp, Instant::now());
|
||||
}
|
||||
RequestState::Waiting(request) => {
|
||||
request.send_bad_response();
|
||||
}
|
||||
RequestState::TimedOut => {
|
||||
self.update_request_timeout(req.timestamp, Instant::now());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
trace!(peer_id=?self.remote_peer_id, ?request_id, "received snap response to unknown request");
|
||||
self.on_bad_message();
|
||||
}
|
||||
|
||||
Some(OnIncomingMessageOutcome::Ok)
|
||||
}
|
||||
|
||||
/// Handle an internal peer request that will be sent to the remote.
|
||||
fn on_internal_peer_request(&mut self, request: PeerRequest<N>, deadline: Instant) {
|
||||
let version = self.conn.version();
|
||||
@@ -337,15 +430,67 @@ impl<N: NetworkPrimitives> ActiveSession<N> {
|
||||
|
||||
let request_id = self.next_id();
|
||||
trace!(?request, peer_id=?self.remote_peer_id, ?request_id, "sending request to peer");
|
||||
let msg = request.create_request_message(request_id).map_versioned(version);
|
||||
|
||||
self.queued_outgoing.push_back(msg.into());
|
||||
let req = InflightRequest {
|
||||
request: RequestState::Waiting(request),
|
||||
timestamp: Instant::now(),
|
||||
deadline,
|
||||
if request.is_snap_request() {
|
||||
// Snap requests are encoded as raw capability messages with adjusted message IDs.
|
||||
// The snap message ID is offset by the eth message count for multiplexing.
|
||||
if let Some(raw_msg) = self.encode_snap_request(&request, request_id) {
|
||||
self.queued_outgoing.push_back(OutgoingMessage::Raw(raw_msg));
|
||||
let req = InflightRequest {
|
||||
request: RequestState::Waiting(request),
|
||||
timestamp: Instant::now(),
|
||||
deadline,
|
||||
};
|
||||
self.inflight_requests.insert(request_id, req);
|
||||
} else {
|
||||
request.send_err_response(RequestError::UnsupportedCapability);
|
||||
}
|
||||
} else {
|
||||
let msg = request.create_request_message(request_id).map_versioned(version);
|
||||
self.queued_outgoing.push_back(msg.into());
|
||||
let req = InflightRequest {
|
||||
request: RequestState::Waiting(request),
|
||||
timestamp: Instant::now(),
|
||||
deadline,
|
||||
};
|
||||
self.inflight_requests.insert(request_id, req);
|
||||
}
|
||||
}
|
||||
|
||||
/// Encodes a snap protocol request as a [`RawCapabilityMessage`].
|
||||
fn encode_snap_request(
|
||||
&self,
|
||||
request: &PeerRequest<N>,
|
||||
_request_id: u64,
|
||||
) -> Option<RawCapabilityMessage> {
|
||||
use reth_eth_wire_types::{snap::SnapProtocolMessage, EthMessageID};
|
||||
|
||||
let snap_msg = match request {
|
||||
PeerRequest::GetAccountRange { request, .. } => {
|
||||
SnapProtocolMessage::GetAccountRange(request.clone())
|
||||
}
|
||||
PeerRequest::GetStorageRanges { request, .. } => {
|
||||
SnapProtocolMessage::GetStorageRanges(request.clone())
|
||||
}
|
||||
PeerRequest::GetByteCodes { request, .. } => {
|
||||
SnapProtocolMessage::GetByteCodes(request.clone())
|
||||
}
|
||||
PeerRequest::GetTrieNodes { request, .. } => {
|
||||
SnapProtocolMessage::GetTrieNodes(request.clone())
|
||||
}
|
||||
_ => return None,
|
||||
};
|
||||
self.inflight_requests.insert(request_id, req);
|
||||
|
||||
let encoded = snap_msg.encode();
|
||||
// The first byte is the snap message ID, which needs to be offset
|
||||
// by the eth protocol message count for proper multiplexing.
|
||||
let snap_id = encoded[0];
|
||||
let adjusted_id = snap_id + EthMessageID::message_count(self.conn.version());
|
||||
|
||||
let mut payload = Vec::with_capacity(encoded.len() - 1);
|
||||
payload.extend_from_slice(&encoded[1..]);
|
||||
|
||||
Some(RawCapabilityMessage::new(adjusted_id as usize, payload.into()))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
|
||||
@@ -403,6 +403,16 @@ impl<N: NetworkPrimitives> NetworkState<N> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Sends a snap request directly to the peer's session.
|
||||
///
|
||||
/// Unlike block requests, snap requests don't need response tracking here because
|
||||
/// the [`StateFetcher`] bridges the response back to the caller internally.
|
||||
fn handle_snap_request(&mut self, peer: PeerId, request: PeerRequest<N>) {
|
||||
if let Some(ref mut peer) = self.active_peers.get_mut(&peer) {
|
||||
let _ = peer.request_tx.to_session_tx.try_send(request);
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle the outcome of processed response, for example directly queue another request.
|
||||
fn on_block_response_outcome(&mut self, outcome: BlockResponseOutcome) {
|
||||
match outcome {
|
||||
@@ -453,6 +463,9 @@ impl<N: NetworkPrimitives> NetworkState<N> {
|
||||
FetchAction::BlockRequest { peer_id, request } => {
|
||||
self.handle_block_request(peer_id, request)
|
||||
}
|
||||
FetchAction::SnapRequest { peer_id, request } => {
|
||||
self.handle_snap_request(peer_id, request)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -5,13 +5,20 @@ use crate::{
|
||||
error::PeerRequestResult,
|
||||
headers::client::{HeadersClient, SingleHeaderRequest},
|
||||
priority::Priority,
|
||||
snap::client::{SnapClient, SnapResponse},
|
||||
BlockClient,
|
||||
};
|
||||
use alloy_consensus::BlockHeader;
|
||||
use alloy_primitives::{Sealable, B256};
|
||||
use core::marker::PhantomData;
|
||||
use reth_consensus::Consensus;
|
||||
use reth_eth_wire_types::{EthNetworkPrimitives, HeadersDirection, NetworkPrimitives};
|
||||
use reth_eth_wire_types::{
|
||||
snap::{
|
||||
AccountRangeMessage, ByteCodesMessage, GetAccountRangeMessage, GetByteCodesMessage,
|
||||
GetStorageRangesMessage, GetTrieNodesMessage, StorageRangesMessage, TrieNodesMessage,
|
||||
},
|
||||
EthNetworkPrimitives, HeadersDirection, NetworkPrimitives,
|
||||
};
|
||||
use reth_network_peers::{PeerId, WithPeerId};
|
||||
use reth_primitives_traits::{SealedBlock, SealedHeader};
|
||||
use std::{
|
||||
@@ -740,6 +747,83 @@ where
|
||||
type Block = Net::Block;
|
||||
}
|
||||
|
||||
impl<Net> SnapClient for NoopFullBlockClient<Net>
|
||||
where
|
||||
Net: NetworkPrimitives,
|
||||
{
|
||||
type Output = futures::future::Ready<PeerRequestResult<SnapResponse>>;
|
||||
|
||||
fn get_account_range_with_priority(
|
||||
&self,
|
||||
request: GetAccountRangeMessage,
|
||||
_priority: Priority,
|
||||
) -> Self::Output {
|
||||
futures::future::ready(Ok(WithPeerId::new(
|
||||
PeerId::random(),
|
||||
SnapResponse::AccountRange(AccountRangeMessage {
|
||||
request_id: request.request_id,
|
||||
accounts: vec![],
|
||||
proof: vec![],
|
||||
}),
|
||||
)))
|
||||
}
|
||||
|
||||
fn get_storage_ranges(&self, request: GetStorageRangesMessage) -> Self::Output {
|
||||
self.get_storage_ranges_with_priority(request, Priority::Normal)
|
||||
}
|
||||
|
||||
fn get_storage_ranges_with_priority(
|
||||
&self,
|
||||
request: GetStorageRangesMessage,
|
||||
_priority: Priority,
|
||||
) -> Self::Output {
|
||||
futures::future::ready(Ok(WithPeerId::new(
|
||||
PeerId::random(),
|
||||
SnapResponse::StorageRanges(StorageRangesMessage {
|
||||
request_id: request.request_id,
|
||||
slots: vec![],
|
||||
proof: vec![],
|
||||
}),
|
||||
)))
|
||||
}
|
||||
|
||||
fn get_byte_codes(&self, request: GetByteCodesMessage) -> Self::Output {
|
||||
self.get_byte_codes_with_priority(request, Priority::Normal)
|
||||
}
|
||||
|
||||
fn get_byte_codes_with_priority(
|
||||
&self,
|
||||
request: GetByteCodesMessage,
|
||||
_priority: Priority,
|
||||
) -> Self::Output {
|
||||
futures::future::ready(Ok(WithPeerId::new(
|
||||
PeerId::random(),
|
||||
SnapResponse::ByteCodes(ByteCodesMessage {
|
||||
request_id: request.request_id,
|
||||
codes: vec![],
|
||||
}),
|
||||
)))
|
||||
}
|
||||
|
||||
fn get_trie_nodes(&self, request: GetTrieNodesMessage) -> Self::Output {
|
||||
self.get_trie_nodes_with_priority(request, Priority::Normal)
|
||||
}
|
||||
|
||||
fn get_trie_nodes_with_priority(
|
||||
&self,
|
||||
request: GetTrieNodesMessage,
|
||||
_priority: Priority,
|
||||
) -> Self::Output {
|
||||
futures::future::ready(Ok(WithPeerId::new(
|
||||
PeerId::random(),
|
||||
SnapResponse::TrieNodes(TrieNodesMessage {
|
||||
request_id: request.request_id,
|
||||
nodes: vec![],
|
||||
}),
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
impl<Net> Default for NoopFullBlockClient<Net> {
|
||||
fn default() -> Self {
|
||||
Self(PhantomData::<Net>)
|
||||
|
||||
56
crates/net/snap-sync/Cargo.toml
Normal file
56
crates/net/snap-sync/Cargo.toml
Normal file
@@ -0,0 +1,56 @@
|
||||
[package]
|
||||
name = "reth-snap-sync"
|
||||
version.workspace = true
|
||||
edition.workspace = true
|
||||
rust-version.workspace = true
|
||||
license.workspace = true
|
||||
homepage.workspace = true
|
||||
repository.workspace = true
|
||||
description = "Snap sync protocol implementation for reth"
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
# reth
|
||||
reth-eth-wire-types.workspace = true
|
||||
reth-network-p2p.workspace = true
|
||||
reth-network-peers.workspace = true
|
||||
reth-primitives-traits.workspace = true
|
||||
reth-storage-api.workspace = true
|
||||
reth-storage-errors.workspace = true
|
||||
reth-db-api.workspace = true
|
||||
reth-provider.workspace = true
|
||||
reth-tasks.workspace = true
|
||||
reth-tracing.workspace = true
|
||||
reth-metrics.workspace = true
|
||||
reth-trie.workspace = true
|
||||
reth-trie-db.workspace = true
|
||||
reth-execution-errors.workspace = true
|
||||
|
||||
# misc (non-workspace)
|
||||
rand = "0.9"
|
||||
|
||||
# ethereum
|
||||
alloy-primitives.workspace = true
|
||||
alloy-rlp.workspace = true
|
||||
alloy-consensus.workspace = true
|
||||
alloy-trie.workspace = true
|
||||
|
||||
# async
|
||||
futures.workspace = true
|
||||
tokio = { workspace = true, features = ["sync", "time", "macros"] }
|
||||
tokio-stream.workspace = true
|
||||
|
||||
# misc
|
||||
tracing.workspace = true
|
||||
thiserror.workspace = true
|
||||
metrics.workspace = true
|
||||
derive_more.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
reth-provider = { workspace = true, features = ["test-utils"] }
|
||||
tokio = { workspace = true, features = ["rt-multi-thread"] }
|
||||
|
||||
[features]
|
||||
default = []
|
||||
8
crates/net/snap-sync/src/config.rs
Normal file
8
crates/net/snap-sync/src/config.rs
Normal file
@@ -0,0 +1,8 @@
|
||||
//! Configuration for snap sync.
|
||||
|
||||
/// Configuration for snap sync.
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct SnapSyncConfig {
|
||||
/// Whether snap sync is enabled.
|
||||
pub enabled: bool,
|
||||
}
|
||||
647
crates/net/snap-sync/src/downloader.rs
Normal file
647
crates/net/snap-sync/src/downloader.rs
Normal file
@@ -0,0 +1,647 @@
|
||||
//! Snap sync downloader.
|
||||
//!
|
||||
//! Orchestrates the multi-phase snap sync process:
|
||||
//! 1. Download account ranges via `GetAccountRange`
|
||||
//! 2. Download storage slots via `GetStorageRanges`
|
||||
//! 3. Download bytecodes via `GetByteCodes`
|
||||
//! 4. Verify state root against pivot block
|
||||
|
||||
use crate::{
|
||||
error::SnapSyncError,
|
||||
metrics::SnapSyncMetrics,
|
||||
progress::{SnapPhase, SnapProgress},
|
||||
};
|
||||
use alloy_consensus::constants::KECCAK_EMPTY;
|
||||
use alloy_primitives::{keccak256, Bytes, B256, U256};
|
||||
use alloy_rlp::Decodable;
|
||||
use alloy_trie::TrieAccount;
|
||||
use reth_db_api::{
|
||||
cursor::DbCursorRO,
|
||||
tables,
|
||||
transaction::{DbTx, DbTxMut},
|
||||
};
|
||||
use reth_eth_wire_types::snap::{
|
||||
GetAccountRangeMessage, GetByteCodesMessage, GetStorageRangesMessage,
|
||||
};
|
||||
use reth_network_p2p::snap::client::{SnapClient, SnapResponse};
|
||||
use reth_primitives_traits::{Account, StorageEntry};
|
||||
use reth_storage_api::{DBProvider, DatabaseProviderFactory};
|
||||
use reth_trie::StateRoot;
|
||||
use reth_trie_db::DatabaseStateRoot;
|
||||
use std::collections::HashSet;
|
||||
use tokio::sync::watch;
|
||||
use tracing::{debug, info, trace, warn};
|
||||
|
||||
/// Maximum response size in bytes for snap requests (512 KB).
|
||||
const MAX_RESPONSE_BYTES: u64 = 512 * 1024;
|
||||
|
||||
/// Number of accounts to accumulate before flushing to DB.
|
||||
const ACCOUNT_WRITE_BATCH_SIZE: usize = 10_000;
|
||||
|
||||
/// Number of storage slots to accumulate before flushing to DB.
|
||||
const STORAGE_WRITE_BATCH_SIZE: usize = 50_000;
|
||||
|
||||
/// Number of bytecodes to request in a single batch.
|
||||
const BYTECODE_BATCH_SIZE: usize = 64;
|
||||
|
||||
/// Hash representing the maximum key (all 0xFF).
|
||||
const HASH_MAX: B256 = B256::repeat_byte(0xFF);
|
||||
|
||||
/// The snap sync downloader that orchestrates state download from peers.
|
||||
#[derive(Debug)]
|
||||
pub struct SnapSyncDownloader<C, F> {
|
||||
/// The snap-capable network client.
|
||||
client: C,
|
||||
/// Database provider factory for writing state.
|
||||
provider_factory: F,
|
||||
/// Current sync progress.
|
||||
progress: SnapProgress,
|
||||
/// Metrics.
|
||||
metrics: SnapSyncMetrics,
|
||||
/// Cancellation signal.
|
||||
cancel_rx: watch::Receiver<bool>,
|
||||
}
|
||||
|
||||
impl<C, F> SnapSyncDownloader<C, F>
|
||||
where
|
||||
C: SnapClient + Clone + Send + Sync + 'static,
|
||||
F: DatabaseProviderFactory + Send + Sync + 'static,
|
||||
<F as DatabaseProviderFactory>::ProviderRW: DBProvider<Tx: DbTxMut + DbTx> + Send,
|
||||
{
|
||||
/// Creates a new snap sync downloader.
|
||||
pub fn new(
|
||||
client: C,
|
||||
provider_factory: F,
|
||||
pivot_hash: B256,
|
||||
pivot_number: u64,
|
||||
state_root: B256,
|
||||
cancel_rx: watch::Receiver<bool>,
|
||||
) -> Self {
|
||||
Self {
|
||||
client,
|
||||
provider_factory,
|
||||
progress: SnapProgress::new(pivot_hash, pivot_number, state_root),
|
||||
metrics: SnapSyncMetrics::default(),
|
||||
cancel_rx,
|
||||
}
|
||||
}
|
||||
|
||||
/// Runs the snap sync to completion.
|
||||
///
|
||||
/// Returns `Ok(())` if state was successfully downloaded and verified,
|
||||
/// or an error if sync failed.
|
||||
pub async fn run(&mut self) -> Result<(), SnapSyncError> {
|
||||
info!(
|
||||
target: "snap_sync",
|
||||
pivot_hash = %self.progress.pivot_hash,
|
||||
pivot_number = self.progress.pivot_number,
|
||||
state_root = %self.progress.state_root,
|
||||
"starting snap sync"
|
||||
);
|
||||
|
||||
// Phase 1: Download accounts
|
||||
self.progress.phase = SnapPhase::Accounts;
|
||||
self.metrics.phase.set(1.0);
|
||||
let storage_accounts = self.download_accounts().await?;
|
||||
info!(
|
||||
target: "snap_sync",
|
||||
accounts = self.progress.accounts_downloaded,
|
||||
storage_accounts = storage_accounts.len(),
|
||||
"account download complete"
|
||||
);
|
||||
|
||||
// Phase 2: Download storage slots
|
||||
self.progress.phase = SnapPhase::Storages;
|
||||
self.metrics.phase.set(2.0);
|
||||
let code_hashes = self.download_storages(&storage_accounts).await?;
|
||||
info!(
|
||||
target: "snap_sync",
|
||||
slots = self.progress.storage_slots_downloaded,
|
||||
"storage download complete"
|
||||
);
|
||||
|
||||
// Phase 3: Download bytecodes
|
||||
self.progress.phase = SnapPhase::Bytecodes;
|
||||
self.metrics.phase.set(3.0);
|
||||
self.download_bytecodes(code_hashes).await?;
|
||||
info!(
|
||||
target: "snap_sync",
|
||||
bytecodes = self.progress.bytecodes_downloaded,
|
||||
"bytecode download complete"
|
||||
);
|
||||
|
||||
// Phase 4: Verification (hashing + merkle root)
|
||||
self.progress.phase = SnapPhase::Verification;
|
||||
self.metrics.phase.set(4.0);
|
||||
info!(target: "snap_sync", "verifying state root against pivot block");
|
||||
self.verify_state_root()?;
|
||||
|
||||
self.progress.phase = SnapPhase::Done;
|
||||
self.metrics.phase.set(5.0);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns the current progress.
|
||||
pub const fn progress(&self) -> &SnapProgress {
|
||||
&self.progress
|
||||
}
|
||||
|
||||
/// Checks if cancellation was requested.
|
||||
fn is_cancelled(&self) -> bool {
|
||||
*self.cancel_rx.borrow()
|
||||
}
|
||||
|
||||
// ========================================================================
|
||||
// Phase 4: Verification
|
||||
// ========================================================================
|
||||
|
||||
/// Computes the state root from the downloaded state and verifies it against
|
||||
/// the pivot block's expected state root.
|
||||
fn verify_state_root(&self) -> Result<(), SnapSyncError> {
|
||||
info!(target: "snap_sync", "computing state root from downloaded state");
|
||||
|
||||
let provider =
|
||||
self.provider_factory.database_provider_rw().map_err(SnapSyncError::Provider)?;
|
||||
|
||||
let computed_root = StateRoot::from_tx(provider.tx_ref())
|
||||
.root()
|
||||
.map_err(|e| SnapSyncError::StateRootVerification(e.to_string()))?;
|
||||
|
||||
if computed_root != self.progress.state_root {
|
||||
return Err(SnapSyncError::StateRootMismatch {
|
||||
expected: self.progress.state_root,
|
||||
got: computed_root,
|
||||
});
|
||||
}
|
||||
|
||||
info!(target: "snap_sync", %computed_root, "state root verified successfully");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ========================================================================
|
||||
// Phase 1: Account download
|
||||
// ========================================================================
|
||||
|
||||
/// Downloads all accounts from the state trie via `GetAccountRange` requests.
|
||||
///
|
||||
/// Returns a list of `(address_hash, storage_root)` for accounts that have
|
||||
/// non-empty storage (`storage_root` != `EMPTY_TRIE_HASH`).
|
||||
async fn download_accounts(&mut self) -> Result<Vec<(B256, B256)>, SnapSyncError> {
|
||||
let state_root = self.progress.state_root;
|
||||
let mut cursor = self.progress.account_cursor;
|
||||
let mut storage_accounts: Vec<(B256, B256)> = Vec::new();
|
||||
|
||||
// Batch buffer for DB writes
|
||||
let mut account_batch: Vec<(B256, Account, B256)> = Vec::new();
|
||||
|
||||
loop {
|
||||
if self.is_cancelled() {
|
||||
return Err(SnapSyncError::Cancelled);
|
||||
}
|
||||
|
||||
let response = retry_snap_request(|| {
|
||||
let req = GetAccountRangeMessage {
|
||||
request_id: rand_request_id(),
|
||||
root_hash: state_root,
|
||||
starting_hash: cursor,
|
||||
limit_hash: HASH_MAX,
|
||||
response_bytes: MAX_RESPONSE_BYTES,
|
||||
};
|
||||
self.client.get_account_range(req)
|
||||
}, &self.metrics, &self.cancel_rx)
|
||||
.await?;
|
||||
|
||||
let msg = match response.into_data() {
|
||||
SnapResponse::AccountRange(msg) => msg,
|
||||
_ => {
|
||||
return Err(SnapSyncError::InvalidAccountRange(
|
||||
"unexpected response type".into(),
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
if msg.accounts.is_empty() {
|
||||
// Empty response means we've reached the end or peer doesn't serve this root
|
||||
debug!(target: "snap_sync", %cursor, "received empty account range, finishing");
|
||||
break;
|
||||
}
|
||||
|
||||
// Process each account
|
||||
for account_data in &msg.accounts {
|
||||
let trie_account = decode_slim_account(&account_data.body)?;
|
||||
|
||||
let account = Account {
|
||||
nonce: trie_account.nonce,
|
||||
balance: trie_account.balance,
|
||||
bytecode_hash: if trie_account.code_hash == KECCAK_EMPTY {
|
||||
None
|
||||
} else {
|
||||
Some(trie_account.code_hash)
|
||||
},
|
||||
};
|
||||
|
||||
// Track accounts with storage
|
||||
let empty_root = alloy_trie::EMPTY_ROOT_HASH;
|
||||
if trie_account.storage_root != empty_root {
|
||||
storage_accounts.push((account_data.hash, trie_account.storage_root));
|
||||
}
|
||||
|
||||
account_batch.push((account_data.hash, account, trie_account.storage_root));
|
||||
self.progress.accounts_downloaded += 1;
|
||||
}
|
||||
|
||||
// Update cursor to continue after the last account
|
||||
cursor = increment_hash(msg.accounts.last().unwrap().hash);
|
||||
|
||||
// Flush batch if large enough
|
||||
if account_batch.len() >= ACCOUNT_WRITE_BATCH_SIZE {
|
||||
self.write_accounts(&account_batch)?;
|
||||
account_batch.clear();
|
||||
}
|
||||
|
||||
self.metrics.accounts_downloaded.set(self.progress.accounts_downloaded as f64);
|
||||
self.progress.account_cursor = cursor;
|
||||
|
||||
trace!(
|
||||
target: "snap_sync",
|
||||
accounts = self.progress.accounts_downloaded,
|
||||
%cursor,
|
||||
"account download progress"
|
||||
);
|
||||
|
||||
// If cursor wrapped around to zero, we've covered the full range
|
||||
if cursor == B256::ZERO {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Flush remaining
|
||||
if !account_batch.is_empty() {
|
||||
self.write_accounts(&account_batch)?;
|
||||
}
|
||||
|
||||
Ok(storage_accounts)
|
||||
}
|
||||
|
||||
/// Writes a batch of accounts to the database.
|
||||
///
|
||||
/// Account hashes are used as keys since snap protocol returns hashed addresses.
|
||||
/// The address→hash mapping will be resolved during the hashing stage.
|
||||
fn write_accounts(&self, batch: &[(B256, Account, B256)]) -> Result<(), SnapSyncError> {
|
||||
// For snap sync, we write to HashedAccounts table since snap returns hashed keys.
|
||||
// The pipeline's hashing stage normally computes this from PlainAccountState,
|
||||
// but for snap we go directly to the hashed form.
|
||||
let provider =
|
||||
self.provider_factory.database_provider_rw().map_err(SnapSyncError::Provider)?;
|
||||
|
||||
let tx = provider.tx_ref();
|
||||
for (hash, account, _storage_root) in batch {
|
||||
tx.put::<tables::HashedAccounts>(*hash, *account)?;
|
||||
}
|
||||
|
||||
provider.commit()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ========================================================================
|
||||
// Phase 2: Storage download
|
||||
// ========================================================================
|
||||
|
||||
/// Downloads storage slots for all accounts with non-empty storage roots.
|
||||
///
|
||||
/// Returns the set of code hashes encountered during account processing.
|
||||
async fn download_storages(
|
||||
&mut self,
|
||||
storage_accounts: &[(B256, B256)],
|
||||
) -> Result<HashSet<B256>, SnapSyncError> {
|
||||
let state_root = self.progress.state_root;
|
||||
let code_hashes: HashSet<B256> = HashSet::new();
|
||||
|
||||
// Process storage accounts in chunks
|
||||
let mut slot_batch: Vec<(B256, B256, U256)> = Vec::new();
|
||||
|
||||
for (account_hash, _storage_root) in storage_accounts {
|
||||
if self.is_cancelled() {
|
||||
return Err(SnapSyncError::Cancelled);
|
||||
}
|
||||
|
||||
let mut slot_cursor = B256::ZERO;
|
||||
|
||||
loop {
|
||||
let account_hash_val = *account_hash;
|
||||
let response = retry_snap_request(|| {
|
||||
let req = GetStorageRangesMessage {
|
||||
request_id: rand_request_id(),
|
||||
root_hash: state_root,
|
||||
account_hashes: vec![account_hash_val],
|
||||
starting_hash: slot_cursor,
|
||||
limit_hash: HASH_MAX,
|
||||
response_bytes: MAX_RESPONSE_BYTES,
|
||||
};
|
||||
self.client.get_storage_ranges(req)
|
||||
}, &self.metrics, &self.cancel_rx)
|
||||
.await?;
|
||||
|
||||
let msg = match response.into_data() {
|
||||
SnapResponse::StorageRanges(msg) => msg,
|
||||
_ => {
|
||||
return Err(SnapSyncError::InvalidStorageRange(
|
||||
"unexpected response type".into(),
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
if msg.slots.is_empty() || msg.slots[0].is_empty() {
|
||||
break;
|
||||
}
|
||||
|
||||
let slots = &msg.slots[0];
|
||||
for slot in slots {
|
||||
let value = U256::from_be_slice(&slot.data);
|
||||
slot_batch.push((*account_hash, slot.hash, value));
|
||||
self.progress.storage_slots_downloaded += 1;
|
||||
}
|
||||
|
||||
// Update cursor
|
||||
slot_cursor = increment_hash(slots.last().unwrap().hash);
|
||||
|
||||
// Flush if needed
|
||||
if slot_batch.len() >= STORAGE_WRITE_BATCH_SIZE {
|
||||
self.write_storage_slots(&slot_batch)?;
|
||||
slot_batch.clear();
|
||||
}
|
||||
|
||||
self.metrics
|
||||
.storage_slots_downloaded
|
||||
.set(self.progress.storage_slots_downloaded as f64);
|
||||
|
||||
// If no proof or we've reached the end of this account's storage
|
||||
if msg.proof.is_empty() || slot_cursor == B256::ZERO {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Flush remaining
|
||||
if !slot_batch.is_empty() {
|
||||
self.write_storage_slots(&slot_batch)?;
|
||||
}
|
||||
|
||||
Ok(code_hashes)
|
||||
}
|
||||
|
||||
/// Writes a batch of storage slots to the database.
|
||||
fn write_storage_slots(&self, batch: &[(B256, B256, U256)]) -> Result<(), SnapSyncError> {
|
||||
let provider =
|
||||
self.provider_factory.database_provider_rw().map_err(SnapSyncError::Provider)?;
|
||||
|
||||
let tx = provider.tx_ref();
|
||||
for (account_hash, slot_hash, value) in batch {
|
||||
tx.put::<tables::HashedStorages>(
|
||||
*account_hash,
|
||||
StorageEntry { key: *slot_hash, value: *value },
|
||||
)?;
|
||||
}
|
||||
|
||||
provider.commit()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ========================================================================
|
||||
// Phase 3: Bytecode download
|
||||
// ========================================================================
|
||||
|
||||
/// Downloads contract bytecodes by their code hashes.
|
||||
async fn download_bytecodes(
|
||||
&mut self,
|
||||
code_hashes: HashSet<B256>,
|
||||
) -> Result<(), SnapSyncError> {
|
||||
// Collect code hashes from accounts we've already written
|
||||
let mut all_code_hashes: Vec<B256> = self.collect_code_hashes()?;
|
||||
all_code_hashes.extend(code_hashes);
|
||||
all_code_hashes.sort();
|
||||
all_code_hashes.dedup();
|
||||
|
||||
// Remove KECCAK_EMPTY since that means no code
|
||||
all_code_hashes.retain(|h| *h != KECCAK_EMPTY);
|
||||
|
||||
self.progress.bytecodes_total = all_code_hashes.len() as u64;
|
||||
|
||||
info!(
|
||||
target: "snap_sync",
|
||||
total = all_code_hashes.len(),
|
||||
"starting bytecode download"
|
||||
);
|
||||
|
||||
// Download in batches
|
||||
for chunk in all_code_hashes.chunks(BYTECODE_BATCH_SIZE) {
|
||||
if self.is_cancelled() {
|
||||
return Err(SnapSyncError::Cancelled);
|
||||
}
|
||||
|
||||
let hashes = chunk.to_vec();
|
||||
let response = retry_snap_request(|| {
|
||||
let req = GetByteCodesMessage {
|
||||
request_id: rand_request_id(),
|
||||
hashes: hashes.clone(),
|
||||
response_bytes: MAX_RESPONSE_BYTES,
|
||||
};
|
||||
self.client.get_byte_codes(req)
|
||||
}, &self.metrics, &self.cancel_rx)
|
||||
.await?;
|
||||
|
||||
let msg = match response.into_data() {
|
||||
SnapResponse::ByteCodes(msg) => msg,
|
||||
_ => {
|
||||
return Err(SnapSyncError::InvalidBytecode("unexpected response type".into()));
|
||||
}
|
||||
};
|
||||
|
||||
self.write_bytecodes(chunk, &msg.codes)?;
|
||||
self.progress.bytecodes_downloaded += msg.codes.len() as u64;
|
||||
self.metrics.bytecodes_downloaded.set(self.progress.bytecodes_downloaded as f64);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Collects code hashes from accounts already written to DB.
|
||||
fn collect_code_hashes(&self) -> Result<Vec<B256>, SnapSyncError> {
|
||||
let provider =
|
||||
self.provider_factory.database_provider_rw().map_err(SnapSyncError::Provider)?;
|
||||
|
||||
let tx = provider.tx_ref();
|
||||
let mut cursor = tx.cursor_read::<tables::HashedAccounts>()?;
|
||||
let mut code_hashes = Vec::new();
|
||||
|
||||
let mut entry = cursor.first()?;
|
||||
while let Some((_, account)) = entry {
|
||||
if let Some(hash) = account.bytecode_hash &&
|
||||
hash != KECCAK_EMPTY
|
||||
{
|
||||
code_hashes.push(hash);
|
||||
}
|
||||
entry = cursor.next()?;
|
||||
}
|
||||
|
||||
Ok(code_hashes)
|
||||
}
|
||||
|
||||
/// Writes bytecodes to the database, verifying hash integrity.
|
||||
fn write_bytecodes(
|
||||
&self,
|
||||
expected_hashes: &[B256],
|
||||
codes: &[Bytes],
|
||||
) -> Result<(), SnapSyncError> {
|
||||
let provider =
|
||||
self.provider_factory.database_provider_rw().map_err(SnapSyncError::Provider)?;
|
||||
|
||||
let tx = provider.tx_ref();
|
||||
for (i, code) in codes.iter().enumerate() {
|
||||
let hash = keccak256(code);
|
||||
if i < expected_hashes.len() && hash != expected_hashes[i] {
|
||||
warn!(
|
||||
target: "snap_sync",
|
||||
expected = %expected_hashes[i],
|
||||
got = %hash,
|
||||
"bytecode hash mismatch, skipping"
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
let bytecode = reth_primitives_traits::Bytecode::new_raw(code.clone());
|
||||
tx.put::<tables::Bytecodes>(hash, bytecode)?;
|
||||
}
|
||||
|
||||
provider.commit()?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Decodes a "slim" account body from the snap protocol into a `TrieAccount`.
|
||||
///
|
||||
/// Slim format is `RLP([nonce, balance, storage_root, code_hash])` but with
|
||||
/// empty `storage_root` and `code_hash` omitted.
|
||||
fn decode_slim_account(data: &Bytes) -> Result<TrieAccount, SnapSyncError> {
|
||||
// The snap protocol uses "slim" encoding where empty values are omitted.
|
||||
// We need to decode the RLP and fill in defaults for missing fields.
|
||||
let account = TrieAccount::decode(&mut data.as_ref())?;
|
||||
Ok(account)
|
||||
}
|
||||
|
||||
/// Increments a hash by 1. Returns `B256::ZERO` on overflow (wraps around).
|
||||
fn increment_hash(hash: B256) -> B256 {
|
||||
let mut bytes = hash.0;
|
||||
for i in (0..32).rev() {
|
||||
if bytes[i] < 0xFF {
|
||||
bytes[i] += 1;
|
||||
return B256::from(bytes);
|
||||
}
|
||||
bytes[i] = 0;
|
||||
}
|
||||
B256::ZERO
|
||||
}
|
||||
|
||||
/// Generates a random request ID.
|
||||
fn rand_request_id() -> u64 {
|
||||
rand::random()
|
||||
}
|
||||
|
||||
/// Maximum number of retries for a snap request before giving up.
|
||||
const MAX_SNAP_RETRIES: u32 = 10;
|
||||
|
||||
/// Retries a snap request with exponential backoff on failure.
|
||||
async fn retry_snap_request<F, Fut, T>(
|
||||
mut make_request: F,
|
||||
metrics: &SnapSyncMetrics,
|
||||
cancel_rx: &watch::Receiver<bool>,
|
||||
) -> Result<T, SnapSyncError>
|
||||
where
|
||||
F: FnMut() -> Fut,
|
||||
Fut: std::future::Future<Output = Result<T, reth_network_p2p::error::RequestError>>,
|
||||
{
|
||||
let mut attempts = 0u32;
|
||||
loop {
|
||||
if *cancel_rx.borrow() {
|
||||
return Err(SnapSyncError::Cancelled);
|
||||
}
|
||||
|
||||
match make_request().await {
|
||||
Ok(resp) => return Ok(resp),
|
||||
Err(err) => {
|
||||
attempts += 1;
|
||||
metrics.request_failures.increment(1);
|
||||
if attempts >= MAX_SNAP_RETRIES {
|
||||
return Err(SnapSyncError::Request(err));
|
||||
}
|
||||
let delay = std::time::Duration::from_secs(1 << attempts.min(5));
|
||||
warn!(
|
||||
target: "snap_sync",
|
||||
%err,
|
||||
attempts,
|
||||
"snap request failed, retrying in {:?}",
|
||||
delay
|
||||
);
|
||||
tokio::time::sleep(delay).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use alloy_rlp::Encodable;
|
||||
|
||||
#[test]
|
||||
fn test_increment_hash() {
|
||||
let zero = B256::ZERO;
|
||||
let one = increment_hash(zero);
|
||||
assert_eq!(one.0[31], 1);
|
||||
|
||||
let max = B256::repeat_byte(0xFF);
|
||||
let wrapped = increment_hash(max);
|
||||
assert_eq!(wrapped, B256::ZERO);
|
||||
|
||||
let mut mid = B256::ZERO;
|
||||
mid.0[31] = 0xFF;
|
||||
let next = increment_hash(mid);
|
||||
assert_eq!(next.0[30], 1);
|
||||
assert_eq!(next.0[31], 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_decode_slim_account() {
|
||||
let trie_account = TrieAccount {
|
||||
nonce: 42,
|
||||
balance: U256::from(1000),
|
||||
storage_root: alloy_trie::EMPTY_ROOT_HASH,
|
||||
code_hash: KECCAK_EMPTY,
|
||||
};
|
||||
let mut buf = Vec::new();
|
||||
trie_account.encode(&mut buf);
|
||||
let decoded = decode_slim_account(&Bytes::from(buf)).unwrap();
|
||||
assert_eq!(decoded.nonce, 42);
|
||||
assert_eq!(decoded.balance, U256::from(1000));
|
||||
assert_eq!(decoded.storage_root, alloy_trie::EMPTY_ROOT_HASH);
|
||||
assert_eq!(decoded.code_hash, KECCAK_EMPTY);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_decode_slim_account_empty() {
|
||||
let trie_account = TrieAccount {
|
||||
nonce: 0,
|
||||
balance: U256::ZERO,
|
||||
storage_root: alloy_trie::EMPTY_ROOT_HASH,
|
||||
code_hash: KECCAK_EMPTY,
|
||||
};
|
||||
let mut buf = Vec::new();
|
||||
trie_account.encode(&mut buf);
|
||||
let decoded = decode_slim_account(&Bytes::from(buf)).unwrap();
|
||||
assert_eq!(decoded.nonce, 0);
|
||||
assert_eq!(decoded.balance, U256::ZERO);
|
||||
assert_eq!(decoded.storage_root, alloy_trie::EMPTY_ROOT_HASH);
|
||||
assert_eq!(decoded.code_hash, KECCAK_EMPTY);
|
||||
}
|
||||
}
|
||||
52
crates/net/snap-sync/src/error.rs
Normal file
52
crates/net/snap-sync/src/error.rs
Normal file
@@ -0,0 +1,52 @@
|
||||
//! Snap sync error types.
|
||||
|
||||
use alloy_primitives::B256;
|
||||
use reth_db_api::DatabaseError;
|
||||
use reth_network_p2p::error::RequestError;
|
||||
use reth_storage_errors::provider::ProviderError;
|
||||
|
||||
/// Errors that can occur during snap sync.
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum SnapSyncError {
|
||||
/// The computed state root does not match the pivot block's state root.
|
||||
#[error("state root mismatch: expected {expected}, got {got}")]
|
||||
StateRootMismatch {
|
||||
/// Expected state root from pivot header.
|
||||
expected: B256,
|
||||
/// Computed state root after snap sync.
|
||||
got: B256,
|
||||
},
|
||||
/// A peer returned an invalid or inconsistent account range.
|
||||
#[error("invalid account range response: {0}")]
|
||||
InvalidAccountRange(String),
|
||||
/// A peer returned an invalid or inconsistent storage range.
|
||||
#[error("invalid storage range response: {0}")]
|
||||
InvalidStorageRange(String),
|
||||
/// A peer returned invalid bytecodes.
|
||||
#[error("invalid bytecode response: {0}")]
|
||||
InvalidBytecode(String),
|
||||
/// No peers available that support the snap protocol.
|
||||
#[error("no snap-capable peers available")]
|
||||
NoPeers,
|
||||
/// Network request failed.
|
||||
#[error("network request failed: {0}")]
|
||||
Request(#[from] RequestError),
|
||||
/// Database/provider error.
|
||||
#[error("provider error: {0}")]
|
||||
Provider(#[from] ProviderError),
|
||||
/// Pivot block not found.
|
||||
#[error("pivot block {0} not found")]
|
||||
PivotNotFound(B256),
|
||||
/// RLP decoding error.
|
||||
#[error("rlp decode error: {0}")]
|
||||
RlpDecode(#[from] alloy_rlp::Error),
|
||||
/// Database error.
|
||||
#[error("database error: {0}")]
|
||||
Database(#[from] DatabaseError),
|
||||
/// State root verification failed.
|
||||
#[error("state root verification error: {0}")]
|
||||
StateRootVerification(String),
|
||||
/// Snap sync was cancelled.
|
||||
#[error("snap sync cancelled")]
|
||||
Cancelled,
|
||||
}
|
||||
26
crates/net/snap-sync/src/lib.rs
Normal file
26
crates/net/snap-sync/src/lib.rs
Normal file
@@ -0,0 +1,26 @@
|
||||
//! Snap sync protocol implementation for reth.
|
||||
//!
|
||||
//! Downloads state from peers via the [snap protocol](https://github.com/ethereum/devp2p/blob/master/caps/snap.md)
|
||||
//! instead of executing all historical blocks. The sync proceeds in phases:
|
||||
//!
|
||||
//! 1. **Account download**: Fetch all account leaves via `GetAccountRange`
|
||||
//! 2. **Storage download**: Fetch storage slots for accounts with non-empty storage roots
|
||||
//! 3. **Bytecode download**: Fetch contract bytecodes by code hash
|
||||
//! 4. **State root verification**: Build hashed state + merkle trie, verify against pivot block
|
||||
//!
|
||||
//! After snap sync completes, normal execution resumes from the pivot block onward.
|
||||
|
||||
pub mod config;
|
||||
pub mod downloader;
|
||||
pub mod error;
|
||||
pub mod metrics;
|
||||
pub mod progress;
|
||||
pub mod server;
|
||||
pub mod task;
|
||||
|
||||
pub use config::SnapSyncConfig;
|
||||
pub use downloader::SnapSyncDownloader;
|
||||
pub use error::SnapSyncError;
|
||||
pub use progress::{SnapPhase, SnapProgress};
|
||||
pub use server::{IncomingSnapRequest, SnapRequestHandler};
|
||||
pub use task::run_snap_sync;
|
||||
21
crates/net/snap-sync/src/metrics.rs
Normal file
21
crates/net/snap-sync/src/metrics.rs
Normal file
@@ -0,0 +1,21 @@
|
||||
//! Snap sync metrics.
|
||||
|
||||
use reth_metrics::{
|
||||
metrics::{Counter, Gauge},
|
||||
Metrics,
|
||||
};
|
||||
|
||||
#[derive(Metrics)]
|
||||
#[metrics(scope = "snap_sync")]
|
||||
pub(crate) struct SnapSyncMetrics {
|
||||
/// Number of accounts downloaded.
|
||||
pub(crate) accounts_downloaded: Gauge,
|
||||
/// Number of storage slots downloaded.
|
||||
pub(crate) storage_slots_downloaded: Gauge,
|
||||
/// Number of bytecodes downloaded.
|
||||
pub(crate) bytecodes_downloaded: Gauge,
|
||||
/// Current phase (0=idle, 1=accounts, 2=storages, 3=bytecodes, 4=verify, 5=done).
|
||||
pub(crate) phase: Gauge,
|
||||
/// Total peer request failures.
|
||||
pub(crate) request_failures: Counter,
|
||||
}
|
||||
87
crates/net/snap-sync/src/progress.rs
Normal file
87
crates/net/snap-sync/src/progress.rs
Normal file
@@ -0,0 +1,87 @@
|
||||
//! Snap sync progress tracking.
|
||||
//!
|
||||
//! Tracks the current phase and cursor positions to support resumability.
|
||||
|
||||
use alloy_primitives::{Address, B256};
|
||||
|
||||
/// Current phase of snap sync.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
|
||||
pub enum SnapPhase {
|
||||
/// Not started yet.
|
||||
#[default]
|
||||
Idle,
|
||||
/// Downloading account ranges.
|
||||
Accounts,
|
||||
/// Downloading storage slots for accounts with non-empty storage roots.
|
||||
Storages,
|
||||
/// Downloading contract bytecodes.
|
||||
Bytecodes,
|
||||
/// Building hashed state and verifying merkle root.
|
||||
Verification,
|
||||
/// Snap sync completed successfully.
|
||||
Done,
|
||||
}
|
||||
|
||||
/// Tracks snap sync progress for resumability.
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct SnapProgress {
|
||||
/// The pivot block hash.
|
||||
pub pivot_hash: B256,
|
||||
/// The pivot block number.
|
||||
pub pivot_number: u64,
|
||||
/// The pivot block's state root.
|
||||
pub state_root: B256,
|
||||
/// Current sync phase.
|
||||
pub phase: SnapPhase,
|
||||
/// Account download cursor: next account hash to fetch.
|
||||
pub account_cursor: B256,
|
||||
/// Number of accounts downloaded so far.
|
||||
pub accounts_downloaded: u64,
|
||||
/// Storage download cursor: current account address being fetched.
|
||||
pub storage_account_cursor: Option<Address>,
|
||||
/// Storage slot cursor within the current account.
|
||||
pub storage_slot_cursor: B256,
|
||||
/// Number of storage slots downloaded so far.
|
||||
pub storage_slots_downloaded: u64,
|
||||
/// Number of bytecodes downloaded so far.
|
||||
pub bytecodes_downloaded: u64,
|
||||
/// Total number of bytecodes to download.
|
||||
pub bytecodes_total: u64,
|
||||
}
|
||||
|
||||
impl SnapProgress {
|
||||
/// Creates a new progress tracker for the given pivot.
|
||||
pub fn new(pivot_hash: B256, pivot_number: u64, state_root: B256) -> Self {
|
||||
Self {
|
||||
pivot_hash,
|
||||
pivot_number,
|
||||
state_root,
|
||||
phase: SnapPhase::Accounts,
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_progress_new() {
|
||||
let hash = B256::repeat_byte(0x01);
|
||||
let state_root = B256::repeat_byte(0x02);
|
||||
let progress = SnapProgress::new(hash, 100, state_root);
|
||||
assert_eq!(progress.pivot_hash, hash);
|
||||
assert_eq!(progress.pivot_number, 100);
|
||||
assert_eq!(progress.state_root, state_root);
|
||||
assert_eq!(progress.phase, SnapPhase::Accounts);
|
||||
assert_eq!(progress.accounts_downloaded, 0);
|
||||
assert_eq!(progress.storage_slots_downloaded, 0);
|
||||
assert_eq!(progress.bytecodes_downloaded, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_phase_default() {
|
||||
assert_eq!(SnapPhase::default(), SnapPhase::Idle);
|
||||
}
|
||||
}
|
||||
480
crates/net/snap-sync/src/server.rs
Normal file
480
crates/net/snap-sync/src/server.rs
Normal file
@@ -0,0 +1,480 @@
|
||||
//! Snap sync request handler (server-side).
|
||||
//!
|
||||
//! Handles incoming snap protocol requests from peers, serving account ranges,
|
||||
//! storage ranges, bytecodes, and trie nodes from the local database.
|
||||
//!
|
||||
//! Modeled after [`EthRequestHandler`](reth_network::eth_requests::EthRequestHandler).
|
||||
|
||||
use alloy_consensus::constants::KECCAK_EMPTY;
|
||||
use alloy_primitives::Bytes;
|
||||
use alloy_rlp::Encodable;
|
||||
use alloy_trie::EMPTY_ROOT_HASH;
|
||||
use futures::StreamExt;
|
||||
use reth_db_api::{
|
||||
cursor::{DbCursorRO, DbDupCursorRO},
|
||||
tables,
|
||||
transaction::DbTx,
|
||||
};
|
||||
use reth_eth_wire_types::snap::*;
|
||||
use reth_network_p2p::error::RequestResult;
|
||||
use reth_network_peers::PeerId;
|
||||
use reth_primitives_traits::Account;
|
||||
use reth_storage_api::{DBProvider, DatabaseProviderFactory};
|
||||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use tokio::sync::oneshot;
|
||||
use tracing::{debug, trace};
|
||||
|
||||
/// Maximum number of accounts to serve per request.
|
||||
const MAX_ACCOUNTS_SERVE: usize = 1024;
|
||||
|
||||
/// Maximum number of storage slots to serve per account per request.
|
||||
const MAX_STORAGE_SERVE: usize = 1024;
|
||||
|
||||
/// Maximum number of bytecodes to serve per request.
|
||||
const MAX_BYTECODES_SERVE: usize = 1024;
|
||||
|
||||
/// Maximum response size (2MB, matching eth limit).
|
||||
const SOFT_RESPONSE_LIMIT: usize = 2 * 1024 * 1024;
|
||||
|
||||
/// Handles incoming snap protocol requests from peers.
|
||||
///
|
||||
/// This is spawned as a background service and polled to process requests.
|
||||
#[derive(Debug)]
|
||||
#[must_use = "Handler does nothing unless polled."]
|
||||
pub struct SnapRequestHandler<F> {
|
||||
/// Provider factory for DB access.
|
||||
provider_factory: F,
|
||||
/// Incoming snap requests.
|
||||
incoming_requests: tokio_stream::wrappers::ReceiverStream<IncomingSnapRequest>,
|
||||
}
|
||||
|
||||
impl<F> SnapRequestHandler<F> {
|
||||
/// Create a new instance.
|
||||
pub fn new(
|
||||
provider_factory: F,
|
||||
incoming: tokio::sync::mpsc::Receiver<IncomingSnapRequest>,
|
||||
) -> Self {
|
||||
Self {
|
||||
provider_factory,
|
||||
incoming_requests: tokio_stream::wrappers::ReceiverStream::new(incoming),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<F> SnapRequestHandler<F>
|
||||
where
|
||||
F: DatabaseProviderFactory,
|
||||
{
|
||||
/// Handle a `GetAccountRange` request.
|
||||
fn on_account_range_request(
|
||||
&self,
|
||||
peer_id: PeerId,
|
||||
request: GetAccountRangeMessage,
|
||||
response: oneshot::Sender<RequestResult<AccountRangeMessage>>,
|
||||
) {
|
||||
trace!(target: "net::snap", ?peer_id, ?request.starting_hash, ?request.limit_hash, "Received GetAccountRange");
|
||||
|
||||
let mut accounts = Vec::new();
|
||||
|
||||
let Ok(provider) = self.provider_factory.database_provider_ro() else {
|
||||
let _ = response.send(Ok(AccountRangeMessage {
|
||||
request_id: request.request_id,
|
||||
accounts,
|
||||
proof: vec![],
|
||||
}));
|
||||
return;
|
||||
};
|
||||
|
||||
let tx = provider.tx_ref();
|
||||
let Ok(mut cursor) = tx.cursor_read::<tables::HashedAccounts>() else {
|
||||
let _ = response.send(Ok(AccountRangeMessage {
|
||||
request_id: request.request_id,
|
||||
accounts,
|
||||
proof: vec![],
|
||||
}));
|
||||
return;
|
||||
};
|
||||
|
||||
let limit = (request.response_bytes as usize).min(SOFT_RESPONSE_LIMIT);
|
||||
let mut total_bytes = 0usize;
|
||||
|
||||
if let Ok(walker) = cursor.walk(Some(request.starting_hash)) {
|
||||
for entry in walker {
|
||||
let Ok((hash, account)) = entry else { break };
|
||||
if hash > request.limit_hash {
|
||||
break;
|
||||
}
|
||||
|
||||
let slim = encode_account(&account);
|
||||
total_bytes += 32 + slim.len();
|
||||
accounts.push(AccountData { hash, body: slim });
|
||||
|
||||
if accounts.len() >= MAX_ACCOUNTS_SERVE || total_bytes >= limit {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
trace!(target: "net::snap", ?peer_id, num_accounts = accounts.len(), total_bytes, "Serving GetAccountRange");
|
||||
|
||||
let _ = response.send(Ok(AccountRangeMessage {
|
||||
request_id: request.request_id,
|
||||
accounts,
|
||||
// TODO: add merkle proofs
|
||||
proof: vec![],
|
||||
}));
|
||||
}
|
||||
|
||||
/// Handle a `GetStorageRanges` request.
|
||||
fn on_storage_ranges_request(
|
||||
&self,
|
||||
peer_id: PeerId,
|
||||
request: GetStorageRangesMessage,
|
||||
response: oneshot::Sender<RequestResult<StorageRangesMessage>>,
|
||||
) {
|
||||
trace!(target: "net::snap", ?peer_id, num_accounts = request.account_hashes.len(), "Received GetStorageRanges");
|
||||
|
||||
let mut all_slots = Vec::new();
|
||||
|
||||
let Ok(provider) = self.provider_factory.database_provider_ro() else {
|
||||
let _ = response.send(Ok(StorageRangesMessage {
|
||||
request_id: request.request_id,
|
||||
slots: all_slots,
|
||||
proof: vec![],
|
||||
}));
|
||||
return;
|
||||
};
|
||||
|
||||
let tx = provider.tx_ref();
|
||||
let Ok(mut cursor) = tx.cursor_dup_read::<tables::HashedStorages>() else {
|
||||
let _ = response.send(Ok(StorageRangesMessage {
|
||||
request_id: request.request_id,
|
||||
slots: all_slots,
|
||||
proof: vec![],
|
||||
}));
|
||||
return;
|
||||
};
|
||||
|
||||
let limit = (request.response_bytes as usize).min(SOFT_RESPONSE_LIMIT);
|
||||
let mut total_bytes = 0usize;
|
||||
|
||||
for (i, account_hash) in request.account_hashes.iter().enumerate() {
|
||||
let mut account_slots = Vec::new();
|
||||
|
||||
// For the first account, use the request's starting_hash.
|
||||
// For subsequent accounts, start from the beginning.
|
||||
let start = if i == 0 { request.starting_hash } else { Default::default() };
|
||||
|
||||
if let Ok(walker) = cursor.walk_dup(Some(*account_hash), Some(start)) {
|
||||
for entry in walker {
|
||||
let Ok((_, storage_entry)) = entry else { break };
|
||||
if storage_entry.key > request.limit_hash {
|
||||
break;
|
||||
}
|
||||
|
||||
let mut value_buf = Vec::new();
|
||||
storage_entry.value.encode(&mut value_buf);
|
||||
total_bytes += 32 + value_buf.len();
|
||||
|
||||
account_slots.push(StorageData {
|
||||
hash: storage_entry.key,
|
||||
data: Bytes::from(value_buf),
|
||||
});
|
||||
|
||||
if account_slots.len() >= MAX_STORAGE_SERVE || total_bytes >= limit {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
all_slots.push(account_slots);
|
||||
|
||||
if total_bytes >= limit {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
trace!(target: "net::snap", ?peer_id, num_accounts = all_slots.len(), total_bytes, "Serving GetStorageRanges");
|
||||
|
||||
let _ = response.send(Ok(StorageRangesMessage {
|
||||
request_id: request.request_id,
|
||||
slots: all_slots,
|
||||
// TODO: add boundary proofs for partial ranges
|
||||
proof: vec![],
|
||||
}));
|
||||
}
|
||||
|
||||
/// Handle a `GetByteCodes` request.
|
||||
fn on_byte_codes_request(
|
||||
&self,
|
||||
peer_id: PeerId,
|
||||
request: GetByteCodesMessage,
|
||||
response: oneshot::Sender<RequestResult<ByteCodesMessage>>,
|
||||
) {
|
||||
trace!(target: "net::snap", ?peer_id, num_hashes = request.hashes.len(), "Received GetByteCodes");
|
||||
|
||||
let mut codes = Vec::new();
|
||||
|
||||
let Ok(provider) = self.provider_factory.database_provider_ro() else {
|
||||
let _ = response.send(Ok(ByteCodesMessage { request_id: request.request_id, codes }));
|
||||
return;
|
||||
};
|
||||
|
||||
let tx = provider.tx_ref();
|
||||
let limit = (request.response_bytes as usize).min(SOFT_RESPONSE_LIMIT);
|
||||
let mut total_bytes = 0usize;
|
||||
|
||||
for hash in &request.hashes {
|
||||
if *hash == KECCAK_EMPTY {
|
||||
continue;
|
||||
}
|
||||
|
||||
let Ok(Some(bytecode)) = tx.get::<tables::Bytecodes>(*hash) else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let raw = bytecode.original_bytes();
|
||||
total_bytes += raw.len();
|
||||
codes.push(raw);
|
||||
|
||||
if codes.len() >= MAX_BYTECODES_SERVE || total_bytes >= limit {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
trace!(target: "net::snap", ?peer_id, num_codes = codes.len(), total_bytes, "Serving GetByteCodes");
|
||||
|
||||
let _ = response.send(Ok(ByteCodesMessage { request_id: request.request_id, codes }));
|
||||
}
|
||||
|
||||
/// Handle a `GetTrieNodes` request.
|
||||
fn on_trie_nodes_request(
|
||||
&self,
|
||||
peer_id: PeerId,
|
||||
request: GetTrieNodesMessage,
|
||||
response: oneshot::Sender<RequestResult<TrieNodesMessage>>,
|
||||
) {
|
||||
debug!(target: "net::snap", ?peer_id, num_paths = request.paths.len(), "Received GetTrieNodes (stub)");
|
||||
|
||||
// TODO: implement trie node lookups from AccountsTrie / StoragesTrie tables
|
||||
let _ =
|
||||
response.send(Ok(TrieNodesMessage { request_id: request.request_id, nodes: vec![] }));
|
||||
}
|
||||
}
|
||||
|
||||
/// Encode an [`Account`] into slim RLP format (as `TrieAccount`).
|
||||
///
|
||||
/// Accounts in the snap protocol are exchanged as RLP-encoded `TrieAccount`.
|
||||
/// Since we don't know the true storage root when reading from `HashedAccounts`,
|
||||
/// we use `EMPTY_ROOT_HASH` as a placeholder.
|
||||
fn encode_account(account: &Account) -> Bytes {
|
||||
let trie_account = account.into_trie_account(EMPTY_ROOT_HASH);
|
||||
let mut buf = Vec::new();
|
||||
trie_account.encode(&mut buf);
|
||||
Bytes::from(buf)
|
||||
}
|
||||
|
||||
/// Incoming snap request variants delegated by the network.
|
||||
#[derive(Debug)]
|
||||
pub enum IncomingSnapRequest {
|
||||
/// Request for an account range.
|
||||
GetAccountRange {
|
||||
/// The peer that sent the request.
|
||||
peer_id: PeerId,
|
||||
/// The request payload.
|
||||
request: GetAccountRangeMessage,
|
||||
/// Channel to send the response.
|
||||
response: oneshot::Sender<RequestResult<AccountRangeMessage>>,
|
||||
},
|
||||
/// Request for storage slot ranges.
|
||||
GetStorageRanges {
|
||||
/// The peer that sent the request.
|
||||
peer_id: PeerId,
|
||||
/// The request payload.
|
||||
request: GetStorageRangesMessage,
|
||||
/// Channel to send the response.
|
||||
response: oneshot::Sender<RequestResult<StorageRangesMessage>>,
|
||||
},
|
||||
/// Request for contract bytecodes.
|
||||
GetByteCodes {
|
||||
/// The peer that sent the request.
|
||||
peer_id: PeerId,
|
||||
/// The request payload.
|
||||
request: GetByteCodesMessage,
|
||||
/// Channel to send the response.
|
||||
response: oneshot::Sender<RequestResult<ByteCodesMessage>>,
|
||||
},
|
||||
/// Request for trie nodes.
|
||||
GetTrieNodes {
|
||||
/// The peer that sent the request.
|
||||
peer_id: PeerId,
|
||||
/// The request payload.
|
||||
request: GetTrieNodesMessage,
|
||||
/// Channel to send the response.
|
||||
response: oneshot::Sender<RequestResult<TrieNodesMessage>>,
|
||||
},
|
||||
}
|
||||
|
||||
impl<F> Future for SnapRequestHandler<F>
|
||||
where
|
||||
F: DatabaseProviderFactory + Unpin,
|
||||
{
|
||||
type Output = ();
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.get_mut();
|
||||
|
||||
loop {
|
||||
match this.incoming_requests.poll_next_unpin(cx) {
|
||||
Poll::Ready(Some(incoming)) => match incoming {
|
||||
IncomingSnapRequest::GetAccountRange { peer_id, request, response } => {
|
||||
this.on_account_range_request(peer_id, request, response);
|
||||
}
|
||||
IncomingSnapRequest::GetStorageRanges { peer_id, request, response } => {
|
||||
this.on_storage_ranges_request(peer_id, request, response);
|
||||
}
|
||||
IncomingSnapRequest::GetByteCodes { peer_id, request, response } => {
|
||||
this.on_byte_codes_request(peer_id, request, response);
|
||||
}
|
||||
IncomingSnapRequest::GetTrieNodes { peer_id, request, response } => {
|
||||
this.on_trie_nodes_request(peer_id, request, response);
|
||||
}
|
||||
},
|
||||
Poll::Ready(None) => return Poll::Ready(()),
|
||||
Poll::Pending => return Poll::Pending,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use alloy_primitives::{keccak256, B256, U256};
|
||||
use alloy_rlp::Decodable;
|
||||
use reth_db_api::transaction::DbTxMut;
|
||||
use reth_provider::test_utils::create_test_provider_factory;
|
||||
use reth_storage_api::{DBProvider, DatabaseProviderFactory};
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
#[test]
|
||||
fn test_encode_account_roundtrip() {
|
||||
let account = Account {
|
||||
nonce: 10,
|
||||
balance: U256::from(500),
|
||||
bytecode_hash: Some(B256::repeat_byte(0xAB)),
|
||||
};
|
||||
let encoded = encode_account(&account);
|
||||
assert!(!encoded.is_empty());
|
||||
|
||||
let decoded = alloy_trie::TrieAccount::decode(&mut encoded.as_ref()).unwrap();
|
||||
assert_eq!(decoded.nonce, 10);
|
||||
assert_eq!(decoded.balance, U256::from(500));
|
||||
assert_eq!(decoded.code_hash, B256::repeat_byte(0xAB));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_account_range_empty_db() {
|
||||
let factory = create_test_provider_factory();
|
||||
let (tx, rx) = mpsc::channel(10);
|
||||
let handler = SnapRequestHandler::new(factory, rx);
|
||||
|
||||
let handle = tokio::spawn(handler);
|
||||
|
||||
let (resp_tx, resp_rx) = oneshot::channel();
|
||||
tx.send(IncomingSnapRequest::GetAccountRange {
|
||||
peer_id: PeerId::random(),
|
||||
request: GetAccountRangeMessage {
|
||||
request_id: 1,
|
||||
root_hash: B256::ZERO,
|
||||
starting_hash: B256::ZERO,
|
||||
limit_hash: B256::repeat_byte(0xFF),
|
||||
response_bytes: 512 * 1024,
|
||||
},
|
||||
response: resp_tx,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let result = resp_rx.await.unwrap().unwrap();
|
||||
assert!(result.accounts.is_empty());
|
||||
|
||||
drop(tx);
|
||||
handle.await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_byte_codes_request() {
|
||||
let factory = create_test_provider_factory();
|
||||
|
||||
// Write bytecodes into the DB
|
||||
let code = Bytes::from(vec![0x60, 0x00, 0x60, 0x00, 0xFD]); // PUSH0 PUSH0 REVERT
|
||||
let code_hash = keccak256(&code);
|
||||
{
|
||||
let provider = factory.database_provider_rw().unwrap();
|
||||
let bytecode = reth_primitives_traits::Bytecode::new_raw(code.clone());
|
||||
provider.tx_ref().put::<tables::Bytecodes>(code_hash, bytecode).unwrap();
|
||||
provider.commit().unwrap();
|
||||
}
|
||||
|
||||
let (tx, rx) = mpsc::channel(10);
|
||||
let handler = SnapRequestHandler::new(factory, rx);
|
||||
let handle = tokio::spawn(handler);
|
||||
|
||||
let (resp_tx, resp_rx) = oneshot::channel();
|
||||
tx.send(IncomingSnapRequest::GetByteCodes {
|
||||
peer_id: PeerId::random(),
|
||||
request: GetByteCodesMessage {
|
||||
request_id: 2,
|
||||
hashes: vec![code_hash],
|
||||
response_bytes: 512 * 1024,
|
||||
},
|
||||
response: resp_tx,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let result = resp_rx.await.unwrap().unwrap();
|
||||
assert_eq!(result.codes.len(), 1);
|
||||
assert_eq!(result.codes[0], code);
|
||||
|
||||
drop(tx);
|
||||
handle.await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_storage_ranges_empty() {
|
||||
let factory = create_test_provider_factory();
|
||||
let (tx, rx) = mpsc::channel(10);
|
||||
let handler = SnapRequestHandler::new(factory, rx);
|
||||
let handle = tokio::spawn(handler);
|
||||
|
||||
let (resp_tx, resp_rx) = oneshot::channel();
|
||||
tx.send(IncomingSnapRequest::GetStorageRanges {
|
||||
peer_id: PeerId::random(),
|
||||
request: GetStorageRangesMessage {
|
||||
request_id: 3,
|
||||
root_hash: B256::ZERO,
|
||||
account_hashes: vec![B256::repeat_byte(0x01)],
|
||||
starting_hash: B256::ZERO,
|
||||
limit_hash: B256::repeat_byte(0xFF),
|
||||
response_bytes: 512 * 1024,
|
||||
},
|
||||
response: resp_tx,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let result = resp_rx.await.unwrap().unwrap();
|
||||
// One entry per requested account, but the inner vec is empty since no storage exists
|
||||
assert_eq!(result.slots.len(), 1);
|
||||
assert!(result.slots[0].is_empty());
|
||||
|
||||
drop(tx);
|
||||
handle.await.unwrap();
|
||||
}
|
||||
}
|
||||
46
crates/net/snap-sync/src/task.rs
Normal file
46
crates/net/snap-sync/src/task.rs
Normal file
@@ -0,0 +1,46 @@
|
||||
//! Snap sync task for integration with the node builder.
|
||||
//!
|
||||
//! Provides a standalone async function that runs snap sync to completion,
|
||||
//! suitable for spawning as a background task before the pipeline starts.
|
||||
|
||||
use crate::{downloader::SnapSyncDownloader, error::SnapSyncError};
|
||||
use alloy_primitives::B256;
|
||||
use reth_db_api::transaction::{DbTx, DbTxMut};
|
||||
use reth_network_p2p::snap::client::SnapClient;
|
||||
use reth_storage_api::{DBProvider, DatabaseProviderFactory};
|
||||
use tracing::info;
|
||||
|
||||
/// Runs snap sync to completion, returning the pivot block number on success.
|
||||
///
|
||||
/// This is the main entry point for integrating snap sync with the node builder.
|
||||
/// It creates a [`SnapSyncDownloader`], runs all phases (account download, storage
|
||||
/// download, bytecode download, state root verification), and returns the pivot
|
||||
/// block number that the pipeline should resume from.
|
||||
pub async fn run_snap_sync<C, F>(
|
||||
client: C,
|
||||
provider_factory: F,
|
||||
pivot_hash: B256,
|
||||
pivot_number: u64,
|
||||
state_root: B256,
|
||||
) -> Result<u64, SnapSyncError>
|
||||
where
|
||||
C: SnapClient + Clone + Send + Sync + 'static,
|
||||
F: DatabaseProviderFactory + Send + Sync + 'static,
|
||||
<F as DatabaseProviderFactory>::ProviderRW: DBProvider<Tx: DbTxMut + DbTx> + Send,
|
||||
{
|
||||
let (_, cancel_rx) = tokio::sync::watch::channel(false);
|
||||
let mut downloader = SnapSyncDownloader::new(
|
||||
client,
|
||||
provider_factory,
|
||||
pivot_hash,
|
||||
pivot_number,
|
||||
state_root,
|
||||
cancel_rx,
|
||||
);
|
||||
|
||||
info!(target: "snap::sync", pivot_number, %pivot_hash, "Starting snap sync");
|
||||
downloader.run().await?;
|
||||
info!(target: "snap::sync", pivot_number, "Snap sync completed successfully");
|
||||
|
||||
Ok(pivot_number)
|
||||
}
|
||||
@@ -54,6 +54,7 @@ reth-tokio-util.workspace = true
|
||||
reth-tracing.workspace = true
|
||||
reth-transaction-pool.workspace = true
|
||||
reth-trie-db = { workspace = true, features = ["metrics"] }
|
||||
reth-snap-sync.workspace = true
|
||||
reth-basic-payload-builder.workspace = true
|
||||
reth-node-ethstats.workspace = true
|
||||
|
||||
|
||||
@@ -32,7 +32,7 @@ use reth_node_core::{
|
||||
use reth_node_events::node;
|
||||
use reth_provider::{
|
||||
providers::{BlockchainProvider, NodeTypesForProvider},
|
||||
BlockNumReader, StorageSettingsCache,
|
||||
BlockNumReader, HeaderProvider, StorageSettingsCache,
|
||||
};
|
||||
use reth_tasks::TaskExecutor;
|
||||
use reth_tokio_util::EventSender;
|
||||
@@ -163,6 +163,78 @@ impl EngineNodeLauncher {
|
||||
// The new engine writes directly to static files. This ensures that they're up to the tip.
|
||||
pipeline.move_to_static_files()?;
|
||||
|
||||
// Run snap sync if enabled
|
||||
if node_config.debug.snap_sync {
|
||||
info!(target: "reth::cli", "Snap sync mode enabled");
|
||||
|
||||
let (pivot_hash, pivot_header) = if let Some(tip) = node_config.debug.tip {
|
||||
info!(target: "reth::cli", %tip, "Using explicit pivot from --debug.tip");
|
||||
let header = match ctx.blockchain_db().header(tip) {
|
||||
Ok(Some(h)) => h,
|
||||
_ => {
|
||||
info!(target: "reth::cli", %tip, "Pivot header not in DB, fetching from network");
|
||||
let sealed = node_config
|
||||
.fetch_tip_from_network(network_client.clone(), tip.into())
|
||||
.await;
|
||||
sealed.into_header()
|
||||
}
|
||||
};
|
||||
(tip, header)
|
||||
} else {
|
||||
info!(target: "reth::cli", "Selecting snap sync pivot from last available header");
|
||||
let last_number = ctx
|
||||
.blockchain_db()
|
||||
.last_block_number()
|
||||
.map_err(|e| eyre::eyre!("failed to get last block number: {e}"))?;
|
||||
|
||||
if last_number == 0 {
|
||||
return Err(eyre::eyre!(
|
||||
"No blocks synced yet. Snap sync needs headers synced first. \
|
||||
Either run header sync first or provide --debug.tip explicitly."
|
||||
));
|
||||
}
|
||||
|
||||
let sealed = ctx
|
||||
.blockchain_db()
|
||||
.sealed_header(last_number)
|
||||
.map_err(|e| eyre::eyre!("failed to get sealed header: {e}"))?
|
||||
.ok_or_else(|| eyre::eyre!("header at block {last_number} not found"))?;
|
||||
|
||||
let hash = sealed.hash();
|
||||
info!(target: "reth::cli", last_number, %hash, "Using last available header as snap sync pivot");
|
||||
(hash, sealed.into_header())
|
||||
};
|
||||
|
||||
let pivot_number = pivot_header.number();
|
||||
let state_root = pivot_header.state_root();
|
||||
|
||||
info!(
|
||||
target: "reth::cli",
|
||||
%pivot_hash,
|
||||
pivot_number,
|
||||
%state_root,
|
||||
"Starting snap sync to pivot block"
|
||||
);
|
||||
|
||||
let snap_result = reth_snap_sync::run_snap_sync(
|
||||
network_client.clone(),
|
||||
ctx.provider_factory().clone(),
|
||||
pivot_hash,
|
||||
pivot_number,
|
||||
state_root,
|
||||
)
|
||||
.await;
|
||||
|
||||
match snap_result {
|
||||
Ok(block_number) => {
|
||||
info!(target: "reth::cli", block_number, "Snap sync completed successfully");
|
||||
}
|
||||
Err(e) => {
|
||||
return Err(eyre::eyre!("Snap sync failed: {e}"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let pipeline_events = pipeline.events();
|
||||
|
||||
let mut pruner_builder = ctx.pruner_builder();
|
||||
|
||||
@@ -102,6 +102,11 @@ pub struct DebugArgs {
|
||||
#[arg(long = "ethstats", help_heading = "Debug")]
|
||||
pub ethstats: Option<String>,
|
||||
|
||||
/// Enable snap sync mode. Downloads state from peers via snap protocol
|
||||
/// instead of executing all historical blocks.
|
||||
#[arg(long = "debug.snap-sync", help_heading = "Debug")]
|
||||
pub snap_sync: bool,
|
||||
|
||||
/// Set the node to idle state when the backfill is not running.
|
||||
///
|
||||
/// This makes the `eth_syncing` RPC return "Idle" when the node has just started or finished
|
||||
@@ -126,6 +131,7 @@ impl Default for DebugArgs {
|
||||
invalid_block_hook: Some(InvalidBlockSelection::default()),
|
||||
healthy_node_rpc_url: None,
|
||||
ethstats: None,
|
||||
snap_sync: false,
|
||||
startup_sync_state_idle: false,
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user