Compare commits

...

5 Commits

Author SHA1 Message Date
zach
c9de677d7d wip: add custom http resolver that prioritizes ipv4 addresses 2023-12-13 15:37:14 -08:00
zach
bdb27c00dc feat: add extism_current_plugin_timeout_add_ms 2023-12-13 13:21:25 -08:00
zach
e80eb71f51 feat: add TimeoutManager to add/subtract from a plugins timeout 2023-12-13 13:17:19 -08:00
zach
a5edf58747 fix(kernel): improve performance after large allocations, add extism_plugin_reset to give users more control when dealing with large allocations (#627)
See https://github.com/extism/cpp-sdk/issues/15

- Limits a call to memset in the kernel to the size of the current
memory offset instead of the total size of memory.
- Adds `extism_plugin_reset` to the C API and `extism::Plugin::reset` to
Rust

---------

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: zshipko <zshipko@users.noreply.github.com>
2023-12-12 13:56:34 -08:00
zach
e5ffabb975 feat: Add extism_convert::Raw to allow direct encoding using bytemuck (#626)
- Adds `extism_convert::Raw` to encode certain types using their direct
memory representations using https://github.com/Lokathor/bytemuck
- Only enabled on little-endian targets to prevent memory mismatch
issues
- Allows for certain types of structs to be encoded using their
in-memory representation
- Makes passing structs between Rust and C easier since none of the
other encodings are available in C by default.

## Usage
After making a bytemuck-compatible struct:
```rust
use bytemuck::{Zeroable, Pod};

#[derive(Debug, Clone, Copy, PartialEq, Pod, Zeroable)]
#[repr(C)]
struct Point {
  x: i32,
  y: i32,
}
```

It can be used in `call`:

```rust
let input = Point { x: 100, y: 50 };
let Raw(pt): Raw<Point> = plugin.call("transform", Raw(&input))?;
```
2023-12-11 11:14:33 -08:00
16 changed files with 440 additions and 41 deletions

View File

@@ -12,6 +12,7 @@ description = "Traits to make Rust types usable with Extism"
[dependencies]
anyhow = "1.0.75"
base64 = "~0.21"
bytemuck = {version = "1.14.0", optional = true }
prost = { version = "0.12.0", optional = true }
rmp-serde = { version = "1.1.2", optional = true }
serde = "1.0.186"
@@ -21,6 +22,7 @@ serde_json = "1.0.105"
serde = { version = "1.0.186", features = ["derive"] }
[features]
default = ["msgpack", "protobuf"]
default = ["msgpack", "protobuf", "raw"]
msgpack = ["rmp-serde"]
protobuf = ["prost"]
raw = ["bytemuck"]

View File

@@ -138,3 +138,53 @@ impl<T: Default + prost::Message> FromBytesOwned for Protobuf<T> {
Ok(Protobuf(T::decode(data)?))
}
}
/// Raw does no conversion, it just copies the memory directly.
/// Note: This will only work for types that implement [bytemuck::Pod](https://docs.rs/bytemuck/latest/bytemuck/trait.Pod.html)
#[cfg(all(feature = "raw", target_endian = "little"))]
pub struct Raw<'a, T: bytemuck::Pod>(pub &'a T);
#[cfg(all(feature = "raw", target_endian = "little"))]
impl<'a, T: bytemuck::Pod> ToBytes<'a> for Raw<'a, T> {
type Bytes = &'a [u8];
fn to_bytes(&self) -> Result<Self::Bytes, Error> {
Ok(bytemuck::bytes_of(self.0))
}
}
#[cfg(all(feature = "raw", target_endian = "little"))]
impl<'a, T: bytemuck::Pod> FromBytes<'a> for Raw<'a, T> {
fn from_bytes(data: &'a [u8]) -> Result<Self, Error> {
let x = bytemuck::try_from_bytes(data).map_err(|x| Error::msg(x.to_string()))?;
Ok(Raw(x))
}
}
#[cfg(all(test, feature = "raw", target_endian = "little"))]
mod tests {
use crate::*;
#[test]
fn test_raw() {
#[derive(Debug, Clone, Copy, PartialEq)]
struct TestRaw {
a: i32,
b: f64,
c: bool,
}
unsafe impl bytemuck::Pod for TestRaw {}
unsafe impl bytemuck::Zeroable for TestRaw {}
let x = TestRaw {
a: 123,
b: 45678.91011,
c: true,
};
let raw = Raw(&x).to_bytes().unwrap();
let y = Raw::from_bytes(&raw).unwrap();
assert_eq!(&x, y.0);
let y: Result<Raw<[u8; std::mem::size_of::<TestRaw>()]>, Error> = Raw::from_bytes(&raw);
assert!(y.is_ok());
}
}

