WIP: removing actor...

This commit is contained in:
th4s
2025-12-08 14:26:48 +01:00
parent a0c7d469f6
commit 4727df6fd4
8 changed files with 49 additions and 1949 deletions

20
Cargo.lock generated
View File

@@ -4157,25 +4157,6 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154"
[[package]]
name = "ludi"
version = "0.1.0"
source = "git+https://github.com/sinui0/ludi?rev=e511c3b#e511c3b330dc298613cc3fd168244619e81ac740"
dependencies = [
"futures-util",
"ludi-core",
]
[[package]]
name = "ludi-core"
version = "0.1.0"
source = "git+https://github.com/sinui0/ludi?rev=e511c3b#e511c3b330dc298613cc3fd168244619e81ac740"
dependencies = [
"futures-channel",
"futures-core",
"futures-util",
]
[[package]]
name = "macro-string"
version = "0.1.4"
@@ -7510,7 +7491,6 @@ dependencies = [
"futures",
"generic-array",
"ghash 0.5.1",
"ludi",
"mpz-common",
"mpz-core",
"mpz-fields",

View File

@@ -34,7 +34,6 @@ mpz-share-conversion = { workspace = true }
mpz-vm-core = { workspace = true }
mpz-memory-core = { workspace = true }
ludi = { git = "https://github.com/sinui0/ludi", rev = "e511c3b", default-features = false }
serio = { workspace = true }
async-trait = { workspace = true }

View File

@@ -15,13 +15,6 @@ impl MpcTlsError {
Self(ErrorRepr::Peer(err.into()))
}
pub(crate) fn actor<E>(err: E) -> Self
where
E: Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
{
Self(ErrorRepr::Actor(err.into()))
}
pub(crate) fn state<E>(err: E) -> Self
where
E: Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
@@ -72,8 +65,6 @@ enum ErrorRepr {
Peer(Box<dyn std::error::Error + Send + Sync>),
#[error("I/O error: {0}")]
Io(std::io::Error),
#[error("actor error: {0}")]
Actor(Box<dyn std::error::Error + Send + Sync>),
#[error("state error: {0}")]
State(Box<dyn std::error::Error + Send + Sync>),
#[error("allocation error: {0}")]

View File

@@ -1,5 +1,3 @@
mod actor;
use crate::{
error::MpcTlsError,
msg::{
@@ -14,7 +12,6 @@ use async_trait::async_trait;
use hmac_sha256::{MpcPrf, PrfOutput};
use ke::KeyExchange;
use key_exchange::{self as ke, MpcKeyExchange};
use ludi::Context as LudiContext;
use mpz_common::{Context, Flush};
use mpz_core::{bitvec::BitVec, Block};
use mpz_memory_core::DecodeFutureTyped;
@@ -50,13 +47,9 @@ use tlsn_core::{
};
use tracing::{debug, instrument, trace, warn};
/// Controller for MPC-TLS leader.
pub type LeaderCtrl = actor::MpcTlsLeaderCtrl;
/// MPC-TLS leader.
#[derive(Debug)]
pub struct MpcTlsLeader {
self_handle: Option<LeaderCtrl>,
config: Config,
state: State,
@@ -114,7 +107,6 @@ impl MpcTlsLeader {
let is_decrypting = !config.defer_decryption;
Self {
self_handle: None,
config,
state: State::Init {
ctx,
@@ -401,9 +393,19 @@ impl MpcTlsLeader {
self.is_decrypting
}
/// Stops the actor.
pub fn stop(&mut self, ctx: &mut LudiContext<Self>) {
ctx.stop();
/// Returns the context and transcript.
///
/// Should be called after a successful call to [`Backend::server_closed`].
pub fn finish(&mut self) -> Option<(Context, TlsTranscript)> {
match self.state.take() {
State::Closed {
ctx, transcript, ..
} => Some((ctx, transcript)),
state => {
self.state = state;
None
}
}
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -16,7 +16,7 @@ pub(crate) mod utils;
pub use config::{Config, ConfigBuilder, ConfigBuilderError};
pub use error::MpcTlsError;
pub use follower::MpcTlsFollower;
pub use leader::{LeaderCtrl, MpcTlsLeader};
pub use leader::MpcTlsLeader;
use std::{future::Future, pin::Pin, sync::Arc};

View File

@@ -160,7 +160,6 @@ impl Prover<state::CommitAccepted> {
} = self.state;
let decrypt = mpc_tls.is_decrypting();
let (mpc_ctrl, mpc_fut) = mpc_tls.run();
let ServerName::Dns(server_name) = config.server_name();
let server_name =
@@ -197,24 +196,12 @@ impl Prover<state::CommitAccepted> {
rustls_config.with_no_client_auth()
};
let client = ClientConnection::new(
Arc::new(rustls_config),
Box::new(mpc_ctrl.clone()),
server_name,
)
.map_err(ProverError::config)?;
let client = ClientConnection::new(Arc::new(rustls_config), Box::new(mpc_tls), server_name)
.map_err(ProverError::config)?;
let span = self.span.clone();
let mpc_tls = MpcTlsClient::new(
Box::new(mpc_fut.map_err(ProverError::from)),
keys,
vm,
span,
mpc_ctrl,
client,
decrypt,
);
let mpc_tls = MpcTlsClient::new(keys, vm, span, client, decrypt);
let prover = Prover {
config: self.config,

View File

@@ -9,7 +9,7 @@ use crate::{
tag::verify_tags,
};
use futures::{Future, FutureExt};
use mpc_tls::{LeaderCtrl, SessionKeys};
use mpc_tls::{MpcTlsLeader, SessionKeys};
use mpz_common::Context;
use mpz_vm_core::Execute;
use std::{collections::VecDeque, pin::Pin, sync::Arc, task::Poll};
@@ -40,28 +40,18 @@ pub(crate) enum Command {
enum State {
Start {
mpc: Pin<MpcFuture>,
inner: Box<InnerState>,
},
Active {
mpc: Pin<MpcFuture>,
inner: Box<InnerState>,
},
Busy {
mpc: Pin<MpcFuture>,
fut: Pin<Box<dyn Future<Output = Result<Box<InnerState>, ProverError>> + Send>>,
},
MpcStop {
mpc: Pin<MpcFuture>,
CloseActive {
inner: Box<InnerState>,
},
CloseBusy {
mpc: Pin<MpcFuture>,
fut: Pin<Box<dyn Future<Output = Result<Box<InnerState>, ProverError>> + Send>>,
},
Finishing {
ctx: Context,
transcript: Box<TlsTranscript>,
fut: Pin<Box<dyn Future<Output = Result<Box<InnerState>, ProverError>> + Send>>,
},
Finalizing {
@@ -73,35 +63,31 @@ enum State {
impl MpcTlsClient {
pub(crate) fn new(
mpc: MpcFuture,
keys: SessionKeys,
vm: Arc<Mutex<Deap<ProverMpc, ProverZk>>>,
span: Span,
mpc_ctrl: LeaderCtrl,
tls: ClientConnection,
decrypt: bool,
) -> Self {
let inner = InnerState {
span,
tls,
vm,
keys,
mpc_ctrl,
mpc_stopped: false,
};
let decrypt = tls.backend().is_decrypting();
Self {
decrypt,
state: State::Start {
mpc: Box::into_pin(mpc),
inner: Box::new(inner),
},
decrypt,
cmds: VecDeque::default(),
}
}
fn inner_client_mut(&mut self) -> Option<&mut ClientConnection> {
if let State::Active { inner, .. } | State::MpcStop { inner, .. } = &mut self.state {
if let State::Active { inner, .. } | State::CloseActive { inner, .. } = &mut self.state {
Some(&mut inner.tls)
} else {
None
@@ -109,7 +95,7 @@ impl MpcTlsClient {
}
fn inner_client(&self) -> Option<&ClientConnection> {
if let State::Active { inner, .. } | State::MpcStop { inner, .. } = &self.state {
if let State::Active { inner, .. } | State::CloseActive { inner, .. } = &self.state {
Some(&inner.tls)
} else {
None
@@ -213,15 +199,14 @@ impl TlsClient for MpcTlsClient {
fn poll(&mut self, cx: &mut std::task::Context) -> Poll<Result<TlsOutput, Self::Error>> {
match std::mem::replace(&mut self.state, State::Error) {
State::Start { mpc, inner } => {
State::Start { inner } => {
trace!("inner client is starting");
self.state = State::Busy {
mpc,
fut: Box::pin(inner.start()),
};
self.poll(cx)
}
State::Active { mpc, inner } => {
State::Active { mut inner } => {
trace!("inner client is active");
if !inner.tls.is_handshaking()
@@ -230,104 +215,61 @@ impl TlsClient for MpcTlsClient {
match cmd {
Command::ClientClose => {
self.state = State::Busy {
mpc,
fut: Box::pin(inner.client_close()),
};
}
Command::ServerClose => {
self.state = State::CloseBusy {
mpc,
fut: Box::pin(inner.server_close()),
};
}
Command::Decrypt(enable) => {
inner.tls.backend_mut().enable_decryption(enable)?;
self.decrypt = enable;
self.state = State::Busy {
mpc,
fut: Box::pin(inner.set_decrypt(enable)),
fut: Box::pin(inner.run()),
};
}
}
} else {
self.state = State::Busy {
mpc,
fut: Box::pin(inner.run()),
};
}
self.poll(cx)
}
State::Busy { mut mpc, mut fut } => {
State::Busy { mut fut } => {
trace!("inner client is busy");
let mpc_poll = mpc.as_mut().poll(cx)?;
assert!(
matches!(mpc_poll, Poll::Pending),
"mpc future should not be finished here"
);
match fut.as_mut().poll(cx)? {
Poll::Ready(inner) => {
self.state = State::Active { mpc, inner };
self.state = State::Active { inner };
}
Poll::Pending => self.state = State::Busy { mpc, fut },
Poll::Pending => self.state = State::Busy { fut },
}
Poll::Pending
}
State::MpcStop { mpc, inner } => {
trace!("inner client is stopping mpc");
self.state = State::CloseBusy {
mpc,
fut: Box::pin(inner.stop()),
};
State::CloseActive { mut inner } => {
trace!("inner client is close active");
if let Some((ctx, transcript)) = inner.tls.backend_mut().finish() {
self.state = State::Finalizing {
fut: Box::pin(inner.finalize(ctx, transcript)),
};
} else {
self.state = State::CloseBusy {
fut: Box::pin(inner.server_close()),
};
}
self.poll(cx)
}
State::CloseBusy { mut mpc, mut fut } => {
State::CloseBusy { mut fut } => {
trace!("inner client is busy closing");
match (fut.poll_unpin(cx)?, mpc.poll_unpin(cx)?) {
(Poll::Ready(inner), Poll::Ready((ctx, transcript))) => {
self.state = State::Finalizing {
fut: Box::pin(inner.finalize(ctx, transcript)),
};
self.poll(cx)
}
(Poll::Ready(inner), Poll::Pending) => {
self.state = State::MpcStop { mpc, inner };
Poll::Pending
}
(Poll::Pending, Poll::Ready((ctx, transcript))) => {
self.state = State::Finishing {
ctx,
transcript: Box::new(transcript),
fut,
};
Poll::Pending
}
(Poll::Pending, Poll::Pending) => {
self.state = State::CloseBusy { mpc, fut };
Poll::Pending
match fut.as_mut().poll(cx)? {
Poll::Ready(inner) => {
self.state = State::CloseActive { inner };
}
Poll::Pending => self.state = State::CloseBusy { fut },
}
}
State::Finishing {
ctx,
transcript,
mut fut,
} => {
trace!("inner client is finishing");
if let Poll::Ready(inner) = fut.poll_unpin(cx)? {
self.state = State::Finalizing {
fut: Box::pin(inner.finalize(ctx, *transcript)),
};
self.poll(cx)
} else {
self.state = State::Finishing {
ctx,
transcript,
fut,
};
Poll::Pending
}
Poll::Pending
}
State::Finalizing { mut fut } => match fut.poll_unpin(cx) {
Poll::Ready(output) => {
@@ -374,7 +316,6 @@ struct InnerState {
tls: ClientConnection,
vm: Arc<Mutex<Deap<ProverMpc, ProverZk>>>,
keys: SessionKeys,
mpc_ctrl: LeaderCtrl,
mpc_stopped: bool,
}
@@ -391,12 +332,6 @@ impl InnerState {
Ok(self)
}
#[instrument(parent = &self.span, level = "debug", skip_all, err)]
async fn set_decrypt(self: Box<Self>, enable: bool) -> Result<Box<Self>, ProverError> {
self.mpc_ctrl.enable_decryption(enable).await?;
self.run().await
}
#[instrument(parent = &self.span, level = "debug", skip_all, err)]
async fn client_close(mut self: Box<Self>) -> Result<Box<Self>, ProverError> {
debug!("sending close notify");
@@ -408,20 +343,11 @@ impl InnerState {
#[instrument(parent = &self.span, level = "debug", skip_all, err)]
async fn server_close(mut self: Box<Self>) -> Result<Box<Self>, ProverError> {
self.tls.process_new_packets().await?;
self.tls.server_closed().await?;
debug!("closed connection serverside");
Ok(self)
}
#[instrument(parent = &self.span, level = "debug", skip_all, err)]
async fn stop(mut self: Box<Self>) -> Result<Box<Self>, ProverError> {
self.tls.process_new_packets().await?;
if !self.mpc_stopped && self.tls.plaintext_is_empty() && self.tls.is_empty().await? {
self.mpc_ctrl.stop().await?;
self.tls.server_closed().await?;
self.mpc_stopped = true;
debug!("stopped mpc");
debug!("closed connection serverside");
}
Ok(self)