Add histogram metrics for channels (#20)

This commit is contained in:
Sydhds
2025-07-16 14:41:32 +02:00
committed by GitHub
parent 1e813c6710
commit 8b1540cf76
6 changed files with 73 additions and 19 deletions

View File

@@ -22,7 +22,10 @@ RUST_LOG=debug cargo run -p prover_cli -- -i 127.0.0.1 --metrics-ip 127.0.0.1 --
### Run prover client (for tests)
RUST_LOG=debug cargo run -p prover_client
* RUST_LOG=debug cargo run -p prover_client -- --help
* RUST_LOG=debug cargo run -p prover_client -- -i 127.0.0.1 -p 42942 register-user
* RUST_LOG=debug cargo run -p prover_client -- -i 127.0.0.1 -p 42942 send-transaction --tx-hash aa
* RUST_LOG=debug cargo run -p prover_client -- -i 127.0.0.1 -p 42942 -a 0xd8da6bf26964af9d7eed9e03e53415d37aa96045 get-user-tier-info
## Debug

View File

@@ -131,6 +131,8 @@ fn proof_generation_bench(c: &mut Criterion) {
mock_user: None,
config_path: Default::default(),
no_config: Some(true),
metrics_ip: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
metrics_port: 30051,
broadcast_channel_size: 100,
proof_service_count: 32,
transaction_channel_size: 100,

View File

@@ -8,7 +8,7 @@ use async_channel::Sender;
use bytesize::ByteSize;
use futures::TryFutureExt;
use http::Method;
use metrics::counter;
use metrics::{counter, histogram};
use num_bigint::BigUint;
use tokio::sync::{broadcast, mpsc};
use tonic::{
@@ -21,8 +21,9 @@ use url::Url;
// internal
use crate::error::{AppError, ProofGenerationStringError};
use crate::metrics::{
GET_PROOFS_LISTENERS, GET_USER_TIER_INFO_REQUESTS, GaugeWrapper, SEND_TRANSACTION_REQUESTS,
USER_REGISTERED, USER_REGISTERED_REQUESTS,
GET_PROOFS_LISTENERS, GET_USER_TIER_INFO_REQUESTS, GaugeWrapper,
PROOF_SERVICES_CHANNEL_QUEUE_LEN, SEND_TRANSACTION_REQUESTS, USER_REGISTERED,
USER_REGISTERED_REQUESTS,
};
use crate::proof_generation::{ProofGenerationData, ProofSendingData};
use crate::user_db::{UserDb, UserTierInfo};
@@ -106,7 +107,7 @@ where
&self,
request: Request<SendTransactionRequest>,
) -> Result<Response<SendTransactionReply>, Status> {
counter!(SEND_TRANSACTION_REQUESTS.name, "service" => "grpc").increment(1);
counter!(SEND_TRANSACTION_REQUESTS.name, "prover" => "grpc").increment(1);
debug!("send_transaction request: {:?}", request);
let req = request.into_inner();
@@ -152,6 +153,11 @@ where
.await
.map_err(|e| Status::from_error(Box::new(e)))?;
// Note: based on this link https://doc.rust-lang.org/reference/expressions/operator-expr.html#type-cast-expressions
// "Casting from an integer to float will produce the closest possible float *"
histogram!(PROOF_SERVICES_CHANNEL_QUEUE_LEN.name, "prover" => "grpc")
.record(self.proof_sender.len() as f64);
let reply = SendTransactionReply { result: true };
Ok(Response::new(reply))
}
@@ -162,7 +168,7 @@ where
request: Request<RegisterUserRequest>,
) -> Result<Response<RegisterUserReply>, Status> {
debug!("register_user request: {:?}", request);
counter!(USER_REGISTERED_REQUESTS.name, "service" => "grpc").increment(1);
counter!(USER_REGISTERED_REQUESTS.name, "prover" => "grpc").increment(1);
let req = request.into_inner();
let user = if let Some(user) = req.user {
@@ -202,7 +208,7 @@ where
status: status.into(),
};
counter!(USER_REGISTERED.name, "service" => "grpc").increment(1);
counter!(USER_REGISTERED.name, "prover" => "grpc").increment(1);
Ok(Response::new(reply))
}
@@ -214,7 +220,7 @@ where
request: Request<RlnProofFilter>,
) -> Result<Response<Self::GetProofsStream>, Status> {
debug!("get_proofs request: {:?}", request);
let gauge = GaugeWrapper::new(GET_PROOFS_LISTENERS.name, "service", "grpc");
let gauge = GaugeWrapper::new(GET_PROOFS_LISTENERS.name, "prover", "grpc");
// Channel to send proof to the connected grpc client (aka the Verifier)
let (tx, rx) = mpsc::channel(self.proof_sender_channel_size);
@@ -255,7 +261,7 @@ where
request: Request<GetUserTierInfoRequest>,
) -> Result<Response<GetUserTierInfoReply>, Status> {
debug!("request: {:?}", request);
counter!(GET_USER_TIER_INFO_REQUESTS.name, "service" => "grpc").increment(1);
counter!(GET_USER_TIER_INFO_REQUESTS.name, "prover" => "grpc").increment(1);
let req = request.into_inner();

View File

@@ -164,29 +164,30 @@ pub async fn run_prover(
};
let mut set = JoinSet::new();
for _i in 0..app_args.proof_service_count {
for i in 0..app_args.proof_service_count {
let proof_recv = proof_receiver.clone();
let broadcast_sender = tx.clone();
let current_epoch = epoch_service.current_epoch.clone();
let user_db = user_db_service.get_user_db();
set.spawn(async {
set.spawn(async move {
let proof_service = ProofService::new(
proof_recv,
broadcast_sender,
current_epoch,
user_db,
PROVER_SPAM_LIMIT,
u64::from(i),
);
proof_service.serve().await
});
}
if registry_listener.is_some() {
set.spawn(async move { registry_listener.unwrap().listen().await });
if let Some(registry_listener) = registry_listener {
set.spawn(async move { registry_listener.listen().await });
}
if tiers_listener.is_some() {
set.spawn(async move { tiers_listener.unwrap().listen().await });
if let Some(tiers_listener) = tiers_listener {
set.spawn(async move { tiers_listener.listen().await });
}
set.spawn(async move { epoch_service.listen_for_new_epoch().await });
set.spawn(async move { user_db_service.listen_for_epoch_changes().await });

View File

@@ -47,6 +47,11 @@ pub const EPOCH_SERVICE_DRIFT_MILLIS: Metric = Metric {
description: "Drift in milliseconds (when epoch service is waiting for the next epoch slice)",
};
pub const PROOF_SERVICE_PROOF_COMPUTED: Metric = Metric {
name: "proof_service_proof_computed",
description: "Number of computed proofs",
};
pub const PROOF_SERVICE_GEN_PROOF_TIME: Metric = Metric {
name: "proof_service_gen_proof_time",
description: "Generation time of a proof in seconds",
@@ -57,18 +62,35 @@ pub const GET_PROOFS_LISTENERS: Metric = Metric {
description: "Current number of active subscription to grpc get_proofs server streaming endpoint",
};
pub const COUNTERS: [Metric; 4] = [
/// Histogram metrics for the broadcast channel (used by proof service to send proofs)
pub const BROADCAST_CHANNEL_QUEUE_LEN: Metric = Metric {
name: "broadcast_channel_queue_len",
description: "Number of queued values",
};
/// Histogram metrics for the mpmc channel (used by proof services to receive new tx)
pub const PROOF_SERVICES_CHANNEL_QUEUE_LEN: Metric = Metric {
name: "proof_services_channel_queue_len",
description: "Number of queued values",
};
pub const COUNTERS: [Metric; 5] = [
USER_REGISTERED,
USER_REGISTERED_REQUESTS,
SEND_TRANSACTION_REQUESTS,
GET_USER_TIER_INFO_REQUESTS,
PROOF_SERVICE_PROOF_COMPUTED,
];
pub const GAUGES: [Metric; 3] = [
EPOCH_SERVICE_CURRENT_EPOCH,
EPOCH_SERVICE_CURRENT_EPOCH_SLICE,
GET_PROOFS_LISTENERS,
];
pub const HISTOGRAMS: [Metric; 2] = [EPOCH_SERVICE_DRIFT_MILLIS, PROOF_SERVICE_GEN_PROOF_TIME];
pub const HISTOGRAMS: [Metric; 3] = [
EPOCH_SERVICE_DRIFT_MILLIS,
PROOF_SERVICE_GEN_PROOF_TIME,
BROADCAST_CHANNEL_QUEUE_LEN,
];
pub fn init_metrics(ip: IpAddr, port: &u16) {
info!("Initializing metrics exporter (port: {})", port);

View File

@@ -4,7 +4,7 @@ use std::sync::Arc;
use ark_bn254::Fr;
use ark_serialize::CanonicalSerialize;
use async_channel::Receiver;
use metrics::histogram;
use metrics::{counter, histogram};
use parking_lot::RwLock;
use rln::hashers::hash_to_field;
use rln::protocol::serialize_proof_values;
@@ -12,7 +12,9 @@ use tracing::{Instrument, debug, debug_span, info};
// internal
use crate::epoch_service::{Epoch, EpochSlice};
use crate::error::{AppError, ProofGenerationError, ProofGenerationStringError};
use crate::metrics::PROOF_SERVICE_GEN_PROOF_TIME;
use crate::metrics::{
BROADCAST_CHANNEL_QUEUE_LEN, PROOF_SERVICE_GEN_PROOF_TIME, PROOF_SERVICE_PROOF_COMPUTED,
};
use crate::proof_generation::{ProofGenerationData, ProofSendingData};
use crate::user_db::UserDb;
use crate::user_db_types::RateLimit;
@@ -28,6 +30,7 @@ pub struct ProofService {
current_epoch: Arc<RwLock<(Epoch, EpochSlice)>>,
user_db: UserDb,
rate_limit: RateLimit,
id: u64,
}
impl ProofService {
@@ -39,6 +42,7 @@ impl ProofService {
current_epoch: Arc<RwLock<(Epoch, EpochSlice)>>,
user_db: UserDb,
rate_limit: RateLimit,
id: u64,
) -> Self {
debug_assert!(rate_limit > RateLimit::ZERO);
Self {
@@ -47,6 +51,7 @@ impl ProofService {
current_epoch,
user_db,
rate_limit,
id,
}
}
@@ -66,6 +71,10 @@ impl ProofService {
let proof_generation_data_ = proof_generation_data.clone();
let rate_limit = self.rate_limit;
// let counter_label = Arc::new(format!("proof service (id: {})", self.id));
// let counter_label_ref = counter_label.clone();
let counter_id = self.id;
// Move to a task (as generating the proof can take quite some time)
let blocking_task = tokio::task::spawn_blocking(move || {
let proof_generation_start = std::time::Instant::now();
@@ -118,6 +127,8 @@ impl ProofService {
histogram!(PROOF_SERVICE_GEN_PROOF_TIME.name, "prover" => "proof service")
.record(proof_generation_start.elapsed().as_secs_f64());
let labels = [("prover", format!("proof service id: {counter_id}"))];
counter!(PROOF_SERVICE_PROOF_COMPUTED.name, &labels).increment(1);
Ok::<Vec<u8>, ProofGenerationError>(output_buffer.into_inner())
});
@@ -139,6 +150,11 @@ impl ProofService {
info!("Stopping proof generation service: {}", e);
break;
};
// Note: based on this link https://doc.rust-lang.org/reference/expressions/operator-expr.html#type-cast-expressions
// "Casting from an integer to float will produce the closest possible float *"
histogram!(BROADCAST_CHANNEL_QUEUE_LEN.name, "prover" => "proof service")
.record(self.broadcast_sender.len() as f64);
}
Ok(())
@@ -293,6 +309,7 @@ mod tests {
epoch_store,
user_db.clone(),
RateLimit::from(10),
0,
);
// Verification
@@ -350,6 +367,7 @@ mod tests {
epoch_store,
user_db.clone(),
RateLimit::from(10),
0,
);
// Verification
@@ -512,6 +530,7 @@ mod tests {
epoch_store,
user_db.clone(),
rate_limit,
0,
);
info!("Starting...");
@@ -582,6 +601,7 @@ mod tests {
epoch_store,
user_db.clone(),
rate_limit,
0,
);
info!("Starting...");