Add UserRegistry + grpc service limits

This commit is contained in:
sydhds
2025-05-12 15:09:18 +02:00
parent bba384ad9f
commit c4bb950fab
5 changed files with 166 additions and 16 deletions

41
Cargo.lock generated
View File

@@ -17,6 +17,12 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "adler2"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627"
[[package]]
name = "ahash"
version = "0.8.11"
@@ -1323,7 +1329,7 @@ dependencies = [
"cc",
"cfg-if",
"libc",
"miniz_oxide",
"miniz_oxide 0.7.4",
"object",
"rustc-demangle",
]
@@ -1448,6 +1454,12 @@ dependencies = [
"serde",
]
[[package]]
name = "bytesize"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3c8f83209414aacf0eeae3cf730b18d6981697fba62f200fcfb92b9f082acba"
[[package]]
name = "c-kzg"
version = "2.1.0"
@@ -2026,6 +2038,16 @@ version = "0.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99"
[[package]]
name = "flate2"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ced92e76e966ca2fd84c8f7aa01a4aea65b0eb6648d72f7c8f3e2764a67fece"
dependencies = [
"crc32fast",
"miniz_oxide 0.8.8",
]
[[package]]
name = "fnv"
version = "1.0.7"
@@ -2871,6 +2893,15 @@ dependencies = [
"adler",
]
[[package]]
name = "miniz_oxide"
version = "0.8.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3be647b768db090acb35d5ec5db2b0e1f1de11133ca123b9eacf5137868f892a"
dependencies = [
"adler2",
]
[[package]]
name = "mio"
version = "1.0.3"
@@ -4123,9 +4154,10 @@ version = "0.1.0"
dependencies = [
"alloy",
"ark-bn254",
"bytesize",
"clap",
"dashmap",
"futures",
"parking_lot 0.12.3",
"prost",
"rln",
"serde_json",
@@ -4470,6 +4502,7 @@ dependencies = [
"axum",
"base64",
"bytes",
"flate2",
"h2",
"http",
"http-body",
@@ -4505,9 +4538,9 @@ dependencies = [
[[package]]
name = "tonic-reflection"
version = "0.13.0"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "88fa815be858816dad226a49439ee90b7bcf81ab55bee72fdb217f1e6778c3ca"
checksum = "f9687bd5bfeafebdded2356950f278bba8226f0b32109537c4253406e09aafe1"
dependencies = [
"prost",
"prost-types",

View File

@@ -5,20 +5,21 @@ edition = "2024"
[dependencies]
clap = { version = "4.5.37", features = ["derive"] }
tonic = "0.13"
tonic = { version = "0.13", features = ["gzip"] }
tonic-reflection = "0.13"
prost = "0.13"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
tonic-reflection = "0.13"
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
tracing = "0.1.41"
tracing-test = "0.2.5"
alloy = { version = "0.15", features = ["full"] }
thiserror = "2.0"
futures = "0.3"
parking_lot = "0.12"
rln = { git = "https://github.com/vacp2p/zerokit" }
ark-bn254 = { version = "0.5", features = ["std"] }
serde_json = "1.0"
dashmap = "6.1.0"
bytesize = "2.0.1"
[build-dependencies]
tonic-build = "*"

View File

@@ -1,12 +1,12 @@
// std
use std::collections::HashMap;
use std::io::Cursor;
use std::net::SocketAddr;
use std::time::Duration;
// third-party
use alloy::primitives::Address;
use ark_bn254::Fr;
use bytesize::ByteSize;
use futures::TryFutureExt;
use parking_lot::RwLock;
use rln::{
hashers::{hash_to_field, poseidon_hash},
protocol::prepare_prove_input,
@@ -19,7 +19,12 @@ use tokio::sync::{
mpsc,
};
use tonic::{
Request, Response, Status, codegen::tokio_stream::wrappers::ReceiverStream, transport::Server,
Request,
Response,
Status,
codegen::tokio_stream::wrappers::ReceiverStream,
transport::Server,
// codec::CompressionEncoding
};
use tracing::{
debug,
@@ -27,7 +32,10 @@ use tracing::{
// info
};
// internal
use crate::error::{AppError, RegistrationError};
use crate::{
error::{AppError, RegistrationError},
registry::UserRegistry,
};
pub mod prover_proto {
@@ -43,9 +51,22 @@ use prover_proto::{
rln_prover_server::{RlnProver, RlnProverServer},
};
const PROVER_SERVICE_LIMIT_PER_CONNECTION: usize = 16;
// Timeout for all handlers of a request
const PROVER_SERVICE_GRPC_TIMEOUT: Duration = Duration::from_secs(30);
//
const PROVER_SERVICE_HTTP2_MAX_CONCURRENT_STREAM: u32 = 64;
// Http2 max frame size (e.g. 16 Kb)
const PROVER_SERVICE_HTTP2_MAX_FRAME_SIZE: ByteSize = ByteSize::kib(16);
// Max size for Message (decoding, e.g., 5 Mb)
// const PROVER_SERVICE_MESSAGE_DECODING_MAX_SIZE: usize = 1024 * 1024 * 5;
const PROVER_SERVICE_MESSAGE_DECODING_MAX_SIZE: ByteSize = ByteSize::mib(5);
// Max size for Message (encoding, e.g., 5 Mb)
const PROVER_SERVICE_MESSAGE_ENCODING_MAX_SIZE: ByteSize = ByteSize::mib(5);
#[derive(Debug)]
pub struct ProverService {
user_registered: RwLock<HashMap<Address, (Fr, Fr)>>,
registry: UserRegistry,
broadcast_channel: (Sender<Vec<u8>>, Receiver<Vec<u8>>),
}
@@ -60,9 +81,8 @@ impl RlnProver for ProverService {
let sender = req.sender;
let id = if let Some(sender) = sender {
let guard = self.user_registered.read();
if let Ok(sender_) = Address::try_from(sender.value.as_slice()) {
if let Some(id) = guard.get(&sender_) {
if let Some(id) = self.registry.get(&sender_) {
Ok(*id)
} else {
Err(RegistrationError::NotFound(sender_))
@@ -176,18 +196,64 @@ impl GrpcProverService {
pub(crate) async fn serve(&self) -> Result<(), AppError> {
let (tx, rx) = broadcast::channel(2);
let prover_service = ProverService {
user_registered: Default::default(),
registry: Default::default(),
broadcast_channel: (tx, rx),
};
let reflection_service = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(prover_proto::FILE_DESCRIPTOR_SET)
.build_v1()?;
let r = RlnProverServer::new(prover_service)
.max_decoding_message_size(PROVER_SERVICE_MESSAGE_DECODING_MAX_SIZE.as_u64() as usize)
.max_encoding_message_size(PROVER_SERVICE_MESSAGE_ENCODING_MAX_SIZE.as_u64() as usize)
// TODO: perf?
//.accept_compressed(CompressionEncoding::Gzip)
//.send_compressed(CompressionEncoding::Gzip)
;
Server::builder()
// service protection && limits
// limits: connection
.concurrency_limit_per_connection(PROVER_SERVICE_LIMIT_PER_CONNECTION)
.timeout(PROVER_SERVICE_GRPC_TIMEOUT)
// limits : http2
.max_concurrent_streams(PROVER_SERVICE_HTTP2_MAX_CONCURRENT_STREAM)
.max_frame_size(PROVER_SERVICE_HTTP2_MAX_FRAME_SIZE.as_u64() as u32)
// perf: tcp
.tcp_nodelay(true)
// No http 1
.accept_http1(false)
// services
.add_service(reflection_service)
.add_service(RlnProverServer::new(prover_service))
.add_service(r)
.serve(self.addr)
.map_err(AppError::from)
.await
}
}
#[cfg(test)]
mod tests {
use crate::grpc_service::prover_proto::Address;
use prost::Message;
// use crate::proto::prover::{Address}; // Adjust the import path as needed
const MAX_ADDRESS_SIZE_BYTES: usize = 20;
#[test]
#[should_panic]
fn test_address_size_limit() {
// Check if an invalid address can be encoded (as Address grpc type)
let invalid_address = vec![0; MAX_ADDRESS_SIZE_BYTES + 1];
let addr = Address {
value: invalid_address,
};
let mut addr_encoded = vec![];
addr.encode(&mut addr_encoded).unwrap();
let _addr_decoded = Address::decode(&*addr_encoded).unwrap();
}
}

View File

@@ -1,6 +1,7 @@
mod args;
mod error;
mod grpc_service;
mod registry;
mod registry_listener;
// std

49
src/registry.rs Normal file
View File

@@ -0,0 +1,49 @@
use alloy::primitives::Address;
use ark_bn254::Fr;
use dashmap::DashMap;
use dashmap::mapref::one::Ref;
use rln::protocol::keygen;
#[derive(Debug)]
pub(crate) struct UserRegistry {
inner: DashMap<Address, (Fr, Fr)>,
}
impl UserRegistry {
fn new() -> Self {
Self {
inner: DashMap::new(),
}
}
pub(crate) fn get(&self, address: &Address) -> Option<Ref<Address, (Fr, Fr)>> {
self.inner.get(address)
}
fn register(&self, address: Address) {
let (identity_secret_hash, id_commitment) = keygen();
self.inner
.insert(address, (identity_secret_hash, id_commitment));
}
}
impl Default for UserRegistry {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use alloy::primitives::address;
#[test]
fn test_user_registration() {
let address = address!("0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f");
let reg = UserRegistry::default();
reg.register(address);
assert!(reg.get(&address).is_some());
}
}