make db inside Node - optional (#194)

* make db inside Node - optional

* refactor
This commit is contained in:
Eugene
2022-05-30 16:21:24 +00:00
committed by GitHub
parent b0df894adb
commit 047a11581b
8 changed files with 132 additions and 167 deletions

View File

@@ -135,7 +135,7 @@ async fn download_headers(
let node = Arc::new(
NodeBuilder::default()
.set_env(env.clone())
.set_stash(env.clone())
.add_sentry(uri)
.build()?,
);

View File

@@ -235,7 +235,7 @@ fn main() -> anyhow::Result<()> {
};
let mut builder = NodeBuilder::default()
.set_env(db.clone())
.set_stash(db.clone())
.set_config(chain_config.clone());
for sentry_api_addr in sentries {
builder = builder.add_sentry(sentry_api_addr);

View File

@@ -1,46 +1,23 @@
use std::sync::Arc;
use super::{BlockCaches, Node, Sentry};
use super::{stash::Stash, BlockCaches, Node, Sentry};
use crate::{
kv::MdbxWithDirHandle,
models::{BlockNumber, ChainConfig, H256, U256},
p2p::types::Status,
};
use hashlink::LruCache;
use http::Uri;
use mdbx::EnvironmentKind;
use parking_lot::{Mutex, RwLock};
use std::sync::Arc;
use tonic::transport::Channel;
#[derive(Debug)]
pub struct NodeBuilder<E>
where
E: EnvironmentKind,
{
#[derive(Debug, Default)]
pub struct NodeBuilder {
sentries: Vec<Sentry>,
env: Option<Arc<MdbxWithDirHandle<E>>>,
stash: Option<Arc<dyn Stash>>,
config: Option<ChainConfig>,
status: Option<Status>,
}
impl<E> Default for NodeBuilder<E>
where
E: EnvironmentKind,
{
fn default() -> Self {
Self {
sentries: Vec::new(),
env: None,
config: None,
status: None,
}
}
}
impl<E> NodeBuilder<E>
where
E: EnvironmentKind,
{
impl NodeBuilder {
pub fn add_sentry(mut self, endpoint: impl Into<Uri>) -> Self {
self.sentries.push(Sentry::new(
Channel::builder(endpoint.into()).connect_lazy(),
@@ -62,14 +39,13 @@ where
self
}
pub fn set_env(mut self, env: Arc<MdbxWithDirHandle<E>>) -> Self {
self.env = Some(env);
pub fn set_stash(mut self, stash: Arc<dyn Stash>) -> Self {
self.stash = Some(stash);
self
}
pub fn build(self) -> anyhow::Result<Node<E>> {
let env = self.env.ok_or_else(|| anyhow::anyhow!("env not set"))?;
pub fn build(self) -> anyhow::Result<Node> {
let stash = self.stash.unwrap_or_else(|| Arc::new(()));
let sentries = self.sentries;
if sentries.is_empty() {
anyhow::bail!("No sentries");
@@ -82,7 +58,7 @@ where
let forks = config.forks().into_iter().map(|f| *f).collect::<Vec<_>>();
Ok(Node {
env,
stash,
sentries,
status,
config,

View File

@@ -2,6 +2,7 @@
mod builder;
mod node;
mod stash;
mod stream;
pub use self::{builder::*, node::*, stream::NodeStream};

View File

@@ -1,31 +1,19 @@
#![allow(unused_imports, unreachable_code)]
use super::{
stream::{NodeStream, SentryStream},
NodeBuilder,
};
#![allow(unreachable_code)]
use super::{stash::Stash, stream::*};
use crate::{
accessors::chain,
kv::{mdbx::MdbxTransaction, tables, MdbxWithDirHandle},
models::{BlockNumber, ChainConfig, MessageWithSignature, H256},
p2p::types::{
BlockBodies, BlockHeaders, BlockId, GetBlockBodies, GetBlockHeaders, GetBlockHeadersParams,
HeaderRequest, Message, MessageId, PeerFilter, PooledTransactions, Status,
},
p2p::{node::NodeBuilder, types::*},
};
use anyhow::anyhow;
use bytes::{BufMut, BytesMut};
use ethereum_interfaces::{sentry as grpc_sentry, sentry::sentry_client::SentryClient};
use fastrlp::*;
use futures::stream::FuturesUnordered;
use hashbrown::HashSet;
use hashlink::LruCache;
use mdbx::{EnvironmentKind, WriteMap, RO};
use num_traits::Zero;
use parking_lot::{Mutex, RwLock};
use rand::{thread_rng, Rng};
use std::{future::pending, sync::Arc, time::Duration};
use task_group::TaskGroup;
use tokio::task::JoinHandle;
use tokio_stream::StreamExt;
use tonic::transport::Channel;
use tracing::*;
@@ -44,11 +32,8 @@ pub(crate) struct BlockCaches {
}
#[derive(Debug)]
pub struct Node<E>
where
E: EnvironmentKind,
{
pub(crate) env: Arc<MdbxWithDirHandle<E>>,
pub struct Node {
pub(crate) stash: Arc<dyn Stash>,
/// The sentry clients.
pub(crate) sentries: Vec<Sentry>,
/// The current Node status message.
@@ -63,20 +48,14 @@ where
pub(crate) forks: Vec<u64>,
}
impl<E> Node<E>
where
E: EnvironmentKind,
{
impl Node {
/// Node builder.
pub fn builder() -> NodeBuilder<E> {
pub fn builder() -> NodeBuilder {
NodeBuilder::default()
}
}
impl<E> Node<E>
where
E: EnvironmentKind,
{
impl Node {
const SYNC_INTERVAL: Duration = Duration::from_secs(5);
/// Start node synchronization.
@@ -222,80 +201,6 @@ where
}
});
let collect_bodies = {
let env = self.env.clone();
move |hashes: Vec<H256>| {
let txn = env.begin().expect("Failed to begin transaction");
hashes
.into_iter()
.filter_map(|hash| {
txn.get(tables::HeaderNumber, hash)
.unwrap_or(None)
.map(|number| (hash, number))
})
.filter_map(|(hash, number)| {
chain::block_body::read_without_senders(&txn, hash, number).unwrap_or(None)
})
.collect::<Vec<_>>()
}
};
let collect_headers = {
let env = self.env.clone();
move |params: GetBlockHeadersParams| {
let txn = env.begin().expect("Failed to begin transaction");
let limit = std::cmp::min(params.limit, 1024);
let reverse = params.reverse == 1;
let mut add_op = if params.skip == 0 {
1
} else {
params.skip as i64 + 1
};
if reverse {
add_op = -add_op;
}
let mut headers = Vec::with_capacity(limit as usize);
let mut number_cursor = txn
.cursor(tables::HeaderNumber)
.expect("Failed to open cursor, likely a DB corruption");
let mut canonical_cursor = txn
.cursor(tables::CanonicalHeader)
.expect("Failed to open cursor, likely a DB corruption");
let mut header_cursor = txn
.cursor(tables::Header)
.expect("Failed to open cursor, likely a DB corruption");
let mut next_number = match params.start {
BlockId::Hash(hash) => number_cursor.seek_exact(hash)?.map(|(_, k)| k),
BlockId::Number(number) => Some(number),
};
for _ in 0..limit {
match next_number {
Some(block_number) => {
if let Some(header_key) = canonical_cursor.seek_exact(block_number)? {
if let Some((_, header)) = header_cursor.seek_exact(header_key)? {
headers.push(header);
}
}
next_number = u64::try_from(block_number.0 as i64 + add_op)
.ok()
.map(BlockNumber);
}
None => break,
};
}
Ok::<_, anyhow::Error>(headers)
}
};
tasks.spawn({
let handler = self.clone();
let mut stream = handler
@@ -313,7 +218,7 @@ where
Message::GetBlockHeaders(inner) => {
let msg = Message::BlockHeaders(BlockHeaders {
request_id: inner.request_id,
headers: collect_headers(inner.params).unwrap_or_default(),
headers: self.stash.get_headers(inner.params).unwrap_or_default(),
});
self.send_message(msg, PeerFilter::PeerId(peer_id)).await?;
@@ -321,7 +226,7 @@ where
Message::GetBlockBodies(inner) => {
let msg = Message::BlockBodies(BlockBodies {
request_id: inner.request_id,
bodies: collect_bodies(inner.hashes),
bodies: self.stash.get_bodies(inner.hashes).unwrap_or_default(),
});
self.send_message(msg, PeerFilter::PeerId(peer_id)).await?;
@@ -649,10 +554,7 @@ where
}
}
impl<E> Node<E>
where
E: EnvironmentKind,
{
impl Node {
async fn send_raw(
&self,
data: impl Into<grpc_sentry::OutboundMessageData>,

95
src/p2p/node/stash.rs Normal file
View File

@@ -0,0 +1,95 @@
use std::fmt::Debug;
use mdbx::EnvironmentKind;
use crate::{
accessors::chain,
kv::{tables, MdbxWithDirHandle},
models::{BlockBody, BlockHeader, BlockNumber, H256},
p2p::types::{BlockId, GetBlockHeadersParams},
};
/// Abstraction over the node's local block storage, used to answer
/// peer requests for block headers and bodies.
///
/// `Node` holds this behind `Arc<dyn Stash>`, so implementors must be
/// `Send + Sync` (shared across tasks) and `Debug` (the node derives
/// `Debug`).
pub trait Stash: Send + Sync + Debug {
    /// Returns headers selected by the request parameters (start block,
    /// limit, skip and direction). An empty `Vec` is a valid answer.
    fn get_headers(&self, _: GetBlockHeadersParams) -> anyhow::Result<Vec<BlockHeader>>;
    /// Returns the block bodies for the given block hashes. Hashes with
    /// no locally known body may simply be omitted from the result.
    fn get_bodies(&self, _: Vec<H256>) -> anyhow::Result<Vec<BlockBody>>;
}
/// No-op stash for a node running without a local database: every
/// request is answered with an empty result set.
impl Stash for () {
    fn get_headers(&self, _: GetBlockHeadersParams) -> anyhow::Result<Vec<BlockHeader>> {
        Ok(Vec::new())
    }

    fn get_bodies(&self, _: Vec<H256>) -> anyhow::Result<Vec<BlockBody>> {
        Ok(Vec::new())
    }
}
/// Database-backed stash: serves headers and bodies directly from the
/// node's MDBX environment.
impl<E> Stash for MdbxWithDirHandle<E>
where
    E: EnvironmentKind,
{
    fn get_headers(&self, params: GetBlockHeadersParams) -> anyhow::Result<Vec<BlockHeader>> {
        let txn = self.begin().expect("Failed to begin transaction");

        // Cap the response size regardless of what the peer asked for.
        let limit = std::cmp::min(params.limit, 1024);
        let reverse = params.reverse == 1;
        // Step between returned headers: skip == 0 means consecutive
        // blocks, otherwise skip+1 blocks apart; negated when walking
        // backwards from the start block.
        let mut add_op = if params.skip == 0 {
            1
        } else {
            params.skip as i64 + 1
        };
        if reverse {
            add_op = -add_op;
        }

        let mut headers = Vec::with_capacity(limit as usize);
        let mut number_cursor = txn
            .cursor(tables::HeaderNumber)
            .expect("Failed to open cursor, likely a DB corruption");
        let mut canonical_cursor = txn
            .cursor(tables::CanonicalHeader)
            .expect("Failed to open cursor, likely a DB corruption");
        let mut header_cursor = txn
            .cursor(tables::Header)
            .expect("Failed to open cursor, likely a DB corruption");

        // Resolve the starting point: a hash is mapped to its block
        // number via the HeaderNumber table; an unknown hash yields
        // None and the loop below returns no headers.
        let mut next_number = match params.start {
            BlockId::Hash(hash) => number_cursor.seek_exact(hash)?.map(|(_, k)| k),
            BlockId::Number(number) => Some(number),
        };

        for _ in 0..limit {
            match next_number {
                Some(block_number) => {
                    // Only headers on the canonical chain are served;
                    // gaps are skipped silently rather than erroring.
                    if let Some(header_key) = canonical_cursor.seek_exact(block_number)? {
                        if let Some((_, header)) = header_cursor.seek_exact(header_key)? {
                            headers.push(header);
                        }
                    }
                    // Advance by add_op; a negative intermediate value
                    // (walked past genesis) fails the u64 conversion and
                    // terminates the walk on the next iteration.
                    next_number = u64::try_from(block_number.0 as i64 + add_op)
                        .ok()
                        .map(BlockNumber);
                }
                None => break,
            };
        }
        Ok::<_, anyhow::Error>(headers)
    }

    fn get_bodies(&self, hashes: Vec<H256>) -> anyhow::Result<Vec<BlockBody>> {
        let txn = self.begin().expect("Failed to begin transaction");
        Ok(hashes
            .into_iter()
            // Map each hash to its block number; hashes unknown to the
            // HeaderNumber table (or read errors) are dropped silently.
            .filter_map(|hash| {
                txn.get(tables::HeaderNumber, hash)
                    .unwrap_or(None)
                    .map(|number| (hash, number))
            })
            // Read the body without recovering transaction senders;
            // blocks with no stored body are likewise dropped.
            .filter_map(|(hash, number)| {
                chain::block_body::read_without_senders(&txn, hash, number).unwrap_or(None)
            })
            .collect::<Vec<_>>())
    }
}

View File

@@ -28,18 +28,15 @@ use tracing::*;
const REQUEST_INTERVAL: Duration = Duration::from_secs(10);
#[derive(Debug)]
pub struct BodyDownload<E>
where
E: EnvironmentKind,
{
pub struct BodyDownload {
/// Node is an interface for interacting with p2p.
pub node: Arc<Node<E>>,
pub node: Arc<Node>,
/// Consensus engine used.
pub consensus: Arc<dyn Consensus>,
}
#[async_trait]
impl<'db, E> Stage<'db, E> for BodyDownload<E>
impl<'db, E> Stage<'db, E> for BodyDownload
where
E: EnvironmentKind,
{
@@ -104,11 +101,8 @@ where
}
}
impl<E> BodyDownload<E>
where
E: EnvironmentKind,
{
async fn download_bodies(
impl BodyDownload {
async fn download_bodies<E: EnvironmentKind>(
&mut self,
stream: &mut NodeStream,
txn: &mut MdbxTransaction<'_, RW, E>,
@@ -294,7 +288,7 @@ where
Ok(())
}
fn prepare_requests(
fn prepare_requests<E: EnvironmentKind>(
txn: &mut MdbxTransaction<'_, RW, E>,
starting_block: BlockNumber,
target: BlockNumber,

View File

@@ -35,15 +35,15 @@ const STAGE_UPPER_BOUND: usize = 3 << 15;
const REQUEST_INTERVAL: Duration = Duration::from_secs(10);
#[derive(Debug)]
pub struct HeaderDownload<E: EnvironmentKind> {
pub node: Arc<Node<E>>,
pub struct HeaderDownload {
pub node: Arc<Node>,
pub consensus: Arc<dyn Consensus>,
pub max_block: BlockNumber,
pub graph: Graph,
}
#[async_trait]
impl<'db, E> Stage<'db, E> for HeaderDownload<E>
impl<'db, E> Stage<'db, E> for HeaderDownload
where
E: EnvironmentKind,
{
@@ -186,10 +186,7 @@ fn dummy_check_headers(headers: &[BlockHeader]) -> bool {
true
}
impl<E> HeaderDownload<E>
where
E: EnvironmentKind,
{
impl HeaderDownload {
const BACK_OFF: Duration = Duration::from_secs(5);
fn prepare_requests(
@@ -363,7 +360,7 @@ where
}
}
async fn update_head<'tx>(
async fn update_head<'tx, E: EnvironmentKind>(
&self,
txn: &'tx mut MdbxTransaction<'_, RW, E>,
height: BlockNumber,