[BLY-64] Python Spiral v1, and docs

Python client: use spiral v1, add basic docs
Unify client versions at 0.2.0
Add min rustc version to all crates
Committed by Neil Movva on 2023-08-29 13:19:27 -07:00 (via GitHub)
parent 8a4854b13e
commit 7740f75ec2
23 changed files with 259 additions and 293 deletions


@@ -67,7 +67,7 @@ jobs:
client-publish:
name: Publish JavaScript SDK
runs-on: ubuntu-latest
if: ${{ inputs.publish }}
if: ${{ inputs.publish || github.ref == 'refs/heads/main' }}
needs: [client-build]
steps:
- uses: actions/checkout@v3


@@ -1,8 +1,8 @@
name: Build Python SDK
env:
BLYSS_STAGING_SERVER: https://dev2.api.blyss.dev
BLYSS_STAGING_API_KEY: Gh1pz1kEiNa1npEdDaRRvM1LsVypM1u2x1YbGb54
BLYSS_STAGING_SERVER: ${{ vars.BLYSS_STAGING_SERVER }}
BLYSS_STAGING_API_KEY: ${{ secrets.BLYSS_STAGING_API_KEY }}
on:
push:
@@ -58,6 +58,24 @@ jobs:
name: wheels
path: python/dist
# only run docs on target x86_64
- name: Build Python docs
if: matrix.target == 'x86_64'
working-directory: python
shell: bash
# TODO: pdoc is documenting the installed module, not the source folder.
# pdoc's import-based docs don't play well with maturin's structure.
run: |
pip install pdoc
pdoc -o pdoc -d google -n blyss --no-search
- name: Upload Python docs
if: matrix.target == 'x86_64'
uses: actions/upload-pages-artifact@v2
with:
path: python/pdoc
windows:
runs-on: windows-latest
needs: [linux]
@@ -126,7 +144,7 @@ jobs:
publish:
name: Publish
runs-on: ubuntu-latest
if: ${{ inputs.publish }}
if: ${{ inputs.publish || github.ref == 'refs/heads/main' }}
needs: [linux, macos, windows, sdist]
steps:
- uses: actions/download-artifact@v3

.gitignore

@@ -8,3 +8,4 @@ node_modules
*.pyc
*build/
.DS_Store
.env


@@ -56,10 +56,19 @@ You can also use the Blyss SDK completely locally.
2. Run the server by entering `lib/server` and running `cargo run --release`. The server will run on `localhost:8008` by default.
3. Run the client by entering `examples/node-local` and running `npx ts-node main.ts`. This will perform some writes and then a private read to your bucket.
## Installing as a package
## Install
### JavaScript / Node
To use the Blyss SDK in an existing TypeScript project, install it with `npm install @blyss/sdk`. Then, import the client with `import { Client } from '@blyss/sdk';`. If you're using the SDK in Node and prefer not to use ESM, you can instead import it as `const blyss = require('@blyss/sdk/node')`.
### Python
#### From PyPI:
`pip install --upgrade blyss`
#### From repo:
1. `cd python` from repo root.
2. `pip install --upgrade .`
## Examples
The `examples/` directory has several examples of how to use the Blyss SDK. Running the examples requires [an API key](https://blyss.dev).
@@ -90,8 +99,8 @@ The Node.js example shows how to use the Blyss SDK in server-side JS. Node 18+ i
### Python
1. Enter `examples/python`.
2. Run `pip install blyss`.
1. Install blyss.
2. Enter `examples/python`.
3. Run `python main.py`.
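For orientation, here is a minimal sketch of the flow `examples/python/main.py` exercises, using the v1 Python API introduced in this commit (`blyss.Client`, `create`, `connect`, `write`, `private_read`); the API key and bucket name are placeholders:

```python
# Hedged quickstart sketch; API key and bucket name are placeholders.
import blyss

client = blyss.Client("<YOUR_API_KEY>")
client.create("my-bucket", usage_hints={"maxItemSize": 1000})
bucket = client.connect("my-bucket")

# In v1, keys are UTF-8 strings and values are raw bytes.
bucket.write({"alice": b"hello", "bob": b"world"})

# Privately read a key: the service cannot tell which key was requested.
value = bucket.private_read("alice")  # bytes, or None if the key is absent
assert value == b"hello"
```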
## Documentation


@@ -1,13 +1,14 @@
[package]
name = "spiral-rs-js-bridge"
description = "Bridge crate between the spiral-rs-client library and the JS client"
version = "0.1.0"
version = "0.2.0"
authors = ["Samir Menon <samir@blyss.dev>"]
license = "MIT"
repository = "https://github.com/blyssprivacy/sdk"
categories = ["wasm"]
readme = "README.md"
edition = "2018"
rust-version = "1.70.0"
[lib]
crate-type = ["cdylib", "rlib"]


@@ -9,11 +9,11 @@ export interface BucketParameters {
version: number;
}
type MaxItemSizeIdentifier = '100B' | '1KB' | '10KB';
type MaxItemSizeIdentifier = '100' | '1000' | '10000';
type kspIdentifier = 'none' | 'bloom' | 'full';
const DEFAULT_BUCKET_PARAMETERS: BucketParameters = {
maxItemSize: '1KB',
maxItemSize: '1000',
keyStoragePolicy: 'bloom',
version: 1
};


@@ -12,6 +12,7 @@ keywords = ["privacy", "fhe", "cryptography"]
categories = ["cryptography"]
readme = "README.md"
license = "MIT"
rust-version = "1.70.0"
[dependencies]
base64 = "0.21.0"


@@ -2,6 +2,7 @@
name = "doublepir-rs"
version = "0.1.0"
edition = "2021"
rust-version = "1.70.0"
[features]
client = ["reqwest", "base64", "sha1"]


@@ -12,6 +12,7 @@ keywords = ["privacy", "fhe", "cryptography"]
categories = ["cryptography"]
readme = "README.md"
license = "MIT"
rust-version = "1.70.0"
[[bin]]
name = "server"


@@ -312,7 +312,7 @@ dependencies = [
[[package]]
name = "spiral-rs"
version = "0.2.0-alpha.1"
version = "0.2.1-alpha.2"
dependencies = [
"getrandom",
"rand",


@@ -12,6 +12,7 @@ keywords = ["privacy", "fhe", "cryptography"]
categories = ["cryptography"]
readme = "README.md"
license = "MIT"
rust-version = "1.70.0"
[features]
server = ["rayon"]


@@ -1,7 +1,7 @@
{
"author": "Samir Menon <samir@blyss.dev>",
"name": "@blyss/sdk",
"version": "0.1.9",
"version": "0.2.0",
"description": "Blyss SDK, enabling private retrievals from Blyss buckets",
"type": "module",
"main": "./dist/index.js",

python/Cargo.lock (generated)

@@ -25,7 +25,7 @@ dependencies = [
[[package]]
name = "blyss-client-python"
version = "0.1.9"
version = "0.2.0"
dependencies = [
"pyo3",
"spiral-rs",


@@ -1,7 +1,8 @@
[package]
name = "blyss-client-python"
version = "0.1.9"
version = "0.2.0"
edition = "2021"
rust-version = "1.70.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]


@@ -1 +1,7 @@
from .main import *
"""
.. include:: ../docs/main.md
"""
from .main import *
__all__ = ["Bucket", "AsyncBucket", "BucketService", "ApiException"]


@@ -14,7 +14,6 @@ import json
import logging
import base64
from blyss.req_compression import get_session
from blyss.bloom import BloomFilter
CREATE_PATH = "/create"
@@ -33,19 +32,26 @@ READ_PATH = "/private-read"
class ApiException(Exception):
"""Exception raised when an API call to the Blyss service fails."""
def __init__(self, message: str, code: int):
"""Initialize ApiException with message."""
self.message = message
"""Error message returned by the server."""
self.code = code
"""HTTP status code returned by the server."""
super().__init__(message)
def _check_http_error(resp: requests.Response):
def _check_http_error(resp: requests.Response | httpx.Response):
"""Throws an ApiException with message on any unsuccessful HTTP response."""
status_code = resp.status_code
if status_code < 200 or status_code > 299:
try:
errmsg = resp.text
except:
errmsg = f"<undecodable response body, size {len(resp.content)} bytes>"
raise ApiException(
f"Request failed, with unsuccessful HTTP status code {status_code} and message '{resp.content}'",
errmsg,
status_code,
)
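Callers are expected to branch on `ApiException.code`; a small sketch of the intended handling, mirroring the `Bucket._check` pattern that appears later in this commit (the `api` object and `uuid` here are stand-ins):

```python
# Sketch: treat 404 as "not found" and re-raise anything else,
# as Bucket._check does further down in this commit.
def check_exists(api, uuid: str) -> bool:
    try:
        api.check(uuid)
        return True
    except ApiException as e:
        if e.code == 404:
            return False
        raise  # auth failures, 5xx, etc. should propagate
```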
@@ -97,7 +103,10 @@ def _post_data(api_key: str, url: str, data: Union[bytes, str]) -> bytes:
logging.info(f"POST {url} (length: {len(data)} bytes)")
resp = None
if type(data) == bytes:
resp = get_session().post(url, data, headers=headers)
# compress data before sending
zdata = gzip.compress(data)
headers["Content-Encoding"] = "gzip"
resp = requests.post(url, zdata, headers=headers)
else:
resp = requests.post(url, data, headers=headers)
_check_http_error(resp)
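This inlines the bytes-path behavior of the deleted `req_compression` adapter (shown later in this commit): gzip the payload and advertise it via `Content-Encoding`. A standalone sketch; the endpoint and the auth header name are placeholders:

```python
# Sketch of the compressed upload path; URL and auth header are placeholders.
import gzip
import requests

def post_compressed(url: str, data: bytes, api_key: str) -> bytes:
    zdata = gzip.compress(data)
    headers = {"x-api-key": api_key, "Content-Encoding": "gzip"}
    resp = requests.post(url, zdata, headers=headers)
    resp.raise_for_status()
    return resp.content
```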
@@ -212,7 +221,7 @@ class API:
def modify(self, bucket_name: str, data_json: str) -> dict[Any, Any]:
"""Modify existing bucket.
Args:
data_json (str): same as create.
"""
@@ -262,9 +271,9 @@ class API:
return prelim_result
def list_keys(self, bucket_name: str) -> dict[str, Any]:
def list_keys(self, bucket_name: str) -> list[str]:
"""List all keys in this bucket."""
return _get_data_json(self.api_key, self._url_for(bucket_name, LIST_KEYS_PATH))
return _get_data_json(self.api_key, self._url_for(bucket_name, LIST_KEYS_PATH)) # type: ignore
def destroy(self, bucket_name: str):
"""Destroy this bucket."""


@@ -8,6 +8,7 @@ the compiled Rust code.
from typing import Any, Optional
from .seed import seed_from_string
from . import blyss, seed # type: ignore
# NB: There are many "type: ignore"s on purpose. Type information
@@ -29,7 +30,7 @@ class BlyssLib:
blyss.generate_keys( # type: ignore
self.inner_client,
seed.seed_from_string(self.secret_seed),
seed_from_string(self.secret_seed),
False,
)
@@ -44,7 +45,7 @@ class BlyssLib:
return blyss.generate_keys( # type: ignore
self.inner_client,
seed.seed_from_string(self.secret_seed),
seed_from_string(self.secret_seed),
True,
)
@@ -90,7 +91,7 @@ class BlyssLib:
data (bytes): The plaintext data from the PIR response.
Returns:
bytes: The plaintext data corresponding to the given key.
bytes: The plaintext data corresponding to the given key, or None if the key was not found.
"""
r = blyss.extract_result(self.inner_client, key, data)
if r is None:


@@ -33,27 +33,34 @@ def _chunk_parser(raw_data: bytes) -> Iterator[bytes]:
class Bucket:
"""A class representing a client to a single Blyss bucket."""
"""Interface to a single Blyss bucket."""
def __init__(self, api: api.API, name: str, secret_seed: Optional[str] = None):
"""Initialize a client for a single, existing Blyss bucket.
"""
@private
Initialize a client for a single, existing Blyss bucket.
Args:
api (api.API): A target API to send all underlying API calls to.
name (str): The name of the bucket.
secret_seed (Optional[str], optional): An optional secret seed to
initialize the client with. A random one will be generated if not
supplied. Defaults to None.
api: A target API to send all underlying API calls to.
name: The name of the bucket.
secret_seed: An optional secret seed to initialize the client with.
A random one will be generated if not supplied.
"""
self.api = api
self.name = name
self.metadata = self.api.meta(self.name)
self.secret_seed = seed.get_random_seed()
self.name: str = name
"""Name of the bucket. See [bucket naming rules](https://docs.blyss.dev/docs/buckets#names)."""
# Internal attributes
self._api = api
self._metadata = self._api.meta(self.name)
if secret_seed:
self.secret_seed = secret_seed
self.lib = BlyssLib(json.dumps(self.metadata["pir_scheme"]), self.secret_seed)
self.public_uuid: Optional[str] = None
self.exfil: Any = None
self._secret_seed = secret_seed
else:
self._secret_seed = seed.get_random_seed()
self._lib = BlyssLib(
json.dumps(self._metadata["pir_scheme"]), self._secret_seed
)
self._public_uuid: Optional[str] = None
self._exfil: Any = None # used for benchmarking
def _check(self, uuid: str) -> bool:
"""Checks if the server has the given UUID.
@@ -65,7 +72,7 @@ class Bucket:
bool: Whether the server has the given UUID.
"""
try:
self.api.check(uuid)
self._api.check(uuid)
return True
except api.ApiException as e:
if e.code == 404:
@@ -75,7 +82,7 @@ class Bucket:
async def _async_check(self, uuid: str) -> bool:
try:
await self.api.async_check(uuid)
await self._api.async_check(uuid)
return True
except api.ApiException as e:
if e.code == 404:
@@ -91,7 +98,7 @@ class Bucket:
# 1. Bin keys by row index
keys_by_index: dict[int, list[str]] = {}
for k in kv_pairs.keys():
i = self.lib.get_row(k)
i = self._lib.get_row(k)
if i in keys_by_index:
keys_by_index[i].append(k)
else:
@@ -136,9 +143,11 @@ class Bucket:
return kv_chunks
def _generate_query_stream(self, keys: list[str]) -> bytes:
assert self._public_uuid
# generate encrypted queries
queries: list[bytes] = [
self.lib.generate_query(self.public_uuid, self.lib.get_row(k)) for k in keys
self._lib.generate_query(self._public_uuid, self._lib.get_row(k))
for k in keys
]
# interleave the queries with their lengths (uint64_t)
query_lengths = [len(q).to_bytes(8, "little") for q in queries]
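The wire format here is a simple length-prefixed concatenation: each encrypted query is preceded by its byte length as a little-endian uint64, and `_chunk_parser` plays the reader role on the response side. A self-contained sketch of the framing and its inverse (names are illustrative):

```python
# Sketch of the length-prefixed framing used for multi-queries.
from typing import Iterator

def frame(chunks: list[bytes]) -> bytes:
    # Prefix each chunk with its length as a little-endian uint64.
    out = b""
    for c in chunks:
        out += len(c).to_bytes(8, "little") + c
    return out

def parse(raw: bytes) -> Iterator[bytes]:
    i = 0
    while i < len(raw):
        n = int.from_bytes(raw[i : i + 8], "little")
        i += 8
        yield raw[i : i + n]
        i += n

assert list(parse(frame([b"q0", b"q1"]))) == [b"q0", b"q1"]
```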
@@ -150,145 +159,134 @@ class Bucket:
return multi_query
def _unpack_query_result(
self, keys: list[str], raw_result: bytes, parse_metadata: bool = True
) -> list[bytes]:
self, keys: list[str], raw_result: bytes, ignore_errors=False
) -> list[Optional[bytes]]:
retrievals = []
for key, result in zip(keys, _chunk_parser(raw_result)):
decrypted_result = self.lib.decode_response(result)
decompressed_result = bz2.decompress(decrypted_result)
extracted_result = self.lib.extract_result(key, decompressed_result)
if parse_metadata:
output = serializer.deserialize(extracted_result)
if len(result) == 0:
# error in processing this query
if ignore_errors:
extracted_result = None
else:
raise RuntimeError(f"Failed to process query for key {key}.")
else:
output = extracted_result
retrievals.append(output)
decrypted_result = self._lib.decode_response(result)
decompressed_result = bz2.decompress(decrypted_result)
extracted_result = self._lib.extract_result(key, decompressed_result)
retrievals.append(extracted_result)
return retrievals
def _private_read(self, keys: list[str]) -> list[bytes]:
def _private_read(self, keys: list[str]) -> list[Optional[bytes]]:
"""Performs the underlying private retrieval.
Args:
keys (str): A list of keys to retrieve.
Returns:
tuple[bytes, Optional[dict]]: Returns a tuple of (value, optional_metadata).
a list of values (bytes) corresponding to keys. None for keys not found.
"""
if not self.public_uuid or not self._check(self.public_uuid):
if not self._public_uuid or not self._check(self._public_uuid):
self.setup()
assert self.public_uuid
assert self._public_uuid
multi_query = self._generate_query_stream(keys)
start = time.perf_counter()
multi_result = self.api.private_read(self.name, multi_query)
self.exfil = time.perf_counter() - start
multi_result = self._api.private_read(self.name, multi_query)
self._exfil = time.perf_counter() - start
retrievals = self._unpack_query_result(keys, multi_result)
return retrievals
def setup(self, uuid: Optional[str] = None):
def setup(self):
"""Prepares this bucket client for private reads.
This method will be called automatically by `private_read`, but
clients may call it explicitly beforehand to make subsequent
:method:`read` calls faster.
`private_read` calls faster.
Can upload significant amounts of data (1-10 MB).
"""
if uuid is not None and self._check(uuid):
self.lib.generate_keys()
self.public_uuid = uuid
else:
public_params = self.lib.generate_keys_with_public_params()
setup_resp = self.api.setup(self.name, bytes(public_params))
self.public_uuid = setup_resp["uuid"]
public_params = self._lib.generate_keys_with_public_params()
setup_resp = self._api.setup(self.name, bytes(public_params))
self._public_uuid = setup_resp["uuid"]
def info(self) -> dict[Any, Any]:
"""Gets info on this bucket from the service."""
return self.api.meta(self.name)
"""Fetch this bucket's properties from the service, such as access permissions and PIR scheme parameters."""
return self._api.meta(self.name)
def list_keys(self) -> list[str]:
"""List all key strings in this bucket. Only available if bucket was created with keyStoragePolicy="full"."""
return self._api.list_keys(self.name)
def list_keys(self) -> dict[str, Any]:
"""Gets info on all keys in this bucket."""
return self.api.list_keys(self.name)
def rename(self, new_name: str):
"""Renames this bucket."""
"""Rename this bucket to new_name."""
bucket_create_req = {
"name": new_name,
}
self.api.modify(self.name, json.dumps(bucket_create_req))
self._api.modify(self.name, json.dumps(bucket_create_req))
self.name = new_name
def write(self, kv_pairs: dict[str, Union[tuple[Any, Optional[Any]], Any]]):
def write(self, kv_pairs: dict[str, bytes]):
"""Writes the supplied key-value pair(s) into the bucket.
To supply metadata for a key, set the value in
the dict to a tuple of (value_to_write, metadata).
Args:
kv_pairs (dict[str, Union[tuple[Any, Optional[Any]], Any]]):
A dictionary containing the key-value pairs to write.
Keys must be strings, and values may be any JSON-serializable value,
bytes, or a tuple (see above).
kv_pairs: A dictionary of key-value pairs to write into the bucket.
Keys must be UTF8 strings, and values may be arbitrary bytes.
"""
concatenated_kv_items = b""
for key, value in kv_pairs.items():
if isinstance(value, tuple):
value, metadata = value
else:
_ = value
metadata = None
serialized_value = serializer.serialize(value, metadata)
concatenated_kv_items += serializer.wrap_key_val(
key.encode("utf-8"), serialized_value
)
concatenated_kv_items += serializer.wrap_key_val(key.encode("utf-8"), value)
# single call to API endpoint
self.api.write(self.name, concatenated_kv_items)
self._api.write(self.name, concatenated_kv_items)
def delete_key(self, key: str):
"""Deletes the supplied key from the bucket.
"""Deletes a single key-value pair from the bucket.
Args:
key (str): The key to delete.
key: The key to delete.
"""
self.api.delete_key(self.name, key)
self._api.delete_key(self.name, key)
def destroy_entire_bucket(self):
"""Destroys the entire bucket. This action is permanent and irreversible."""
self.api.destroy(self.name)
self._api.destroy(self.name)
def clear_entire_bucket(self):
"""Deletes all keys in this bucket. This action is permanent and irreversible.
Differs from destroy in that the bucket's metadata
(e.g. permissions, PIR scheme parameters, and clients' setup data) are preserved.
"""
self.api.clear(self.name)
self._api.clear(self.name)
def private_read(self, keys: Union[str, list[str]]) -> Union[bytes, list[bytes]]:
"""Privately reads the supplied key from the bucket,
returning the value corresponding to the key.
def private_read(
self, keys: Union[str, list[str]]
) -> Union[Optional[bytes], list[Optional[bytes]]]:
"""Privately reads the supplied key(s) from the bucket,
and returns the corresponding value(s).
No entity, including the Blyss service, should be able to
determine which key(s) this method was called for.
Data will be accessed using fully homomorphic encryption, designed to
make it impossible for any entity (including the Blyss service!) to
determine which key(s) are being read.
Args:
keys (str): A key or list of keys to privately read.
If a list of keys is supplied,
results will be returned in the same order.
keys: A key or list of keys to privately retrieve.
If a list of keys is supplied,
results will be returned in the same order.
Returns:
bytes: The value found for the key in the bucket,
or None if the key was not found.
For each key, the value found for the key in the bucket,
or None if the key was not found.
"""
single_query = False
if isinstance(keys, str):
keys = [keys]
single_query = True
results = [r[0] for r in self._private_read(keys)]
results = [r if r is not None else None for r in self._private_read(keys)]
if single_query:
return results[0]
@@ -297,47 +295,64 @@ class Bucket:
def private_key_intersect(self, keys: list[str]) -> list[str]:
"""Privately intersects the given set of keys with the keys in this bucket,
returning the keys that intersected. This is generally slower than a single
private read.
private read, but much faster than making a private read for each key.
No entity, including the Blyss service, should be able to determine which
keys this method was called for.
Has the same privacy guarantees as private_read - zero information is leaked
about keys being intersected.
Requires that the bucket was created with key_storage_policy of "bloom" or "full".
If the bucket cannot support private bloom filter lookups, an exception will be raised.
Args:
keys (list[str]): The keys to _privately_ intersect the value of.
keys: A list of keys to privately intersect with this bucket.
"""
bloom_filter = self.api.bloom(self.name)
bloom_filter = self._api.bloom(self.name)
present_keys = list(filter(bloom_filter.lookup, keys))
return present_keys
class AsyncBucket(Bucket):
"""Asyncio-compatible version of Bucket."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
async def write(self, kv_pairs: dict[str, bytes], MAX_CONCURRENCY=8):
async def write(self, kv_pairs: dict[str, bytes], CONCURRENCY=4):
"""
Functionally equivalent to Bucket.write.
Handles chunking and parallel submission of writes, up to CONCURRENCY.
For maximum performance, call this function with as much data as possible.
Data races are possible with parallel writes, but will never corrupt data.
Args:
CONCURRENCY: The number of concurrent server writes. Maximum is 8.
"""
CONCURRENCY = min(CONCURRENCY, 8)
# Split the key-value pairs into chunks not exceeding max payload size.
kv_chunks = self._split_into_chunks(kv_pairs)
# Make one write call per chunk, while respecting a max concurrency limit.
sem = asyncio.Semaphore(MAX_CONCURRENCY)
sem = asyncio.Semaphore(CONCURRENCY)
async def _paced_writer(chunk):
async with sem:
await self.api.async_write(self.name, json.dumps(chunk))
await self._api.async_write(self.name, json.dumps(chunk))
_tasks = [asyncio.create_task(_paced_writer(c)) for c in kv_chunks]
await asyncio.gather(*_tasks)
async def private_read(self, keys: list[str]) -> list[bytes]:
if not self.public_uuid or not await self._async_check(self.public_uuid):
async def private_read(self, keys: list[str]) -> list[Optional[bytes]]:
if not self._public_uuid or not await self._async_check(self._public_uuid):
self.setup()
assert self.public_uuid
assert self._public_uuid
multi_query = self._generate_query_stream(keys)
start = time.perf_counter()
multi_result = await self.api.async_private_read(self.name, multi_query)
self.exfil = time.perf_counter() - start
multi_result = await self._api.async_private_read(self.name, multi_query)
self._exfil = time.perf_counter() - start
retrievals = self._unpack_query_result(keys, multi_result, parse_metadata=False)
retrievals = self._unpack_query_result(keys, multi_result)
return retrievals
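`AsyncBucket.write` paces concurrent chunk uploads with a semaphore, so a single call with all data lets the client chunk and parallelize. A usage sketch under asyncio, assuming `blyss.Client` as used in the example script later in this commit; the API key and bucket are placeholders:

```python
# Sketch: bulk-loading with the async client; key and bucket are placeholders.
import asyncio
import blyss

async def bulk_load():
    client = blyss.Client("<YOUR_API_KEY>")
    bucket = client.connect_async("my-bucket")
    # One call with all data; the client chunks and uploads in parallel.
    kv = {f"key-{i}": i.to_bytes(4, "little") for i in range(100_000)}
    await bucket.write(kv, CONCURRENCY=4)
    values = await bucket.private_read(["key-7", "key-42"])
    assert all(v is not None for v in values)

asyncio.run(bulk_load())
```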


@@ -3,19 +3,23 @@ from . import bucket, api, seed
import json
BLYSS_BUCKET_URL = "https://beta.api.blyss.dev"
DEFAULT_BUCKET_PARAMETERS = {"maxItemSize": 1000, "keyStoragePolicy": "bloom"}
DEFAULT_BUCKET_PARAMETERS = {
"maxItemSize": 1000,
"keyStoragePolicy": "none",
"version": 1,
}
ApiConfig = dict[str, str]
class BucketService:
"""A class representing a client to the Blyss bucket service."""
"""A client to the hosted Blyss bucket service. Allows creation, deletion, and modification of buckets."""
def __init__(self, api_config: Union[str, ApiConfig]):
"""Initialize a client of the Blyss bucket service.
Args:
api_config (Union[str, ApiConfig]): An API key, or
api_config: An API key string, or
a dictionary containing an API configuration.
The minimum set of keys is:
"endpoint": A fully qualified endpoint URL for the bucket service.
@@ -23,13 +27,9 @@ class BucketService:
"""
if isinstance(api_config, str):
api_config = {"api_key": api_config}
self.api_config = api_config
self.service_endpoint = BLYSS_BUCKET_URL
if "endpoint" in api_config:
self.service_endpoint = api_config["endpoint"]
self.api = api.API(self.api_config["api_key"], self.service_endpoint)
service_endpoint = api_config.get("endpoint", BLYSS_BUCKET_URL)
self._api = api.API(api_config["api_key"], service_endpoint)
def connect(
self,
@@ -39,62 +39,63 @@ class BucketService:
"""Connect to an existing Blyss bucket.
Args:
bucket_name (str): The name of the bucket to connect to.
secret_seed (Optional[str], optional): An optional secret seed to
initialize the client using. The secret seed is used to encrypt
client queries. If not supplied, a random one is generated with `os.urandom`.
bucket_name: The name of the bucket to connect to.
secret_seed: An optional secret seed to derive the client secret,
which will be used to encrypt all client queries.
If not supplied, a random one is generated with `os.urandom`.
Returns:
bucket.Bucket: An object representing a client to the Blyss bucket.
An object representing a client to the Blyss bucket.
"""
if secret_seed is None:
secret_seed = seed.get_random_seed()
b = bucket.Bucket(self.api, bucket_name, secret_seed=secret_seed)
b = bucket.Bucket(self._api, bucket_name, secret_seed=secret_seed)
return b
def connect_async(
self, bucket_name: str, secret_seed: Optional[str] = None
) -> bucket.AsyncBucket:
"""Connect to an existing Blyss bucket, using an asyncio-ready interface.
Args:
see connect()
Returns:
bucket.Bucket: An object representing a client to the Blyss bucket.
"""
return bucket.AsyncBucket(self.api, bucket_name, secret_seed=secret_seed)
"""Returns an asynchronous client to the Blyss bucket. Identical functionality to `connect`."""
return bucket.AsyncBucket(self._api, bucket_name, secret_seed=secret_seed)
def create(
self,
bucket_name: str,
open_access: bool = False,
usage_hints: dict[Any, Any] = {},
usage_hints: dict[str, Any] = {},
):
"""Create a new Blyss bucket.
Args:
bucket_name (str): The bucket name. See sanitize_bucket_name for naming rules.
open_access (bool): If True, bucket will support open read-only access.
usage_hints (dict): A dictionary of hints describing the intended usage of this bucket.
The Blyss service will optimize the encryption scheme accordingly.
bucket_name: Name of the new bucket. See [bucket naming rules](https://docs.blyss.dev/docs/buckets#names).
open_access: If True, bucket will support open read-only access, i.e. any user can perform reads. See [open access permissions](https://docs.blyss.dev/docs/buckets#permissions).
usage_hints: A dictionary of hints describing the intended usage of this bucket. Supported keys:
- "maxItemSize": The maximum size of any item in the bucket, in bytes.
A scheme will be chosen that can support at least this size, and possibly more.
Larger item sizes carry performance costs; expect longer query times and more bandwidth usage.
- "keyStoragePolicy": The key storage policy to use for this bucket. Options:
- "none" (default): Stores no key-related information. This is the most performant option and will maximize write speed.
- "bloom": Enables `Bucket.private_intersect()`. Uses a bloom filter to store probablistic information of key membership, with minimal impact on write speed.
- "full": Store all keys in full. Enables `Bucket.list_keys()`. Will result in significantly slower writes.
"""
parameters = {**DEFAULT_BUCKET_PARAMETERS, **usage_hints}
parameters = {**DEFAULT_BUCKET_PARAMETERS}
parameters.update(usage_hints)
bucket_create_req = {
"name": bucket_name,
"parameters": json.dumps(parameters),
"open_access": open_access,
}
self.api.create(json.dumps(bucket_create_req))
self._api.create(json.dumps(bucket_create_req))
def exists(self, name: str) -> bool:
"""Checks if a bucket exists.
"""Check if a bucket exists.
Args:
bucket_name (str): The bucket name.
name: Bucket name to look up.
Returns:
bool: Whether a bucket with the given name already exists.
True if a bucket with the given name currently exists.
"""
try:
self.connect(name)
@@ -113,7 +114,7 @@ class BucketService:
A dictionary of bucket metadata, keyed by bucket name.
"""
buckets = {}
for b in self.api.list_buckets()["buckets"]:
for b in self._api.list_buckets()["buckets"]:
n = b.pop("name")
buckets[n] = b
return buckets
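The `create` docstring above enumerates the supported usage hints; a sketch of creating a bucket tuned for private key intersection, using the names from this diff (API key is a placeholder):

```python
# Sketch: a bucket whose key storage policy enables private_key_intersect.
import blyss

client = blyss.Client("<YOUR_API_KEY>")
client.create(
    "psi-bucket",
    open_access=False,
    usage_hints={"maxItemSize": 1000, "keyStoragePolicy": "bloom"},
)
bucket = client.connect("psi-bucket")
bucket.write({"alice@example.com": b"\x01"})
matches = bucket.private_key_intersect(["alice@example.com", "mallory@example.com"])
# "alice@example.com" is guaranteed present; bloom filters may rarely
# report false positives for absent keys.
print(matches)
```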


@@ -1,70 +0,0 @@
import requests
from requests.adapters import HTTPAdapter
import io
import gzip
COMPRESSION_SCHEMES = [
"http://",
"https://",
]
class CompressionAdapter(HTTPAdapter):
"""Adapter used with `requests` library for sending compressed data."""
CONTENT_LENGTH = "Content-Length"
def add_headers(self, request, **kwargs):
"""Tell the server that we support compression."""
super(CompressionAdapter, self).add_headers(request, **kwargs)
body = request.body
if isinstance(body, bytes):
content_length = len(body)
else:
content_length = body.seek(0, 2)
body.seek(0, 0)
headers = {
"Content-Encoding": "gzip",
"Accept-Encoding": "gzip",
"Content-Length": content_length,
}
request.headers.update(headers)
def send(self, request, stream=False, **kwargs):
"""Compress data before sending."""
if stream:
# Having a file-like object, therefore we need to stream the
# content into a new one through the compressor.
compressed_body = io.BytesIO()
compressed_body.name = request.url
compressor = gzip.open(compressed_body, mode="wb")
# Read, write and compress the content at the same time.
compressor.write(request.body.read())
compressor.flush()
compressor.close()
# Make available the new compressed file-like object as the new
# request body.
compressed_body.seek(0, 0) # make it readable
request.body = compressed_body
else:
# We're dealing with a plain bytes content, so compress it.
request.body = gzip.compress(request.body)
return super(CompressionAdapter, self).send(request, stream=stream, **kwargs)
def get_session():
"""Get a requests session supporting compression."""
# Prepare the adapter and the session.
compression_adapter = CompressionAdapter()
session = requests.Session()
# Mount the adapter to all affected schemes.
for scheme in COMPRESSION_SCHEMES:
session.mount(scheme, compression_adapter)
# Use this sessions for CRUD-ing.
return session


@@ -14,54 +14,6 @@ from . import varint
ClientPayloadType = Union[bytes, str, list[Any], dict[Any, Any]]
def get_obj_as_bytes(obj: ClientPayloadType) -> bytes:
if isinstance(obj, bytes):
return obj
obj_json = json.dumps(obj)
return obj_json.encode()
def get_header_bytes(
obj: ClientPayloadType, metadata: Optional[dict[Any, Any]] = None
) -> bytes:
if not metadata and type(obj) == bytes:
return varint.encode(0)
header_data = {"contentType": "application/json"}
if metadata:
header_data = {**header_data, **metadata}
header = json.dumps(header_data)
header_varint = varint.encode(len(header))
return header_varint + header.encode()
def serialize(obj: Any, metadata: Optional[dict[Any, Any]] = None) -> bytes:
header_bytes = get_header_bytes(obj, metadata)
obj_bytes = get_obj_as_bytes(obj)
return header_bytes + obj_bytes
def deserialize(data: bytes) -> tuple[bytes, Optional[dict[Any, Any]]]:
header_length = varint.decode_bytes(data)
bytes_processed = len(varint.encode(header_length))
i = bytes_processed
if header_length == 0:
return (data[i:], None)
header = json.loads(data[i : i + header_length])
i += header_length
obj = data[i:]
if "contentType" in header and header["contentType"] == "application/json":
obj = json.loads(obj)
return (obj, header)
def wrap_key_val(key: bytes, value: bytes) -> bytes:
"""
Wraps a key and value into a single bytes sequence, following Blyss "kv-item" spec.

python/docs/main.md (new file)

@@ -0,0 +1 @@
This page documents the Blyss SDK for Python.


@@ -1,3 +1,5 @@
from typing import Optional
import os
import sys
import random
@@ -38,12 +40,12 @@ async def main(endpoint: str, api_key: str):
client = blyss.Client({"endpoint": endpoint, "api_key": api_key})
# generate random string for bucket name
bucketName = generateBucketName()
client.create(bucketName)
client.create(bucketName, usage_hints={"maxItemSize": 40_000})
bucket = client.connect_async(bucketName)
print(bucket.info())
# generate N random keys
N = 20000
N = 4000
itemSize = 32
localKeys = generate_keys(N, 0)
# write all N keys
@@ -52,17 +54,25 @@ async def main(endpoint: str, api_key: str):
# read a random key
testKey = random.choice(localKeys)
value = await bucket.private_read([testKey])
verify_read(testKey, value[0])
print(f"Read key {testKey}")
value = (await bucket.private_read([testKey]))[0]
assert value is not None
verify_read(testKey, value)
print(f"Read key {testKey}, got {value.hex()[:8]}[...]")
# delete testKey from the bucket, and localData.
bucket.delete_key(testKey)
localKeys.remove(testKey)
value = await bucket.private_read([testKey])
# TODO: why aren't deletes reflected in the next read?
# assert value is None
print(f"Deleted key {testKey}")
value = (await bucket.private_read([testKey]))[0]
def _test_delete(key: str, value: Optional[bytes]):
if value is None:
print(f"Deleted key {testKey}")
else:
# this happens only sometimes??
print("ERROR: delete not reflected in read!")
print(f"Read deleted key {testKey} and got {value.hex()[:8]}[...]")
_test_delete(testKey, value)
# clear all keys
bucket.clear_entire_bucket()
@@ -74,8 +84,14 @@ async def main(endpoint: str, api_key: str):
await bucket.write({k: key_to_gold_value(k, itemSize) for k in localKeys})
print(f"Wrote {N} keys")
# read a random key
testKey = random.choice(localKeys)
value = (await bucket.private_read([testKey]))[0]
assert value is not None
verify_read(testKey, value)
# test if clear took AFTER the new write
value = await bucket.private_read([testKey])
value = (await bucket.private_read([testKey]))[0]
if value is not None:
print(f"ERROR: {testKey} was not deleted or cleared!")
@@ -87,8 +103,9 @@ async def main(endpoint: str, api_key: str):
# read a random key
testKey = random.choice(localKeys)
value = await bucket.private_read([testKey])
verify_read(testKey, value[0])
value = (await bucket.private_read([testKey]))[0]
assert value is not None
verify_read(testKey, value)
print(f"Read key {testKey}")
# destroy the bucket