runtime: Implement sled tree batch writes.

This commit is contained in:
parazyd
2022-11-05 14:46:58 +01:00
parent 38054cc751
commit 3bdd6cf1c9
5 changed files with 138 additions and 76 deletions

View File

@@ -61,19 +61,17 @@ define_contract!(
fn init_contract(cid: ContractId, _ix: &[u8]) -> ContractResult { fn init_contract(cid: ContractId, _ix: &[u8]) -> ContractResult {
msg!("wakeup wagies!"); msg!("wakeup wagies!");
db_init(cid, "wagies")?;
// Initialize a state tree. db_init will fail if the tree already exists.
// Otherwise, it will return a `DbHandle` that can be used further on.
// TODO: If the deploy execution fails, whatever is initialized with db_init // TODO: If the deploy execution fails, whatever is initialized with db_init
// should be deleted from sled afterwards. There's no way to create a // should be deleted from sled afterwards. There's no way to create a
// tree but only apply the creation when we're done, so db_init creates // tree but only apply the creation when we're done, so db_init creates
// it and upon failure it should delete it // it and upon failure it should delete it
let wagies_handle = db_init(cid, "wagies")?;
db_set(wagies_handle, &serialize(&"jason_gulag".to_string()), &serialize(&110))?;
// Lets write a value in there //let db_handle = db_lookup("wagies")?;
let tx_handle = db_begin_tx()?;
db_set(tx_handle, "jason_gulag".as_bytes(), serialize(&110))?;
let db_handle = db_lookup("wagies")?;
db_end_tx(db_handle, tx_handle)?;
// Host will clear delete the batches array after calling this func.
Ok(()) Ok(())
} }
@@ -141,10 +139,10 @@ fn process_update(_cid: ContractId, update_data: &[u8]) -> ContractResult {
let update: FooUpdate = deserialize(&update_data[1..])?; let update: FooUpdate = deserialize(&update_data[1..])?;
// Write the wagie to the db // Write the wagie to the db
let tx_handle = db_begin_tx()?; //let tx_handle = db_begin_tx()?;
db_set(tx_handle, update.name.as_bytes(), serialize(&update.age))?; //db_set(tx_handle, &serialize(&update.name), &serialize(&update.age))?;
let db_handle = db_lookup("wagies")?; //let db_handle = db_lookup("wagies")?;
db_end_tx(db_handle, tx_handle)?; //db_end_tx(db_handle, tx_handle)?;
} }
_ => unreachable!(), _ => unreachable!(),
} }

View File

