Add karma sc instance in grpc service

This commit is contained in:
sydhds
2025-06-06 09:20:27 +02:00
parent 4679c98270
commit bc3f3483a8
9 changed files with 60 additions and 35 deletions

1
Cargo.lock generated
View File

@@ -4434,6 +4434,7 @@ dependencies = [
"tracing",
"tracing-subscriber 0.3.19",
"tracing-test",
"url",
]
[[package]]

View File

@@ -31,6 +31,7 @@ http = "*"
async-channel = "2.3.1"
rand = "0.8.5"
derive_more = "2.0.1"
url = "2.5"
[build-dependencies]
tonic-build = "*"

View File

@@ -1,6 +1,8 @@
use std::net::IpAddr;
// third-party
use clap::Parser;
use url::Url;
use alloy::primitives::Address;
#[derive(Debug, Clone, Parser)]
#[command(about = "RLN prover service", long_about = None)]
@@ -16,8 +18,15 @@ pub struct AppArgs {
pub(crate) port: u16,
#[arg(
short = 'r',
long = "rpc_url",
long = "ws_rpc_url",
help = "Websocket rpc url (e.g. wss://eth-mainnet.g.alchemy.com/v2/your-api-key)"
)]
pub(crate) rpc_url: String,
pub(crate) ws_rpc_url: Url,
#[arg(
short = 'k',
long = "ksc",
help = "Karma smart contract address"
)]
pub(crate) ksc_address: Address,
}

View File

@@ -494,7 +494,7 @@ mod tests {
debug!("[Notified] Epoch update...");
let _v = counter.fetch_add(1, Ordering::SeqCst);
}
Ok::<(), AppErrorExt>(())
// Ok::<(), AppErrorExt>(())
}
)
.map_err(|_e| AppErrorExt::Elapsed)

View File

