mirror of
https://github.com/darkrenaissance/darkfi.git
synced 2026-01-08 22:28:12 -05:00
net: allow runtime modification of outbound connection count. adds a method to darkirc to set_outbound_connections, use the scripts ./script/node_get_info.py and ./script/node_set-connect.py to verify
This commit is contained in:
@@ -44,6 +44,9 @@ impl RequestHandler<()> for DarkIrc {
|
||||
"dnet.switch" => self.dnet_switch(req.id, req.params).await,
|
||||
"dnet.subscribe_events" => self.dnet_subscribe_events(req.id, req.params).await,
|
||||
"p2p.get_info" => self.p2p_get_info(req.id, req.params).await,
|
||||
"p2p.set_outbound_connections" => {
|
||||
self.set_outbound_connections(req.id, req.params).await
|
||||
}
|
||||
|
||||
"deg.switch" => self.deg_switch(req.id, req.params).await,
|
||||
"deg.subscribe_events" => self.deg_subscribe_events(req.id, req.params).await,
|
||||
@@ -84,6 +87,34 @@ impl DarkIrc {
|
||||
JsonResponse::new(JsonValue::Boolean(true), id).into()
|
||||
}
|
||||
|
||||
// RPCAPI:
|
||||
// Set the number of outbound connections for the P2P stack.
|
||||
// Takes a positive integer representing the desired number of outbound connection slots.
|
||||
// Returns `true` on success. If the number is greater than current, new slots are added.
|
||||
// If the number is less than current, slots are removed (prioritizing empty slots).
|
||||
//
|
||||
// --> {"jsonrpc": "2.0", "method": "p2p.set_outbound_connections", "params": [5], "id": 42}
|
||||
// <-- {"jsonrpc": "2.0", "result": true, "id": 42}
|
||||
async fn set_outbound_connections(&self, id: u16, params: JsonValue) -> JsonResult {
|
||||
let params = params.get::<Vec<JsonValue>>().unwrap();
|
||||
if params.len() != 1 || !params[0].is_number() {
|
||||
return JsonError::new(ErrorCode::InvalidParams, None, id).into()
|
||||
}
|
||||
|
||||
let n_f64 = params[0].get::<f64>().unwrap();
|
||||
let n = *n_f64 as u32;
|
||||
|
||||
if *n_f64 != n as f64 || n == 0 {
|
||||
return JsonError::new(ErrorCode::InvalidParams, None, id).into()
|
||||
}
|
||||
|
||||
if let Err(e) = self.p2p.session_outbound().set_outbound_connections(n as usize).await {
|
||||
return JsonError::new(ErrorCode::InternalError, Some(e.to_string()), id).into()
|
||||
}
|
||||
|
||||
JsonResponse::new(JsonValue::Boolean(true), id).into()
|
||||
}
|
||||
|
||||
// RPCAPI:
|
||||
// Initializes a subscription to p2p dnet events.
|
||||
// Once a subscription is established, `darkirc` will send JSON-RPC notifications of
|
||||
|
||||
@@ -73,6 +73,12 @@ class JsonRpc:
|
||||
async def dnet_subscribe_events(self):
|
||||
return await self._subscribe("dnet.subscribe_events", [])
|
||||
|
||||
async def get_info(self):
|
||||
return await self._make_request("p2p.get_info", [])
|
||||
|
||||
async def set_outbound_connections(self, n):
|
||||
return await self._make_request("p2p.set_outbound_connections", [n])
|
||||
|
||||
async def main(argv):
|
||||
rpc = JsonRpc()
|
||||
while True:
|
||||
@@ -126,4 +132,6 @@ async def main(argv):
|
||||
|
||||
await rpc.stop()
|
||||
|
||||
asyncio.run(main(sys.argv))
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main(sys.argv))
|
||||
|
||||
55
script/node_set-conns.py
Executable file
55
script/node_set-conns.py
Executable file
@@ -0,0 +1,55 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# This file is part of DarkFi (https://dark.fi)
|
||||
#
|
||||
# Copyright (C) 2020-2025 Dyne.org foundation
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
import asyncio
|
||||
import sys
|
||||
from node_get_info import JsonRpc
|
||||
|
||||
async def main(argv):
|
||||
if len(argv) != 2:
|
||||
print(f"Usage: {argv[0]} <num_connections>", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
num_conns = int(argv[1])
|
||||
if num_conns <= 0:
|
||||
raise ValueError()
|
||||
except ValueError:
|
||||
print("Error: num_connections must be a positive integer", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
rpc = JsonRpc()
|
||||
while True:
|
||||
try:
|
||||
await rpc.start("localhost", 26660)
|
||||
break
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
response = await rpc.set_outbound_connections(num_conns)
|
||||
|
||||
if "error" in response:
|
||||
print(f"Error: {response['error']}")
|
||||
await rpc.stop()
|
||||
sys.exit(1)
|
||||
|
||||
print(f"Set outbound connections to {num_conns}: {response['result']}")
|
||||
await rpc.stop()
|
||||
|
||||
asyncio.run(main(sys.argv))
|
||||
@@ -37,7 +37,7 @@ use std::{
|
||||
use async_trait::async_trait;
|
||||
use futures::stream::{FuturesUnordered, StreamExt};
|
||||
use smol::lock::Mutex;
|
||||
use tracing::{debug, error, warn};
|
||||
use tracing::{debug, error, info, warn};
|
||||
use url::Url;
|
||||
|
||||
use super::{
|
||||
@@ -137,6 +137,67 @@ impl OutboundSession {
|
||||
slot.notify();
|
||||
}
|
||||
}
|
||||
|
||||
/// Sets the number of outbound connections.
|
||||
/// If the number is less than the current, then it will first drop empty slots.
|
||||
pub async fn set_outbound_connections(self: Arc<Self>, n: usize) -> Result<()> {
|
||||
// Guaranteed to be correct since slots is locked for the duration of this method.
|
||||
let mut slots = self.slots.lock().await;
|
||||
let slots_len = slots.len();
|
||||
|
||||
if n > slots_len {
|
||||
self.clone().add_slots(&mut slots, n).await;
|
||||
} else if n < slots_len {
|
||||
self.remove_slots(&mut slots, n).await;
|
||||
}
|
||||
// Do nothing when n == current
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn add_slots(self: Arc<Self>, slots: &mut Vec<Arc<Slot>>, target: usize) {
|
||||
let slots_len = slots.len();
|
||||
let self_ = Arc::downgrade(&self);
|
||||
for i in slots_len..target {
|
||||
let slot = Slot::new(self_.clone(), i as u32);
|
||||
slot.clone().start().await;
|
||||
slots.push(slot);
|
||||
}
|
||||
info!(target: "net::outbound_session",
|
||||
"[P2P] Increased outbound slots from {slots_len} to {target}");
|
||||
}
|
||||
|
||||
/// Prefers to first remove empty slots.
|
||||
async fn remove_slots(&self, slots: &mut Vec<Arc<Slot>>, target: usize) {
|
||||
let slots_len = slots.len();
|
||||
let num_to_remove = slots_len - target;
|
||||
let mut removed = 0;
|
||||
|
||||
// First pass: remove empty slots (channel_id == 0)
|
||||
let mut i = 0;
|
||||
while i < slots.len() && removed < num_to_remove {
|
||||
// Skip connected slots
|
||||
if slots[i].channel_id.load(Ordering::Relaxed) != 0 {
|
||||
i += 1;
|
||||
continue
|
||||
}
|
||||
|
||||
// Disconnect empty slots
|
||||
let slot = slots.remove(i);
|
||||
slot.stop().await;
|
||||
removed += 1;
|
||||
}
|
||||
|
||||
// Second pass: remove remaining slots (connected ones)
|
||||
while removed < num_to_remove && !slots.is_empty() {
|
||||
let slot = slots.remove(0);
|
||||
slot.stop().await;
|
||||
removed += 1;
|
||||
}
|
||||
|
||||
info!(target: "net::outbound_session",
|
||||
"[P2P] Decreased outbound slots from {slots_len} to {target}");
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
|
||||
Reference in New Issue
Block a user