fix: use bounded channel for headers download task (#2687)

This commit is contained in:
Dan Cline
2023-05-15 17:40:04 -04:00
committed by GitHub
parent d2f0352271
commit 566f5813c9

View File

@@ -10,14 +10,18 @@ use std::{
task::{ready, Context, Poll},
};
use tokio::sync::{mpsc, mpsc::UnboundedSender};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
use tokio_util::sync::PollSender;
/// The maximum number of header results to hold in the buffer.
pub const HEADERS_TASK_BUFFER_SIZE: usize = 8;
/// A [HeaderDownloader] that drives a spawned [HeaderDownloader] on a spawned task.
#[derive(Debug)]
#[pin_project]
pub struct TaskDownloader {
#[pin]
from_downloader: UnboundedReceiverStream<Vec<SealedHeader>>,
from_downloader: ReceiverStream<Vec<SealedHeader>>,
to_downloader: UnboundedSender<DownloaderUpdates>,
}
@@ -60,17 +64,17 @@ impl TaskDownloader {
T: HeaderDownloader + 'static,
S: TaskSpawner,
{
let (headers_tx, headers_rx) = mpsc::unbounded_channel();
let (headers_tx, headers_rx) = mpsc::channel(HEADERS_TASK_BUFFER_SIZE);
let (to_downloader, updates_rx) = mpsc::unbounded_channel();
let downloader = SpawnedDownloader {
headers_tx,
headers_tx: PollSender::new(headers_tx),
updates: UnboundedReceiverStream::new(updates_rx),
downloader,
};
spawner.spawn(downloader.boxed());
Self { from_downloader: UnboundedReceiverStream::new(headers_rx), to_downloader }
Self { from_downloader: ReceiverStream::new(headers_rx), to_downloader }
}
}
@@ -103,7 +107,7 @@ impl Stream for TaskDownloader {
/// A [HeaderDownloader] that runs on its own task
struct SpawnedDownloader<T> {
updates: UnboundedReceiverStream<DownloaderUpdates>,
headers_tx: UnboundedSender<Vec<SealedHeader>>,
headers_tx: PollSender<Vec<SealedHeader>>,
downloader: T,
}
@@ -139,15 +143,24 @@ impl<T: HeaderDownloader> Future for SpawnedDownloader<T> {
}
}
match ready!(this.downloader.poll_next_unpin(cx)) {
Some(headers) => {
if this.headers_tx.send(headers).is_err() {
// channel closed, this means [TaskDownloader] was dropped, so we can also
// exit
return Poll::Ready(())
match ready!(this.headers_tx.poll_reserve(cx)) {
Ok(()) => {
match ready!(this.downloader.poll_next_unpin(cx)) {
Some(headers) => {
if this.headers_tx.send_item(headers).is_err() {
// channel closed, this means [TaskDownloader] was dropped, so we
// can also exit
return Poll::Ready(())
}
}
None => return Poll::Pending,
}
}
None => return Poll::Pending,
Err(_) => {
// channel closed, this means [TaskDownloader] was dropped, so
// we can also exit
return Poll::Ready(())
}
}
}
}