Add rln benchmarks (computation & serialization)

This commit is contained in:
sydhds
2025-05-15 16:07:48 +02:00
parent c4bb950fab
commit c286dca5d5
16 changed files with 650 additions and 109 deletions

29
prover/Cargo.toml Normal file
View File

@@ -0,0 +1,29 @@
[package]
name = "status_rln_prover"
version = "0.1.0"
edition = "2024"
[dependencies]
clap = { version = "4.5.37", features = ["derive"] }
tonic = { version = "0.13", features = ["gzip"] }
tonic-reflection = "0.13"
prost = "0.13"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
tracing = "0.1.41"
tracing-test = "0.2.5"
alloy = { version = "0.15", features = ["full"] }
thiserror = "2.0"
futures = "0.3"
rln = { git = "https://github.com/vacp2p/zerokit" }
ark-bn254 = { version = "0.5", features = ["std"] }
ark-serialize = "0.5.0"
serde_json = "1.0"
dashmap = "6.1.0"
bytesize = "2.0.1"
rln_proof = { path = "../rln_proof" }
chrono = "0.4.41"
parking_lot = "0.12.3"
[build-dependencies]
tonic-build = "*"

12
prover/build.rs Normal file
View File

@@ -0,0 +1,12 @@
use std::env;
use std::path::PathBuf;
fn main() -> Result<(), Box<dyn std::error::Error>> {
let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap());
tonic_build::configure()
.file_descriptor_set_path(out_dir.join("prover_descriptor.bin"))
.compile_protos(&["../proto/net/vac/prover/prover.proto"], &["../proto"])
.unwrap();
Ok(())
}

23
prover/src/args.rs Normal file
View File

@@ -0,0 +1,23 @@
use std::net::IpAddr;
// third-party
use clap::Parser;
#[derive(Debug, Clone, Parser)]
#[command(about = "RLN prover service", long_about = None)]
pub struct AppArgs {
#[arg(short = 'i', long = "ip", default_value = "::1", help = "Service ip")]
pub(crate) ip: IpAddr,
#[arg(
short = 'p',
long = "port",
default_value = "50051",
help = "Service port"
)]
pub(crate) port: u16,
#[arg(
short = 'r',
long = "rpc_url",
help = "Websocket rpc url (e.g. wss://eth-mainnet.g.alchemy.com/v2/your-api-key)"
)]
pub(crate) rpc_url: String,
}

26
prover/src/error.rs Normal file
View File

