sim/apps/docs/content/docs/de/sdks/python.mdx
cherkanov_art d1d43b27bd feat(i18n): change lockfile (#3216)
* fix: update i18n.lock

* feat(docs): enhance documentation with new sections on file handling, form deployment, quick reference, agent skills, and A2A integration
2026-02-16 00:00:12 -08:00

---
title: Python
---
import { Callout } from 'fumadocs-ui/components/callout'
import { Card, Cards } from 'fumadocs-ui/components/card'
import { Step, Steps } from 'fumadocs-ui/components/steps'
import { Tab, Tabs } from 'fumadocs-ui/components/tabs'

The official Python SDK for Sim lets you execute workflows programmatically from your Python applications.
<Callout type="info">
The Python SDK supports Python 3.8+ and provides asynchronous execution, automatic rate-limit handling with exponential backoff, and usage tracking.
</Callout>
## Installation
Install the SDK with pip:
```bash
pip install simstudio-sdk
```
## Quick Start
Here is a simple example to get you started:
```python
from simstudio import SimStudioClient

# Initialize the client
client = SimStudioClient(
    api_key="your-api-key-here",
    base_url="https://sim.ai"  # optional, defaults to https://sim.ai
)

# Execute a workflow
try:
    result = client.execute_workflow("workflow-id")
    print("Workflow executed successfully:", result)
except Exception as error:
    print("Workflow execution failed:", error)
```
## API Reference
### SimStudioClient
#### Constructor
```python
SimStudioClient(api_key: str, base_url: str = "https://sim.ai")
```
**Parameters:**
- `api_key` (str): Your Sim API key
- `base_url` (str, optional): Base URL for the Sim API (default: `https://sim.ai`)

#### Methods
##### execute_workflow()
Executes a workflow with optional input data.
```python
result = client.execute_workflow(
    "workflow-id",
    input_data={"message": "Hello, world!"},
    timeout=30.0  # 30 seconds
)
```
**Parameters:**
- `workflow_id` (str): The ID of the workflow to execute
- `input_data` (dict, optional): Input data passed to the workflow
- `timeout` (float, optional): Timeout in seconds (default: 30.0)
- `stream` (bool, optional): Enable streaming responses (default: False)
- `selected_outputs` (list[str], optional): Block outputs to stream, in `blockName.attribute` format (e.g. `["agent1.content"]`)
- `async_execution` (bool, optional): Execute asynchronously (default: False)

**Returns:** `WorkflowExecutionResult | AsyncExecutionResult`

If `async_execution=True`, the call returns immediately with a task ID for polling. Otherwise it waits for the workflow to complete.
##### get_workflow_status()
Retrieves the status of a workflow (deployment status, etc.).
```python
status = client.get_workflow_status("workflow-id")
print("Is deployed:", status.is_deployed)
```
**Parameters:**
- `workflow_id` (str): The ID of the workflow

**Returns:** `WorkflowStatus`
##### validate_workflow()
Checks whether a workflow is ready for execution.
```python
is_ready = client.validate_workflow("workflow-id")
if is_ready:
    # Workflow is deployed and ready
    pass
```
**Parameters:**
- `workflow_id` (str): The ID of the workflow

**Returns:** `bool`
##### get_job_status()
Retrieves the status of an asynchronous job execution.
```python
status = client.get_job_status("task-id-from-async-execution")
print("Status:", status["status"])  # 'queued', 'processing', 'completed', 'failed', 'cancelled'
if status["status"] == "completed":
    print("Output:", status["output"])
```
**Parameters:**
- `task_id` (str): The task ID returned by the asynchronous execution

**Returns:** `Dict[str, Any]`

**Response fields:**
- `success` (bool): Whether the request succeeded
- `taskId` (str): The task ID
- `status` (str): One of `'queued'`, `'processing'`, `'completed'`, `'failed'`, `'cancelled'`
- `metadata` (dict): Contains `startedAt`, `completedAt`, and `duration`
- `output` (any, optional): The workflow output (when completed)
- `error` (any, optional): Error details (when failed)
- `estimatedDuration` (int, optional): Estimated duration in milliseconds (when processing or queued)
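
The status values listed above suggest a simple polling loop that stops on any terminal status. The following is a minimal sketch, not part of the SDK: `wait_for_job`, `TERMINAL_STATUSES`, and the `interval`, `max_wait`, and `sleep` parameters are hypothetical names introduced for illustration. It assumes only that the status callable returns a dict with a `status` key, as documented above.

```python
import time

# Statuses after which polling can stop (taken from the field list above).
TERMINAL_STATUSES = {"completed", "failed", "cancelled"}


def wait_for_job(get_status, task_id, interval=2.0, max_wait=300.0, sleep=time.sleep):
    """Poll get_status(task_id) until the job reaches a terminal status.

    get_status is any callable returning a dict with a "status" key.
    Raises TimeoutError if the job is still pending after max_wait seconds
    of accumulated sleeping.
    """
    waited = 0.0
    while True:
        status = get_status(task_id)
        if status["status"] in TERMINAL_STATUSES:
            return status
        if waited >= max_wait:
            raise TimeoutError(
                f"Job {task_id} still {status['status']} after {waited:.0f}s"
            )
        sleep(interval)
        waited += interval
```

With a real client this would presumably be called as `wait_for_job(client.get_job_status, task_id)`. Passing the status function in as an argument keeps the helper testable without network access.
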
##### execute_with_retry()
Executes a workflow and automatically retries on rate-limit errors using exponential backoff.
```python
result = client.execute_with_retry(
    "workflow-id",
    input_data={"message": "Hello"},
    timeout=30.0,
    max_retries=3,           # Maximum number of retries
    initial_delay=1.0,       # Initial delay in seconds
    max_delay=30.0,          # Maximum delay in seconds
    backoff_multiplier=2.0   # Exponential backoff multiplier
)
```
**Parameters:**
- `workflow_id` (str): The ID of the workflow to execute
- `input_data` (dict, optional): Input data passed to the workflow
- `timeout` (float, optional): Timeout in seconds
- `stream` (bool, optional): Enable streaming responses
- `selected_outputs` (list, optional): Block outputs to stream
- `async_execution` (bool, optional): Execute asynchronously
- `max_retries` (int, optional): Maximum number of retries (default: 3)
- `initial_delay` (float, optional): Initial delay in seconds (default: 1.0)
- `max_delay` (float, optional): Maximum delay in seconds (default: 30.0)
- `backoff_multiplier` (float, optional): Backoff multiplier (default: 2.0)

**Returns:** `WorkflowExecutionResult | AsyncExecutionResult`

The retry logic uses exponential backoff (1s → 2s → 4s → 8s...) with ±25% jitter to prevent thundering-herd effects. If the API provides a `retry-after` header, that value is used instead.
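
To make the delay schedule concrete, here is a minimal sketch of how such a delay could be computed. It is not the SDK's actual implementation: `backoff_delay` and its `jitter` and `retry_after` parameters are hypothetical, and the sketch assumes the cap is applied before jitter, so a delay may exceed `max_delay` by up to 25%.

```python
import random


def backoff_delay(attempt, initial_delay=1.0, max_delay=30.0,
                  backoff_multiplier=2.0, jitter=0.25, retry_after=None):
    """Return the wait in seconds before retry number `attempt` (0-based)."""
    if retry_after is not None:
        # A server-provided retry-after value takes precedence.
        return float(retry_after)
    # Exponential growth, capped at max_delay (assumption: cap before jitter).
    base = min(initial_delay * (backoff_multiplier ** attempt), max_delay)
    # Scale by a random factor in [1 - jitter, 1 + jitter].
    return base * random.uniform(1.0 - jitter, 1.0 + jitter)


for attempt in range(4):
    print(f"retry {attempt + 1}: wait about {backoff_delay(attempt):.2f}s")
```

The random factor spreads out clients that were rate-limited at the same moment, so they do not all retry at once.
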
##### get_rate_limit_info()
Retrieves the current rate-limit information from the most recent API response.
```python
from datetime import datetime

rate_limit_info = client.get_rate_limit_info()
if rate_limit_info:
    print("Limit:", rate_limit_info.limit)
    print("Remaining:", rate_limit_info.remaining)
    print("Reset:", datetime.fromtimestamp(rate_limit_info.reset))
```
**Returns:** `RateLimitInfo | None`
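
Because `reset` is passed to `datetime.fromtimestamp` above, it appears to be a Unix timestamp in seconds. Under that assumption, the remaining wait can be computed as in the sketch below. `seconds_until_reset` is a hypothetical helper, not an SDK function.

```python
import time
from typing import Optional


def seconds_until_reset(reset: int, now: Optional[float] = None) -> float:
    """Seconds from `now` until the Unix timestamp `reset` (never negative)."""
    current = time.time() if now is None else now
    return max(0.0, float(reset) - current)


# A fixed `now` makes the example deterministic.
print(seconds_until_reset(1_700_000_060, now=1_700_000_000))  # 60.0
```
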
##### get_usage_limits()
Retrieves the current usage limits and quota information for your account.
```python
limits = client.get_usage_limits()
print("Sync requests remaining:", limits.rate_limit["sync"]["remaining"])
print("Async requests remaining:", limits.rate_limit["async"]["remaining"])
print("Current period cost:", limits.usage["currentPeriodCost"])
print("Plan:", limits.usage["plan"])
```
**Returns:** `UsageLimits`

**Response structure:**
```python
{
    "success": bool,
    "rateLimit": {
        "sync": {
            "isLimited": bool,
            "limit": int,
            "remaining": int,
            "resetAt": str
        },
        "async": {
            "isLimited": bool,
            "limit": int,
            "remaining": int,
            "resetAt": str
        },
        "authType": str  # 'api' or 'manual'
    },
    "usage": {
        "currentPeriodCost": float,
        "limit": float,
        "plan": str  # e.g., 'free', 'pro'
    }
}
```
```
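
The `usage` fields in this structure can be combined into a percentage of the limit consumed. The sketch below is illustrative only: `usage_percent` is a hypothetical helper, and it assumes that `limit` could be zero or missing, in which case no percentage is reported.

```python
from typing import Any, Dict, Optional


def usage_percent(usage: Dict[str, Any]) -> Optional[float]:
    """Return the share of the usage limit consumed, in percent.

    Returns None when the limit is missing or not positive, which avoids
    a division by zero.
    """
    limit = usage.get("limit") or 0
    if limit <= 0:
        return None
    return usage.get("currentPeriodCost", 0.0) * 100 / limit


print(usage_percent({"currentPeriodCost": 5.0, "limit": 10.0, "plan": "pro"}))  # 50.0
```
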
##### set_api_key()
Updates the API key.
```python
client.set_api_key("new-api-key")
```
##### set_base_url()
Updates the base URL.
```python
client.set_base_url("https://my-custom-domain.com")
```
##### close()
Closes the underlying HTTP session.
```python
client.close()
```
## Data Classes
### WorkflowExecutionResult
```python
@dataclass
class WorkflowExecutionResult:
    success: bool
    output: Optional[Any] = None
    error: Optional[str] = None
    logs: Optional[List[Any]] = None
    metadata: Optional[Dict[str, Any]] = None
    trace_spans: Optional[List[Any]] = None
    total_duration: Optional[float] = None
```
### AsyncExecutionResult
```python
@dataclass
class AsyncExecutionResult:
    success: bool
    task_id: str
    status: str  # 'queued'
    created_at: str
    links: Dict[str, str]  # e.g., {"status": "/api/jobs/{taskId}"}
```
### WorkflowStatus
```python
@dataclass
class WorkflowStatus:
    is_deployed: bool
    deployed_at: Optional[str] = None
    needs_redeployment: bool = False
```
### RateLimitInfo
```python
@dataclass
class RateLimitInfo:
    limit: int
    remaining: int
    reset: int
    retry_after: Optional[int] = None
```
### UsageLimits
```python
@dataclass
class UsageLimits:
    success: bool
    rate_limit: Dict[str, Any]
    usage: Dict[str, Any]
```
### SimStudioError
```python
class SimStudioError(Exception):
    def __init__(self, message: str, code: Optional[str] = None, status: Optional[int] = None):
        super().__init__(message)
        self.code = code
        self.status = status
```
**Common error codes:**
- `UNAUTHORIZED`: Invalid API key
- `TIMEOUT`: Request timed out
- `RATE_LIMIT_EXCEEDED`: Rate limit exceeded
- `USAGE_LIMIT_EXCEEDED`: Usage limit exceeded
- `EXECUTION_ERROR`: Workflow execution failed
## Examples
### Basic Workflow Execution
<Steps>
<Step title="Initialize the client">
Set up the SimStudioClient with your API key.
</Step>
<Step title="Validate the workflow">
Check that the workflow is deployed and ready for execution.
</Step>
<Step title="Execute the workflow">
Run the workflow with your input data.
</Step>
<Step title="Handle the result">
Process the execution result and handle any errors.
</Step>
</Steps>
```python
import os
from simstudio import SimStudioClient

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def run_workflow():
    try:
        # Check if workflow is ready
        is_ready = client.validate_workflow("my-workflow-id")
        if not is_ready:
            raise Exception("Workflow is not deployed or ready")

        # Execute the workflow
        result = client.execute_workflow(
            "my-workflow-id",
            input_data={
                "message": "Process this data",
                "user_id": "12345"
            }
        )

        if result.success:
            print("Output:", result.output)
            print("Duration:", result.metadata.get("duration") if result.metadata else None)
        else:
            print("Workflow failed:", result.error)
    except Exception as error:
        print("Error:", error)

run_workflow()
```
### Error Handling
Handle the different error types that can occur during workflow execution:
```python
from simstudio import SimStudioClient, SimStudioError
import os

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def execute_with_error_handling():
    try:
        result = client.execute_workflow("workflow-id")
        return result
    except SimStudioError as error:
        if error.code == "UNAUTHORIZED":
            print("Invalid API key")
        elif error.code == "TIMEOUT":
            print("Workflow execution timed out")
        elif error.code == "USAGE_LIMIT_EXCEEDED":
            print("Usage limit exceeded")
        elif error.code == "INVALID_JSON":
            print("Invalid JSON in request body")
        else:
            print(f"Workflow error: {error}")
        raise
    except Exception as error:
        print(f"Unexpected error: {error}")
        raise
```
### Using the Context Manager
Use the client as a context manager to handle resource cleanup automatically:
```python
from simstudio import SimStudioClient
import os

# Using context manager to automatically close the session
with SimStudioClient(api_key=os.getenv("SIM_API_KEY")) as client:
    result = client.execute_workflow("workflow-id")
    print("Result:", result)
# Session is automatically closed here
```
### Batch Workflow Execution
Execute multiple workflows one after another, skipping any that are not deployed:
```python
from simstudio import SimStudioClient
import os

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def execute_workflows_batch(workflow_data_pairs):
    """Execute multiple workflows with different input data."""
    results = []
    for workflow_id, input_data in workflow_data_pairs:
        try:
            # Validate workflow before execution
            if not client.validate_workflow(workflow_id):
                print(f"Skipping {workflow_id}: not deployed")
                continue

            result = client.execute_workflow(workflow_id, input_data=input_data)
            results.append({
                "workflow_id": workflow_id,
                "success": result.success,
                "output": result.output,
                "error": result.error
            })
        except Exception as error:
            results.append({
                "workflow_id": workflow_id,
                "success": False,
                "error": str(error)
            })
    return results

# Example usage
workflows = [
    ("workflow-1", {"type": "analysis", "data": "sample1"}),
    ("workflow-2", {"type": "processing", "data": "sample2"}),
]
results = execute_workflows_batch(workflows)
for result in results:
    print(f"Workflow {result['workflow_id']}: {'Success' if result['success'] else 'Failed'}")
```
### Asynchronous Workflow Execution
Execute workflows asynchronously for long-running tasks:
```python
import os
import time
from simstudio import SimStudioClient

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def execute_async():
    try:
        # Start async execution
        result = client.execute_workflow(
            "workflow-id",
            input_data={"data": "large dataset"},
            async_execution=True  # Execute asynchronously
        )

        # Check if result is an async execution
        if hasattr(result, 'task_id'):
            print(f"Task ID: {result.task_id}")
            print(f"Status endpoint: {result.links['status']}")

            # Poll for completion
            status = client.get_job_status(result.task_id)
            while status["status"] in ["queued", "processing"]:
                print(f"Current status: {status['status']}")
                time.sleep(2)  # Wait 2 seconds
                status = client.get_job_status(result.task_id)

            if status["status"] == "completed":
                print("Workflow completed!")
                print(f"Output: {status['output']}")
                print(f"Duration: {status['metadata']['duration']}")
            else:
                # 'failed' or 'cancelled'; the error field is optional
                print(f"Workflow failed: {status.get('error')}")
    except Exception as error:
        print(f"Error: {error}")

execute_async()
```
### Rate Limiting and Retry
Handle rate limits automatically with exponential backoff:
```python
import os
from datetime import datetime
from simstudio import SimStudioClient, SimStudioError

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def execute_with_retry_handling():
    try:
        # Automatically retries on rate limit
        result = client.execute_with_retry(
            "workflow-id",
            input_data={"message": "Process this"},
            max_retries=5,
            initial_delay=1.0,
            max_delay=60.0,
            backoff_multiplier=2.0
        )
        print(f"Success: {result}")
    except SimStudioError as error:
        if error.code == "RATE_LIMIT_EXCEEDED":
            print("Rate limit exceeded after all retries")
            # Check rate limit info
            rate_limit_info = client.get_rate_limit_info()
            if rate_limit_info:
                reset_time = datetime.fromtimestamp(rate_limit_info.reset)
                print(f"Rate limit resets at: {reset_time}")
        else:
            # Do not silently swallow other SDK errors
            raise

execute_with_retry_handling()
```
### Usage Monitoring
Monitor your account's usage and limits:
```python
import os
from simstudio import SimStudioClient

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def check_usage():
    try:
        limits = client.get_usage_limits()

        print("=== Rate Limits ===")
        print("Sync requests:")
        print(f"  Limit: {limits.rate_limit['sync']['limit']}")
        print(f"  Remaining: {limits.rate_limit['sync']['remaining']}")
        print(f"  Resets at: {limits.rate_limit['sync']['resetAt']}")
        print(f"  Is limited: {limits.rate_limit['sync']['isLimited']}")

        print("\nAsync requests:")
        print(f"  Limit: {limits.rate_limit['async']['limit']}")
        print(f"  Remaining: {limits.rate_limit['async']['remaining']}")
        print(f"  Resets at: {limits.rate_limit['async']['resetAt']}")
        print(f"  Is limited: {limits.rate_limit['async']['isLimited']}")

        print("\n=== Usage ===")
        print(f"Current period cost: ${limits.usage['currentPeriodCost']:.2f}")
        print(f"Limit: ${limits.usage['limit']:.2f}")
        print(f"Plan: {limits.usage['plan']}")

        percent_used = (limits.usage['currentPeriodCost'] / limits.usage['limit']) * 100
        print(f"Usage: {percent_used:.1f}%")
        if percent_used > 80:
            print("⚠️ Warning: You are approaching your usage limit!")
    except Exception as error:
        print(f"Error checking usage: {error}")

check_usage()
```
### Streaming Workflow Execution
Execute workflows with real-time streaming responses:
```python
from simstudio import SimStudioClient
import os

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def execute_with_streaming():
    """Execute workflow with streaming enabled."""
    try:
        # Enable streaming for specific block outputs
        result = client.execute_workflow(
            "workflow-id",
            input_data={"message": "Count to five"},
            stream=True,
            selected_outputs=["agent1.content"]  # Use blockName.attribute format
        )
        print("Workflow result:", result)
    except Exception as error:
        print("Error:", error)

execute_with_streaming()
```
The streaming response follows the Server-Sent Events (SSE) format:
```
data: {"blockId":"7b7735b9-19e5-4bd6-818b-46aae2596e9f","chunk":"One"}
data: {"blockId":"7b7735b9-19e5-4bd6-818b-46aae2596e9f","chunk":", two"}
data: {"event":"done","success":true,"output":{},"metadata":{"duration":610}}
data: [DONE]
```
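
Lines in this format can be decoded with a few lines of standard-library Python. The sketch below is a minimal illustration, not an SDK feature: `parse_sse_lines` is a hypothetical helper, and it assumes every event arrives as a single `data: ` line, as in the sample above.

```python
import json


def parse_sse_lines(lines):
    """Yield decoded JSON payloads from SSE `data:` lines, stopping at [DONE]."""
    for line in lines:
        line = line.strip()
        if not line.startswith("data: "):
            continue  # ignore blank lines and other SSE fields
        payload = line[len("data: "):]
        if payload == "[DONE]":
            break
        try:
            yield json.loads(payload)
        except json.JSONDecodeError:
            continue  # skip malformed payloads


sample = [
    'data: {"blockId":"7b7735b9-19e5-4bd6-818b-46aae2596e9f","chunk":"One"}',
    'data: {"blockId":"7b7735b9-19e5-4bd6-818b-46aae2596e9f","chunk":", two"}',
    'data: {"event":"done","success":true,"output":{},"metadata":{"duration":610}}',
    'data: [DONE]',
]
events = list(parse_sse_lines(sample))
text = "".join(event["chunk"] for event in events if "chunk" in event)
print(text)  # One, two
```
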
**Flask streaming example:**
```python
from flask import Flask, Response, stream_with_context
import requests
import json
import os

app = Flask(__name__)

@app.route('/stream-workflow')
def stream_workflow():
    """Stream workflow execution to the client."""
    def generate():
        response = requests.post(
            'https://sim.ai/api/workflows/WORKFLOW_ID/execute',
            headers={
                'Content-Type': 'application/json',
                'X-API-Key': os.getenv('SIM_API_KEY')
            },
            json={
                'message': 'Generate a story',
                'stream': True,
                'selectedOutputs': ['agent1.content']
            },
            stream=True
        )
        for line in response.iter_lines():
            if line:
                decoded_line = line.decode('utf-8')
                if decoded_line.startswith('data: '):
                    data = decoded_line[6:]  # Remove 'data: ' prefix
                    if data == '[DONE]':
                        break
                    try:
                        parsed = json.loads(data)
                        if 'chunk' in parsed:
                            yield f"data: {json.dumps(parsed)}\n\n"
                        elif parsed.get('event') == 'done':
                            yield f"data: {json.dumps(parsed)}\n\n"
                            print("Execution complete:", parsed.get('metadata'))
                    except json.JSONDecodeError:
                        pass  # Ignore lines that are not valid JSON

    return Response(
        stream_with_context(generate()),
        mimetype='text/event-stream'
    )

if __name__ == '__main__':
    app.run(debug=True)
```
### Environment Configuration
Configure the client with environment variables:
<Tabs items={['Development', 'Production']}>
<Tab value="Development">
```python
import os
from simstudio import SimStudioClient

# Development configuration
client = SimStudioClient(
    api_key=os.getenv("SIM_API_KEY"),
    base_url=os.getenv("SIM_BASE_URL", "https://sim.ai")
)
```
</Tab>
<Tab value="Production">
```python
import os
from simstudio import SimStudioClient

# Production configuration with error handling
api_key = os.getenv("SIM_API_KEY")
if not api_key:
    raise ValueError("SIM_API_KEY environment variable is required")

client = SimStudioClient(
    api_key=api_key,
    base_url=os.getenv("SIM_BASE_URL", "https://sim.ai")
)
```
</Tab>
</Tabs>
## Getting Your API Key
<Steps>
<Step title="Log in to Sim">
Navigate to [Sim](https://sim.ai) and log in to your account.
</Step>
<Step title="Open your workflow">
Navigate to the workflow you want to execute programmatically.
</Step>
<Step title="Deploy your workflow">
Click "Deploy" to deploy your workflow if it has not been deployed yet.
</Step>
<Step title="Create or select an API key">
Select or create an API key during the deployment process.
</Step>
<Step title="Copy the API key">
Copy the API key to use in your Python application.
</Step>
</Steps>
## Requirements
- Python 3.8+
- requests >= 2.25.0
## License
Apache-2.0