Cargo.toml: features updates, util/time.rs: replaced worldtime api request with jsonrpc request towards network nodes, darkfid: configuration check for new clock check impl, contrib/localnet + script/consensu_simulation: added new config args, faucted: removed unnessesary clock check, src/error.rs: new ConfigInvalid error and removed NTP error

This commit is contained in:
aggstam
2022-06-13 18:23:27 +03:00
parent dff636d30a
commit 35c459db9a
11 changed files with 100 additions and 66 deletions

View File

@@ -173,6 +173,7 @@ util = [
"async-runtime",
"darkfi-derive",
"darkfi-derive-internal",
"rpc",
]
rpc = [
@@ -183,7 +184,6 @@ rpc = [
"async-net",
"async-runtime",
"websockets",
"util",
"net",
]

View File

@@ -36,9 +36,15 @@
# Seed nodes to connect to for the consensus protocol
#consensus_p2p_seed = []
# Seed nodes JSON-RPC listen URL for clock synchronization
#consensus_seed_rpc = []
# Peers to connect to for the consensus protocol
#consensus_p2p_peer = []
# Peers JSON-RPC listen URL for clock synchronization
#consensus_peer_rpc = []
# P2P accept address for the syncing protocol
#sync_p2p_accept = "tls://127.0.0.1:8342"

View File

@@ -97,10 +97,18 @@ struct Args {
/// Connect to peer for the consensus protocol (repeatable flag)
consensus_p2p_peer: Vec<Url>,
#[structopt(long)]
/// Peers JSON-RPC listen URL for clock synchronization (repeatable flag)
consensus_peer_rpc: Vec<Url>,
#[structopt(long)]
/// Connect to seed for the consensus protocol (repeatable flag)
consensus_p2p_seed: Vec<Url>,
#[structopt(long)]
/// Seed nodes JSON-RPC listen URL for clock synchronization (repeatable flag)
consensus_seed_rpc: Vec<Url>,
#[structopt(long)]
/// P2P accept address for the syncing protocol
sync_p2p_accept: Option<Url>,
@@ -163,6 +171,7 @@ impl RequestHandler for Darkfid {
match req.method.as_str() {
Some("ping") => return self.pong(req.id, params).await,
Some("clock") => return self.clock(req.id, params).await,
Some("blockchain.get_slot") => return self.get_slot(req.id, params).await,
Some("blockchain.merkle_roots") => return self.merkle_roots(req.id, params).await,
Some("tx.transfer") => return self.transfer(req.id, params).await,
@@ -202,8 +211,20 @@ impl Darkfid {
async_daemonize!(realmain);
async fn realmain(args: Args, ex: Arc<Executor<'_>>) -> Result<()> {
if args.consensus && args.clock_sync {
// We verify that if peer/seed nodes are configured, their rpc config also exists
if ((!args.consensus_p2p_peer.is_empty() && args.consensus_peer_rpc.is_empty()) ||
(args.consensus_p2p_peer.is_empty() && !args.consensus_peer_rpc.is_empty())) ||
((!args.consensus_p2p_seed.is_empty() && args.consensus_seed_rpc.is_empty()) ||
(args.consensus_p2p_seed.is_empty() && !args.consensus_seed_rpc.is_empty()))
{
error!(
"Consensus peer/seed nodes misconfigured: both p2p and rpc urls must be present"
);
return Err(Error::ConfigInvalid)
}
// We verify that the system clock is valid before initializing
if (check_clock().await).is_err() {
let peers = [&args.consensus_peer_rpc[..], &args.consensus_seed_rpc[..]].concat();
if (check_clock(peers).await).is_err() {
error!("System clock is invalid, terminating...");
return Err(Error::InvalidClock)
};

View File

@@ -1,6 +1,9 @@
use serde_json::{json, Value};
use darkfi::rpc::jsonrpc::{JsonResponse, JsonResult};
use darkfi::{
rpc::jsonrpc::{JsonResponse, JsonResult},
util::time::Timestamp,
};
use super::Darkfid;
@@ -12,4 +15,12 @@ impl Darkfid {
pub async fn pong(&self, id: Value, _params: &[Value]) -> JsonResult {
JsonResponse::new(json!("pong"), id).into()
}
// RPCAPI:
// Returns current system clock in `Timestamp` format.
// --> {"jsonrpc": "2.0", "method": "clock", "params": [], "id": 1}
// <-- {"jsonrpc": "2.0", "result": {...}, "id": 1}
pub async fn clock(&self, id: Value, _params: &[Value]) -> JsonResult {
JsonResponse::new(json!(Timestamp::current_time()), id).into()
}
}

View File

@@ -41,6 +41,3 @@
# Airdrop amount limit
#airdrop_limit = "10"
# Verify system clock is correct
#clock_sync = true

View File

@@ -37,9 +37,7 @@ use darkfi::{
decode_base10, expand_path,
path::get_config_path,
serial::serialize,
sleep,
time::check_clock,
NetworkName,
sleep, NetworkName,
},
wallet::walletdb::init_wallet,
Error, Result,
@@ -115,10 +113,6 @@ struct Args {
/// Airdrop amount limit
airdrop_limit: String, // We convert this to biguint with decode_base10
#[structopt(long)]
/// Verify system clock is correct
clock_sync: bool,
#[structopt(short, parse(from_occurrences))]
/// Increase verbosity (-vvv supported)
verbose: u8,
@@ -301,14 +295,6 @@ async fn prune_airdrop_map(map: Arc<Mutex<HashMap<Address, i64>>>, timeout: i64)
async_daemonize!(realmain);
async fn realmain(args: Args, ex: Arc<Executor<'_>>) -> Result<()> {
if args.clock_sync {
// We verify that the system clock is valid before initializing
if (check_clock().await).is_err() {
error!("System clock is invalid, terminating...");
return Err(Error::InvalidClock)
};
}
// We use this handler to block this function after detaching all
// tasks, and to catch a shutdown signal, where we can clean up and
// exit gracefully.

View File

@@ -36,9 +36,15 @@ consensus_p2p_external = "tls://127.0.0.1:8441"
# Connection slots for the consensus protocol
consensus_p2p_seed = ["tls://127.0.0.1:8341"]
# Seed nodes JSON-RPC listen URL for clock synchronization
consensus_seed_rpc = ["tcp://127.0.0.1:8340"]
# Peers to connect to for the consensus protocol
#consensus_p2p_peer = []
# Peers JSON-RPC listen URL for clock synchronization
#consensus_peer_rpc = []
# P2P accept address for the syncing protocol
sync_p2p_accept = "tls://127.0.0.1:8442"

View File

@@ -35,6 +35,3 @@ sync_p2p_seed = ["tls://127.0.0.1:8342"]
# Peers to connect to for the syncing protocol
#sync_p2p_peer = []
# Verify system clock is correct
#clock_sync = true

View File

@@ -35,6 +35,7 @@ do
-v \
--consensus \
--consensus-p2p-seed tcp://127.0.0.1:6000 \
--consensus-seed-rpc tcp://127.0.0.1:6010 \
--sync-p2p-seed tcp://127.0.0.1:6020 \
--consensus-p2p-accept tcp://127.0.0.1:600$i \
--consensus-p2p-external tcp://127.0.0.1:600$i \
@@ -65,6 +66,7 @@ LOG_TARGETS="!sled,!net" ./darkfid \
-v \
--consensus \
--consensus-p2p-seed tcp://127.0.0.1:6000 \
--consensus-seed-rpc tcp://127.0.0.1:6010 \
--sync-p2p-seed tcp://127.0.0.1:6020 \
--consensus-p2p-accept tcp://127.0.0.1:600$bound \
--consensus-p2p-external tcp://127.0.0.1:600$bound \

View File

@@ -153,10 +153,6 @@ pub enum Error {
#[error("async_native_tls error: {0}")]
AsyncNativeTlsError(String),
#[cfg(feature = "util")]
#[error("NTP error: {0}")]
NtpError(String),
// =============
// Crypto errors
// =============
@@ -290,6 +286,9 @@ pub enum Error {
#[error("No config file detected")]
ConfigNotFound,
#[error("Invalid config file detected")]
ConfigInvalid,
#[error("Failed decoding bincode: {0}")]
ZkasDecoderError(&'static str),

View File

@@ -4,16 +4,15 @@ use std::{
time::{Duration, UNIX_EPOCH},
};
use async_std::{
io::{ReadExt, WriteExt},
net::TcpStream,
};
use chrono::{NaiveDateTime, Utc};
use log::debug;
use rand::seq::SliceRandom;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use serde_json::json;
use url::Url;
use crate::{
rpc::{client::RpcClient, jsonrpc::JsonRequest},
util::serial::{SerialDecodable, SerialEncodable},
Error, Result,
};
@@ -86,33 +85,30 @@ impl std::fmt::Display for NanoTimestamp {
// Clock sync parameters
const RETRIES: u8 = 10;
const WORLDTIMEAPI_ADDRESS: &str = "worldtimeapi.org";
const WORLDTIMEAPI_ADDRESS_WITH_PORT: &str = "worldtimeapi.org:443";
const WORLDTIMEAPI_PAYLOAD: &[u8; 88] = b"GET /api/timezone/Etc/UTC HTTP/1.1\r\nHost: worldtimeapi.org\r\nAccept: application/json\r\n\r\n";
const NTP_ADDRESS: &str = "pool.ntp.org:123";
const EPOCH: i64 = 2208988800; //1900
// Raw https request execution for worldtimeapi
async fn worldtimeapi_request() -> Result<Timestamp> {
// Create connection
let stream = TcpStream::connect(WORLDTIMEAPI_ADDRESS_WITH_PORT).await?;
let mut stream = async_native_tls::connect(WORLDTIMEAPI_ADDRESS, stream).await?;
stream.write_all(WORLDTIMEAPI_PAYLOAD).await?;
// JsonRPC request to a network peer(randomly selected),
// to retrieve their current system clock.
async fn peer_request(peers: &Vec<Url>) -> Result<Option<Timestamp>> {
// Select peer, None if vector is empty
let peer = peers.choose(&mut rand::thread_rng());
match peer {
None => Ok(None),
Some(p) => {
// Create rpc client
let rpc_client = RpcClient::new(p.clone()).await?;
// Execute request
let mut res = vec![0_u8; 1024];
stream.read(&mut res).await?;
// Execute request
let req = JsonRequest::new("clock", json!([]));
let rep = rpc_client.oneshot_request(req).await?;
// Parse response
let reply = String::from_utf8(res)?;
let lines = reply.split('\n');
// JSON data exist in last row of response
let last = lines.last().unwrap().trim_matches(char::from(0));
debug!("worldtimeapi json response: {:#?}", last);
let reply: Value = serde_json::from_str(last)?;
let timestamp = Timestamp(reply["unixtime"].as_i64().unwrap());
// Parse response
let timestamp: Timestamp = serde_json::from_value(rep)?;
Ok(timestamp)
Ok(Some(timestamp))
}
}
}
// Raw ntp request execution
@@ -140,12 +136,11 @@ async fn ntp_request() -> Result<Timestamp> {
// Retry loop is used to in case discrepancies are found.
// If all retries fail, system clock is considered invalid.
// TODO: 1. Add proxy functionality in order not to leak connections
// 2. Improve requests and/or add extra protocols
pub async fn check_clock() -> Result<()> {
pub async fn check_clock(peers: Vec<Url>) -> Result<()> {
debug!("System clock check started...");
let mut r = 0;
while r < RETRIES {
if let Err(e) = clock_check().await {
if let Err(e) = clock_check(&peers).await {
debug!("Error during clock check: {:#?}", e);
r += 1;
continue
@@ -160,30 +155,44 @@ pub async fn check_clock() -> Result<()> {
}
}
async fn clock_check() -> Result<()> {
async fn clock_check(peers: &Vec<Url>) -> Result<()> {
// Start elapsed time counter to cover for all requests and processing time
let requests_start = Timestamp::current_time();
// Poll worldtimeapi.org for current UTC timestamp
let mut worldtimeapi_time = worldtimeapi_request().await?;
// Poll one of peers for their current UTC timestamp
let peer_time = peer_request(peers).await?;
// Start elapsed time counter to cover for ntp request and processing time
let ntp_request_start = Timestamp::current_time();
// Poll ntp.org for current timestamp
let mut ntp_time = ntp_request().await?;
// Add elapsed time to respone times
ntp_time.add(ntp_request_start.elapsed() as i64);
worldtimeapi_time.add(requests_start.elapsed() as i64);
// Stop elapsed time counters
let ntp_elapsed_time = ntp_request_start.elapsed() as i64;
let requests_elapsed_time = requests_start.elapsed() as i64;
// Current system time
let system_time = Timestamp::current_time();
debug!("worldtimeapi_time: {:#?}", worldtimeapi_time);
// Add elapsed time to respone times
ntp_time.add(ntp_elapsed_time);
let peer_time = match peer_time {
None => None,
Some(p) => {
let mut t = p;
t.add(requests_elapsed_time);
Some(t)
}
};
debug!("peer_time: {:#?}", peer_time);
debug!("ntp_time: {:#?}", ntp_time);
debug!("system_time: {:#?}", system_time);
// We verify that system time is equal to worldtimeapi and ntp
let check = (system_time == worldtimeapi_time) && (system_time == ntp_time);
// We verify that system time is equal to peer(if exists) and ntp times
let check = match peer_time {
Some(p) => (system_time == p) && (system_time == ntp_time),
None => system_time == ntp_time,
};
match check {
true => Ok(()),
false => Err(Error::InvalidClock),