@@ -16,11 +16,12 @@ use tonic::{
use tonic_web::GrpcWebLayer;
use tower_http::cors::{Any, CorsLayer};
use tracing::debug;
use url::Url;
// internal
use crate::error::{AppError, ProofGenerationStringError, RegisterError};
use crate::proof_generation::{ProofGenerationData, ProofSendingData};
use crate::tier::{KarmaAmount, TierLimit, TierName};
use crate::user_db_service::{KarmaAmountExt, UserDb, UserTierInfo};
use crate::user_db_service::{UserDb, UserTierInfo};
use rln_proof::RlnIdentifier;
pub mod prover_proto {
@@ -40,6 +41,8 @@ use prover_proto::{
rln_proof_reply::Resp as GetProofsResp,
rln_prover_server::{RlnProver, RlnProverServer},
};
use crate::registry_listener::{AlloyWsProvider};
use crate::registry_listener::KarmaSC::KarmaSCInstance;
const PROVER_SERVICE_LIMIT_PER_CONNECTION: usize = 16;
// Timeout for all handlers of a request
@@ -62,6 +65,7 @@ pub struct ProverService {
broadcast::Sender<Result<ProofSendingData, ProofGenerationStringError>>,
broadcast::Receiver<Result<ProofSendingData, ProofGenerationStringError>>,
),
karma_sc: KarmaSCInstance<AlloyWsProvider>
}
#[tonic::async_trait]
@@ -201,19 +205,10 @@ impl RlnProver for ProverService {
return Err(Status::invalid_argument("No user address"));
};
// TODO: SC call
struct MockKarmaSc {}
impl KarmaAmountExt for MockKarmaSc {
type Error = alloy::contract::Error;
async fn karma_amount(&self, _address: &Address) -> Result<U256, Self::Error> {
Ok(U256::from(10))
}
}
let tier_info = self.user_db.user_tier_info(&user, MockKarmaSc {}).await;
let tier_info = self.user_db.user_tier_info::<alloy::contract::Error, KarmaSCInstance<AlloyWsProvider>>(
&user,
&self.karma_sc
).await;
match tier_info {
Ok(tier_info) => Ok(Response::new(GetUserTierInfoReply {
@@ -275,10 +270,16 @@ pub(crate) struct GrpcProverService {
pub addr: SocketAddr,
pub rln_identifier: RlnIdentifier,
pub user_db: UserDb,
pub karma_sc_info: (Url, Address),
}
impl GrpcProverService {
pub(crate) async fn serve(&self) -> Result<(), AppError> {
let karma_sc = KarmaSCInstance::try_new(
self.karma_sc_info.0.clone(), self.karma_sc_info.1)
.await?;
let prover_service = ProverService {
proof_sender: self.proof_sender.clone(),
user_db: self.user_db.clone(),
@@ -287,6 +288,7 @@ impl GrpcProverService {
self.broadcast_channel.0.clone(),
self.broadcast_channel.0.subscribe(),
),
karma_sc,
};
let reflection_service = tonic_reflection::server::Builder::configure()

View File

@@ -12,10 +12,10 @@ mod user_db_service;
// std
use std::net::SocketAddr;
use std::time::Duration;
use alloy::primitives::{address, U256};
// third-party
use chrono::{DateTime, Utc};
use clap::Parser;
use alloy::primitives::U256;
use rln_proof::RlnIdentifier;
use tokio::task::JoinSet;
use tracing::level_filters::LevelFilter;
@@ -30,7 +30,7 @@ use crate::args::AppArgs;
use crate::epoch_service::EpochService;
use crate::grpc_service::GrpcProverService;
use crate::proof_service::ProofService;
use crate::registry_listener::RegistryListener;
use crate::registry_listener::{RegistryListener};
use crate::user_db_service::{RateLimit, UserDbService};
const RLN_IDENTIFIER_NAME: &[u8] = b"test-rln-identifier";
@@ -42,6 +42,7 @@ const PROVER_MINIMAL_AMOUNT_FOR_REGISTRATION: U256 = U256::from_le_slice(10u64.t
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let filter = EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.from_env_lossy();
@@ -66,10 +67,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Smart contract
let karma_sc_address = address!("1f9840a85d5aF5bf1D1762F925BDADdC4201F984");
// let karma_sc_address = address!("1f9840a85d5aF5bf1D1762F925BDADdC4201F984");
let registry_listener =
RegistryListener::new(app_args.rpc_url.as_str(), karma_sc_address, user_db_service.get_user_db(), PROVER_MINIMAL_AMOUNT_FOR_REGISTRATION);
RegistryListener::new(app_args.ws_rpc_url.as_str(), app_args.ksc_address, user_db_service.get_user_db(), PROVER_MINIMAL_AMOUNT_FOR_REGISTRATION);
// proof service
// FIXME: bound
let (tx, rx) = tokio::sync::broadcast::channel(2);
@@ -88,6 +89,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
addr,
rln_identifier,
user_db: user_db_service.get_user_db(),
karma_sc_info: (app_args.ws_rpc_url, app_args.ksc_address),
};
let mut set = JoinSet::new();

View File

@@ -534,7 +534,7 @@ mod tests {
);
info!("Starting...");
let res = tokio::try_join!(
let _res = tokio::try_join!(
proof_service.serve().map_err(AppErrorExt::AppError),
proof_reveal_secret(&mut broadcast_receiver),
proof_sender_2(

View File

@@ -6,12 +6,13 @@ use alloy::providers::fillers::{
use alloy::providers::{Identity, Provider, ProviderBuilder, RootProvider, WsConnect};
use alloy::transports::{RpcError, TransportError};
use alloy::{sol, sol_types::SolEvent, contract::Error as AlloyContractError};
use alloy::transports::http::reqwest::Url;
use tonic::codegen::tokio_stream::StreamExt;
use tracing::{debug, error, info};
use crate::registry_listener::KarmaSC::KarmaSCInstance;
use crate::user_db_service::{KarmaAmountExt, UserDb};
type AlloyWsProvider = FillProvider<
pub type AlloyWsProvider = FillProvider<
JoinFill<
Identity,
JoinFill<GasFiller, JoinFill<BlobGasFiller, JoinFill<NonceFiller, ChainIdFiller>>>,
@@ -29,8 +30,17 @@ sol! {
}
}
impl KarmaSCInstance<AlloyWsProvider> {
pub(crate) async fn try_new(rpc_url: Url, address: Address) -> Result<Self, RpcError<TransportError>> {
let ws = WsConnect::new(rpc_url.as_str());
let provider = ProviderBuilder::new().connect_ws(ws).await?;
Ok(KarmaSC::new(address, provider))
}
}
impl KarmaAmountExt for KarmaSCInstance<AlloyWsProvider> {
type Error = AlloyContractError;
type Error = alloy::contract::Error;
async fn karma_amount(&self, address: &Address) -> Result<U256, Self::Error> {
self.balanceOf(*address).call().await
}
@@ -79,7 +89,7 @@ impl RegistryListener {
match KarmaSC::Transfer::decode_log_data(log.data()) {
Ok(transfer_event) => {
match self.handle_transfer_event(&karma_sc, transfer_event).await {
Ok(addr) => {
info!("Registered new user: {}", addr);
@@ -170,7 +180,7 @@ mod tests {
assert!(
user_db_service.get_user_db().get_user(&ADDR_2).is_none()
);
let minimal_amount = U256::from(25);
let registry = RegistryListener {
rpc_url: "".to_string(),
@@ -189,7 +199,7 @@ mod tests {
registry.handle_transfer_event(&karma_sc, transfer)
.await
.unwrap();
assert!(
user_db_service.get_user_db().get_user(&ADDR_2).is_some()
);

View File

@@ -288,7 +288,7 @@ impl UserDb {
pub(crate) async fn user_tier_info<E: std::error::Error, KSC: KarmaAmountExt<Error = E>>(
&self,
address: &Address,
karma_sc: KSC,
karma_sc: &KSC,
) -> Result<UserTierInfo, UserTierInfoError<E>> {
if self.user_registry.has_user(address) {
let (epoch_tx_count, epoch_slice_tx_count) = self
@@ -481,7 +481,7 @@ mod tests {
// Try to update tx counter without registering first
assert_eq!(user_db.on_new_tx(&addr), None);
let tier_info = user_db.user_tier_info(&addr, MockKarmaSc {}).await;
let tier_info = user_db.user_tier_info(&addr, &MockKarmaSc {}).await;
// User is not registered -> no tier info
assert!(matches!(
tier_info,
@@ -491,7 +491,7 @@ mod tests {
user_db.user_registry.register(addr).unwrap();
// Now update user tx counter
assert_eq!(user_db.on_new_tx(&addr), Some(EpochSliceCounter(1)));
let tier_info = user_db.user_tier_info(&addr, MockKarmaSc {}).await.unwrap();
let tier_info = user_db.user_tier_info(&addr, &MockKarmaSc {}).await.unwrap();
assert_eq!(tier_info.epoch_tx_count, 1);
assert_eq!(tier_info.epoch_slice_tx_count, 1);
}
@@ -526,7 +526,7 @@ mod tests {
new_epoch_slice,
);
let addr_1_tier_info = user_db
.user_tier_info(&ADDR_1, MockKarmaSc2 {})
.user_tier_info(&ADDR_1, &MockKarmaSc2 {})
.await
.unwrap();
assert_eq!(addr_1_tier_info.epoch_tx_count, addr_1_tx_count);
@@ -534,7 +534,7 @@ mod tests {
assert_eq!(addr_1_tier_info.tier_name, Some(TierName::from("Basic")));
let addr_2_tier_info = user_db
.user_tier_info(&ADDR_2, MockKarmaSc2 {})
.user_tier_info(&ADDR_2, &MockKarmaSc2 {})
.await
.unwrap();
assert_eq!(addr_2_tier_info.epoch_tx_count, addr_2_tx_count);
@@ -556,7 +556,7 @@ mod tests {
new_epoch_slice,
);
let addr_1_tier_info = user_db
.user_tier_info(&ADDR_1, MockKarmaSc2 {})
.user_tier_info(&ADDR_1, &MockKarmaSc2 {})
.await
.unwrap();
assert_eq!(addr_1_tier_info.epoch_tx_count, 0);
@@ -564,7 +564,7 @@ mod tests {
assert_eq!(addr_1_tier_info.tier_name, Some(TierName::from("Basic")));
let addr_2_tier_info = user_db
.user_tier_info(&ADDR_2, MockKarmaSc2 {})
.user_tier_info(&ADDR_2, &MockKarmaSc2 {})
.await
.unwrap();
assert_eq!(addr_2_tier_info.epoch_tx_count, 0);