Multiple sentry support (#202)

This commit is contained in:
Eugene
2022-06-20 13:55:14 +02:00
committed by GitHub
parent bd5b8541ec
commit 36decfbf98
7 changed files with 111 additions and 74 deletions

View File

@@ -8,12 +8,17 @@ use crate::{
use bytes::{BufMut, BytesMut};
use dashmap::DashSet;
use ethereum_interfaces::{sentry as grpc_sentry, sentry::sentry_client::SentryClient};
use ethereum_types::H512;
use fastrlp::*;
use futures::stream::FuturesUnordered;
use hashlink::LruCache;
use parking_lot::{Mutex, RwLock};
use rand::{thread_rng, Rng};
use std::{future::pending, sync::Arc, time::Duration};
use std::{
future::{pending, Future},
sync::Arc,
time::Duration,
};
use task_group::TaskGroup;
use tokio::sync::watch;
use tokio_stream::StreamExt;
@@ -21,10 +26,14 @@ use tonic::transport::Channel;
use tracing::*;
pub type Sentry = SentryClient<Channel>;
pub type SentryId = usize;
pub type PeerId = H512;
pub type RequestId = u64;
#[derive(Debug)]
pub(crate) struct BlockCaches {
pub struct BlockCaches {
/// Mapping from the child hash to it's parent.
pub(crate) parent_cache: LruCache<H256, H256>,
/// Mapping from the block hash to it's number.
@@ -33,22 +42,22 @@ pub(crate) struct BlockCaches {
#[derive(Debug)]
pub struct Node {
pub(crate) stash: Arc<dyn Stash>,
pub stash: Arc<dyn Stash>,
/// The sentry clients.
pub(crate) sentries: Vec<Sentry>,
pub sentries: Vec<Sentry>,
/// The current Node status message.
pub(crate) status: RwLock<Status>,
pub status: RwLock<Status>,
/// Node chain config.
pub(crate) config: ChainConfig,
pub config: ChainConfig,
/// Highest persistent chain tip.
pub(crate) chain_tip: watch::Receiver<(BlockNumber, H256)>,
pub(crate) chain_tip_sender: watch::Sender<(BlockNumber, H256)>,
pub chain_tip: watch::Receiver<(BlockNumber, H256)>,
pub chain_tip_sender: watch::Sender<(BlockNumber, H256)>,
/// Block caches
pub(crate) block_caches: Mutex<BlockCaches>,
pub block_caches: Mutex<BlockCaches>,
/// Table of block hashes of the blocks known to not belong to the canonical chain.
pub(crate) bad_blocks: DashSet<H256>,
pub bad_blocks: DashSet<H256>,
/// Chain forks.
pub(crate) forks: Vec<u64>,
pub forks: Vec<u64>,
}
impl Node {
@@ -66,10 +75,9 @@ impl Node {
let tasks = TaskGroup::new();
let (tx, mut rx) = tokio::sync::mpsc::channel(128);
let requested = Arc::new(Mutex::new(LruCache::new(128)));
let _ = tasks.spawn({
tasks.spawn({
let handler = self.clone();
let requested = requested.clone();
@@ -79,14 +87,20 @@ impl Node {
if let Some(msg) = stream.next().await {
let (block_number, _) = *handler.chain_tip.borrow();
let peer_id = msg.peer_id;
let sentry_id = msg.sentry_id;
match msg.msg {
Message::NewBlockHashes(ref blocks) => {
for b in &blocks.0 {
if b.number > block_number {
let id = thread_rng().gen::<u64>();
tx.send((id, PeerFilter::PeerId(peer_id), b.hash, 0u64))
.await?;
tx.send((
id,
PeerFilter::Peer(peer_id, sentry_id),
b.hash,
0u64,
))
.await?;
requested.lock().insert(id, ());
}
}
@@ -150,7 +164,7 @@ impl Node {
}
}
}
Ok::<(), anyhow::Error>(())
Ok::<_, anyhow::Error>(())
}
});
@@ -218,23 +232,31 @@ impl Node {
async move {
while let Some(msg) = stream.next().await {
let peer_id = msg.peer_id;
let sentry_id = msg.sentry_id;
match msg.msg {
Message::GetBlockHeaders(inner) => {
let msg = Message::BlockHeaders(BlockHeaders {
request_id: inner.request_id,
headers: self.stash.get_headers(inner.params).unwrap_or_default(),
headers: handler
.stash
.get_headers(inner.params)
.unwrap_or_default(),
});
self.send_message(msg, PeerFilter::PeerId(peer_id)).await?;
handler
.send_message(msg, PeerFilter::Peer(peer_id, sentry_id))
.await?;
}
Message::GetBlockBodies(inner) => {
let msg = Message::BlockBodies(BlockBodies {
request_id: inner.request_id,
bodies: self.stash.get_bodies(inner.hashes).unwrap_or_default(),
bodies: handler.stash.get_bodies(inner.hashes).unwrap_or_default(),
});
self.send_message(msg, PeerFilter::PeerId(peer_id)).await?;
handler
.send_message(msg, PeerFilter::Peer(peer_id, sentry_id))
.await?;
}
_ => unreachable!(),
}
@@ -430,11 +452,11 @@ impl Node {
let data = grpc_sentry::OutboundMessageData {
id: grpc_sentry::MessageId::from(MessageId::GetPooledTransactions) as i32,
data: |hashes: &'_ [H256]| -> bytes::Bytes {
data: || -> bytes::Bytes {
let mut buf = BytesMut::new();
GetPooledTransactions_ { request_id, hashes }.encode(&mut buf);
buf.freeze()
}(hashes),
}(),
};
self.send_raw(data, pred).await?;
@@ -556,19 +578,42 @@ impl Node {
}
impl Node {
const TIMEOUT: Duration = Duration::from_secs(2);
async fn send_raw(
&self,
data: impl Into<grpc_sentry::OutboundMessageData>,
predicate: PeerFilter,
) -> anyhow::Result<()> {
let send_msg = move |mut sentry: Sentry,
pred: PeerFilter,
data: grpc_sentry::OutboundMessageData| async move {
match pred {
PeerFilter::All => {
let data = data.into();
async fn map_await<T, I, F>(iter: T, data: grpc_sentry::OutboundMessageData, closure: F)
where
T: IntoIterator<Item = Sentry>,
I: Future<Output = anyhow::Result<()>>,
F: Fn(Sentry, grpc_sentry::OutboundMessageData) -> I,
{
iter.into_iter()
.map(|sentry| {
let data = data.clone();
tokio::time::timeout(Node::TIMEOUT, closure(sentry, data))
})
.collect::<FuturesUnordered<_>>()
.map(|_| ())
.collect::<()>()
.await
}
match predicate {
PeerFilter::All => {
map_await(self.sentries.clone(), data, |mut sentry, data| async move {
sentry.send_message_to_all(data).await?;
}
PeerFilter::Random(max_peers) => {
Ok::<_, anyhow::Error>(())
})
.await;
}
PeerFilter::Random(max_peers) => {
map_await(self.sentries.clone(), data, |mut sentry, data| async move {
sentry
.send_message_to_random_peers(
grpc_sentry::SendMessageToRandomPeersRequest {
@@ -577,48 +622,36 @@ impl Node {
},
)
.await?;
}
PeerFilter::PeerId(peer_id) => {
Ok::<_, anyhow::Error>(())
})
.await;
}
PeerFilter::Peer(peer_id, sentry_id) => {
let iter = std::iter::once(self.sentries[sentry_id].clone());
map_await(iter, data, |mut sentry, data| async move {
sentry
.send_message_by_id(grpc_sentry::SendMessageByIdRequest {
data: Some(data),
peer_id: Some(peer_id.into()),
})
.await?;
}
PeerFilter::MinBlock(min_block) => {
Ok::<_, anyhow::Error>(())
})
.await;
}
PeerFilter::MinBlock(min_block) => {
map_await(self.sentries.clone(), data, |mut sentry, data| async move {
sentry
.send_message_by_min_block(grpc_sentry::SendMessageByMinBlockRequest {
data: Some(data),
min_block,
})
.await?;
}
};
Ok::<_, anyhow::Error>(())
};
const TIMEOUT: Duration = Duration::from_secs(2);
let data = data.into();
self.sentries
.clone()
.into_iter()
.map(|sentry| {
let predicate = predicate.clone();
let data = data.clone();
async move {
let _ =
tokio::time::timeout(TIMEOUT, send_msg(sentry, predicate, data)).await?;
Ok::<_, anyhow::Error>(())
}
})
.collect::<FuturesUnordered<_>>()
.map(|_| ())
.collect::<()>()
.await;
})
.await
}
}
Ok(())
}
async fn set_status(&self, status_data: grpc_sentry::StatusData) -> anyhow::Result<()> {

View File

@@ -1,13 +1,11 @@
use std::fmt::Debug;
use mdbx::EnvironmentKind;
use crate::{
accessors::chain,
kv::{tables, MdbxWithDirHandle},
models::{BlockBody, BlockHeader, BlockNumber, H256},
p2p::types::{BlockId, GetBlockHeadersParams},
};
use mdbx::EnvironmentKind;
use std::fmt::Debug;
pub trait Stash: Send + Sync + Debug {
fn get_headers(&self, _: GetBlockHeadersParams) -> anyhow::Result<Vec<BlockHeader>>;

View File

@@ -14,7 +14,11 @@ impl SentryStream {
const BACKOFF: Duration = Duration::from_millis(100);
#[allow(clippy::new_ret_no_self)]
pub async fn new(sentry: &Sentry, pred: Vec<i32>) -> anyhow::Result<NodeStream> {
pub async fn new(
sentry: &Sentry,
sentry_id: usize,
pred: Vec<i32>,
) -> anyhow::Result<NodeStream> {
let (penalize_tx, mut penalize_rx) = mpsc::channel(4);
tokio::task::spawn({
let mut sentry = sentry.clone();
@@ -42,7 +46,7 @@ impl SentryStream {
if let Some(Ok(msg)) = inner_stream.next().await {
let peer_id = msg.peer_id.clone();
if let Ok(msg) = InboundMessage::try_from(msg) {
if let Ok(msg) = InboundMessage::new(msg, sentry_id) {
yield msg;
} else {
let _ = penalize_tx.send(peer_id).await;
@@ -55,9 +59,9 @@ impl SentryStream {
Ok::<_, anyhow::Error>(stream)
}
pub async fn join_all<'a, T, P>(iter: T, pred: P) -> NodeStream
pub async fn join_all<'sentry, T, P>(iter: T, pred: P) -> NodeStream
where
T: IntoIterator<Item = &'a Sentry>,
T: IntoIterator<Item = &'sentry Sentry>,
P: IntoIterator<Item = i32>,
{
let pred = pred.into_iter().collect::<Vec<_>>();
@@ -65,7 +69,8 @@ impl SentryStream {
Box::pin(futures::stream::select_all(
futures::future::join_all(
iter.into_iter()
.map(|sentry| Self::new(sentry, pred.clone()))
.enumerate()
.map(|(sentry_id, sentry)| Self::new(sentry, sentry_id, pred.clone()))
.collect::<Vec<_>>(),
)
.await

0
src/p2p/types/event.rs Normal file
View File

View File

@@ -178,13 +178,12 @@ impl From<Vec<H256>> for Message {
pub struct InboundMessage {
pub msg: Message,
pub peer_id: PeerId,
pub sentry_id: usize,
}
impl TryFrom<grpc_sentry::InboundMessage> for InboundMessage {
type Error = anyhow::Error;
#[inline(always)]
fn try_from(value: grpc_sentry::InboundMessage) -> Result<Self, Self::Error> {
impl InboundMessage {
#[inline]
pub fn new(value: grpc_sentry::InboundMessage, sentry_id: usize) -> anyhow::Result<Self> {
let msg_data_slice = &mut &*value.data;
let msg = match MessageId::try_from(match grpc_sentry::MessageId::from_i32(value.id) {
Some(msg_id) => msg_id,
@@ -221,6 +220,7 @@ impl TryFrom<grpc_sentry::InboundMessage> for InboundMessage {
Ok(InboundMessage {
msg,
peer_id: value.peer_id.unwrap_or_default().into(),
sentry_id,
})
}
}

View File

@@ -7,12 +7,13 @@ mod status;
pub use self::{block::*, header::*, message::*, penalty::*, rlp::*, status::*};
use super::node::SentryId;
use crate::sentry::devp2p::PeerId;
#[derive(Clone, Debug, PartialEq)]
pub enum PeerFilter {
All,
Random(u64),
PeerId(PeerId),
Peer(PeerId, SentryId),
MinBlock(u64),
}

View File

@@ -160,7 +160,7 @@ impl Sentry for SentryService {
async fn peer_events(
&self,
_request: tonic::Request<PeerEventsRequest>,
_: tonic::Request<PeerEventsRequest>,
) -> Result<Response<Self::PeerEventsStream>, tonic::Status> {
let receiver = self.capability_server.peers_status_sender.subscribe();
let stream = BroadcastStream::new(receiver)