Files
sim/apps/docs/content/docs/ja/sdks/python.mdx
Vikhyath Mondreti fe9ebbf81b improvement(api-keys): move to workspace level (#1765)
* fix(billing): should allow restoring subscription (#1728)

* fix(already-cancelled-sub): UI should allow restoring subscription

* restore functionality fixed

* fix

* improvement(api-keys): move to workspace level

* remove migration to prep merge

* remove two more unused cols

* prep staging  merge

* add migration back

---------

Co-authored-by: Waleed <walif6@gmail.com>
Co-authored-by: Siddharth Ganesan <33737564+Sg312@users.noreply.github.com>
2025-10-30 11:42:58 -07:00

766 lines
22 KiB
Plaintext
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
---
title: Python
---
import { Callout } from 'fumadocs-ui/components/callout'
import { Card, Cards } from 'fumadocs-ui/components/card'
import { Step, Steps } from 'fumadocs-ui/components/steps'
import { Tab, Tabs } from 'fumadocs-ui/components/tabs'
Simの公式Python SDKを使用すると、公式Python SDKを使用してPythonアプリケーションからプログラムでワークフローを実行できます。
<Callout type="info">
Python SDKはPython 3.8以上をサポートし、非同期実行、指数バックオフによる自動レート制限、使用状況追跡機能を提供します。
</Callout>
## インストール
pipを使用してSDKをインストールします
```bash
pip install simstudio-sdk
```
## クイックスタート
以下は、始めるための簡単な例です:
```python
from simstudio import SimStudioClient
# Initialize the client
client = SimStudioClient(
api_key="your-api-key-here",
base_url="https://sim.ai" # optional, defaults to https://sim.ai
)
# Execute a workflow
try:
result = client.execute_workflow("workflow-id")
print("Workflow executed successfully:", result)
except Exception as error:
print("Workflow execution failed:", error)
```
## APIリファレンス
### SimStudioClient
#### コンストラクタ
```python
SimStudioClient(api_key: str, base_url: str = "https://sim.ai")
```
**パラメータ:**
- `api_key` (str): SimのAPIキー
- `base_url` (str, オプション): Sim APIのベースURL
#### メソッド
##### execute_workflow()
オプションの入力データでワークフローを実行します。
```python
result = client.execute_workflow(
"workflow-id",
input_data={"message": "Hello, world!"},
timeout=30.0 # 30 seconds
)
```
**パラメータ:**
- `workflow_id` (str): 実行するワークフローのID
- `input_data` (dict, オプション): ワークフローに渡す入力データ
- `timeout` (float, オプション): タイムアウト(秒)(デフォルト: 30.0
- `stream` (bool, オプション): ストリーミングレスポンスを有効にする(デフォルト: False
- `selected_outputs` (list[str], オプション): `blockName.attribute`形式でストリーミングするブロック出力(例: `["agent1.content"]`
- `async_execution` (bool, オプション): 非同期実行(デフォルト: False
**戻り値:** `WorkflowExecutionResult | AsyncExecutionResult`
`async_execution=True`の場合、ポーリング用のタスクIDをすぐに返します。それ以外の場合は、完了を待ちます。
##### get_workflow_status()
ワークフローのステータス(デプロイメントステータスなど)を取得します。
```python
status = client.get_workflow_status("workflow-id")
print("Is deployed:", status.is_deployed)
```
**パラメータ:**
- `workflow_id` (str): ワークフローのID
**戻り値:** `WorkflowStatus`
##### validate_workflow()
ワークフローが実行準備ができているかを検証します。
```python
is_ready = client.validate_workflow("workflow-id")
if is_ready:
# Workflow is deployed and ready
pass
```
**パラメータ:**
- `workflow_id` (str): ワークフローのID
**戻り値:** `bool`
##### get_job_status()
非同期ジョブ実行のステータスを取得します。
```python
status = client.get_job_status("task-id-from-async-execution")
print("Status:", status["status"]) # 'queued', 'processing', 'completed', 'failed'
if status["status"] == "completed":
print("Output:", status["output"])
```
**パラメータ:**
- `task_id` (str): 非同期実行から返されたタスクID
**戻り値:** `Dict[str, Any]`
**レスポンスフィールド:**
- `success` (bool): リクエストが成功したかどうか
- `taskId` (str): タスクID
- `status` (str): 次のいずれか: `'queued'`, `'processing'`, `'completed'`, `'failed'`, `'cancelled'`
- `metadata` (dict): `startedAt`, `completedAt`, `duration`を含む
- `output` (any, オプション): ワークフロー出力(完了時)
- `error` (any, オプション): エラー詳細(失敗時)
- `estimatedDuration` (int, オプション): 推定所要時間(ミリ秒)(処理中/キュー時)
##### execute_with_retry()
指数バックオフを使用してレート制限エラーで自動的に再試行するワークフロー実行。
```python
result = client.execute_with_retry(
"workflow-id",
input_data={"message": "Hello"},
timeout=30.0,
max_retries=3, # Maximum number of retries
initial_delay=1.0, # Initial delay in seconds
max_delay=30.0, # Maximum delay in seconds
backoff_multiplier=2.0 # Exponential backoff multiplier
)
```
**パラメータ:**
- `workflow_id` (str): 実行するワークフローのID
- `input_data` (dict, オプション): ワークフローに渡す入力データ
- `timeout` (float, オプション): タイムアウト(秒)
- `stream` (bool, オプション): ストリーミングレスポンスを有効にする
- `selected_outputs` (list, オプション): ストリーミングするブロック出力
- `async_execution` (bool, オプション): 非同期実行
- `max_retries` (int, オプション): 最大再試行回数(デフォルト: 3
- `initial_delay` (float, オプション): 初期遅延(秒)(デフォルト: 1.0
- `max_delay` (float, オプション): 最大遅延(秒)(デフォルト: 30.0
- `backoff_multiplier` (float, オプション): バックオフ乗数(デフォルト: 2.0
**戻り値:** `WorkflowExecutionResult | AsyncExecutionResult`
リトライロジックは、サンダリングハード問題を防ぐために±25%のジッターを伴う指数バックオフ1秒→2秒→4秒→8秒...を使用します。APIが `retry-after` ヘッダーを提供する場合、代わりにそれが使用されます。
##### get_rate_limit_info()
最後のAPIレスポンスから現在のレート制限情報を取得します。
```python
rate_limit_info = client.get_rate_limit_info()
if rate_limit_info:
print("Limit:", rate_limit_info.limit)
print("Remaining:", rate_limit_info.remaining)
print("Reset:", datetime.fromtimestamp(rate_limit_info.reset))
```
**戻り値:** `RateLimitInfo | None`
##### get_usage_limits()
アカウントの現在の使用制限とクォータ情報を取得します。
```python
limits = client.get_usage_limits()
print("Sync requests remaining:", limits.rate_limit["sync"]["remaining"])
print("Async requests remaining:", limits.rate_limit["async"]["remaining"])
print("Current period cost:", limits.usage["currentPeriodCost"])
print("Plan:", limits.usage["plan"])
```
**戻り値:** `UsageLimits`
**レスポンス構造:**
```python
{
"success": bool,
"rateLimit": {
"sync": {
"isLimited": bool,
"limit": int,
"remaining": int,
"resetAt": str
},
"async": {
"isLimited": bool,
"limit": int,
"remaining": int,
"resetAt": str
},
"authType": str # 'api' or 'manual'
},
"usage": {
"currentPeriodCost": float,
"limit": float,
"plan": str # e.g., 'free', 'pro'
}
}
```
##### set_api_key()
APIキーを更新します。
```python
client.set_api_key("new-api-key")
```
##### set_base_url()
ベースURLを更新します。
```python
client.set_base_url("https://my-custom-domain.com")
```
##### close()
基盤となるHTTPセッションを閉じます。
```python
client.close()
```
## データクラス
### WorkflowExecutionResult
```python
@dataclass
class WorkflowExecutionResult:
success: bool
output: Optional[Any] = None
error: Optional[str] = None
logs: Optional[List[Any]] = None
metadata: Optional[Dict[str, Any]] = None
trace_spans: Optional[List[Any]] = None
total_duration: Optional[float] = None
```
### AsyncExecutionResult
```python
@dataclass
class AsyncExecutionResult:
success: bool
task_id: str
status: str # 'queued'
created_at: str
links: Dict[str, str] # e.g., {"status": "/api/jobs/{taskId}"}
```
### WorkflowStatus
```python
@dataclass
class WorkflowStatus:
is_deployed: bool
deployed_at: Optional[str] = None
needs_redeployment: bool = False
```
### RateLimitInfo
```python
@dataclass
class RateLimitInfo:
limit: int
remaining: int
reset: int
retry_after: Optional[int] = None
```
### UsageLimits
```python
@dataclass
class UsageLimits:
success: bool
rate_limit: Dict[str, Any]
usage: Dict[str, Any]
```
### SimStudioError
```python
class SimStudioError(Exception):
def __init__(self, message: str, code: Optional[str] = None, status: Optional[int] = None):
super().__init__(message)
self.code = code
self.status = status
```
**一般的なエラーコード:**
- `UNAUTHORIZED`: 無効なAPIキー
- `TIMEOUT`: リクエストがタイムアウトしました
- `RATE_LIMIT_EXCEEDED`: レート制限を超えました
- `USAGE_LIMIT_EXCEEDED`: 使用制限を超えました
- `EXECUTION_ERROR`: ワークフローの実行に失敗しました
## 例
### 基本的なワークフロー実行
<Steps>
<Step title="クライアントの初期化">
APIキーを使用してSimStudioClientをセットアップします。
</Step>
<Step title="ワークフローの検証">
ワークフローがデプロイされ、実行準備ができているか確認します。
</Step>
<Step title="ワークフローの実行">
入力データでワークフローを実行します。
</Step>
<Step title="結果の処理">
実行結果を処理し、エラーがあれば対処します。
</Step>
</Steps>
```python
import os
from simstudio import SimStudioClient
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def run_workflow():
try:
# Check if workflow is ready
is_ready = client.validate_workflow("my-workflow-id")
if not is_ready:
raise Exception("Workflow is not deployed or ready")
# Execute the workflow
result = client.execute_workflow(
"my-workflow-id",
input_data={
"message": "Process this data",
"user_id": "12345"
}
)
if result.success:
print("Output:", result.output)
print("Duration:", result.metadata.get("duration") if result.metadata else None)
else:
print("Workflow failed:", result.error)
except Exception as error:
print("Error:", error)
run_workflow()
```
### エラー処理
ワークフロー実行中に発生する可能性のある様々なタイプのエラーを処理します:
```python
from simstudio import SimStudioClient, SimStudioError
import os
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def execute_with_error_handling():
try:
result = client.execute_workflow("workflow-id")
return result
except SimStudioError as error:
if error.code == "UNAUTHORIZED":
print("Invalid API key")
elif error.code == "TIMEOUT":
print("Workflow execution timed out")
elif error.code == "USAGE_LIMIT_EXCEEDED":
print("Usage limit exceeded")
elif error.code == "INVALID_JSON":
print("Invalid JSON in request body")
else:
print(f"Workflow error: {error}")
raise
except Exception as error:
print(f"Unexpected error: {error}")
raise
```
### コンテキストマネージャーの使用
リソースのクリーンアップを自動的に処理するためにクライアントをコンテキストマネージャーとして使用します:
```python
from simstudio import SimStudioClient
import os
# Using context manager to automatically close the session
with SimStudioClient(api_key=os.getenv("SIM_API_KEY")) as client:
result = client.execute_workflow("workflow-id")
print("Result:", result)
# Session is automatically closed here
```
### バッチワークフロー実行
複数のワークフローを効率的に実行します:
```python
from simstudio import SimStudioClient
import os
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def execute_workflows_batch(workflow_data_pairs):
"""Execute multiple workflows with different input data."""
results = []
for workflow_id, input_data in workflow_data_pairs:
try:
# Validate workflow before execution
if not client.validate_workflow(workflow_id):
print(f"Skipping {workflow_id}: not deployed")
continue
result = client.execute_workflow(workflow_id, input_data)
results.append({
"workflow_id": workflow_id,
"success": result.success,
"output": result.output,
"error": result.error
})
except Exception as error:
results.append({
"workflow_id": workflow_id,
"success": False,
"error": str(error)
})
return results
# Example usage
workflows = [
("workflow-1", {"type": "analysis", "data": "sample1"}),
("workflow-2", {"type": "processing", "data": "sample2"}),
]
results = execute_workflows_batch(workflows)
for result in results:
print(f"Workflow {result['workflow_id']}: {'Success' if result['success'] else 'Failed'}")
```
### 非同期ワークフロー実行
長時間実行されるタスクのためにワークフローを非同期で実行します:
```python
import os
import time
from simstudio import SimStudioClient
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def execute_async():
try:
# Start async execution
result = client.execute_workflow(
"workflow-id",
input_data={"data": "large dataset"},
async_execution=True # Execute asynchronously
)
# Check if result is an async execution
if hasattr(result, 'task_id'):
print(f"Task ID: {result.task_id}")
print(f"Status endpoint: {result.links['status']}")
# Poll for completion
status = client.get_job_status(result.task_id)
while status["status"] in ["queued", "processing"]:
print(f"Current status: {status['status']}")
time.sleep(2) # Wait 2 seconds
status = client.get_job_status(result.task_id)
if status["status"] == "completed":
print("Workflow completed!")
print(f"Output: {status['output']}")
print(f"Duration: {status['metadata']['duration']}")
else:
print(f"Workflow failed: {status['error']}")
except Exception as error:
print(f"Error: {error}")
execute_async()
```
### レート制限とリトライ
指数バックオフを使用して自動的にレート制限を処理します:
```python
import os
from simstudio import SimStudioClient, SimStudioError
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def execute_with_retry_handling():
try:
# Automatically retries on rate limit
result = client.execute_with_retry(
"workflow-id",
input_data={"message": "Process this"},
max_retries=5,
initial_delay=1.0,
max_delay=60.0,
backoff_multiplier=2.0
)
print(f"Success: {result}")
except SimStudioError as error:
if error.code == "RATE_LIMIT_EXCEEDED":
print("Rate limit exceeded after all retries")
# Check rate limit info
rate_limit_info = client.get_rate_limit_info()
if rate_limit_info:
from datetime import datetime
reset_time = datetime.fromtimestamp(rate_limit_info.reset)
print(f"Rate limit resets at: {reset_time}")
execute_with_retry_handling()
```
### 使用状況モニタリング
アカウントの使用状況と制限をモニタリングします:
```python
import os
from simstudio import SimStudioClient
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def check_usage():
try:
limits = client.get_usage_limits()
print("=== Rate Limits ===")
print("Sync requests:")
print(f" Limit: {limits.rate_limit['sync']['limit']}")
print(f" Remaining: {limits.rate_limit['sync']['remaining']}")
print(f" Resets at: {limits.rate_limit['sync']['resetAt']}")
print(f" Is limited: {limits.rate_limit['sync']['isLimited']}")
print("\nAsync requests:")
print(f" Limit: {limits.rate_limit['async']['limit']}")
print(f" Remaining: {limits.rate_limit['async']['remaining']}")
print(f" Resets at: {limits.rate_limit['async']['resetAt']}")
print(f" Is limited: {limits.rate_limit['async']['isLimited']}")
print("\n=== Usage ===")
print(f"Current period cost: ${limits.usage['currentPeriodCost']:.2f}")
print(f"Limit: ${limits.usage['limit']:.2f}")
print(f"Plan: {limits.usage['plan']}")
percent_used = (limits.usage['currentPeriodCost'] / limits.usage['limit']) * 100
print(f"Usage: {percent_used:.1f}%")
if percent_used > 80:
print("⚠️ Warning: You are approaching your usage limit!")
except Exception as error:
print(f"Error checking usage: {error}")
check_usage()
```
### ワークフローの実行ストリーミング
リアルタイムのストリーミングレスポンスでワークフローを実行します:
```python
from simstudio import SimStudioClient
import os
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def execute_with_streaming():
"""Execute workflow with streaming enabled."""
try:
# Enable streaming for specific block outputs
result = client.execute_workflow(
"workflow-id",
input_data={"message": "Count to five"},
stream=True,
selected_outputs=["agent1.content"] # Use blockName.attribute format
)
print("Workflow result:", result)
except Exception as error:
print("Error:", error)
execute_with_streaming()
```
ストリーミングレスポンスはServer-Sent EventsSSE形式に従います
```
data: {"blockId":"7b7735b9-19e5-4bd6-818b-46aae2596e9f","chunk":"One"}
data: {"blockId":"7b7735b9-19e5-4bd6-818b-46aae2596e9f","chunk":", two"}
data: {"event":"done","success":true,"output":{},"metadata":{"duration":610}}
data: [DONE]
```
**Flaskストリーミングの例**
```python
from flask import Flask, Response, stream_with_context
import requests
import json
import os
app = Flask(__name__)
@app.route('/stream-workflow')
def stream_workflow():
"""Stream workflow execution to the client."""
def generate():
response = requests.post(
'https://sim.ai/api/workflows/WORKFLOW_ID/execute',
headers={
'Content-Type': 'application/json',
'X-API-Key': os.getenv('SIM_API_KEY')
},
json={
'message': 'Generate a story',
'stream': True,
'selectedOutputs': ['agent1.content']
},
stream=True
)
for line in response.iter_lines():
if line:
decoded_line = line.decode('utf-8')
if decoded_line.startswith('data: '):
data = decoded_line[6:] # Remove 'data: ' prefix
if data == '[DONE]':
break
try:
parsed = json.loads(data)
if 'chunk' in parsed:
yield f"data: {json.dumps(parsed)}\n\n"
elif parsed.get('event') == 'done':
yield f"data: {json.dumps(parsed)}\n\n"
print("Execution complete:", parsed.get('metadata'))
except json.JSONDecodeError:
pass
return Response(
stream_with_context(generate()),
mimetype='text/event-stream'
)
if __name__ == '__main__':
app.run(debug=True)
```
### 環境設定
環境変数を使用してクライアントを設定します:
<Tabs items={['Development', 'Production']}>
<Tab value="Development">
```python
import os
from simstudio import SimStudioClient
# Development configuration
client = SimStudioClient(
api_key=os.getenv("SIM_API_KEY")
base_url=os.getenv("SIM_BASE_URL", "https://sim.ai")
)
```
</Tab>
<Tab value="Production">
```python
import os
from simstudio import SimStudioClient
# Production configuration with error handling
api_key = os.getenv("SIM_API_KEY")
if not api_key:
raise ValueError("SIM_API_KEY environment variable is required")
client = SimStudioClient(
api_key=api_key,
base_url=os.getenv("SIM_BASE_URL", "https://sim.ai")
)
```
</Tab>
</Tabs>
## APIキーの取得方法
<Steps>
<Step title="Simにログイン">
[Sim](https://sim.ai)に移動してアカウントにログインします。
</Step>
<Step title="ワークフローを開く">
プログラムで実行したいワークフローに移動します。
</Step>
<Step title="ワークフローをデプロイする">
まだデプロイされていない場合は、「デプロイ」をクリックしてワークフローをデプロイします。
</Step>
<Step title="APIキーを作成または選択する">
デプロイプロセス中に、APIキーを選択または作成します。
</Step>
<Step title="APIキーをコピーする">
Pythonアプリケーションで使用するAPIキーをコピーします。
</Step>
</Steps>
## 要件
- Python 3.8+
- requests >= 2.25.0
## ライセンス
Apache-2.0