mirror of
https://github.com/extism/extism.git
synced 2026-01-11 14:58:01 -05:00
Compare commits
8 Commits
v1.11.1
...
allowed-pa
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fad7eb4454 | ||
|
|
37e0e2fed4 | ||
|
|
5b94feb7ec | ||
|
|
6146d2f47c | ||
|
|
424e6c328a | ||
|
|
d1ba15484e | ||
|
|
dedd81d90f | ||
|
|
2732ca198d |
@@ -1,6 +1,10 @@
|
||||
use std::collections::BTreeMap;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
mod local_path;
|
||||
|
||||
pub use local_path::LocalPath;
|
||||
|
||||
#[deprecated]
|
||||
pub type ManifestMemory = MemoryOptions;
|
||||
|
||||
@@ -279,7 +283,7 @@ pub struct Manifest {
|
||||
/// the path on disk to the path it should be available inside the plugin.
|
||||
/// For example, `".": "/tmp"` would mount the current directory as `/tmp` inside the module
|
||||
#[serde(default)]
|
||||
pub allowed_paths: Option<BTreeMap<String, PathBuf>>,
|
||||
pub allowed_paths: Option<BTreeMap<PathBuf, LocalPath>>,
|
||||
|
||||
/// The plugin timeout in milliseconds
|
||||
#[serde(default)]
|
||||
@@ -337,15 +341,15 @@ impl Manifest {
|
||||
}
|
||||
|
||||
/// Add a path to `allowed_paths`
|
||||
pub fn with_allowed_path(mut self, src: String, dest: impl AsRef<Path>) -> Self {
|
||||
pub fn with_allowed_path(mut self, src: impl Into<LocalPath>, dest: impl AsRef<Path>) -> Self {
|
||||
let dest = dest.as_ref().to_path_buf();
|
||||
match &mut self.allowed_paths {
|
||||
Some(p) => {
|
||||
p.insert(src, dest);
|
||||
p.insert(dest, src.into());
|
||||
}
|
||||
None => {
|
||||
let mut p = BTreeMap::new();
|
||||
p.insert(src, dest);
|
||||
p.insert(dest, src.into());
|
||||
self.allowed_paths = Some(p);
|
||||
}
|
||||
}
|
||||
@@ -354,8 +358,8 @@ impl Manifest {
|
||||
}
|
||||
|
||||
/// Set `allowed_paths`
|
||||
pub fn with_allowed_paths(mut self, paths: impl Iterator<Item = (String, PathBuf)>) -> Self {
|
||||
self.allowed_paths = Some(paths.collect());
|
||||
pub fn with_allowed_paths(mut self, paths: impl Iterator<Item = (LocalPath, PathBuf)>) -> Self {
|
||||
self.allowed_paths = Some(paths.map(|(local, wasm)| (wasm, local)).collect());
|
||||
self
|
||||
}
|
||||
|
||||
|
||||
119
manifest/src/local_path.rs
Normal file
119
manifest/src/local_path.rs
Normal file
@@ -0,0 +1,119 @@
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
/// A path on the host together with the access mode attached to it.
///
/// In string form a leading `ro:` marks the path as read-only; any string without that
/// prefix is taken verbatim as a read-write path.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
#[cfg_attr(feature = "json_schema", derive(schemars::JsonSchema))]
pub enum LocalPath {
    /// Path tagged as read-only
    ReadOnly(PathBuf),
    /// Path tagged as read-write
    ReadWrite(PathBuf),
}

impl LocalPath {
    /// Borrow the wrapped path, regardless of which variant holds it
    pub fn as_path(&self) -> &Path {
        match self {
            Self::ReadOnly(inner) | Self::ReadWrite(inner) => inner.as_path(),
        }
    }
}

/// Parse the string form: `ro:<path>` becomes `ReadOnly(<path>)`, anything else is `ReadWrite`
impl From<&str> for LocalPath {
    fn from(value: &str) -> Self {
        match value.strip_prefix("ro:") {
            Some(rest) => Self::ReadOnly(PathBuf::from(rest)),
            None => Self::ReadWrite(PathBuf::from(value)),
        }
    }
}

/// Same parsing rules as `From<&str>`
impl From<String> for LocalPath {
    fn from(value: String) -> Self {
        value.as_str().into()
    }
}

/// An owned path is always wrapped as `ReadWrite`; no prefix parsing is performed
impl From<PathBuf> for LocalPath {
    fn from(value: PathBuf) -> Self {
        Self::ReadWrite(value)
    }
}

/// A borrowed path is copied and wrapped as `ReadWrite`; no prefix parsing is performed
impl From<&Path> for LocalPath {
    fn from(value: &Path) -> Self {
        Self::ReadWrite(value.to_owned())
    }
}
|
||||
|
||||
impl serde::Serialize for LocalPath {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
match self {
|
||||
LocalPath::ReadOnly(path) => {
|
||||
let s = match path.to_str() {
|
||||
Some(s) => s,
|
||||
None => {
|
||||
return Err(serde::ser::Error::custom(
|
||||
"Path contains invalid UTF-8 characters",
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
format!("ro:{s}").serialize(serializer)
|
||||
}
|
||||
LocalPath::ReadWrite(path) => path.serialize(serializer),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct LocalPathVisitor;
|
||||
|
||||
impl serde::de::Visitor<'_> for LocalPathVisitor {
|
||||
type Value = LocalPath;
|
||||
|
||||
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
formatter.write_str("path string")
|
||||
}
|
||||
|
||||
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
|
||||
where
|
||||
E: serde::de::Error,
|
||||
{
|
||||
Ok(From::from(v))
|
||||
}
|
||||
|
||||
fn visit_string<E>(self, v: String) -> Result<Self::Value, E>
|
||||
where
|
||||
E: serde::de::Error,
|
||||
{
|
||||
Ok(From::from(v))
|
||||
}
|
||||
|
||||
fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E>
|
||||
where
|
||||
E: serde::de::Error,
|
||||
{
|
||||
std::str::from_utf8(v)
|
||||
.map(From::from)
|
||||
.map_err(|_| serde::de::Error::invalid_value(serde::de::Unexpected::Bytes(v), &self))
|
||||
}
|
||||
|
||||
fn visit_byte_buf<E>(self, v: Vec<u8>) -> Result<Self::Value, E>
|
||||
where
|
||||
E: serde::de::Error,
|
||||
{
|
||||
String::from_utf8(v).map(From::from).map_err(|e| {
|
||||
serde::de::Error::invalid_value(serde::de::Unexpected::Bytes(&e.into_bytes()), &self)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> serde::Deserialize<'de> for LocalPath {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: serde::de::Deserializer<'de>,
|
||||
{
|
||||
deserializer.deserialize_string(LocalPathVisitor)
|
||||
}
|
||||
}
|
||||
@@ -24,7 +24,7 @@ There are a few environment variables that can be used for debugging purposes:
|
||||
- `EXTISM_COREDUMP=extism.core`: write [coredump](https://github.com/WebAssembly/tool-conventions/blob/main/Coredump.md) to a file when a WebAssembly function traps
|
||||
- `EXTISM_DEBUG=1`: generate debug information
|
||||
- `EXTISM_PROFILE=perf|jitdump|vtune`: enable Wasmtime profiling
|
||||
- `EXTISM_CACHE_CONFIG=path/to/config.toml`: enable Wasmtime cache, see [the docs](https://docs.wasmtime.dev/cli-cache.html) for details about configuration. Setting this to an empty string will disable caching.
|
||||
- `EXTISM_CACHE_CONFIG=path/to/config.toml`: enable Wasmtime cache, details [here](#wasmtime-caching)
|
||||
|
||||
> *Note*: The debug and coredump info will only be written if the plug-in has an error.
|
||||
|
||||
@@ -229,3 +229,42 @@ Inside your host application, the rust-sdk emits these as [tracing](https://gith
|
||||
tracing_subscriber::fmt::init();
|
||||
```
|
||||
|
||||
### Wasmtime Caching
|
||||
|
||||
To enable or disable caching for plugin compilation, you need to provide a configuration file that will be used by the [wasmtime crate](https://github.com/bytecodealliance/wasmtime).
|
||||
|
||||
For more information and values that can be used for configuring caching, take a look at [the docs](https://docs.wasmtime.dev/cli-cache.html).
|
||||
|
||||
> *Note*: Extism currently depends on wasmtime [`version = ">= 27.0.0, < 31.0.0"`](https://github.com/extism/extism/blob/v1.11.1/runtime/Cargo.toml#L12), where the `enabled` key is still required. That requirement [was removed](https://github.com/bytecodealliance/wasmtime/pull/10859) from newer `wasmtime` releases and their documentation, which could explain the `failed to parse config file` error you might encounter if you omit the key.
|
||||
|
||||
An example configuration for caching would be:
|
||||
|
||||
```toml
|
||||
[cache]
|
||||
enabled = true # This value is required
|
||||
directory = "/some/path"
|
||||
```
|
||||
|
||||
You can:
|
||||
- [Create a global `wasmtime` configuration file](#using-a-configuration-file) in `$HOME/.config/wasmtime/config.toml`.
|
||||
- [Set the `EXTISM_CACHE_CONFIG` environment variable](#using-an-environment-variable)
|
||||
- [Set the configuration file path using `PluginBuilder`](#using-pluginbuilder)
|
||||
|
||||
#### Using a configuration file
|
||||
|
||||
The [wasmtime](https://github.com/bytecodealliance/wasmtime) crate, by default, will look for a configuration file in your system's default configuration directory (for example on UNIX systems: `$HOME/.config/wasmtime/config.toml`),
|
||||
for more [information on this behaviour](https://docs.rs/wasmtime/31.0.0/wasmtime/struct.Config.html#method.cache_config_load_default), see the linked documentation.
|
||||
|
||||
#### Using an environment variable
|
||||
|
||||
You can set the `EXTISM_CACHE_CONFIG=path/to/config.toml` environment variable to set the path of the configuration file used by [wasmtime](https://github.com/bytecodealliance/wasmtime).
|
||||
Setting the variable to an empty string will disable caching (it won't load any configuration file).
|
||||
|
||||
> *Note*: If the environment variable is not set, `wasmtime` will still try to read from a configuration file that may exist in your system's default configuration folder (e.g. `$HOME/.config/wasmtime/config.toml`).
|
||||
|
||||
The environment variable does not override the path you might have set using `PluginBuilder`; it is only checked if you did not specify a cache configuration path in `PluginBuilder`.
|
||||
|
||||
#### Using PluginBuilder
|
||||
|
||||
If you use a [PluginBuilder](https://docs.rs/extism/latest/extism/struct.PluginBuilder.html), you can set the `wasmtime` configuration path using the [with_cache_config](https://docs.rs/extism/latest/extism/struct.PluginBuilder.html#method.with_cache_config) method.
|
||||
This will override the `EXTISM_CACHE_CONFIG` environment variable if it's set, so you could have a "global" and per plugin configuration if needed.
|
||||
|
||||
@@ -352,9 +352,9 @@ impl CurrentPlugin {
|
||||
|
||||
if let Some(a) = &manifest.allowed_paths {
|
||||
for (k, v) in a.iter() {
|
||||
let readonly = k.starts_with("ro:");
|
||||
let readonly = matches!(v, extism_manifest::LocalPath::ReadOnly(_));
|
||||
|
||||
let dir_path = if readonly { &k[3..] } else { k };
|
||||
let dir_path = v.as_path();
|
||||
|
||||
let dir = wasi_common::sync::dir::Dir::from_cap_std(
|
||||
wasi_common::sync::Dir::open_ambient_dir(dir_path, auth)?,
|
||||
@@ -366,7 +366,7 @@ impl CurrentPlugin {
|
||||
Box::new(dir)
|
||||
};
|
||||
|
||||
ctx.push_preopened_dir(file, v)?;
|
||||
ctx.push_preopened_dir(file, k)?;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -29,6 +29,7 @@ pub(crate) mod manifest;
|
||||
pub(crate) mod pdk;
|
||||
mod plugin;
|
||||
mod plugin_builder;
|
||||
mod pool;
|
||||
mod readonly_dir;
|
||||
mod timer;
|
||||
|
||||
@@ -43,6 +44,7 @@ pub use plugin::{
|
||||
CancelHandle, CompiledPlugin, Plugin, WasmInput, EXTISM_ENV_MODULE, EXTISM_USER_MODULE,
|
||||
};
|
||||
pub use plugin_builder::{DebugOptions, PluginBuilder};
|
||||
pub use pool::{Pool, PoolBuilder, PoolPlugin};
|
||||
|
||||
pub(crate) use internal::{Internal, Wasi};
|
||||
pub(crate) use timer::{Timer, TimerAction};
|
||||
|
||||
@@ -191,6 +191,7 @@ pub(crate) fn profiling_strategy() -> ProfilingStrategy {
|
||||
/// Defines an input type for Wasm data.
|
||||
///
|
||||
/// Types that implement `Into<WasmInput>` can be passed directly into `Plugin::new`
|
||||
#[derive(Clone)]
|
||||
pub enum WasmInput<'a> {
|
||||
/// Raw Wasm module
|
||||
Data(std::borrow::Cow<'a, [u8]>),
|
||||
|
||||
@@ -33,6 +33,7 @@ impl Default for DebugOptions {
|
||||
}
|
||||
|
||||
/// PluginBuilder is used to configure and create `Plugin` instances
|
||||
#[derive(Clone)]
|
||||
pub struct PluginBuilder<'a> {
|
||||
pub(crate) source: WasmInput<'a>,
|
||||
pub(crate) config: Option<wasmtime::Config>,
|
||||
|
||||
174
runtime/src/pool.rs
Normal file
174
runtime/src/pool.rs
Normal file
@@ -0,0 +1,174 @@
|
||||
use crate::{Error, FromBytesOwned, Plugin, ToBytes};
|
||||
|
||||
/// `PoolBuilder` is used to configure and create `Pool`s
#[derive(Debug, Clone)]
pub struct PoolBuilder {
    /// Max number of concurrent instances for a plugin - by default this is set to
    /// the output of `std::thread::available_parallelism`
    pub max_instances: usize,
}
|
||||
|
||||
impl PoolBuilder {
|
||||
/// Create a `PoolBuilder` with default values
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
/// Set the max number of parallel instances
|
||||
pub fn with_max_instances(mut self, n: usize) -> Self {
|
||||
self.max_instances = n;
|
||||
self
|
||||
}
|
||||
|
||||
/// Create a new `Pool` with the given configuration
|
||||
pub fn build<F: 'static + Fn() -> Result<Plugin, Error>>(self, source: F) -> Pool {
|
||||
Pool::new_from_builder(source, self)
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for PoolBuilder {
|
||||
fn default() -> Self {
|
||||
PoolBuilder {
|
||||
max_instances: std::thread::available_parallelism()
|
||||
.expect("available parallelism")
|
||||
.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// `PoolPlugin` is used by the pool to track the number of live instances of a particular plugin
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct PoolPlugin(std::rc::Rc<std::cell::RefCell<Plugin>>);
|
||||
|
||||
impl PoolPlugin {
|
||||
fn new(plugin: Plugin) -> Self {
|
||||
Self(std::rc::Rc::new(std::cell::RefCell::new(plugin)))
|
||||
}
|
||||
|
||||
/// Access the underlying plugin
|
||||
pub fn plugin(&self) -> std::cell::RefMut<Plugin> {
|
||||
self.0.borrow_mut()
|
||||
}
|
||||
|
||||
/// Helper to call a plugin function on the underlying plugin
|
||||
pub fn call<'a, Input: ToBytes<'a>, Output: FromBytesOwned>(
|
||||
&self,
|
||||
name: impl AsRef<str>,
|
||||
input: Input,
|
||||
) -> Result<Output, Error> {
|
||||
self.plugin().call(name.as_ref(), input)
|
||||
}
|
||||
|
||||
/// Helper to get the underlying plugin's ID
|
||||
pub fn id(&self) -> uuid::Uuid {
|
||||
self.plugin().id
|
||||
}
|
||||
}
|
||||
|
||||
type PluginSource = dyn Fn() -> Result<Plugin, Error>;
|
||||
|
||||
struct PoolInner {
|
||||
plugin_source: Box<PluginSource>,
|
||||
instances: Vec<PoolPlugin>,
|
||||
}
|
||||
|
||||
unsafe impl Send for PoolInner {}
|
||||
unsafe impl Sync for PoolInner {}
|
||||
|
||||
/// `Pool` manages threadsafe access to a limited number of instances of multiple plugins
|
||||
#[derive(Clone)]
|
||||
pub struct Pool {
|
||||
config: PoolBuilder,
|
||||
inner: std::sync::Arc<std::sync::Mutex<PoolInner>>,
|
||||
}
|
||||
|
||||
unsafe impl Send for Pool {}
|
||||
unsafe impl Sync for Pool {}
|
||||
|
||||
impl Pool {
|
||||
/// Create a new pool with the default configuration
|
||||
pub fn new<F: 'static + Fn() -> Result<Plugin, Error>>(source: F) -> Self {
|
||||
Pool {
|
||||
config: Default::default(),
|
||||
inner: std::sync::Arc::new(std::sync::Mutex::new(PoolInner {
|
||||
plugin_source: Box::new(source),
|
||||
instances: Default::default(),
|
||||
})),
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new pool configured using a `PoolBuilder`
|
||||
pub fn new_from_builder<F: 'static + Fn() -> Result<Plugin, Error>>(
|
||||
source: F,
|
||||
builder: PoolBuilder,
|
||||
) -> Self {
|
||||
Pool {
|
||||
config: builder,
|
||||
inner: std::sync::Arc::new(std::sync::Mutex::new(PoolInner {
|
||||
plugin_source: Box::new(source),
|
||||
instances: Default::default(),
|
||||
})),
|
||||
}
|
||||
}
|
||||
|
||||
fn find_available(&self) -> Result<Option<PoolPlugin>, Error> {
|
||||
let pool = self.inner.lock().unwrap();
|
||||
for instance in pool.instances.iter() {
|
||||
if std::rc::Rc::strong_count(&instance.0) == 1 {
|
||||
return Ok(Some(instance.clone()));
|
||||
}
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
/// Get the number of live instances for a plugin
|
||||
pub fn count(&self) -> usize {
|
||||
self.inner.lock().unwrap().instances.len()
|
||||
}
|
||||
|
||||
/// Get access to a plugin, this will create a new instance if needed (and allowed by the specified
|
||||
/// max_instances). `Ok(None)` is returned if the timeout is reached before an available plugin could be
|
||||
/// acquired
|
||||
pub fn get(&self, timeout: std::time::Duration) -> Result<Option<PoolPlugin>, Error> {
|
||||
let start = std::time::Instant::now();
|
||||
let max = self.config.max_instances;
|
||||
if let Some(avail) = self.find_available()? {
|
||||
return Ok(Some(avail));
|
||||
}
|
||||
|
||||
{
|
||||
let mut pool = self.inner.lock().unwrap();
|
||||
if pool.instances.len() < max {
|
||||
let plugin = (*pool.plugin_source)()?;
|
||||
let instance = PoolPlugin::new(plugin);
|
||||
pool.instances.push(instance);
|
||||
return Ok(Some(pool.instances.last().unwrap().clone()));
|
||||
}
|
||||
}
|
||||
|
||||
loop {
|
||||
if let Ok(Some(x)) = self.find_available() {
|
||||
return Ok(Some(x));
|
||||
}
|
||||
if std::time::Instant::now() - start > timeout {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
std::thread::sleep(std::time::Duration::from_millis(100));
|
||||
}
|
||||
}
|
||||
|
||||
/// Access a plugin in a callback function. This calls `Pool::get` then the provided
|
||||
/// callback. `Ok(None)` is returned if the timeout is reached before an available
|
||||
/// plugin could be acquired
|
||||
pub fn with_plugin<T>(
|
||||
&self,
|
||||
timeout: std::time::Duration,
|
||||
f: impl FnOnce(&mut Plugin) -> Result<T, Error>,
|
||||
) -> Result<Option<T>, Error> {
|
||||
if let Some(plugin) = self.get(timeout)? {
|
||||
return f(&mut plugin.plugin()).map(Some);
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
@@ -1,3 +1,4 @@
|
||||
mod issues;
|
||||
mod kernel;
|
||||
mod pool;
|
||||
mod runtime;
|
||||
|
||||
48
runtime/src/tests/pool.rs
Normal file
48
runtime/src/tests/pool.rs
Normal file
@@ -0,0 +1,48 @@
|
||||
use crate::*;
|
||||
|
||||
fn run_thread(p: Pool, i: u64) -> std::thread::JoinHandle<()> {
|
||||
std::thread::spawn(move || {
|
||||
std::thread::sleep(std::time::Duration::from_millis(i));
|
||||
let s: String = p
|
||||
.get(std::time::Duration::from_secs(1))
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
.call("count_vowels", "abc")
|
||||
.unwrap();
|
||||
println!("{}", s);
|
||||
})
|
||||
}
|
||||
|
||||
/// Run twelve concurrent callers against pools limited to 1, 2 and 3 instances and check
/// that the pool never creates more instances than allowed
#[test]
fn test_threads() {
    // Start-up delay in milliseconds for each spawned thread
    const DELAYS_MS: [u64; 12] = [
        1000, 1000, 1000, 1000, 1000, 1000, 500, 500, 500, 500, 500, 0,
    ];

    for i in 1..=3 {
        let data = include_bytes!("../../../wasm/code.wasm");
        let plugin_builder =
            extism::PluginBuilder::new(extism::Manifest::new([extism::Wasm::data(data)]))
                .with_wasi(true);
        let pool: Pool = PoolBuilder::new()
            .with_max_instances(i)
            .build(move || plugin_builder.clone().build());

        // Collect eagerly so every thread is spawned before the first join
        let threads: Vec<_> = DELAYS_MS
            .iter()
            .map(|&ms| run_thread(pool.clone(), ms))
            .collect();

        for t in threads {
            t.join().unwrap();
        }

        assert!(pool.count() <= i);
    }
}
|
||||
Reference in New Issue
Block a user