View File

@@ -21,6 +21,9 @@ pub use encoding::Msgpack;
#[cfg(feature = "protobuf")]
pub use encoding::Protobuf;
#[cfg(all(feature = "raw", target_endian = "little"))]
pub use encoding::Raw;
pub use from_bytes::{FromBytes, FromBytesOwned};
pub use memory_handle::MemoryHandle;
pub use to_bytes::ToBytes;

View File

@@ -139,7 +139,9 @@ impl MemoryRoot {
}
// Ensure that at least one page is allocated to store the `MemoryRoot` data
if core::arch::wasm32::memory_size(0) == 0 && core::arch::wasm32::memory_grow(0, 1) == usize::MAX {
if core::arch::wasm32::memory_size(0) == 0
&& core::arch::wasm32::memory_grow(0, 1) == usize::MAX
{
core::arch::wasm32::unreachable()
}
@@ -168,12 +170,15 @@ impl MemoryRoot {
/// Resets the position of the allocator and zeroes out all allocations
pub unsafe fn reset(&mut self) {
// Clear allocated data
let self_position = self.position.fetch_and(0, Ordering::SeqCst);
core::ptr::write_bytes(
self.blocks.as_mut_ptr() as *mut u8,
0,
self.length.load(Ordering::Acquire) as usize,
MemoryStatus::Unused as u8,
self_position as usize,
);
self.position.store(0, Ordering::Release);
// Clear extism runtime metadata
self.error.store(0, Ordering::Release);
self.input_offset = 0;
self.input_length = 0;

View File

@@ -90,6 +90,14 @@ Get the plugin's output data.
const uint8_t *extism_plugin_output_data(ExtismPlugin *plugin);
```
### `extism_plugin_reset`
Reset the Extism runtime, this will invalidate all allocated memory.
```c
bool extism_plugin_reset(ExtismPlugin *plugin);
```
### `extism_log_file`
Set log file and level.

View File

@@ -118,9 +118,7 @@ pub fn echo(c: &mut Criterion) {
pub fn reflect(c: &mut Criterion) {
let mut g = c.benchmark_group("reflect");
g.sample_size(500);
g.noise_threshold(1.0);
g.significance_level(0.2);
let mut plugin = PluginBuilder::new(REFLECT)
.with_wasi(true)
.with_function(
@@ -132,22 +130,23 @@ pub fn reflect(c: &mut Criterion) {
)
.build()
.unwrap();
for (i, elements) in [
b"a".repeat(65536),
b"a".repeat(65536 * 10),
b"a".repeat(65536 * 100),
b"a".repeat(65536),
]
.iter()
.enumerate()
{
g.throughput(criterion::Throughput::Bytes(elements.len() as u64));
g.bench_with_input(
format!("reflect {} bytes", 10u32.pow(i as u32) * 65536),
format!("{i}: reflect {} bytes", elements.len()),
elements,
|b, elems| {
b.iter(|| {
assert_eq!(elems, plugin.call::<_, &[u8]>("reflect", &elems).unwrap());
// plugin.reset().unwrap();
});
},
);

View File

@@ -132,6 +132,12 @@ ExtismSize extism_current_plugin_memory_length(ExtismCurrentPlugin *plugin, Exti
*/
void extism_current_plugin_memory_free(ExtismCurrentPlugin *plugin, ExtismMemoryHandle ptr);
/**
* Add milliseconds to a plug-in's timeout
* NOTE: this should only be called from host functions.
*/
bool extism_current_plugin_timeout_add_ms(ExtismCurrentPlugin *plugin, uint64_t ms);
/**
* Create a new host function
*
@@ -266,6 +272,11 @@ bool extism_log_custom(const char *log_level);
*/
void extism_log_drain(void (*handler)(const char*, uintptr_t));
/**
* Reset the Extism runtime, this will invalidate all allocated memory
*/
bool extism_plugin_reset(ExtismPlugin *plugin);
/**
* Get the Extism version string
*/

View File

@@ -87,18 +87,27 @@ impl CurrentPlugin {
}
/// Access memory bytes as `str`
pub fn memory_str(&mut self, handle: MemoryHandle) -> Result<&mut str, Error> {
let bytes = self.memory_bytes(handle)?;
pub fn memory_str_mut(&mut self, handle: MemoryHandle) -> Result<&mut str, Error> {
let bytes = self.memory_bytes_mut(handle)?;
let s = std::str::from_utf8_mut(bytes)?;
Ok(s)
}
pub fn memory_str(&mut self, handle: MemoryHandle) -> Result<&str, Error> {
let bytes = self.memory_bytes(handle)?;
let s = std::str::from_utf8(bytes)?;
Ok(s)
}
/// Allocate a handle large enough for the encoded Rust type and copy it into Extism memory
pub fn memory_new<'a, T: ToBytes<'a>>(&mut self, t: T) -> Result<MemoryHandle, Error> {
let data = t.to_bytes()?;
let data = data.as_ref();
if data.is_empty() {
return Ok(MemoryHandle::null());
}
let handle = self.memory_alloc(data.len() as u64)?;
let bytes = self.memory_bytes(handle)?;
let bytes = self.memory_bytes_mut(handle)?;
bytes.copy_from_slice(data.as_ref());
Ok(handle)
}
@@ -144,7 +153,7 @@ impl CurrentPlugin {
Ok(())
}
pub fn memory_bytes(&mut self, handle: MemoryHandle) -> Result<&mut [u8], Error> {
pub fn memory_bytes_mut(&mut self, handle: MemoryHandle) -> Result<&mut [u8], Error> {
let (linker, mut store) = self.linker_and_store();
if let Some(mem) = linker.get(&mut store, EXTISM_ENV_MODULE, "memory") {
let mem = mem.into_memory().unwrap();
@@ -158,6 +167,20 @@ impl CurrentPlugin {
anyhow::bail!("{} unable to locate extism memory", self.id)
}
pub fn memory_bytes(&mut self, handle: MemoryHandle) -> Result<&[u8], Error> {
let (linker, mut store) = self.linker_and_store();
if let Some(mem) = linker.get(&mut store, EXTISM_ENV_MODULE, "memory") {
let mem = mem.into_memory().unwrap();
let ptr = unsafe { mem.data_ptr(&store).add(handle.offset() as usize) };
if ptr.is_null() {
return Ok(&[]);
}
return Ok(unsafe { std::slice::from_raw_parts(ptr, handle.len()) });
}
anyhow::bail!("{} unable to locate extism memory", self.id)
}
pub fn memory_alloc(&mut self, n: u64) -> Result<MemoryHandle, Error> {
if n == 0 {
return Ok(MemoryHandle {
@@ -398,6 +421,12 @@ impl CurrentPlugin {
let length = self.memory_length(offs).unwrap_or_default();
(offs, length)
}
/// Create a new `TimeoutManager` that can be used to adjust timeouts for the
/// current plugin. Returns `None` when no timeout has been configured.
pub fn timeout_manager(&self) -> Option<TimeoutManager> {
TimeoutManager::new(self)
}
}
impl Internal for CurrentPlugin {

Binary file not shown.

View File

@@ -14,6 +14,7 @@ pub(crate) mod manifest;
pub(crate) mod pdk;
mod plugin;
mod plugin_builder;
mod timeout_manager;
mod timer;
/// Extism C API
@@ -25,6 +26,7 @@ pub use extism_manifest::{Manifest, Wasm, WasmMetadata};
pub use function::{Function, UserData, Val, ValType, PTR};
pub use plugin::{CancelHandle, Plugin, WasmInput, EXTISM_ENV_MODULE, EXTISM_USER_MODULE};
pub use plugin_builder::{DebugOptions, PluginBuilder};
pub use timeout_manager::TimeoutManager;
pub(crate) use internal::{Internal, Wasi};
pub(crate) use timer::{Timer, TimerAction};

View File

@@ -1,3 +1,5 @@
use std::cmp::Ordering;
/// All the functions in the file are exposed from inside WASM plugins
use crate::*;
@@ -140,6 +142,26 @@ pub(crate) fn var_set(
Ok(())
}
#[derive(Default, Debug)]
struct Resolver;
impl ureq::Resolver for Resolver {
fn resolve(&self, netloc: &str) -> std::io::Result<Vec<std::net::SocketAddr>> {
let addrs = std::net::ToSocketAddrs::to_socket_addrs(netloc)?.into_iter();
let mut addrs: Vec<_> = addrs.collect();
addrs.sort_by(|a, b| {
if a.is_ipv4() && b.is_ipv6() {
Ordering::Less
} else if a.is_ipv6() && b.is_ipv4() {
Ordering::Greater
} else {
Ordering::Equal
}
});
Ok(addrs)
}
}
/// Make an HTTP request
/// Params: i64 (offset to JSON encoded HttpRequest), i64 (offset to body or 0)
/// Returns: i64 (offset)
@@ -150,6 +172,7 @@ pub(crate) fn http_request(
) -> Result<(), Error> {
let data: &mut CurrentPlugin = caller.data_mut();
let http_req_offset = args!(input, 0, i64) as u64;
#[cfg(not(feature = "http"))]
{
let handle = match data.memory_handle(http_req_offset) {
@@ -201,7 +224,8 @@ pub(crate) fn http_request(
)));
}
let mut r = ureq::request(req.method.as_deref().unwrap_or("GET"), &req.url);
let agent = ureq::builder().resolver(Resolver).build();
let mut r = agent.request(req.method.as_deref().unwrap_or("GET"), &req.url);
for (k, v) in req.headers.iter() {
r = r.set(k, v);

View File

@@ -74,7 +74,7 @@ pub struct Plugin {
/// Set to `true` when de-initializarion may have occured (i.e.a call to `_start`),
/// in this case we need to re-initialize the entire module.
pub(crate) needs_reset: bool,
pub(crate) store_needs_reset: bool,
pub(crate) debug_options: DebugOptions,
}
@@ -238,6 +238,20 @@ impl Plugin {
CurrentPlugin::new(manifest, with_wasi, available_pages, id)?,
);
store.set_epoch_deadline(1);
store.call_hook(|data, hook| {
if hook.entering_host() {
let tx = Timer::tx();
tx.send(TimerAction::EnterHost {
id: data.id.clone(),
})?;
} else if hook.exiting_host() {
let tx = Timer::tx();
tx.send(TimerAction::ExitHost {
id: data.id.clone(),
})?;
}
Ok(())
});
let mut linker = Linker::new(&engine);
linker.allow_shadowing(true);
@@ -303,7 +317,7 @@ impl Plugin {
cancel_handle: CancelHandle { id, timer_tx },
instantiations: 0,
output: Output::default(),
needs_reset: false,
store_needs_reset: false,
debug_options,
_functions: imports,
};
@@ -324,7 +338,7 @@ impl Plugin {
&mut self,
instance_lock: &mut std::sync::MutexGuard<Option<Instance>>,
) -> Result<(), Error> {
if self.instantiations > 100 {
if self.store_needs_reset {
let engine = self.store.engine().clone();
let internal = self.current_plugin_mut();
self.store = Store::new(
@@ -355,9 +369,9 @@ impl Plugin {
}
self.instantiations = 0;
self.instance_pre = self.linker.instantiate_pre(main)?;
**instance_lock = None;
self.store_needs_reset = false;
}
**instance_lock = None;
Ok(())
}
@@ -428,12 +442,7 @@ impl Plugin {
let bytes = unsafe { std::slice::from_raw_parts(input, len) };
debug!(plugin = &id, "input size: {}", bytes.len());
if let Some(f) = self.linker.get(&mut self.store, EXTISM_ENV_MODULE, "reset") {
f.into_func().unwrap().call(&mut self.store, &[], &mut [])?;
} else {
error!(plugin = &id, "call to extism:host/env::reset failed");
}
self.reset()?;
let handle = self.current_plugin_mut().memory_new(bytes)?;
if let Some(f) = self
@@ -450,6 +459,19 @@ impl Plugin {
Ok(())
}
/// Reset Extism runtime, this will invalidate all allocated memory
pub fn reset(&mut self) -> Result<(), Error> {
let id = self.id.to_string();
if let Some(f) = self.linker.get(&mut self.store, EXTISM_ENV_MODULE, "reset") {
f.into_func().unwrap().call(&mut self.store, &[], &mut [])?;
} else {
error!(plugin = &id, "call to extism:host/env::reset failed");
}
Ok(())
}
/// Determine if wasi is enabled
pub fn has_wasi(&self) -> bool {
self.current_plugin().wasi.is_some()
@@ -615,14 +637,11 @@ impl Plugin {
let name = name.as_ref();
let input = input.as_ref();
if self.needs_reset {
if let Err(e) = self.reset_store(lock) {
error!(
plugin = self.id.to_string(),
"call to Plugin::reset_store failed: {e:?}"
);
}
self.needs_reset = false;
if let Err(e) = self.reset_store(lock) {
error!(
plugin = self.id.to_string(),
"call to Plugin::reset_store failed: {e:?}"
);
}
self.instantiate(lock).map_err(|e| (e, -1))?;
@@ -667,7 +686,7 @@ impl Plugin {
self.store
.epoch_deadline_callback(|_| Ok(UpdateDeadline::Continue(1)));
let _ = self.timer_tx.send(TimerAction::Stop { id: self.id });
self.needs_reset = name == "_start";
self.store_needs_reset = name == "_start";
// Get extism error
self.get_output_after_call().map_err(|x| (x, -1))?;

View File

@@ -143,6 +143,25 @@ pub unsafe extern "C" fn extism_current_plugin_memory_free(
}
}
/// Add milliseconds to a plug-in's timeout
/// NOTE: this should only be called from host functions.
#[no_mangle]
pub unsafe extern "C" fn extism_current_plugin_timeout_add_ms(
plugin: *mut CurrentPlugin,
ms: u64,
) -> bool {
if plugin.is_null() {
return false;
}
let plugin = &mut *plugin;
if let Some(mgr) = plugin.timeout_manager() {
return mgr.add(std::time::Duration::from_millis(ms)).is_ok();
}
false
}
/// Create a new host function
///
/// Arguments
@@ -702,6 +721,30 @@ impl std::io::Write for LogBuffer {
}
}
/// Reset the Extism runtime, this will invalidate all allocated memory
#[no_mangle]
pub unsafe extern "C" fn extism_plugin_reset(plugin: *mut Plugin) -> bool {
let plugin = &mut *plugin;
if let Err(e) = plugin.reset() {
error!(
plugin = plugin.id.to_string(),
"unable to reset plugin: {}",
e.to_string()
);
if let Err(e) = plugin.current_plugin_mut().set_error(e.to_string()) {
error!(
plugin = plugin.id.to_string(),
"unable to set error after failed plugin reset: {}",
e.to_string()
);
}
false
} else {
true
}
}
/// Get the Extism version string
#[no_mangle]
pub unsafe extern "C" fn extism_version() -> *const c_char {

View File

@@ -235,6 +235,43 @@ fn test_timeout() {
assert!(err == "timeout");
}
fn hello_world_timeout_manager(
plugin: &mut CurrentPlugin,
inputs: &[Val],
outputs: &mut [Val],
_user_data: UserData<()>,
) -> Result<(), Error> {
let mgr = plugin.timeout_manager().map(|x| x.add_on_drop());
std::thread::sleep(std::time::Duration::from_secs(3));
outputs[0] = inputs[0].clone();
drop(mgr);
Ok(())
}
#[test]
fn test_timeout_manager() {
let f = Function::new(
"hello_world",
[PTR],
[PTR],
UserData::default(),
hello_world_timeout_manager,
);
let manifest = Manifest::new([extism_manifest::Wasm::data(WASM)])
.with_timeout(std::time::Duration::from_secs(1));
let mut plugin = Plugin::new(manifest, [f], true).unwrap();
let start = std::time::Instant::now();
let output: Result<&[u8], Error> = plugin.call("count_vowels", "testing");
println!("Result {:?}", output);
assert!(output.is_ok());
let end = std::time::Instant::now();
let time = end - start;
println!("Plugin ran for {:?}", time);
assert!(time.as_secs() >= 3);
}
typed_plugin!(pub TestTypedPluginGenerics {
count_vowels<T: FromBytes<'a>>(&str) -> T
});
@@ -283,7 +320,7 @@ fn test_multiple_instantiations() {
#[test]
fn test_globals() {
let mut plugin = Plugin::new(WASM_GLOBALS, [], true).unwrap();
for i in 0..1000 {
for i in 0..100000 {
let Json(count) = plugin.call::<_, Json<Count>>("globals", "").unwrap();
assert_eq!(count.count, i);
}

View File

@@ -0,0 +1,92 @@
use crate::*;
enum DropBehavior {
Add,
Sub,
}
/// `TimeoutManager` is used to control `Plugin` timeouts from within host functions
///
/// It can be used to add or subtract time from a plug-in's timeout. If a plugin is not
/// configured to have a timeout then this will have no effect.
pub struct TimeoutManager {
start_time: std::time::Instant,
id: uuid::Uuid,
tx: std::sync::mpsc::Sender<TimerAction>,
cost: f64,
drop_behavior: Option<DropBehavior>,
}
impl TimeoutManager {
/// Create a new `TimeoutManager` from the `CurrentPlugin`, this will return `None` if no timeout
/// is configured
pub fn new(plugin: &CurrentPlugin) -> Option<TimeoutManager> {
plugin.manifest.timeout_ms.map(|_| TimeoutManager {
start_time: std::time::Instant::now(),
id: plugin.id.clone(),
tx: Timer::tx(),
cost: 1.0,
drop_behavior: None,
})
}
/// Add to the configured timeout
pub fn add(&self, duration: std::time::Duration) -> Result<(), Error> {
let d = duration.mul_f64(self.cost);
self.tx.send(TimerAction::Extend {
id: self.id.clone(),
duration: d.into(),
})?;
Ok(())
}
/// Subtract from the configured timeout
pub fn sub(&self, duration: std::time::Duration) -> Result<(), Error> {
let d: timer::ExtendTimeout = duration.mul_f64(self.cost).into();
self.tx.send(TimerAction::Extend {
id: self.id.clone(),
duration: -d,
})?;
Ok(())
}
/// When the `TimeoutManager` is dropped the elapsed duration since it was created will be
/// added to the plug-in's timeout
pub fn add_on_drop(mut self) -> Self {
self.drop_behavior = Some(DropBehavior::Add);
self
}
/// When the `TimeoutManager` is dropped the elapsed duration since it was created will be
/// added to the plug-in's timeout
pub fn sub_on_drop(mut self) -> Self {
self.drop_behavior = Some(DropBehavior::Sub);
self
}
/// Adjust the cost of added/subtracted values, this will scale all durations
/// submitted to this manager by the provided factor.
pub fn cost(mut self, cost: f64) -> Self {
self.cost = cost;
self
}
}
impl Drop for TimeoutManager {
fn drop(&mut self) {
if let Some(b) = &self.drop_behavior {
let duration = self.start_time.elapsed();
let x = match b {
DropBehavior::Add => self.add(duration),
DropBehavior::Sub => self.sub(duration),
};
if let Err(e) = x {
error!(
plugin = self.id.to_string(),
"unable to extend timeout: {}",
e.to_string()
);
}
}
}
}

View File

@@ -1,17 +1,51 @@
use crate::*;
#[derive(Debug)]
pub(crate) struct ExtendTimeout {
duration: std::time::Duration,
negative: bool,
}
impl From<std::time::Duration> for ExtendTimeout {
fn from(value: std::time::Duration) -> Self {
ExtendTimeout {
duration: value,
negative: false,
}
}
}
impl std::ops::Neg for ExtendTimeout {
type Output = ExtendTimeout;
fn neg(mut self) -> Self::Output {
self.negative = !self.negative;
self
}
}
pub(crate) enum TimerAction {
Start {
id: uuid::Uuid,
engine: Engine,
duration: Option<std::time::Duration>,
},
Extend {
id: uuid::Uuid,
duration: ExtendTimeout,
},
Stop {
id: uuid::Uuid,
},
Cancel {
id: uuid::Uuid,
},
EnterHost {
id: uuid::Uuid,
},
ExitHost {
id: uuid::Uuid,
},
Shutdown,
}
@@ -31,6 +65,8 @@ extern "C" fn cleanup_timer() {
static mut TIMER: std::sync::Mutex<Option<Timer>> = std::sync::Mutex::new(None);
type TimerMap = std::collections::BTreeMap<uuid::Uuid, (Engine, Option<std::time::Instant>)>;
impl Timer {
pub(crate) fn tx() -> std::sync::mpsc::Sender<TimerAction> {
let mut timer = match unsafe { TIMER.lock() } {
@@ -49,7 +85,8 @@ impl Timer {
pub fn init(timer: &mut Option<Timer>) -> std::sync::mpsc::Sender<TimerAction> {
let (tx, rx) = std::sync::mpsc::channel();
let thread = std::thread::spawn(move || {
let mut plugins = std::collections::BTreeMap::new();
let mut plugins = TimerMap::new();
let mut in_host = TimerMap::new();
macro_rules! handle {
($x:expr) => {
@@ -70,12 +107,16 @@ impl Timer {
TimerAction::Stop { id } => {
trace!(plugin = id.to_string(), "handling stop event");
plugins.remove(&id);
in_host.remove(&id);
}
TimerAction::Cancel { id } => {
trace!(plugin = id.to_string(), "handling cancel event");
if let Some((engine, _)) = plugins.remove(&id) {
engine.increment_epoch();
}
if let Some((engine, _)) = in_host.remove(&id) {
engine.increment_epoch();
}
}
TimerAction::Shutdown => {
trace!("Shutting down timer");
@@ -83,8 +124,42 @@ impl Timer {
trace!(plugin = id.to_string(), "handling shutdown event");
engine.increment_epoch();
}
for (id, (engine, _)) in in_host.iter() {
trace!(plugin = id.to_string(), "handling shutdown event");
engine.increment_epoch();
}
return;
}
TimerAction::Extend { id, duration } => {
if let Some((_engine, Some(timeout))) = plugins.get_mut(&id) {
let x = if duration.negative {
timeout.checked_sub(duration.duration)
} else {
timeout.checked_add(duration.duration)
};
if let Some(t) = x {
*timeout = t;
} else {
error!(
plugin = id.to_string(),
"unable to extend timeout by {:?}", duration.duration
);
}
}
}
TimerAction::EnterHost { id } => {
trace!(plugin = id.to_string(), "enter host function");
if let Some(x) = plugins.remove(&id) {
in_host.insert(id, x);
}
}
TimerAction::ExitHost { id } => {
trace!(plugin = id.to_string(), "exit host function");
if let Some(x) = in_host.remove(&id) {
plugins.insert(id, x);
}
}
}
};
}
@@ -96,6 +171,10 @@ impl Timer {
}
}
for x in rx.try_iter() {
handle!(x)
}
plugins = plugins
.into_iter()
.filter(|(_k, (engine, end))| {
@@ -109,10 +188,6 @@ impl Timer {
true
})
.collect();
for x in rx.try_iter() {
handle!(x)
}
}
});
*timer = Some(Timer {