diff --git a/Readme.md b/Readme.md index 6e84a45..715c2dc 100644 --- a/Readme.md +++ b/Readme.md @@ -22,7 +22,10 @@ RUST_LOG=debug cargo run -p prover_cli -- -i 127.0.0.1 --metrics-ip 127.0.0.1 -- ### Run prover client (for tests) -RUST_LOG=debug cargo run -p prover_client +* RUST_LOG=debug cargo run -p prover_client -- --help +* RUST_LOG=debug cargo run -p prover_client -- -i 127.0.0.1 -p 42942 register-user +* RUST_LOG=debug cargo run -p prover_client -- -i 127.0.0.1 -p 42942 send-transaction --tx-hash aa +* RUST_LOG=debug cargo run -p prover_client -- -i 127.0.0.1 -p 42942 -a 0xd8da6bf26964af9d7eed9e03e53415d37aa96045 get-user-tier-info ## Debug diff --git a/prover/benches/prover_bench.rs b/prover/benches/prover_bench.rs index dea1a0f..8073a42 100644 --- a/prover/benches/prover_bench.rs +++ b/prover/benches/prover_bench.rs @@ -131,6 +131,8 @@ fn proof_generation_bench(c: &mut Criterion) { mock_user: None, config_path: Default::default(), no_config: Some(true), + metrics_ip: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), + metrics_port: 30051, broadcast_channel_size: 100, proof_service_count: 32, transaction_channel_size: 100, diff --git a/prover/src/grpc_service.rs b/prover/src/grpc_service.rs index 8288e3b..40303b2 100644 --- a/prover/src/grpc_service.rs +++ b/prover/src/grpc_service.rs @@ -8,7 +8,7 @@ use async_channel::Sender; use bytesize::ByteSize; use futures::TryFutureExt; use http::Method; -use metrics::counter; +use metrics::{counter, histogram}; use num_bigint::BigUint; use tokio::sync::{broadcast, mpsc}; use tonic::{ @@ -21,8 +21,9 @@ use url::Url; // internal use crate::error::{AppError, ProofGenerationStringError}; use crate::metrics::{ - GET_PROOFS_LISTENERS, GET_USER_TIER_INFO_REQUESTS, GaugeWrapper, SEND_TRANSACTION_REQUESTS, - USER_REGISTERED, USER_REGISTERED_REQUESTS, + GET_PROOFS_LISTENERS, GET_USER_TIER_INFO_REQUESTS, GaugeWrapper, + PROOF_SERVICES_CHANNEL_QUEUE_LEN, SEND_TRANSACTION_REQUESTS, USER_REGISTERED, + USER_REGISTERED_REQUESTS, }; use crate::proof_generation::{ProofGenerationData, ProofSendingData}; use crate::user_db::{UserDb, UserTierInfo}; @@ -106,7 +107,7 @@ where &self, request: Request, ) -> Result, Status> { - counter!(SEND_TRANSACTION_REQUESTS.name, "service" => "grpc").increment(1); + counter!(SEND_TRANSACTION_REQUESTS.name, "prover" => "grpc").increment(1); debug!("send_transaction request: {:?}", request); let req = request.into_inner(); @@ -152,6 +153,11 @@ where .await .map_err(|e| Status::from_error(Box::new(e)))?; + // Note: based on this link https://doc.rust-lang.org/reference/expressions/operator-expr.html#type-cast-expressions + // "Casting from an integer to float will produce the closest possible float *" + histogram!(PROOF_SERVICES_CHANNEL_QUEUE_LEN.name, "prover" => "grpc") + .record(self.proof_sender.len() as f64); + let reply = SendTransactionReply { result: true }; Ok(Response::new(reply)) } @@ -162,7 +168,7 @@ where request: Request, ) -> Result, Status> { debug!("register_user request: {:?}", request); - counter!(USER_REGISTERED_REQUESTS.name, "service" => "grpc").increment(1); + counter!(USER_REGISTERED_REQUESTS.name, "prover" => "grpc").increment(1); let req = request.into_inner(); let user = if let Some(user) = req.user { @@ -202,7 +208,7 @@ where status: status.into(), }; - counter!(USER_REGISTERED.name, "service" => "grpc").increment(1); + counter!(USER_REGISTERED.name, "prover" => "grpc").increment(1); Ok(Response::new(reply)) } @@ -214,7 +220,7 @@ where request: Request, ) -> Result, Status> { debug!("get_proofs request: {:?}", request); - let gauge = GaugeWrapper::new(GET_PROOFS_LISTENERS.name, "service", "grpc"); + let gauge = GaugeWrapper::new(GET_PROOFS_LISTENERS.name, "prover", "grpc"); // Channel to send proof to the connected grpc client (aka the Verifier) let (tx, rx) = mpsc::channel(self.proof_sender_channel_size); @@ -255,7 +261,7 @@ where request: Request, ) -> Result, Status> { debug!("request: {:?}", request); - counter!(GET_USER_TIER_INFO_REQUESTS.name, "service" => "grpc").increment(1); + counter!(GET_USER_TIER_INFO_REQUESTS.name, "prover" => "grpc").increment(1); let req = request.into_inner(); diff --git a/prover/src/lib.rs b/prover/src/lib.rs index 624fca0..235c44a 100644 --- a/prover/src/lib.rs +++ b/prover/src/lib.rs @@ -164,29 +164,30 @@ pub async fn run_prover( }; let mut set = JoinSet::new(); - for _i in 0..app_args.proof_service_count { + for i in 0..app_args.proof_service_count { let proof_recv = proof_receiver.clone(); let broadcast_sender = tx.clone(); let current_epoch = epoch_service.current_epoch.clone(); let user_db = user_db_service.get_user_db(); - set.spawn(async { + set.spawn(async move { let proof_service = ProofService::new( proof_recv, broadcast_sender, current_epoch, user_db, PROVER_SPAM_LIMIT, + u64::from(i), ); proof_service.serve().await }); } - if registry_listener.is_some() { - set.spawn(async move { registry_listener.unwrap().listen().await }); + if let Some(registry_listener) = registry_listener { + set.spawn(async move { registry_listener.listen().await }); } - if tiers_listener.is_some() { - set.spawn(async move { tiers_listener.unwrap().listen().await }); + if let Some(tiers_listener) = tiers_listener { + set.spawn(async move { tiers_listener.listen().await }); } set.spawn(async move { epoch_service.listen_for_new_epoch().await }); set.spawn(async move { user_db_service.listen_for_epoch_changes().await }); diff --git a/prover/src/metrics.rs b/prover/src/metrics.rs index d3d1225..a70e9a9 100644 --- a/prover/src/metrics.rs +++ b/prover/src/metrics.rs @@ -47,6 +47,11 @@ pub const EPOCH_SERVICE_DRIFT_MILLIS: Metric = Metric { description: "Drift in milliseconds (when epoch service is waiting for the next epoch slice)", }; +pub const PROOF_SERVICE_PROOF_COMPUTED: Metric = Metric { + name: "proof_service_proof_computed", + description: "Number of computed proofs", +}; + pub const PROOF_SERVICE_GEN_PROOF_TIME: Metric = Metric { name: "proof_service_gen_proof_time", description: "Generation time of a proof in seconds", @@ -57,18 +62,35 @@ pub const GET_PROOFS_LISTENERS: Metric = Metric { description: "Current number of active subscription to grpc get_proofs server streaming endpoint", }; -pub const COUNTERS: [Metric; 4] = [ +/// Histogram metrics for the broadcast channel (used by proof service to send proofs) +pub const BROADCAST_CHANNEL_QUEUE_LEN: Metric = Metric { + name: "broadcast_channel_queue_len", + description: "Number of queued values", +}; + +/// Histogram metrics for the mpmc channel (used by proof services to receive new tx) +pub const PROOF_SERVICES_CHANNEL_QUEUE_LEN: Metric = Metric { + name: "proof_services_channel_queue_len", + description: "Number of queued values", +}; + +pub const COUNTERS: [Metric; 5] = [ USER_REGISTERED, USER_REGISTERED_REQUESTS, SEND_TRANSACTION_REQUESTS, GET_USER_TIER_INFO_REQUESTS, + PROOF_SERVICE_PROOF_COMPUTED, ]; pub const GAUGES: [Metric; 3] = [ EPOCH_SERVICE_CURRENT_EPOCH, EPOCH_SERVICE_CURRENT_EPOCH_SLICE, GET_PROOFS_LISTENERS, ]; -pub const HISTOGRAMS: [Metric; 2] = [EPOCH_SERVICE_DRIFT_MILLIS, PROOF_SERVICE_GEN_PROOF_TIME]; +pub const HISTOGRAMS: [Metric; 3] = [ + EPOCH_SERVICE_DRIFT_MILLIS, + PROOF_SERVICE_GEN_PROOF_TIME, + BROADCAST_CHANNEL_QUEUE_LEN, +]; pub fn init_metrics(ip: IpAddr, port: &u16) { info!("Initializing metrics exporter (port: {})", port); diff --git a/prover/src/proof_service.rs b/prover/src/proof_service.rs index c41dc34..a5d66cc 100644 --- a/prover/src/proof_service.rs +++ b/prover/src/proof_service.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use ark_bn254::Fr; use ark_serialize::CanonicalSerialize; use async_channel::Receiver; -use metrics::histogram; +use metrics::{counter, histogram}; use parking_lot::RwLock; use rln::hashers::hash_to_field; use rln::protocol::serialize_proof_values; @@ -12,7 +12,9 @@ use tracing::{Instrument, debug, debug_span, info}; // internal use crate::epoch_service::{Epoch, EpochSlice}; use crate::error::{AppError, ProofGenerationError, ProofGenerationStringError}; -use crate::metrics::PROOF_SERVICE_GEN_PROOF_TIME; +use crate::metrics::{ + BROADCAST_CHANNEL_QUEUE_LEN, PROOF_SERVICE_GEN_PROOF_TIME, PROOF_SERVICE_PROOF_COMPUTED, +}; use crate::proof_generation::{ProofGenerationData, ProofSendingData}; use crate::user_db::UserDb; use crate::user_db_types::RateLimit; @@ -28,6 +30,7 @@ pub struct ProofService { current_epoch: Arc>, user_db: UserDb, rate_limit: RateLimit, + id: u64, } impl ProofService { @@ -39,6 +42,7 @@ impl ProofService { current_epoch: Arc>, user_db: UserDb, rate_limit: RateLimit, + id: u64, ) -> Self { debug_assert!(rate_limit > RateLimit::ZERO); Self { @@ -47,6 +51,7 @@ impl ProofService { current_epoch, user_db, rate_limit, + id, } } @@ -66,6 +71,10 @@ impl ProofService { let proof_generation_data_ = proof_generation_data.clone(); let rate_limit = self.rate_limit; + // let counter_label = Arc::new(format!("proof service (id: {})", self.id)); + // let counter_label_ref = counter_label.clone(); + let counter_id = self.id; + // Move to a task (as generating the proof can take quite some time) let blocking_task = tokio::task::spawn_blocking(move || { let proof_generation_start = std::time::Instant::now(); @@ -118,6 +127,8 @@ impl ProofService { histogram!(PROOF_SERVICE_GEN_PROOF_TIME.name, "prover" => "proof service") .record(proof_generation_start.elapsed().as_secs_f64()); + let labels = [("prover", format!("proof service id: {counter_id}"))]; + counter!(PROOF_SERVICE_PROOF_COMPUTED.name, &labels).increment(1); Ok::, ProofGenerationError>(output_buffer.into_inner()) }); @@ -139,6 +150,11 @@ impl ProofService { info!("Stopping proof generation service: {}", e); break; }; + + // Note: based on this link https://doc.rust-lang.org/reference/expressions/operator-expr.html#type-cast-expressions + // "Casting from an integer to float will produce the closest possible float *" + histogram!(BROADCAST_CHANNEL_QUEUE_LEN.name, "prover" => "proof service") + .record(self.broadcast_sender.len() as f64); } Ok(()) @@ -293,6 +309,7 @@ mod tests { epoch_store, user_db.clone(), RateLimit::from(10), + 0, ); // Verification @@ -350,6 +367,7 @@ mod tests { epoch_store, user_db.clone(), RateLimit::from(10), + 0, ); // Verification @@ -512,6 +530,7 @@ mod tests { epoch_store, user_db.clone(), rate_limit, + 0, ); info!("Starting..."); @@ -582,6 +601,7 @@ mod tests { epoch_store, user_db.clone(), rate_limit, + 0, ); info!("Starting...");