fix workflow tests

update standalone Spiral test server to use new JSON interface
This commit is contained in:
Neil Movva
2023-09-11 21:37:34 +00:00
parent 975c5d353b
commit 18629fa984
8 changed files with 86 additions and 82 deletions

View File

@@ -30,14 +30,6 @@ function generateKeys(n: number, seed: number = 0): string[] {
);
}
// async function generateKVPairs(n: number, seed: number, itemSize: number): Promise<{ [key: string]: Uint8Array }> {
// const keys = generateKeys(n, seed);
// const kvPairs: { [key: string]: Uint8Array } = {};
// keys.forEach(async key => {
// kvPairs[key] = await keyToValue(key, itemSize);
// });
// return kvPairs;
// }
async function generateKVPairs(n: number, seed: number, itemSize: number): Promise<{ [key: string]: Uint8Array }> {
const keys = generateKeys(n, seed);

View File

@@ -8,17 +8,20 @@ export default async function main(port: string) {
console.log(bucket.metadata);
// buckets are bytes-in/bytes-out. SDK write() will automatically serialize as UTF-8.
await bucket.write({
Ohio: 'Columbus',
California: 'Sacramento'
});
let capital = await bucket.privateRead('Ohio');
// but reads are always bytes-out, and must be decoded.
let capital = new TextDecoder().decode(await bucket.privateRead('Ohio'));
if (capital !== 'Columbus') {
throw 'Incorrect result.';
}
capital = await bucket.privateRead('California');
// capital = await bucket.privateRead('California');
capital = new TextDecoder().decode(await bucket.privateRead('California'));
if (capital !== 'Sacramento') {
throw 'Incorrect result.';
}

View File

@@ -175,33 +175,6 @@ export class Bucket {
);
return endResults;
// const queries: { key: string; queryData: Uint8Array }[] = [];
// for (const key of keys) {
// const rowIdx = this.lib.getRow(key);
// const queryData = this.lib.generateQuery(this.uuid, rowIdx);
// queries.push({ key, queryData });
// }
// const endResults = [];
// const batches = Math.ceil(queries.length / this.batchSize);
// for (let i = 0; i < batches; i++) {
// const queriesForBatch = queries.slice(
// i * this.batchSize,
// (i + 1) * this.batchSize
// );
// const queryBatch = serializeChunks(queriesForBatch.map(x => x.queryData));
// const rawResultChunks = await this.getRawResponse(queryBatch);
// const rawResults = deserializeChunks(rawResultChunks);
// const batchEndResults = await Promise.all(
// rawResults.map((r, i) => this.getEndResult(queriesForBatch[i].key, r))
// );
// endResults.push(...batchEndResults);
// }
}
private async performPrivateRead(key: string): Promise<any> {
@@ -357,10 +330,20 @@ export class Bucket {
* 1024 UTF-8 bytes.
*/
async write(
keyValuePairs: { [key: string]: Uint8Array | null }
keyValuePairs: { [key: string]: Uint8Array | string | null }
) {
this.ensureSpiral();
await this.api.write(this.name, keyValuePairs);
// convert any string KV pairs to Uint8Array
const kvPairs: { [key: string]: Uint8Array | null } = {};
for (const key in keyValuePairs) {
const value = keyValuePairs[key];
if (!(value instanceof Uint8Array)) {
kvPairs[key] = new TextEncoder().encode(value);
} else {
kvPairs[key] = value;
}
}
await this.api.write(this.name, kvPairs);
}
/**

15
lib/server/Cargo.lock generated
View File

@@ -937,6 +937,20 @@ name = "serde"
version = "1.0.159"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c04e8343c3daeec41f58990b9d77068df31209f2af111e059e9fe9646693065"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.159"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c614d17805b093df4b147b51339e7e44bf05ef59fba1e45d83500bcfb4d8585"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.13",
]
[[package]]
name = "serde_json"
@@ -1041,6 +1055,7 @@ dependencies = [
"rand",
"rand_chacha",
"rayon",
"serde",
"serde_json",
"sha2",
"spiral-rs",

View File

@@ -24,7 +24,8 @@ default = []
[dependencies]
spiral-rs = { version = "0.2.1-alpha.2", path = "../spiral-rs" }
rand = { version = "0.8.5", features = ["small_rng"] }
serde_json = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0"}
rayon = "1.5.2"
rand_chacha = "0.3.1"

View File

@@ -1,4 +1,5 @@
use actix_web::HttpServer;
use serde::Serialize;
use spiral_rs::client::*;
use spiral_rs::params::*;
use spiral_rs::util::*;
@@ -17,7 +18,6 @@ use uuid::Uuid;
use actix_web::error::PayloadError;
use actix_web::{get, post, web, App};
use base64::{engine::general_purpose, Engine as _};
struct ServerState {
params: &'static Params,
@@ -49,7 +49,11 @@ async fn write(body: web::Bytes, data: web::Data<ServerState>) -> Result<String,
let mut db_mut = data.db.write().unwrap();
let kv_pairs = unwrap_kv_pairs(&body);
update_database(data.params, &kv_pairs, &mut rows_mut, &mut db_mut);
let kv_pairs_slices: Vec<(&str, &[u8])> = kv_pairs
.iter()
.map(|(key, value)| (key.as_str(), value.as_slice()))
.collect();
update_database(data.params, &kv_pairs_slices, &mut rows_mut, &mut db_mut);
let mut version_mut = data.version.write().unwrap();
*version_mut += 1;
@@ -59,19 +63,34 @@ async fn write(body: web::Bytes, data: web::Data<ServerState>) -> Result<String,
))
}
#[derive(Serialize)]
pub struct UuidResponse {
pub uuid: String,
}
#[post("/setup")]
async fn setup(
body: web::Bytes,
body: String,
data: web::Data<ServerState>,
) -> Result<String, actix_web::error::Error> {
// parse body as json str
let body_str = serde_json::from_str::<String>(&body).unwrap();
// decode body from base64
let client_pub_params = base64::decode(&body_str).unwrap();
let mut pub_params_map_mut = data.pub_params.write().unwrap();
assert_eq!(body.len(), data.params.setup_bytes());
let pub_params = PublicParameters::deserialize(&data.params, &body);
assert_eq!(client_pub_params.len(), data.params.setup_bytes());
let pub_params = PublicParameters::deserialize(&data.params, &client_pub_params);
let uuid = Uuid::new_v4();
pub_params_map_mut.insert(uuid.to_string(), pub_params);
Ok(format!("{{\"uuid\":\"{}\"}}", uuid.to_string()))
// return uuid as JSON string
let uuid_json = serde_json::to_string(&UuidResponse {
uuid: uuid.to_string(),
})
.unwrap();
Ok(uuid_json)
}
const UUID_V4_STR_BYTES: usize = 36;
@@ -126,22 +145,22 @@ async fn private_read(
body: web::Bytes,
data: web::Data<ServerState>,
) -> Result<String, actix_web::error::Error> {
let mut out = Vec::new();
let mut i = 0;
let num_chunks = u64::from_le_bytes(body[..8].try_into().unwrap()) as usize;
i += 8;
out.extend(u64::to_le_bytes(num_chunks as u64));
for _ in 0..num_chunks {
let chunk_len = u64::from_le_bytes(body[i..i + 8].try_into().unwrap()) as usize;
i += 8;
let result = private_read_impl(&body[i..i + chunk_len], data.clone()).await?;
i += chunk_len;
// parse body as list of json strings
let query_strs = serde_json::from_slice::<Vec<String>>(&body).unwrap();
out.extend(u64::to_le_bytes(result.len() as u64));
out.extend(result);
let mut out = Vec::new();
for query_str in query_strs.iter() {
// decode each query from base64
let query_bytes = base64::decode(query_str).unwrap();
let result = private_read_impl(&query_bytes, data.clone()).await?;
// store base64-encoded results in out
let result_str = base64::encode(&result);
out.push(result_str);
}
Ok(general_purpose::STANDARD.encode(out))
let out_json = serde_json::to_string(&out).unwrap();
Ok(out_json)
}
#[get("/meta")]

View File

@@ -1,5 +1,6 @@
use std::{collections::HashMap, io::Read};
use base64::{engine::general_purpose, Engine};
use bzip2::{read::BzEncoder, Compression};
use sha2::{Digest, Sha256};
use spiral_rs::params::Params;
@@ -125,30 +126,20 @@ pub fn update_row(row: &mut Vec<u8>, key: &str, value: &[u8]) {
}
}
pub fn unwrap_kv_pairs(data: &[u8]) -> Vec<(&str, &[u8])> {
pub fn unwrap_kv_pairs(data: &[u8]) -> Vec<(String, Vec<u8>)> {
let mut kv_pairs = Vec::new();
let mut i = 0;
while i < data.len() {
// 1. Read key length.
let (key_len, key_len_bytes) = varint_decode(&data[i..]);
i += key_len_bytes;
// 2. Read key.
let key_bytes = &data[i..i + key_len];
i += key_len;
// 3. Read value length.
let (value_len, value_len_bytes) = varint_decode(&data[i..]);
i += value_len_bytes;
// 4. Read value.
let value_bytes = &data[i..i + value_len];
i += value_len;
// 5. Yield key/value pair.
let pair = (std::str::from_utf8(key_bytes).unwrap(), value_bytes);
kv_pairs.push(pair);
// Parse the data as a JSON object
if let Ok(json_data) = serde_json::from_slice::<HashMap<String, String>>(data) {
for (key, base64_value) in json_data.iter() {
// Decode the Base64-encoded value
if let Ok(decoded_value) = base64::decode(base64_value) {
kv_pairs.push((key.clone(), decoded_value));
}
}
}
// print KV pairs
println!("kv_pairs: {:?}", kv_pairs);
kv_pairs
}

View File

@@ -43,7 +43,7 @@ def generateBucketName() -> str:
async def test_e2e_async(
endpoint: str, api_key: str, N: int = 4000, itemSize: int = 32
):
client = blyss.AsyncClient({"endpoint": endpoint, "api_key": api_key})
client = blyss.AsyncClient(api_key, endpoint)
# generate random string for bucket name
bucket_name = generateBucketName()
await client.create(bucket_name, usage_hints={"maxItemSize": 40_000})
@@ -114,7 +114,7 @@ async def test_e2e_async(
def test_e2e(endpoint: str, api_key: str, N: int = 4000, itemSize: int = 32):
client = blyss.Client({"endpoint": endpoint, "api_key": api_key})
client = blyss.Client(api_key, endpoint)
# generate random string for bucket name
bucket_name = generateBucketName()
client.create(bucket_name, usage_hints={"maxItemSize": 40_000})