mirror of
https://github.com/simstudioai/sim.git
synced 2026-04-28 03:00:29 -04:00
* fix(misc): remove duplicate docs page, update clopus 4.7 * fix(docs): consolidate duplicate docs and fix SDK API signatures - Remove duplicate custom-tools page (custom-tools/index.mdx → tools/custom-tools.mdx is canonical) - Remove comparison table from custom-tools per product preference - Fix permissions inconsistency: delete now requires Admin across all docs - Consolidate sdks/ into api-reference/ (sdks/ directory deleted) - Fix Python SDK docs: correct param is `input`, not `input_data` - Fix TypeScript SDK docs: correct signature is executeWorkflow(id, input, options) not options-object form - Add FAQ sections to both SDK reference pages * fix(docs): update SDKs card links from /sdks to /api-reference * fix(docs): update /sdks references to /api-reference in llms.txt files
772 lines
23 KiB
Plaintext
---
title: Python
---

import { Callout } from 'fumadocs-ui/components/callout'
import { Card, Cards } from 'fumadocs-ui/components/card'
import { Step, Steps } from 'fumadocs-ui/components/steps'
import { Tab, Tabs } from 'fumadocs-ui/components/tabs'

The official Python SDK for Sim lets you execute workflows programmatically from your Python applications.

<Callout type="info">
  The Python SDK supports Python 3.8+ and provides async execution, automatic retries on rate limits with exponential backoff, and usage tracking.
</Callout>

## Installation

Install the SDK using pip:

```bash
pip install simstudio-sdk
```

## Quick Start

Here's a simple example to get you started:

```python
from simstudio import SimStudioClient

# Initialize the client
client = SimStudioClient(
    api_key="your-api-key-here",
    base_url="https://sim.ai"  # optional, defaults to https://sim.ai
)

# Execute a workflow
try:
    result = client.execute_workflow("workflow-id")
    print("Workflow executed successfully:", result)
except Exception as error:
    print("Workflow execution failed:", error)
```

## API Reference

### SimStudioClient

#### Constructor

```python
SimStudioClient(api_key: str, base_url: str = "https://sim.ai")
```

**Parameters:**
- `api_key` (str): Your Sim API key
- `base_url` (str, optional): Base URL for the Sim API

#### Methods

##### execute_workflow()

Execute a workflow with optional input data.

```python
result = client.execute_workflow(
    "workflow-id",
    input={"message": "Hello, world!"},
    timeout=30.0  # 30 seconds
)
```

**Parameters:**
- `workflow_id` (str): The ID of the workflow to execute
- `input` (dict, optional): Input data to pass to the workflow
- `timeout` (float, optional): Timeout in seconds (default: 30.0)
- `stream` (bool, optional): Enable streaming responses (default: False)
- `selected_outputs` (list[str], optional): Block outputs to stream in `blockName.attribute` format (e.g., `["agent1.content"]`)
- `async_execution` (bool, optional): Execute asynchronously (default: False)

**Returns:** `WorkflowExecutionResult | AsyncExecutionResult`

When `async_execution=True`, the call returns immediately with a task ID for polling. Otherwise, it blocks until the workflow completes.

##### get_workflow_status()

Get the status of a workflow (deployment status, etc.).

```python
status = client.get_workflow_status("workflow-id")
print("Is deployed:", status.is_deployed)
```

**Parameters:**
- `workflow_id` (str): The ID of the workflow

**Returns:** `WorkflowStatus`

##### validate_workflow()

Validate that a workflow is ready for execution.

```python
is_ready = client.validate_workflow("workflow-id")
if is_ready:
    # Workflow is deployed and ready
    pass
```

**Parameters:**
- `workflow_id` (str): The ID of the workflow

**Returns:** `bool`

##### get_job_status()

Get the status of an async job execution.

```python
status = client.get_job_status("task-id-from-async-execution")
print("Status:", status["status"])  # 'queued', 'processing', 'completed', 'failed', 'cancelled'
if status["status"] == "completed":
    print("Output:", status["output"])
```

**Parameters:**
- `task_id` (str): The task ID returned from async execution

**Returns:** `Dict[str, Any]`

**Response fields:**
- `success` (bool): Whether the request was successful
- `taskId` (str): The task ID
- `status` (str): One of `'queued'`, `'processing'`, `'completed'`, `'failed'`, `'cancelled'`
- `metadata` (dict): Contains `startedAt`, `completedAt`, and `duration`
- `output` (any, optional): The workflow output (when completed)
- `error` (any, optional): Error details (when failed)
- `estimatedDuration` (int, optional): Estimated duration in milliseconds (when processing/queued)

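
Because the `status` field moves from `'queued'` or `'processing'` to a terminal value, callers usually poll until one of `'completed'`, `'failed'`, or `'cancelled'` appears. The following is a minimal sketch of such a loop, not part of the SDK: `wait_for_job`, `TERMINAL_STATUSES`, `poll_interval`, and `max_wait` are hypothetical names, and the status function is passed in as a plain callable so the helper can be exercised without network access.

```python
import time
from typing import Any, Callable, Dict

# Statuses after which a job no longer changes (taken from the field list above).
TERMINAL_STATUSES = {"completed", "failed", "cancelled"}


def wait_for_job(
    get_status: Callable[[str], Dict[str, Any]],
    task_id: str,
    poll_interval: float = 2.0,
    max_wait: float = 300.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, Any]:
    """Poll get_status(task_id) until the job reaches a terminal status.

    Hypothetical helper, not an SDK method. Raises TimeoutError if the job
    is still queued or processing once max_wait seconds have elapsed.
    """
    deadline = time.monotonic() + max_wait
    while True:
        status = get_status(task_id)
        if status["status"] in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Job {task_id} still {status['status']} after {max_wait}s"
            )
        sleep(poll_interval)
```

With a real client this would presumably be called as `wait_for_job(client.get_job_status, result.task_id)`. Bounding the wait with `max_wait` keeps a stuck job from blocking the caller indefinitely.
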
##### execute_with_retry()

Execute a workflow with automatic retry on rate limit errors using exponential backoff.

```python
result = client.execute_with_retry(
    "workflow-id",
    input={"message": "Hello"},
    timeout=30.0,
    max_retries=3,           # Maximum number of retries
    initial_delay=1.0,       # Initial delay in seconds
    max_delay=30.0,          # Maximum delay in seconds
    backoff_multiplier=2.0   # Exponential backoff multiplier
)
```

**Parameters:**
- `workflow_id` (str): The ID of the workflow to execute
- `input` (dict, optional): Input data to pass to the workflow
- `timeout` (float, optional): Timeout in seconds
- `stream` (bool, optional): Enable streaming responses
- `selected_outputs` (list, optional): Block outputs to stream
- `async_execution` (bool, optional): Execute asynchronously
- `max_retries` (int, optional): Maximum number of retries (default: 3)
- `initial_delay` (float, optional): Initial delay in seconds (default: 1.0)
- `max_delay` (float, optional): Maximum delay in seconds (default: 30.0)
- `backoff_multiplier` (float, optional): Backoff multiplier (default: 2.0)

**Returns:** `WorkflowExecutionResult | AsyncExecutionResult`

The retry logic uses exponential backoff (1s → 2s → 4s → 8s...) with ±25% jitter to avoid thundering-herd effects. If the API provides a `retry-after` header, its value is used instead of the computed delay.

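
To make the delay schedule concrete, here is a rough sketch of how such a delay could be computed. It is an illustration under stated assumptions, not the SDK's internal code: `backoff_delay` is a hypothetical function, and applying the jitter after capping at `max_delay` is a guess about ordering.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    initial_delay: float = 1.0,
    max_delay: float = 30.0,
    backoff_multiplier: float = 2.0,
    jitter: float = 0.25,
    retry_after: Optional[float] = None,
) -> float:
    """Return the seconds to wait before retry number `attempt` (0-based).

    Hypothetical helper illustrating the documented schedule.
    """
    if retry_after is not None:
        # A server-provided retry-after value takes precedence.
        return retry_after
    # Exponential growth, capped at max_delay (assumed to happen before jitter).
    base = min(initial_delay * (backoff_multiplier ** attempt), max_delay)
    # Scale by a random factor in [1 - jitter, 1 + jitter].
    return base * (1 + random.uniform(-jitter, jitter))


if __name__ == "__main__":
    for attempt in range(4):
        print(f"attempt {attempt}: wait about {backoff_delay(attempt):.2f}s")
```

With the defaults, the base delays are 1, 2, 4, 8, and 16 seconds, and every later attempt is capped at 30 seconds before jitter is applied.
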
##### get_rate_limit_info()

Get the current rate limit information from the last API response.

```python
from datetime import datetime

rate_limit_info = client.get_rate_limit_info()
if rate_limit_info:
    print("Limit:", rate_limit_info.limit)
    print("Remaining:", rate_limit_info.remaining)
    print("Reset:", datetime.fromtimestamp(rate_limit_info.reset))
```

**Returns:** `RateLimitInfo | None`

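
One possible use of these values is deciding how long to pause before sending more requests. The sketch below uses a local stand-in dataclass with the same fields as `RateLimitInfo` so that it runs without the SDK. It assumes `reset` is a Unix timestamp in seconds, which matches the `datetime.fromtimestamp` call above. `seconds_until_reset` and `should_pause` are hypothetical helpers.

```python
import time
from dataclasses import dataclass
from typing import Optional


@dataclass
class RateLimitInfo:
    """Local stand-in mirroring the fields documented under Data Classes."""
    limit: int
    remaining: int
    reset: int  # assumed: Unix timestamp in seconds
    retry_after: Optional[int] = None


def seconds_until_reset(info: RateLimitInfo, now: Optional[float] = None) -> float:
    """Return how long until the rate limit window resets (never negative)."""
    current = time.time() if now is None else now
    return max(0.0, float(info.reset - current))


def should_pause(info: RateLimitInfo) -> bool:
    """Return True when no requests remain in the current window."""
    return info.remaining <= 0
```

A caller could check `should_pause(info)` after each request and sleep for `seconds_until_reset(info)` only when it returns `True`.
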
##### get_usage_limits()

Get current usage limits and quota information for your account.

```python
limits = client.get_usage_limits()
print("Sync requests remaining:", limits.rate_limit["sync"]["remaining"])
print("Async requests remaining:", limits.rate_limit["async"]["remaining"])
print("Current period cost:", limits.usage["currentPeriodCost"])
print("Plan:", limits.usage["plan"])
```

**Returns:** `UsageLimits`

**Response structure:**
```python
{
    "success": bool,
    "rateLimit": {
        "sync": {
            "isLimited": bool,
            "limit": int,
            "remaining": int,
            "resetAt": str
        },
        "async": {
            "isLimited": bool,
            "limit": int,
            "remaining": int,
            "resetAt": str
        },
        "authType": str  # 'api' or 'manual'
    },
    "usage": {
        "currentPeriodCost": float,
        "limit": float,
        "plan": str  # e.g., 'free', 'pro'
    }
}
```

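
The response body uses camelCase keys (`rateLimit`), while the usage example reads the snake_case attribute `limits.rate_limit`. The sketch below shows one way such a mapping might look. It is an assumption about the shape of the conversion, not the SDK's actual parsing code. `UsageLimits` here is a local stand-in, and `parse_usage_limits` and `percent_used` are hypothetical helpers.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class UsageLimits:
    """Local stand-in mirroring the dataclass documented under Data Classes."""
    success: bool
    rate_limit: Dict[str, Any]
    usage: Dict[str, Any]


def parse_usage_limits(payload: Dict[str, Any]) -> UsageLimits:
    """Map the camelCase response body onto snake_case attributes."""
    return UsageLimits(
        success=bool(payload.get("success", False)),
        rate_limit=payload.get("rateLimit", {}),
        usage=payload.get("usage", {}),
    )


def percent_used(limits: UsageLimits) -> float:
    """Return the current period cost as a percentage of the limit.

    Returns 0.0 when the limit is missing or zero, to avoid dividing by zero.
    """
    limit = limits.usage.get("limit") or 0
    cost = limits.usage.get("currentPeriodCost") or 0
    return (cost / limit) * 100 if limit else 0.0
```

Note that only the top-level key is renamed in this sketch; nested keys such as `isLimited` and `currentPeriodCost` stay in camelCase, as in the usage example above.
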
##### set_api_key()

Update the API key.

```python
client.set_api_key("new-api-key")
```

##### set_base_url()

Update the base URL.

```python
client.set_base_url("https://my-custom-domain.com")
```

##### close()

Close the underlying HTTP session.

```python
client.close()
```

## Data Classes

### WorkflowExecutionResult

```python
@dataclass
class WorkflowExecutionResult:
    success: bool
    output: Optional[Any] = None
    error: Optional[str] = None
    logs: Optional[List[Any]] = None
    metadata: Optional[Dict[str, Any]] = None
    trace_spans: Optional[List[Any]] = None
    total_duration: Optional[float] = None
```

### AsyncExecutionResult

```python
@dataclass
class AsyncExecutionResult:
    success: bool
    task_id: str
    status: str  # 'queued'
    created_at: str
    links: Dict[str, str]  # e.g., {"status": "/api/jobs/{taskId}"}
```

### WorkflowStatus

```python
@dataclass
class WorkflowStatus:
    is_deployed: bool
    deployed_at: Optional[str] = None
    needs_redeployment: bool = False
```

### RateLimitInfo

```python
@dataclass
class RateLimitInfo:
    limit: int
    remaining: int
    reset: int
    retry_after: Optional[int] = None
```

### UsageLimits

```python
@dataclass
class UsageLimits:
    success: bool
    rate_limit: Dict[str, Any]
    usage: Dict[str, Any]
```

### SimStudioError

```python
class SimStudioError(Exception):
    def __init__(self, message: str, code: Optional[str] = None, status: Optional[int] = None):
        super().__init__(message)
        self.code = code
        self.status = status
```

**Common error codes:**
- `UNAUTHORIZED`: Invalid API key
- `TIMEOUT`: Request timed out
- `RATE_LIMIT_EXCEEDED`: Rate limit exceeded
- `USAGE_LIMIT_EXCEEDED`: Usage limit exceeded
- `EXECUTION_ERROR`: Workflow execution failed

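
Since each error carries a `code`, an application can keep its handling policy in a lookup table instead of a long `if`/`elif` chain. The following sketch defines a local stand-in exception with the same shape as `SimStudioError` so that it runs on its own. Which codes count as retryable is an application-level assumption here, not documented SDK behavior, and `RETRYABLE_CODES`, `USER_MESSAGES`, `is_retryable`, and `describe` are hypothetical names.

```python
from typing import Optional


class SimStudioError(Exception):
    """Local stand-in with the same shape as the class documented above."""

    def __init__(self, message: str, code: Optional[str] = None, status: Optional[int] = None):
        super().__init__(message)
        self.code = code
        self.status = status


# Assumption: transient failures are worth retrying, the others are not.
RETRYABLE_CODES = {"TIMEOUT", "RATE_LIMIT_EXCEEDED"}

USER_MESSAGES = {
    "UNAUTHORIZED": "Invalid API key",
    "TIMEOUT": "Request timed out",
    "RATE_LIMIT_EXCEEDED": "Rate limit exceeded",
    "USAGE_LIMIT_EXCEEDED": "Usage limit exceeded",
    "EXECUTION_ERROR": "Workflow execution failed",
}


def is_retryable(error: SimStudioError) -> bool:
    """Return True if this sketch's policy considers the error transient."""
    return error.code in RETRYABLE_CODES


def describe(error: SimStudioError) -> str:
    """Return a short message for known codes, or a generic fallback."""
    return USER_MESSAGES.get(error.code or "", f"Unexpected error: {error}")
```

Keeping the policy in data makes it easy to adjust when a new error code needs handling.
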
## Examples

### Basic Workflow Execution

<Steps>
  <Step title="Initialize the client">
    Set up the SimStudioClient with your API key.
  </Step>
  <Step title="Validate the workflow">
    Check if the workflow is deployed and ready for execution.
  </Step>
  <Step title="Execute the workflow">
    Run the workflow with your input data.
  </Step>
  <Step title="Handle the result">
    Process the execution result and handle any errors.
  </Step>
</Steps>

```python
import os
from simstudio import SimStudioClient

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def run_workflow():
    try:
        # Check if workflow is ready
        is_ready = client.validate_workflow("my-workflow-id")
        if not is_ready:
            raise Exception("Workflow is not deployed or ready")

        # Execute the workflow
        result = client.execute_workflow(
            "my-workflow-id",
            input={
                "message": "Process this data",
                "user_id": "12345"
            }
        )

        if result.success:
            print("Output:", result.output)
            print("Duration:", result.metadata.get("duration") if result.metadata else None)
        else:
            print("Workflow failed:", result.error)

    except Exception as error:
        print("Error:", error)

run_workflow()
```

### Error Handling

Handle different types of errors that may occur during workflow execution:

```python
from simstudio import SimStudioClient, SimStudioError
import os

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def execute_with_error_handling():
    try:
        result = client.execute_workflow("workflow-id")
        return result
    except SimStudioError as error:
        if error.code == "UNAUTHORIZED":
            print("Invalid API key")
        elif error.code == "TIMEOUT":
            print("Workflow execution timed out")
        elif error.code == "USAGE_LIMIT_EXCEEDED":
            print("Usage limit exceeded")
        elif error.code == "INVALID_JSON":
            print("Invalid JSON in request body")
        else:
            print(f"Workflow error: {error}")
        raise
    except Exception as error:
        print(f"Unexpected error: {error}")
        raise
```

### Context Manager Usage

Use the client as a context manager to automatically handle resource cleanup:

```python
from simstudio import SimStudioClient
import os

# Using context manager to automatically close the session
with SimStudioClient(api_key=os.getenv("SIM_API_KEY")) as client:
    result = client.execute_workflow("workflow-id")
    print("Result:", result)
    # Session is automatically closed here
```

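
For readers unfamiliar with the protocol, the sketch below shows the general mechanism that makes a `with` block call `close()` on exit, even when the body raises. `ManagedClient` is a hypothetical stand-in written for illustration. It is not the SDK class, and the SDK's own implementation may differ.

```python
class ManagedClient:
    """Hypothetical stand-in demonstrating the context manager protocol."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        # A real client would release its HTTP session here.
        self.closed = True

    def __enter__(self) -> "ManagedClient":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> bool:
        self.close()
        return False  # do not suppress exceptions raised inside the block


def run_and_fail() -> ManagedClient:
    """Show that close() still runs when the body of the with block raises."""
    holder = ManagedClient()
    try:
        with holder:
            raise RuntimeError("simulated failure")
    except RuntimeError:
        pass
    return holder
```

Returning `False` from `__exit__` lets the original exception propagate, so cleanup does not hide failures.
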
### Batch Workflow Execution

Execute multiple workflows efficiently:

```python
from simstudio import SimStudioClient
import os

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def execute_workflows_batch(workflow_data_pairs):
    """Execute multiple workflows with different input data."""
    results = []

    for workflow_id, input_data in workflow_data_pairs:
        try:
            # Validate workflow before execution
            if not client.validate_workflow(workflow_id):
                print(f"Skipping {workflow_id}: not deployed")
                continue

            # Pass the payload by keyword, matching the documented `input` parameter
            result = client.execute_workflow(workflow_id, input=input_data)
            results.append({
                "workflow_id": workflow_id,
                "success": result.success,
                "output": result.output,
                "error": result.error
            })

        except Exception as error:
            results.append({
                "workflow_id": workflow_id,
                "success": False,
                "error": str(error)
            })

    return results

# Example usage
workflows = [
    ("workflow-1", {"type": "analysis", "data": "sample1"}),
    ("workflow-2", {"type": "processing", "data": "sample2"}),
]

results = execute_workflows_batch(workflows)
for result in results:
    print(f"Workflow {result['workflow_id']}: {'Success' if result['success'] else 'Failed'}")
```

### Async Workflow Execution

Execute workflows asynchronously for long-running tasks:

```python
import os
import time
from simstudio import SimStudioClient

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def execute_async():
    try:
        # Start async execution
        result = client.execute_workflow(
            "workflow-id",
            input={"data": "large dataset"},
            async_execution=True  # Execute asynchronously
        )

        # Check if result is an async execution
        if hasattr(result, 'task_id'):
            print(f"Task ID: {result.task_id}")
            print(f"Status endpoint: {result.links['status']}")

            # Poll for completion
            status = client.get_job_status(result.task_id)

            while status["status"] in ["queued", "processing"]:
                print(f"Current status: {status['status']}")
                time.sleep(2)  # Wait 2 seconds
                status = client.get_job_status(result.task_id)

            if status["status"] == "completed":
                print("Workflow completed!")
                print(f"Output: {status['output']}")
                print(f"Duration: {status['metadata']['duration']}")
            else:
                # Covers both 'failed' and 'cancelled'; `error` may be absent
                print(f"Workflow {status['status']}: {status.get('error')}")

    except Exception as error:
        print(f"Error: {error}")

execute_async()
```

### Rate Limiting and Retry

Handle rate limits automatically with exponential backoff:

```python
import os
from datetime import datetime
from simstudio import SimStudioClient, SimStudioError

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def execute_with_retry_handling():
    try:
        # Automatically retries on rate limit
        result = client.execute_with_retry(
            "workflow-id",
            input={"message": "Process this"},
            max_retries=5,
            initial_delay=1.0,
            max_delay=60.0,
            backoff_multiplier=2.0
        )

        print(f"Success: {result}")
    except SimStudioError as error:
        if error.code == "RATE_LIMIT_EXCEEDED":
            print("Rate limit exceeded after all retries")

            # Check rate limit info
            rate_limit_info = client.get_rate_limit_info()
            if rate_limit_info:
                reset_time = datetime.fromtimestamp(rate_limit_info.reset)
                print(f"Rate limit resets at: {reset_time}")
        else:
            # Do not silently swallow other SDK errors
            raise

execute_with_retry_handling()
```

### Usage Monitoring

Monitor your account usage and limits:

```python
import os
from simstudio import SimStudioClient

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def check_usage():
    try:
        limits = client.get_usage_limits()

        print("=== Rate Limits ===")
        print("Sync requests:")
        print(f"  Limit: {limits.rate_limit['sync']['limit']}")
        print(f"  Remaining: {limits.rate_limit['sync']['remaining']}")
        print(f"  Resets at: {limits.rate_limit['sync']['resetAt']}")
        print(f"  Is limited: {limits.rate_limit['sync']['isLimited']}")

        print("\nAsync requests:")
        print(f"  Limit: {limits.rate_limit['async']['limit']}")
        print(f"  Remaining: {limits.rate_limit['async']['remaining']}")
        print(f"  Resets at: {limits.rate_limit['async']['resetAt']}")
        print(f"  Is limited: {limits.rate_limit['async']['isLimited']}")

        print("\n=== Usage ===")
        print(f"Current period cost: ${limits.usage['currentPeriodCost']:.2f}")
        print(f"Limit: ${limits.usage['limit']:.2f}")
        print(f"Plan: {limits.usage['plan']}")

        # Guard against a zero limit to avoid ZeroDivisionError
        limit = limits.usage['limit']
        percent_used = (limits.usage['currentPeriodCost'] / limit) * 100 if limit else 0.0
        print(f"Usage: {percent_used:.1f}%")

        if percent_used > 80:
            print("⚠️  Warning: You are approaching your usage limit!")

    except Exception as error:
        print(f"Error checking usage: {error}")

check_usage()
```

### Streaming Workflow Execution

Execute workflows with real-time streaming responses:

```python
from simstudio import SimStudioClient
import os

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def execute_with_streaming():
    """Execute workflow with streaming enabled."""
    try:
        # Enable streaming for specific block outputs
        result = client.execute_workflow(
            "workflow-id",
            input={"message": "Count to five"},
            stream=True,
            selected_outputs=["agent1.content"]  # Use blockName.attribute format
        )

        print("Workflow result:", result)
    except Exception as error:
        print("Error:", error)

execute_with_streaming()
```

The streaming response follows the Server-Sent Events (SSE) format:

```
data: {"blockId":"7b7735b9-19e5-4bd6-818b-46aae2596e9f","chunk":"One"}

data: {"blockId":"7b7735b9-19e5-4bd6-818b-46aae2596e9f","chunk":", two"}

data: {"event":"done","success":true,"output":{},"metadata":{"duration":610}}

data: [DONE]
```

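
A stream in this shape can be decoded with a few lines of standard-library code. The sketch below is a simplified reader for the sample above, not a complete SSE implementation: it only handles single-line `data:` fields, and `iter_sse_events` and `collect_text` are hypothetical helper names.

```python
import json
from typing import Any, Dict, Iterable, Iterator


def iter_sse_events(lines: Iterable[str]) -> Iterator[Dict[str, Any]]:
    """Yield the JSON payload of each `data:` line until the [DONE] sentinel."""
    for line in lines:
        line = line.strip()
        if not line.startswith("data:"):
            continue  # skip blank separator lines and non-data fields
        data = line[len("data:"):].strip()
        if data == "[DONE]":
            return
        try:
            yield json.loads(data)
        except json.JSONDecodeError:
            continue  # ignore malformed payloads in this sketch


def collect_text(events: Iterable[Dict[str, Any]]) -> str:
    """Concatenate the `chunk` values from streamed events."""
    return "".join(event["chunk"] for event in events if "chunk" in event)
```

Applied to the sample stream, `collect_text` joins the two chunks into `"One, two"`, and the final event before `[DONE]` carries the `metadata` dictionary.
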
**Flask Streaming Example:**

```python
from flask import Flask, Response, stream_with_context
import requests
import json
import os

app = Flask(__name__)

@app.route('/stream-workflow')
def stream_workflow():
    """Stream workflow execution to the client."""

    def generate():
        response = requests.post(
            'https://sim.ai/api/workflows/WORKFLOW_ID/execute',
            headers={
                'Content-Type': 'application/json',
                'X-API-Key': os.getenv('SIM_API_KEY')
            },
            json={
                'message': 'Generate a story',
                'stream': True,
                'selectedOutputs': ['agent1.content']
            },
            stream=True
        )

        for line in response.iter_lines():
            if line:
                decoded_line = line.decode('utf-8')
                if decoded_line.startswith('data: '):
                    data = decoded_line[6:]  # Remove 'data: ' prefix

                    if data == '[DONE]':
                        break

                    try:
                        parsed = json.loads(data)
                        if 'chunk' in parsed:
                            yield f"data: {json.dumps(parsed)}\n\n"
                        elif parsed.get('event') == 'done':
                            yield f"data: {json.dumps(parsed)}\n\n"
                            print("Execution complete:", parsed.get('metadata'))
                    except json.JSONDecodeError:
                        pass

    return Response(
        stream_with_context(generate()),
        mimetype='text/event-stream'
    )

if __name__ == '__main__':
    app.run(debug=True)
```

### Environment Configuration

Configure the client using environment variables:

<Tabs items={['Development', 'Production']}>
  <Tab value="Development">
    ```python
    import os
    from simstudio import SimStudioClient

    # Development configuration
    client = SimStudioClient(
        api_key=os.getenv("SIM_API_KEY"),
        base_url=os.getenv("SIM_BASE_URL", "https://sim.ai")
    )
    ```
  </Tab>
  <Tab value="Production">
    ```python
    import os
    from simstudio import SimStudioClient

    # Production configuration with error handling
    api_key = os.getenv("SIM_API_KEY")
    if not api_key:
        raise ValueError("SIM_API_KEY environment variable is required")

    client = SimStudioClient(
        api_key=api_key,
        base_url=os.getenv("SIM_BASE_URL", "https://sim.ai")
    )
    ```
  </Tab>
</Tabs>

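
If several modules need the same settings, the environment lookup can be gathered into one place. The sketch below is one way to do that under the same conventions as the tabs above (`SIM_API_KEY` required, `SIM_BASE_URL` optional). `SimSettings` and `load_settings` are hypothetical names and not part of the SDK. The `env` parameter exists only so the function can be exercised with a plain dictionary.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class SimSettings:
    """Hypothetical settings holder; not part of the SDK."""
    api_key: str
    base_url: str = "https://sim.ai"


def load_settings(env: Optional[Mapping[str, str]] = None) -> SimSettings:
    """Read SIM_API_KEY and SIM_BASE_URL, failing fast when the key is missing."""
    source = os.environ if env is None else env
    api_key = source.get("SIM_API_KEY", "").strip()
    if not api_key:
        raise ValueError("SIM_API_KEY environment variable is required")
    base_url = source.get("SIM_BASE_URL", "https://sim.ai")
    return SimSettings(api_key=api_key, base_url=base_url)
```

The resulting values would then be passed on as `SimStudioClient(api_key=settings.api_key, base_url=settings.base_url)`.
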
## Getting Your API Key

<Steps>
  <Step title="Log in to Sim">
    Navigate to [Sim](https://sim.ai) and log in to your account.
  </Step>
  <Step title="Open your workflow">
    Navigate to the workflow you want to execute programmatically.
  </Step>
  <Step title="Deploy your workflow">
    Click on "Deploy" to deploy your workflow if it hasn't been deployed yet.
  </Step>
  <Step title="Create or select an API key">
    During the deployment process, select or create an API key.
  </Step>
  <Step title="Copy the API key">
    Copy the API key to use in your Python application.
  </Step>
</Steps>

## Requirements

- Python 3.8+
- requests >= 2.25.0

## License

Apache-2.0

import { FAQ } from '@/components/ui/faq'

<FAQ items={[
  { question: "Do I need to deploy a workflow before I can execute it via the SDK?", answer: "Yes. Workflows must be deployed before they can be executed through the SDK. You can use the validate_workflow() method to check whether a workflow is deployed and ready. If it returns False, deploy the workflow from the Sim UI first and create or select an API key during deployment." },
  { question: "What is the difference between sync and async execution?", answer: "Sync execution (the default) blocks until the workflow completes and returns the full result. Async execution (async_execution=True) returns immediately with a task ID that you can poll using get_job_status(). Use async mode for long-running workflows to avoid request timeouts. Async job statuses include queued, processing, completed, failed, and cancelled." },
  { question: "How does the SDK handle rate limiting?", answer: "The SDK provides built-in rate limiting support through the execute_with_retry() method. It uses exponential backoff (1s, 2s, 4s, 8s...) with ±25% jitter to avoid thundering herd problems. If the API returns a retry-after header, that value is used instead. You can configure max_retries, initial_delay, max_delay, and backoff_multiplier. Use get_rate_limit_info() to check your current rate limit status." },
  { question: "Can I use the Python SDK as a context manager?", answer: "Yes. The SimStudioClient supports Python's context manager protocol. Use it with the 'with' statement to automatically close the underlying HTTP session when you are done, which is especially useful for scripts that create and discard client instances." },
  { question: "How do I handle different types of errors from the SDK?", answer: "The SDK raises SimStudioError with a code property for API-specific errors. Common error codes are UNAUTHORIZED (invalid API key), TIMEOUT (request timed out), RATE_LIMIT_EXCEEDED (too many requests), USAGE_LIMIT_EXCEEDED (billing limit reached), and EXECUTION_ERROR (workflow failed). Use the error code to implement targeted error handling and recovery logic." },
  { question: "How do I monitor my API usage and remaining quota?", answer: "Use the get_usage_limits() method to check your current usage. It returns sync and async rate limit details (limit, remaining, reset time, whether you are currently limited), plus your current period cost, usage limit, and plan tier. This lets you monitor consumption and alert before hitting limits." },
]} />