mirror of
https://github.com/tlsnotary/notary-server.git
synced 2026-01-09 12:37:56 -05:00
Refactor and fix documentations.
This commit is contained in:
@@ -28,9 +28,9 @@ All APIs are TLS-protected, hence please use `https://` or `wss://`.
|
||||
Defined in the [OpenAPI specification](./openapi.yaml).
|
||||
|
||||
### WebSocket APIs
|
||||
#### /ws-notarize
|
||||
#### /notarize
|
||||
##### Description
|
||||
To perform notarization using the session id (unique id returned upon calling the `/notarize` endpoint successfully) submitted as a custom header.
|
||||
To perform notarization using the session id (unique id returned upon calling the `/session` endpoint successfully) submitted as a custom header.
|
||||
|
||||
##### Custom Header
|
||||
`X-Session-Id`
|
||||
@@ -54,7 +54,7 @@ To perform notarization, some parameters need to be configured by the prover and
|
||||
- maximum transcript size
|
||||
- unique session id
|
||||
|
||||
To streamline this process, a single HTTP endpoint (`/notarize`) is used by both TCP and WebSocket clients. The only difference being, for TCP client, the notarization process will be kickstarted at the end of this configuration, whereas WebSocket client will need to establish a separate WebSocket connection to a different endpoint (`/ws-notarize`).
|
||||
To streamline this process, a single HTTP endpoint (`/session`) is used by both TCP and WebSocket clients.
|
||||
|
||||
#### WebSocket
|
||||
Axum's internal implementation of WebSocket uses [tokio_tungstenite](https://docs.rs/tokio-tungstenite/latest/tokio_tungstenite/), which provides a WebSocket struct that doesn't implement [AsyncRead](https://docs.rs/futures/latest/futures/io/trait.AsyncRead.html) and [AsyncWrite](https://docs.rs/futures/latest/futures/io/trait.AsyncWrite.html). Both these traits are required by TLSN core libraries for prover and notary. To overcome this, a [slight modification](./src/axum_websocket.rs) of Axum's implementation of WebSocket is used, where [async_tungstenite](https://docs.rs/async-tungstenite/latest/async_tungstenite/) is used instead so that [ws_stream_tungstenite](https://docs.rs/ws_stream_tungstenite/latest/ws_stream_tungstenite/index.html) can be used to wrap on top of the WebSocket struct to get AsyncRead and AsyncWrite implemented.
|
||||
Axum's internal implementation of WebSocket uses [tokio_tungstenite](https://docs.rs/tokio-tungstenite/latest/tokio_tungstenite/), which provides a WebSocket struct that doesn't implement [AsyncRead](https://docs.rs/futures/latest/futures/io/trait.AsyncRead.html) and [AsyncWrite](https://docs.rs/futures/latest/futures/io/trait.AsyncWrite.html). Both these traits are required by TLSN core libraries for prover and notary. To overcome this, a [slight modification](./src/service/axum_websocket.rs) of Axum's implementation of WebSocket is used, where [async_tungstenite](https://docs.rs/async-tungstenite/latest/async_tungstenite/) is used instead so that [ws_stream_tungstenite](https://docs.rs/ws_stream_tungstenite/latest/ws_stream_tungstenite/index.html) can be used to wrap on top of the WebSocket struct to get AsyncRead and AsyncWrite implemented.
|
||||
|
||||
77
openapi.yaml
77
openapi.yaml
@@ -9,11 +9,11 @@ tags:
|
||||
- name: Notarization
|
||||
|
||||
paths:
|
||||
/notarize:
|
||||
/session:
|
||||
post:
|
||||
tags:
|
||||
- Notarization
|
||||
description: Configure notarization for both TCP and WebSocket clients, as well as to trigger notarization for TCP client
|
||||
description: Initialize and configure notarization for both TCP and WebSocket clients
|
||||
parameters:
|
||||
- in: header
|
||||
name: Content-Type
|
||||
@@ -23,34 +23,20 @@ paths:
|
||||
enum:
|
||||
- "application/json"
|
||||
required: true
|
||||
- in: header
|
||||
name: Connection
|
||||
description: Header to be provided if prover is TCP client, the value should be 'Upgrade'
|
||||
schema:
|
||||
type: string
|
||||
enum:
|
||||
- "Upgrade"
|
||||
- in: header
|
||||
name: Upgrade
|
||||
description: Header to be provided if prover is TCP client, the value should be 'TCP'
|
||||
schema:
|
||||
type: string
|
||||
enum:
|
||||
- "TCP"
|
||||
requestBody:
|
||||
description: Notarization request to server
|
||||
description: Notarization session request to server
|
||||
required: true
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/NotarizationRequest"
|
||||
$ref: "#/components/schemas/NotarizationSessionRequest"
|
||||
responses:
|
||||
"200":
|
||||
description: Notarization response from server
|
||||
description: Notarization session response from server
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/NotarizationResponse"
|
||||
$ref: "#/components/schemas/NotarizationSessionResponse"
|
||||
"400":
|
||||
description: Configuration parameters or headers provided by prover are invalid
|
||||
content:
|
||||
@@ -64,11 +50,56 @@ paths:
|
||||
text/plain:
|
||||
schema:
|
||||
type: string
|
||||
example: "Error occurred during notarization"
|
||||
example: "Something is wrong"
|
||||
/notarize:
|
||||
get:
|
||||
tags:
|
||||
- Notarization
|
||||
description: Start notarization for TCP client
|
||||
parameters:
|
||||
- in: header
|
||||
name: Connection
|
||||
description: The value should be 'Upgrade'
|
||||
schema:
|
||||
type: string
|
||||
enum:
|
||||
- "Upgrade"
|
||||
required: true
|
||||
- in: header
|
||||
name: Upgrade
|
||||
description: The value should be 'TCP'
|
||||
schema:
|
||||
type: string
|
||||
enum:
|
||||
- "TCP"
|
||||
required: true
|
||||
- in: header
|
||||
name: X-Session-Id
|
||||
description: Unique ID returned from server upon calling POST /session
|
||||
schema:
|
||||
type: string
|
||||
required: true
|
||||
responses:
|
||||
"101":
|
||||
description: Switching protocol response
|
||||
"400":
|
||||
description: Headers provided by prover are invalid
|
||||
content:
|
||||
text/plain:
|
||||
schema:
|
||||
type: string
|
||||
example: "Invalid request from prover: Upgrade header is not set for client"
|
||||
"500":
|
||||
description: There was some internal error when processing
|
||||
content:
|
||||
text/plain:
|
||||
schema:
|
||||
type: string
|
||||
example: "Something is wrong"
|
||||
|
||||
components:
|
||||
schemas:
|
||||
NotarizationRequest:
|
||||
NotarizationSessionRequest:
|
||||
type: object
|
||||
properties:
|
||||
clientType:
|
||||
@@ -83,7 +114,7 @@ components:
|
||||
required:
|
||||
- "clientType"
|
||||
- "maxTranscriptSize"
|
||||
NotarizationResponse:
|
||||
NotarizationSessionResponse:
|
||||
type: object
|
||||
properties:
|
||||
sessionId:
|
||||
|
||||
@@ -6,18 +6,18 @@ use tokio::sync::Mutex;
|
||||
|
||||
use crate::config::NotarizationProperties;
|
||||
|
||||
/// Response object of the /notarize API
|
||||
/// Response object of the /session API
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct NotarizationResponse {
|
||||
pub struct NotarizationSessionResponse {
|
||||
/// Unique session id that is generated by notary and shared to prover
|
||||
pub session_id: String,
|
||||
}
|
||||
|
||||
/// Request object of the /notarize API
|
||||
/// Request object of the /session API
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct NotarizationRequest {
|
||||
pub struct NotarizationSessionRequest {
|
||||
pub client_type: ClientType,
|
||||
/// Maximum transcript size in bytes
|
||||
pub max_transcript_size: Option<usize>,
|
||||
@@ -45,12 +45,11 @@ impl NotaryGlobals {
|
||||
pub fn new(
|
||||
notary_signing_key: SigningKey,
|
||||
notarization_config: NotarizationProperties,
|
||||
store: Arc<Mutex<HashMap<String, Option<usize>>>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
notary_signing_key,
|
||||
notarization_config,
|
||||
store,
|
||||
store: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
mod axum_websocket;
|
||||
mod config;
|
||||
mod domain;
|
||||
mod error;
|
||||
@@ -13,7 +12,7 @@ pub use config::{
|
||||
};
|
||||
pub use domain::{
|
||||
cli::CliFields,
|
||||
notary::{ClientType, NotarizationRequest, NotarizationResponse},
|
||||
notary::{ClientType, NotarizationSessionRequest, NotarizationSessionResponse},
|
||||
};
|
||||
pub use error::NotaryServerError;
|
||||
pub use server::{read_pem_file, run_server};
|
||||
|
||||
@@ -13,7 +13,6 @@ use hyper::server::{
|
||||
use p256::{ecdsa::SigningKey, pkcs8::DecodePrivateKey};
|
||||
use rustls::{Certificate, PrivateKey, ServerConfig};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
fs::File as StdFile,
|
||||
io::BufReader,
|
||||
net::{IpAddr, SocketAddr},
|
||||
@@ -21,7 +20,7 @@ use std::{
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use tokio::{fs::File, net::TcpListener, sync::Mutex};
|
||||
use tokio::{fs::File, net::TcpListener};
|
||||
use tokio_rustls::TlsAcceptor;
|
||||
use tower::MakeService;
|
||||
use tracing::{debug, error, info};
|
||||
@@ -30,7 +29,7 @@ use crate::{
|
||||
config::{NotaryServerProperties, NotarySignatureProperties, TLSSignatureProperties},
|
||||
domain::notary::NotaryGlobals,
|
||||
error::NotaryServerError,
|
||||
service::{tcp::initiate, websocket::switch_protocol},
|
||||
service::{initialize, upgrade_protocol},
|
||||
};
|
||||
|
||||
/// Start a TLS-secured TCP server to accept notarization request for both TCP and WebSocket clients
|
||||
@@ -72,16 +71,14 @@ pub async fn run_server(config: &NotaryServerProperties) -> Result<(), NotarySer
|
||||
);
|
||||
|
||||
let protocol = Arc::new(Http::new());
|
||||
// A temporary storage to store configuration data, mainly used for WebSocket client
|
||||
let store = Arc::new(Mutex::new(HashMap::<String, Option<usize>>::new()));
|
||||
let notary_globals = NotaryGlobals::new(notary_signing_key, config.notarization.clone(), store);
|
||||
let notary_globals = NotaryGlobals::new(notary_signing_key, config.notarization.clone());
|
||||
let router = Router::new()
|
||||
.route(
|
||||
"/healthcheck",
|
||||
get(|| async move { (StatusCode::OK, "Ok").into_response() }),
|
||||
)
|
||||
.route("/initialize", post(initiate))
|
||||
.route("/notarize", get(switch_protocol))
|
||||
.route("/session", post(initialize))
|
||||
.route("/notarize", get(upgrade_protocol))
|
||||
.with_state(notary_globals);
|
||||
let mut app = router.into_make_service();
|
||||
|
||||
|
||||
154
src/service.rs
154
src/service.rs
@@ -1,13 +1,163 @@
|
||||
pub mod axum_websocket;
|
||||
pub mod tcp;
|
||||
pub mod websocket;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use axum::{
|
||||
extract::{rejection::JsonRejection, FromRequestParts, State},
|
||||
http::{header, request::Parts, HeaderMap, StatusCode},
|
||||
response::{IntoResponse, Json, Response},
|
||||
};
|
||||
use axum_macros::debug_handler;
|
||||
use p256::ecdsa::{Signature, SigningKey};
|
||||
use tlsn_notary::{bind_notary, NotaryConfig};
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio_util::compat::TokioAsyncReadCompatExt;
|
||||
use tracing::debug;
|
||||
use tracing::{debug, error, info, trace};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::error::NotaryServerError;
|
||||
use crate::{
|
||||
domain::notary::{NotarizationSessionRequest, NotarizationSessionResponse, NotaryGlobals},
|
||||
error::NotaryServerError,
|
||||
service::{
|
||||
axum_websocket::{header_eq, WebSocketUpgrade},
|
||||
tcp::{tcp_notarize, TcpUpgrade},
|
||||
websocket::websocket_notarize,
|
||||
},
|
||||
};
|
||||
|
||||
/// A wrapper enum to facilitate extracting TCP connection for either WebSocket or TCP clients,
|
||||
/// so that we can use a single endpoint and handler for notarization for both types of clients
|
||||
pub enum ProtocolUpgrade {
|
||||
Tcp(TcpUpgrade),
|
||||
Ws(WebSocketUpgrade),
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<S> FromRequestParts<S> for ProtocolUpgrade
|
||||
where
|
||||
S: Send + Sync,
|
||||
{
|
||||
type Rejection = NotaryServerError;
|
||||
|
||||
async fn from_request_parts(parts: &mut Parts, state: &S) -> Result<Self, Self::Rejection> {
|
||||
// Extract tcp connection for websocket client
|
||||
if header_eq(&parts.headers, header::UPGRADE, "websocket") {
|
||||
let extractor = WebSocketUpgrade::from_request_parts(parts, state)
|
||||
.await
|
||||
.map_err(|err| NotaryServerError::BadProverRequest(err.to_string()))?;
|
||||
return Ok(Self::Ws(extractor));
|
||||
// Extract tcp connection for tcp client
|
||||
} else if header_eq(&parts.headers, header::UPGRADE, "tcp") {
|
||||
let extractor = TcpUpgrade::from_request_parts(parts, state)
|
||||
.await
|
||||
.map_err(|err| NotaryServerError::BadProverRequest(err.to_string()))?;
|
||||
return Ok(Self::Tcp(extractor));
|
||||
} else {
|
||||
return Err(NotaryServerError::BadProverRequest(
|
||||
"Upgrade header is not set for client".to_string(),
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Handler to upgrade protocol from http to either websocket or underlying tcp depending on the type of client
|
||||
/// the session_id header is also extracted here to fetch the configuration parameters
|
||||
/// that have been submitted in the previous request to /session made by the same client
|
||||
pub async fn upgrade_protocol(
|
||||
protocol_upgrade: ProtocolUpgrade,
|
||||
mut headers: HeaderMap,
|
||||
State(notary_globals): State<NotaryGlobals>,
|
||||
) -> Response {
|
||||
info!("Received upgrade protocol request");
|
||||
// Extract the session_id from the headers
|
||||
let session_id = match headers.remove("X-Session-Id") {
|
||||
Some(session_id) => match session_id.to_str() {
|
||||
Ok(session_id) => session_id.to_string(),
|
||||
Err(err) => {
|
||||
let err_msg = format!("X-Session-Id header submitted is not a string: {}", err);
|
||||
error!(err_msg);
|
||||
return NotaryServerError::BadProverRequest(err_msg).into_response();
|
||||
}
|
||||
},
|
||||
None => {
|
||||
let err_msg = "Missing X-Session-Id in upgrade protocol request".to_string();
|
||||
error!(err_msg);
|
||||
return NotaryServerError::BadProverRequest(err_msg).into_response();
|
||||
}
|
||||
};
|
||||
// Fetch the configuration data from the store using the session_id
|
||||
let max_transcript_size = match notary_globals.store.lock().await.get(&session_id) {
|
||||
Some(max_transcript_size) => max_transcript_size.to_owned(),
|
||||
None => {
|
||||
let err_msg = format!("Session id {} does not exist", session_id);
|
||||
error!(err_msg);
|
||||
return NotaryServerError::BadProverRequest(err_msg).into_response();
|
||||
}
|
||||
};
|
||||
// This completes the HTTP Upgrade request and returns a successful response to the client, meanwhile initiating the websocket or tcp connection
|
||||
match protocol_upgrade {
|
||||
ProtocolUpgrade::Ws(ws) => ws.on_upgrade(move |socket| {
|
||||
websocket_notarize(socket, notary_globals, session_id, max_transcript_size)
|
||||
}),
|
||||
ProtocolUpgrade::Tcp(tcp) => tcp.on_upgrade(move |stream| {
|
||||
tcp_notarize(stream, notary_globals, session_id, max_transcript_size)
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
/// Handler to initialize and configure notarization for both TCP and WebSocket clients
|
||||
#[debug_handler(state = NotaryGlobals)]
|
||||
pub async fn initialize(
|
||||
State(notary_globals): State<NotaryGlobals>,
|
||||
payload: Result<Json<NotarizationSessionRequest>, JsonRejection>,
|
||||
) -> impl IntoResponse {
|
||||
info!(
|
||||
?payload,
|
||||
"Received request for initializing a notarization session"
|
||||
);
|
||||
|
||||
// Parse the body payload
|
||||
let payload = match payload {
|
||||
Ok(payload) => payload,
|
||||
Err(err) => {
|
||||
error!("Malformed payload submitted for initializing notarization: {err}");
|
||||
return NotaryServerError::BadProverRequest(err.to_string()).into_response();
|
||||
}
|
||||
};
|
||||
|
||||
// Ensure that the max_transcript_size submitted is not larger than the global max limit configured in notary server
|
||||
if payload.max_transcript_size > Some(notary_globals.notarization_config.max_transcript_size) {
|
||||
error!(
|
||||
"Max transcript size requested {:?} exceeds the maximum threshold {:?}",
|
||||
payload.max_transcript_size, notary_globals.notarization_config.max_transcript_size
|
||||
);
|
||||
return NotaryServerError::BadProverRequest(
|
||||
"Max transcript size requested exceeds the maximum threshold".to_string(),
|
||||
)
|
||||
.into_response();
|
||||
}
|
||||
|
||||
let prover_session_id = Uuid::new_v4().to_string();
|
||||
|
||||
// Store the configuration data in a temporary store
|
||||
notary_globals
|
||||
.store
|
||||
.lock()
|
||||
.await
|
||||
.insert(prover_session_id.clone(), payload.max_transcript_size);
|
||||
|
||||
trace!("Latest store state: {:?}", notary_globals.store);
|
||||
|
||||
// Return the session id in the response to the client
|
||||
(
|
||||
StatusCode::OK,
|
||||
Json(NotarizationSessionResponse {
|
||||
session_id: prover_session_id,
|
||||
}),
|
||||
)
|
||||
.into_response()
|
||||
}
|
||||
|
||||
/// Run the notarization
|
||||
pub async fn notary_service<T: AsyncWrite + AsyncRead + Send + Unpin + 'static>(
|
||||
|
||||
@@ -1,32 +1,27 @@
|
||||
use async_trait::async_trait;
|
||||
use axum::{
|
||||
body,
|
||||
extract::{rejection::JsonRejection, FromRequestParts, State},
|
||||
extract::FromRequestParts,
|
||||
http::{header, request::Parts, HeaderValue, StatusCode},
|
||||
response::{IntoResponse, Json, Response},
|
||||
response::Response,
|
||||
};
|
||||
use axum_macros::debug_handler;
|
||||
use hyper::upgrade::{OnUpgrade, Upgraded};
|
||||
use std::future::Future;
|
||||
use tracing::{error, info, trace};
|
||||
use uuid::Uuid;
|
||||
use tracing::{debug, error, info};
|
||||
|
||||
use crate::{
|
||||
domain::notary::{NotarizationRequest, NotarizationResponse, NotaryGlobals},
|
||||
NotaryServerError,
|
||||
};
|
||||
use crate::{domain::notary::NotaryGlobals, service::notary_service, NotaryServerError};
|
||||
|
||||
/// Custom extractor used to extract underlying TCP connection for TCP client — using the same upgrade primitives used by
|
||||
/// the WebSocket implementation where the underlying TCP connection (wrapped in an Upgraded object) only gets polled as an OnUpgrade future
|
||||
/// after the ongoing HTTP request is finished (ref: https://github.com/tokio-rs/axum/blob/a6a849bb5b96a2f641fa077fe76f70ad4d20341c/axum/src/extract/ws.rs#L122)
|
||||
///
|
||||
/// More info on the upgrade primitives: https://docs.rs/hyper/latest/hyper/upgrade/index.html
|
||||
pub struct TcpConnectionExtractor {
|
||||
pub struct TcpUpgrade {
|
||||
pub on_upgrade: OnUpgrade,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<S> FromRequestParts<S> for TcpConnectionExtractor
|
||||
impl<S> FromRequestParts<S> for TcpUpgrade
|
||||
where
|
||||
S: Send + Sync,
|
||||
{
|
||||
@@ -45,7 +40,10 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl TcpConnectionExtractor {
|
||||
impl TcpUpgrade {
|
||||
/// Utility function to complete the http upgrade protocol by
|
||||
/// (1) Return 101 switching protocol response to client to indicate the switching to TCP
|
||||
/// (2) Spawn a new thread to await on the OnUpgrade object to claim the underlying TCP connection
|
||||
pub fn on_upgrade<C, Fut>(self, callback: C) -> Response
|
||||
where
|
||||
C: FnOnce(Upgraded) -> Fut + Send + 'static,
|
||||
@@ -77,52 +75,27 @@ impl TcpConnectionExtractor {
|
||||
}
|
||||
}
|
||||
|
||||
/// Handler to configure notarization for both TCP and WebSocket clients, as well as to trigger notarization for TCP client
|
||||
#[debug_handler(state = NotaryGlobals)]
|
||||
pub async fn initiate(
|
||||
State(notary_globals): State<NotaryGlobals>,
|
||||
payload: Result<Json<NotarizationRequest>, JsonRejection>,
|
||||
) -> impl IntoResponse {
|
||||
info!(?payload, "Received request for notarization");
|
||||
|
||||
// Parse the body payload
|
||||
let payload = match payload {
|
||||
Ok(payload) => payload,
|
||||
Err(err) => {
|
||||
error!("Malformed payload submitted for notarization: {err}");
|
||||
return NotaryServerError::BadProverRequest(err.to_string()).into_response();
|
||||
}
|
||||
};
|
||||
|
||||
// Ensure that the max_transcript_size submitted is not larger than the global max limit configured in notary server
|
||||
if payload.max_transcript_size > Some(notary_globals.notarization_config.max_transcript_size) {
|
||||
error!(
|
||||
"Max transcript size requested {:?} exceeds the maximum threshold {:?}",
|
||||
payload.max_transcript_size, notary_globals.notarization_config.max_transcript_size
|
||||
);
|
||||
return NotaryServerError::BadProverRequest(
|
||||
"Max transcript size requested exceeds the maximum threshold".to_string(),
|
||||
)
|
||||
.into_response();
|
||||
}
|
||||
|
||||
let prover_session_id = Uuid::new_v4().to_string();
|
||||
|
||||
// Store the configuration data in a temporary store, currently mainly used for websocket clients
|
||||
notary_globals
|
||||
.store
|
||||
.lock()
|
||||
.await
|
||||
.insert(prover_session_id.clone(), payload.max_transcript_size);
|
||||
|
||||
trace!("Latest store state: {:?}", notary_globals.store);
|
||||
|
||||
// Return the session id in the response to the client
|
||||
(
|
||||
StatusCode::OK,
|
||||
Json(NotarizationResponse {
|
||||
session_id: prover_session_id,
|
||||
}),
|
||||
/// Perform notarization using the extracted tcp connection
|
||||
pub async fn tcp_notarize(
|
||||
stream: Upgraded,
|
||||
notary_globals: NotaryGlobals,
|
||||
session_id: String,
|
||||
max_transcript_size: Option<usize>,
|
||||
) {
|
||||
debug!(?session_id, "Upgraded to tcp connection");
|
||||
match notary_service(
|
||||
stream,
|
||||
¬ary_globals.notary_signing_key,
|
||||
&session_id,
|
||||
max_transcript_size,
|
||||
)
|
||||
.into_response()
|
||||
.await
|
||||
{
|
||||
Ok(_) => {
|
||||
info!(?session_id, "Successful notarization using tcp!");
|
||||
}
|
||||
Err(err) => {
|
||||
error!(?session_id, "Failed notarization using tcp: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,108 +1,13 @@
|
||||
use async_trait::async_trait;
|
||||
use axum::{
|
||||
extract::{FromRequestParts, State},
|
||||
http::{header, request::Parts, HeaderMap},
|
||||
response::{IntoResponse, Response},
|
||||
};
|
||||
use hyper::upgrade::Upgraded;
|
||||
use tracing::{debug, error, info};
|
||||
use ws_stream_tungstenite::WsStream;
|
||||
|
||||
use crate::{
|
||||
axum_websocket::{header_eq, WebSocket, WebSocketUpgrade},
|
||||
domain::notary::NotaryGlobals,
|
||||
error::NotaryServerError,
|
||||
service::{notary_service, tcp::TcpConnectionExtractor},
|
||||
service::{axum_websocket::WebSocket, notary_service},
|
||||
};
|
||||
|
||||
pub struct ProtocolExtractor {
|
||||
pub tcp_extractor: Option<TcpConnectionExtractor>,
|
||||
pub ws_extractor: Option<WebSocketUpgrade>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<S> FromRequestParts<S> for ProtocolExtractor
|
||||
where
|
||||
S: Send + Sync,
|
||||
{
|
||||
type Rejection = NotaryServerError;
|
||||
|
||||
async fn from_request_parts(parts: &mut Parts, state: &S) -> Result<Self, Self::Rejection> {
|
||||
if header_eq(&parts.headers, header::UPGRADE, "websocket") {
|
||||
let ws_extractor = WebSocketUpgrade::from_request_parts(parts, state)
|
||||
.await
|
||||
.map_err(|err| NotaryServerError::BadProverRequest(err.to_string()))?;
|
||||
return Ok(Self {
|
||||
tcp_extractor: None,
|
||||
ws_extractor: Some(ws_extractor),
|
||||
});
|
||||
} else if header_eq(&parts.headers, header::UPGRADE, "tcp") {
|
||||
let tcp_extractor = TcpConnectionExtractor::from_request_parts(parts, state)
|
||||
.await
|
||||
.map_err(|err| NotaryServerError::BadProverRequest(err.to_string()))?;
|
||||
return Ok(Self {
|
||||
tcp_extractor: Some(tcp_extractor),
|
||||
ws_extractor: None,
|
||||
});
|
||||
} else {
|
||||
return Err(NotaryServerError::BadProverRequest(
|
||||
"Upgrade header is not set for client".to_string(),
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Handler to upgade websocket connection from http — the session_id header is also extracted here
|
||||
/// to fetch the configuration parameters that have been submitted in the previous request to /notarize made by
|
||||
/// the same websocket client
|
||||
pub async fn switch_protocol(
|
||||
protocol: ProtocolExtractor,
|
||||
mut headers: HeaderMap,
|
||||
State(notary_globals): State<NotaryGlobals>,
|
||||
) -> Response {
|
||||
info!("Received switch protocol request");
|
||||
// Extract the session_id from the headers
|
||||
let session_id = match headers.remove("X-Session-Id") {
|
||||
Some(session_id) => match session_id.to_str() {
|
||||
Ok(session_id) => session_id.to_string(),
|
||||
Err(err) => {
|
||||
let err_msg = format!("X-Session-Id header submitted is not a string: {}", err);
|
||||
error!(err_msg);
|
||||
return NotaryServerError::BadProverRequest(err_msg).into_response();
|
||||
}
|
||||
},
|
||||
None => {
|
||||
let err_msg = "Missing X-Session-Id in connection request".to_string();
|
||||
error!(err_msg);
|
||||
return NotaryServerError::BadProverRequest(err_msg).into_response();
|
||||
}
|
||||
};
|
||||
// Fetch the configuration data from the store using the session_id
|
||||
let max_transcript_size = match notary_globals.store.lock().await.get(&session_id) {
|
||||
Some(max_transcript_size) => max_transcript_size.to_owned(),
|
||||
None => {
|
||||
let err_msg = format!("Session id {} does not exist", session_id);
|
||||
error!(err_msg);
|
||||
return NotaryServerError::BadProverRequest(err_msg).into_response();
|
||||
}
|
||||
};
|
||||
// This completes the HTTP Upgrade request and returns a successful response to the client, meanwhile initiating the websocket connection
|
||||
if let Some(ws) = protocol.ws_extractor {
|
||||
#[allow(clippy::needless_return)]
|
||||
return ws.on_upgrade(move |socket| {
|
||||
websocket_notarize(socket, notary_globals, session_id, max_transcript_size)
|
||||
});
|
||||
} else if let Some(tcp) = protocol.tcp_extractor {
|
||||
return tcp.on_upgrade(move |stream| {
|
||||
tcp_notarize(stream, notary_globals, session_id, max_transcript_size)
|
||||
});
|
||||
} else {
|
||||
unreachable!();
|
||||
}
|
||||
}
|
||||
|
||||
/// Perform notarization using the established websocket connection
|
||||
async fn websocket_notarize(
|
||||
pub async fn websocket_notarize(
|
||||
socket: WebSocket,
|
||||
notary_globals: NotaryGlobals,
|
||||
session_id: String,
|
||||
@@ -127,29 +32,3 @@ async fn websocket_notarize(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Perform notarization using the established websocket connection
|
||||
async fn tcp_notarize(
|
||||
stream: Upgraded,
|
||||
notary_globals: NotaryGlobals,
|
||||
session_id: String,
|
||||
max_transcript_size: Option<usize>,
|
||||
) {
|
||||
debug!(?session_id, "Upgraded to tcp connection");
|
||||
// Wrap the websocket in WsStream so that we have AsyncRead and AsyncWrite implemented
|
||||
match notary_service(
|
||||
stream,
|
||||
¬ary_globals.notary_signing_key,
|
||||
&session_id,
|
||||
max_transcript_size,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(_) => {
|
||||
info!(?session_id, "Successful notarization using tcp!");
|
||||
}
|
||||
Err(err) => {
|
||||
error!(?session_id, "Failed notarization using tcp: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,9 +22,9 @@ use tracing::debug;
|
||||
use ws_stream_tungstenite::WsStream;
|
||||
|
||||
use notary_server::{
|
||||
read_pem_file, run_server, NotarizationProperties, NotarizationRequest, NotarizationResponse,
|
||||
NotaryServerProperties, NotarySignatureProperties, ServerProperties, TLSSignatureProperties,
|
||||
TracingProperties,
|
||||
read_pem_file, run_server, NotarizationProperties, NotarizationSessionRequest,
|
||||
NotarizationSessionResponse, NotaryServerProperties, NotarySignatureProperties,
|
||||
ServerProperties, TLSSignatureProperties, TracingProperties,
|
||||
};
|
||||
|
||||
const NOTARY_CA_CERT_PATH: &str = "./src/fixture/tls/rootCA.crt";
|
||||
@@ -107,9 +107,7 @@ async fn test_tcp_prover() {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Attach the hyper HTTP client to the notary TLS connection to send notarization request via HTTP to the /notarize endpoint
|
||||
// 1. Use HTTP to send configuration data e.g. max transcript size and obtain notarization session id from server
|
||||
// 2. Use underlying TCP connection to perform notarization
|
||||
// Attach the hyper HTTP client to the notary TLS connection to send request to the /session endpoint to configure notarization and obtain session id
|
||||
let (mut request_sender, connection) = hyper::client::conn::handshake(notary_tls_socket)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -118,13 +116,13 @@ async fn test_tcp_prover() {
|
||||
let connection_task = tokio::spawn(connection.without_shutdown());
|
||||
|
||||
// Build the HTTP request to configure notarization
|
||||
let payload = serde_json::to_string(&NotarizationRequest {
|
||||
let payload = serde_json::to_string(&NotarizationSessionRequest {
|
||||
client_type: notary_server::ClientType::Tcp,
|
||||
max_transcript_size: Some(notary_config.notarization.max_transcript_size),
|
||||
})
|
||||
.unwrap();
|
||||
let request = Request::builder()
|
||||
.uri(format!("https://{notary_domain}:{notary_port}/initialize"))
|
||||
.uri(format!("https://{notary_domain}:{notary_port}/session"))
|
||||
.method("POST")
|
||||
.header("Host", notary_domain.clone())
|
||||
// Need to specify application/json for axum to parse it as json
|
||||
@@ -145,10 +143,12 @@ async fn test_tcp_prover() {
|
||||
// Pretty printing :)
|
||||
let payload = to_bytes(response.into_body()).await.unwrap().to_vec();
|
||||
let notarization_response =
|
||||
serde_json::from_str::<NotarizationResponse>(&String::from_utf8_lossy(&payload)).unwrap();
|
||||
serde_json::from_str::<NotarizationSessionResponse>(&String::from_utf8_lossy(&payload))
|
||||
.unwrap();
|
||||
|
||||
debug!("Notarization response: {:?}", notarization_response,);
|
||||
|
||||
// Send notarization request via HTTP, where the underlying TCP connection will be extracted later
|
||||
let request = Request::builder()
|
||||
.uri(format!("https://{notary_domain}:{notary_port}/notarize"))
|
||||
.method("GET")
|
||||
@@ -156,8 +156,6 @@ async fn test_tcp_prover() {
|
||||
.header("Connection", "Upgrade")
|
||||
// Need to specify this upgrade header for server to extract tcp connection later
|
||||
.header("Upgrade", "TCP")
|
||||
// Need to specify application/json for axum to parse it as json
|
||||
.header("Content-Type", "application/json")
|
||||
// Need to specify the session_id so that notary server knows the right configuration to use
|
||||
// as the configuration is set in the previous HTTP call
|
||||
.header("X-Session-Id", notarization_response.session_id.clone())
|
||||
@@ -277,7 +275,7 @@ async fn test_websocket_prover() {
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
// Call the /notarize HTTP API to configure notarization and obtain session id
|
||||
// Call the /session HTTP API to configure notarization and obtain session id
|
||||
let mut hyper_http_connector = HttpConnector::new();
|
||||
hyper_http_connector.enforce_http(false);
|
||||
let mut hyper_tls_connector =
|
||||
@@ -286,14 +284,14 @@ async fn test_websocket_prover() {
|
||||
let https_client = Client::builder().build::<_, hyper::Body>(hyper_tls_connector);
|
||||
|
||||
// Build the HTTP request to configure notarization
|
||||
let payload = serde_json::to_string(&NotarizationRequest {
|
||||
let payload = serde_json::to_string(&NotarizationSessionRequest {
|
||||
client_type: notary_server::ClientType::Websocket,
|
||||
max_transcript_size: Some(notary_config.notarization.max_transcript_size),
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
let request = Request::builder()
|
||||
.uri(format!("https://{notary_domain}:{notary_port}/initialize"))
|
||||
.uri(format!("https://{notary_domain}:{notary_port}/session"))
|
||||
.method("POST")
|
||||
.header("Host", notary_domain.clone())
|
||||
// Need to specify application/json for axum to parse it as json
|
||||
@@ -314,7 +312,8 @@ async fn test_websocket_prover() {
|
||||
// Pretty printing :)
|
||||
let payload = to_bytes(response.into_body()).await.unwrap().to_vec();
|
||||
let notarization_response =
|
||||
serde_json::from_str::<NotarizationResponse>(&String::from_utf8_lossy(&payload)).unwrap();
|
||||
serde_json::from_str::<NotarizationSessionResponse>(&String::from_utf8_lossy(&payload))
|
||||
.unwrap();
|
||||
|
||||
debug!("Notarization response: {:?}", notarization_response,);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user