chore: rm redundant spawn task (#2088)

This commit is contained in:
Matthias Seitz
2023-04-03 15:16:00 +02:00
committed by GitHub
parent 254467bbae
commit 3f49c8a9c4

View File

@@ -88,7 +88,7 @@ where
let pubsub = self.inner.clone();
self.subscription_task_spawner.spawn(Box::pin(async move {
handle_accepted(pubsub, sink, kind, params, Box::<TokioTaskExecutor>::default()).await;
handle_accepted(pubsub, sink, kind, params).await;
}));
Ok(())
@@ -101,7 +101,6 @@ async fn handle_accepted<Client, Pool, Events, Network>(
mut accepted_sink: SubscriptionSink,
kind: SubscriptionKind,
params: Option<Params>,
subscription_task_spawner: Box<dyn TaskSpawner>,
) where
Client: BlockProvider + EvmEnvProvider + Clone + 'static,
Pool: TransactionPool + 'static,
@@ -132,30 +131,27 @@ async fn handle_accepted<Client, Pool, Events, Network>(
accepted_sink.pipe_from_stream(stream).await;
}
SubscriptionKind::Syncing => {
subscription_task_spawner.spawn(Box::pin(async move {
// get new block subscription
let mut new_blocks =
BroadcastStream::new(pubsub.chain_events.subscribe_new_blocks());
// get current sync status
let mut initial_sync_status = pubsub.network.is_syncing();
let current_sub_res = pubsub.sync_status(initial_sync_status).await;
// get new block subscription
let mut new_blocks = BroadcastStream::new(pubsub.chain_events.subscribe_new_blocks());
// get current sync status
let mut initial_sync_status = pubsub.network.is_syncing();
let current_sub_res = pubsub.sync_status(initial_sync_status).await;
// send the current status immediately
let _ = accepted_sink.send(&current_sub_res);
// send the current status immediately
let _ = accepted_sink.send(&current_sub_res);
while (new_blocks.next().await).is_some() {
let current_syncing = pubsub.network.is_syncing();
// Only send a new response if the sync status has changed
if current_syncing != initial_sync_status {
// Update the sync status on each new block
initial_sync_status = current_syncing;
while (new_blocks.next().await).is_some() {
let current_syncing = pubsub.network.is_syncing();
// Only send a new response if the sync status has changed
if current_syncing != initial_sync_status {
// Update the sync status on each new block
initial_sync_status = current_syncing;
// send a new message now that the status changed
let sync_status = pubsub.sync_status(current_syncing).await;
let _ = accepted_sink.send(&sync_status);
}
// send a new message now that the status changed
let sync_status = pubsub.sync_status(current_syncing).await;
let _ = accepted_sink.send(&sync_status);
}
}));
}
}
}
}