faucetd: Activate all protocols and blockchain sync.

This commit is contained in:
parazyd
2022-04-21 16:12:39 +02:00
parent e3453bb1cb
commit 797ce2a158

View File

@@ -22,7 +22,9 @@ use darkfi::{
blockchain::{NullifierStore, RootStore},
cli_desc,
consensus2::{
Timestamp, ValidatorState, MAINNET_GENESIS_HASH_BYTES, TESTNET_GENESIS_HASH_BYTES,
proto::{ProtocolSync, ProtocolTx},
task::block_sync_task,
Timestamp, Tx, ValidatorState, MAINNET_GENESIS_HASH_BYTES, TESTNET_GENESIS_HASH_BYTES,
},
crypto::{keypair::PublicKey, types::DrkTokenId},
net,
@@ -43,7 +45,7 @@ use darkfi::{
serial::serialize,
sleep,
},
wallet::walletdb::{WalletDb, WalletPtr},
wallet::walletdb::{init_wallet, WalletPtr},
Error, Result,
};
@@ -118,6 +120,7 @@ pub struct Faucetd {
airdrop_timeout: i64,
airdrop_limit: BigUint,
airdrop_map: Arc<Mutex<HashMap<String, i64>>>,
synced: Mutex<bool>, // AtomicBool is weird in Arc
client: Client,
p2p: P2pPtr,
state: Arc<Mutex<State>>,
@@ -170,6 +173,7 @@ impl Faucetd {
airdrop_timeout: timeout,
airdrop_limit: limit,
airdrop_map: Arc::new(Mutex::new(HashMap::new())),
synced: Mutex::new(false),
client,
p2p,
state,
@@ -211,22 +215,28 @@ impl Faucetd {
// TODO: Token ID decision
// TODO: Rename this function to tx build
// let tx = match self
// .client
// .send(pubkey, amount, DrkTokenId::from(1), true, self.state.clone())
// .await
// {
// Ok(v) => v,
// Err(e) => {
// error!("airdrop(): Failed building transaction: {}", e);
// return jsonrpc::error(InternalError, None, id).into()
// }
// };
let tx = match self
.client
.send(pubkey, amount.try_into().unwrap(), DrkTokenId::from(1), true, self.state.clone())
.await
{
Ok(v) => v,
Err(e) => {
error!("airdrop(): Failed building transaction: {}", e);
return jsonrpc::error(InternalError, None, id).into()
}
};
// let tx_hash = blake3::hash(&serialize(&tx)).to_hex().as_str().to_string();
let tx_hash = "f00b4r";
let tx_hash = blake3::hash(&serialize(&tx)).to_hex().as_str().to_string();
// TODO: p2p tx broadcast
// Broadcast transaction to the network.
match self.p2p.broadcast(Tx(tx)).await {
Ok(()) => {}
Err(e) => {
error!("airdrop(): Failed broadcasting transaction: {}", e);
return jsonrpc::error(InternalError, None, id).into()
}
}
let mut map = self.airdrop_map.lock().await;
map.insert(params[1].as_str().unwrap().to_string(), now);
@@ -261,13 +271,6 @@ async fn prune_airdrop_map(map: Arc<Mutex<HashMap<String, i64>>>, timeout: i64)
}
}
async fn init_wallet(wallet_path: &str, wallet_pass: &str) -> Result<WalletPtr> {
let expanded = expand_path(wallet_path)?;
let wallet_path = format!("sqlite://{}", expanded.to_str().unwrap());
let wallet = WalletDb::new(&wallet_path, wallet_pass).await?;
Ok(wallet)
}
async_daemonize!(realmain);
async fn realmain(args: Args, ex: Arc<Executor<'_>>) -> Result<()> {
// We use this handler to block this function after detaching all
@@ -287,6 +290,7 @@ async fn realmain(args: Args, ex: Arc<Executor<'_>>) -> Result<()> {
let sled_db = sled::open(&db_path)?;
// Initialize validator state
// TODO: genesis_ts should be some hardcoded constant
let genesis_ts = Timestamp(1650103269);
let genesis_data = match args.chain.as_str() {
"mainnet" => *MAINNET_GENESIS_HASH_BYTES,
@@ -300,9 +304,12 @@ async fn realmain(args: Args, ex: Arc<Executor<'_>>) -> Result<()> {
// TODO: Is this ok?
let mut rng = rand::thread_rng();
let id: u64 = rng.gen();
// Initialize validator state
let state = ValidatorState::new(&sled_db, id, genesis_ts, genesis_data)?;
// P2P network
// P2P network. The faucet doesn't participate in consensus, so we only
// build the sync protocol.
let network_settings = net::Settings {
inbound: args.p2p_accept,
outbound_connections: args.slots,
@@ -315,24 +322,29 @@ async fn realmain(args: Args, ex: Arc<Executor<'_>>) -> Result<()> {
let p2p = net::P2p::new(network_settings).await;
let registry = p2p.protocol_registry();
// TODO: Register replicator + tx broadcast protocols
info!("Registering block sync P2P protocols...");
let _state = state.clone();
registry
.register(!net::SESSION_SEED, move |channel, p2p| {
let state = _state.clone();
async move { ProtocolSync::init(channel, state, p2p).await.unwrap() }
})
.await;
info!("Starting P2P networking");
p2p.clone().start(ex.clone()).await?;
let _ex = ex.clone();
let _p2p = p2p.clone();
ex.spawn(async move {
if let Err(e) = _p2p.run(_ex).await {
error!("P2P run failed: {}", e);
}
})
.detach();
let _state = state.clone();
registry
.register(!net::SESSION_SEED, move |channel, p2p| {
let state = _state.clone();
async move { ProtocolTx::init(channel, state, p2p).await.unwrap() }
})
.await;
let airdrop_timeout = args.airdrop_timeout;
let airdrop_limit = decode_base10(&args.airdrop_limit, 8, true)?;
// Initialize program state
let faucetd = Faucetd::new(&sled_db, wallet, p2p, airdrop_timeout, airdrop_limit).await?;
let faucetd =
Faucetd::new(&sled_db, wallet, p2p.clone(), airdrop_timeout, airdrop_limit).await?;
let faucetd = Arc::new(faucetd);
// Task to periodically clean up the hashmap of airdrops.
@@ -340,7 +352,23 @@ async fn realmain(args: Args, ex: Arc<Executor<'_>>) -> Result<()> {
// JSON-RPC server
info!("Starting JSON-RPC server");
ex.spawn(listen_and_serve(args.rpc_listen, faucetd)).detach();
ex.spawn(listen_and_serve(args.rpc_listen, faucetd.clone())).detach();
info!("Starting sync P2P network");
p2p.clone().start(ex.clone()).await?;
let _ex = ex.clone();
let _sync_p2p = p2p.clone();
ex.spawn(async move {
if let Err(e) = _sync_p2p.run(_ex).await {
error!("Failed starting sync P2P network: {}", e);
}
})
.detach();
match block_sync_task(p2p.clone(), state.clone()).await {
Ok(()) => *faucetd.synced.lock().await = true,
Err(e) => error!("Failed syncing blockchain: {}", e),
}
// Wait for SIGINT
shutdown.recv().await?;