Mirror of https://github.com/vacp2p/status-rln-prover.git (synced 2026-01-08 21:18:05 -05:00)
Move all code related to smart contracts into lib smart_contract (#7)
* Move all code related to smart contracts into lib smart_contract
* Can start the prover with mocked smart contract
* Add KarmaTier integration
* Filter & validate initial tier limits
@@ -13,7 +13,7 @@ tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
tracing = "0.1.41"
tracing-test = "0.2.5"
alloy = { version = "1.0", features = ["getrandom", "sol-types", "contract", "provider-ws"] }
alloy.workspace = true
thiserror = "2.0"
futures = "0.3"
rln = { git = "https://github.com/vacp2p/zerokit", default-features = false }
@@ -28,10 +28,14 @@ tower-http = { version = "0.6.4", features = ["cors"] }
http = "*"
async-channel = "2.3.1"
rand = "0.8.5"
derive_more = "2.0.1"
url = "2.5"
derive_more.workspace = true
url.workspace = true
num-bigint = "0.4"
async-trait.workspace = true
serde = { version="1", features = ["derive"] }
serde_json = "1.0"
rln_proof = { path = "../rln_proof" }
smart_contract = { path = "../smart_contract" }

[build-dependencies]
tonic-build = "*"
@@ -1,4 +1,5 @@
use std::net::IpAddr;
use std::path::PathBuf;
// third-party
use alloy::primitives::Address;
use clap::Parser;
@@ -18,12 +19,22 @@ pub struct AppArgs {
    pub(crate) port: u16,
    #[arg(
        short = 'u',
        long = "ws_rpc_url",
        long = "ws-rpc-url",
        help = "Websocket rpc url (e.g. wss://eth-mainnet.g.alchemy.com/v2/your-api-key)"
    )]
    pub(crate) ws_rpc_url: Url,
    pub(crate) ws_rpc_url: Option<Url>,
    #[arg(short = 'k', long = "ksc", help = "Karma smart contract address")]
    pub(crate) ksc_address: Address,
    pub(crate) ksc_address: Option<Address>,
    #[arg(short = 'r', long = "rlnsc", help = "RLN smart contract address")]
    pub(crate) rlnsc_address: Address,
    pub(crate) rlnsc_address: Option<Address>,
    #[arg(short = 't', long = "tsc", help = "KarmaTiers smart contract address")]
    pub(crate) tsc_address: Option<Address>,
    #[arg(long = "mock-sc", help = "Test only - mock smart contracts", action)]
    pub(crate) mock_sc: Option<bool>,
    #[arg(
        long = "mock-user",
        help = "Test only - register user (requires --mock-sc to be enabled)",
        action
    )]
    pub(crate) mock_user: Option<PathBuf>,
}
@@ -12,14 +12,16 @@ pub enum AppError {
    Tonic(#[from] tonic::transport::Error),
    #[error("Tonic reflection (grpc) error: {0}")]
    TonicReflection(#[from] tonic_reflection::server::Error),
    #[error("SC error 1: {0}")]
    Alloy(#[from] RpcError<RpcError<TransportErrorKind>>),
    #[error("SC error 2: {0}")]
    Alloy2(#[from] RpcError<TransportErrorKind>),
    #[error("Rpc error 1: {0}")]
    RpcError(#[from] RpcError<RpcError<TransportErrorKind>>),
    #[error("Rpc transport error 2: {0}")]
    RpcTransportError(#[from] RpcError<TransportErrorKind>),
    #[error("Epoch service error: {0}")]
    EpochError(#[from] WaitUntilError),
    #[error(transparent)]
    RegistryError(#[from] HandleTransferError),
    #[error(transparent)]
    ContractError(#[from] alloy::contract::Error),
}

#[derive(thiserror::Error, Debug)]
@@ -1,5 +1,4 @@
// std
use std::collections::BTreeMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
@@ -20,13 +19,17 @@ use tracing::debug;
use url::Url;
// internal
use crate::error::{AppError, ProofGenerationStringError, RegisterError};
use crate::karma_sc::KarmaSC::KarmaSCInstance;
use crate::proof_generation::{ProofGenerationData, ProofSendingData};
use crate::rln_sc::KarmaRLNSC::KarmaRLNSCInstance;
use crate::sc::AlloyWsProvider;
use crate::tier::{KarmaAmount, TierLimit, TierName};
use crate::user_db_service::{UserDb, UserTierInfo};
use rln_proof::RlnIdentifier;
use smart_contract::{
    KarmaAmountExt,
    KarmaRLNSC::KarmaRLNSCInstance,
    KarmaSC::KarmaSCInstance,
    MockKarmaRLNSc,
    MockKarmaSc,
    RLNRegister, // traits
};

pub mod prover_proto {
@@ -37,9 +40,19 @@ pub mod prover_proto {
    tonic::include_file_descriptor_set!("prover_descriptor");
}
use prover_proto::{
    GetUserTierInfoReply, GetUserTierInfoRequest, RegisterUserReply, RegisterUserRequest,
    RegistrationStatus, RlnProof, RlnProofFilter, RlnProofReply, SendTransactionReply,
    SendTransactionRequest, SetTierLimitsReply, SetTierLimitsRequest, Tier, UserTierInfoError,
    GetUserTierInfoReply,
    GetUserTierInfoRequest,
    RegisterUserReply,
    RegisterUserRequest,
    RegistrationStatus,
    RlnProof,
    RlnProofFilter,
    RlnProofReply,
    SendTransactionReply,
    SendTransactionRequest,
    // SetTierLimitsReply, SetTierLimitsRequest,
    Tier,
    UserTierInfoError,
    UserTierInfoResult,
    get_user_tier_info_reply::Resp,
    rln_proof_reply::Resp as GetProofsResp,
@@ -61,7 +74,7 @@ const PROVER_SERVICE_MESSAGE_ENCODING_MAX_SIZE: ByteSize = ByteSize::mib(5);
const PROVER_TX_HASH_BYTESIZE: usize = 32;

#[derive(Debug)]
pub struct ProverService {
pub struct ProverService<KSC: KarmaAmountExt, RLNSC: RLNRegister> {
    proof_sender: Sender<ProofGenerationData>,
    user_db: UserDb,
    rln_identifier: Arc<RlnIdentifier>,
@@ -69,12 +82,18 @@ pub struct ProverService {
        broadcast::Sender<Result<ProofSendingData, ProofGenerationStringError>>,
        broadcast::Receiver<Result<ProofSendingData, ProofGenerationStringError>>,
    ),
    karma_sc: KarmaSCInstance<AlloyWsProvider>,
    karma_rln_sc: KarmaRLNSCInstance<AlloyWsProvider>,
    karma_sc: KSC,
    karma_rln_sc: RLNSC,
}

#[tonic::async_trait]
impl RlnProver for ProverService {
impl<KSC, RLNSC> RlnProver for ProverService<KSC, RLNSC>
where
    KSC: KarmaAmountExt + Send + Sync + 'static,
    KSC::Error: std::error::Error + Send + Sync + 'static,
    RLNSC: RLNRegister + Send + Sync + 'static,
    RLNSC::Error: std::error::Error + Send + Sync + 'static,
{
    async fn send_transaction(
        &self,
        request: Request<SendTransactionRequest>,
@@ -99,7 +118,7 @@ impl RlnProver for ProverService {
        };

        // Update the counter as soon as possible (should help to prevent spamming...)
        let counter = self.user_db.on_new_tx(&sender).unwrap_or_default();
        let counter = self.user_db.on_new_tx(&sender, None).unwrap_or_default();

        // FIXME: hardcoded
        if req.transaction_hash.len() != PROVER_TX_HASH_BYTESIZE {
@@ -156,7 +175,6 @@ impl RlnProver for ProverService {
        // TODO: on error, remove from user_db?
        self.karma_rln_sc
            .register(id_co)
            .call()
            .await
            .map_err(|e| Status::from_error(Box::new(e)))?;
@@ -223,13 +241,7 @@ impl RlnProver for ProverService {
            return Err(Status::invalid_argument("No user address"));
        };

        let tier_info = self
            .user_db
            .user_tier_info::<alloy::contract::Error, KarmaSCInstance<AlloyWsProvider>>(
                &user,
                &self.karma_sc,
            )
            .await;
        let tier_info = self.user_db.user_tier_info(&user, &self.karma_sc).await;

        match tier_info {
            Ok(tier_info) => Ok(Response::new(GetUserTierInfoReply {
@@ -241,6 +253,7 @@ impl RlnProver for ProverService {
        }
    }

    /*
    async fn set_tier_limits(
        &self,
        request: Request<SetTierLimitsRequest>,
@@ -280,6 +293,7 @@ impl RlnProver for ProverService {
        };
        Ok(Response::new(reply))
    }
    */
}

pub(crate) struct GrpcProverService {
@@ -291,16 +305,22 @@ pub(crate) struct GrpcProverService {
    pub addr: SocketAddr,
    pub rln_identifier: RlnIdentifier,
    pub user_db: UserDb,
    pub karma_sc_info: (Url, Address),
    pub rln_sc_info: (Url, Address),
    pub karma_sc_info: Option<(Url, Address)>,
    pub rln_sc_info: Option<(Url, Address)>,
}

impl GrpcProverService {
    pub(crate) async fn serve(&self) -> Result<(), AppError> {
        let karma_sc =
            KarmaSCInstance::try_new(self.karma_sc_info.0.clone(), self.karma_sc_info.1).await?;
        let karma_rln_sc =
            KarmaRLNSCInstance::try_new(self.rln_sc_info.0.clone(), self.rln_sc_info.1).await?;
        let karma_sc = if let Some(karma_sc_info) = self.karma_sc_info.as_ref() {
            KarmaSCInstance::try_new(karma_sc_info.0.clone(), karma_sc_info.1).await?
        } else {
            panic!("Please provide karma_sc_info or use serve_with_mock");
        };
        let karma_rln_sc = if let Some(rln_sc_info) = self.rln_sc_info.as_ref() {
            KarmaRLNSCInstance::try_new(rln_sc_info.0.clone(), rln_sc_info.1).await?
        } else {
            panic!("Please provide rln_sc_info or use serve_with_mock");
        };

        let prover_service = ProverService {
            proof_sender: self.proof_sender.clone(),
@@ -361,6 +381,67 @@ impl GrpcProverService {
            .map_err(AppError::from)
            .await
    }

    pub(crate) async fn serve_with_mock(&self) -> Result<(), AppError> {
        let prover_service = ProverService {
            proof_sender: self.proof_sender.clone(),
            user_db: self.user_db.clone(),
            rln_identifier: Arc::new(self.rln_identifier.clone()),
            broadcast_channel: (
                self.broadcast_channel.0.clone(),
                self.broadcast_channel.0.subscribe(),
            ),
            karma_sc: MockKarmaSc {},
            karma_rln_sc: MockKarmaRLNSc {},
        };

        let reflection_service = tonic_reflection::server::Builder::configure()
            .register_encoded_file_descriptor_set(prover_proto::FILE_DESCRIPTOR_SET)
            .build_v1()?;

        let r = RlnProverServer::new(prover_service)
            .max_decoding_message_size(PROVER_SERVICE_MESSAGE_DECODING_MAX_SIZE.as_u64() as usize)
            .max_encoding_message_size(PROVER_SERVICE_MESSAGE_ENCODING_MAX_SIZE.as_u64() as usize)
            // TODO: perf?
            //.accept_compressed(CompressionEncoding::Gzip)
            //.send_compressed(CompressionEncoding::Gzip)
            ;

        // CORS
        let cors = CorsLayer::new()
            // Allow `GET`, `POST` and `OPTIONS` when accessing the resource
            .allow_methods([
                Method::GET,
                // http POST && OPTIONS not required for grpc-web
                // Method::POST,
                // Method::OPTIONS
            ])
            // Allow requests from any origin
            // FIXME: config?
            .allow_origin(Any)
            .allow_headers(Any);

        Server::builder()
            // service protection && limits
            // limits: connection
            .concurrency_limit_per_connection(PROVER_SERVICE_LIMIT_PER_CONNECTION)
            .timeout(PROVER_SERVICE_GRPC_TIMEOUT)
            // limits : http2
            .max_concurrent_streams(PROVER_SERVICE_HTTP2_MAX_CONCURRENT_STREAM)
            .max_frame_size(PROVER_SERVICE_HTTP2_MAX_FRAME_SIZE.as_u64() as u32)
            // perf: tcp
            .tcp_nodelay(true)
            // http 1 layer required for GrpcWebLayer
            .accept_http1(true)
            // services
            .layer(cors)
            .layer(GrpcWebLayer::new())
            .add_service(reflection_service)
            .add_service(r)
            .serve(self.addr)
            .map_err(AppError::from)
            .await
    }
}

/// UserTierInfo to UserTierInfoResult (Grpc message) conversion
@@ -1,45 +0,0 @@
// third-party
use alloy::{
    primitives::{Address, U256},
    providers::{ProviderBuilder, WsConnect},
    sol,
    transports::{RpcError, TransportError},
};
use url::Url;
// internal
use crate::sc::AlloyWsProvider;

pub trait KarmaAmountExt {
    type Error;

    async fn karma_amount(&self, address: &Address) -> Result<U256, Self::Error>;
}

sol! {
    // https://github.com/vacp2p/staking-reward-streamer/pull/220
    #[sol(rpc)]
    contract KarmaSC {
        // From: https://github.com/OpenZeppelin/openzeppelin-contracts/blob/master/contracts/token/ERC20/IERC20.sol#L16
        event Transfer(address indexed from, address indexed to, uint256 value);

        function balanceOf(address account) public view override returns (uint256);
    }
}

impl KarmaSC::KarmaSCInstance<AlloyWsProvider> {
    pub(crate) async fn try_new(
        rpc_url: Url,
        address: Address,
    ) -> Result<Self, RpcError<TransportError>> {
        let ws = WsConnect::new(rpc_url.as_str());
        let provider = ProviderBuilder::new().connect_ws(ws).await?;
        Ok(KarmaSC::new(address, provider))
    }
}

impl KarmaAmountExt for KarmaSC::KarmaSCInstance<AlloyWsProvider> {
    type Error = alloy::contract::Error;
    async fn karma_amount(&self, address: &Address) -> Result<U256, Self::Error> {
        self.balanceOf(*address).call().await
    }
}
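Moving `KarmaAmountExt` into the `smart_contract` lib is what lets the prover swap the real `KarmaSC` binding for a mock. A minimal, self-contained sketch of that idea (the trait is re-declared locally and `FixedBalance` is an illustrative test double, not the lib's `MockKarmaSc`; the real definitions may differ in details):

```rust
use alloy::primitives::{Address, U256};
use async_trait::async_trait;

// Local stand-in for smart_contract::KarmaAmountExt (assumed shape).
#[async_trait]
pub trait KarmaAmountExt {
    type Error;
    async fn karma_amount(&self, address: &Address) -> Result<U256, Self::Error>;
}

// Hypothetical test double: every address reports the same Karma balance.
pub struct FixedBalance(pub U256);

#[async_trait]
impl KarmaAmountExt for FixedBalance {
    type Error = std::convert::Infallible;

    async fn karma_amount(&self, _address: &Address) -> Result<U256, Self::Error> {
        Ok(self.0)
    }
}
```

Because `ProverService` is now generic over `KSC: KarmaAmountExt`, such a double can be dropped in wherever the real on-chain instance would otherwise be required.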
@@ -3,13 +3,12 @@ mod args;
mod epoch_service;
mod error;
mod grpc_service;
mod karma_sc;
mod mock;
mod proof_generation;
mod proof_service;
mod registry_listener;
mod rln_sc;
mod sc;
mod tier;
mod tiers_listener;
mod user_db_service;

// std
@@ -20,6 +19,8 @@ use alloy::primitives::U256;
use chrono::{DateTime, Utc};
use clap::Parser;
use rln_proof::RlnIdentifier;
use smart_contract::KarmaTiersSC::KarmaTiersSCInstance;
use smart_contract::TIER_LIMITS;
use tokio::task::JoinSet;
use tracing::level_filters::LevelFilter;
use tracing::{
@@ -32,8 +33,11 @@ use tracing_subscriber::{EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt};
use crate::args::AppArgs;
use crate::epoch_service::EpochService;
use crate::grpc_service::GrpcProverService;
use crate::mock::read_mock_user;
use crate::proof_service::ProofService;
use crate::registry_listener::RegistryListener;
use crate::tier::TierLimits;
use crate::tiers_listener::TiersListener;
use crate::user_db_service::{RateLimit, UserDbService};

const RLN_IDENTIFIER_NAME: &[u8] = b"test-rln-identifier";
@@ -56,25 +60,86 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let app_args = AppArgs::parse();
    debug!("Arguments: {:?}", app_args);

    // Application cli arguments checks
    if app_args.ws_rpc_url.is_some() {
        if app_args.ksc_address.is_none()
            || app_args.rlnsc_address.is_none()
            || app_args.tsc_address.is_none()
        {
            return Err("Please provide smart contract addresses".into());
        }
    } else if app_args.mock_sc.is_none() {
        return Err("Please provide rpc url (--ws-rpc-url) or mock (--mock-sc)".into());
    }

    // Epoch
    let epoch_service = EpochService::try_from((Duration::from_secs(60 * 2), GENESIS))
        .expect("Failed to create epoch service");

    let mut tier_limits = if app_args.ws_rpc_url.is_some() {
        TierLimits::from(
            KarmaTiersSCInstance::get_tiers(
                app_args.ws_rpc_url.clone().unwrap(),
                app_args.tsc_address.unwrap(),
            )
            .await?,
        )
    } else {
        // mock
        debug!("Mock - will use tier limits: {:#?}", TIER_LIMITS);
        TierLimits::from(TIER_LIMITS.clone())
    };

    tier_limits.filter_inactive();
    tier_limits.validate()?;

    // User db service
    let user_db_service = UserDbService::new(
        epoch_service.epoch_changes.clone(),
        epoch_service.current_epoch.clone(),
        PROVER_SPAM_LIMIT,
        tier_limits,
    );

    if app_args.mock_sc.is_some() {
        if let Some(user_filepath) = app_args.mock_user.as_ref() {
            let mock_users = read_mock_user(user_filepath).unwrap();
            debug!("Mock - will register {} users", mock_users.len());
            mock_users.into_iter().for_each(|mock_user| {
                debug!(
                    "Registering user address: {} - tx count: {}",
                    mock_user.address, mock_user.tx_count
                );
                let user_db = user_db_service.get_user_db();
                user_db.on_new_user(mock_user.address).unwrap();
                user_db
                    .on_new_tx(&mock_user.address, Some(mock_user.tx_count))
                    .unwrap();
            })
        }
    }

    // Smart contract
    // let karma_sc_address = address!("1f9840a85d5aF5bf1D1762F925BDADdC4201F984");
    let registry_listener = RegistryListener::new(
        app_args.ws_rpc_url.as_str(),
        app_args.ksc_address,
        user_db_service.get_user_db(),
        PROVER_MINIMAL_AMOUNT_FOR_REGISTRATION,
    );
    let registry_listener = if app_args.mock_sc.is_some() {
        None
    } else {
        Some(RegistryListener::new(
            app_args.ws_rpc_url.clone().unwrap().as_str(),
            app_args.ksc_address.unwrap(),
            user_db_service.get_user_db(),
            PROVER_MINIMAL_AMOUNT_FOR_REGISTRATION,
        ))
    };

    let tiers_listener = if app_args.mock_sc.is_some() {
        None
    } else {
        Some(TiersListener::new(
            app_args.ws_rpc_url.clone().unwrap().as_str(),
            app_args.tsc_address.unwrap(),
            user_db_service.get_user_db(),
        ))
    };

    // proof service
    // FIXME: bound
@@ -88,14 +153,23 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = SocketAddr::new(app_args.ip, app_args.port);
    debug!("Listening on: {}", addr);
    // TODO: broadcast subscribe?
    let prover_grpc_service = GrpcProverService {
        proof_sender,
        broadcast_channel: (tx.clone(), rx),
        addr,
        rln_identifier,
        user_db: user_db_service.get_user_db(),
        karma_sc_info: (app_args.ws_rpc_url.clone(), app_args.ksc_address),
        rln_sc_info: (app_args.ws_rpc_url, app_args.rlnsc_address),
    let prover_grpc_service = {
        let mut service = GrpcProverService {
            proof_sender,
            broadcast_channel: (tx.clone(), rx),
            addr,
            rln_identifier,
            user_db: user_db_service.get_user_db(),
            karma_sc_info: None,
            rln_sc_info: None,
        };

        if app_args.ws_rpc_url.is_some() {
            let ws_rpc_url = app_args.ws_rpc_url.clone().unwrap();
            service.karma_sc_info = Some((ws_rpc_url.clone(), app_args.ksc_address.unwrap()));
            service.rln_sc_info = Some((ws_rpc_url, app_args.rlnsc_address.unwrap()));
        }
        service
    };

    let mut set = JoinSet::new();
@@ -116,10 +190,21 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
            proof_service.serve().await
        });
    }
    set.spawn(async move { registry_listener.listen().await });

    if registry_listener.is_some() {
        set.spawn(async move { registry_listener.unwrap().listen().await });
    }
    if tiers_listener.is_some() {
        set.spawn(async move { tiers_listener.unwrap().listen().await });
    }
    set.spawn(async move { epoch_service.listen_for_new_epoch().await });
    set.spawn(async move { user_db_service.listen_for_epoch_changes().await });
    set.spawn(async move { prover_grpc_service.serve().await });
    if app_args.ws_rpc_url.is_some() {
        set.spawn(async move { prover_grpc_service.serve().await });
    } else {
        debug!("Grpc service started with mocked smart contracts");
        set.spawn(async move { prover_grpc_service.serve_with_mock().await });
    }

    let _ = set.join_all().await;
    Ok(())
prover/src/mock.rs (new file, 24 lines)
@@ -0,0 +1,24 @@
use std::path::PathBuf;
// third-party
use alloy::primitives::Address;
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
pub struct MockUser {
    pub address: Address,
    pub tx_count: u64,
}

pub fn read_mock_user(path: &PathBuf) -> Result<Vec<MockUser>, MockUserError> {
    let f = std::fs::File::open(path)?;
    let users: Vec<MockUser> = serde_json::from_reader(f)?;
    Ok(users)
}

#[derive(thiserror::Error, Debug)]
pub enum MockUserError {
    #[error("transparent")]
    IOError(#[from] std::io::Error),
    #[error("transparent")]
    JsonError(#[from] serde_json::Error),
}
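Since `read_mock_user` just runs `serde_json::from_reader` into a `Vec<MockUser>`, the file passed with `--mock-user` (only honoured together with `--mock-sc`) is expected to be a JSON array of `{address, tx_count}` objects. A hedged, self-contained sketch of that shape (local copy of the struct, hypothetical file content; the exact address encoding follows alloy's serde implementation):

```rust
use alloy::primitives::Address;
use serde::Deserialize;

// Local copy of MockUser, only to illustrate the expected JSON shape.
#[derive(Debug, Deserialize)]
struct MockUser {
    address: Address,
    tx_count: u64,
}

fn main() -> Result<(), serde_json::Error> {
    // Hypothetical content of the file passed with --mock-user.
    let json = r#"[
        { "address": "0xb20a608c624ca5003905aa834de7156c68b2e1d0", "tx_count": 3 },
        { "address": "0xd8da6bf26964af9d7eed9e03e53415d37aa96045", "tx_count": 0 }
    ]"#;
    let users: Vec<MockUser> = serde_json::from_str(json)?;
    assert_eq!(users.len(), 2);
    assert_eq!(users[1].tx_count, 0);
    Ok(())
}
```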
@@ -146,11 +146,13 @@ mod tests {
    use ark_serialize::CanonicalDeserialize;
    use claims::assert_matches;
    use futures::TryFutureExt;
    use rln::circuit::{Curve, zkey_from_folder};
    use tokio::sync::broadcast;
    use tracing::info;
    // third-party: zerokit
    use rln::protocol::{compute_id_secret, deserialize_proof_values, verify_proof};
    use rln::{
        circuit::{Curve, zkey_from_folder},
        protocol::{compute_id_secret, deserialize_proof_values, verify_proof},
    };
    // internal
    use crate::user_db_service::UserDbService;
    use rln_proof::RlnIdentifier;
@@ -254,8 +256,12 @@ mod tests {
        let epoch_store = Arc::new(RwLock::new((epoch, epoch_slice)));

        // User db
        let user_db_service =
            UserDbService::new(Default::default(), epoch_store.clone(), 10.into());
        let user_db_service = UserDbService::new(
            Default::default(),
            epoch_store.clone(),
            10.into(),
            Default::default(),
        );
        let user_db = user_db_service.get_user_db();
        user_db.on_new_user(ADDR_1).unwrap();
        user_db.on_new_user(ADDR_2).unwrap();
@@ -302,8 +308,12 @@ mod tests {
        let epoch_store = Arc::new(RwLock::new((epoch, epoch_slice)));

        // User db
        let user_db_service =
            UserDbService::new(Default::default(), epoch_store.clone(), 10.into());
        let user_db_service = UserDbService::new(
            Default::default(),
            epoch_store.clone(),
            10.into(),
            Default::default(),
        );
        let user_db = user_db_service.get_user_db();
        user_db.on_new_user(ADDR_1).unwrap();
        // user_db.on_new_user(ADDR_2).unwrap();
@@ -453,8 +463,12 @@ mod tests {
        let rate_limit = RateLimit::from(1);

        // User db
        let user_db_service =
            UserDbService::new(Default::default(), epoch_store.clone(), rate_limit);
        let user_db_service = UserDbService::new(
            Default::default(),
            epoch_store.clone(),
            rate_limit,
            Default::default(),
        );
        let user_db = user_db_service.get_user_db();
        user_db.on_new_user(ADDR_1).unwrap();
        let user_addr_1 = user_db.get_user(&ADDR_1).unwrap();
@@ -514,8 +528,12 @@ mod tests {
        let rate_limit = RateLimit::from(1);

        // User db - limit is 1 message per epoch
        let user_db_service =
            UserDbService::new(Default::default(), epoch_store.clone(), rate_limit.into());
        let user_db_service = UserDbService::new(
            Default::default(),
            epoch_store.clone(),
            rate_limit.into(),
            Default::default(),
        );
        let user_db = user_db_service.get_user_db();
        user_db.on_new_user(ADDR_1).unwrap();
        let user_addr_1 = user_db.get_user(&ADDR_1).unwrap();
@@ -10,9 +10,8 @@ use tonic::codegen::tokio_stream::StreamExt;
use tracing::{debug, error, info};
// internal
use crate::error::{AppError, HandleTransferError, RegisterError};
use crate::karma_sc::{KarmaAmountExt, KarmaSC};
use crate::sc::AlloyWsProvider;
use crate::user_db_service::UserDb;
use smart_contract::{AlloyWsProvider, KarmaAmountExt, KarmaSC};

pub(crate) struct RegistryListener {
    rpc_url: String,
@@ -125,18 +124,22 @@ impl RegistryListener {

#[cfg(test)]
mod tests {
    use crate::epoch_service::{Epoch, EpochSlice};
    use alloy::primitives::address;
    use parking_lot::RwLock;
    use std::sync::Arc;
    // use crate::tier::TIER_LIMITS;
    use super::*;
    // std
    use std::sync::Arc;
    // third-party
    use alloy::primitives::address;
    use async_trait::async_trait;
    use parking_lot::RwLock;
    // internal
    use crate::epoch_service::{Epoch, EpochSlice};
    use crate::user_db_service::UserDbService;

    // const ADDR_1: Address = address!("0xd8da6bf26964af9d7eed9e03e53415d37aa96045");
    const ADDR_2: Address = address!("0xb20a608c624Ca5003905aA834De7156C68b2E1d0");
    struct MockKarmaSc {}

    #[async_trait]
    impl KarmaAmountExt for MockKarmaSc {
        type Error = AlloyContractError;
        async fn karma_amount(&self, _address: &Address) -> Result<U256, Self::Error> {
@@ -149,7 +152,12 @@ mod tests {
        let epoch = Epoch::from(11);
        let epoch_slice = EpochSlice::from(42);
        let epoch_store = Arc::new(RwLock::new((epoch, epoch_slice)));
        let user_db_service = UserDbService::new(Default::default(), epoch_store, 10.into());
        let user_db_service = UserDbService::new(
            Default::default(),
            epoch_store,
            10.into(),
            Default::default(),
        );
        let user_db = user_db_service.get_user_db();

        assert!(user_db_service.get_user_db().get_user(&ADDR_2).is_none());
@@ -1,29 +0,0 @@
// third-party
use alloy::{
    primitives::Address,
    providers::{ProviderBuilder, WsConnect},
    sol,
    transports::{RpcError, TransportError},
};
use url::Url;
// internal
use crate::sc::AlloyWsProvider;

sol! {
    // https://github.com/vacp2p/staking-reward-streamer/pull/220
    #[sol(rpc)]
    contract KarmaRLNSC {
        function register(uint256 identityCommitment) external onlyRole(REGISTER_ROLE);
    }
}

impl KarmaRLNSC::KarmaRLNSCInstance<AlloyWsProvider> {
    pub(crate) async fn try_new(
        rpc_url: Url,
        address: Address,
    ) -> Result<Self, RpcError<TransportError>> {
        let ws = WsConnect::new(rpc_url.as_str());
        let provider = ProviderBuilder::new().connect_ws(ws).await?;
        Ok(KarmaRLNSC::new(address, provider))
    }
}
@@ -1,12 +0,0 @@
use alloy::providers::{
    Identity, RootProvider,
    fillers::{BlobGasFiller, ChainIdFiller, FillProvider, GasFiller, JoinFill, NonceFiller},
};

pub type AlloyWsProvider = FillProvider<
    JoinFill<
        Identity,
        JoinFill<GasFiller, JoinFill<BlobGasFiller, JoinFill<NonceFiller, ChainIdFiller>>>,
    >,
    RootProvider,
>;
@@ -1,21 +1,11 @@
use std::collections::BTreeMap;
use std::sync::LazyLock;
// third-party
use alloy::primitives::U256;
use std::collections::{BTreeMap, HashSet};
use std::ops::{Deref, DerefMut};
// third-party
use derive_more::{From, Into};

#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, From)]
pub struct KarmaAmount(U256);

impl KarmaAmount {
    pub(crate) const ZERO: KarmaAmount = KarmaAmount(U256::ZERO);
}

impl From<u64> for KarmaAmount {
    fn from(value: u64) -> Self {
        Self(U256::from(value))
    }
}
// internal
use crate::user_db_service::SetTierLimitsError;
use smart_contract::{Tier, TierIndex};

#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, From, Into)]
pub struct TierLimit(u64);
@@ -29,32 +19,98 @@ impl From<&str> for TierName {
    }
}

pub static TIER_LIMITS: LazyLock<BTreeMap<KarmaAmount, (TierLimit, TierName)>> =
    LazyLock::new(|| {
        BTreeMap::from([
            (
                KarmaAmount::from(10),
                (TierLimit(6), TierName::from("Basic")),
            ),
            (
                KarmaAmount::from(50),
                (TierLimit(120), TierName::from("Active")),
            ),
            (
                KarmaAmount::from(100),
                (TierLimit(720), TierName::from("Regular")),
            ),
            (
                KarmaAmount::from(500),
                (TierLimit(14440), TierName::from("Regular")),
            ),
            (
                KarmaAmount::from(1000),
                (TierLimit(86400), TierName::from("Power User")),
            ),
            (
                KarmaAmount::from(5000),
                (TierLimit(432000), TierName::from("S-Tier")),
            ),
        ])
    });
#[derive(Debug, Clone, Default, From, Into, PartialEq)]
pub struct TierLimits(BTreeMap<TierIndex, Tier>);

impl Deref for TierLimits {
    type Target = BTreeMap<TierIndex, Tier>;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl DerefMut for TierLimits {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

impl TierLimits {
    /// Filter inactive Tier (rejected by validate)
    pub(crate) fn filter_inactive(&mut self) -> Self {
        let map = std::mem::take(&mut self.0);
        let map_filtered = map.into_iter().filter(|(_k, v)| v.active).collect();
        Self(map_filtered)
    }

    pub(crate) fn validate(&self) -> Result<(), SetTierLimitsError> {
        #[derive(Default)]
        struct Context<'a> {
            tier_names: HashSet<String>,
            prev_amount: Option<&'a U256>,
            prev_tx_per_epoch: Option<&'a u32>,
            prev_index: Option<&'a TierIndex>,
        }

        let _context =
            self.0
                .iter()
                .try_fold(Context::default(), |mut state, (tier_index, tier)| {
                    if !tier.active {
                        return Err(SetTierLimitsError::InactiveTier);
                    }

                    if *tier_index <= *state.prev_index.unwrap_or(&TierIndex::default()) {
                        return Err(SetTierLimitsError::InvalidTierIndex);
                    }

                    if tier.min_karma >= tier.max_karma {
                        return Err(SetTierLimitsError::InvalidMaxAmount(
                            tier.min_karma,
                            tier.max_karma,
                        ));
                    }

                    if tier.min_karma <= *state.prev_amount.unwrap_or(&U256::ZERO) {
                        return Err(SetTierLimitsError::InvalidKarmaAmount);
                    }

                    if tier.tx_per_epoch <= *state.prev_tx_per_epoch.unwrap_or(&0) {
                        return Err(SetTierLimitsError::InvalidTierLimit);
                    }

                    if state.tier_names.contains(&tier.name) {
                        return Err(SetTierLimitsError::NonUniqueTierName);
                    }

                    state.prev_amount = Some(&tier.min_karma);
                    state.prev_tx_per_epoch = Some(&tier.tx_per_epoch);
                    state.tier_names.insert(tier.name.clone());
                    state.prev_index = Some(tier_index);
                    Ok(state)
                })?;

        Ok(())
    }

    /// Given some karma amount, find the matching Tier
    pub(crate) fn get_tier_by_karma(&self, karma_amount: &U256) -> Option<(TierIndex, Tier)> {
        #[derive(Default)]
        struct Context<'a> {
            prev: Option<(&'a TierIndex, &'a Tier)>,
        }

        let ctx = self
            .0
            .iter()
            .try_fold(Context::default(), |mut state, (tier_index, tier)| {
                if karma_amount < &tier.min_karma {
                    return None;
                }
                state.prev = Some((tier_index, tier));
                Some(state)
            })?;

        ctx.prev.map(|p| (*p.0, p.1.clone()))
    }
}
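`validate()` rejects a tier table unless every tier is active, `min_karma` values are strictly increasing, each tier has `min_karma < max_karma`, `tx_per_epoch` is strictly increasing, and names are unique (it also checks that `TierIndex` keys are strictly increasing). A minimal, self-contained model of those ordering/uniqueness rules, with plain `u64`/`String` stand-ins for the crate's `U256`, `Tier`, and `SetTierLimitsError` types and the index check left out:

```rust
use std::collections::{BTreeMap, HashSet};

// Simplified stand-in for smart_contract::Tier, for illustration only.
#[derive(Debug, Clone)]
struct TierModel {
    name: String,
    min_karma: u64,
    max_karma: u64,
    tx_per_epoch: u32,
    active: bool,
}

// Mirrors the checks performed by TierLimits::validate() (TierIndex check omitted).
fn validate(tiers: &BTreeMap<u32, TierModel>) -> Result<(), String> {
    let mut names = HashSet::new();
    let mut prev_min_karma = 0u64;
    let mut prev_tx_per_epoch = 0u32;
    for (index, tier) in tiers {
        if !tier.active {
            return Err(format!("tier {index}: inactive"));
        }
        if tier.min_karma >= tier.max_karma {
            return Err(format!("tier {index}: min_karma must be < max_karma"));
        }
        if tier.min_karma <= prev_min_karma {
            return Err(format!("tier {index}: min_karma must be strictly increasing"));
        }
        if tier.tx_per_epoch <= prev_tx_per_epoch {
            return Err(format!("tier {index}: tx_per_epoch must be strictly increasing"));
        }
        if !names.insert(tier.name.clone()) {
            return Err(format!("tier {index}: duplicate name '{}'", tier.name));
        }
        prev_min_karma = tier.min_karma;
        prev_tx_per_epoch = tier.tx_per_epoch;
    }
    Ok(())
}

fn main() {
    let tiers = BTreeMap::from([
        (0u32, TierModel { name: "Basic".into(), min_karma: 10, max_karma: 49, tx_per_epoch: 5, active: true }),
        (1u32, TierModel { name: "Active".into(), min_karma: 50, max_karma: 99, tx_per_epoch: 10, active: true }),
    ]);
    assert!(validate(&tiers).is_ok());
}
```

The invalid-configuration tests added further down in `user_db_service.rs` exercise exactly these rejection paths against the real types.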
prover/src/tiers_listener.rs (new file, 84 lines)
@@ -0,0 +1,84 @@
// third-party
use alloy::{
    primitives::Address,
    providers::{Provider, ProviderBuilder, WsConnect},
    sol_types::SolEvent,
    transports::{RpcError, TransportError},
};
use futures::StreamExt;
use tracing::error;
// internal
use crate::error::AppError;
use crate::user_db_service::UserDb;
use smart_contract::{AlloyWsProvider, KarmaTiersSC, Tier, TierIndex};

pub(crate) struct TiersListener {
    rpc_url: String,
    sc_address: Address,
    user_db: UserDb,
}

impl TiersListener {
    pub(crate) fn new(rpc_url: &str, sc_address: Address, user_db: UserDb) -> Self {
        Self {
            rpc_url: rpc_url.to_string(),
            sc_address,
            user_db,
        }
    }

    /// Create a provider (aka connect to websocket url)
    async fn setup_provider_ws(&self) -> Result<AlloyWsProvider, RpcError<TransportError>> {
        let ws = WsConnect::new(self.rpc_url.as_str());
        let provider = ProviderBuilder::new().connect_ws(ws).await?;
        Ok(provider)
    }

    /// Listen to Smart Contract specified events
    pub(crate) async fn listen(&self) -> Result<(), AppError> {
        let provider = self.setup_provider_ws().await.map_err(AppError::from)?;

        let filter = alloy::rpc::types::Filter::new()
            .address(self.sc_address)
            .event(KarmaTiersSC::TierAdded::SIGNATURE)
            .event(KarmaTiersSC::TierUpdated::SIGNATURE);

        // Subscribe to logs matching the filter.
        let subscription = provider.subscribe_logs(&filter).await?;
        let mut stream = subscription.into_stream();

        // Loop through the incoming event logs
        while let Some(log) = stream.next().await {
            if let Ok(tier_added) = KarmaTiersSC::TierAdded::decode_log_data(log.data()) {
                let tier_id: TierIndex = tier_added.tierId.into();
                if let Err(e) = self.user_db.on_new_tier(tier_id, Tier::from(tier_added)) {
                    // If there is an error here, we assume this is an error by the user
                    // updating the Tier limits (and thus we don't want to shut down the prover)
                    error!("Error while adding tier (index: {:?}): {}", tier_id, e);
                }
            } else {
                match KarmaTiersSC::TierUpdated::decode_log_data(log.data()) {
                    Ok(tier_updated) => {
                        let tier_id: TierIndex = tier_updated.tierId.into();
                        if let Err(e) = self
                            .user_db
                            .on_tier_updated(tier_updated.tierId.into(), Tier::from(tier_updated))
                        {
                            // If there is an error here, we assume this is an error by the user
                            // updating the Tier limits (and thus we don't want to shut down the prover)
                            error!("Error while updating tier (index: {:?}): {}", tier_id, e);
                        };
                    }
                    Err(e) => {
                        eprintln!("Error decoding log data: {:?}", e);
                        // It's also useful to print the raw log data for debugging
                        eprintln!("Raw log topics: {:?}", log.topics());
                        eprintln!("Raw log data: {:?}", log.data());
                    }
                }
            }
        }

        Ok(())
    }
}
@@ -1,5 +1,3 @@
use std::collections::{BTreeMap, HashSet};
use std::ops::Bound::Included;
use std::ops::Deref;
use std::sync::Arc;
// third-party
@@ -16,9 +14,9 @@ use tracing::debug;
// internal
use crate::epoch_service::{Epoch, EpochSlice};
use crate::error::{AppError, GetMerkleTreeProofError, RegisterError};
use crate::karma_sc::KarmaAmountExt;
use crate::tier::{KarmaAmount, TIER_LIMITS, TierLimit, TierName};
use crate::tier::{TierLimit, TierLimits, TierName};
use rln_proof::{RlnUserIdentity, ZerokitMerkleTree};
use smart_contract::{KarmaAmountExt, Tier, TierIndex};

const MERKLE_TREE_HEIGHT: usize = 20;

@@ -45,7 +43,7 @@ impl From<RateLimit> for Fr {
#[derive(Clone)]
pub(crate) struct UserRegistry {
    inner: HashMap<Address, (RlnUserIdentity, MerkleTreeIndex)>,
    tree: Arc<RwLock<PoseidonTree>>,
    merkle_tree: Arc<RwLock<PoseidonTree>>,
    rate_limit: RateLimit,
}

@@ -60,8 +58,9 @@ impl Default for UserRegistry {
        Self {
            inner: Default::default(),
            // unwrap safe - no config
            tree: Arc::new(RwLock::new(
                PoseidonTree::new(MERKLE_TREE_HEIGHT, Default::default(), Default::default()).unwrap(),
            merkle_tree: Arc::new(RwLock::new(
                PoseidonTree::new(MERKLE_TREE_HEIGHT, Default::default(), Default::default())
                    .unwrap(),
            )),
            rate_limit: Default::default(),
        }
@@ -73,8 +72,9 @@ impl From<RateLimit> for UserRegistry {
        Self {
            inner: Default::default(),
            // unwrap safe - no config
            tree: Arc::new(RwLock::new(
                PoseidonTree::new(MERKLE_TREE_HEIGHT, Default::default(), Default::default()).unwrap(),
            merkle_tree: Arc::new(RwLock::new(
                PoseidonTree::new(MERKLE_TREE_HEIGHT, Default::default(), Default::default())
                    .unwrap(),
            )),
            rate_limit,
        }
@@ -101,7 +101,7 @@ impl UserRegistry {
            .map_err(|_e| RegisterError::AlreadyRegistered(address));

        let rate_commit = poseidon_hash(&[id_commitment, Fr::from(u64::from(self.rate_limit))]);
        self.tree
        self.merkle_tree
            .write()
            .set(index, rate_commit)
            .map_err(|e| RegisterError::TreeError(e.to_string()))?;
@@ -122,7 +122,7 @@ impl UserRegistry {
            .get(address)
            .map(|entry| entry.1)
            .ok_or(GetMerkleTreeProofError::NotRegistered)?;
        self.tree
        self.merkle_tree
            .read()
            .proof(index.into())
            .map_err(|e| GetMerkleTreeProofError::TreeError(e.to_string()))
@@ -190,8 +190,8 @@ pub enum UserTierInfoError<E: std::error::Error> {
pub struct UserDb {
    user_registry: Arc<UserRegistry>,
    tx_registry: Arc<TxRegistry>,
    tier_limits: Arc<RwLock<BTreeMap<KarmaAmount, (TierLimit, TierName)>>>,
    tier_limits_next: Arc<RwLock<BTreeMap<KarmaAmount, (TierLimit, TierName)>>>,
    tier_limits: Arc<RwLock<TierLimits>>,
    tier_limits_next: Arc<RwLock<TierLimits>>,
    epoch_store: Arc<RwLock<(Epoch, EpochSlice)>>,
}

@@ -209,7 +209,7 @@ impl UserDb {
        let tier_limits_next_has_updates = !self.tier_limits_next.read().is_empty();
        if tier_limits_next_has_updates {
            let mut guard = self.tier_limits_next.write();
            // mem::take will clear the BTreeMap in tier_limits_next
            // mem::take will clear the TierLimits in tier_limits_next
            let new_tier_limits = std::mem::take(&mut *guard);
            debug!("Installing new tier limits: {:?}", new_tier_limits);
            *self.tier_limits.write() = new_tier_limits;
@@ -231,9 +231,13 @@ impl UserDb {
        self.user_registry.get_merkle_proof(address)
    }

    pub(crate) fn on_new_tx(&self, address: &Address) -> Option<EpochSliceCounter> {
    pub(crate) fn on_new_tx(
        &self,
        address: &Address,
        incr_value: Option<u64>,
    ) -> Option<EpochSliceCounter> {
        if self.user_registry.has_user(address) {
            Some(self.tx_registry.incr_counter(address, None))
            Some(self.tx_registry.incr_counter(address, incr_value))
        } else {
            None
        }
@@ -241,39 +245,39 @@ impl UserDb {

    pub(crate) fn on_new_tier_limits(
        &self,
        tier_limits: BTreeMap<KarmaAmount, (TierLimit, TierName)>,
        tier_limits: TierLimits,
    ) -> Result<(), SetTierLimitsError> {
        #[derive(Default)]
        struct Context<'a> {
            tier_names: HashSet<TierName>,
            prev_karma_amount: Option<&'a KarmaAmount>,
            prev_tier_limit: Option<&'a TierLimit>,
            i: usize,
        let tier_limits = tier_limits.clone().filter_inactive();
        tier_limits.validate()?;
        *self.tier_limits_next.write() = tier_limits;
        Ok(())
    }

    pub(crate) fn on_new_tier(
        &self,
        tier_index: TierIndex,
        tier: Tier,
    ) -> Result<(), SetTierLimitsError> {
        let mut tier_limits = self.tier_limits.read().clone();
        tier_limits.insert(tier_index, tier);
        tier_limits.validate()?;
        // Write
        *self.tier_limits_next.write() = tier_limits;
        Ok(())
    }

    pub(crate) fn on_tier_updated(
        &self,
        tier_index: TierIndex,
        tier: Tier,
    ) -> Result<(), SetTierLimitsError> {
        let mut tier_limits = self.tier_limits.read().clone();
        if !tier_limits.contains_key(&tier_index) {
            return Err(SetTierLimitsError::InvalidTierIndex);
        }

        let _context = tier_limits.iter().try_fold(
            Context::default(),
            |mut state, (karma_amount, (tier_limit, tier_name))| {
                if karma_amount <= state.prev_karma_amount.unwrap_or(&KarmaAmount::ZERO) {
                    return Err(SetTierLimitsError::InvalidKarmaAmount);
                }

                if tier_limit <= state.prev_tier_limit.unwrap_or(&TierLimit::from(0)) {
                    return Err(SetTierLimitsError::InvalidTierLimit);
                }

                if state.tier_names.contains(tier_name) {
                    return Err(SetTierLimitsError::NonUniqueTierName);
                }

                state.prev_karma_amount = Some(karma_amount);
                state.prev_tier_limit = Some(tier_limit);
                state.tier_names.insert(tier_name.clone());
                state.i += 1;
                Ok(state)
            },
        )?;

        tier_limits.entry(tier_index).and_modify(|e| *e = tier);
        tier_limits.validate()?;
        // Write
        *self.tier_limits_next.write() = tier_limits;
        Ok(())
    }
@@ -295,14 +299,9 @@ impl UserDb {
            .karma_amount(address)
            .await
            .map_err(|e| UserTierInfoError::Contract(e))?;
        let guard = self.tier_limits.read();
        let range_res = guard.range((
            Included(&KarmaAmount::ZERO),
            Included(&KarmaAmount::from(karma_amount)),
        ));
        let tier_info: Option<(TierLimit, TierName)> =
            range_res.into_iter().last().map(|o| o.1.clone());
        drop(guard);
        let tier_limits_guard = self.tier_limits.read();
        let tier_info = tier_limits_guard.get_tier_by_karma(&karma_amount);
        drop(tier_limits_guard);

        let user_tier_info = {
            let (current_epoch, current_epoch_slice) = *self.epoch_store.read();
@@ -315,9 +314,10 @@ impl UserDb {
                tier_name: None,
                tier_limit: None,
            };
            if let Some((tier_limit, tier_name)) = tier_info {
                t.tier_name = Some(tier_name);
                t.tier_limit = Some(tier_limit);
            if let Some((_tier_index, tier)) = tier_info {
                t.tier_name = Some(tier.name.into());
                // TODO
                t.tier_limit = Some(0.into());
            }
            t
        };
@@ -333,10 +333,16 @@ impl UserDb {
pub enum SetTierLimitsError {
    #[error("Invalid Karma amount (must be increasing)")]
    InvalidKarmaAmount,
    #[error("Invalid Karma max amount (min: {0} vs max: {1})")]
    InvalidMaxAmount(U256, U256),
    #[error("Invalid Tier limit (must be increasing)")]
    InvalidTierLimit,
    #[error("Invalid Tier index (must be increasing)")]
    InvalidTierIndex,
    #[error("Non unique Tier name")]
    NonUniqueTierName,
    #[error("Non active Tier")]
    InactiveTier,
}

/// Async service to update a UserDb on epoch changes
@@ -351,12 +357,13 @@ impl UserDbService {
        epoch_changes_notifier: Arc<Notify>,
        epoch_store: Arc<RwLock<(Epoch, EpochSlice)>>,
        rate_limit: RateLimit,
        tier_limits: TierLimits,
    ) -> Self {
        Self {
            user_db: UserDb {
                user_registry: Arc::new(UserRegistry::from(rate_limit)),
                tx_registry: Default::default(),
                tier_limits: Arc::new(RwLock::new(TIER_LIMITS.clone())),
                tier_limits: Arc::new(RwLock::new(tier_limits)),
                tier_limits_next: Arc::new(Default::default()),
                epoch_store,
            },
@@ -410,6 +417,7 @@ impl UserDbService {
mod tests {
    use super::*;
    use alloy::primitives::address;
    use async_trait::async_trait;
    use claims::{assert_err, assert_matches};
    use derive_more::Display;

@@ -418,6 +426,7 @@ mod tests {

    struct MockKarmaSc {}

    #[async_trait]
    impl KarmaAmountExt for MockKarmaSc {
        type Error = DummyError;

@@ -431,6 +440,7 @@ mod tests {

    struct MockKarmaSc2 {}

    #[async_trait]
    impl KarmaAmountExt for MockKarmaSc2 {
        type Error = DummyError;

@@ -450,7 +460,7 @@ mod tests {
        let user_db = UserDb {
            user_registry: Default::default(),
            tx_registry: Default::default(),
            tier_limits: Arc::new(RwLock::new(TIER_LIMITS.clone())),
            tier_limits: Arc::new(RwLock::new(Default::default())),
            tier_limits_next: Arc::new(Default::default()),
            epoch_store: Arc::new(RwLock::new(Default::default())),
        };
@@ -467,14 +477,14 @@ mod tests {
        let user_db = UserDb {
            user_registry: Default::default(),
            tx_registry: Default::default(),
            tier_limits: Arc::new(RwLock::new(TIER_LIMITS.clone())),
            tier_limits: Arc::new(RwLock::new(Default::default())),
            tier_limits_next: Arc::new(Default::default()),
            epoch_store: Arc::new(RwLock::new(Default::default())),
        };
        let addr = Address::new([0; 20]);

        // Try to update tx counter without registering first
        assert_eq!(user_db.on_new_tx(&addr), None);
        assert_eq!(user_db.on_new_tx(&addr, None), None);
        let tier_info = user_db.user_tier_info(&addr, &MockKarmaSc {}).await;
        // User is not registered -> no tier info
        assert!(matches!(
@@ -484,7 +494,7 @@ mod tests {
        // Register user
        user_db.user_registry.register(addr).unwrap();
        // Now update user tx counter
        assert_eq!(user_db.on_new_tx(&addr), Some(EpochSliceCounter(1)));
        assert_eq!(user_db.on_new_tx(&addr, None), Some(EpochSliceCounter(1)));
        let tier_info = user_db
            .user_tier_info(&addr, &MockKarmaSc {})
            .await
@@ -498,7 +508,66 @@ mod tests {
        let mut epoch = Epoch::from(11);
        let mut epoch_slice = EpochSlice::from(42);
        let epoch_store = Arc::new(RwLock::new((epoch, epoch_slice)));
        let user_db_service = UserDbService::new(Default::default(), epoch_store, 10.into());

        let tier_limits = BTreeMap::from([
            (
                TierIndex::from(0),
                Tier {
                    name: "Basic".into(),
                    min_karma: U256::from(10),
                    max_karma: U256::from(49),
                    tx_per_epoch: 5,
                    active: true,
                },
            ),
            (
                TierIndex::from(1),
                Tier {
                    name: "Active".into(),
                    min_karma: U256::from(50),
                    max_karma: U256::from(99),
                    tx_per_epoch: 10,
                    active: true,
                },
            ),
            (
                TierIndex::from(2),
                Tier {
                    name: "Regular".into(),
                    min_karma: U256::from(100),
                    max_karma: U256::from(499),
                    tx_per_epoch: 15,
                    active: true,
                },
            ),
            (
                TierIndex::from(3),
                Tier {
                    name: "Power User".into(),
                    min_karma: U256::from(500),
                    max_karma: U256::from(4999),
                    tx_per_epoch: 20,
                    active: true,
                },
            ),
            (
                TierIndex::from(4),
                Tier {
                    name: "S-Tier".into(),
                    min_karma: U256::from(5000),
                    max_karma: U256::from(U256::MAX),
                    tx_per_epoch: 25,
                    active: true,
                },
            ),
        ]);

        let user_db_service = UserDbService::new(
            Default::default(),
            epoch_store,
            10.into(),
            tier_limits.into(),
        );
        let user_db = user_db_service.get_user_db();

        let addr_1_tx_count = 2;
@@ -581,24 +650,39 @@ mod tests {
        let mut epoch = Epoch::from(11);
        let mut epoch_slice = EpochSlice::from(42);
        let epoch_store = Arc::new(RwLock::new((epoch, epoch_slice)));
        let user_db_service = UserDbService::new(Default::default(), epoch_store, 10.into());

        let user_db_service = UserDbService::new(
            Default::default(),
            epoch_store,
            10.into(),
            Default::default(),
        );
        let user_db = user_db_service.get_user_db();
        let tier_limits_original = user_db.tier_limits.read().clone();

        let tier_limits = BTreeMap::from([
            (
                KarmaAmount::from(100),
                (TierLimit::from(100), TierName::from("Basic")),
                TierIndex::from(0),
                Tier {
                    name: "Basic".into(),
                    min_karma: U256::from(10),
                    max_karma: U256::from(49),
                    tx_per_epoch: 5,
                    active: true,
                },
            ),
            (
                KarmaAmount::from(200),
                (TierLimit::from(200), TierName::from("Power User")),
            ),
            (
                KarmaAmount::from(300),
                (TierLimit::from(300), TierName::from("Elite User")),
                TierIndex::from(1),
                Tier {
                    name: "Power User".into(),
                    min_karma: U256::from(50),
                    max_karma: U256::from(299),
                    tx_per_epoch: 20,
                    active: true,
                },
            ),
        ]);
        let tier_limits = TierLimits::from(tier_limits);

        user_db.on_new_tier_limits(tier_limits.clone()).unwrap();
        // Check it is not yet installed
@@ -618,7 +702,7 @@ mod tests {
        // Should be installed now
        assert_eq!(*user_db.tier_limits.read(), tier_limits);
        // And the tier_limits_next field is expected to be empty
        assert_eq!(*user_db.tier_limits_next.read(), BTreeMap::new());
        assert!(user_db.tier_limits_next.read().is_empty());
    }

    #[test]
@@ -629,46 +713,200 @@ mod tests {
        let epoch = Epoch::from(11);
        let epoch_slice = EpochSlice::from(42);
        let epoch_store = Arc::new(RwLock::new((epoch, epoch_slice)));
        let user_db_service = UserDbService::new(Default::default(), epoch_store, 10.into());
        let user_db_service = UserDbService::new(
            Default::default(),
            epoch_store,
            10.into(),
            Default::default(),
        );
        let user_db = user_db_service.get_user_db();

        let tier_limits_original = user_db.tier_limits.read().clone();

        // Invalid: non unique index
        {
            let tier_limits = BTreeMap::from([
                (
                    KarmaAmount::from(100),
                    (TierLimit::from(100), TierName::from("Basic")),
                    TierIndex::from(0),
                    Tier {
                        min_karma: Default::default(),
                        max_karma: Default::default(),
                        name: "Basic".to_string(),
                        tx_per_epoch: 100,
                        active: true,
                    },
                ),
                (
                    KarmaAmount::from(200),
                    (TierLimit::from(200), TierName::from("Power User")),
                ),
                (
                    KarmaAmount::from(199),
                    (TierLimit::from(300), TierName::from("Elite User")),
                    TierIndex::from(0),
                    Tier {
                        min_karma: Default::default(),
                        max_karma: Default::default(),
                        name: "Power User".to_string(),
                        tx_per_epoch: 200,
                        active: true,
                    },
                ),
            ]);
            let tier_limits = TierLimits::from(tier_limits);

            assert_err!(user_db.on_new_tier_limits(tier_limits.clone()));
            assert_eq!(*user_db.tier_limits.read(), tier_limits_original);
        }
        // Invalid: min Karma amount not increasing
        {
            let tier_limits = BTreeMap::from([
                (
                    KarmaAmount::from(100),
                    (TierLimit::from(100), TierName::from("Basic")),
                    TierIndex::from(0),
                    Tier {
                        min_karma: U256::from(10),
                        max_karma: U256::from(49),
                        name: "Basic".to_string(),
                        tx_per_epoch: 5,
                        active: true,
                    },
                ),
                (
                    KarmaAmount::from(200),
                    (TierLimit::from(200), TierName::from("Power User")),
                    TierIndex::from(1),
                    Tier {
                        min_karma: U256::from(50),
                        max_karma: U256::from(99),
                        name: "Power User".to_string(),
                        tx_per_epoch: 10,
                        active: true,
                    },
                ),
                (
                    KarmaAmount::from(300),
                    (TierLimit::from(300), TierName::from("Basic")),
                    TierIndex::from(2),
                    Tier {
                        min_karma: U256::from(60),
                        max_karma: U256::from(99),
                        name: "Power User".to_string(),
                        tx_per_epoch: 15,
                        active: true,
                    },
                ),
            ]);
            let tier_limits = TierLimits::from(tier_limits);

            assert_err!(user_db.on_new_tier_limits(tier_limits.clone()));
            assert_eq!(*user_db.tier_limits.read(), tier_limits_original);
        }

        // Invalid: Non unique tier name
        {
            let tier_limits = BTreeMap::from([
                (
                    TierIndex::from(0),
                    Tier {
                        min_karma: U256::from(10),
                        max_karma: U256::from(49),
                        name: "Basic".to_string(),
                        tx_per_epoch: 5,
                        active: true,
                    },
                ),
                (
                    TierIndex::from(1),
                    Tier {
                        min_karma: U256::from(50),
                        max_karma: U256::from(99),
                        name: "Power User".to_string(),
                        tx_per_epoch: 10,
                        active: true,
                    },
                ),
                (
                    TierIndex::from(2),
                    Tier {
                        min_karma: U256::from(100),
                        max_karma: U256::from(999),
                        name: "Power User".to_string(),
                        tx_per_epoch: 15,
                        active: true,
                    },
                ),
            ]);
            let tier_limits = TierLimits::from(tier_limits);

            assert_err!(user_db.on_new_tier_limits(tier_limits.clone()));
            assert_eq!(*user_db.tier_limits.read(), tier_limits_original);
        }
    }
    #[test]
    #[tracing_test::traced_test]
    fn test_set_invalid_tier_limits_2() {
        // Check we cannot update with invalid tier limits

        let epoch = Epoch::from(11);
        let epoch_slice = EpochSlice::from(42);
        let epoch_store = Arc::new(RwLock::new((epoch, epoch_slice)));
        let user_db_service = UserDbService::new(
            Default::default(),
            epoch_store,
            10.into(),
            Default::default(),
        );
        let user_db = user_db_service.get_user_db();

        let tier_limits_original = user_db.tier_limits.read().clone();

        // Invalid: inactive tier
        {
            let tier_limits = BTreeMap::from([
                (
                    TierIndex::from(0),
                    Tier {
                        min_karma: U256::from(10),
                        max_karma: U256::from(49),
                        name: "Basic".to_string(),
                        tx_per_epoch: 5,
                        active: true,
                    },
                ),
                (
                    TierIndex::from(1),
                    Tier {
                        min_karma: U256::from(50),
                        max_karma: U256::from(99),
                        name: "Power User".to_string(),
                        tx_per_epoch: 10,
                        active: true,
                    },
                ),
            ]);
            let tier_limits = TierLimits::from(tier_limits);

            assert_err!(user_db.on_new_tier_limits(tier_limits.clone()));
            assert_eq!(*user_db.tier_limits.read(), tier_limits_original);
        }

        // Invalid: non-increasing tx_per_epoch
        {
            let tier_limits = BTreeMap::from([
                (
                    TierIndex::from(0),
                    Tier {
                        min_karma: U256::from(10),
                        max_karma: U256::from(49),
                        name: "Basic".to_string(),
                        tx_per_epoch: 5,
                        active: true,
                    },
                ),
                (
                    TierIndex::from(1),
                    Tier {
                        min_karma: U256::from(50),
                        max_karma: U256::from(99),
                        name: "Power User".to_string(),
                        tx_per_epoch: 5,
                        active: true,
                    },
                ),
            ]);
            let tier_limits = TierLimits::from(tier_limits);

            assert_err!(user_db.on_new_tier_limits(tier_limits.clone()));
            assert_eq!(*user_db.tier_limits.read(), tier_limits_original);