From 419da3f849e2203f06847799afe269392c663738 Mon Sep 17 00:00:00 2001 From: jkds Date: Tue, 30 Dec 2025 14:44:40 +0100 Subject: [PATCH] net: allow runtime modification of outbound connection count. adds a method to darkirc to set_outbound_connections, use the scripts ./script/node_get_info.py and ./script/node_set-connect.py to verify --- bin/darkirc/src/rpc.rs | 31 +++++++++ script/{node_get-info.py => node_get_info.py} | 10 ++- script/node_set-conns.py | 55 ++++++++++++++++ src/net/session/outbound_session.rs | 63 ++++++++++++++++++- 4 files changed, 157 insertions(+), 2 deletions(-) rename script/{node_get-info.py => node_get_info.py} (92%) create mode 100755 script/node_set-conns.py diff --git a/bin/darkirc/src/rpc.rs b/bin/darkirc/src/rpc.rs index 29888624d..d4506200f 100644 --- a/bin/darkirc/src/rpc.rs +++ b/bin/darkirc/src/rpc.rs @@ -44,6 +44,9 @@ impl RequestHandler<()> for DarkIrc { "dnet.switch" => self.dnet_switch(req.id, req.params).await, "dnet.subscribe_events" => self.dnet_subscribe_events(req.id, req.params).await, "p2p.get_info" => self.p2p_get_info(req.id, req.params).await, + "p2p.set_outbound_connections" => { + self.set_outbound_connections(req.id, req.params).await + } "deg.switch" => self.deg_switch(req.id, req.params).await, "deg.subscribe_events" => self.deg_subscribe_events(req.id, req.params).await, @@ -84,6 +87,34 @@ impl DarkIrc { JsonResponse::new(JsonValue::Boolean(true), id).into() } + // RPCAPI: + // Set the number of outbound connections for the P2P stack. + // Takes a positive integer representing the desired number of outbound connection slots. + // Returns `true` on success. If the number is greater than current, new slots are added. + // If the number is less than current, slots are removed (prioritizing empty slots). + // + // --> {"jsonrpc": "2.0", "method": "p2p.set_outbound_connections", "params": [5], "id": 42} + // <-- {"jsonrpc": "2.0", "result": true, "id": 42} + async fn set_outbound_connections(&self, id: u16, params: JsonValue) -> JsonResult { + let params = params.get::>().unwrap(); + if params.len() != 1 || !params[0].is_number() { + return JsonError::new(ErrorCode::InvalidParams, None, id).into() + } + + let n_f64 = params[0].get::().unwrap(); + let n = *n_f64 as u32; + + if *n_f64 != n as f64 || n == 0 { + return JsonError::new(ErrorCode::InvalidParams, None, id).into() + } + + if let Err(e) = self.p2p.session_outbound().set_outbound_connections(n as usize).await { + return JsonError::new(ErrorCode::InternalError, Some(e.to_string()), id).into() + } + + JsonResponse::new(JsonValue::Boolean(true), id).into() + } + // RPCAPI: // Initializes a subscription to p2p dnet events. // Once a subscription is established, `darkirc` will send JSON-RPC notifications of diff --git a/script/node_get-info.py b/script/node_get_info.py similarity index 92% rename from script/node_get-info.py rename to script/node_get_info.py index 52c60f647..20bb5255a 100755 --- a/script/node_get-info.py +++ b/script/node_get_info.py @@ -73,6 +73,12 @@ class JsonRpc: async def dnet_subscribe_events(self): return await self._subscribe("dnet.subscribe_events", []) + async def get_info(self): + return await self._make_request("p2p.get_info", []) + + async def set_outbound_connections(self, n): + return await self._make_request("p2p.set_outbound_connections", [n]) + async def main(argv): rpc = JsonRpc() while True: @@ -126,4 +132,6 @@ async def main(argv): await rpc.stop() -asyncio.run(main(sys.argv)) +if __name__ == "__main__": + asyncio.run(main(sys.argv)) + diff --git a/script/node_set-conns.py b/script/node_set-conns.py new file mode 100755 index 000000000..ac2ce3a26 --- /dev/null +++ b/script/node_set-conns.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python3 + +# This file is part of DarkFi (https://dark.fi) +# +# Copyright (C) 2020-2025 Dyne.org foundation +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import asyncio +import sys +from node_get_info import JsonRpc + +async def main(argv): + if len(argv) != 2: + print(f"Usage: {argv[0]} ", file=sys.stderr) + sys.exit(1) + + try: + num_conns = int(argv[1]) + if num_conns <= 0: + raise ValueError() + except ValueError: + print("Error: num_connections must be a positive integer", file=sys.stderr) + sys.exit(1) + + rpc = JsonRpc() + while True: + try: + await rpc.start("localhost", 26660) + break + except OSError: + pass + + response = await rpc.set_outbound_connections(num_conns) + + if "error" in response: + print(f"Error: {response['error']}") + await rpc.stop() + sys.exit(1) + + print(f"Set outbound connections to {num_conns}: {response['result']}") + await rpc.stop() + +asyncio.run(main(sys.argv)) diff --git a/src/net/session/outbound_session.rs b/src/net/session/outbound_session.rs index d0204bf29..eb180d2fc 100644 --- a/src/net/session/outbound_session.rs +++ b/src/net/session/outbound_session.rs @@ -37,7 +37,7 @@ use std::{ use async_trait::async_trait; use futures::stream::{FuturesUnordered, StreamExt}; use smol::lock::Mutex; -use tracing::{debug, error, warn}; +use tracing::{debug, error, info, warn}; use url::Url; use super::{ @@ -137,6 +137,67 @@ impl OutboundSession { slot.notify(); } } + + /// Sets the number of outbound connections. + /// If the number is less than the current, then it will first drop empty slots. + pub async fn set_outbound_connections(self: Arc, n: usize) -> Result<()> { + // Guaranteed to be correct since slots is locked for the duration of this method. + let mut slots = self.slots.lock().await; + let slots_len = slots.len(); + + if n > slots_len { + self.clone().add_slots(&mut slots, n).await; + } else if n < slots_len { + self.remove_slots(&mut slots, n).await; + } + // Do nothing when n == current + + Ok(()) + } + + async fn add_slots(self: Arc, slots: &mut Vec>, target: usize) { + let slots_len = slots.len(); + let self_ = Arc::downgrade(&self); + for i in slots_len..target { + let slot = Slot::new(self_.clone(), i as u32); + slot.clone().start().await; + slots.push(slot); + } + info!(target: "net::outbound_session", + "[P2P] Increased outbound slots from {slots_len} to {target}"); + } + + /// Prefers to first remove empty slots. + async fn remove_slots(&self, slots: &mut Vec>, target: usize) { + let slots_len = slots.len(); + let num_to_remove = slots_len - target; + let mut removed = 0; + + // First pass: remove empty slots (channel_id == 0) + let mut i = 0; + while i < slots.len() && removed < num_to_remove { + // Skip connected slots + if slots[i].channel_id.load(Ordering::Relaxed) != 0 { + i += 1; + continue + } + + // Disconnect empty slots + let slot = slots.remove(i); + slot.stop().await; + removed += 1; + } + + // Second pass: remove remaining slots (connected ones) + while removed < num_to_remove && !slots.is_empty() { + let slot = slots.remove(0); + slot.stop().await; + removed += 1; + } + + info!(target: "net::outbound_session", + "[P2P] Decreased outbound slots from {slots_len} to {target}"); + } } #[async_trait]