diff --git a/crates/net/discv4/src/lib.rs b/crates/net/discv4/src/lib.rs index b791897aa4..17bfe2d7e6 100644 --- a/crates/net/discv4/src/lib.rs +++ b/crates/net/discv4/src/lib.rs @@ -328,6 +328,7 @@ impl Discv4 { let cmd = Discv4Command::Ban(node_id, ip); self.send_to_service(cmd); } + /// Adds the ip to the ban list. /// /// This will prevent any future inclusion in the table @@ -389,6 +390,11 @@ impl Discv4 { self.to_service.send(cmd)?; Ok(rx.await?) } + + /// Terminates the spawned [Discv4Service]. + pub fn terminate(&self) { + self.send_to_service(Discv4Command::Terminated); + } } /// Manages discv4 peer discovery over UDP. @@ -665,6 +671,7 @@ impl Discv4Service { while let Some(event) = self.next().await { trace!(target : "discv4", ?event, "processed"); } + trace!(target : "discv4", "service terminated"); }) } @@ -1554,6 +1561,11 @@ impl Discv4Service { let _ = self.local_eip_868_enr.set_tcp6(port, &self.secret_key); } } + + Discv4Command::Terminated => { + // terminate the service + self.queued_events.push_back(Discv4Event::Terminated); + } } } @@ -1623,7 +1635,13 @@ impl Stream for Discv4Service { type Item = Discv4Event; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Some(ready!(self.get_mut().poll(cx)))) + // Poll the internal poll method + match ready!(self.get_mut().poll(cx)) { + // if the service is terminated, return None to terminate the stream + Discv4Event::Terminated => Poll::Ready(None), + // For any other event, return Poll::Ready(Some(event)) + ev => Poll::Ready(Some(ev)), + } } } @@ -1644,6 +1662,8 @@ pub enum Discv4Event { EnrRequest, /// A `EnrResponse` message was handled. EnrResponse, + /// Service is being terminated + Terminated, } /// Continuously reads new messages from the channel and writes them to the socket @@ -1714,6 +1734,7 @@ enum Discv4Command { Lookup { node_id: Option, tx: Option }, SetLookupInterval(Duration), Updates(OneshotSender>), + Terminated, } /// Event type receiver produces