@@ -22,7 +22,10 @@ use log::error;
use std::io::Cursor; use std::io::Cursor;
use wasmer::{FunctionEnvMut, WasmPtr}; use wasmer::{FunctionEnvMut, WasmPtr};
use crate::runtime::vm_runtime::{ContractSection, Env}; use crate::{
runtime::vm_runtime::{ContractSection, Env},
Result,
};
/// Internal wasm runtime API for sled trees /// Internal wasm runtime API for sled trees
pub struct DbHandle { pub struct DbHandle {
@@ -34,6 +37,10 @@ impl DbHandle {
pub fn new(contract_id: ContractId, tree: sled::Tree) -> Self { pub fn new(contract_id: ContractId, tree: sled::Tree) -> Self {
Self { contract_id, tree } Self { contract_id, tree }
} }
pub fn apply_batch(&self, batch: sled::Batch) -> Result<()> {
Ok(self.tree.apply_batch(batch)?)
}
} }
/// Only deploy() can call this. Creates a new database instance for this contract. /// Only deploy() can call this. Creates a new database instance for this contract.
@@ -58,7 +65,7 @@ pub(crate) fn db_init(ctx: FunctionEnvMut<Env>, ptr: WasmPtr<u8>, len: u32) -> i
let mut buf = vec![0_u8; len as usize]; let mut buf = vec![0_u8; len as usize];
if let Err(e) = mem_slice.read_slice(&mut buf) { if let Err(e) = mem_slice.read_slice(&mut buf) {
error!(target: "wasm_runtime::db_init", "Failed to read from memory slice"); error!(target: "wasm_runtime::db_init", "Failed to read from memory slice: {}", e);
return -2 return -2
}; };
@@ -80,6 +87,8 @@ pub(crate) fn db_init(ctx: FunctionEnvMut<Env>, ptr: WasmPtr<u8>, len: u32) -> i
} }
}; };
// TODO: Ensure we've read the entire buffer above.
if &cid != contract_id { if &cid != contract_id {
error!(target: "wasm_runtime::db_init", "Unauthorized ContractId for db_init"); error!(target: "wasm_runtime::db_init", "Unauthorized ContractId for db_init");
return -1 return -1
@@ -94,7 +103,9 @@ pub(crate) fn db_init(ctx: FunctionEnvMut<Env>, ptr: WasmPtr<u8>, len: u32) -> i
}; };
let mut db_handles = env.db_handles.borrow_mut(); let mut db_handles = env.db_handles.borrow_mut();
let mut db_batches = env.db_batches.borrow_mut();
db_handles.push(DbHandle::new(*contract_id, tree_handle)); db_handles.push(DbHandle::new(*contract_id, tree_handle));
db_batches.push(sled::Batch::default());
return (db_handles.len() - 1) as i32 return (db_handles.len() - 1) as i32
} }
_ => { _ => {
@@ -150,36 +161,74 @@ pub(crate) fn db_get(ctx: FunctionEnvMut<Env>) -> i32 {
/// ``` /// ```
/// db_set(tx_handle, key, value); /// db_set(tx_handle, key, value);
/// ``` /// ```
pub(crate) fn db_set(ctx: FunctionEnvMut<Env>) -> i32 { pub(crate) fn db_set(ctx: FunctionEnvMut<Env>, ptr: WasmPtr<u8>, len: u32) -> i32 {
let env = ctx.data(); let env = ctx.data();
match env.contract_section { match env.contract_section {
ContractSection::Deploy | ContractSection::Update => 0, ContractSection::Deploy | ContractSection::Update => {
_ => -1, let memory_view = env.memory_view(&ctx);
}
}
/// Only update() can call this. Starts an atomic transaction. let Ok(mem_slice) = ptr.slice(&memory_view, len) else {
/// error!(target: "wasm_runtime::db_set", "Failed to make slice from ptr");
/// ``` return -2
/// tx_handle = db_begin_tx(); };
/// ```
pub(crate) fn db_begin_tx(ctx: FunctionEnvMut<Env>) -> i32 {
let env = ctx.data();
match env.contract_section {
ContractSection::Deploy | ContractSection::Update => 0,
_ => -1,
}
}
/// Only update() can call this. This writes the atomic tx to the database. let mut buf = vec![0_u8; len as usize];
/// if let Err(e) = mem_slice.read_slice(&mut buf) {
/// ``` error!(target: "wasm_runtime:db_set", "Failed to read from memory slice");
/// db_end_tx(db_handle, tx_handle); return -2
/// ``` };
pub(crate) fn db_end_tx(ctx: FunctionEnvMut<Env>) -> i32 {
let env = ctx.data(); let mut buf_reader = Cursor::new(buf);
match env.contract_section {
ContractSection::Deploy | ContractSection::Update => 0, // FIXME: There's a type DbHandle=u32, but this should maybe be renamed
let db_handle: u32 = match Decodable::decode(&mut buf_reader) {
Ok(v) => v,
Err(e) => {
error!(target: "wasm_runtime::db_set", "Failed to decode DbHandle");
return -2
}
};
let db_handle = db_handle as usize;
let key: Vec<u8> = match Decodable::decode(&mut buf_reader) {
Ok(v) => v,
Err(e) => {
error!(target: "wasm_runtime::db_set", "Failed to decode key vec");
return -2
}
};
let value: Vec<u8> = match Decodable::decode(&mut buf_reader) {
Ok(v) => v,
Err(e) => {
error!(target: "wasm_runtime::db_set", "Failed to decode value vec");
return -2
}
};
// TODO: Ensure we've read the entire buffer above.
let db_handles = env.db_handles.borrow();
let mut db_batches = env.db_batches.borrow_mut();
if db_handles.len() <= db_handle || db_batches.len() <= db_handle {
error!(target: "wasm_runtime::db_set", "Requested DbHandle that is out of bounds");
return -2
}
let handle_idx = db_handle;
let db_handle = &db_handles[handle_idx];
let db_batch = &mut db_batches[handle_idx];
if db_handle.contract_id != env.contract_id {
error!(target: "wasm_runtime::db_set", "Unauthorized to write to DbHandle");
return -1
}
db_batch.insert(key, value);
0
}
_ => -1, _ => -1,
} }
} }

View File

@@ -51,6 +51,7 @@ pub enum ContractSection {
Exec, Exec,
/// Apply function of a contract /// Apply function of a contract
Update, Update,
/// Placeholder state before any initialization
Null, Null,
} }
@@ -71,6 +72,8 @@ pub struct Env {
pub blockchain: Blockchain, pub blockchain: Blockchain,
/// sled tree handles used with `db_*` /// sled tree handles used with `db_*`
pub db_handles: RefCell<Vec<DbHandle>>, pub db_handles: RefCell<Vec<DbHandle>>,
/// sled tree batches, indexed the same as `db_handles`.
pub db_batches: RefCell<Vec<sled::Batch>>,
/// The contract ID being executed /// The contract ID being executed
pub contract_id: ContractId, pub contract_id: ContractId,
/// The contract section being executed /// The contract section being executed
@@ -110,7 +113,7 @@ pub struct Runtime {
impl Runtime { impl Runtime {
/// Create a new wasm runtime instance that contains the given wasm module. /// Create a new wasm runtime instance that contains the given wasm module.
pub fn new(wasm_bytes: &[u8], blockchain: Blockchain, contract_id: ContractId) -> Result<Self> { pub fn new(wasm_bytes: &[u8], blockchain: Blockchain, contract_id: ContractId) -> Result<Self> {
info!(target: "warm_runtime::new", "Instantiating a new runtime"); info!(target: "wasm_runtime::new", "Instantiating a new runtime");
// This function will be called for each `Operator` encountered during // This function will be called for each `Operator` encountered during
// the wasm module execution. It should return the cost of the operator // the wasm module execution. It should return the cost of the operator
// that it received as its first argument. // that it received as its first argument.
@@ -139,6 +142,7 @@ impl Runtime {
// Initialize data // Initialize data
let db_handles = RefCell::new(vec![]); let db_handles = RefCell::new(vec![]);
let db_batches = RefCell::new(vec![]);
let logs = RefCell::new(vec![]); let logs = RefCell::new(vec![]);
debug!(target: "wasm_runtime::new", "Importing functions"); debug!(target: "wasm_runtime::new", "Importing functions");
@@ -148,6 +152,7 @@ impl Runtime {
Env { Env {
blockchain, blockchain,
db_handles, db_handles,
db_batches,
contract_id, contract_id,
contract_section: ContractSection::Null, contract_section: ContractSection::Null,
contract_return_data: Cell::new(None), contract_return_data: Cell::new(None),
@@ -188,23 +193,11 @@ impl Runtime {
import::db::db_get, import::db::db_get,
), ),
"db_begin_tx_" => Function::new_typed_with_env(
&mut store,
&ctx,
import::db::db_begin_tx,
),
"db_set_" => Function::new_typed_with_env( "db_set_" => Function::new_typed_with_env(
&mut store, &mut store,
&ctx, &ctx,
import::db::db_set, import::db::db_set,
), ),
"db_end_tx_" => Function::new_typed_with_env(
&mut store,
&ctx,
import::db::db_end_tx,
),
} }
}; };
@@ -302,6 +295,15 @@ impl Runtime {
/// a state update from `env` and passes it into the wasm runtime. /// a state update from `env` and passes it into the wasm runtime.
pub fn apply(&mut self, update: &[u8]) -> Result<()> { pub fn apply(&mut self, update: &[u8]) -> Result<()> {
let _ = self.call(ContractSection::Update, update)?; let _ = self.call(ContractSection::Update, update)?;
// If the above didn't fail, we write the batches.
// TODO: Make all the writes atomic in a transaction over all trees.
let env_mut = self.ctx.as_mut(&mut self.store);
for (idx, db) in env_mut.db_handles.get_mut().iter().enumerate() {
let batch = env_mut.db_batches.borrow()[idx].clone();
db.apply_batch(batch)?;
}
Ok(()) Ok(())
} }
@@ -353,8 +355,8 @@ impl Runtime {
} }
/// Serialize contract payload to the format accepted by the runtime functions. /// Serialize contract payload to the format accepted by the runtime functions.
/// We keep the same payload as a slice of bytes, and prepend it with a /// We keep the same payload as a slice of bytes, and prepend it with a ContractId,
/// little-endian u64 to tell the payload's length. /// and then a little-endian u64 to tell the payload's length.
fn serialize_payload(cid: &ContractId, payload: &[u8]) -> Vec<u8> { fn serialize_payload(cid: &ContractId, payload: &[u8]) -> Vec<u8> {
let ser_cid = serialize(cid); let ser_cid = serialize(cid);
let payload_len = payload.len(); let payload_len = payload.len();

View File

@@ -77,6 +77,33 @@ pub fn db_get(db_handle: DbHandle, key: &[u8]) -> GenericResult<Vec<u8>> {
todo!("db_get"); todo!("db_get");
} }
/// Only update() can call this. Set a value within the transaction.
///
/// ```
/// db_set(tx_handle, key, value);
/// ```
pub fn db_set(db_handle: DbHandle, key: &[u8], value: &[u8]) -> GenericResult<()> {
// Check entry for tx_handle is not None
#[cfg(target_arch = "wasm32")]
unsafe {
let mut len = 0;
let mut buf = vec![];
len += db_handle.encode(&mut buf)?;
len += key.to_vec().encode(&mut buf)?;
len += value.to_vec().encode(&mut buf)?;
return match db_set_(buf.as_ptr(), len as u32) {
0 => Ok(()),
-1 => Err(ContractError::CallerAccessDenied),
-2 => Err(ContractError::DbSetFailed),
_ => unreachable!(),
}
}
#[cfg(not(target_arch = "wasm32"))]
todo!("db_set");
}
/// Only update() can call this. Starts an atomic transaction. /// Only update() can call this. Starts an atomic transaction.
/// ///
/// ``` /// ```
@@ -96,26 +123,6 @@ pub fn db_begin_tx() -> GenericResult<TxHandle> {
todo!("db_begin_tx"); todo!("db_begin_tx");
} }
/// Only update() can call this. Set a value within the transaction.
///
/// ```
/// db_set(tx_handle, key, value);
/// ```
pub fn db_set(tx_handle: TxHandle, key: &[u8], value: Vec<u8>) -> GenericResult<()> {
// Check entry for tx_handle is not None
#[cfg(target_arch = "wasm32")]
unsafe {
return match db_set_() {
0 => Ok(()),
-1 => Err(ContractError::CallerAccessDenied),
_ => unreachable!(),
}
}
#[cfg(not(target_arch = "wasm32"))]
todo!("db_set");
}
/// Only update() can call this. This writes the atomic tx to the database. /// Only update() can call this. This writes the atomic tx to the database.
/// ///
/// ``` /// ```
@@ -142,6 +149,6 @@ extern "C" {
fn db_lookup_(ptr: *const u8, len: u32) -> i32; fn db_lookup_(ptr: *const u8, len: u32) -> i32;
fn db_get_() -> i32; fn db_get_() -> i32;
fn db_begin_tx_() -> i32; fn db_begin_tx_() -> i32;
fn db_set_() -> i32; fn db_set_(ptr: *const u8, len: u32) -> i32;
fn db_end_tx_() -> i32; fn db_end_tx_() -> i32;
} }

View File

@@ -56,6 +56,9 @@ pub enum ContractError {
#[error("Db not found")] #[error("Db not found")]
DbNotFound, DbNotFound,
#[error("Db set failed")]
DbSetFailed,
} }
/// Builtin return values occupy the upper 32 bits /// Builtin return values occupy the upper 32 bits
@@ -76,6 +79,7 @@ pub const UPDATE_ALREADY_SET: u64 = to_builtin!(7);
pub const DB_INIT_FAILED: u64 = to_builtin!(8); pub const DB_INIT_FAILED: u64 = to_builtin!(8);
pub const CALLER_ACCESS_DENIED: u64 = to_builtin!(9); pub const CALLER_ACCESS_DENIED: u64 = to_builtin!(9);
pub const DB_NOT_FOUND: u64 = to_builtin!(10); pub const DB_NOT_FOUND: u64 = to_builtin!(10);
pub const DB_SET_FAILED: u64 = to_builtin!(11);
impl From<ContractError> for u64 { impl From<ContractError> for u64 {
fn from(err: ContractError) -> Self { fn from(err: ContractError) -> Self {
@@ -89,6 +93,7 @@ impl From<ContractError> for u64 {
ContractError::DbInitFailed => DB_INIT_FAILED, ContractError::DbInitFailed => DB_INIT_FAILED,
ContractError::CallerAccessDenied => CALLER_ACCESS_DENIED, ContractError::CallerAccessDenied => CALLER_ACCESS_DENIED,
ContractError::DbNotFound => DB_NOT_FOUND, ContractError::DbNotFound => DB_NOT_FOUND,
ContractError::DbSetFailed => DB_SET_FAILED,
ContractError::Custom(error) => { ContractError::Custom(error) => {
if error == 0 { if error == 0 {
CUSTOM_ZERO CUSTOM_ZERO
@@ -113,6 +118,7 @@ impl From<u64> for ContractError {
DB_INIT_FAILED => Self::DbInitFailed, DB_INIT_FAILED => Self::DbInitFailed,
CALLER_ACCESS_DENIED => Self::CallerAccessDenied, CALLER_ACCESS_DENIED => Self::CallerAccessDenied,
DB_NOT_FOUND => Self::DbNotFound, DB_NOT_FOUND => Self::DbNotFound,
DB_SET_FAILED => Self::DbSetFailed,
_ => Self::Custom(error as u32), _ => Self::Custom(error as u32),
} }
} }