Downloader: enable 2nd phase and fix the test. (#80)

Downloader has 2 sequential phases: preverified and linear.

Enabling them both was breaking the downloader test, because in the preverified downloader phase
sending a request closes the SentryClientMock receiver, this causes a reactor loop stream end
and clear the receive_messages_senders to unblock the FetchReceiveStage (and exit the downloader loop).
But then in the linear phase receive_messages() produced an error
"SentryClientReactor unexpected filter_id".

To fix this this condition is checked in receive_messages() and an empty stream is returned,
so that the next FetchReceiveStage immediately ends, and the linear phase loop exits cleanly too.
This commit is contained in:
battlmonstr
2021-11-23 12:58:53 +01:00
committed by GitHub
parent b30ea69e51
commit f0a9792fb9
2 changed files with 29 additions and 10 deletions

View File

@@ -45,7 +45,7 @@ impl<DB: kv::traits::MutableKV + Sync> Downloader<DB> {
let (final_preverified_block_num, final_preverified_block_hash) =
downloader_preverified.run().await?;
let _downloader_linear = downloader_linear::DownloaderLinear::new(
let downloader_linear = downloader_linear::DownloaderLinear::new(
self.chain_config.clone(),
final_preverified_block_num,
final_preverified_block_hash,
@@ -55,7 +55,7 @@ impl<DB: kv::traits::MutableKV + Sync> Downloader<DB> {
self.ui_system.clone(),
);
//downloader_linear.run().await?;
downloader_linear.run().await?;
Ok(())
}

View File

@@ -124,6 +124,13 @@ impl SentryClientReactor {
}
}
fn is_stopped(&self) -> bool {
matches!(
self.send_message_sender.try_reserve(),
Err(mpsc::error::TrySendError::Closed(_))
)
}
pub async fn penalize_peer(&self, peer_id: PeerId) -> anyhow::Result<()> {
let command = SentryCommand::PenalizePeer(peer_id);
let result = self.send_message_sender.send(command).await;
@@ -180,14 +187,26 @@ impl SentryClientReactor {
&self,
filter_id: EthMessageId,
) -> anyhow::Result<Pin<Box<dyn Stream<Item = MessageFromPeer> + Send>>> {
let receiver = self
.receive_messages_senders
.read()
.get(&filter_id)
.ok_or_else(|| {
anyhow::anyhow!("SentryClientReactor unexpected filter_id {:?}", filter_id)
})?
.subscribe();
let receiver = {
let receive_messages_senders = self.receive_messages_senders.read();
// This happens if receive_messages_senders were cleared (see EventLoopReceiveMessagesSendersDropper)
if receive_messages_senders.is_empty() {
// if the SentryClientReactorEventLoop is stopped
if self.is_stopped() {
return Err(anyhow::Error::new(SendMessageError::ReactorStopped));
}
// if the sentry client stream has ended (during tests)
return Ok(Box::pin(tokio_stream::empty()));
}
let receive_messages_sender =
receive_messages_senders.get(&filter_id).ok_or_else(|| {
anyhow::anyhow!("SentryClientReactor unexpected filter_id {:?}", filter_id)
})?;
receive_messages_sender.subscribe()
};
let stream = BroadcastStream::new(receiver)
.map_err(|error| match error {