diff --git a/js/client/api.ts b/js/client/api.ts index 90913ab..11199c8 100644 --- a/js/client/api.ts +++ b/js/client/api.ts @@ -123,9 +123,9 @@ async function postDataJson( data = JSON.stringify(bytesToBase64(data)); } - // compress - data = gzip(data); - headers.append('Content-Encoding', 'gzip'); + // // compress + // data = gzip(data); + // headers.append('Content-Encoding', 'gzip'); const response = await fetch(url, { diff --git a/python/blyss/api.py b/python/blyss/api.py index 3d781c9..0f373ce 100644 --- a/python/blyss/api.py +++ b/python/blyss/api.py @@ -30,6 +30,7 @@ WRITE_PATH = "/write" READ_PATH = "/private-read" APIGW_MAX_SIZE = 6e6 / (4 / 3) * 0.95 # 6MB, base64 encoded, plus 5% margin +_GLOBAL_ENABLE_REQUEST_COMPRESSION = False # Not compatible with nested asyncio loops. @@ -103,7 +104,7 @@ async def _async_post_data( f"Request data is too large ({len(data_json)} JSON bytes); maximum size is {APIGW_MAX_SIZE} bytes" ) - if compress: + if compress and _GLOBAL_ENABLE_REQUEST_COMPRESSION: # apply gzip compression to data before sending payload = gzip.compress(data_json) headers["Content-Encoding"] = "gzip" @@ -145,7 +146,9 @@ class API: data_json (str): A JSON-encoded string of the new bucket request. """ return await _async_post_data( - self.api_key, self._service_url_for(CREATE_PATH), data_jsonable + self.api_key, + self._service_url_for(CREATE_PATH), + data_jsonable, ) def _blocking_create(self, *args, **kwargs): @@ -258,10 +261,7 @@ class API: data (bytes): Setup data to upload. """ resp = await _async_post_data( - self.api_key, - self._url_for(bucket_name, SETUP_PATH), - data, - compress=True, + self.api_key, self._url_for(bucket_name, SETUP_PATH), data ) return resp["uuid"] @@ -299,7 +299,7 @@ class API: def _blocking_write(self, *args, **kwargs): async_runner(self.write, *args, **kwargs) - async def async_private_read( + async def private_read( self, bucket_name: str, queries: list[bytes] ) -> list[Optional[bytes]]: """Privately read data from this bucket.""" @@ -313,4 +313,4 @@ class API: return [base64.b64decode(v) if v is not None else None for v in r] def _blocking_private_read(self, *args, **kwargs): - return async_runner(self.async_private_read, *args, **kwargs) + return async_runner(self.private_read, *args, **kwargs) diff --git a/python/blyss/bucket.py b/python/blyss/bucket.py index c76d690..e7d323f 100644 --- a/python/blyss/bucket.py +++ b/python/blyss/bucket.py @@ -121,13 +121,13 @@ class Bucket: ] return queries - def _decode_result( - self, key: str, result_row: bytes, silence_errors: bool = True + def _decode_result_row( + self, result_row: bytes, silence_errors: bool = True ) -> Optional[bytes]: try: decrypted_result = self._lib.decode_response(result_row) decompressed_result = bz2.decompress(decrypted_result) - return self._lib.extract_result(key, decompressed_result) + return decompressed_result except: if not silence_errors: raise @@ -198,43 +198,53 @@ class Bucket: self._api._blocking_clear(self.name) def private_read(self, keys: list[str]) -> list[Optional[bytes]]: - """Privately reads the supplied key(s) from the bucket, - and returns the corresponding value(s). + """Privately reads the supplied keys from the bucket, + and returns the corresponding values. Data will be accessed using fully homomorphic encryption, designed to make it impossible for any entity (including the Blyss service!) to - determine which key(s) are being read. + determine which keys are being read. Args: - keys: A key or list of keys to privately retrieve. - If a list of keys is supplied, - results will be returned in the same order. + keys: A list of keys to privately retrieve. + Results will be returned in the same order. Returns: For each key, the value found for the key in the bucket, or None if the key was not found. """ - single_query = False - if isinstance(keys, str): - keys = [keys] - single_query = True - - if not self._public_uuid or not self._check(): - self.setup() - assert self._public_uuid - - queries = self._generate_query_stream(keys) - rows_per_result = self._api._blocking_private_read(self.name, queries) + row_indices_per_key = [self._lib.get_row(k) for k in keys] + rows_per_result = self.private_read_row(row_indices_per_key) results = [ - self._decode_result(key, result) if result else None - for key, result in zip(keys, rows_per_result) + self._lib.extract_result(key, row) if row else None + for key, row in zip(keys, rows_per_result) ] - if single_query: - return results[0] - return results + def private_read_row(self, row_indices: list[int]) -> list[Optional[bytes]]: + """Direct API for private reads; fetches full bucket rows rather than individual keys. + + Args: + row_indices: A list of row indices to privately retrieve. + Results will be returned in the same order. + + Returns: + For each row index, the value found for the row in the bucket, + or None if the row was not found. + """ + if not self._public_uuid or not self._check(): + self.setup() + assert self._public_uuid + + queries = [self._lib.generate_query(self._public_uuid, i) for i in row_indices] + raw_rows_per_result = self._api._blocking_private_read(self.name, queries) + rows_per_result = [ + self._decode_result_row(rr) if rr else None for rr in raw_rows_per_result + ] + + return rows_per_result + def private_key_intersect(self, keys: list[str]) -> list[str]: """Privately intersects the given set of keys with the keys in this bucket, returning the keys that intersected. This is generally slower than a single @@ -332,17 +342,25 @@ class AsyncBucket(Bucket): await asyncio.gather(*_tasks) async def private_read(self, keys: list[str]) -> list[Optional[bytes]]: - if not self._public_uuid or not await self._check(): - await self.setup() - assert self._public_uuid - - multi_query = self._generate_query_stream(keys) - - rows_per_result = await self._api.async_private_read(self.name, multi_query) + row_indices_per_key = [self._lib.get_row(k) for k in keys] + rows_per_result = await self.private_read_row(row_indices_per_key) results = [ - self._decode_result(key, result) if result else None - for key, result in zip(keys, rows_per_result) + self._lib.extract_result(key, row) if row else None + for key, row in zip(keys, rows_per_result) ] return results + + async def private_read_row(self, row_indices: list[int]) -> list[Optional[bytes]]: + if not self._public_uuid or not await self._check(): + await self.setup() + assert self._public_uuid + + queries = [self._lib.generate_query(self._public_uuid, i) for i in row_indices] + raw_rows_per_result = await self._api.private_read(self.name, queries) + rows_per_result = [ + self._decode_result_row(rr) if rr else None for rr in raw_rows_per_result + ] + + return rows_per_result diff --git a/python/blyss/bucket_service.py b/python/blyss/bucket_service.py index fc2504e..56a5601 100644 --- a/python/blyss/bucket_service.py +++ b/python/blyss/bucket_service.py @@ -14,21 +14,15 @@ ApiConfig = dict[str, str] class BucketService: """A client to the hosted Blyss bucket service. Allows creation, deletion, and modification of buckets.""" - def __init__(self, api_config: Union[str, ApiConfig]): + def __init__(self, api_key: str, endpoint: str = BLYSS_BUCKET_URL): """Initialize a client of the Blyss bucket service. Args: - api_config: An API key string, or - a dictionary containing an API configuration. - The minimum set of keys is: - "endpoint": A fully qualified endpoint URL for the bucket service. - "api_key" : An API key to supply with every request. - """ - if isinstance(api_config, str): - api_config = {"api_key": api_config} + api_key: A valid Blyss API key. + endpoint: A fully qualified endpoint URL for the bucket service, e.g. https://beta.api.blyss.dev. - service_endpoint = api_config.get("endpoint", BLYSS_BUCKET_URL) - self._api = api.API(api_config["api_key"], service_endpoint) + """ + self._api = api.API(api_key, endpoint) def connect( self,