@@ -0,0 +1,26 @@
use alloy::{
primitives::Address,
transports::{RpcError, TransportErrorKind},
};
#[derive(thiserror::Error, Debug)]
pub enum AppError {
#[error("Tonic (grpc) error: {0}")]
Tonic(#[from] tonic::transport::Error),
#[error("Tonic reflection (grpc) error: {0}")]
TonicReflection(#[from] tonic_reflection::server::Error),
#[error("SC error 1: {0}")]
Alloy(#[from] RpcError<RpcError<TransportErrorKind>>),
#[error("SC error 2: {0}")]
Alloy2(#[from] RpcError<TransportErrorKind>),
}
#[derive(thiserror::Error, Debug)]
pub enum RegistrationError {
#[error("Transaction has no sender address")]
NoSender,
#[error("Transaction sender address is invalid: {0:?}")]
InvalidAddress(Vec<u8>),
#[error("Cannot find id_commitment for address: {0:?}")]
NotFound(Address),
}

324
prover/src/grpc_service.rs Normal file
View File

@@ -0,0 +1,324 @@
// std
use std::collections::HashMap;
use std::io::{Cursor, Write};
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
// use std::sync::atomic::{
// AtomicI64,
// };
// third-party
use alloy::primitives::Address;
use ark_bn254::Fr;
use ark_serialize::{CanonicalSerialize, SerializationError};
use bytesize::ByteSize;
use futures::TryFutureExt;
use rln::hashers::{hash_to_field, poseidon_hash};
use rln::pm_tree_adapter::PmTree;
use rln::protocol::{ProofError, serialize_proof_values};
use tokio::sync::{
RwLock, broadcast,
broadcast::{Receiver, Sender},
mpsc,
};
use tonic::{
Request,
Response,
Status,
codegen::tokio_stream::wrappers::ReceiverStream,
transport::Server,
// codec::CompressionEncoding
};
use tracing::{
debug,
// error,
// info
};
// internal
use crate::{
error::{
AppError,
// RegistrationError
},
registry::UserRegistry,
};
pub mod prover_proto {
// Include generated code (see build.rs)
tonic::include_proto!("prover");
// for reflection service
pub(crate) const FILE_DESCRIPTOR_SET: &[u8] =
tonic::include_file_descriptor_set!("prover_descriptor");
}
use prover_proto::{
RegisterUserReply, RegisterUserRequest, RlnProof, RlnProofFilter, SendTransactionReply,
SendTransactionRequest,
rln_prover_server::{RlnProver, RlnProverServer},
};
use rln_proof::{
RlnData, RlnIdentifier, RlnUserIdentity, ZerokitMerkleTree, compute_rln_proof_and_values,
};
const PROVER_SERVICE_LIMIT_PER_CONNECTION: usize = 16;
// Timeout for all handlers of a request
const PROVER_SERVICE_GRPC_TIMEOUT: Duration = Duration::from_secs(30);
//
const PROVER_SERVICE_HTTP2_MAX_CONCURRENT_STREAM: u32 = 64;
// Http2 max frame size (e.g. 16 Kb)
const PROVER_SERVICE_HTTP2_MAX_FRAME_SIZE: ByteSize = ByteSize::kib(16);
// Max size for Message (decoding, e.g., 5 Mb)
const PROVER_SERVICE_MESSAGE_DECODING_MAX_SIZE: ByteSize = ByteSize::mib(5);
// Max size for Message (encoding, e.g., 5 Mb)
const PROVER_SERVICE_MESSAGE_ENCODING_MAX_SIZE: ByteSize = ByteSize::mib(5);
const PROVER_SPAM_LIMIT: u64 = 10_000;
#[derive(Debug)]
pub struct ProverService {
registry: UserRegistry,
rln_identifier: Arc<RlnIdentifier>,
message_counters: RwLock<HashMap<Address, u64>>,
spam_limit: u64,
broadcast_channel: (Sender<Vec<u8>>, Receiver<Vec<u8>>),
}
#[derive(thiserror::Error, Debug)]
enum ProofGenerationError {
#[error("Proof generation failed: {0}")]
Proof(#[from] ProofError),
#[error("Proof serialization failed: {0}")]
Serialization(#[from] SerializationError),
#[error("Proof serialization failed: {0}")]
SerializationWrite(#[from] std::io::Error),
#[error("Error: {0}")]
Misc(String),
}
#[tonic::async_trait]
impl RlnProver for ProverService {
async fn send_transaction(
&self,
request: Request<SendTransactionRequest>,
) -> Result<Response<SendTransactionReply>, Status> {
debug!("send_transaction request: {:?}", request);
let req = request.into_inner();
let sender = if let Some(sender) = req.sender {
if let Ok(sender) = Address::try_from(sender.value.as_slice()) {
sender
} else {
return Err(Status::invalid_argument("Invalid sender address"));
}
} else {
return Err(Status::invalid_argument("No sender address"));
};
// Update the counter as soon as possible (should help to prevent spamming...)
let mut message_counter_guard = self.message_counters.write().await;
let counter = *message_counter_guard
.entry(sender)
.and_modify(|e| *e += 1)
.or_insert(1);
drop(message_counter_guard);
let user_id = if let Some(id) = self.registry.get(&sender) {
*id
} else {
return Err(Status::not_found("Sender not registered"));
};
let user_identity = RlnUserIdentity {
secret_hash: user_id.0,
commitment: user_id.1,
user_limit: Fr::from(self.spam_limit),
};
// Inexpensive clone (behind Arc ptr)
let rln_identifier = self.rln_identifier.clone();
// Move to a task (as generating the proof can take quite some time)
let blocking_task = tokio::task::spawn_blocking(move || {
let rln_data = RlnData {
message_id: Fr::from(counter),
// TODO: tx hash to field
data: hash_to_field(b"RLN is awesome"),
};
// FIXME: track/update epoch
let epoch = hash_to_field(b"Today at noon, this year");
// FIXME: maintain tree in Prover or query RLN Reg SC ?
// Merkle tree
let tree_height = 20;
let mut tree = PmTree::new(tree_height, Fr::from(0), Default::default())
.map_err(|e| ProofGenerationError::Misc(e.to_string()))?;
// .unwrap();
// let mut tree = OptimalMerkleTree::new(tree_height, Fr::from(0), Default::default()).unwrap();
let rate_commit = poseidon_hash(&[user_identity.commitment, user_identity.user_limit]);
tree.set(0, rate_commit)
.map_err(|e| ProofGenerationError::Misc(e.to_string()))?;
//.unwrap();
let merkle_proof = tree
.proof(0)
.map_err(|e| ProofGenerationError::Misc(e.to_string()))?;
// .unwrap();
let (proof, proof_values) = compute_rln_proof_and_values(
&user_identity,
&rln_identifier,
rln_data,
epoch,
&merkle_proof,
)
.map_err(ProofGenerationError::Proof)?;
// .unwrap(); // FIXME: no unwrap
// Serialize proof
// FIXME: proof size?
let mut output_buffer = Cursor::new(Vec::with_capacity(512));
proof
.serialize_compressed(&mut output_buffer)
.map_err(ProofGenerationError::Serialization)?;
// .unwrap();
output_buffer
.write_all(&serialize_proof_values(&proof_values))
.map_err(ProofGenerationError::SerializationWrite)?;
// .unwrap();
Ok::<Vec<u8>, ProofGenerationError>(output_buffer.into_inner())
});
let result = blocking_task.await;
if let Err(e) = result {
return Err(Status::from_error(Box::new(e)));
}
// blocking_task returns Result<Result<Vec<u8>, _>>
// Result (1st) is a JoinError (and should not happen)
// Result (2nd) is a ProofGenerationError
let _result = result.unwrap();
// TODO: broadcast proof
let reply = SendTransactionReply { result: true };
Ok(Response::new(reply))
}
async fn register_user(
&self,
_request: Request<RegisterUserRequest>,
) -> Result<Response<RegisterUserReply>, Status> {
todo!()
}
type GetProofsStream = ReceiverStream<Result<RlnProof, Status>>;
async fn get_proofs(
&self,
request: Request<RlnProofFilter>,
) -> Result<Response<Self::GetProofsStream>, Status> {
debug!("get_proofs request: {:?}", request);
// FIXME: channel size or unbounded channel?
let (tx, rx) = mpsc::channel(100);
let mut rx2 = self.broadcast_channel.0.subscribe();
tokio::spawn(async move {
while let Ok(data) = rx2.recv().await {
let rln_proof = RlnProof {
sender: "0xAA".to_string(),
id_commitment: "1".to_string(),
proof: data,
};
if let Err(e) = tx.send(Ok(rln_proof)).await {
debug!("Done: sending dummy rln proofs: {}", e);
break;
};
}
});
Ok(Response::new(ReceiverStream::new(rx)))
}
}
pub(crate) struct GrpcProverService {
addr: SocketAddr,
rln_identifier: RlnIdentifier,
// epoch_counter: Arc<AtomicI64>,
}
impl GrpcProverService {
pub(crate) fn new(
addr: SocketAddr,
rln_identifier: RlnIdentifier, /* epoch_counter: Arc<AtomicI64> */
) -> Self {
Self {
addr,
rln_identifier,
// epoch_counter,
}
}
pub(crate) async fn serve(&self) -> Result<(), AppError> {
let (tx, rx) = broadcast::channel(2);
let prover_service = ProverService {
registry: Default::default(),
rln_identifier: Arc::new(self.rln_identifier.clone()),
message_counters: Default::default(),
spam_limit: PROVER_SPAM_LIMIT,
broadcast_channel: (tx, rx),
};
let reflection_service = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(prover_proto::FILE_DESCRIPTOR_SET)
.build_v1()?;
let r = RlnProverServer::new(prover_service)
.max_decoding_message_size(PROVER_SERVICE_MESSAGE_DECODING_MAX_SIZE.as_u64() as usize)
.max_encoding_message_size(PROVER_SERVICE_MESSAGE_ENCODING_MAX_SIZE.as_u64() as usize)
// TODO: perf?
//.accept_compressed(CompressionEncoding::Gzip)
//.send_compressed(CompressionEncoding::Gzip)
;
Server::builder()
// service protection && limits
// limits: connection
.concurrency_limit_per_connection(PROVER_SERVICE_LIMIT_PER_CONNECTION)
.timeout(PROVER_SERVICE_GRPC_TIMEOUT)
// limits : http2
.max_concurrent_streams(PROVER_SERVICE_HTTP2_MAX_CONCURRENT_STREAM)
.max_frame_size(PROVER_SERVICE_HTTP2_MAX_FRAME_SIZE.as_u64() as u32)
// perf: tcp
.tcp_nodelay(true)
// No http 1
.accept_http1(false)
// services
.add_service(reflection_service)
.add_service(r)
.serve(self.addr)
.map_err(AppError::from)
.await
}
}
#[cfg(test)]
mod tests {
use crate::grpc_service::prover_proto::Address;
use prost::Message;
const MAX_ADDRESS_SIZE_BYTES: usize = 20;
#[test]
#[should_panic]
fn test_address_size_limit() {
// Check if an invalid address can be encoded (as Address grpc type)
let invalid_address = vec![0; MAX_ADDRESS_SIZE_BYTES + 1];
let addr = Address {
value: invalid_address,
};
let mut addr_encoded = vec![];
addr.encode(&mut addr_encoded).unwrap();
let _addr_decoded = Address::decode(&*addr_encoded).unwrap();
}
}

88
prover/src/main.rs Normal file
View File

@@ -0,0 +1,88 @@
mod args;
mod epoch_service;
mod error;
mod grpc_service;
mod registry;
mod registry_listener;
// std
use std::net::SocketAddr;
// third-party
use alloy::primitives::address;
// use chrono::{
// DateTime,
// Utc
// };
use clap::Parser;
use rln_proof::RlnIdentifier;
use tracing::level_filters::LevelFilter;
use tracing::{
debug,
error,
// info
};
use tracing_subscriber::{EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt};
// internal
use crate::args::AppArgs;
// use crate::epoch_service::EpochService;
use crate::grpc_service::GrpcProverService;
use crate::registry_listener::RegistryListener;
const RLN_IDENTIFIER_NAME: &[u8] = b"test-rln-identifier";
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let filter = EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.from_env_lossy();
tracing_subscriber::registry()
.with(fmt::layer())
.with(filter)
.init();
let app_args = AppArgs::parse();
debug!("Arguments: {:?}", app_args);
// Smart contract
let uniswap_token_address = address!("1f9840a85d5aF5bf1D1762F925BDADdC4201F984");
let event = "Transfer(address,address,uint256)";
let registry_listener =
RegistryListener::new(app_args.rpc_url.as_str(), uniswap_token_address, event);
// Epoch
// let mut epoch_service = EpochService::new(
// Duration::from_secs(10),
// Utc::now()
// );
// grpc
let rln_identifier = RlnIdentifier::new(RLN_IDENTIFIER_NAME);
let addr = SocketAddr::new(app_args.ip, app_args.port);
debug!("Listening on: {}", addr);
let prover_service = GrpcProverService::new(
addr,
rln_identifier,
// epoch_service.current_epoch.clone()
);
let res = tokio::try_join!(
// epoch_service.listen_for_new_epoch(),
registry_listener.listen(),
prover_service.serve(),
);
match res {
// Ok((epoch, registry, prover)) => {
Ok((registry, prover)) => {
debug!("{:?}", registry);
debug!("{:?}", prover);
}
Err(e) => {
error!("{:?}", e);
}
}
Ok(())
}

49
prover/src/registry.rs Normal file
View File

@@ -0,0 +1,49 @@
use alloy::primitives::Address;
use ark_bn254::Fr;
use dashmap::DashMap;
use dashmap::mapref::one::Ref;
use rln::protocol::keygen;
#[derive(Debug)]
pub(crate) struct UserRegistry {
inner: DashMap<Address, (Fr, Fr)>,
}
impl UserRegistry {
fn new() -> Self {
Self {
inner: DashMap::new(),
}
}
pub(crate) fn get(&self, address: &Address) -> Option<Ref<Address, (Fr, Fr)>> {
self.inner.get(address)
}
fn register(&self, address: Address) {
let (identity_secret_hash, id_commitment) = keygen();
self.inner
.insert(address, (identity_secret_hash, id_commitment));
}
}
impl Default for UserRegistry {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use alloy::primitives::address;
#[test]
fn test_user_registration() {
let address = address!("0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f");
let reg = UserRegistry::default();
reg.register(address);
assert!(reg.get(&address).is_some());
}
}

View File

@@ -0,0 +1,64 @@
use crate::error::AppError;
use alloy::eips::BlockNumberOrTag;
use alloy::primitives::Address;
use alloy::providers::fillers::{
BlobGasFiller, ChainIdFiller, FillProvider, GasFiller, JoinFill, NonceFiller,
};
use alloy::providers::{Identity, Provider, ProviderBuilder, RootProvider, WsConnect};
use alloy::rpc::types::Filter;
use alloy::transports::{RpcError, TransportError};
use tonic::codegen::tokio_stream::StreamExt;
type AlloyWsProvider = FillProvider<
JoinFill<
Identity,
JoinFill<GasFiller, JoinFill<BlobGasFiller, JoinFill<NonceFiller, ChainIdFiller>>>,
>,
RootProvider,
>;
pub(crate) struct RegistryListener {
rpc_url: String,
sc_address: Address,
event: String,
}
impl RegistryListener {
pub(crate) fn new(rpc_url: &str, sc_address: Address, event: &str) -> Self {
Self {
rpc_url: rpc_url.to_string(),
sc_address,
event: event.to_string(),
}
}
/// Create a provider (aka connect to websocket url)
async fn setup_provider_ws(&self) -> Result<AlloyWsProvider, RpcError<TransportError>> {
let ws = WsConnect::new(self.rpc_url.as_str());
let provider = ProviderBuilder::new().connect_ws(ws).await?;
Ok(provider)
}
/// Listen to Smart Contract specified events
pub(crate) async fn listen(&self) -> Result<(), AppError> {
let provider = self.setup_provider_ws().await.map_err(AppError::from)?;
let filter = Filter::new()
.address(self.sc_address)
.event(self.event.as_str())
.from_block(BlockNumberOrTag::Latest);
// Subscribe to logs.
let sub = provider
.subscribe_logs(&filter)
.await
.map_err(AppError::from)?;
let mut stream = sub.into_stream();
while let Some(log) = stream.next().await {
println!("Uniswap token logs: {log:?}");
}
Ok(())
}
}