mirror of
https://github.com/All-Hands-AI/OpenHands.git
synced 2026-01-09 14:57:59 -05:00
Add BatchedWebHookFileStore for batching webhook updates (#10119)
Co-authored-by: openhands <openhands@all-hands.dev>
This commit is contained in:
@@ -72,6 +72,7 @@ class OpenHandsConfig(BaseModel):
|
||||
file_store_path: str = Field(default='~/.openhands')
|
||||
file_store_web_hook_url: str | None = Field(default=None)
|
||||
file_store_web_hook_headers: dict | None = Field(default=None)
|
||||
file_store_web_hook_batch: bool = Field(default=False)
|
||||
enable_browser: bool = Field(default=True)
|
||||
save_trajectory_path: str | None = Field(default=None)
|
||||
save_screenshots_in_trajectory: bool = Field(default=False)
|
||||
|
||||
@@ -31,6 +31,7 @@ file_store: FileStore = get_file_store(
|
||||
config.file_store_path,
|
||||
config.file_store_web_hook_url,
|
||||
config.file_store_web_hook_headers,
|
||||
batch=config.file_store_web_hook_batch,
|
||||
)
|
||||
|
||||
client_manager = None
|
||||
|
||||
@@ -61,9 +61,12 @@ The `WebHookFileStore` wraps another `FileStore` implementation and sends HTTP r
|
||||
**Configuration Options:**
|
||||
- `file_store_web_hook_url`: The base URL for webhook requests
|
||||
- `file_store_web_hook_headers`: HTTP headers to include in webhook requests
|
||||
- `file_store_web_hook_batch`: Whether to use batched webhook requests (default: false)
|
||||
|
||||
### Protocol Details
|
||||
|
||||
#### Standard Webhook Protocol (Non-Batched)
|
||||
|
||||
1. **File Write Operation**:
|
||||
- When a file is written, a POST request is sent to `{base_url}{path}`
|
||||
- The request body contains the file contents
|
||||
@@ -73,6 +76,27 @@ The `WebHookFileStore` wraps another `FileStore` implementation and sends HTTP r
|
||||
- When a file is deleted, a DELETE request is sent to `{base_url}{path}`
|
||||
- The operation is retried up to 3 times with a 1-second delay between attempts
|
||||
|
||||
#### Batched Webhook Protocol
|
||||
|
||||
The `BatchedWebHookFileStore` extends the webhook functionality by batching multiple file operations into a single request, which can significantly improve performance when many files are being modified in a short period of time.
|
||||
|
||||
1. **Batch Request**:
|
||||
- A single POST request is sent to `{base_url}` with a JSON array in the body
|
||||
- Each item in the array contains:
|
||||
- `method`: "POST" for write operations, "DELETE" for delete operations
|
||||
- `path`: The file path
|
||||
- `content`: The file contents (for write operations only)
|
||||
- `encoding`: "base64" if binary content was base64-encoded (optional)
|
||||
|
||||
2. **Batch Triggering**:
|
||||
- Batches are sent when one of the following conditions is met:
|
||||
- A timeout period has elapsed (defaults to 5 seconds, configurable via constructor parameter)
|
||||
- The total size of batched content exceeds a size limit (defaults to 1MB, configurable via constructor parameter)
|
||||
- The `flush()` method is explicitly called
|
||||
|
||||
3. **Error Handling**:
|
||||
- The batch request is retried up to 3 times with a 1-second delay between attempts
|
||||
|
||||
## Configuration
|
||||
|
||||
To configure the storage module in OpenHands, use the following configuration options:
|
||||
@@ -90,4 +114,14 @@ file_store_web_hook_url = "https://example.com/api/files"
|
||||
|
||||
# Optional webhook headers (JSON string)
|
||||
file_store_web_hook_headers = '{"Authorization": "Bearer token"}'
|
||||
|
||||
# Optional batched webhook mode (default: false)
|
||||
file_store_web_hook_batch = true
|
||||
```
|
||||
|
||||
**Batched Webhook Configuration:**
|
||||
The batched webhook behavior is controlled by two settings whose default values are:
|
||||
- Batch timeout: 5 seconds
|
||||
- Batch size limit: 1MB (1048576 bytes)
|
||||
|
||||
These values can be customized by passing `batch_timeout_seconds` and `batch_size_limit_bytes` parameters to the `BatchedWebHookFileStore` constructor.
|
||||
|
||||
@@ -2,6 +2,7 @@ import os
|
||||
|
||||
import httpx
|
||||
|
||||
from openhands.storage.batched_web_hook import BatchedWebHookFileStore
|
||||
from openhands.storage.files import FileStore
|
||||
from openhands.storage.google_cloud import GoogleCloudFileStore
|
||||
from openhands.storage.local import LocalFileStore
|
||||
@@ -15,6 +16,7 @@ def get_file_store(
|
||||
file_store_path: str | None = None,
|
||||
file_store_web_hook_url: str | None = None,
|
||||
file_store_web_hook_headers: dict | None = None,
|
||||
batch: bool = False,
|
||||
) -> FileStore:
|
||||
store: FileStore
|
||||
if file_store_type == 'local':
|
||||
@@ -35,9 +37,21 @@ def get_file_store(
|
||||
file_store_web_hook_headers['X-Session-API-Key'] = os.getenv(
|
||||
'SESSION_API_KEY'
|
||||
)
|
||||
store = WebHookFileStore(
|
||||
store,
|
||||
file_store_web_hook_url,
|
||||
httpx.Client(headers=file_store_web_hook_headers or {}),
|
||||
)
|
||||
|
||||
client = httpx.Client(headers=file_store_web_hook_headers or {})
|
||||
|
||||
if batch:
|
||||
# Use batched webhook file store
|
||||
store = BatchedWebHookFileStore(
|
||||
store,
|
||||
file_store_web_hook_url,
|
||||
client,
|
||||
)
|
||||
else:
|
||||
# Use regular webhook file store
|
||||
store = WebHookFileStore(
|
||||
store,
|
||||
file_store_web_hook_url,
|
||||
client,
|
||||
)
|
||||
return store
|
||||
|
||||
274
openhands/storage/batched_web_hook.py
Normal file
274
openhands/storage/batched_web_hook.py
Normal file
@@ -0,0 +1,274 @@
|
||||
import base64
import logging
import threading
from typing import Optional, Union

import httpx
import tenacity

from openhands.storage.files import FileStore
from openhands.utils.async_utils import EXECUTOR
|
||||
|
||||
# Constants for batching configuration
|
||||
WEBHOOK_BATCH_TIMEOUT_SECONDS = 5.0
|
||||
WEBHOOK_BATCH_SIZE_LIMIT_BYTES = 1048576 # 1MB
|
||||
|
||||
|
||||
class BatchedWebHookFileStore(FileStore):
    """
    File store which batches updates before sending them to a webhook.

    This class wraps another FileStore implementation and sends HTTP requests
    to a specified URL when files are written or deleted. Updates are
    accumulated in an in-memory batch keyed by path (so repeated updates to
    the same path collapse into the latest one), and the whole batch is sent
    as a single JSON request when one of the following happens:

    - ``batch_timeout_seconds`` elapses since the last queued update, or
    - the accumulated content size reaches ``batch_size_limit_bytes``, or
    - ``flush()`` is called explicitly.

    Attributes:
        file_store: The underlying FileStore implementation
        base_url: The base URL for webhook requests
        client: The HTTP client used to make webhook requests
        batch_timeout_seconds: Time in seconds after which a batch is sent
            (default: WEBHOOK_BATCH_TIMEOUT_SECONDS)
        batch_size_limit_bytes: Size limit in bytes after which a batch is sent
            (default: WEBHOOK_BATCH_SIZE_LIMIT_BYTES)
        _batch_lock: Lock for thread-safe access to the batch
        _batch: Dictionary of pending file updates (path -> (operation, contents))
        _batch_timer: Timer for sending batches after timeout
        _batch_size: Current size of the batched contents in bytes
    """

    file_store: FileStore
    base_url: str
    client: httpx.Client
    batch_timeout_seconds: float
    batch_size_limit_bytes: int
    _batch_lock: threading.Lock
    _batch: dict[str, tuple[str, Optional[Union[str, bytes]]]]
    _batch_timer: Optional[threading.Timer]
    _batch_size: int

    def __init__(
        self,
        file_store: FileStore,
        base_url: str,
        client: Optional[httpx.Client] = None,
        batch_timeout_seconds: Optional[float] = None,
        batch_size_limit_bytes: Optional[int] = None,
    ):
        """
        Initialize a BatchedWebHookFileStore.

        Args:
            file_store: The underlying FileStore implementation
            base_url: The base URL for webhook requests
            client: Optional HTTP client to use for requests. If None, a new
                client will be created.
            batch_timeout_seconds: Time in seconds after which a batch is sent.
                If None, uses the default constant WEBHOOK_BATCH_TIMEOUT_SECONDS.
            batch_size_limit_bytes: Size limit in bytes after which a batch is
                sent. If None, uses the default constant
                WEBHOOK_BATCH_SIZE_LIMIT_BYTES.
        """
        self.file_store = file_store
        self.base_url = base_url
        if client is None:
            client = httpx.Client()
        self.client = client

        # BUGFIX: use explicit `is None` checks instead of `or` so a caller
        # may pass 0 / 0.0 without it being silently replaced by the default
        # constant (the docstring promises "If None, uses the default").
        self.batch_timeout_seconds = (
            batch_timeout_seconds
            if batch_timeout_seconds is not None
            else WEBHOOK_BATCH_TIMEOUT_SECONDS
        )
        self.batch_size_limit_bytes = (
            batch_size_limit_bytes
            if batch_size_limit_bytes is not None
            else WEBHOOK_BATCH_SIZE_LIMIT_BYTES
        )

        # Initialize batch state
        self._batch_lock = threading.Lock()
        self._batch = {}  # Maps path -> (operation, contents)
        self._batch_timer = None
        self._batch_size = 0

    def write(self, path: str, contents: Union[str, bytes]) -> None:
        """
        Write contents to a file and queue a webhook update.

        Args:
            path: The path to write to
            contents: The contents to write
        """
        self.file_store.write(path, contents)
        self._queue_update(path, 'write', contents)

    def read(self, path: str) -> str:
        """
        Read contents from a file.

        Args:
            path: The path to read from

        Returns:
            The contents of the file
        """
        return self.file_store.read(path)

    def list(self, path: str) -> list[str]:
        """
        List files in a directory.

        Args:
            path: The directory path to list

        Returns:
            A list of file paths
        """
        return self.file_store.list(path)

    def delete(self, path: str) -> None:
        """
        Delete a file and queue a webhook update.

        Args:
            path: The path to delete
        """
        self.file_store.delete(path)
        self._queue_update(path, 'delete', None)

    @staticmethod
    def _content_size(contents: Optional[Union[str, bytes]]) -> int:
        """Return the size of contents in bytes (0 for None)."""
        if contents is None:
            return 0
        if isinstance(contents, str):
            return len(contents.encode('utf-8'))
        return len(contents)

    def _queue_update(
        self, path: str, operation: str, contents: Optional[Union[str, bytes]]
    ) -> None:
        """
        Queue an update to be sent to the webhook.

        Args:
            path: The path that was modified
            operation: The operation performed ("write" or "delete")
            contents: The contents that were written (None for delete operations)
        """
        with self._batch_lock:
            # If this path is already batched, subtract its previous size so
            # _batch_size tracks only what will actually be sent.
            if path in self._batch:
                _prev_op, prev_contents = self._batch[path]
                self._batch_size -= self._content_size(prev_contents)

            self._batch_size += self._content_size(contents)
            # Only the latest update per path is kept.
            self._batch[path] = (operation, contents)

            # Send immediately if the batch has grown past the size limit.
            if self._batch_size >= self.batch_size_limit_bytes:
                # Cancel any pending timer: this batch is being dispatched
                # now, so letting the timer fire later would be redundant.
                if self._batch_timer is not None:
                    self._batch_timer.cancel()
                    self._batch_timer = None
                # Submit to executor to avoid blocking the caller on network I/O.
                EXECUTOR.submit(self._send_batch)
                return

            # Otherwise (re)start the timer so the batch is sent after the
            # configured timeout.
            if self._batch_timer is not None:
                self._batch_timer.cancel()
                self._batch_timer = None

            timer = threading.Timer(
                self.batch_timeout_seconds, self._send_batch_from_timer
            )
            # Daemon timer so a pending batch never keeps the process alive.
            timer.daemon = True
            timer.start()
            self._batch_timer = timer

    def _send_batch_from_timer(self) -> None:
        """
        Send the batch from the timer thread.

        This method is called by the timer and submits the actual sending to
        the executor so the timer thread is released immediately.
        """
        EXECUTOR.submit(self._send_batch)

    def _send_batch(self) -> None:
        """
        Send the current batch of updates to the webhook as a single request.

        Atomically takes ownership of all pending updates under the lock, then
        performs the HTTP request outside the lock so writers are not blocked
        by network I/O.
        """
        with self._batch_lock:
            if not self._batch:
                return

            # Take the batch and reset state so new updates accumulate in a
            # fresh batch while this one is being sent.
            batch_to_send = self._batch.copy()
            self._batch.clear()
            self._batch_size = 0

            # Cancel any pending timer
            if self._batch_timer is not None:
                self._batch_timer.cancel()
                self._batch_timer = None

        try:
            self._send_batch_request(batch_to_send)
        except Exception:
            # After retries are exhausted the updates are dropped; log the
            # failure properly (previously this was printed to stdout).
            logging.getLogger(__name__).exception('Error sending webhook batch')

    @tenacity.retry(
        wait=tenacity.wait_fixed(1),
        stop=tenacity.stop_after_attempt(3),
    )
    def _send_batch_request(
        self, batch: dict[str, tuple[str, Optional[Union[str, bytes]]]]
    ) -> None:
        """
        Send a single batch request to the webhook URL with all updates.

        This method is retried up to 3 times with a 1-second delay between
        attempts.

        Args:
            batch: Dictionary mapping paths to (operation, contents) tuples

        Raises:
            httpx.HTTPStatusError: If the webhook request fails
        """
        # Prepare the batch payload
        batch_payload: list[dict[str, str]] = []

        for path, (operation, contents) in batch.items():
            item = {
                'method': 'POST' if operation == 'write' else 'DELETE',
                'path': path,
            }

            if operation == 'write' and contents is not None:
                if isinstance(contents, bytes):
                    try:
                        # Prefer plain text when the bytes are valid UTF-8.
                        item['content'] = contents.decode('utf-8')
                    except UnicodeDecodeError:
                        # Binary payloads are base64-encoded and flagged so
                        # the receiver knows how to decode them.
                        item['content'] = base64.b64encode(contents).decode('ascii')
                        item['encoding'] = 'base64'
                else:
                    item['content'] = contents

            batch_payload.append(item)

        # Send the entire batch as a single request.
        response = self.client.post(self.base_url, json=batch_payload)
        response.raise_for_status()

    def flush(self) -> None:
        """
        Immediately send any pending updates to the webhook.

        Call this before shutting down to ensure no queued updates are lost.
        """
        self._send_batch()
|
||||
235
tests/unit/test_batched_web_hook.py
Normal file
235
tests/unit/test_batched_web_hook.py
Normal file
@@ -0,0 +1,235 @@
|
||||
import time
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import httpx
|
||||
import pytest
|
||||
|
||||
from openhands.storage.batched_web_hook import BatchedWebHookFileStore
|
||||
from openhands.storage.files import FileStore
|
||||
|
||||
|
||||
class MockFileStore(FileStore):
    """Minimal in-memory FileStore used as the backing store in tests."""

    def __init__(self):
        self.files = {}

    def write(self, path: str, contents: str | bytes) -> None:
        self.files[path] = contents

    def read(self, path: str) -> str:
        # Missing paths read as an empty string rather than raising.
        return self.files.get(path, '')

    def list(self, path: str) -> list[str]:
        # Prefix match: every stored path under the given directory.
        return [name for name in self.files if name.startswith(path)]

    def delete(self, path: str) -> None:
        # pop with a default makes deletion of a missing path a no-op.
        self.files.pop(path, None)
|
||||
|
||||
|
||||
class TestBatchedWebHookFileStore:
    """Tests for BatchedWebHookFileStore batching, flushing, and payload format.

    Uses a mocked httpx client so no real HTTP requests are made, and a short
    batch timeout (0.1s) so timeout-triggered sends can be observed with a
    brief sleep. NOTE(review): sleep-based waits assume the background
    timer/executor delivers within 0.2s — may be flaky on a loaded machine.
    """

    @pytest.fixture
    def mock_client(self):
        """HTTP client stub whose post/delete responses never raise."""
        client = MagicMock(spec=httpx.Client)
        client.post.return_value.raise_for_status = MagicMock()
        client.delete.return_value.raise_for_status = MagicMock()
        return client

    @pytest.fixture
    def file_store(self):
        """Fresh in-memory backing store per test."""
        return MockFileStore()

    @pytest.fixture
    def batched_store(self, file_store, mock_client):
        """Store under test with small timeout/size limits for fast tests."""
        # Use a short timeout for testing
        return BatchedWebHookFileStore(
            file_store=file_store,
            base_url='http://example.com',
            client=mock_client,
            batch_timeout_seconds=0.1,  # Short timeout for testing
            batch_size_limit_bytes=1000,
        )

    def test_write_operation_batched(self, batched_store, mock_client):
        """A single write is sent only after the batch timeout elapses."""
        # Write a file
        batched_store.write('/test.txt', 'Hello, world!')

        # The client should not have been called yet
        mock_client.post.assert_not_called()

        # Wait for the batch timeout
        time.sleep(0.2)

        # Now the client should have been called with a batch payload
        mock_client.post.assert_called_once()
        args, kwargs = mock_client.post.call_args
        assert args[0] == 'http://example.com'
        assert 'json' in kwargs

        # Check the batch payload
        batch_payload = kwargs['json']
        assert isinstance(batch_payload, list)
        assert len(batch_payload) == 1
        assert batch_payload[0]['method'] == 'POST'
        assert batch_payload[0]['path'] == '/test.txt'
        assert batch_payload[0]['content'] == 'Hello, world!'

    def test_delete_operation_batched(self, batched_store, mock_client):
        """Write-then-delete of the same path collapses into one DELETE item."""
        # Write and then delete a file
        batched_store.write('/test.txt', 'Hello, world!')
        batched_store.delete('/test.txt')

        # The client should not have been called yet
        mock_client.post.assert_not_called()

        # Wait for the batch timeout
        time.sleep(0.2)

        # Now the client should have been called with a batch payload
        mock_client.post.assert_called_once()
        args, kwargs = mock_client.post.call_args
        assert args[0] == 'http://example.com'
        assert 'json' in kwargs

        # Check the batch payload
        batch_payload = kwargs['json']
        assert isinstance(batch_payload, list)
        assert len(batch_payload) == 1
        assert batch_payload[0]['method'] == 'DELETE'
        assert batch_payload[0]['path'] == '/test.txt'
        # DELETE entries carry no content
        assert 'content' not in batch_payload[0]

    def test_batch_size_limit_triggers_send(self, batched_store, mock_client):
        """Exceeding the byte limit sends the batch without waiting for timeout."""
        # Write a large file that exceeds the batch size limit
        large_content = 'x' * 1001  # Exceeds the 1000 byte limit
        batched_store.write('/large.txt', large_content)

        # The batch might be sent asynchronously, so we need to wait a bit
        time.sleep(0.2)

        # The client should have been called due to size limit
        mock_client.post.assert_called_once()
        args, kwargs = mock_client.post.call_args
        assert args[0] == 'http://example.com'
        assert 'json' in kwargs

        # Check the batch payload
        batch_payload = kwargs['json']
        assert isinstance(batch_payload, list)
        assert len(batch_payload) == 1
        assert batch_payload[0]['method'] == 'POST'
        assert batch_payload[0]['path'] == '/large.txt'
        assert batch_payload[0]['content'] == large_content

    def test_multiple_updates_same_file(self, batched_store, mock_client):
        """Repeated writes to one path are deduplicated to the latest content."""
        # Write to the same file multiple times
        batched_store.write('/test.txt', 'Version 1')
        batched_store.write('/test.txt', 'Version 2')
        batched_store.write('/test.txt', 'Version 3')

        # Wait for the batch timeout
        time.sleep(0.2)

        # Only the latest version should be sent
        mock_client.post.assert_called_once()
        args, kwargs = mock_client.post.call_args
        assert args[0] == 'http://example.com'
        assert 'json' in kwargs

        # Check the batch payload
        batch_payload = kwargs['json']
        assert isinstance(batch_payload, list)
        assert len(batch_payload) == 1
        assert batch_payload[0]['method'] == 'POST'
        assert batch_payload[0]['path'] == '/test.txt'
        assert batch_payload[0]['content'] == 'Version 3'

    def test_flush_sends_immediately(self, batched_store, mock_client):
        """flush() dispatches the pending batch without waiting for the timer."""
        # Write a file
        batched_store.write('/test.txt', 'Hello, world!')

        # The client should not have been called yet
        mock_client.post.assert_not_called()

        # Flush the batch
        batched_store.flush()

        # Now the client should have been called without waiting for timeout
        mock_client.post.assert_called_once()
        args, kwargs = mock_client.post.call_args
        assert args[0] == 'http://example.com'
        assert 'json' in kwargs

        # Check the batch payload
        batch_payload = kwargs['json']
        assert isinstance(batch_payload, list)
        assert len(batch_payload) == 1
        assert batch_payload[0]['method'] == 'POST'
        assert batch_payload[0]['path'] == '/test.txt'
        assert batch_payload[0]['content'] == 'Hello, world!'

    def test_multiple_operations_in_single_batch(self, batched_store, mock_client):
        """Writes and deletes on distinct paths go out in one combined request."""
        # Perform multiple operations
        batched_store.write('/file1.txt', 'Content 1')
        batched_store.write('/file2.txt', 'Content 2')
        batched_store.delete('/file3.txt')

        # Wait for the batch timeout
        time.sleep(0.2)

        # Check that only one POST request was made with all operations
        mock_client.post.assert_called_once()
        args, kwargs = mock_client.post.call_args
        assert args[0] == 'http://example.com'
        assert 'json' in kwargs

        # Check the batch payload
        batch_payload = kwargs['json']
        assert isinstance(batch_payload, list)
        assert len(batch_payload) == 3

        # Check each operation in the batch (index by path; order not asserted)
        operations = {item['path']: item for item in batch_payload}

        assert '/file1.txt' in operations
        assert operations['/file1.txt']['method'] == 'POST'
        assert operations['/file1.txt']['content'] == 'Content 1'

        assert '/file2.txt' in operations
        assert operations['/file2.txt']['method'] == 'POST'
        assert operations['/file2.txt']['content'] == 'Content 2'

        assert '/file3.txt' in operations
        assert operations['/file3.txt']['method'] == 'DELETE'
        assert 'content' not in operations['/file3.txt']

    def test_binary_content_handling(self, batched_store, mock_client):
        """Non-UTF-8 bytes are base64-encoded and flagged with encoding=base64."""
        # Write binary content
        binary_content = b'\x00\x01\x02\x03\xff\xfe\xfd\xfc'
        batched_store.write('/binary.bin', binary_content)

        # Wait for the batch timeout
        time.sleep(0.2)

        # Check that the client was called
        mock_client.post.assert_called_once()
        args, kwargs = mock_client.post.call_args
        assert args[0] == 'http://example.com'
        assert 'json' in kwargs

        # Check the batch payload
        batch_payload = kwargs['json']
        assert isinstance(batch_payload, list)
        assert len(batch_payload) == 1

        # Binary content should be base64 encoded
        assert batch_payload[0]['method'] == 'POST'
        assert batch_payload[0]['path'] == '/binary.bin'
        assert 'content' in batch_payload[0]
        assert 'encoding' in batch_payload[0]
        assert batch_payload[0]['encoding'] == 'base64'

        # Verify the content can be decoded back to the original binary
        import base64

        decoded = base64.b64decode(batch_payload[0]['content'].encode('ascii'))
        assert decoded == binary_content
|
||||
Reference in New Issue
Block a user