Files
sim/apps/docs/content/docs/zh/sdks/python.mdx
Vikhyath Mondreti fe9ebbf81b improvement(api-keys): move to workspace level (#1765)
* fix(billing): should allow restoring subscription (#1728)

* fix(already-cancelled-sub): UI should allow restoring subscription

* restore functionality fixed

* fix

* improvement(api-keys): move to workspace level

* remove migration to prep merge

* remove two more unused cols

* prep staging  merge

* add migration back

---------

Co-authored-by: Waleed <walif6@gmail.com>
Co-authored-by: Siddharth Ganesan <33737564+Sg312@users.noreply.github.com>
2025-10-30 11:42:58 -07:00

766 lines
20 KiB
Plaintext
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
---
title: Python
---
import { Callout } from 'fumadocs-ui/components/callout'
import { Card, Cards } from 'fumadocs-ui/components/card'
import { Step, Steps } from 'fumadocs-ui/components/steps'
import { Tab, Tabs } from 'fumadocs-ui/components/tabs'
官方的 Python SDK 允许您通过 Python 应用程序以编程方式执行工作流。
<Callout type="info">
Python SDK 支持 Python 3.8+,具备异步执行支持、自动速率限制(带指数退避)以及使用情况跟踪功能。
</Callout>
## 安装
使用 pip 安装 SDK
```bash
pip install simstudio-sdk
```
## 快速开始
以下是一个简单的示例,帮助您快速入门:
```python
from simstudio import SimStudioClient
# Initialize the client
client = SimStudioClient(
api_key="your-api-key-here",
base_url="https://sim.ai" # optional, defaults to https://sim.ai
)
# Execute a workflow
try:
result = client.execute_workflow("workflow-id")
print("Workflow executed successfully:", result)
except Exception as error:
print("Workflow execution failed:", error)
```
## API 参考
### SimStudioClient
#### 构造函数
```python
SimStudioClient(api_key: str, base_url: str = "https://sim.ai")
```
**参数:**
- `api_key` (str): 您的 Sim API 密钥
- `base_url` (str, 可选): Sim API 的基础 URL
#### 方法
##### execute_workflow()
执行带有可选输入数据的工作流。
```python
result = client.execute_workflow(
"workflow-id",
input_data={"message": "Hello, world!"},
timeout=30.0 # 30 seconds
)
```
**参数:**
- `workflow_id` (str): 要执行的工作流 ID
- `input_data` (dict, optional): 传递给工作流的输入数据
- `timeout` (float, optional): 超时时间以秒为单位默认值30.0
- `stream` (bool, optional): 启用流式响应默认值False
- `selected_outputs` (list[str], optional): 以 `blockName.attribute` 格式阻止输出流(例如,`["agent1.content"]`
- `async_execution` (bool, optional): 异步执行默认值False
**返回值:** `WorkflowExecutionResult | AsyncExecutionResult`
当 `async_execution=True` 时,立即返回任务 ID 以供轮询。否则,等待完成。
##### get_workflow_status()
获取工作流的状态(部署状态等)。
```python
status = client.get_workflow_status("workflow-id")
print("Is deployed:", status.is_deployed)
```
**参数:**
- `workflow_id` (str): 工作流的 ID
**返回值:** `WorkflowStatus`
##### validate_workflow()
验证工作流是否已准备好执行。
```python
is_ready = client.validate_workflow("workflow-id")
if is_ready:
# Workflow is deployed and ready
pass
```
**参数:**
- `workflow_id` (str): 工作流的 ID
**返回值:** `bool`
##### get_job_status()
获取异步任务执行的状态。
```python
status = client.get_job_status("task-id-from-async-execution")
print("Status:", status["status"]) # 'queued', 'processing', 'completed', 'failed'
if status["status"] == "completed":
print("Output:", status["output"])
```
**参数:**
- `task_id` (str): 异步执行返回的任务 ID
**返回值:** `Dict[str, Any]`
**响应字段:**
- `success` (bool): 请求是否成功
- `taskId` (str): 任务 ID
- `status` (str): 可能的值包括 `'queued'`, `'processing'`, `'completed'`, `'failed'`, `'cancelled'`
- `metadata` (dict): 包含 `startedAt`, `completedAt` 和 `duration`
- `output` (any, optional): 工作流输出(完成时)
- `error` (any, optional): 错误详情(失败时)
- `estimatedDuration` (int, optional): 估计持续时间(以毫秒为单位,处理中/排队时)
##### execute_with_retry()
使用指数退避在速率限制错误上自动重试执行工作流。
```python
result = client.execute_with_retry(
"workflow-id",
input_data={"message": "Hello"},
timeout=30.0,
max_retries=3, # Maximum number of retries
initial_delay=1.0, # Initial delay in seconds
max_delay=30.0, # Maximum delay in seconds
backoff_multiplier=2.0 # Exponential backoff multiplier
)
```
**参数:**
- `workflow_id` (str): 要执行的工作流 ID
- `input_data` (dict, optional): 传递给工作流的输入数据
- `timeout` (float, optional): 超时时间(以秒为单位)
- `stream` (bool, optional): 启用流式响应
- `selected_outputs` (list, optional): 阻止输出流
- `async_execution` (bool, optional): 异步执行
- `max_retries` (int, optional): 最大重试次数默认值3
- `initial_delay` (float, optional): 初始延迟时间以秒为单位默认值1.0
- `max_delay` (float, optional): 最大延迟时间以秒为单位默认值30.0
- `backoff_multiplier` (float, optional): 退避倍数默认值2.0
**返回值:** `WorkflowExecutionResult | AsyncExecutionResult`
重试逻辑使用指数退避1 秒 → 2 秒 → 4 秒 → 8 秒...),并带有 ±25% 的抖动以防止惊群效应。如果 API 提供了 `retry-after` 标头,则会使用该标头。
##### get_rate_limit_info()
从上一次 API 响应中获取当前的速率限制信息。
```python
rate_limit_info = client.get_rate_limit_info()
if rate_limit_info:
print("Limit:", rate_limit_info.limit)
print("Remaining:", rate_limit_info.remaining)
print("Reset:", datetime.fromtimestamp(rate_limit_info.reset))
```
**返回值:** `RateLimitInfo | None`
##### get_usage_limits()
获取您的账户当前的使用限制和配额信息。
```python
limits = client.get_usage_limits()
print("Sync requests remaining:", limits.rate_limit["sync"]["remaining"])
print("Async requests remaining:", limits.rate_limit["async"]["remaining"])
print("Current period cost:", limits.usage["currentPeriodCost"])
print("Plan:", limits.usage["plan"])
```
**返回值:** `UsageLimits`
**响应结构:**
```python
{
"success": bool,
"rateLimit": {
"sync": {
"isLimited": bool,
"limit": int,
"remaining": int,
"resetAt": str
},
"async": {
"isLimited": bool,
"limit": int,
"remaining": int,
"resetAt": str
},
"authType": str # 'api' or 'manual'
},
"usage": {
"currentPeriodCost": float,
"limit": float,
"plan": str # e.g., 'free', 'pro'
}
}
```
##### set_api_key()
更新 API 密钥。
```python
client.set_api_key("new-api-key")
```
##### set_base_url()
更新基础 URL。
```python
client.set_base_url("https://my-custom-domain.com")
```
##### close()
关闭底层 HTTP 会话。
```python
client.close()
```
## 数据类
### WorkflowExecutionResult
```python
@dataclass
class WorkflowExecutionResult:
success: bool
output: Optional[Any] = None
error: Optional[str] = None
logs: Optional[List[Any]] = None
metadata: Optional[Dict[str, Any]] = None
trace_spans: Optional[List[Any]] = None
total_duration: Optional[float] = None
```
### AsyncExecutionResult
```python
@dataclass
class AsyncExecutionResult:
success: bool
task_id: str
status: str # 'queued'
created_at: str
links: Dict[str, str] # e.g., {"status": "/api/jobs/{taskId}"}
```
### WorkflowStatus
```python
@dataclass
class WorkflowStatus:
is_deployed: bool
deployed_at: Optional[str] = None
needs_redeployment: bool = False
```
### RateLimitInfo
```python
@dataclass
class RateLimitInfo:
limit: int
remaining: int
reset: int
retry_after: Optional[int] = None
```
### UsageLimits
```python
@dataclass
class UsageLimits:
success: bool
rate_limit: Dict[str, Any]
usage: Dict[str, Any]
```
### SimStudioError
```python
class SimStudioError(Exception):
def __init__(self, message: str, code: Optional[str] = None, status: Optional[int] = None):
super().__init__(message)
self.code = code
self.status = status
```
**常见错误代码:**
- `UNAUTHORIZED`: 无效的 API 密钥
- `TIMEOUT`: 请求超时
- `RATE_LIMIT_EXCEEDED`: 超出速率限制
- `USAGE_LIMIT_EXCEEDED`: 超出使用限制
- `EXECUTION_ERROR`: 工作流执行失败
## 示例
### 基本工作流执行
<Steps>
<Step title="初始化客户端">
使用您的 API 密钥设置 SimStudioClient。
</Step>
<Step title="验证工作流">
检查工作流是否已部署并准备好执行。
</Step>
<Step title="执行工作流">
使用您的输入数据运行工作流。
</Step>
<Step title="处理结果">
处理执行结果并处理任何错误。
</Step>
</Steps>
```python
import os
from simstudio import SimStudioClient
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def run_workflow():
try:
# Check if workflow is ready
is_ready = client.validate_workflow("my-workflow-id")
if not is_ready:
raise Exception("Workflow is not deployed or ready")
# Execute the workflow
result = client.execute_workflow(
"my-workflow-id",
input_data={
"message": "Process this data",
"user_id": "12345"
}
)
if result.success:
print("Output:", result.output)
print("Duration:", result.metadata.get("duration") if result.metadata else None)
else:
print("Workflow failed:", result.error)
except Exception as error:
print("Error:", error)
run_workflow()
```
### 错误处理
处理工作流执行过程中可能发生的不同类型的错误:
```python
from simstudio import SimStudioClient, SimStudioError
import os
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def execute_with_error_handling():
try:
result = client.execute_workflow("workflow-id")
return result
except SimStudioError as error:
if error.code == "UNAUTHORIZED":
print("Invalid API key")
elif error.code == "TIMEOUT":
print("Workflow execution timed out")
elif error.code == "USAGE_LIMIT_EXCEEDED":
print("Usage limit exceeded")
elif error.code == "INVALID_JSON":
print("Invalid JSON in request body")
else:
print(f"Workflow error: {error}")
raise
except Exception as error:
print(f"Unexpected error: {error}")
raise
```
### 上下文管理器的使用
将客户端用作上下文管理器以自动处理资源清理:
```python
from simstudio import SimStudioClient
import os
# Using context manager to automatically close the session
with SimStudioClient(api_key=os.getenv("SIM_API_KEY")) as client:
result = client.execute_workflow("workflow-id")
print("Result:", result)
# Session is automatically closed here
```
### 批量工作流执行
高效地执行多个工作流:
```python
from simstudio import SimStudioClient
import os
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def execute_workflows_batch(workflow_data_pairs):
"""Execute multiple workflows with different input data."""
results = []
for workflow_id, input_data in workflow_data_pairs:
try:
# Validate workflow before execution
if not client.validate_workflow(workflow_id):
print(f"Skipping {workflow_id}: not deployed")
continue
result = client.execute_workflow(workflow_id, input_data)
results.append({
"workflow_id": workflow_id,
"success": result.success,
"output": result.output,
"error": result.error
})
except Exception as error:
results.append({
"workflow_id": workflow_id,
"success": False,
"error": str(error)
})
return results
# Example usage
workflows = [
("workflow-1", {"type": "analysis", "data": "sample1"}),
("workflow-2", {"type": "processing", "data": "sample2"}),
]
results = execute_workflows_batch(workflows)
for result in results:
print(f"Workflow {result['workflow_id']}: {'Success' if result['success'] else 'Failed'}")
```
### 异步工作流执行
为长时间运行的任务异步执行工作流:
```python
import os
import time
from simstudio import SimStudioClient
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def execute_async():
try:
# Start async execution
result = client.execute_workflow(
"workflow-id",
input_data={"data": "large dataset"},
async_execution=True # Execute asynchronously
)
# Check if result is an async execution
if hasattr(result, 'task_id'):
print(f"Task ID: {result.task_id}")
print(f"Status endpoint: {result.links['status']}")
# Poll for completion
status = client.get_job_status(result.task_id)
while status["status"] in ["queued", "processing"]:
print(f"Current status: {status['status']}")
time.sleep(2) # Wait 2 seconds
status = client.get_job_status(result.task_id)
if status["status"] == "completed":
print("Workflow completed!")
print(f"Output: {status['output']}")
print(f"Duration: {status['metadata']['duration']}")
else:
print(f"Workflow failed: {status['error']}")
except Exception as error:
print(f"Error: {error}")
execute_async()
```
### 速率限制与重试
通过指数退避自动处理速率限制:
```python
import os
from simstudio import SimStudioClient, SimStudioError
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def execute_with_retry_handling():
try:
# Automatically retries on rate limit
result = client.execute_with_retry(
"workflow-id",
input_data={"message": "Process this"},
max_retries=5,
initial_delay=1.0,
max_delay=60.0,
backoff_multiplier=2.0
)
print(f"Success: {result}")
except SimStudioError as error:
if error.code == "RATE_LIMIT_EXCEEDED":
print("Rate limit exceeded after all retries")
# Check rate limit info
rate_limit_info = client.get_rate_limit_info()
if rate_limit_info:
from datetime import datetime
reset_time = datetime.fromtimestamp(rate_limit_info.reset)
print(f"Rate limit resets at: {reset_time}")
execute_with_retry_handling()
```
### 使用监控
监控您的账户使用情况和限制:
```python
import os
from simstudio import SimStudioClient
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def check_usage():
try:
limits = client.get_usage_limits()
print("=== Rate Limits ===")
print("Sync requests:")
print(f" Limit: {limits.rate_limit['sync']['limit']}")
print(f" Remaining: {limits.rate_limit['sync']['remaining']}")
print(f" Resets at: {limits.rate_limit['sync']['resetAt']}")
print(f" Is limited: {limits.rate_limit['sync']['isLimited']}")
print("\nAsync requests:")
print(f" Limit: {limits.rate_limit['async']['limit']}")
print(f" Remaining: {limits.rate_limit['async']['remaining']}")
print(f" Resets at: {limits.rate_limit['async']['resetAt']}")
print(f" Is limited: {limits.rate_limit['async']['isLimited']}")
print("\n=== Usage ===")
print(f"Current period cost: ${limits.usage['currentPeriodCost']:.2f}")
print(f"Limit: ${limits.usage['limit']:.2f}")
print(f"Plan: {limits.usage['plan']}")
percent_used = (limits.usage['currentPeriodCost'] / limits.usage['limit']) * 100
print(f"Usage: {percent_used:.1f}%")
if percent_used > 80:
print("⚠️ Warning: You are approaching your usage limit!")
except Exception as error:
print(f"Error checking usage: {error}")
check_usage()
```
### 流式工作流执行
通过实时流式响应执行工作流:
```python
from simstudio import SimStudioClient
import os
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def execute_with_streaming():
"""Execute workflow with streaming enabled."""
try:
# Enable streaming for specific block outputs
result = client.execute_workflow(
"workflow-id",
input_data={"message": "Count to five"},
stream=True,
selected_outputs=["agent1.content"] # Use blockName.attribute format
)
print("Workflow result:", result)
except Exception as error:
print("Error:", error)
execute_with_streaming()
```
流式响应遵循服务器发送事件 (SSE) 格式:
```
data: {"blockId":"7b7735b9-19e5-4bd6-818b-46aae2596e9f","chunk":"One"}
data: {"blockId":"7b7735b9-19e5-4bd6-818b-46aae2596e9f","chunk":", two"}
data: {"event":"done","success":true,"output":{},"metadata":{"duration":610}}
data: [DONE]
```
**Flask 流式示例:**
```python
from flask import Flask, Response, stream_with_context
import requests
import json
import os
app = Flask(__name__)
@app.route('/stream-workflow')
def stream_workflow():
"""Stream workflow execution to the client."""
def generate():
response = requests.post(
'https://sim.ai/api/workflows/WORKFLOW_ID/execute',
headers={
'Content-Type': 'application/json',
'X-API-Key': os.getenv('SIM_API_KEY')
},
json={
'message': 'Generate a story',
'stream': True,
'selectedOutputs': ['agent1.content']
},
stream=True
)
for line in response.iter_lines():
if line:
decoded_line = line.decode('utf-8')
if decoded_line.startswith('data: '):
data = decoded_line[6:] # Remove 'data: ' prefix
if data == '[DONE]':
break
try:
parsed = json.loads(data)
if 'chunk' in parsed:
yield f"data: {json.dumps(parsed)}\n\n"
elif parsed.get('event') == 'done':
yield f"data: {json.dumps(parsed)}\n\n"
print("Execution complete:", parsed.get('metadata'))
except json.JSONDecodeError:
pass
return Response(
stream_with_context(generate()),
mimetype='text/event-stream'
)
if __name__ == '__main__':
app.run(debug=True)
```
### 环境配置
使用环境变量配置客户端:
<Tabs items={['开发', '生产']}>
<Tab value="开发">
```python
import os
from simstudio import SimStudioClient
# Development configuration
client = SimStudioClient(
api_key=os.getenv("SIM_API_KEY")
base_url=os.getenv("SIM_BASE_URL", "https://sim.ai")
)
```
</Tab>
<Tab value="生产">
```python
import os
from simstudio import SimStudioClient
# Production configuration with error handling
api_key = os.getenv("SIM_API_KEY")
if not api_key:
raise ValueError("SIM_API_KEY environment variable is required")
client = SimStudioClient(
api_key=api_key,
base_url=os.getenv("SIM_BASE_URL", "https://sim.ai")
)
```
</Tab>
</Tabs>
## 获取您的 API 密钥
<Steps>
<Step title="登录 Sim">
前往 [Sim](https://sim.ai) 并登录您的账户。
</Step>
<Step title="打开您的工作流">
前往您想要以编程方式执行的工作流。
</Step>
<Step title="部署您的工作流">
如果尚未部署,请点击“部署”以部署您的工作流。
</Step>
<Step title="创建或选择一个 API 密钥">
在部署过程中,选择或创建一个 API 密钥。
</Step>
<Step title="复制 API 密钥">
复制 API 密钥以在您的 Python 应用程序中使用。
</Step>
</Steps>
## 系统要求
- Python 3.8+
- requests >= 2.25.0
## 许可证
Apache-2.0