Update proto file with Tx 'estimated gas used' field (#51)

* Update proto file with Tx 'estimated gas used' field
* Update tx_counter according to tx gas quota
* Feature/user spam limit checks (#47)
  * Print proof generation error before sending it
  * Check user spam limit in grpc send_transaction endpoint
This commit is contained in:
Sydhds
2025-10-15 10:57:03 +02:00
committed by GitHub
parent d60b93416e
commit 07ea466b33
11 changed files with 345 additions and 21 deletions

View File

@@ -135,6 +135,9 @@ message SendTransactionRequest {
optional Address sender = 2;
optional U256 chainId = 3;
bytes transactionHash = 4 [(max_size) = 32];
// Estimated gas units the transaction is expected to consume (best-effort)
// This is used for the gas checking feature
uint64 estimated_gas_used = 5;
}
message SendTransactionReply {

View File

@@ -76,6 +76,7 @@ async fn proof_sender(port: u16, addresses: Vec<Address>, proof_count: usize) {
sender: Some(addr.clone()),
chain_id: Some(chain_id.clone()),
transaction_hash: tx_hash,
estimated_gas_used: 1_000,
};
let request = tonic::Request::new(request_0);
@@ -192,6 +193,7 @@ fn proof_generation_bench(c: &mut Criterion) {
rln_identifier: AppArgs::default_rln_identifier_name(),
spam_limit: 1_000_000u64,
no_grpc_reflection: true,
tx_gas_quota: AppArgs::default_tx_gas_quota(),
};
// Tokio notify - wait for some time after spawning run_prover then notify it's ready to accept

View File

@@ -53,6 +53,7 @@ async fn proof_sender(ip: IpAddr, port: u16, addresses: Vec<Address>, proof_coun
sender: Some(addr.clone()),
chain_id: Some(chain_id.clone()),
transaction_hash: tx_hash,
estimated_gas_used: 1_000,
};
let request = tonic::Request::new(request_0);
@@ -156,6 +157,7 @@ fn proof_generation_bench(c: &mut Criterion) {
rln_identifier: AppArgs::default_rln_identifier_name(),
spam_limit: 1_000_000u64,
no_grpc_reflection: true,
tx_gas_quota: AppArgs::default_tx_gas_quota(),
};
// Tokio notify - wait for some time after spawning run_prover then notify it's ready to accept

View File

@@ -1,4 +1,5 @@
use std::net::IpAddr;
use std::num::NonZeroU64;
use std::path::PathBuf;
use std::str::FromStr;
// third-party
@@ -44,6 +45,13 @@ pub const ARGS_DEFAULT_GENESIS: DateTime<Utc> = DateTime::from_timestamp(1431648
const ARGS_DEFAULT_PROVER_MINIMAL_AMOUNT_FOR_REGISTRATION: WrappedU256 =
WrappedU256(U256::from_le_slice(10u64.to_le_bytes().as_slice()));
/// Tx gas quota
///
/// Prover will receive a Tx with the field 'estimated_gas_used'.
/// If 'estimated_gas_used' <= 'tx gas quota', tx counter is increased by 1
/// If 'estimated_gas_used' <= 'tx gas quota', tx counter is increased by (estimated_gas_used / tx gas quota)
const ARGS_DEFAULT_TX_GAS_QUOTA: NonZeroU64 = NonZeroU64::new(100_000).unwrap();
#[derive(Debug, Clone, Parser, ClapConfig)]
#[command(about = "RLN prover service", long_about = None)]
pub struct AppArgs {
@@ -170,6 +178,14 @@ pub struct AppArgs {
)]
pub registration_min_amount: WrappedU256,
#[arg(
help_heading = "prover config",
long = "tx-gas-quota",
help = "Gas quota for a Tx",
default_value_t = AppArgs::default_tx_gas_quota(),
)]
pub tx_gas_quota: NonZeroU64,
// Hidden option - expect user set it via a config file
#[arg(
long = "broadcast-channel-size",
@@ -225,6 +241,10 @@ impl AppArgs {
pub fn default_rln_identifier_name() -> String {
ARGS_DEFAULT_RLN_IDENTIFIER_NAME.to_string()
}
pub fn default_tx_gas_quota() -> NonZeroU64 {
ARGS_DEFAULT_TX_GAS_QUOTA
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Display)]

View File

@@ -2,6 +2,7 @@
// std
use std::net::SocketAddr;
use std::num::NonZeroU64;
use std::sync::Arc;
use std::time::Duration;
// third-party
@@ -41,6 +42,7 @@ pub mod prover_proto {
pub(crate) const FILE_DESCRIPTOR_SET: &[u8] =
tonic::include_file_descriptor_set!("prover_descriptor");
}
use crate::user_db_types::RateLimit;
use prover_proto::{
GetUserTierInfoReply,
GetUserTierInfoRequest,
@@ -87,6 +89,8 @@ pub struct ProverService<KSC: KarmaAmountExt> {
karma_sc: KSC,
// karma_rln_sc: RLNSC,
proof_sender_channel_size: usize,
tx_gas_quota: NonZeroU64,
rate_limit: RateLimit,
}
#[tonic::async_trait]
@@ -120,8 +124,23 @@ where
return Err(Status::not_found("Sender not registered"));
};
let tx_counter_incr = if req.estimated_gas_used <= self.tx_gas_quota.get() {
None
} else {
Some(req.estimated_gas_used / self.tx_gas_quota)
};
// Update the counter as soon as possible (should help to prevent spamming...)
let counter = self.user_db.on_new_tx(&sender, None).unwrap_or_default();
let counter = self
.user_db
.on_new_tx(&sender, tx_counter_incr)
.unwrap_or_default();
if counter > self.rate_limit {
return Err(Status::resource_exhausted(
"Too many transactions sent by this user",
));
}
if req.transaction_hash.len() != PROVER_TX_HASH_BYTESIZE {
return Err(Status::invalid_argument(
@@ -297,6 +316,8 @@ pub(crate) struct GrpcProverService<P: Provider> {
pub provider: Option<P>,
pub proof_sender_channel_size: usize,
pub grpc_reflection: bool,
pub tx_gas_quota: NonZeroU64,
pub rate_limit: RateLimit,
}
impl<P: Provider + Clone + Send + Sync + 'static> GrpcProverService<P> {
@@ -319,6 +340,8 @@ impl<P: Provider + Clone + Send + Sync + 'static> GrpcProverService<P> {
),
karma_sc,
proof_sender_channel_size: self.proof_sender_channel_size,
tx_gas_quota: self.tx_gas_quota,
rate_limit: self.rate_limit,
};
let reflection_service = if self.grpc_reflection {
@@ -387,6 +410,8 @@ impl<P: Provider + Clone + Send + Sync + 'static> GrpcProverService<P> {
karma_sc: MockKarmaSc {},
// karma_rln_sc: MockKarmaRLNSc {},
proof_sender_channel_size: self.proof_sender_channel_size,
tx_gas_quota: self.tx_gas_quota,
rate_limit: self.rate_limit,
};
let reflection_service = if self.grpc_reflection {

View File

@@ -187,6 +187,8 @@ pub async fn run_prover(app_args: AppArgs) -> Result<(), AppError> {
provider: provider.clone(),
proof_sender_channel_size: app_args.proof_sender_channel_size,
grpc_reflection: !app_args.no_grpc_reflection,
tx_gas_quota: app_args.tx_gas_quota,
rate_limit: RateLimit::from(app_args.spam_limit),
};
if app_args.ws_rpc_url.is_some() {

View File

@@ -8,11 +8,7 @@ use metrics::{counter, histogram};
use parking_lot::RwLock;
use rln::hashers::hash_to_field_le;
use rln::protocol::serialize_proof_values;
use tracing::{
Instrument, // debug,
debug_span,
info,
};
use tracing::{Instrument, debug_span, error, info};
// internal
use crate::epoch_service::{Epoch, EpochSlice};
use crate::error::{AppError, ProofGenerationError, ProofGenerationStringError};
@@ -186,6 +182,13 @@ impl ProofService {
})
.map_err(ProofGenerationStringError::from);
if proof_sending_data.is_err() {
error!(
"[proof service {counter_id}] error: {:?}",
proof_sending_data
);
}
if let Err(e) = self.broadcast_sender.send(proof_sending_data) {
info!("Stopping proof generation service: {}", e);
break;
@@ -307,7 +310,7 @@ mod tests {
}
#[tokio::test]
#[tracing_test::traced_test]
// #[tracing_test::traced_test]
async fn test_proof_generation() {
// Queues
let (broadcast_sender, _broadcast_receiver) = broadcast::channel(2);

View File

@@ -806,10 +806,11 @@ mod tests {
user_db.register(addr).unwrap();
let (ec, ecs) = user_db.get_tx_counter(&addr).unwrap();
assert_eq!(ec, 0.into());
assert_eq!(ecs, 0.into());
assert_eq!(ec, 0u64.into());
assert_eq!(ecs, EpochSliceCounter::from(0u64));
let ecs_2 = user_db.incr_tx_counter(&addr, Some(42)).unwrap();
assert_eq!(ecs_2, 42.into());
assert_eq!(ecs_2, EpochSliceCounter::from(42));
}
#[tokio::test]

View File

@@ -81,5 +81,18 @@ impl From<RateLimit> for Fr {
#[derive(Debug, Default, Clone, Copy, PartialEq, From, Into, Add)]
pub(crate) struct EpochCounter(u64);
/// A Tx counter for a user in a given epoch slice
#[derive(Debug, Default, Clone, Copy, PartialEq, From, Into, Add)]
pub(crate) struct EpochSliceCounter(u64);
impl PartialEq<RateLimit> for EpochSliceCounter {
fn eq(&self, other: &RateLimit) -> bool {
self.0 == other.0
}
}
impl PartialOrd<RateLimit> for EpochSliceCounter {
fn partial_cmp(&self, other: &RateLimit) -> Option<std::cmp::Ordering> {
Some(self.0.cmp(&other.0))
}
}

View File

@@ -4,6 +4,7 @@ use parking_lot::RwLock;
use prover::{AppArgs, MockUser, run_prover};
use std::io::Write;
use std::net::{IpAddr, Ipv4Addr};
use std::num::NonZeroU64;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
@@ -18,12 +19,12 @@ pub mod prover_proto {
// Include generated code (see build.rs)
tonic::include_proto!("prover");
}
use crate::prover_proto::get_user_tier_info_reply::Resp;
use crate::prover_proto::{
Address as GrpcAddress, GetUserTierInfoReply, GetUserTierInfoRequest, RlnProofFilter,
RlnProofReply, SendTransactionReply, SendTransactionRequest, U256 as GrpcU256, Wei as GrpcWei,
rln_prover_client::RlnProverClient,
};
/*
async fn register_users(port: u16, addresses: Vec<Address>) {
let url = format!("http://127.0.0.1:{}", port);
@@ -116,13 +117,15 @@ async fn test_grpc_register_users() {
}
*/
async fn proof_sender(port: u16, addresses: Vec<Address>, proof_count: usize) {
let start = std::time::Instant::now();
#[derive(Default)]
struct TxData {
chain_id: Option<U256>,
gas_price: Option<U256>,
estimated_gas_used: Option<u64>,
}
let chain_id = GrpcU256 {
// FIXME: LE or BE?
value: U256::from(1).to_le_bytes::<32>().to_vec(),
};
async fn proof_sender(port: u16, addresses: Vec<Address>, proof_count: usize, tx_data: TxData) {
let start = std::time::Instant::now();
let url = format!("http://127.0.0.1:{port}");
let mut client = RlnProverClient::connect(url).await.unwrap();
@@ -130,11 +133,24 @@ async fn proof_sender(port: u16, addresses: Vec<Address>, proof_count: usize) {
let addr = GrpcAddress {
value: addresses[0].to_vec(),
};
let wei = GrpcWei {
// FIXME: LE or BE?
value: U256::from(1000).to_le_bytes::<32>().to_vec(),
let chain_id = GrpcU256 {
value: tx_data
.chain_id
.unwrap_or(U256::from(1))
.to_le_bytes::<32>()
.to_vec(),
};
let wei = GrpcWei {
value: tx_data
.gas_price
.unwrap_or(U256::from(1_000))
.to_le_bytes::<32>()
.to_vec(),
};
let estimated_gas_used = tx_data.estimated_gas_used.unwrap_or(1_000);
let mut count = 0;
for i in 0..proof_count {
let tx_hash = U256::from(42 + i).to_le_bytes::<32>().to_vec();
@@ -144,6 +160,7 @@ async fn proof_sender(port: u16, addresses: Vec<Address>, proof_count: usize) {
sender: Some(addr.clone()),
chain_id: Some(chain_id.clone()),
transaction_hash: tx_hash,
estimated_gas_used,
};
let request = tonic::Request::new(request_0);
@@ -258,6 +275,7 @@ async fn test_grpc_gen_proof() {
rln_identifier: AppArgs::default_rln_identifier_name(),
spam_limit: AppArgs::default_spam_limit(),
no_grpc_reflection: true,
tx_gas_quota: AppArgs::default_tx_gas_quota(),
};
info!("Starting prover with args: {:?}", app_args);
@@ -275,7 +293,7 @@ async fn test_grpc_gen_proof() {
let proof_count = 10;
let mut set = JoinSet::new();
set.spawn(
proof_sender(port, addresses.clone(), proof_count).map(|_| vec![]), // JoinSet require having the same return type
proof_sender(port, addresses.clone(), proof_count, Default::default()).map(|_| vec![]), // JoinSet require having the same return type
);
set.spawn(proof_collector(port, proof_count));
let res = set.join_all().await;
@@ -287,3 +305,230 @@ async fn test_grpc_gen_proof() {
prover_handle.abort();
tokio::time::sleep(Duration::from_secs(1)).await;
}
async fn proof_sender_2(port: u16, addresses: Vec<Address>, proof_count: usize) {
let start = std::time::Instant::now();
let chain_id = GrpcU256 {
// FIXME: LE or BE?
value: U256::from(1).to_le_bytes::<32>().to_vec(),
};
let url = format!("http://127.0.0.1:{port}");
let mut client = RlnProverClient::connect(url).await.unwrap();
let addr = GrpcAddress {
value: addresses[0].to_vec(),
};
let wei = GrpcWei {
// FIXME: LE or BE?
value: U256::from(1000).to_le_bytes::<32>().to_vec(),
};
let mut count = 0;
for i in 0..proof_count {
let tx_hash = U256::from(42 + i).to_le_bytes::<32>().to_vec();
let request_0 = SendTransactionRequest {
gas_price: Some(wei.clone()),
sender: Some(addr.clone()),
chain_id: Some(chain_id.clone()),
transaction_hash: tx_hash,
estimated_gas_used: 1_000,
};
let request = tonic::Request::new(request_0);
let response = client.send_transaction(request).await;
// assert!(response.into_inner().result);
if response.is_err() {
println!("Error sending tx: {:?}", response.err());
break;
}
count += 1;
}
println!(
"[proof_sender] sent {} tx - elapsed: {} secs",
count,
start.elapsed().as_secs_f64()
);
}
#[tokio::test]
// #[traced_test]
async fn test_grpc_user_spamming() {
let mock_users = vec![
MockUser {
address: Address::from_str("0xd8da6bf26964af9d7eed9e03e53415d37aa96045").unwrap(),
tx_count: 0,
},
MockUser {
address: Address::from_str("0xb20a608c624Ca5003905aA834De7156C68b2E1d0").unwrap(),
tx_count: 0,
},
];
let addresses: Vec<Address> = mock_users.iter().map(|u| u.address).collect();
// Write mock users to tempfile
let mock_users_as_str = serde_json::to_string(&mock_users).unwrap();
let mut temp_file = NamedTempFile::new().unwrap();
let temp_file_path = temp_file.path().to_path_buf();
temp_file.write_all(mock_users_as_str.as_bytes()).unwrap();
temp_file.flush().unwrap();
debug!(
"Mock user temp file path: {}",
temp_file_path.to_str().unwrap()
);
//
let temp_folder = tempfile::tempdir().unwrap();
let temp_folder_tree = tempfile::tempdir().unwrap();
let port = 50053;
let app_args = AppArgs {
ip: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
port,
ws_rpc_url: None,
db_path: temp_folder.path().to_path_buf(),
merkle_tree_folder: temp_folder_tree.path().to_path_buf(),
merkle_tree_count: 1,
merkle_tree_max_count: 1,
ksc_address: None,
rlnsc_address: None,
tsc_address: None,
mock_sc: Some(true),
mock_user: Some(temp_file_path),
config_path: Default::default(),
no_config: true,
metrics_ip: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
metrics_port: 30031,
broadcast_channel_size: 500,
proof_service_count: 8,
transaction_channel_size: 500,
proof_sender_channel_size: 500,
registration_min_amount: AppArgs::default_minimal_amount_for_registration(),
rln_identifier: AppArgs::default_rln_identifier_name(),
spam_limit: 3,
no_grpc_reflection: true,
tx_gas_quota: NonZeroU64::new(1_000).unwrap(),
};
info!("Starting prover with args: {:?}", app_args);
let prover_handle = task::spawn(run_prover(app_args));
// Wait for the prover to be ready
// Note: if unit test is failing - maybe add an optional notification when service is ready
tokio::time::sleep(Duration::from_secs(5)).await;
// info!("Registering some users...");
// register_users(port, addresses.clone()).await;
info!("Query info for these new users...");
let res = query_user_info(port, addresses.clone()).await;
assert_eq!(res.len(), addresses.len());
info!("Sending tx and collecting proofs...");
let proof_count = 10;
let mut set = JoinSet::new();
set.spawn(
proof_sender_2(port, addresses.clone(), proof_count).map(|_| vec![]), // JoinSet require having the same return type
);
set.spawn(proof_collector(port, 2 + 1));
let res = set.join_all().await;
println!("res lengths: {} {}", res[0].len(), res[1].len());
/*
assert_eq!(res[0].len() + res[1].len(), proof_count);
*/
info!("Aborting prover...");
prover_handle.abort();
tokio::time::sleep(Duration::from_secs(1)).await;
}
#[tokio::test]
// #[traced_test]
async fn test_grpc_tx_exceed_gas_quota() {
let mock_users = vec![
MockUser {
address: Address::from_str("0xd8da6bf26964af9d7eed9e03e53415d37aa96045").unwrap(),
tx_count: 0,
},
MockUser {
address: Address::from_str("0xb20a608c624Ca5003905aA834De7156C68b2E1d0").unwrap(),
tx_count: 0,
},
];
let addresses: Vec<Address> = mock_users.iter().map(|u| u.address).collect();
// Write mock users to tempfile
let mock_users_as_str = serde_json::to_string(&mock_users).unwrap();
let mut temp_file = NamedTempFile::new().unwrap();
let temp_file_path = temp_file.path().to_path_buf();
temp_file.write_all(mock_users_as_str.as_bytes()).unwrap();
temp_file.flush().unwrap();
debug!(
"Mock user temp file path: {}",
temp_file_path.to_str().unwrap()
);
//
let temp_folder = tempfile::tempdir().unwrap();
let temp_folder_tree = tempfile::tempdir().unwrap();
let port = 50054;
let tx_gas_quota = NonZeroU64::new(1_000).unwrap();
let app_args = AppArgs {
ip: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
port,
ws_rpc_url: None,
db_path: temp_folder.path().to_path_buf(),
merkle_tree_folder: temp_folder_tree.path().to_path_buf(),
merkle_tree_count: 1,
merkle_tree_max_count: 1,
ksc_address: None,
rlnsc_address: None,
tsc_address: None,
mock_sc: Some(true),
mock_user: Some(temp_file_path),
config_path: Default::default(),
no_config: true,
metrics_ip: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
metrics_port: 30031,
broadcast_channel_size: 500,
proof_service_count: 8,
transaction_channel_size: 500,
proof_sender_channel_size: 500,
registration_min_amount: AppArgs::default_minimal_amount_for_registration(),
rln_identifier: AppArgs::default_rln_identifier_name(),
spam_limit: AppArgs::default_spam_limit(),
no_grpc_reflection: true,
tx_gas_quota,
};
info!("Starting prover with args: {:?}", app_args);
let _prover_handle = task::spawn(run_prover(app_args));
// Wait for the prover to be ready
// Note: if unit test is failing - maybe add an optional notification when service is ready
tokio::time::sleep(Duration::from_secs(5)).await;
let quota_mult = 11;
let tx_data = TxData {
estimated_gas_used: Some(tx_gas_quota.get() * quota_mult),
..Default::default()
};
// Send a tx with 11 * the tx_gas_quota
proof_sender(port, addresses.clone(), 1, tx_data).await;
tokio::time::sleep(Duration::from_secs(5)).await;
let res = query_user_info(port, vec![addresses[0]]).await;
let resp = res[0].resp.as_ref().unwrap();
match resp {
Resp::Res(r) => {
// Check the tx counter is updated to the right value
assert_eq!(r.tx_count, quota_mult);
}
Resp::Error(e) => {
panic!("Unexpected error {:?}", e);
}
}
}

View File

@@ -60,6 +60,13 @@ pub struct SendTransactionArgs {
help = "Send an invalid tx hash"
)]
invalid_hash: bool,
#[arg(
short = 'g',
long = "tx-gas",
help = "Tx estimated gas used",
default_value = "21000"
)]
estimated_gas_used: u64,
}
#[tokio::main]
@@ -96,6 +103,7 @@ async fn main() {
sender: Some(grpc_addr),
chain_id: Some(chain_id),
transaction_hash: tx_hash,
estimated_gas_used: send_transaction_args.estimated_gas_used,
};
let request = tonic::Request::new(request_0);
let response: Response<SendTransactionReply> =