mirror of
https://github.com/extism/extism.git
synced 2026-01-11 14:58:01 -05:00
Compare commits
5 Commits
v1.0.3
...
extend-tim
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8bafdfb710 | ||
|
|
e1d33800f0 | ||
|
|
6084b69790 | ||
|
|
5896729cfb | ||
|
|
0f93c5ef9d |
@@ -173,6 +173,13 @@ void extism_function_free(ExtismFunction *f);
|
||||
*/
|
||||
void extism_function_set_namespace(ExtismFunction *ptr, const char *namespace_);
|
||||
|
||||
/**
|
||||
* Set the cost of an `ExtismFunction`, when set to 0 this has no effect, when set to `1` this will add
|
||||
* the runtime of the function back to the plugin timer.
|
||||
*/
|
||||
void extism_function_set_cost(ExtismFunction *ptr,
|
||||
double cost);
|
||||
|
||||
/**
|
||||
* Create a new plugin with host functions, the functions passed to this function no longer need to be manually freed using
|
||||
*
|
||||
|
||||
@@ -155,7 +155,7 @@ unsafe impl<T> Sync for UserData<T> {}
|
||||
unsafe impl Send for CPtr {}
|
||||
unsafe impl Sync for CPtr {}
|
||||
|
||||
type FunctionInner = dyn Fn(wasmtime::Caller<CurrentPlugin>, &[wasmtime::Val], &mut [wasmtime::Val]) -> Result<(), Error>
|
||||
pub(crate) type FunctionInner = dyn Fn(wasmtime::Caller<CurrentPlugin>, &[wasmtime::Val], &mut [wasmtime::Val]) -> Result<(), Error>
|
||||
+ Sync
|
||||
+ Send;
|
||||
|
||||
@@ -174,6 +174,9 @@ pub struct Function {
|
||||
/// Function handle
|
||||
pub(crate) f: Arc<FunctionInner>,
|
||||
|
||||
/// Function cost (in terms of time)
|
||||
pub(crate) cost: f64,
|
||||
|
||||
/// UserData
|
||||
pub(crate) _user_data: UserDataHandle,
|
||||
}
|
||||
@@ -204,10 +207,12 @@ impl Function {
|
||||
ty,
|
||||
f: Arc::new(
|
||||
move |mut caller: Caller<_>, inp: &[Val], outp: &mut [Val]| {
|
||||
let plugin = caller.data_mut();
|
||||
let x = data.clone();
|
||||
f(caller.data_mut(), inp, outp, x)
|
||||
f(plugin, inp, outp, x)
|
||||
},
|
||||
),
|
||||
cost: 0.0,
|
||||
namespace: None,
|
||||
_user_data: match &user_data {
|
||||
UserData::C(ptr) => UserDataHandle::C(ptr.clone()),
|
||||
@@ -239,6 +244,23 @@ impl Function {
|
||||
self
|
||||
}
|
||||
|
||||
/// Function cost
|
||||
pub fn cost(&self) -> f64 {
|
||||
self.cost
|
||||
}
|
||||
|
||||
/// Set host function cost
|
||||
pub fn set_cost(&mut self, cost: f64) {
|
||||
trace!("Setting cost for {} to {cost}", self.name);
|
||||
self.cost = cost;
|
||||
}
|
||||
|
||||
/// Update host function cost
|
||||
pub fn with_cost(mut self, cost: f64) -> Self {
|
||||
self.set_cost(cost);
|
||||
self
|
||||
}
|
||||
|
||||
/// Get function type
|
||||
pub fn ty(&self) -> &wasmtime::FuncType {
|
||||
&self.ty
|
||||
|
||||
@@ -14,6 +14,7 @@ pub(crate) mod manifest;
|
||||
pub(crate) mod pdk;
|
||||
mod plugin;
|
||||
mod plugin_builder;
|
||||
mod timeout_manager;
|
||||
mod timer;
|
||||
|
||||
/// Extism C API
|
||||
@@ -27,6 +28,7 @@ pub use plugin::{CancelHandle, Plugin, WasmInput, EXTISM_ENV_MODULE, EXTISM_USER
|
||||
pub use plugin_builder::{DebugOptions, PluginBuilder};
|
||||
|
||||
pub(crate) use internal::{Internal, Wasi};
|
||||
pub(crate) use timeout_manager::TimeoutManager;
|
||||
pub(crate) use timer::{Timer, TimerAction};
|
||||
pub(crate) use tracing::{debug, error, trace, warn};
|
||||
|
||||
|
||||
@@ -238,6 +238,20 @@ impl Plugin {
|
||||
CurrentPlugin::new(manifest, with_wasi, available_pages, id)?,
|
||||
);
|
||||
store.set_epoch_deadline(1);
|
||||
store.call_hook(|data, hook| {
|
||||
if hook.entering_host() {
|
||||
let tx = Timer::tx();
|
||||
tx.send(TimerAction::EnterHost {
|
||||
id: data.id.clone(),
|
||||
})?;
|
||||
} else if hook.exiting_host() {
|
||||
let tx = Timer::tx();
|
||||
tx.send(TimerAction::ExitHost {
|
||||
id: data.id.clone(),
|
||||
})?;
|
||||
}
|
||||
Ok(())
|
||||
});
|
||||
|
||||
let mut linker = Linker::new(&engine);
|
||||
linker.allow_shadowing(true);
|
||||
@@ -284,9 +298,13 @@ impl Plugin {
|
||||
for f in &mut imports {
|
||||
let name = f.name().to_string();
|
||||
let ns = f.namespace().unwrap_or(EXTISM_USER_MODULE);
|
||||
unsafe {
|
||||
linker.func_new(ns, &name, f.ty().clone(), &*(f.f.as_ref() as *const _))?;
|
||||
}
|
||||
let cost = f.cost();
|
||||
let inner: &function::FunctionInner = unsafe { &*(f.f.as_ref() as *const _) };
|
||||
linker.func_new(ns, &name, f.ty().clone(), move |caller, params, results| {
|
||||
let _timeout =
|
||||
TimeoutManager::new(caller.data()).map(|x| x.with_cost(cost.clone()));
|
||||
inner(caller, params, results)
|
||||
})?;
|
||||
}
|
||||
|
||||
let instance_pre = linker.instantiate_pre(main)?;
|
||||
|
||||
@@ -258,6 +258,18 @@ pub unsafe extern "C" fn extism_function_set_namespace(
|
||||
}
|
||||
}
|
||||
|
||||
/// Set the cost of an `ExtismFunction`, when set to 0 this has no effect, when set to `1` this will add
|
||||
/// the runtime of the function back to the plugin timer.
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn extism_function_set_cost(ptr: *mut ExtismFunction, cost: f64) {
|
||||
let f = &mut *ptr;
|
||||
if let Some(x) = f.0.get_mut() {
|
||||
x.set_cost(cost);
|
||||
} else {
|
||||
debug!("Trying to set the cost of already registered function")
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new plugin with host functions, the functions passed to this function no longer need to be manually freed using
|
||||
///
|
||||
/// `wasm`: is a WASM module (wat or wasm) or a JSON encoded manifest
|
||||
|
||||
@@ -235,6 +235,43 @@ fn test_timeout() {
|
||||
assert!(err == "timeout");
|
||||
}
|
||||
|
||||
fn hello_world_timeout_manager(
|
||||
plugin: &mut CurrentPlugin,
|
||||
inputs: &[Val],
|
||||
outputs: &mut [Val],
|
||||
_user_data: UserData<()>,
|
||||
) -> Result<(), Error> {
|
||||
let mgr = plugin.timeout_manager().map(|x| x.add_on_drop());
|
||||
std::thread::sleep(std::time::Duration::from_secs(3));
|
||||
outputs[0] = inputs[0].clone();
|
||||
drop(mgr);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_timeout_manager() {
|
||||
let f = Function::new(
|
||||
"hello_world",
|
||||
[PTR],
|
||||
[PTR],
|
||||
UserData::default(),
|
||||
hello_world_timeout_manager,
|
||||
);
|
||||
|
||||
let manifest = Manifest::new([extism_manifest::Wasm::data(WASM)])
|
||||
.with_timeout(std::time::Duration::from_secs(1));
|
||||
let mut plugin = Plugin::new(manifest, [f], true).unwrap();
|
||||
|
||||
let start = std::time::Instant::now();
|
||||
let output: Result<&[u8], Error> = plugin.call("count_vowels", "testing");
|
||||
println!("Result {:?}", output);
|
||||
assert!(output.is_ok());
|
||||
let end = std::time::Instant::now();
|
||||
let time = end - start;
|
||||
println!("Plugin ran for {:?}", time);
|
||||
assert!(time.as_secs() >= 3);
|
||||
}
|
||||
|
||||
typed_plugin!(pub TestTypedPluginGenerics {
|
||||
count_vowels<T: FromBytes<'a>>(&str) -> T
|
||||
});
|
||||
|
||||
59
runtime/src/timeout_manager.rs
Normal file
59
runtime/src/timeout_manager.rs
Normal file
@@ -0,0 +1,59 @@
|
||||
use crate::*;
|
||||
|
||||
/// `TimeoutManager` is used to control `Plugin` timeouts from within host functions
|
||||
///
|
||||
/// It can be used to add or subtract time from a plug-in's timeout. If a plugin is not
|
||||
/// configured to have a timeout then this will have no effect.
|
||||
pub(crate) struct TimeoutManager {
|
||||
start_time: std::time::Instant,
|
||||
id: uuid::Uuid,
|
||||
tx: std::sync::mpsc::Sender<TimerAction>,
|
||||
cost: f64,
|
||||
}
|
||||
|
||||
impl TimeoutManager {
|
||||
/// Create a new `TimeoutManager` from the `CurrentPlugin`, this will return `None` if no timeout
|
||||
/// is configured
|
||||
pub fn new(plugin: &CurrentPlugin) -> Option<TimeoutManager> {
|
||||
plugin.manifest.timeout_ms.map(|_| TimeoutManager {
|
||||
start_time: std::time::Instant::now(),
|
||||
id: plugin.id.clone(),
|
||||
tx: Timer::tx(),
|
||||
cost: 0.0,
|
||||
})
|
||||
}
|
||||
|
||||
/// Add the amount of time this value has existed to the configured timeout
|
||||
pub fn add_elapsed(&mut self) -> Result<(), Error> {
|
||||
let cost = self.cost.abs();
|
||||
let d = self.start_time.elapsed().mul_f64(cost);
|
||||
let mut d: timer::ExtendTimeout = d.into();
|
||||
if self.cost.is_sign_negative() {
|
||||
d = -d;
|
||||
}
|
||||
self.tx.send(TimerAction::Extend {
|
||||
id: self.id.clone(),
|
||||
duration: d,
|
||||
})?;
|
||||
self.start_time = std::time::Instant::now();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn with_cost(mut self, cost: f64) -> Self {
|
||||
self.cost = cost;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for TimeoutManager {
|
||||
fn drop(&mut self) {
|
||||
let x = self.add_elapsed();
|
||||
if let Err(e) = x {
|
||||
error!(
|
||||
plugin = self.id.to_string(),
|
||||
"unable to extend timeout: {}",
|
||||
e.to_string()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,17 +1,51 @@
|
||||
use crate::*;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct ExtendTimeout {
|
||||
duration: std::time::Duration,
|
||||
negative: bool,
|
||||
}
|
||||
|
||||
impl From<std::time::Duration> for ExtendTimeout {
|
||||
fn from(value: std::time::Duration) -> Self {
|
||||
ExtendTimeout {
|
||||
duration: value,
|
||||
negative: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::Neg for ExtendTimeout {
|
||||
type Output = ExtendTimeout;
|
||||
|
||||
fn neg(mut self) -> Self::Output {
|
||||
self.negative = !self.negative;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) enum TimerAction {
|
||||
Start {
|
||||
id: uuid::Uuid,
|
||||
engine: Engine,
|
||||
duration: Option<std::time::Duration>,
|
||||
},
|
||||
Extend {
|
||||
id: uuid::Uuid,
|
||||
duration: ExtendTimeout,
|
||||
},
|
||||
Stop {
|
||||
id: uuid::Uuid,
|
||||
},
|
||||
Cancel {
|
||||
id: uuid::Uuid,
|
||||
},
|
||||
EnterHost {
|
||||
id: uuid::Uuid,
|
||||
},
|
||||
ExitHost {
|
||||
id: uuid::Uuid,
|
||||
},
|
||||
Shutdown,
|
||||
}
|
||||
|
||||
@@ -31,6 +65,8 @@ extern "C" fn cleanup_timer() {
|
||||
|
||||
static mut TIMER: std::sync::Mutex<Option<Timer>> = std::sync::Mutex::new(None);
|
||||
|
||||
type TimerMap = std::collections::BTreeMap<uuid::Uuid, (Engine, Option<std::time::Instant>)>;
|
||||
|
||||
impl Timer {
|
||||
pub(crate) fn tx() -> std::sync::mpsc::Sender<TimerAction> {
|
||||
let mut timer = match unsafe { TIMER.lock() } {
|
||||
@@ -49,7 +85,8 @@ impl Timer {
|
||||
pub fn init(timer: &mut Option<Timer>) -> std::sync::mpsc::Sender<TimerAction> {
|
||||
let (tx, rx) = std::sync::mpsc::channel();
|
||||
let thread = std::thread::spawn(move || {
|
||||
let mut plugins = std::collections::BTreeMap::new();
|
||||
let mut plugins = TimerMap::new();
|
||||
let mut in_host = TimerMap::new();
|
||||
|
||||
macro_rules! handle {
|
||||
($x:expr) => {
|
||||
@@ -70,12 +107,16 @@ impl Timer {
|
||||
TimerAction::Stop { id } => {
|
||||
trace!(plugin = id.to_string(), "handling stop event");
|
||||
plugins.remove(&id);
|
||||
in_host.remove(&id);
|
||||
}
|
||||
TimerAction::Cancel { id } => {
|
||||
trace!(plugin = id.to_string(), "handling cancel event");
|
||||
if let Some((engine, _)) = plugins.remove(&id) {
|
||||
engine.increment_epoch();
|
||||
}
|
||||
if let Some((engine, _)) = in_host.remove(&id) {
|
||||
engine.increment_epoch();
|
||||
}
|
||||
}
|
||||
TimerAction::Shutdown => {
|
||||
trace!("Shutting down timer");
|
||||
@@ -83,8 +124,42 @@ impl Timer {
|
||||
trace!(plugin = id.to_string(), "handling shutdown event");
|
||||
engine.increment_epoch();
|
||||
}
|
||||
|
||||
for (id, (engine, _)) in in_host.iter() {
|
||||
trace!(plugin = id.to_string(), "handling shutdown event");
|
||||
engine.increment_epoch();
|
||||
}
|
||||
return;
|
||||
}
|
||||
TimerAction::Extend { id, duration } => {
|
||||
if let Some((_engine, Some(timeout))) = plugins.get_mut(&id) {
|
||||
let x = if duration.negative {
|
||||
timeout.checked_sub(duration.duration)
|
||||
} else {
|
||||
timeout.checked_add(duration.duration)
|
||||
};
|
||||
if let Some(t) = x {
|
||||
*timeout = t;
|
||||
} else {
|
||||
error!(
|
||||
plugin = id.to_string(),
|
||||
"unable to extend timeout by {:?}", duration.duration
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
TimerAction::EnterHost { id } => {
|
||||
trace!(plugin = id.to_string(), "enter host function");
|
||||
if let Some(x) = plugins.remove(&id) {
|
||||
in_host.insert(id, x);
|
||||
}
|
||||
}
|
||||
TimerAction::ExitHost { id } => {
|
||||
trace!(plugin = id.to_string(), "exit host function");
|
||||
if let Some(x) = in_host.remove(&id) {
|
||||
plugins.insert(id, x);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
@@ -96,6 +171,10 @@ impl Timer {
|
||||
}
|
||||
}
|
||||
|
||||
for x in rx.try_iter() {
|
||||
handle!(x)
|
||||
}
|
||||
|
||||
plugins = plugins
|
||||
.into_iter()
|
||||
.filter(|(_k, (engine, end))| {
|
||||
@@ -109,10 +188,6 @@ impl Timer {
|
||||
true
|
||||
})
|
||||
.collect();
|
||||
|
||||
for x in rx.try_iter() {
|
||||
handle!(x)
|
||||
}
|
||||
}
|
||||
});
|
||||
*timer = Some(Timer {
|
||||
|
||||
Reference in New Issue
Block a user