Header downloader: queue optimizations. (#43)

This commit is contained in:
battlmonstr
2021-10-05 14:51:05 +02:00
committed by GitHub
parent 6a5661de01
commit 4502e5efb9
5 changed files with 138 additions and 48 deletions

View File

@@ -60,10 +60,10 @@ impl<DB: kv::traits::MutableKV> Downloader<DB> {
sentry_client.set_status(status).await?;
let mut sentry_reactor = SentryClientReactor::new(sentry_client);
sentry_reactor.start();
sentry_reactor.start()?;
let mut ui_system = crate::downloader::ui_system::UISystem::new();
ui_system.start();
ui_system.start()?;
let preverified_hashes_config = PreverifiedHashesConfig::new(&self.opts.chain_name)?;

View File

@@ -97,13 +97,13 @@ impl FetchReceiveStage {
.set_slice_status(slice.deref_mut(), HeaderSliceStatus::Downloaded);
}
unexpected_status => {
warn!("FetchReceiveStage ignores a headers slice that we didn't request starting at: {}; status = {:?}", start_block_num, unexpected_status);
debug!("FetchReceiveStage ignores a headers slice that we didn't request starting at: {:?}; status = {:?}", start_block_num, unexpected_status);
}
}
}
None => {
warn!(
"FetchReceiveStage ignores a headers slice that we didn't request starting at: {}",
debug!(
"FetchReceiveStage ignores a headers slice that we didn't request starting at: {:?}",
start_block_num
);
}

View File

@@ -21,7 +21,7 @@ pub enum PeerFilter {
All,
}
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct MessageFromPeer {
pub message: Message,
pub from_peer_id: Option<ethereum_types::H512>,

View File

@@ -12,8 +12,8 @@ use tokio::{
task::JoinHandle,
};
use tokio_stream::{
wrappers::{errors::BroadcastStreamRecvError, BroadcastStream},
StreamExt,
wrappers::{errors::BroadcastStreamRecvError, BroadcastStream, ReceiverStream},
StreamExt, StreamMap,
};
use tracing::*;
@@ -54,7 +54,7 @@ impl std::error::Error for SendMessageError {}
impl SentryClientReactor {
pub fn new(sentry: Box<dyn SentryClient>) -> Self {
let (send_message_sender, send_message_receiver) = mpsc::channel::<SendMessageCommand>(16);
let (send_message_sender, send_message_receiver) = mpsc::channel::<SendMessageCommand>(1);
let mut receive_messages_senders =
HashMap::<EthMessageId, broadcast::Sender<Message>>::new();
@@ -82,8 +82,11 @@ impl SentryClientReactor {
}
}
pub fn start(&mut self) {
let mut event_loop = self.event_loop.take().unwrap();
pub fn start(&mut self) -> anyhow::Result<()> {
let event_loop = self
.event_loop
.take()
.ok_or_else(|| anyhow::anyhow!("already started once"))?;
let handle = tokio::spawn(async move {
let result = event_loop.run().await;
if let Err(error) = result {
@@ -91,6 +94,7 @@ impl SentryClientReactor {
}
});
self.event_loop_handle = Some(handle);
Ok(())
}
pub async fn stop(&mut self) -> anyhow::Result<()> {
@@ -191,64 +195,144 @@ impl Drop for SentryClientReactor {
}
}
impl SentryClientReactorEventLoop {
async fn run(&mut self) -> anyhow::Result<()> {
// subscribe to incoming messages
let mut in_stream = self.sentry.receive_messages(&[]).await?;
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum EventLoopStreamId {
Send,
Receive,
Stop,
}
loop {
tokio::select! {
Some(command) = self.send_message_receiver.recv() => {
let send_result = self.sentry.send_message(command.message, command.peer_filter).await;
match send_result {
Ok(sent_peers_count) => {
debug!("SentryClientReactor.EventLoop sent message to {:?} peers", sent_peers_count);
#[derive(Debug)]
enum EventLoopStreamResult {
Send(u32),
Receive(MessageFromPeer),
Stop(()),
}
type EventLoopStream = Pin<Box<dyn Stream<Item = anyhow::Result<EventLoopStreamResult>> + Send>>;
// dropping this causes dropping the senders and triggers an end of stream event on the receivers end
struct EventLoopReceiveMessagesSendersDropper {
receive_messages_senders: Arc<RwLock<HashMap<EthMessageId, broadcast::Sender<Message>>>>,
}
impl Drop for EventLoopReceiveMessagesSendersDropper {
fn drop(&mut self) {
// drop shared senders so that existing receive_messages() streams end
self.receive_messages_senders.write().clear();
}
}
impl SentryClientReactorEventLoop {
async fn run(self) -> anyhow::Result<()> {
let mut sentry = self.sentry;
// subscribe to incoming messages
let mut stream = sentry.receive_messages(&[]).await?;
let receive_messages_senders = Arc::clone(&self.receive_messages_senders);
let receive_stream = async_stream::stream! {
// When the stream ends (this normally happens during tests)
// we need to ensure unblocking the subscribers which called receive_messages().
let _receive_messages_senders_dropper = EventLoopReceiveMessagesSendersDropper { receive_messages_senders };
while let Some(message_result) = stream.next().await {
yield message_result;
}
debug!("SentryClientReactor.EventLoop receive_messages stream ended");
};
// When the reactor loop stops/aborts (e.g. after calling stop())
// we need to ensure unblocking the subscribers which called receive_messages().
let _receive_messages_senders_dropper = EventLoopReceiveMessagesSendersDropper {
receive_messages_senders: Arc::clone(&self.receive_messages_senders),
};
let mut send_message_receiver = self.send_message_receiver;
let send_stream = async_stream::stream! {
while let Some(command) = send_message_receiver.recv().await {
let send_result = sentry.send_message(command.message, command.peer_filter).await;
yield send_result;
}
};
let stop_stream = ReceiverStream::new(self.stop_signal_receiver);
let mut stream = StreamMap::<EventLoopStreamId, EventLoopStream>::new();
stream.insert(
EventLoopStreamId::Send,
Box::pin(send_stream.map_ok(EventLoopStreamResult::Send)),
);
stream.insert(
EventLoopStreamId::Receive,
Box::pin(receive_stream.map_ok(EventLoopStreamResult::Receive)),
);
stream.insert(
EventLoopStreamId::Stop,
Box::pin(stop_stream.map(|result| Ok(EventLoopStreamResult::Stop(result)))),
);
while let Some((stream_id, result)) = stream.next().await {
match stream_id {
EventLoopStreamId::Send => {
// process an outgoing message that has been just sent
match result {
Ok(EventLoopStreamResult::Send(sent_peers_count)) => {
debug!(
"SentryClientReactor.EventLoop sent message to {:?} peers",
sent_peers_count
);
}
Ok(_) => panic!("unexpected result {:?}", result),
Err(error) => {
error!("SentryClientReactor.EventLoop sentry.send_message error: {}", error);
error!(
"SentryClientReactor.EventLoop sentry.send_message error: {}",
error
);
}
}
}
message_result = in_stream.next() => {
match message_result {
Some(Ok(message_from_peer)) => {
EventLoopStreamId::Receive => {
// process an incoming message that was received
match result {
Ok(EventLoopStreamResult::Receive(message_from_peer)) => {
let id = message_from_peer.message.eth_id();
debug!("SentryClientReactor.EventLoop incoming message: {:?}", id);
if self.receive_messages_senders
.read()
.get(&id)
.ok_or_else(|| anyhow::anyhow!("SentryClientReactor.EventLoop unexpected message id {:?}", id))?
.send(message_from_peer.message).is_err() {
let receive_messages_senders = self.receive_messages_senders.read();
let sender_opt = receive_messages_senders.get(&id);
let sender = sender_opt.ok_or_else(|| {
anyhow::anyhow!(
"SentryClientReactor.EventLoop unexpected message id {:?}",
id
)
})?;
let send_sub_result = sender.send(message_from_peer.message);
if send_sub_result.is_err() {
debug!("SentryClientReactor.EventLoop no subscribers for message {:?}, dropping", id);
}
}
Some(Err(error)) => {
error!("SentryClientReactor.EventLoop receive message error: {}", error);
Ok(_) => panic!("unexpected result {:?}", result),
Err(error) => {
error!(
"SentryClientReactor.EventLoop receive message error: {}",
error
);
if let Some(io_error) = error.downcast_ref::<std::io::Error>() {
if io_error.kind() == std::io::ErrorKind::BrokenPipe {
info!("SentryClientReactor.EventLoop TODO: need to reconnect in_stream");
}
}
},
None => {
debug!("SentryClientReactor.EventLoop receive_messages stream ended");
break;
}
}
}
Some(_) = self.stop_signal_receiver.recv() => {
break;
}
else => {
EventLoopStreamId::Stop => {
break;
}
}
}
// drop shared senders so that existing receive_messages streams end
self.receive_messages_senders.write().clear();
info!("SentryClientReactor stopped");
Ok(())
}

View File

@@ -35,8 +35,11 @@ impl UISystem {
}
}
pub fn start(&mut self) {
let mut event_loop = self.event_loop.take().unwrap();
pub fn start(&mut self) -> anyhow::Result<()> {
let event_loop = self
.event_loop
.take()
.ok_or_else(|| anyhow::anyhow!("already started once"))?;
let handle = tokio::spawn(async move {
let result = event_loop.run().await;
if let Err(error) = result {
@@ -44,6 +47,7 @@ impl UISystem {
}
});
self.event_loop_handle = Some(handle);
Ok(())
}
pub async fn stop(&mut self) -> anyhow::Result<()> {
@@ -68,7 +72,9 @@ impl UISystem {
}
impl UISystemEventLoop {
async fn run(&mut self) -> anyhow::Result<()> {
async fn run(self) -> anyhow::Result<()> {
let mut stop_signal_receiver = self.stop_signal_receiver;
loop {
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(1)) => {
@@ -77,7 +83,7 @@ impl UISystemEventLoop {
_ = view.draw()?;
}
}
Some(_) = self.stop_signal_receiver.recv() => {
Some(_) = stop_signal_receiver.recv() => {
break;
}
else => {