mirror of
https://github.com/voltrevo/ValueScript.git
synced 2026-01-11 06:27:56 -05:00
Use actor to avoid re-opening the db for every request
This commit is contained in:
37
Cargo.lock
generated
37
Cargo.lock
generated
@@ -12,6 +12,31 @@ dependencies = [
|
||||
"regex",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "actix"
|
||||
version = "0.13.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fb72882332b6d6282f428b77ba0358cb2687e61a6f6df6a6d3871e8a177c2d4f"
|
||||
dependencies = [
|
||||
"actix-macros",
|
||||
"actix-rt",
|
||||
"actix_derive",
|
||||
"bitflags 2.4.1",
|
||||
"bytes",
|
||||
"crossbeam-channel",
|
||||
"futures-core",
|
||||
"futures-sink",
|
||||
"futures-task",
|
||||
"futures-util",
|
||||
"log",
|
||||
"once_cell",
|
||||
"parking_lot 0.12.0",
|
||||
"pin-project-lite",
|
||||
"smallvec",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "actix-codec"
|
||||
version = "0.5.1"
|
||||
@@ -191,6 +216,17 @@ dependencies = [
|
||||
"syn 2.0.38",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "actix_derive"
|
||||
version = "0.6.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7c7db3d5a9718568e4cf4a537cfd7070e6e6ff7481510d0237fb529ac850f6d3"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.38",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "addr2line"
|
||||
version = "0.17.0"
|
||||
@@ -3084,6 +3120,7 @@ checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d"
|
||||
name = "vstc"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"actix",
|
||||
"actix-web",
|
||||
"serde",
|
||||
"serde_json",
|
||||
|
||||
@@ -15,4 +15,5 @@ serde_qs = "0.8.0"
|
||||
serde_json = "1.0.108"
|
||||
termion = "2.0.1"
|
||||
actix-web = "4"
|
||||
actix = "0.13.3"
|
||||
tokio = "1"
|
||||
|
||||
@@ -1,6 +1,13 @@
|
||||
use crate::exit_command_failed::exit_command_failed;
|
||||
use actix_web::{dev, web, App, FromRequest, HttpRequest, HttpResponse, HttpServer, Responder};
|
||||
use actix::{Actor, Addr, Context, Handler, Message};
|
||||
use actix_web::{
|
||||
dev,
|
||||
http::{Method, StatusCode},
|
||||
web::{self, Bytes},
|
||||
App, FromRequest, HttpRequest, HttpResponse, HttpServer, Responder,
|
||||
};
|
||||
use storage::{storage_head_ptr, SledBackend, Storage, StorageReader};
|
||||
use tokio::task::LocalSet;
|
||||
use valuescript_compiler::inline_valuescript;
|
||||
use valuescript_vm::{
|
||||
vs_object::VsObject,
|
||||
@@ -18,18 +25,20 @@ pub fn db_host(path: &str, args: &[String]) {
|
||||
);
|
||||
}
|
||||
|
||||
let path = path.to_owned();
|
||||
|
||||
// TODO: Multi-thread?
|
||||
let runtime = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
runtime.block_on(async {
|
||||
let local = LocalSet::new();
|
||||
|
||||
local.block_on(&runtime, async {
|
||||
let db_actor = DbActor::new(Storage::new(SledBackend::open(path).unwrap())).start();
|
||||
|
||||
HttpServer::new(move || {
|
||||
App::new()
|
||||
.app_data(web::Data::new(path.clone()))
|
||||
.app_data(web::Data::new(db_actor.clone()))
|
||||
.default_service(web::route().to(handle_request))
|
||||
})
|
||||
.bind("127.0.0.1:8080")
|
||||
@@ -43,93 +52,160 @@ pub fn db_host(path: &str, args: &[String]) {
|
||||
async fn handle_request(
|
||||
req: HttpRequest,
|
||||
payload: web::Payload,
|
||||
data: web::Data<String>,
|
||||
data: web::Data<Addr<DbActor>>,
|
||||
) -> impl Responder {
|
||||
let path = req.path();
|
||||
let method = req.method();
|
||||
let mut storage = Storage::new(SledBackend::open(data.as_ref()).unwrap());
|
||||
|
||||
let body = match get_body(&req, payload.into_inner()).await {
|
||||
Ok(body) => body,
|
||||
Err(e) => return e.into(),
|
||||
};
|
||||
|
||||
let mut instance: Val = storage
|
||||
.get_head(storage_head_ptr(b"state"))
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
let fn_ = inline_valuescript(
|
||||
r#"
|
||||
export default function(req) {
|
||||
if ("handleRequest" in this) {
|
||||
return this.handleRequest(req);
|
||||
}
|
||||
|
||||
const handlerName = `${req.method} ${req.path}`;
|
||||
|
||||
if (!this[handlerName]) {
|
||||
throw new Error("No handler for request");
|
||||
}
|
||||
|
||||
if (req.method === "GET") {
|
||||
// Enforce GET as read-only
|
||||
const state = this;
|
||||
return state[handlerName](req.body);
|
||||
}
|
||||
|
||||
return this[handlerName](req.body);
|
||||
}
|
||||
"#,
|
||||
);
|
||||
|
||||
let req_val = VsObject {
|
||||
string_map: vec![
|
||||
("path".to_string(), path.to_val()),
|
||||
("method".to_string(), method.to_string().to_val()),
|
||||
("body".to_string(), body),
|
||||
]
|
||||
.into_iter()
|
||||
.collect(),
|
||||
symbol_map: vec![].into_iter().collect(),
|
||||
prototype: Val::Void,
|
||||
}
|
||||
.to_val();
|
||||
|
||||
let mut vm = VirtualMachine::default();
|
||||
|
||||
let res = match vm.run(None, &mut instance, fn_, vec![req_val]) {
|
||||
Ok(res) => match res.to_json() {
|
||||
Some(json) => HttpResponse::Ok().json(json),
|
||||
None => HttpResponse::InternalServerError().body("Failed to serialize response"),
|
||||
let req_val = DbRequest {
|
||||
path: req.path().to_owned(),
|
||||
method: req.method().clone(),
|
||||
body: match get_body(&req, payload.into_inner()).await {
|
||||
Ok(body) => body,
|
||||
Err(_e) => todo!(), // handle error
|
||||
},
|
||||
Err(err) => {
|
||||
println!("Uncaught exception: {}", err.pretty());
|
||||
HttpResponse::InternalServerError().body("Uncaught exception")
|
||||
}
|
||||
};
|
||||
|
||||
storage
|
||||
.set_head(storage_head_ptr(b"state"), &instance)
|
||||
.unwrap();
|
||||
|
||||
res
|
||||
match data.send(req_val).await {
|
||||
Ok(Ok(res)) => HttpResponse::Ok()
|
||||
.content_type("application/json")
|
||||
.body(res),
|
||||
Ok(Err(err)) => {
|
||||
println!("{}", err.message);
|
||||
HttpResponse::build(StatusCode::from_u16(err.code).unwrap()).body(err.message)
|
||||
}
|
||||
Err(err) => {
|
||||
println!("{}", err);
|
||||
HttpResponse::InternalServerError().body("Mailbox has closed")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_body(req: &HttpRequest, mut payload: dev::Payload) -> Result<Val, actix_web::Error> {
|
||||
struct DbRequest {
|
||||
path: String,
|
||||
method: Method,
|
||||
body: Bytes,
|
||||
}
|
||||
|
||||
struct HttpError {
|
||||
code: u16,
|
||||
message: String,
|
||||
}
|
||||
|
||||
impl Message for DbRequest {
|
||||
type Result = Result<String, HttpError>;
|
||||
}
|
||||
|
||||
struct DbActor {
|
||||
storage: Storage<SledBackend>,
|
||||
apply_fn: Val,
|
||||
}
|
||||
|
||||
impl Actor for DbActor {
|
||||
type Context = Context<Self>;
|
||||
}
|
||||
|
||||
impl DbActor {
|
||||
fn new(storage: Storage<SledBackend>) -> Self {
|
||||
Self {
|
||||
storage,
|
||||
apply_fn: inline_valuescript(
|
||||
// TODO: store in actor
|
||||
r#"
|
||||
export default function(req) {
|
||||
if ("handleRequest" in this) {
|
||||
return this.handleRequest(req);
|
||||
}
|
||||
|
||||
const handlerName = `${req.method} ${req.path}`;
|
||||
|
||||
if (!this[handlerName]) {
|
||||
throw new Error("No handler for request");
|
||||
}
|
||||
|
||||
if (req.method === "GET") {
|
||||
// Enforce GET as read-only
|
||||
const state = this;
|
||||
return state[handlerName](req.body);
|
||||
}
|
||||
|
||||
return this[handlerName](req.body);
|
||||
}
|
||||
"#,
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<DbRequest> for DbActor {
|
||||
type Result = Result<String, HttpError>;
|
||||
|
||||
fn handle(&mut self, msg: DbRequest, _ctx: &mut Self::Context) -> Self::Result {
|
||||
let mut instance: Val = self
|
||||
.storage
|
||||
.get_head(storage_head_ptr(b"state"))
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
let DbRequest { path, method, body } = msg;
|
||||
|
||||
let body = if body.is_empty() {
|
||||
Val::Undefined
|
||||
} else {
|
||||
match serde_json::from_slice::<serde_json::Value>(&body) {
|
||||
Ok(json_value) => Val::from_json(&json_value),
|
||||
Err(_err) => {
|
||||
return Err(HttpError {
|
||||
code: 400,
|
||||
message: "Bad request".to_owned(),
|
||||
})
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let req_val = VsObject {
|
||||
string_map: vec![
|
||||
("path".to_string(), path.to_val()),
|
||||
("method".to_string(), method.to_string().to_val()),
|
||||
("body".to_string(), body),
|
||||
]
|
||||
.into_iter()
|
||||
.collect(),
|
||||
symbol_map: vec![].into_iter().collect(),
|
||||
prototype: Val::Void,
|
||||
}
|
||||
.to_val();
|
||||
|
||||
let mut vm = VirtualMachine::default();
|
||||
|
||||
let res = match vm.run(None, &mut instance, self.apply_fn.clone(), vec![req_val]) {
|
||||
Ok(res) => match res.to_json() {
|
||||
Some(json) => Ok(json.to_string()),
|
||||
None => Err(HttpError {
|
||||
code: 500,
|
||||
message: "Failed to serialize response".to_owned(),
|
||||
}),
|
||||
},
|
||||
Err(err) => {
|
||||
println!("Uncaught exception: {}", err.pretty());
|
||||
Err(HttpError {
|
||||
code: 500,
|
||||
message: "Uncaught exception".to_owned(),
|
||||
})
|
||||
}
|
||||
};
|
||||
|
||||
self
|
||||
.storage
|
||||
.set_head(storage_head_ptr(b"state"), &instance)
|
||||
.unwrap();
|
||||
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_body(req: &HttpRequest, mut payload: dev::Payload) -> Result<Bytes, actix_web::Error> {
|
||||
let payload = web::Payload::from_request(req, &mut payload).await?;
|
||||
|
||||
let body = payload
|
||||
payload
|
||||
.to_bytes_limited(1_024 * 1_024)
|
||||
.await
|
||||
.map_err(|_| actix_web::error::PayloadError::Overflow)??;
|
||||
|
||||
if body.is_empty() {
|
||||
Ok(Val::Undefined)
|
||||
} else {
|
||||
match serde_json::from_slice::<serde_json::Value>(&body) {
|
||||
Ok(json_value) => Ok(Val::from_json(&json_value)),
|
||||
Err(err) => Err(actix_web::error::ErrorBadRequest(err)),
|
||||
}
|
||||
}
|
||||
.map_err(|_| actix_web::error::PayloadError::Overflow)?
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user