Merge branch 'master' of github.com:narodnik/sapvi

This commit is contained in:
narodnik
2021-05-12 11:07:29 +02:00
14 changed files with 191 additions and 565 deletions

View File

@@ -133,14 +133,6 @@ path = "src/bin/services.rs"
name = "demowallet"
path = "src/bin/demowallet.rs"
[[bin]]
name = "wallet"
path = "src/bin/wallet/test.rs"
[[bin]]
name = "darkd"
path = "src/bin/wallet/darkd.rs"
[profile.release]
debug = 1

View File

@@ -11,7 +11,7 @@ use smol::Async;
use std::net::SocketAddr;
use std::net::TcpListener;
use std::sync::Arc;
use sapvi::rpc::jsonserver::RpcInterface;
use sapvi::{net, Result};
/// Listens for incoming connections and serves them.
@@ -73,69 +73,69 @@ async fn listen(
}
}
struct RpcInterface {
p2p: Arc<net::P2p>,
started: Mutex<bool>,
stop_send: async_channel::Sender<()>,
stop_recv: async_channel::Receiver<()>,
}
impl RpcInterface {
fn new(p2p: Arc<net::P2p>) -> Arc<Self> {
let (stop_send, stop_recv) = async_channel::unbounded::<()>();
Arc::new(Self {
p2p,
started: Mutex::new(false),
stop_send,
stop_recv,
})
}
async fn serve(self: Arc<Self>, mut req: Request) -> http_types::Result<Response> {
info!("RPC serving {}", req.url());
let request = req.body_string().await?;
let mut io = jsonrpc_core::IoHandler::new();
io.add_sync_method("say_hello", |_| {
Ok(jsonrpc_core::Value::String("Hello World!".into()))
});
let self2 = self.clone();
io.add_method("get_info", move |_| {
let self2 = self2.clone();
async move {
Ok(json!({
"started": *self2.started.lock().await,
"connections": self2.p2p.connections_count().await
}))
}
});
let stop_send = self.stop_send.clone();
io.add_method("stop", move |_| {
let stop_send = stop_send.clone();
async move {
let _ = stop_send.send(()).await;
Ok(jsonrpc_core::Value::Null)
}
});
let response = io
.handle_request_sync(&request)
.ok_or(sapvi::Error::BadOperationType)?;
let mut res = Response::new(StatusCode::Ok);
res.insert_header("Content-Type", "text/plain");
res.set_body(response);
Ok(res)
}
async fn wait_for_quit(self: Arc<Self>) -> Result<()> {
Ok(self.stop_recv.recv().await?)
}
}
//struct RpcInterface {
// p2p: Arc<net::P2p>,
// started: Mutex<bool>,
// stop_send: async_channel::Sender<()>,
// stop_recv: async_channel::Receiver<()>,
//}
//
//impl RpcInterface {
// fn new(p2p: Arc<net::P2p>) -> Arc<Self> {
// let (stop_send, stop_recv) = async_channel::unbounded::<()>();
//
// Arc::new(Self {
// p2p,
// started: Mutex::new(false),
// stop_send,
// stop_recv,
// })
// }
//
// async fn serve(self: Arc<Self>, mut req: Request) -> http_types::Result<Response> {
// info!("RPC serving {}", req.url());
//
// let request = req.body_string().await?;
//
// let mut io = jsonrpc_core::IoHandler::new();
// io.add_sync_method("say_hello", |_| {
// Ok(jsonrpc_core::Value::String("Hello World!".into()))
// });
//
// let self2 = self.clone();
// io.add_method("get_info", move |_| {
// let self2 = self2.clone();
// async move {
// Ok(json!({
// "started": *self2.started.lock().await,
// "connections": self2.p2p.connections_count().await
// }))
// }
// });
//
// let stop_send = self.stop_send.clone();
// io.add_method("stop", move |_| {
// let stop_send = stop_send.clone();
// async move {
// let _ = stop_send.send(()).await;
// Ok(jsonrpc_core::Value::Null)
// }
// });
//
// let response = io
// .handle_request_sync(&request)
// .ok_or(sapvi::Error::BadOperationType)?;
//
// let mut res = Response::new(StatusCode::Ok);
// res.insert_header("Content-Type", "text/plain");
// res.set_body(response);
// Ok(res)
// }
//
// async fn wait_for_quit(self: Arc<Self>) -> Result<()> {
// Ok(self.stop_recv.recv().await?)
// }
//}
async fn start(executor: Arc<Executor<'_>>, options: ProgramOptions) -> Result<()> {
let p2p = net::P2p::new(options.network_settings);

View File

@@ -1,407 +0,0 @@
#[macro_use]
extern crate clap;
use async_executor::Executor;
use async_native_tls::TlsAcceptor;
use async_std::sync::Mutex;
use easy_parallel::Parallel;
use ff::Field;
use http_types::{Request, Response, StatusCode};
use log::*;
use rand::rngs::OsRng;
use rusqlite::Connection;
use sapvi::serial;
use serde_json::json;
use smol::Async;
use std::fs::File;
use std::io::prelude::*;
use std::io::BufReader;
use std::net::SocketAddr;
use std::net::TcpListener;
use std::sync::Arc;
use sapvi::{net, Error, Result};
/// Listens for incoming connections and serves them.
async fn listen(
executor: Arc<Executor<'_>>,
rpc: Arc<RpcInterface>,
listener: Async<TcpListener>,
tls: Option<TlsAcceptor>,
) -> Result<()> {
// Format the full host address.
let host = match &tls {
None => format!("http://{}", listener.get_ref().local_addr()?),
Some(_) => format!("https://{}", listener.get_ref().local_addr()?),
};
println!("Listening on {}", host);
loop {
// Accept the next connection.
let (stream, _) = listener.accept().await?;
// Spawn a background task serving this connection.
let task = match &tls {
None => {
let stream = async_dup::Arc::new(stream);
let rpc = rpc.clone();
executor.spawn(async move {
if let Err(err) = async_h1::accept(stream, move |req| {
let rpc = rpc.clone();
rpc.serve(req)
})
.await
{
println!("Connection error: {:#?}", err);
}
})
}
Some(tls) => {
// In case of HTTPS, establish a secure TLS connection first.
match tls.accept(stream).await {
Ok(stream) => {
let _stream = async_dup::Arc::new(async_dup::Mutex::new(stream));
executor.spawn(async move {
/*if let Err(err) = async_h1::accept(stream, serve).await {
println!("Connection error: {:#?}", err);
}*/
unimplemented!();
})
}
Err(err) => {
println!("Failed to establish secure TLS connection: {:#?}", err);
continue;
}
}
}
};
// Detach the task to let it run in the background.
task.detach();
}
}
struct RpcInterface {
p2p: Arc<net::P2p>,
started: Mutex<bool>,
stop_send: async_channel::Sender<()>,
stop_recv: async_channel::Receiver<()>,
}
impl RpcInterface {
fn new(p2p: Arc<net::P2p>) -> Arc<Self> {
let (stop_send, stop_recv) = async_channel::unbounded::<()>();
Arc::new(Self {
p2p,
started: Mutex::new(false),
stop_send,
stop_recv,
})
}
async fn db_connect() -> Connection {
let path = dirs::home_dir()
.expect("Cannot find home directory.")
.as_path()
.join(".config/darkfi/wallet.db");
let connector = Connection::open(&path);
connector.expect("Failed to connect to database.")
}
async fn generate_key() -> (Vec<u8>, Vec<u8>) {
let secret: jubjub::Fr = jubjub::Fr::random(&mut OsRng);
let public = zcash_primitives::constants::SPENDING_KEY_GENERATOR * secret;
let pubkey = serial::serialize(&public);
let privkey = serial::serialize(&secret);
(privkey, pubkey)
}
// TODO: fix this
async fn store_key(conn: &Connection, pubkey: Vec<u8>, privkey: Vec<u8>) -> Result<()> {
let mut db_file = File::open("wallet.sql")?;
let mut contents = String::new();
db_file.read_to_string(&mut contents)?;
Ok(conn.execute_batch(&mut contents)?)
}
// add new methods to handle wallet commands
async fn serve(self: Arc<Self>, mut req: Request) -> http_types::Result<Response> {
info!("RPC serving {}", req.url());
let request = req.body_string().await?;
let mut io = jsonrpc_core::IoHandler::new();
io.add_sync_method("say_hello", |_| {
Ok(jsonrpc_core::Value::String("Hello World!".into()))
});
let self2 = self.clone();
io.add_method("get_info", move |_| {
let self2 = self2.clone();
async move {
Ok(json!({
"started": *self2.started.lock().await,
"connections": self2.p2p.connections_count().await
}))
}
});
let stop_send = self.stop_send.clone();
io.add_method("stop", move |_| {
let stop_send = stop_send.clone();
async move {
let _ = stop_send.send(()).await;
Ok(jsonrpc_core::Value::Null)
}
});
io.add_method("key_gen", move |_| async move {
RpcInterface::db_connect().await;
let (pubkey, privkey) = RpcInterface::generate_key().await;
//println!("{}", pubkey, "{}", privkey);
Ok(jsonrpc_core::Value::Null)
});
let response = io
.handle_request_sync(&request)
.ok_or(sapvi::Error::BadOperationType)?;
let mut res = Response::new(StatusCode::Ok);
res.insert_header("Content-Type", "text/plain");
res.set_body(response);
Ok(res)
}
async fn wait_for_quit(self: Arc<Self>) -> Result<()> {
Ok(self.stop_recv.recv().await?)
}
}
async fn start(executor: Arc<Executor<'_>>, options: ProgramOptions) -> Result<()> {
let p2p = net::P2p::new(options.network_settings);
let rpc = RpcInterface::new(p2p.clone());
let http = listen(
executor.clone(),
rpc.clone(),
Async::<TcpListener>::bind(([127, 0, 0, 1], options.rpc_port))?,
None,
);
let http_task = executor.spawn(http);
*rpc.started.lock().await = true;
p2p.clone().start(executor.clone()).await?;
p2p.run(executor).await?;
rpc.wait_for_quit().await?;
http_task.cancel().await;
Ok(())
}
/*
async fn start2(executor: Arc<Executor<'_>>, options: ProgramOptions) -> Result<()> {
let connections = Arc::new(Mutex::new(HashMap::new()));
let stored_addrs = Arc::new(Mutex::new(Vec::new()));
let executor2 = executor.clone();
let stored_addrs2 = stored_addrs.clone();
let mut server_task = None;
if let Some(accept_addr) = options.accept_addr {
let accept_addr = accept_addr.clone();
let protocol = ServerProtocol::new(connections.clone(), accept_addr, stored_addrs2);
server_task = Some(executor.spawn(async move {
protocol.start(executor2).await?;
Ok::<(), sapvi::Error>(())
}));
}
let mut seed_protocols = Vec::with_capacity(options.seed_addrs.len());
// Normally we query this from a server
let accept_addr = options.accept_addr.clone();
for seed_addr in options.seed_addrs.iter() {
let protocol = SeedProtocol::new(seed_addr.clone(), accept_addr, stored_addrs.clone());
protocol.clone().start(executor.clone()).await;
seed_protocols.push(protocol);
}
debug!("Waiting for seed node queries to finish...");
for seed_protocol in seed_protocols {
seed_protocol.await_finish().await;
}
debug!("Seed nodes queried.");
let mut client_slots = vec![];
for i in 0..options.connection_slots {
debug!("Starting connection slot {}", i);
let client = Channel::new(
connections.clone(),
accept_addr.clone(),
stored_addrs.clone(),
);
client.clone().start(executor.clone()).await;
client_slots.push(client);
}
for remote_addr in options.manual_connects {
debug!("Starting connection (manual) to {}", remote_addr);
let client = Channel::new(
connections.clone(),
accept_addr.clone(),
stored_addrs.clone(),
);
client
.clone()
.start_manual(remote_addr, executor.clone())
.await;
client_slots.push(client);
}
let rpc = RpcInterface::new();
let http = listen(
executor.clone(),
rpc.clone(),
Async::<TcpListener>::bind(([127, 0, 0, 1], 8000))?,
None,
);
let http_task = executor.spawn(http);
rpc.stop_recv.recv().await?;
http_task.cancel().await;
match server_task {
None => {}
Some(server_task) => {
server_task.cancel().await;
}
}
Ok(())
}
*/
struct ProgramOptions {
network_settings: net::Settings,
log_path: Box<std::path::PathBuf>,
rpc_port: u16,
}
impl ProgramOptions {
fn load() -> Result<ProgramOptions> {
let app = clap_app!(dfi =>
(version: "0.1.0")
(author: "Amir Taaki <amir@dyne.org>")
(about: "Dark node")
(@arg ACCEPT: -a --accept +takes_value "Accept address")
(@arg SEED_NODES: -s --seeds ... "Seed nodes")
(@arg CONNECTS: -c --connect ... "Manual connections")
(@arg CONNECT_SLOTS: --slots +takes_value "Connection slots")
(@arg LOG_PATH: --log +takes_value "Logfile path")
(@arg RPC_PORT: -r --rpc +takes_value "RPC port")
)
.get_matches();
let accept_addr = if let Some(accept_addr) = app.value_of("ACCEPT") {
Some(accept_addr.parse()?)
} else {
None
};
let mut seed_addrs: Vec<SocketAddr> = vec![];
if let Some(seeds) = app.values_of("SEED_NODES") {
for seed in seeds {
seed_addrs.push(seed.parse()?);
}
}
let mut manual_connects: Vec<SocketAddr> = vec![];
if let Some(connections) = app.values_of("CONNECTS") {
for connect in connections {
manual_connects.push(connect.parse()?);
}
}
let connection_slots = if let Some(connection_slots) = app.value_of("CONNECT_SLOTS") {
connection_slots.parse()?
} else {
0
};
let log_path = Box::new(
if let Some(log_path) = app.value_of("LOG_PATH") {
std::path::Path::new(log_path)
} else {
std::path::Path::new("/tmp/darkfid.log")
}
.to_path_buf(),
);
let rpc_port = if let Some(rpc_port) = app.value_of("RPC_PORT") {
rpc_port.parse()?
} else {
8000
};
Ok(ProgramOptions {
network_settings: net::Settings {
inbound: accept_addr,
outbound_connections: connection_slots,
external_addr: accept_addr,
peers: manual_connects,
seeds: seed_addrs,
..Default::default()
},
log_path,
rpc_port,
})
}
}
fn main() -> Result<()> {
use simplelog::*;
let options = ProgramOptions::load()?;
let logger_config = ConfigBuilder::new().set_time_format_str("%T%.6f").build();
CombinedLogger::init(vec![
TermLogger::new(LevelFilter::Debug, logger_config, TerminalMode::Mixed).unwrap(),
WriteLogger::new(
LevelFilter::Debug,
Config::default(),
std::fs::File::create(options.log_path.as_path()).unwrap(),
),
])
.unwrap();
let ex = Arc::new(Executor::new());
let (signal, shutdown) = async_channel::unbounded::<()>();
let ex2 = ex.clone();
let (_, result) = Parallel::new()
// Run four executor threads.
.each(0..3, |_| smol::future::block_on(ex.run(shutdown.recv())))
// Run the main future on the current thread.
.finish(|| {
smol::future::block_on(async move {
start(ex2, options).await?;
drop(signal);
Ok::<(), sapvi::Error>(())
})
});
result
}

View File

@@ -1,86 +0,0 @@
// rocksdb is the blockchain database
// it is a key value store
// sqlite is the encrypted wallet
use rocksdb::DB;
use rusqlite::{Connection, Result};
use std::path::{Path, PathBuf};
fn main() -> Result<()> {
wallet()?;
blockchain()?;
Ok(())
}
fn wallet() -> Result<()> {
let connector = connect()?;
encrypt(&connector)?;
println!("Created encrypted database.");
decrypt(&connector)?;
println!("Decrypted database.");
Ok(())
}
fn connect() -> Result<Connection> {
println!("Attempting to establish a connection...");
let path = dirs::home_dir()
.expect("Cannot find home directory!")
.as_path()
.join(".config/darkfi/wallet.db");
let connector = Connection::open(&path);
println!("Connection established");
connector
}
fn encrypt(conn: &Connection) -> Result<()> {
println!("Attempting to create an encrypted database...");
conn.execute_batch(
"ATTACH DATABASE 'encrypted.db' AS encrypted KEY 'testkey';
SELECT sqlcipher_export('encrypted');
DETACH DATABASE encrypted;",
)
}
fn decrypt(conn: &Connection) -> Result<()> {
println!("Attempting to decrypt database...");
conn.execute_batch(
"ATTACH DATABASE 'plaintext.db' AS plaintext KEY 'testkey';
SELECT sqlcipher_export('plaintext');
DETACH DATABASE plaintext;",
)
}
fn blockchain() -> Result<()> {
let db = create_db();
write_db(&db)?;
test_db(&db);
Ok(())
}
fn create_db() -> DB {
println!("Creating a blockchain database...");
let path = dirs::home_dir()
.expect("Cannot find home directory!")
.as_path()
.join(".config/darkfi/chain");
let db = DB::open_default(path).unwrap();
db
}
fn write_db(db: &DB) -> Result<()> {
println!("Writing to the blockchain...");
db.put(b"test-value", b"test-key").unwrap();
Ok(())
}
fn test_db(db: &DB) {
println!("Testing if write was successful...");
match db.get(b"test-value") {
Ok(Some(value)) => println!("retrieved value {}", String::from_utf8(value).unwrap()),
Ok(None) => println!("value not found"),
Err(e) => println!("operational problem encountered: {}", e),
}
}
// TODO: macro to load file as a string. load wallet tables in sqlite at run
// Table includes: maintain a list of coins and whether they are spent

View File

@@ -1,3 +1,4 @@
#[macro_use] extern crate clap;
use bellman::groth16;
use bls12_381::{Bls12, Scalar};
use std::collections::{HashMap, HashSet};
@@ -11,6 +12,7 @@ pub mod error;
pub mod gfx;
pub mod gui;
pub mod net;
pub mod rpc;
pub mod serial;
pub mod service;
pub mod system;

2
src/rpc/adapter.rs Normal file
View File

@@ -0,0 +1,2 @@
// Adapter class goes here
//

116
src/rpc/jsonserver.rs Normal file
View File

@@ -0,0 +1,116 @@
use async_executor::Executor;
use async_native_tls::TlsAcceptor;
use async_std::sync::Mutex;
use easy_parallel::Parallel;
use ff::Field;
use http_types::{Request, Response, StatusCode};
use log::*;
use rand::rngs::OsRng;
use serde_json::json;
use smol::Async;
use std::fs::File;
use std::io::prelude::*;
use std::io::BufReader;
use std::net::SocketAddr;
use std::net::TcpListener;
use std::sync::Arc;
use rusqlite::Connection;
use crate::{net, serial, Result, Error};
// json RPC server goes here
pub struct RpcInterface {
p2p: Arc<net::P2p>,
pub started: Mutex<bool>,
stop_send: async_channel::Sender<()>,
stop_recv: async_channel::Receiver<()>,
}
impl RpcInterface {
pub fn new(p2p: Arc<net::P2p>) -> Arc<Self> {
let (stop_send, stop_recv) = async_channel::unbounded::<()>();
Arc::new(Self {
p2p,
started: Mutex::new(false),
stop_send,
stop_recv,
})
}
async fn db_connect() -> Connection {
let path = dirs::home_dir()
.expect("Cannot find home directory.")
.as_path()
.join(".config/darkfi/wallet.db");
let connector = Connection::open(&path);
connector.expect("Failed to connect to database.")
}
async fn generate_key() -> (Vec<u8>, Vec<u8>) {
let secret: jubjub::Fr = jubjub::Fr::random(&mut OsRng);
let public = zcash_primitives::constants::SPENDING_KEY_GENERATOR * secret;
let pubkey = serial::serialize(&public);
let privkey = serial::serialize(&secret);
(privkey, pubkey)
}
// TODO: fix this
async fn store_key(conn: &Connection, pubkey: Vec<u8>, privkey: Vec<u8>) -> Result<()> {
let mut db_file = File::open("wallet.sql")?;
let mut contents = String::new();
db_file.read_to_string(&mut contents)?;
Ok(conn.execute_batch(&mut contents)?)
}
// add new methods to handle wallet commands
pub async fn serve(self: Arc<Self>, mut req: Request) -> http_types::Result<Response> {
info!("RPC serving {}", req.url());
let request = req.body_string().await?;
let mut io = jsonrpc_core::IoHandler::new();
io.add_sync_method("say_hello", |_| {
Ok(jsonrpc_core::Value::String("Hello World!".into()))
});
let self2 = self.clone();
io.add_method("get_info", move |_| {
let self2 = self2.clone();
async move {
Ok(json!({
"started": *self2.started.lock().await,
"connections": self2.p2p.connections_count().await
}))
}
});
let stop_send = self.stop_send.clone();
io.add_method("stop", move |_| {
let stop_send = stop_send.clone();
async move {
let _ = stop_send.send(()).await;
Ok(jsonrpc_core::Value::Null)
}
});
io.add_method("key_gen", move |_| async move {
RpcInterface::db_connect().await;
let (pubkey, privkey) = RpcInterface::generate_key().await;
//println!("{}", pubkey, "{}", privkey);
Ok(jsonrpc_core::Value::Null)
});
let response = io
.handle_request_sync(&request)
.ok_or(Error::BadOperationType)?;
let mut res = Response::new(StatusCode::Ok);
res.insert_header("Content-Type", "text/plain");
res.set_body(response);
Ok(res)
}
pub async fn wait_for_quit(self: Arc<Self>) -> Result<()> {
Ok(self.stop_recv.recv().await?)
}
}

3
src/rpc/mod.rs Normal file
View File

@@ -0,0 +1,3 @@
pub mod test;
pub mod jsonserver;

3
src/rpc/test.rs Normal file
View File

@@ -0,0 +1,3 @@
fn foo() {
}

1
src/wallet/mod.rs Normal file
View File

@@ -0,0 +1 @@
// Empty mod.rs file for now