feat(net): integrate HeadersClient (#251)

* refactor: headers client

* chore: rustfmt

* chore(clippy): make clippy happy

* feat(net): integrate HeadersClient

* fix: parse td as U256 with from_little_endian

* conflicts

Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
This commit is contained in:
Matthias Seitz
2022-11-25 14:26:43 +01:00
committed by GitHub
parent dda8df7341
commit fb2861f112
9 changed files with 113 additions and 13 deletions

View File

@@ -1,7 +1,7 @@
use crate::p2p::error::RequestResult;
use async_trait::async_trait;
pub use reth_eth_wire::BlockHeaders;
use reth_primitives::{BlockHashOrNumber, H256};
use reth_primitives::{BlockHashOrNumber, H256, U256};
use std::fmt::Debug;
/// The header request struct to be sent to connected peers, which
@@ -24,7 +24,7 @@ pub trait HeadersClient: Send + Sync + Debug {
/// Update the node's Status message.
///
/// The updated Status message will be used during any new eth/65 handshakes.
fn update_status(&self, height: u64, hash: H256, td: H256);
fn update_status(&self, height: u64, hash: H256, td: U256);
/// Sends the header request to the p2p network and returns the header response received from a
/// peer.

View File

@@ -13,7 +13,7 @@ use crate::{
};
use futures::{Future, FutureExt, Stream};
use reth_eth_wire::BlockHeaders;
use reth_primitives::{BlockLocked, Header, SealedHeader, H256};
use reth_primitives::{BlockLocked, Header, SealedHeader, H256, U256};
use reth_rpc_types::engine::ForkchoiceState;
use std::{
pin::Pin,
@@ -120,6 +120,34 @@ impl BatchDownload for TestDownload {
fn into_stream_unordered(self) -> Box<dyn Stream<Item = Result<Self::Ok, Self::Error>>> {
Box::new(self)
}
// async fn download(
// &self,
// _: &SealedHeader,
// _: &ForkchoiceState,
// ) -> Result<Vec<SealedHeader>, DownloadError> {
// // call consensus stub first. fails if the flag is set
// let empty = SealedHeader::default();
// self.consensus
// .validate_header(&empty, &empty)
// .map_err(|error| DownloadError::HeaderValidation { hash: empty.hash(), error })?;
//
// let stream = self.client.stream_headers().await;
// let stream = stream.timeout(Duration::from_secs(1));
//
// match Box::pin(stream).try_next().await {
// Ok(Some(res)) => {
// let mut headers = res.headers.iter().map(|h|
// h.clone().seal()).collect::<Vec<_>>(); if !headers.is_empty() {
// headers.sort_unstable_by_key(|h| h.number);
// headers.remove(0); // remove head from response
// headers.reverse();
// }
// Ok(headers)
// }
// _ => Err(DownloadError::Timeout { request_id: 0 }),
// }
// }
}
/// A test client for fetching headers
@@ -145,7 +173,7 @@ impl TestHeadersClient {
#[async_trait::async_trait]
impl HeadersClient for TestHeadersClient {
fn update_status(&self, _height: u64, _hash: H256, _td: H256) {}
fn update_status(&self, _height: u64, _hash: H256, _td: U256) {}
async fn get_headers(&self, request: HeadersRequest) -> RequestResult<BlockHeaders> {
if let Some(err) = &mut *self.error.lock().await {

View File

@@ -1,9 +1,13 @@
//! A client implementation that can interact with the network and download data.
use crate::fetch::DownloadRequest;
use crate::fetch::{DownloadRequest, StatusUpdate};
use reth_interfaces::p2p::{error::RequestResult, headers::client::HeadersRequest};
use reth_primitives::Header;
use reth_eth_wire::BlockHeaders;
use reth_interfaces::p2p::{
error::RequestResult,
headers::client::{HeadersClient, HeadersRequest},
};
use reth_primitives::{Header, H256, U256};
use tokio::sync::{mpsc::UnboundedSender, oneshot};
/// Front-end API for fetching data from the network.
@@ -11,6 +15,8 @@ use tokio::sync::{mpsc::UnboundedSender, oneshot};
pub struct FetchClient {
/// Sender half of the request channel.
pub(crate) request_tx: UnboundedSender<DownloadRequest>,
/// Sender for sending Status updates
pub(crate) status_tx: UnboundedSender<StatusUpdate>,
}
impl FetchClient {
@@ -21,3 +27,16 @@ impl FetchClient {
rx.await?
}
}
#[async_trait::async_trait]
impl HeadersClient for FetchClient {
fn update_status(&self, height: u64, hash: H256, total_difficulty: U256) {
let _ = self.status_tx.send(StatusUpdate { height, hash, total_difficulty });
}
async fn get_headers(&self, request: HeadersRequest) -> RequestResult<BlockHeaders> {
let (response, rx) = oneshot::channel();
self.request_tx.send(DownloadRequest::GetBlockHeaders { request, response })?;
rx.await?.map(BlockHeaders::from)
}
}

View File

@@ -7,7 +7,7 @@ use reth_interfaces::p2p::{
error::{RequestError, RequestResult},
headers::client::HeadersRequest,
};
use reth_primitives::{Header, PeerId, H256};
use reth_primitives::{Header, PeerId, H256, U256};
use std::{
collections::{HashMap, VecDeque},
task::{Context, Poll},
@@ -36,6 +36,10 @@ pub struct StateFetcher {
download_requests_rx: UnboundedReceiverStream<DownloadRequest>,
/// Sender for download requests, used to detach a [`FetchClient`]
download_requests_tx: UnboundedSender<DownloadRequest>,
/// Receiver for new incoming [`StatusUpdate`] requests.
status_rx: UnboundedReceiverStream<StatusUpdate>,
/// Sender for updating the status, used to detach a [`FetchClient`]
status_tx: UnboundedSender<StatusUpdate>,
}
// === impl StateSyncer ===
@@ -120,6 +124,10 @@ impl StateFetcher {
return Poll::Ready(action)
}
if let Poll::Ready(Some(status)) = self.status_rx.poll_next_unpin(cx) {
return Poll::Ready(FetchAction::StatusUpdate(status))
}
loop {
// poll incoming requests
match self.download_requests_rx.poll_next_unpin(cx) {
@@ -220,13 +228,17 @@ impl StateFetcher {
/// Returns a new [`FetchClient`] that can send requests to this type.
pub(crate) fn client(&self) -> FetchClient {
FetchClient { request_tx: self.download_requests_tx.clone() }
FetchClient {
request_tx: self.download_requests_tx.clone(),
status_tx: self.status_tx.clone(),
}
}
}
impl Default for StateFetcher {
fn default() -> Self {
let (download_requests_tx, download_requests_rx) = mpsc::unbounded_channel();
let (status_tx, status_rx) = mpsc::unbounded_channel();
Self {
inflight_headers_requests: Default::default(),
inflight_bodies_requests: Default::default(),
@@ -234,6 +246,8 @@ impl Default for StateFetcher {
queued_requests: Default::default(),
download_requests_rx: UnboundedReceiverStream::new(download_requests_rx),
download_requests_tx,
status_rx: UnboundedReceiverStream::new(status_rx),
status_tx,
}
}
}
@@ -290,6 +304,14 @@ struct Request<Req, Resp> {
started: Instant,
}
/// A message to update the status.
#[derive(Debug, Clone)]
pub(crate) struct StatusUpdate {
pub(crate) height: u64,
pub(crate) hash: H256,
pub(crate) total_difficulty: U256,
}
/// Requests that can be sent to the Syncer from a [`FetchClient`]
pub(crate) enum DownloadRequest {
/// Download the requested headers and send response through channel
@@ -322,6 +344,8 @@ pub(crate) enum FetchAction {
/// The request to send
request: BlockRequest,
},
/// Propagate a received status update for the node
StatusUpdate(StatusUpdate),
}
/// Outcome of a processed response.

View File

@@ -455,6 +455,9 @@ where
.peers_mut()
.apply_reputation_change(&peer_id, ReputationChangeKind::FailedToConnect);
}
SwarmEvent::StatusUpdate(status) => {
this.swarm.sessions_mut().on_status_update(status.clone())
}
}
}

View File

@@ -1,6 +1,7 @@
//! Support for handling peer sessions.
pub use crate::message::PeerRequestSender;
use crate::{
fetch::StatusUpdate,
message::PeerMessage,
session::{
active::ActiveSession,
@@ -138,7 +139,14 @@ impl SessionManager {
self.spawned_tasks.spawn(async move { f.await });
}
/// A incoming TCP connection was received. This starts the authentication process to turn this
/// Invoked on a received status update
pub(crate) fn on_status_update(&mut self, status: StatusUpdate) {
self.status.blockhash = status.hash;
self.status.total_difficulty = status.total_difficulty;
self.fork_filter.set_head(status.height);
}
/// An incoming TCP connection was received. This starts the authentication process to turn this
/// stream into an active peer session.
///
/// Returns an error if the configured limit has been reached.

View File

@@ -3,7 +3,7 @@
use crate::{
cache::LruCache,
discovery::{Discovery, DiscoveryEvent},
fetch::{BlockResponseOutcome, StateFetcher},
fetch::{BlockResponseOutcome, FetchAction, StateFetcher, StatusUpdate},
message::{
BlockRequest, NewBlockMessage, PeerRequest, PeerRequestSender, PeerResponse,
PeerResponseResult,
@@ -325,6 +325,18 @@ where
self.on_discovery_event(discovery);
}
while let Poll::Ready(action) = self.state_fetcher.poll(cx) {
match action {
FetchAction::BlockRequest { peer_id, request } => {
self.handle_block_request(peer_id, request)
}
FetchAction::StatusUpdate(status) => {
// we want to return this directly
return Poll::Ready(StateAction::StatusUpdate(status))
}
}
}
let mut disconnect_sessions = Vec::new();
let mut received_responses = Vec::new();
// poll all connected peers for responses
@@ -387,7 +399,9 @@ pub struct ConnectedPeer {
}
/// Message variants triggered by the [`State`]
pub enum StateAction {
pub(crate) enum StateAction {
/// Received a node status update.
StatusUpdate(StatusUpdate),
/// Dispatch a `NewBlock` message to the peer
NewBlock {
/// Target of the message

View File

@@ -1,4 +1,5 @@
use crate::{
fetch::StatusUpdate,
listener::{ConnectionListener, ListenerEvent},
message::{PeerMessage, PeerRequestSender},
session::{Direction, SessionEvent, SessionId, SessionManager},
@@ -177,6 +178,7 @@ where
let msg = PeerMessage::NewBlockHashes(hashes);
self.sessions.send_message(&peer_id, msg);
}
StateAction::StatusUpdate(status) => return Some(SwarmEvent::StatusUpdate(status)),
}
None
}
@@ -234,6 +236,8 @@ where
/// All events created or delegated by the [`Swarm`] that represents changes to the state of the
/// network.
pub(crate) enum SwarmEvent {
/// Received a node status update.
StatusUpdate(StatusUpdate),
/// Events related to the actual network protocol.
ValidMessage {
/// The peer that sent the message

View File

@@ -131,7 +131,7 @@ impl<D: HeaderDownloader, C: Consensus, H: HeadersClient> HeaderStage<D, C, H> {
.get::<tables::CanonicalHeaders>(height)?
.ok_or(DatabaseIntegrityError::CanonicalHeader { number: height })?;
let td: Vec<u8> = tx.get::<tables::HeaderTD>((height, hash).into())?.unwrap(); // TODO:
self.client.update_status(height, hash, H256::from_slice(&td));
self.client.update_status(height, hash, H256::from_slice(&td).into_uint());
Ok(())
}