net: fix the Option.unwrap() bug. Adding debug info, we see:

16:14:42 [DEBUG] (2) net::channel::stop(): END => address=tcp+tls://lilith1.dark.fi:5262 remote=485041065
    16:14:42 [DEBUG] (2) net::p2p::seed(): P2P::seed() [END]
    16:14:42 [DEBUG] (2) net::channel::main_receive_loop(): dnet sending recv msg, remote 485041065
    thread '<unnamed>' panicked at src/net/channel.rs:314:32:
    called `Option::unwrap()` on a `None` value

So the bug is caused after running p2p.seed(), where after
channel.stop() is called, main_receive_loop() keeps running. Examining
the code further we see ch.stop().await is called and the session is
destroyed but this doesn't give enough time for the channel to finish.

One solution is to modify `net/session/seedsync_session.rs` to this:

    let stop_sub = ch.subscribe_stop().await;
    ch.stop().await;
    stop_sub.recv().await;
    stop_sub.unsubscribe().await;

However a better solution is to make channel.stop() itself await until
the channel is fully closed (using the above code inside Channel::stop())

But the better solution is to ensure that StoppableTask.stop() itself
will not finish until the task it's responsible for has finished and
is no longer running. We do that by adding a stop barrier (see diff).
This commit is contained in:
x
2023-08-30 10:13:59 +02:00
parent 31478a5305
commit 4cf9878604

View File

@@ -16,22 +16,23 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::sync::Arc;
use smol::{
channel,
future::{self, Future},
Executor,
};
use std::sync::Arc;
use super::CondVar;
pub type StoppableTaskPtr = Arc<StoppableTask>;
#[derive(Debug)]
pub struct StoppableTask {
// NOTE: we could send the error code from stop() instead of having it specified in start()
// but then that would introduce lifetimes to the entire struct.
stop_send: channel::Sender<()>,
stop_recv: channel::Receiver<()>,
stop_barrier: CondVar,
}
/// A task that can be prematurely stopped at any time.
@@ -50,13 +51,14 @@ pub struct StoppableTask {
impl StoppableTask {
pub fn new() -> Arc<Self> {
let (stop_send, stop_recv) = channel::bounded(1);
Arc::new(Self { stop_send, stop_recv })
Arc::new(Self { stop_send, stop_recv, stop_barrier: CondVar::new() })
}
/// Stops the task
pub async fn stop(&self) {
// Ignore any errors from this send
let _ = self.stop_send.send(()).await;
self.stop_barrier.wait().await;
}
/// Starts the task.
@@ -85,6 +87,7 @@ impl StoppableTask {
let result = future::or(main, stop_fut).await;
stop_handler(result).await;
self.stop_barrier.notify();
})
.detach();
}