Sim Python SDK
The official Python SDK for Sim, allowing you to execute workflows programmatically from your Python applications.
Installation
pip install simstudio-sdk
Quick Start
import os
from simstudio import SimStudioClient
# Initialize the client
client = SimStudioClient(
    api_key=os.getenv("SIM_API_KEY", "your-api-key-here"),
    base_url="https://sim.ai"  # optional, defaults to https://sim.ai
)
# Execute a workflow
try:
    result = client.execute_workflow("workflow-id")
    print("Workflow executed successfully:", result)
except Exception as error:
    print("Workflow execution failed:", error)
API Reference
SimStudioClient
Constructor
SimStudioClient(api_key: str, base_url: str = "https://sim.ai")
- api_key (str): Your Sim API key
- base_url (str, optional): Base URL for the Sim API (defaults to https://sim.ai)
Methods
execute_workflow(workflow_id, input=None, *, timeout=30.0, stream=None, selected_outputs=None, async_execution=None)
Execute a workflow with optional input data.
# With dict input (spread at root level of request body)
result = client.execute_workflow("workflow-id", {"message": "Hello, world!"})
# With primitive input (wrapped as { input: value })
result = client.execute_workflow("workflow-id", "NVDA")
# With options (keyword-only arguments)
result = client.execute_workflow("workflow-id", {"message": "Hello"}, timeout=60.0)
Parameters:
- workflow_id (str): The ID of the workflow to execute
- input (any, optional): Input data to pass to the workflow. Dicts are spread at the root level; primitives and lists are wrapped in { input: value }. File objects are automatically converted to base64.
- timeout (float, keyword-only): Timeout in seconds (default: 30.0)
- stream (bool, keyword-only): Enable streaming responses
- selected_outputs (list, keyword-only): Block outputs to stream (e.g., ["agent1.content"])
- async_execution (bool, keyword-only): Execute asynchronously and return an execution ID
Returns: WorkflowExecutionResult or AsyncExecutionResult
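The input rules above can be illustrated with a small standalone sketch. `build_request_body` is a hypothetical helper, not part of the SDK; it only mirrors the documented behavior that dicts are spread at the root of the request body while primitives and lists are wrapped under an `input` key. File conversion and any extra fields the SDK may send are left out, and the empty body for missing input is an assumption.

```python
from typing import Any, Dict


def build_request_body(input_data: Any = None) -> Dict[str, Any]:
    """Hypothetical helper mirroring the documented input rules (not SDK code)."""
    if input_data is None:
        # Assumption: no input results in an empty body.
        return {}
    if isinstance(input_data, dict):
        # Dicts are spread at the root level of the request body.
        return dict(input_data)
    # Primitives and lists are wrapped as {"input": value}.
    return {"input": input_data}


print(build_request_body({"message": "Hello, world!"}))
print(build_request_body("NVDA"))
print(build_request_body([1, 2, 3]))
```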
get_workflow_status(workflow_id)
Get the status of a workflow (deployment status, etc.).
status = client.get_workflow_status("workflow-id")
print("Is deployed:", status.is_deployed)
Parameters:
workflow_id(str): The ID of the workflow
Returns: WorkflowStatus
validate_workflow(workflow_id)
Validate that a workflow is ready for execution.
is_ready = client.validate_workflow("workflow-id")
if is_ready:
    # Workflow is deployed and ready
    pass
Parameters:
workflow_id(str): The ID of the workflow
Returns: bool
execute_workflow_sync(workflow_id, input=None, *, timeout=30.0, stream=None, selected_outputs=None)
Execute a workflow synchronously (ensures non-async mode).
result = client.execute_workflow_sync("workflow-id", {"data": "some input"}, timeout=60.0)
Parameters:
- workflow_id (str): The ID of the workflow to execute
- input (any, optional): Input data to pass to the workflow
- timeout (float, keyword-only): Timeout in seconds (default: 30.0)
- stream (bool, keyword-only): Enable streaming responses
- selected_outputs (list, keyword-only): Block outputs to stream (e.g., ["agent1.content"])
Returns: WorkflowExecutionResult
get_job_status(task_id)
Get the status of an async job.
status = client.get_job_status("task-id-from-async-execution")
print("Job status:", status)
Parameters:
task_id(str): The task ID returned from async execution
Returns: dict
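One way to combine async execution with get_job_status is a simple polling loop. The sketch below is an illustration only: `wait_for_job`, the `"status"` key, and the terminal status strings are assumptions, since the shape of the returned dict is not specified here. A stub client stands in for SimStudioClient so the example runs without network access.

```python
import time
from typing import Any, Dict

# Assumption: placeholder status strings; the real values may differ.
TERMINAL_STATUSES = {"completed", "failed", "cancelled"}


def wait_for_job(client: Any, task_id: str, poll_interval: float = 2.0,
                 max_wait: float = 300.0) -> Dict[str, Any]:
    """Poll client.get_job_status(task_id) until a terminal status or timeout."""
    deadline = time.monotonic() + max_wait
    while True:
        job = client.get_job_status(task_id)
        # Assumption: the returned dict carries its state under a "status" key.
        if job.get("status") in TERMINAL_STATUSES:
            return job
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Job {task_id} did not finish within {max_wait}s")
        time.sleep(poll_interval)


class _StubClient:
    """Stand-in for SimStudioClient so the sketch runs offline."""

    def __init__(self) -> None:
        self.calls = 0

    def get_job_status(self, task_id: str) -> Dict[str, Any]:
        self.calls += 1
        return {"status": "queued" if self.calls < 3 else "completed"}


stub = _StubClient()
final = wait_for_job(stub, "task-id", poll_interval=0.0)
print(final["status"], "after", stub.calls, "polls")
```

With a real client, the task ID would come from the `task_id` field of the AsyncExecutionResult returned by `execute_workflow(..., async_execution=True)`.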
execute_with_retry(workflow_id, input=None, *, timeout=30.0, stream=None, selected_outputs=None, async_execution=None, max_retries=3, initial_delay=1.0, max_delay=30.0, backoff_multiplier=2.0)
Execute a workflow with automatic retry on rate limit errors.
result = client.execute_with_retry(
"workflow-id",
{"message": "Hello"},
timeout=30.0,
max_retries=3,
initial_delay=1.0,
max_delay=30.0,
backoff_multiplier=2.0
)
Parameters:
- workflow_id (str): The ID of the workflow to execute
- input (any, optional): Input data to pass to the workflow
- timeout (float, keyword-only): Timeout in seconds (default: 30.0)
- stream (bool, keyword-only): Enable streaming responses
- selected_outputs (list, keyword-only): Block outputs to stream
- async_execution (bool, keyword-only): Execute asynchronously
- max_retries (int, keyword-only): Maximum retry attempts (default: 3)
- initial_delay (float, keyword-only): Initial delay in seconds (default: 1.0)
- max_delay (float, keyword-only): Maximum delay in seconds (default: 30.0)
- backoff_multiplier (float, keyword-only): Backoff multiplier (default: 2.0)
Returns: WorkflowExecutionResult or AsyncExecutionResult
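To get a feel for how the retry parameters interact, the following sketch computes a capped exponential backoff schedule. `backoff_delays` is a hypothetical helper, and the formula is an assumption about how such parameters are typically applied; the SDK's actual timing may differ, for example by adding jitter or by honoring a server-provided retry delay.

```python
from typing import List


def backoff_delays(max_retries: int = 3, initial_delay: float = 1.0,
                   max_delay: float = 30.0,
                   backoff_multiplier: float = 2.0) -> List[float]:
    """Delays (in seconds) before each retry under capped exponential backoff."""
    delays = []
    delay = initial_delay
    for _ in range(max_retries):
        # Never wait longer than max_delay for a single retry.
        delays.append(min(delay, max_delay))
        delay *= backoff_multiplier
    return delays


print(backoff_delays())               # [1.0, 2.0, 4.0]
print(backoff_delays(max_retries=6))  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0]
```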
get_rate_limit_info()
Get current rate limit information from the last API response.
rate_info = client.get_rate_limit_info()
if rate_info:
    print("Remaining requests:", rate_info.remaining)
Returns: RateLimitInfo or None
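The rate limit fields could be used to decide how long to pause before the next request. The sketch below is a guess at one reasonable policy, not SDK behavior: `seconds_to_wait` is hypothetical, and it assumes `retry_after` is in seconds and `reset` is a Unix timestamp in seconds, neither of which is stated here. The dataclass is a local copy of the documented fields so the example runs on its own.

```python
import time
from dataclasses import dataclass
from typing import Optional


@dataclass
class RateLimitInfo:
    # Local copy of the documented fields so the sketch is self-contained.
    limit: int
    remaining: int
    reset: int
    retry_after: Optional[int] = None


def seconds_to_wait(info: Optional[RateLimitInfo],
                    now: Optional[float] = None) -> float:
    """Return how long to pause before the next request (0.0 if none needed)."""
    if info is None or info.remaining > 0:
        return 0.0
    if info.retry_after is not None:
        # Assumption: retry_after is expressed in seconds.
        return float(info.retry_after)
    # Assumption: reset is a Unix timestamp in seconds.
    current = time.time() if now is None else now
    return max(0.0, info.reset - current)


print(seconds_to_wait(RateLimitInfo(limit=60, remaining=0, reset=1000), now=990.0))
```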
get_usage_limits()
Get current usage limits and quota information.
limits = client.get_usage_limits()
print("Current usage:", limits.usage)
Returns: UsageLimits
set_api_key(api_key)
Update the API key.
client.set_api_key("new-api-key")
set_base_url(base_url)
Update the base URL.
client.set_base_url("https://my-custom-domain.com")
close()
Close the underlying HTTP session.
client.close()
Data Classes
WorkflowExecutionResult
@dataclass
class WorkflowExecutionResult:
    success: bool
    output: Optional[Any] = None
    error: Optional[str] = None
    logs: Optional[list] = None
    metadata: Optional[Dict[str, Any]] = None
    trace_spans: Optional[list] = None
    total_duration: Optional[float] = None
WorkflowStatus
@dataclass
class WorkflowStatus:
    is_deployed: bool
    deployed_at: Optional[str] = None
    needs_redeployment: bool = False
SimStudioError
class SimStudioError(Exception):
    def __init__(self, message: str, code: Optional[str] = None, status: Optional[int] = None):
        super().__init__(message)
        self.code = code
        self.status = status
AsyncExecutionResult
@dataclass
class AsyncExecutionResult:
    success: bool
    task_id: str
    status: str  # 'queued'
    created_at: str
    links: Dict[str, str]
RateLimitInfo
@dataclass
class RateLimitInfo:
    limit: int
    remaining: int
    reset: int
    retry_after: Optional[int] = None
UsageLimits
@dataclass
class UsageLimits:
    success: bool
    rate_limit: Dict[str, Any]
    usage: Dict[str, Any]
Examples
Basic Workflow Execution
import os
from simstudio import SimStudioClient
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def run_workflow():
    try:
        # Check if workflow is ready
        is_ready = client.validate_workflow("my-workflow-id")
        if not is_ready:
            raise Exception("Workflow is not deployed or ready")
        # Execute the workflow
        result = client.execute_workflow(
            "my-workflow-id",
            {
                "message": "Process this data",
                "user_id": "12345"
            }
        )
        if result.success:
            print("Output:", result.output)
            print("Duration:", result.metadata.get("duration") if result.metadata else None)
        else:
            print("Workflow failed:", result.error)
    except Exception as error:
        print("Error:", error)
run_workflow()
Error Handling
from simstudio import SimStudioClient, SimStudioError
import os
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def execute_with_error_handling():
    try:
        result = client.execute_workflow("workflow-id")
        return result
    except SimStudioError as error:
        if error.code == "UNAUTHORIZED":
            print("Invalid API key")
        elif error.code == "TIMEOUT":
            print("Workflow execution timed out")
        elif error.code == "USAGE_LIMIT_EXCEEDED":
            print("Usage limit exceeded")
        elif error.code == "INVALID_JSON":
            print("Invalid JSON in request body")
        else:
            print(f"Workflow error: {error}")
        raise
    except Exception as error:
        print(f"Unexpected error: {error}")
        raise
Context Manager Usage
from simstudio import SimStudioClient
import os
# Using context manager to automatically close the session
with SimStudioClient(api_key=os.getenv("SIM_API_KEY")) as client:
    result = client.execute_workflow("workflow-id")
    print("Result:", result)
# Session is automatically closed here
Environment Configuration
import os
from simstudio import SimStudioClient
# Using environment variables
client = SimStudioClient(
api_key=os.getenv("SIM_API_KEY"),
base_url=os.getenv("SIM_BASE_URL", "https://sim.ai")
)
File Upload
File objects are automatically detected and converted to base64 format. Include them in your input under the field name matching your workflow's API trigger input format.
The SDK converts file objects to this format:
{
'type': 'file',
'data': 'data:mime/type;base64,base64data',
'name': 'filename',
'mime': 'mime/type'
}
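As a rough illustration of what that conversion involves, the sketch below builds the same dict shape from a binary file object using only the standard library. `file_to_payload` is a hypothetical re-creation, not the SDK's code; taking the name from the object's `.name` attribute and falling back to `application/octet-stream` for unknown types are both assumptions.

```python
import base64
import io
import mimetypes
import os
from typing import BinaryIO, Dict


def file_to_payload(file_obj: BinaryIO) -> Dict[str, str]:
    """Hypothetical re-creation of the documented file format (not SDK code)."""
    raw = file_obj.read()
    # Assumption: the file name comes from the object's .name attribute.
    name = os.path.basename(str(getattr(file_obj, "name", "file")))
    # Assumption: unknown types fall back to application/octet-stream.
    mime = mimetypes.guess_type(name)[0] or "application/octet-stream"
    encoded = base64.b64encode(raw).decode("ascii")
    return {
        "type": "file",
        "data": f"data:{mime};base64,{encoded}",
        "name": name,
        "mime": mime,
    }


sample = io.BytesIO(b"hello")
sample.name = "note.txt"
print(file_to_payload(sample))
```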
Alternatively, you can manually provide files using the URL format:
{
'type': 'url',
'data': 'https://example.com/file.pdf',
'name': 'file.pdf',
'mime': 'application/pdf'
}
from simstudio import SimStudioClient
import os
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
# Upload a single file - include it under the field name from your API trigger
with open('document.pdf', 'rb') as f:
    result = client.execute_workflow(
        'workflow-id',
        {
            'documents': [f],  # Must match your workflow's "files" field name
            'instructions': 'Analyze this document'
        }
    )
# Upload multiple files
with open('doc1.pdf', 'rb') as f1, open('doc2.pdf', 'rb') as f2:
    result = client.execute_workflow(
        'workflow-id',
        {
            'attachments': [f1, f2],  # Must match your workflow's "files" field name
            'query': 'Compare these documents'
        }
    )
Batch Workflow Execution
from simstudio import SimStudioClient
import os
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def execute_workflows_batch(workflow_data_pairs):
    """Execute multiple workflows with different input data."""
    results = []
    for workflow_id, workflow_input in workflow_data_pairs:
        try:
            # Validate workflow before execution
            if not client.validate_workflow(workflow_id):
                print(f"Skipping {workflow_id}: not deployed")
                continue
            result = client.execute_workflow(workflow_id, workflow_input)
            results.append({
                "workflow_id": workflow_id,
                "success": result.success,
                "output": result.output,
                "error": result.error
            })
        except Exception as error:
            results.append({
                "workflow_id": workflow_id,
                "success": False,
                "error": str(error)
            })
    return results
# Example usage
workflows = [
    ("workflow-1", {"type": "analysis", "data": "sample1"}),
    ("workflow-2", {"type": "processing", "data": "sample2"}),
]
results = execute_workflows_batch(workflows)
for result in results:
    print(f"Workflow {result['workflow_id']}: {'Success' if result['success'] else 'Failed'}")
Getting Your API Key
- Log in to your Sim account
- Navigate to your workflow
- Click on "Deploy" to deploy your workflow
- Select or create an API key during the deployment process
- Copy the API key to use in your application
Development
Running Tests
To run the tests locally:
- Clone the repository and navigate to the Python SDK directory:
  cd packages/python-sdk
- Create and activate a virtual environment:
  python3 -m venv venv
  source venv/bin/activate  # On Windows: venv\Scripts\activate
- Install the package in development mode with test dependencies:
  pip install -e ".[dev]"
- Run the tests:
  pytest tests/ -v
Code Quality
Run code quality checks:
# Code formatting
black simstudio/
# Linting
flake8 simstudio/ --max-line-length=100
# Type checking
mypy simstudio/
# Import sorting
isort simstudio/
Requirements
- Python 3.8+
- requests >= 2.25.0
License
Apache-2.0