mirror of
https://github.com/simstudioai/sim.git
synced 2026-01-08 22:48:14 -05:00
* fix(billing): should allow restoring subscription (#1728) * fix(already-cancelled-sub): UI should allow restoring subscription * restore functionality fixed * fix * improvement(api-keys): move to workspace level * remove migration to prep merge * remove two more unused cols * prep staging merge * add migration back --------- Co-authored-by: Waleed <walif6@gmail.com> Co-authored-by: Siddharth Ganesan <33737564+Sg312@users.noreply.github.com>
562 lines
19 KiB
Python
562 lines
19 KiB
Python
"""
|
|
Sim SDK for Python
|
|
|
|
Official Python SDK for Sim, allowing you to execute workflows programmatically.
|
|
"""
|
|
|
|
from typing import Any, Dict, Optional, Union
|
|
from dataclasses import dataclass
|
|
import time
|
|
import random
|
|
import os
|
|
|
|
import requests
|
|
|
|
|
|
__version__ = "0.1.0"
|
|
__all__ = [
|
|
"SimStudioClient",
|
|
"SimStudioError",
|
|
"WorkflowExecutionResult",
|
|
"WorkflowStatus",
|
|
"AsyncExecutionResult",
|
|
"RateLimitInfo",
|
|
"UsageLimits",
|
|
]
|
|
|
|
|
|
@dataclass
|
|
class WorkflowExecutionResult:
|
|
"""Result of a workflow execution."""
|
|
success: bool
|
|
output: Optional[Any] = None
|
|
error: Optional[str] = None
|
|
logs: Optional[list] = None
|
|
metadata: Optional[Dict[str, Any]] = None
|
|
trace_spans: Optional[list] = None
|
|
total_duration: Optional[float] = None
|
|
|
|
|
|
@dataclass
|
|
class WorkflowStatus:
|
|
"""Status of a workflow."""
|
|
is_deployed: bool
|
|
deployed_at: Optional[str] = None
|
|
needs_redeployment: bool = False
|
|
|
|
|
|
@dataclass
|
|
class AsyncExecutionResult:
|
|
"""Result of an async workflow execution."""
|
|
success: bool
|
|
task_id: str
|
|
status: str # 'queued'
|
|
created_at: str
|
|
links: Dict[str, str]
|
|
|
|
|
|
@dataclass
|
|
class RateLimitInfo:
|
|
"""Rate limit information from API response headers."""
|
|
limit: int
|
|
remaining: int
|
|
reset: int
|
|
retry_after: Optional[int] = None
|
|
|
|
|
|
@dataclass
|
|
class RateLimitStatus:
|
|
"""Rate limit status for sync/async requests."""
|
|
is_limited: bool
|
|
limit: int
|
|
remaining: int
|
|
reset_at: str
|
|
|
|
|
|
@dataclass
|
|
class UsageLimits:
|
|
"""Usage limits and quota information."""
|
|
success: bool
|
|
rate_limit: Dict[str, Any]
|
|
usage: Dict[str, Any]
|
|
|
|
|
|
class SimStudioError(Exception):
|
|
"""Exception raised for Sim API errors."""
|
|
|
|
def __init__(self, message: str, code: Optional[str] = None, status: Optional[int] = None):
|
|
super().__init__(message)
|
|
self.code = code
|
|
self.status = status
|
|
|
|
|
|
class SimStudioClient:
|
|
"""
|
|
Sim API client for executing workflows programmatically.
|
|
|
|
Args:
|
|
api_key: Your Sim API key
|
|
base_url: Base URL for the Sim API (defaults to https://sim.ai)
|
|
"""
|
|
|
|
def __init__(self, api_key: str, base_url: str = "https://sim.ai"):
|
|
self.api_key = api_key
|
|
self.base_url = base_url.rstrip('/')
|
|
self._session = requests.Session()
|
|
self._session.headers.update({
|
|
'X-API-Key': self.api_key,
|
|
'Content-Type': 'application/json',
|
|
})
|
|
self._rate_limit_info: Optional[RateLimitInfo] = None
|
|
|
|
def _convert_files_to_base64(self, value: Any) -> Any:
|
|
"""
|
|
Convert file objects in input to API format (base64).
|
|
Recursively processes nested dicts and lists.
|
|
"""
|
|
import base64
|
|
import io
|
|
|
|
# Check if this is a file-like object
|
|
if hasattr(value, 'read') and callable(value.read):
|
|
# Save current position if seekable
|
|
initial_pos = value.tell() if hasattr(value, 'tell') else None
|
|
|
|
# Read file bytes
|
|
file_bytes = value.read()
|
|
|
|
# Restore position if seekable
|
|
if initial_pos is not None and hasattr(value, 'seek'):
|
|
value.seek(initial_pos)
|
|
|
|
# Encode to base64
|
|
base64_data = base64.b64encode(file_bytes).decode('utf-8')
|
|
|
|
# Get file metadata
|
|
filename = getattr(value, 'name', 'file')
|
|
if isinstance(filename, str):
|
|
filename = os.path.basename(filename)
|
|
|
|
content_type = getattr(value, 'content_type', 'application/octet-stream')
|
|
|
|
return {
|
|
'type': 'file',
|
|
'data': f'data:{content_type};base64,{base64_data}',
|
|
'name': filename,
|
|
'mime': content_type
|
|
}
|
|
|
|
# Recursively process lists
|
|
if isinstance(value, list):
|
|
return [self._convert_files_to_base64(item) for item in value]
|
|
|
|
# Recursively process dicts
|
|
if isinstance(value, dict):
|
|
return {k: self._convert_files_to_base64(v) for k, v in value.items()}
|
|
|
|
return value
|
|
|
|
def execute_workflow(
|
|
self,
|
|
workflow_id: str,
|
|
input_data: Optional[Dict[str, Any]] = None,
|
|
timeout: float = 30.0,
|
|
stream: Optional[bool] = None,
|
|
selected_outputs: Optional[list] = None,
|
|
async_execution: Optional[bool] = None
|
|
) -> Union[WorkflowExecutionResult, AsyncExecutionResult]:
|
|
"""
|
|
Execute a workflow with optional input data.
|
|
If async_execution is True, returns immediately with a task ID.
|
|
|
|
File objects in input_data will be automatically detected and converted to base64.
|
|
|
|
Args:
|
|
workflow_id: The ID of the workflow to execute
|
|
input_data: Input data to pass to the workflow (can include file-like objects)
|
|
timeout: Timeout in seconds (default: 30.0)
|
|
stream: Enable streaming responses (default: None)
|
|
selected_outputs: Block outputs to stream (e.g., ["agent1.content"])
|
|
async_execution: Execute asynchronously (default: None)
|
|
|
|
Returns:
|
|
WorkflowExecutionResult or AsyncExecutionResult object
|
|
|
|
Raises:
|
|
SimStudioError: If the workflow execution fails
|
|
"""
|
|
url = f"{self.base_url}/api/workflows/{workflow_id}/execute"
|
|
|
|
# Build headers - async execution uses X-Execution-Mode header
|
|
headers = self._session.headers.copy()
|
|
if async_execution:
|
|
headers['X-Execution-Mode'] = 'async'
|
|
|
|
try:
|
|
# Build JSON body - spread input at root level, then add API control parameters
|
|
body = input_data.copy() if input_data is not None else {}
|
|
|
|
# Convert any file objects in the input to base64 format
|
|
body = self._convert_files_to_base64(body)
|
|
|
|
if stream is not None:
|
|
body['stream'] = stream
|
|
if selected_outputs is not None:
|
|
body['selectedOutputs'] = selected_outputs
|
|
|
|
response = self._session.post(
|
|
url,
|
|
json=body,
|
|
headers=headers,
|
|
timeout=timeout
|
|
)
|
|
|
|
# Update rate limit info
|
|
self._update_rate_limit_info(response)
|
|
|
|
# Handle rate limiting
|
|
if response.status_code == 429:
|
|
retry_after = self._rate_limit_info.retry_after if self._rate_limit_info else 1000
|
|
raise SimStudioError(
|
|
f'Rate limit exceeded. Retry after {retry_after}ms',
|
|
'RATE_LIMIT_EXCEEDED',
|
|
429
|
|
)
|
|
|
|
if not response.ok:
|
|
try:
|
|
error_data = response.json()
|
|
error_message = error_data.get('error', f'HTTP {response.status_code}: {response.reason}')
|
|
error_code = error_data.get('code')
|
|
except (ValueError, KeyError):
|
|
error_message = f'HTTP {response.status_code}: {response.reason}'
|
|
error_code = None
|
|
|
|
raise SimStudioError(error_message, error_code, response.status_code)
|
|
|
|
result_data = response.json()
|
|
|
|
# Check if this is an async execution response (202 status)
|
|
if response.status_code == 202 and 'taskId' in result_data:
|
|
return AsyncExecutionResult(
|
|
success=result_data.get('success', True),
|
|
task_id=result_data['taskId'],
|
|
status=result_data.get('status', 'queued'),
|
|
created_at=result_data.get('createdAt', ''),
|
|
links=result_data.get('links', {})
|
|
)
|
|
|
|
return WorkflowExecutionResult(
|
|
success=result_data['success'],
|
|
output=result_data.get('output'),
|
|
error=result_data.get('error'),
|
|
logs=result_data.get('logs'),
|
|
metadata=result_data.get('metadata'),
|
|
trace_spans=result_data.get('traceSpans'),
|
|
total_duration=result_data.get('totalDuration')
|
|
)
|
|
|
|
except requests.Timeout:
|
|
raise SimStudioError(f'Workflow execution timed out after {timeout} seconds', 'TIMEOUT')
|
|
except requests.RequestException as e:
|
|
raise SimStudioError(f'Failed to execute workflow: {str(e)}', 'EXECUTION_ERROR')
|
|
|
|
def get_workflow_status(self, workflow_id: str) -> WorkflowStatus:
|
|
"""
|
|
Get the status of a workflow (deployment status, etc.).
|
|
|
|
Args:
|
|
workflow_id: The ID of the workflow
|
|
|
|
Returns:
|
|
WorkflowStatus object containing the workflow status
|
|
|
|
Raises:
|
|
SimStudioError: If getting the status fails
|
|
"""
|
|
url = f"{self.base_url}/api/workflows/{workflow_id}/status"
|
|
|
|
try:
|
|
response = self._session.get(url)
|
|
|
|
if not response.ok:
|
|
try:
|
|
error_data = response.json()
|
|
error_message = error_data.get('error', f'HTTP {response.status_code}: {response.reason}')
|
|
error_code = error_data.get('code')
|
|
except (ValueError, KeyError):
|
|
error_message = f'HTTP {response.status_code}: {response.reason}'
|
|
error_code = None
|
|
|
|
raise SimStudioError(error_message, error_code, response.status_code)
|
|
|
|
status_data = response.json()
|
|
|
|
return WorkflowStatus(
|
|
is_deployed=status_data.get('isDeployed', False),
|
|
deployed_at=status_data.get('deployedAt'),
|
|
needs_redeployment=status_data.get('needsRedeployment', False)
|
|
)
|
|
|
|
except requests.RequestException as e:
|
|
raise SimStudioError(f'Failed to get workflow status: {str(e)}', 'STATUS_ERROR')
|
|
|
|
def validate_workflow(self, workflow_id: str) -> bool:
|
|
"""
|
|
Validate that a workflow is ready for execution.
|
|
|
|
Args:
|
|
workflow_id: The ID of the workflow
|
|
|
|
Returns:
|
|
True if the workflow is deployed and ready, False otherwise
|
|
"""
|
|
try:
|
|
status = self.get_workflow_status(workflow_id)
|
|
return status.is_deployed
|
|
except SimStudioError:
|
|
return False
|
|
|
|
def execute_workflow_sync(
|
|
self,
|
|
workflow_id: str,
|
|
input_data: Optional[Dict[str, Any]] = None,
|
|
timeout: float = 30.0,
|
|
stream: Optional[bool] = None,
|
|
selected_outputs: Optional[list] = None
|
|
) -> WorkflowExecutionResult:
|
|
"""
|
|
Execute a workflow and poll for completion (useful for long-running workflows).
|
|
|
|
Note: Currently, the API is synchronous, so this method just calls execute_workflow.
|
|
In the future, if async execution is added, this method can be enhanced.
|
|
|
|
Args:
|
|
workflow_id: The ID of the workflow to execute
|
|
input_data: Input data to pass to the workflow (can include file-like objects)
|
|
timeout: Timeout for the initial request in seconds
|
|
stream: Enable streaming responses (default: None)
|
|
selected_outputs: Block outputs to stream (e.g., ["agent1.content"])
|
|
|
|
Returns:
|
|
WorkflowExecutionResult object containing the execution result
|
|
|
|
Raises:
|
|
SimStudioError: If the workflow execution fails
|
|
"""
|
|
# For now, the API is synchronous, so we just execute directly
|
|
# In the future, if async execution is added, this method can be enhanced
|
|
return self.execute_workflow(workflow_id, input_data, timeout, stream, selected_outputs)
|
|
|
|
def set_api_key(self, api_key: str) -> None:
|
|
"""
|
|
Update the API key.
|
|
|
|
Args:
|
|
api_key: New API key
|
|
"""
|
|
self.api_key = api_key
|
|
self._session.headers.update({'X-API-Key': api_key})
|
|
|
|
def set_base_url(self, base_url: str) -> None:
|
|
"""
|
|
Update the base URL.
|
|
|
|
Args:
|
|
base_url: New base URL
|
|
"""
|
|
self.base_url = base_url.rstrip('/')
|
|
|
|
def close(self) -> None:
|
|
"""Close the underlying HTTP session."""
|
|
self._session.close()
|
|
|
|
def get_job_status(self, task_id: str) -> Dict[str, Any]:
|
|
"""
|
|
Get the status of an async job.
|
|
|
|
Args:
|
|
task_id: The task ID returned from async execution
|
|
|
|
Returns:
|
|
Dictionary containing the job status
|
|
|
|
Raises:
|
|
SimStudioError: If getting the status fails
|
|
"""
|
|
url = f"{self.base_url}/api/jobs/{task_id}"
|
|
|
|
try:
|
|
response = self._session.get(url)
|
|
|
|
self._update_rate_limit_info(response)
|
|
|
|
if not response.ok:
|
|
try:
|
|
error_data = response.json()
|
|
error_message = error_data.get('error', f'HTTP {response.status_code}: {response.reason}')
|
|
error_code = error_data.get('code')
|
|
except (ValueError, KeyError):
|
|
error_message = f'HTTP {response.status_code}: {response.reason}'
|
|
error_code = None
|
|
|
|
raise SimStudioError(error_message, error_code, response.status_code)
|
|
|
|
return response.json()
|
|
|
|
except requests.RequestException as e:
|
|
raise SimStudioError(f'Failed to get job status: {str(e)}', 'STATUS_ERROR')
|
|
|
|
def execute_with_retry(
|
|
self,
|
|
workflow_id: str,
|
|
input_data: Optional[Dict[str, Any]] = None,
|
|
timeout: float = 30.0,
|
|
stream: Optional[bool] = None,
|
|
selected_outputs: Optional[list] = None,
|
|
async_execution: Optional[bool] = None,
|
|
max_retries: int = 3,
|
|
initial_delay: float = 1.0,
|
|
max_delay: float = 30.0,
|
|
backoff_multiplier: float = 2.0
|
|
) -> Union[WorkflowExecutionResult, AsyncExecutionResult]:
|
|
"""
|
|
Execute workflow with automatic retry on rate limit.
|
|
|
|
Args:
|
|
workflow_id: The ID of the workflow to execute
|
|
input_data: Input data to pass to the workflow (can include file-like objects)
|
|
timeout: Timeout in seconds
|
|
stream: Enable streaming responses
|
|
selected_outputs: Block outputs to stream
|
|
async_execution: Execute asynchronously
|
|
max_retries: Maximum number of retries (default: 3)
|
|
initial_delay: Initial delay in seconds (default: 1.0)
|
|
max_delay: Maximum delay in seconds (default: 30.0)
|
|
backoff_multiplier: Backoff multiplier (default: 2.0)
|
|
|
|
Returns:
|
|
WorkflowExecutionResult or AsyncExecutionResult object
|
|
|
|
Raises:
|
|
SimStudioError: If max retries exceeded or other error occurs
|
|
"""
|
|
last_error = None
|
|
delay = initial_delay
|
|
|
|
for attempt in range(max_retries + 1):
|
|
try:
|
|
return self.execute_workflow(
|
|
workflow_id,
|
|
input_data,
|
|
timeout,
|
|
stream,
|
|
selected_outputs,
|
|
async_execution
|
|
)
|
|
except SimStudioError as e:
|
|
if e.code != 'RATE_LIMIT_EXCEEDED':
|
|
raise
|
|
|
|
last_error = e
|
|
|
|
# Don't retry after last attempt
|
|
if attempt == max_retries:
|
|
break
|
|
|
|
# Use retry-after if provided, otherwise use exponential backoff
|
|
wait_time = (
|
|
self._rate_limit_info.retry_after / 1000
|
|
if self._rate_limit_info and self._rate_limit_info.retry_after
|
|
else min(delay, max_delay)
|
|
)
|
|
|
|
# Add jitter (±25%)
|
|
jitter = wait_time * (0.75 + random.random() * 0.5)
|
|
|
|
time.sleep(jitter)
|
|
|
|
# Exponential backoff for next attempt
|
|
delay *= backoff_multiplier
|
|
|
|
raise last_error or SimStudioError('Max retries exceeded', 'MAX_RETRIES_EXCEEDED')
|
|
|
|
def get_rate_limit_info(self) -> Optional[RateLimitInfo]:
|
|
"""
|
|
Get current rate limit information.
|
|
|
|
Returns:
|
|
RateLimitInfo object or None if no rate limit info available
|
|
"""
|
|
return self._rate_limit_info
|
|
|
|
def _update_rate_limit_info(self, response: requests.Response) -> None:
|
|
"""
|
|
Update rate limit info from response headers.
|
|
|
|
Args:
|
|
response: The response object to extract headers from
|
|
"""
|
|
limit = response.headers.get('x-ratelimit-limit')
|
|
remaining = response.headers.get('x-ratelimit-remaining')
|
|
reset = response.headers.get('x-ratelimit-reset')
|
|
retry_after = response.headers.get('retry-after')
|
|
|
|
if limit or remaining or reset:
|
|
self._rate_limit_info = RateLimitInfo(
|
|
limit=int(limit) if limit else 0,
|
|
remaining=int(remaining) if remaining else 0,
|
|
reset=int(reset) if reset else 0,
|
|
retry_after=int(retry_after) * 1000 if retry_after else None
|
|
)
|
|
|
|
def get_usage_limits(self) -> UsageLimits:
|
|
"""
|
|
Get current usage limits and quota information.
|
|
|
|
Returns:
|
|
UsageLimits object containing usage and quota data
|
|
|
|
Raises:
|
|
SimStudioError: If getting usage limits fails
|
|
"""
|
|
url = f"{self.base_url}/api/users/me/usage-limits"
|
|
|
|
try:
|
|
response = self._session.get(url)
|
|
|
|
self._update_rate_limit_info(response)
|
|
|
|
if not response.ok:
|
|
try:
|
|
error_data = response.json()
|
|
error_message = error_data.get('error', f'HTTP {response.status_code}: {response.reason}')
|
|
error_code = error_data.get('code')
|
|
except (ValueError, KeyError):
|
|
error_message = f'HTTP {response.status_code}: {response.reason}'
|
|
error_code = None
|
|
|
|
raise SimStudioError(error_message, error_code, response.status_code)
|
|
|
|
data = response.json()
|
|
|
|
return UsageLimits(
|
|
success=data.get('success', True),
|
|
rate_limit=data.get('rateLimit', {}),
|
|
usage=data.get('usage', {})
|
|
)
|
|
|
|
except requests.RequestException as e:
|
|
raise SimStudioError(f'Failed to get usage limits: {str(e)}', 'USAGE_ERROR')
|
|
|
|
def __enter__(self):
|
|
"""Context manager entry."""
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
"""Context manager exit."""
|
|
self.close()
|
|
|
|
|
|
# For backward compatibility
|
|
Client = SimStudioClient |