mirror of
https://github.com/scroll-tech/scroll.git
synced 2026-04-23 03:00:50 -04:00
wip
This commit is contained in:
@@ -18,13 +18,14 @@ version = "4.5.47"
|
||||
|
||||
[workspace.dependencies]
|
||||
# include compatiblity fixing from "fix/coordinator"
|
||||
scroll-zkvm-prover = { git = "https://github.com/scroll-tech/zkvm-prover", rev = "a71dd2b" }
|
||||
scroll-zkvm-verifier = { git = "https://github.com/scroll-tech/zkvm-prover", rev = "a71dd2b" }
|
||||
scroll-zkvm-types = { git = "https://github.com/scroll-tech/zkvm-prover", rev = "a71dd2b" }
|
||||
scroll-zkvm-prover = { git = "https://github.com/scroll-tech/zkvm-prover", branch = "feat/axiom_cli_sdk" }
|
||||
scroll-zkvm-verifier = { git = "https://github.com/scroll-tech/zkvm-prover", branch = "feat/axiom_cli_sdk" }
|
||||
scroll-zkvm-types = { git = "https://github.com/scroll-tech/zkvm-prover", branch = "feat/axiom_cli_sdk", features = ["scroll"] }
|
||||
|
||||
sbv-primitives = { git = "https://github.com/scroll-tech/stateless-block-verifier", branch = "master", features = ["scroll", "rkyv"] }
|
||||
sbv-utils = { git = "https://github.com/scroll-tech/stateless-block-verifier", branch = "master" }
|
||||
sbv-core = { git = "https://github.com/scroll-tech/stateless-block-verifier", branch = "master", features = ["scroll"] }
|
||||
axiom-sdk = { git = "https://github.com/axiom-crypto/axiom-api-cli.git", branch = "feat/upload-exe-raw" }
|
||||
|
||||
metrics = "0.23.0"
|
||||
metrics-util = "0.17"
|
||||
@@ -36,6 +37,7 @@ alloy-primitives = { version = "1.3", default-features = false, features = ["tin
|
||||
# also use this to trigger "serde" feature for primitives
|
||||
alloy-serde = { version = "1", default-features = false }
|
||||
|
||||
jiff = "0.2"
|
||||
serde = { version = "1", default-features = false, features = ["derive"] }
|
||||
serde_json = { version = "1.0" }
|
||||
serde_derive = "1.0"
|
||||
|
||||
@@ -6,6 +6,7 @@ edition.workspace = true
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
axiom-sdk.workspace = true
|
||||
scroll-zkvm-types.workspace = true
|
||||
scroll-zkvm-prover.workspace = true
|
||||
scroll-proving-sdk = { git = "https://github.com/scroll-tech/scroll-proving-sdk.git", rev = "05648db" }
|
||||
@@ -19,11 +20,12 @@ eyre.workspace = true
|
||||
futures = "0.3.30"
|
||||
futures-util = "0.3"
|
||||
|
||||
reqwest = { version = "0.12.4", features = ["gzip", "stream"] }
|
||||
reqwest = { version = "0.12", features = ["gzip", "stream"] }
|
||||
reqwest-middleware = "0.3"
|
||||
reqwest-retry = "0.5"
|
||||
hex = "0.4.3"
|
||||
|
||||
jiff.workspace = true
|
||||
rand = "0.8.5"
|
||||
tokio = "1.37.0"
|
||||
async-trait = "0.1"
|
||||
@@ -36,4 +38,4 @@ serde_bytes = "0.11.15"
|
||||
|
||||
[features]
|
||||
default = []
|
||||
cuda = ["scroll-zkvm-prover/cuda"]
|
||||
cuda = ["scroll-zkvm-prover/cuda"]
|
||||
|
||||
@@ -1,327 +1,5 @@
|
||||
use crate::zk_circuits_handler::{universal::UniversalHandler, CircuitsHandler};
|
||||
use async_trait::async_trait;
|
||||
use eyre::Result;
|
||||
use scroll_proving_sdk::{
|
||||
config::Config as SdkConfig,
|
||||
prover::{
|
||||
proving_service::{
|
||||
GetVkRequest, GetVkResponse, ProveRequest, ProveResponse, QueryTaskRequest,
|
||||
QueryTaskResponse, TaskStatus,
|
||||
},
|
||||
types::ProofType,
|
||||
ProvingService,
|
||||
},
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
fs::File,
|
||||
path::{Path, PathBuf},
|
||||
sync::{Arc, LazyLock},
|
||||
time::{SystemTime, UNIX_EPOCH},
|
||||
};
|
||||
use tokio::{runtime::Handle, sync::Mutex, task::JoinHandle};
|
||||
mod local;
|
||||
pub use local::{LocalProver, LocalProverConfig};
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
pub struct AssetsLocationData {
|
||||
/// the base url to form a general downloading url for an asset, MUST HAVE A TRAILING SLASH
|
||||
pub base_url: url::Url,
|
||||
#[serde(default)]
|
||||
/// a altered url for specififed vk
|
||||
pub asset_detours: HashMap<String, url::Url>,
|
||||
}
|
||||
|
||||
impl AssetsLocationData {
|
||||
pub fn gen_asset_url(&self, vk_as_path: &str, proof_type: ProofType) -> Result<url::Url> {
|
||||
Ok(self.base_url.join(
|
||||
match proof_type {
|
||||
ProofType::Chunk => format!("chunk/{vk_as_path}/"),
|
||||
ProofType::Batch => format!("batch/{vk_as_path}/"),
|
||||
ProofType::Bundle => format!("bundle/{vk_as_path}/"),
|
||||
t => eyre::bail!("unrecognized proof type: {}", t as u8),
|
||||
}
|
||||
.as_str(),
|
||||
)?)
|
||||
}
|
||||
|
||||
pub fn validate(&self) -> Result<()> {
|
||||
if !self.base_url.path().ends_with('/') {
|
||||
eyre::bail!(
|
||||
"base_url must have a trailing slash, got: {}",
|
||||
self.base_url
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_asset(
|
||||
&self,
|
||||
vk: &str,
|
||||
url_base: &url::Url,
|
||||
base_path: impl AsRef<Path>,
|
||||
) -> Result<PathBuf> {
|
||||
let download_files = ["app.vmexe", "openvm.toml"];
|
||||
|
||||
// Step 1: Create a local path for storage
|
||||
let storage_path = base_path.as_ref().join(vk);
|
||||
std::fs::create_dir_all(&storage_path)?;
|
||||
|
||||
// Step 2 & 3: Download each file if needed
|
||||
let client = reqwest::Client::new();
|
||||
|
||||
for filename in download_files.iter() {
|
||||
let local_file_path = storage_path.join(filename);
|
||||
let download_url = url_base.join(filename)?;
|
||||
|
||||
// Check if file already exists
|
||||
if local_file_path.exists() {
|
||||
// Get file metadata to check size
|
||||
if let Ok(metadata) = std::fs::metadata(&local_file_path) {
|
||||
// Make a HEAD request to get remote file size
|
||||
|
||||
if let Ok(head_resp) = client.head(download_url.clone()).send().await {
|
||||
if let Some(content_length) = head_resp.headers().get("content-length") {
|
||||
if let Ok(remote_size) =
|
||||
content_length.to_str().unwrap_or("0").parse::<u64>()
|
||||
{
|
||||
// If sizes match, skip download
|
||||
if metadata.len() == remote_size {
|
||||
println!("File {} already exists with matching size, skipping download", filename);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
println!("Downloading {} from {}", filename, download_url);
|
||||
|
||||
let response = client.get(download_url).send().await?;
|
||||
if !response.status().is_success() {
|
||||
eyre::bail!(
|
||||
"Failed to download {}: HTTP status {}",
|
||||
filename,
|
||||
response.status()
|
||||
);
|
||||
}
|
||||
|
||||
// Stream the content directly to file instead of loading into memory
|
||||
let mut file = std::fs::File::create(&local_file_path)?;
|
||||
let mut stream = response.bytes_stream();
|
||||
|
||||
use futures_util::StreamExt;
|
||||
while let Some(chunk) = stream.next().await {
|
||||
std::io::Write::write_all(&mut file, &chunk?)?;
|
||||
}
|
||||
}
|
||||
|
||||
// Step 4: Return the storage path
|
||||
Ok(storage_path)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
pub struct LocalProverConfig {
|
||||
pub sdk_config: SdkConfig,
|
||||
pub circuits: HashMap<String, CircuitConfig>,
|
||||
}
|
||||
|
||||
impl LocalProverConfig {
|
||||
pub fn from_reader<R>(reader: R) -> Result<Self>
|
||||
where
|
||||
R: std::io::Read,
|
||||
{
|
||||
serde_json::from_reader(reader).map_err(|e| eyre::eyre!(e))
|
||||
}
|
||||
|
||||
pub fn from_file(file_name: String) -> Result<Self> {
|
||||
let file = File::open(file_name)?;
|
||||
Self::from_reader(&file)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
pub struct CircuitConfig {
|
||||
pub hard_fork_name: String,
|
||||
/// The path to save assets for a specified hard fork phase
|
||||
pub workspace_path: String,
|
||||
#[serde(flatten)]
|
||||
/// The location data for dynamic loading
|
||||
pub location_data: AssetsLocationData,
|
||||
/// cached vk value to save some initial cost, for debugging only
|
||||
#[serde(default)]
|
||||
pub vks: HashMap<ProofType, String>,
|
||||
}
|
||||
|
||||
pub struct LocalProver {
|
||||
config: LocalProverConfig,
|
||||
next_task_id: u64,
|
||||
current_task: Option<JoinHandle<Result<String>>>,
|
||||
|
||||
handlers: HashMap<String, Arc<dyn CircuitsHandler>>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ProvingService for LocalProver {
|
||||
fn is_local(&self) -> bool {
|
||||
true
|
||||
}
|
||||
async fn get_vks(&self, _: GetVkRequest) -> GetVkResponse {
|
||||
// get vk has been deprecated in new prover with dynamic asset loading scheme
|
||||
GetVkResponse {
|
||||
vks: vec![],
|
||||
error: None,
|
||||
}
|
||||
}
|
||||
async fn prove(&mut self, req: ProveRequest) -> ProveResponse {
|
||||
match self.do_prove(req).await {
|
||||
Ok(resp) => resp,
|
||||
Err(e) => ProveResponse {
|
||||
status: TaskStatus::Failed,
|
||||
error: Some(format!("failed to request proof: {}", e)),
|
||||
..Default::default()
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
async fn query_task(&mut self, req: QueryTaskRequest) -> QueryTaskResponse {
|
||||
if let Some(handle) = &mut self.current_task {
|
||||
if handle.is_finished() {
|
||||
return match handle.await {
|
||||
Ok(Ok(proof)) => QueryTaskResponse {
|
||||
task_id: req.task_id,
|
||||
status: TaskStatus::Success,
|
||||
proof: Some(proof),
|
||||
..Default::default()
|
||||
},
|
||||
Ok(Err(e)) => QueryTaskResponse {
|
||||
task_id: req.task_id,
|
||||
status: TaskStatus::Failed,
|
||||
error: Some(format!("proving task failed: {}", e)),
|
||||
..Default::default()
|
||||
},
|
||||
Err(e) => QueryTaskResponse {
|
||||
task_id: req.task_id,
|
||||
status: TaskStatus::Failed,
|
||||
error: Some(format!("proving task panicked: {}", e)),
|
||||
..Default::default()
|
||||
},
|
||||
};
|
||||
} else {
|
||||
return QueryTaskResponse {
|
||||
task_id: req.task_id,
|
||||
status: TaskStatus::Proving,
|
||||
..Default::default()
|
||||
};
|
||||
}
|
||||
}
|
||||
// If no handle is found
|
||||
QueryTaskResponse {
|
||||
task_id: req.task_id,
|
||||
status: TaskStatus::Failed,
|
||||
error: Some("no proving task is running".to_string()),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static GLOBAL_ASSET_URLS: LazyLock<HashMap<String, HashMap<String, url::Url>>> =
|
||||
LazyLock::new(|| {
|
||||
const ASSETS_JSON: &str = include_str!("../assets_url_preset.json");
|
||||
serde_json::from_str(ASSETS_JSON).expect("Failed to parse assets_url_preset.json")
|
||||
});
|
||||
|
||||
impl LocalProver {
|
||||
pub fn new(mut config: LocalProverConfig) -> Self {
|
||||
for (fork_name, circuit_config) in config.circuits.iter_mut() {
|
||||
// validate each base url
|
||||
circuit_config.location_data.validate().unwrap();
|
||||
let mut template_url_mapping = GLOBAL_ASSET_URLS
|
||||
.get(&fork_name.to_lowercase())
|
||||
.cloned()
|
||||
.unwrap_or_default();
|
||||
|
||||
// apply default settings in template
|
||||
for (key, url) in circuit_config.location_data.asset_detours.drain() {
|
||||
template_url_mapping.insert(key, url);
|
||||
}
|
||||
|
||||
circuit_config.location_data.asset_detours = template_url_mapping;
|
||||
|
||||
// validate each detours url
|
||||
for url in circuit_config.location_data.asset_detours.values() {
|
||||
assert!(
|
||||
url.path().ends_with('/'),
|
||||
"url {} must be end with /",
|
||||
url.as_str()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Self {
|
||||
config,
|
||||
next_task_id: 0,
|
||||
current_task: None,
|
||||
handlers: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn do_prove(&mut self, req: ProveRequest) -> Result<ProveResponse> {
|
||||
self.next_task_id += 1;
|
||||
let duration = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
|
||||
let created_at = duration.as_secs() as f64 + duration.subsec_nanos() as f64 * 1e-9;
|
||||
|
||||
let prover_task = UniversalHandler::get_task_from_input(&req.input)?;
|
||||
let vk = hex::encode(&prover_task.vk);
|
||||
let handler = if let Some(handler) = self.handlers.get(&vk) {
|
||||
handler.clone()
|
||||
} else {
|
||||
let base_config = self
|
||||
.config
|
||||
.circuits
|
||||
.get(&req.hard_fork_name)
|
||||
.ok_or_else(|| {
|
||||
eyre::eyre!(
|
||||
"coordinator sent unexpected forkname {}",
|
||||
req.hard_fork_name
|
||||
)
|
||||
})?;
|
||||
let url_base = if let Some(url) = base_config.location_data.asset_detours.get(&vk) {
|
||||
url.clone()
|
||||
} else {
|
||||
base_config
|
||||
.location_data
|
||||
.gen_asset_url(&vk, req.proof_type)?
|
||||
};
|
||||
let asset_path = base_config
|
||||
.location_data
|
||||
.get_asset(&vk, &url_base, &base_config.workspace_path)
|
||||
.await?;
|
||||
let circuits_handler = Arc::new(Mutex::new(UniversalHandler::new(
|
||||
&asset_path,
|
||||
req.proof_type,
|
||||
)?));
|
||||
self.handlers.insert(vk, circuits_handler.clone());
|
||||
circuits_handler
|
||||
};
|
||||
|
||||
let handle = Handle::current();
|
||||
let is_evm = req.proof_type == ProofType::Bundle;
|
||||
let task_handle = tokio::task::spawn_blocking(move || {
|
||||
handle.block_on(handler.get_proof_data(&prover_task, is_evm))
|
||||
});
|
||||
self.current_task = Some(task_handle);
|
||||
|
||||
Ok(ProveResponse {
|
||||
task_id: self.next_task_id.to_string(),
|
||||
proof_type: req.proof_type,
|
||||
circuit_version: req.circuit_version,
|
||||
hard_fork_name: req.hard_fork_name,
|
||||
status: TaskStatus::Proving,
|
||||
created_at,
|
||||
input: Some(req.input),
|
||||
..Default::default()
|
||||
})
|
||||
}
|
||||
}
|
||||
mod axiom;
|
||||
pub use axiom::{AxiomProver, AxiomProverConfig};
|
||||
|
||||
243
crates/prover-bin/src/prover/axiom.rs
Normal file
243
crates/prover-bin/src/prover/axiom.rs
Normal file
@@ -0,0 +1,243 @@
|
||||
use crate::zk_circuits_handler::universal::UniversalHandler;
|
||||
use async_trait::async_trait;
|
||||
use axiom_sdk::{
|
||||
build::BuildSdk,
|
||||
input::Input as AxiomInput,
|
||||
prove::{ProveArgs, ProveSdk},
|
||||
AxiomConfig, AxiomSdk, ProofType as AxiomProofType, SaveOption,
|
||||
};
|
||||
use eyre::Context;
|
||||
use jiff::Timestamp;
|
||||
use scroll_proving_sdk::{
|
||||
config::Config as SdkConfig,
|
||||
prover::{
|
||||
proving_service::{
|
||||
GetVkRequest, GetVkResponse, ProveRequest, ProveResponse, QueryTaskRequest,
|
||||
QueryTaskResponse, TaskStatus,
|
||||
},
|
||||
ProofType, ProvingService,
|
||||
},
|
||||
};
|
||||
use scroll_zkvm_types::proof::{OpenVmEvmProof, OpenVmVersionedVmStarkProof, ProofEnum};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
pub struct AxiomProverConfig {
|
||||
pub api_key: String,
|
||||
pub sdk_config: SdkConfig,
|
||||
// vk to program mapping
|
||||
pub program: HashMap<String, AxiomProgram>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
pub struct AxiomProgram {
|
||||
pub program_id: String,
|
||||
pub config_id: String,
|
||||
}
|
||||
|
||||
pub struct AxiomProver {
|
||||
config: Arc<AxiomProverConfig>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ProvingService for AxiomProver {
|
||||
fn is_local(&self) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
async fn get_vks(&self, _: GetVkRequest) -> GetVkResponse {
|
||||
// get vk has been deprecated in new prover with dynamic asset loading scheme
|
||||
GetVkResponse {
|
||||
vks: vec![],
|
||||
error: None,
|
||||
}
|
||||
}
|
||||
|
||||
async fn prove(&mut self, req: ProveRequest) -> ProveResponse {
|
||||
self.prove_inner(req)
|
||||
.await
|
||||
.unwrap_or_else(|e| ProveResponse {
|
||||
status: TaskStatus::Failed,
|
||||
error: Some(format!("failed to submit proof task to axiom: {}", e)),
|
||||
..Default::default()
|
||||
})
|
||||
}
|
||||
|
||||
async fn query_task(&mut self, req: QueryTaskRequest) -> QueryTaskResponse {
|
||||
let task_id = req.task_id.clone();
|
||||
self.query_task_inner(req)
|
||||
.await
|
||||
.unwrap_or_else(|e| QueryTaskResponse {
|
||||
task_id,
|
||||
status: TaskStatus::Failed,
|
||||
error: Some(format!("failed to query axiom task: {}", e)),
|
||||
..Default::default()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl AxiomProver {
|
||||
pub fn new(config: impl Into<Arc<AxiomProverConfig>>) -> Self {
|
||||
Self {
|
||||
config: config.into(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn make_axiom_request<R: Send + 'static>(
|
||||
&self,
|
||||
config_id: Option<String>,
|
||||
req: impl FnOnce(AxiomSdk) -> eyre::Result<R> + Send + 'static,
|
||||
) -> eyre::Result<R> {
|
||||
let api_key = self.config.api_key.clone();
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let config = AxiomConfig {
|
||||
api_key: Some(api_key),
|
||||
config_id,
|
||||
..Default::default()
|
||||
};
|
||||
let sdk = AxiomSdk::new(config);
|
||||
req(sdk)
|
||||
})
|
||||
.await
|
||||
.context("failed to join axiom request")
|
||||
.flatten()
|
||||
}
|
||||
|
||||
fn get_program(&self, vk: &[u8]) -> eyre::Result<AxiomProgram> {
|
||||
let vk = hex::encode(vk);
|
||||
self.config
|
||||
.program
|
||||
.get(vk.as_str())
|
||||
.cloned()
|
||||
.ok_or_else(|| eyre::eyre!("no axiom program configured for vk: {vk}"))
|
||||
}
|
||||
|
||||
async fn prove_inner(&mut self, req: ProveRequest) -> eyre::Result<ProveResponse> {
|
||||
let prover_task = UniversalHandler::get_task_from_input(&req.input)?;
|
||||
|
||||
let program = self.get_program(&prover_task.vk)?;
|
||||
|
||||
let input = serde_json::to_value(prover_task.build_openvm_input())?;
|
||||
let proof_type = if req.proof_type == ProofType::Bundle {
|
||||
AxiomProofType::Evm
|
||||
} else {
|
||||
AxiomProofType::Stark
|
||||
};
|
||||
|
||||
let mut response = ProveResponse {
|
||||
proof_type: req.proof_type,
|
||||
created_at: Timestamp::now().as_duration().as_secs_f64(),
|
||||
status: TaskStatus::Queued,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
response.task_id = self
|
||||
.make_axiom_request(Some(program.config_id), move |sdk| {
|
||||
sdk.generate_new_proof(ProveArgs {
|
||||
program_id: Some(program.program_id.clone()),
|
||||
input: Some(AxiomInput::Value(input)),
|
||||
proof_type: Some(proof_type),
|
||||
num_gpus: None,
|
||||
priority: None,
|
||||
})
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
async fn query_task_inner(&mut self, req: QueryTaskRequest) -> eyre::Result<QueryTaskResponse> {
|
||||
let mut response = QueryTaskResponse {
|
||||
task_id: req.task_id.clone(),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let task_id = req.task_id.clone();
|
||||
|
||||
let (status, proof_type, proof) = self
|
||||
.make_axiom_request(None, move |sdk| {
|
||||
let status = sdk.get_proof_status(&task_id)?;
|
||||
|
||||
let program_status = sdk.get_build_status(&status.program_uuid)?;
|
||||
let proof_type = match program_status.name.as_str() {
|
||||
"chunk" => ProofType::Chunk,
|
||||
"batch" => ProofType::Batch,
|
||||
"bundle" => ProofType::Bundle,
|
||||
_ => {
|
||||
return Err(eyre::eyre!("unrecognized program in: {program_status:#?}",));
|
||||
}
|
||||
};
|
||||
|
||||
let axiom_proof_type: AxiomProofType = status.proof_type.parse()?;
|
||||
let proof = if status.state == "Succeeded" {
|
||||
Some(sdk.get_generated_proof(
|
||||
&status.id,
|
||||
&axiom_proof_type,
|
||||
SaveOption::DoNotSave,
|
||||
)?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
Ok((status, proof_type, proof))
|
||||
})
|
||||
.await?;
|
||||
|
||||
// Queued, Executing, Executed, AppProving, AppProvingDone, PostProcessing, Failed,
|
||||
// Succeeded
|
||||
response.status = match status.state.as_str() {
|
||||
"Queued" => TaskStatus::Queued,
|
||||
"Executing" | "Executed" | "AppProving" | "AppProvingDone" | "PostProcessing" => {
|
||||
TaskStatus::Proving
|
||||
}
|
||||
"Succeeded" => TaskStatus::Success,
|
||||
"Failed" => TaskStatus::Failed,
|
||||
other => {
|
||||
return Err(eyre::eyre!("unrecognized axiom task status: {other}"));
|
||||
}
|
||||
};
|
||||
|
||||
response.proof_type = proof_type;
|
||||
|
||||
let created_at: Timestamp = status.created_at.parse()?;
|
||||
response.created_at = created_at.as_duration().as_secs_f64();
|
||||
if let Some(launched_at) = status.launched_at {
|
||||
let started_at: Timestamp = launched_at.parse()?;
|
||||
let started_at = started_at.as_duration();
|
||||
response.started_at = Some(started_at.as_secs_f64());
|
||||
|
||||
if let Some(terminated_at) = status.terminated_at {
|
||||
let finished_at: Timestamp = terminated_at.parse()?;
|
||||
let finished_at = finished_at.as_duration();
|
||||
response.finished_at = Some(finished_at.as_secs_f64());
|
||||
|
||||
let duration = finished_at.checked_sub(started_at).ok_or_else(|| {
|
||||
eyre::eyre!(
|
||||
"invalid timestamps: started_at={:?}, finished_at={:?}",
|
||||
started_at,
|
||||
finished_at
|
||||
)
|
||||
})?;
|
||||
response.compute_time_sec = Some(duration.as_secs_f64());
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(proof_bytes) = proof {
|
||||
let proof = match proof_type {
|
||||
ProofType::Bundle => {
|
||||
let proof: OpenVmEvmProof = serde_json::from_slice(&proof_bytes)?;
|
||||
ProofEnum::Evm(proof.into())
|
||||
}
|
||||
_ => {
|
||||
let proof: OpenVmVersionedVmStarkProof = serde_json::from_slice(&proof_bytes)?;
|
||||
ProofEnum::Stark(proof.try_into()?)
|
||||
}
|
||||
};
|
||||
|
||||
response.proof = Some(serde_json::to_string(&proof)?);
|
||||
}
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
}
|
||||
324
crates/prover-bin/src/prover/local.rs
Normal file
324
crates/prover-bin/src/prover/local.rs
Normal file
@@ -0,0 +1,324 @@
|
||||
use crate::zk_circuits_handler::{universal::UniversalHandler, CircuitsHandler};
|
||||
use async_trait::async_trait;
|
||||
use eyre::Result;
|
||||
use scroll_proving_sdk::{
|
||||
config::Config as SdkConfig,
|
||||
prover::{
|
||||
proving_service::{
|
||||
GetVkRequest, GetVkResponse, ProveRequest, ProveResponse, QueryTaskRequest,
|
||||
QueryTaskResponse, TaskStatus,
|
||||
},
|
||||
types::ProofType,
|
||||
ProvingService,
|
||||
},
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
fs::File,
|
||||
path::{Path, PathBuf},
|
||||
sync::{Arc, LazyLock},
|
||||
time::{SystemTime, UNIX_EPOCH},
|
||||
};
|
||||
use tokio::{runtime::Handle, sync::Mutex, task::JoinHandle};
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
pub struct AssetsLocationData {
|
||||
/// the base url to form a general downloading url for an asset, MUST HAVE A TRAILING SLASH
|
||||
pub base_url: url::Url,
|
||||
#[serde(default)]
|
||||
/// a altered url for specififed vk
|
||||
pub asset_detours: HashMap<String, url::Url>,
|
||||
}
|
||||
|
||||
impl AssetsLocationData {
|
||||
pub fn gen_asset_url(&self, vk_as_path: &str, proof_type: ProofType) -> Result<url::Url> {
|
||||
Ok(self.base_url.join(
|
||||
match proof_type {
|
||||
ProofType::Chunk => format!("chunk/{vk_as_path}/"),
|
||||
ProofType::Batch => format!("batch/{vk_as_path}/"),
|
||||
ProofType::Bundle => format!("bundle/{vk_as_path}/"),
|
||||
t => eyre::bail!("unrecognized proof type: {}", t as u8),
|
||||
}
|
||||
.as_str(),
|
||||
)?)
|
||||
}
|
||||
|
||||
pub fn validate(&self) -> Result<()> {
|
||||
if !self.base_url.path().ends_with('/') {
|
||||
eyre::bail!(
|
||||
"base_url must have a trailing slash, got: {}",
|
||||
self.base_url
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_asset(
|
||||
&self,
|
||||
vk: &str,
|
||||
url_base: &url::Url,
|
||||
base_path: impl AsRef<Path>,
|
||||
) -> Result<PathBuf> {
|
||||
let download_files = ["app.vmexe", "openvm.toml"];
|
||||
|
||||
// Step 1: Create a local path for storage
|
||||
let storage_path = base_path.as_ref().join(vk);
|
||||
std::fs::create_dir_all(&storage_path)?;
|
||||
|
||||
// Step 2 & 3: Download each file if needed
|
||||
let client = reqwest::Client::new();
|
||||
|
||||
for filename in download_files.iter() {
|
||||
let local_file_path = storage_path.join(filename);
|
||||
let download_url = url_base.join(filename)?;
|
||||
|
||||
// Check if file already exists
|
||||
if local_file_path.exists() {
|
||||
// Get file metadata to check size
|
||||
if let Ok(metadata) = std::fs::metadata(&local_file_path) {
|
||||
// Make a HEAD request to get remote file size
|
||||
|
||||
if let Ok(head_resp) = client.head(download_url.clone()).send().await {
|
||||
if let Some(content_length) = head_resp.headers().get("content-length") {
|
||||
if let Ok(remote_size) =
|
||||
content_length.to_str().unwrap_or("0").parse::<u64>()
|
||||
{
|
||||
// If sizes match, skip download
|
||||
if metadata.len() == remote_size {
|
||||
println!("File {} already exists with matching size, skipping download", filename);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
println!("Downloading {} from {}", filename, download_url);
|
||||
|
||||
let response = client.get(download_url).send().await?;
|
||||
if !response.status().is_success() {
|
||||
eyre::bail!(
|
||||
"Failed to download {}: HTTP status {}",
|
||||
filename,
|
||||
response.status()
|
||||
);
|
||||
}
|
||||
|
||||
// Stream the content directly to file instead of loading into memory
|
||||
let mut file = std::fs::File::create(&local_file_path)?;
|
||||
let mut stream = response.bytes_stream();
|
||||
|
||||
use futures_util::StreamExt;
|
||||
while let Some(chunk) = stream.next().await {
|
||||
std::io::Write::write_all(&mut file, &chunk?)?;
|
||||
}
|
||||
}
|
||||
|
||||
// Step 4: Return the storage path
|
||||
Ok(storage_path)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
pub struct LocalProverConfig {
|
||||
pub sdk_config: SdkConfig,
|
||||
pub circuits: HashMap<String, CircuitConfig>,
|
||||
}
|
||||
|
||||
impl LocalProverConfig {
|
||||
pub fn from_reader<R>(reader: R) -> Result<Self>
|
||||
where
|
||||
R: std::io::Read,
|
||||
{
|
||||
serde_json::from_reader(reader).map_err(|e| eyre::eyre!(e))
|
||||
}
|
||||
|
||||
pub fn from_file(file_name: String) -> Result<Self> {
|
||||
let file = File::open(file_name)?;
|
||||
Self::from_reader(&file)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
pub struct CircuitConfig {
|
||||
pub hard_fork_name: String,
|
||||
/// The path to save assets for a specified hard fork phase
|
||||
pub workspace_path: String,
|
||||
#[serde(flatten)]
|
||||
/// The location data for dynamic loading
|
||||
pub location_data: AssetsLocationData,
|
||||
/// cached vk value to save some initial cost, for debugging only
|
||||
#[serde(default)]
|
||||
pub vks: HashMap<ProofType, String>,
|
||||
}
|
||||
|
||||
pub struct LocalProver {
|
||||
config: LocalProverConfig,
|
||||
next_task_id: u64,
|
||||
current_task: Option<JoinHandle<Result<String>>>,
|
||||
|
||||
handlers: HashMap<String, Arc<dyn CircuitsHandler>>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ProvingService for LocalProver {
|
||||
fn is_local(&self) -> bool {
|
||||
true
|
||||
}
|
||||
async fn get_vks(&self, _: GetVkRequest) -> GetVkResponse {
|
||||
// get vk has been deprecated in new prover with dynamic asset loading scheme
|
||||
GetVkResponse {
|
||||
vks: vec![],
|
||||
error: None,
|
||||
}
|
||||
}
|
||||
async fn prove(&mut self, req: ProveRequest) -> ProveResponse {
|
||||
self.do_prove(req).await.unwrap_or_else(|e| ProveResponse {
|
||||
status: TaskStatus::Failed,
|
||||
error: Some(format!("failed to request proof: {}", e)),
|
||||
..Default::default()
|
||||
})
|
||||
}
|
||||
|
||||
async fn query_task(&mut self, req: QueryTaskRequest) -> QueryTaskResponse {
|
||||
if let Some(handle) = &mut self.current_task {
|
||||
if handle.is_finished() {
|
||||
return match handle.await {
|
||||
Ok(Ok(proof)) => QueryTaskResponse {
|
||||
task_id: req.task_id,
|
||||
status: TaskStatus::Success,
|
||||
proof: Some(proof),
|
||||
..Default::default()
|
||||
},
|
||||
Ok(Err(e)) => QueryTaskResponse {
|
||||
task_id: req.task_id,
|
||||
status: TaskStatus::Failed,
|
||||
error: Some(format!("proving task failed: {}", e)),
|
||||
..Default::default()
|
||||
},
|
||||
Err(e) => QueryTaskResponse {
|
||||
task_id: req.task_id,
|
||||
status: TaskStatus::Failed,
|
||||
error: Some(format!("proving task panicked: {}", e)),
|
||||
..Default::default()
|
||||
},
|
||||
};
|
||||
} else {
|
||||
return QueryTaskResponse {
|
||||
task_id: req.task_id,
|
||||
status: TaskStatus::Proving,
|
||||
..Default::default()
|
||||
};
|
||||
}
|
||||
}
|
||||
// If no handle is found
|
||||
QueryTaskResponse {
|
||||
task_id: req.task_id,
|
||||
status: TaskStatus::Failed,
|
||||
error: Some("no proving task is running".to_string()),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static GLOBAL_ASSET_URLS: LazyLock<HashMap<String, HashMap<String, url::Url>>> =
|
||||
LazyLock::new(|| {
|
||||
const ASSETS_JSON: &str = include_str!("../assets_url_preset.json");
|
||||
serde_json::from_str(ASSETS_JSON).expect("Failed to parse assets_url_preset.json")
|
||||
});
|
||||
|
||||
impl LocalProver {
|
||||
pub fn new(mut config: LocalProverConfig) -> Self {
|
||||
for (fork_name, circuit_config) in config.circuits.iter_mut() {
|
||||
// validate each base url
|
||||
circuit_config.location_data.validate().unwrap();
|
||||
let mut template_url_mapping = GLOBAL_ASSET_URLS
|
||||
.get(&fork_name.to_lowercase())
|
||||
.cloned()
|
||||
.unwrap_or_default();
|
||||
|
||||
// apply default settings in template
|
||||
for (key, url) in circuit_config.location_data.asset_detours.drain() {
|
||||
template_url_mapping.insert(key, url);
|
||||
}
|
||||
|
||||
circuit_config.location_data.asset_detours = template_url_mapping;
|
||||
|
||||
// validate each detours url
|
||||
for url in circuit_config.location_data.asset_detours.values() {
|
||||
assert!(
|
||||
url.path().ends_with('/'),
|
||||
"url {} must be end with /",
|
||||
url.as_str()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Self {
|
||||
config,
|
||||
next_task_id: 0,
|
||||
current_task: None,
|
||||
handlers: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn do_prove(&mut self, req: ProveRequest) -> Result<ProveResponse> {
|
||||
self.next_task_id += 1;
|
||||
let duration = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
|
||||
let created_at = duration.as_secs() as f64 + duration.subsec_nanos() as f64 * 1e-9;
|
||||
|
||||
let prover_task = UniversalHandler::get_task_from_input(&req.input)?;
|
||||
let vk = hex::encode(&prover_task.vk);
|
||||
let handler = if let Some(handler) = self.handlers.get(&vk) {
|
||||
handler.clone()
|
||||
} else {
|
||||
let base_config = self
|
||||
.config
|
||||
.circuits
|
||||
.get(&req.hard_fork_name)
|
||||
.ok_or_else(|| {
|
||||
eyre::eyre!(
|
||||
"coordinator sent unexpected forkname {}",
|
||||
req.hard_fork_name
|
||||
)
|
||||
})?;
|
||||
let url_base = if let Some(url) = base_config.location_data.asset_detours.get(&vk) {
|
||||
url.clone()
|
||||
} else {
|
||||
base_config
|
||||
.location_data
|
||||
.gen_asset_url(&vk, req.proof_type)?
|
||||
};
|
||||
let asset_path = base_config
|
||||
.location_data
|
||||
.get_asset(&vk, &url_base, &base_config.workspace_path)
|
||||
.await?;
|
||||
let circuits_handler = Arc::new(Mutex::new(UniversalHandler::new(
|
||||
&asset_path,
|
||||
req.proof_type,
|
||||
)?));
|
||||
self.handlers.insert(vk, circuits_handler.clone());
|
||||
circuits_handler
|
||||
};
|
||||
|
||||
let handle = Handle::current();
|
||||
let is_evm = req.proof_type == ProofType::Bundle;
|
||||
let task_handle = tokio::task::spawn_blocking(move || {
|
||||
handle.block_on(handler.get_proof_data(&prover_task, is_evm))
|
||||
});
|
||||
self.current_task = Some(task_handle);
|
||||
|
||||
Ok(ProveResponse {
|
||||
task_id: self.next_task_id.to_string(),
|
||||
proof_type: req.proof_type,
|
||||
circuit_version: req.circuit_version,
|
||||
hard_fork_name: req.hard_fork_name,
|
||||
status: TaskStatus::Proving,
|
||||
created_at,
|
||||
input: Some(req.input),
|
||||
..Default::default()
|
||||
})
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user