---
title: Python
---

import { Callout } from 'fumadocs-ui/components/callout'
import { Card, Cards } from 'fumadocs-ui/components/card'
import { Step, Steps } from 'fumadocs-ui/components/steps'
import { Tab, Tabs } from 'fumadocs-ui/components/tabs'

The official Python SDK for Sim lets you execute workflows programmatically from your Python applications.

The SDK supports Python 3.8+ and provides asynchronous execution, automatic rate-limit handling with exponential backoff, and usage tracking.

## Installation

Install the SDK with pip:

```bash
pip install simstudio-sdk
```

## Quick Start

Here is a simple example to get started:

```python
from simstudio import SimStudioClient

# Initialize the client
client = SimStudioClient(
    api_key="your-api-key-here",
    base_url="https://sim.ai"  # optional, defaults to https://sim.ai
)

# Execute a workflow
try:
    result = client.execute_workflow("workflow-id")
    print("Workflow executed successfully:", result)
except Exception as error:
    print("Workflow execution failed:", error)
```

## API Reference

### SimStudioClient

#### Constructor

```python
SimStudioClient(api_key: str, base_url: str = "https://sim.ai")
```

**Parameters:**

- `api_key` (str): Your Sim API key
- `base_url` (str, optional): Base URL for the Sim API (default: `https://sim.ai`)

#### Methods

##### execute_workflow()

Executes a workflow with optional input data.
```python
result = client.execute_workflow(
    "workflow-id",
    input_data={"message": "Hello, world!"},
    timeout=30.0  # 30 seconds
)
```

**Parameters:**

- `workflow_id` (str): The ID of the workflow to execute
- `input_data` (dict, optional): Input data passed to the workflow
- `timeout` (float, optional): Timeout in seconds (default: 30.0)
- `stream` (bool, optional): Enable streaming responses (default: False)
- `selected_outputs` (list[str], optional): Block outputs to stream, in `blockName.attribute` format (e.g. `["agent1.content"]`)
- `async_execution` (bool, optional): Execute asynchronously (default: False)

**Returns:** `WorkflowExecutionResult | AsyncExecutionResult`

When `async_execution=True`, the call returns immediately with a task ID for polling. Otherwise it waits for the workflow to complete.

##### get_workflow_status()

Retrieves the status of a workflow (deployment status, etc.).

```python
status = client.get_workflow_status("workflow-id")
print("Is deployed:", status.is_deployed)
```

**Parameters:**

- `workflow_id` (str): The ID of the workflow

**Returns:** `WorkflowStatus`

##### validate_workflow()

Checks whether a workflow is ready for execution.

```python
is_ready = client.validate_workflow("workflow-id")
if is_ready:
    # Workflow is deployed and ready
    pass
```

**Parameters:**

- `workflow_id` (str): The ID of the workflow

**Returns:** `bool`

##### get_job_status()

Retrieves the status of an asynchronous job execution.
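Because an asynchronous run has to be polled until it reaches a terminal state, a small helper with a deadline can keep the loop from running forever. The sketch below is hypothetical: `wait_for_job` is not part of the SDK, and it accepts any callable that returns a status dictionary shaped like the response of `get_job_status()`, so it can be tried without network access.

```python
import time
from typing import Any, Callable, Dict

# Non-terminal statuses, per the response fields documented for get_job_status().
PENDING_STATUSES = {"queued", "processing"}


def wait_for_job(
    fetch_status: Callable[[], Dict[str, Any]],
    poll_interval: float = 2.0,
    max_wait: float = 300.0,
) -> Dict[str, Any]:
    """Poll `fetch_status` until the job leaves the pending states.

    Hypothetical helper; raises TimeoutError if the job is still pending
    when the `max_wait` budget (in seconds) runs out.
    """
    deadline = time.monotonic() + max_wait
    while True:
        status = fetch_status()
        if status["status"] not in PENDING_STATUSES:
            return status
        if time.monotonic() + poll_interval > deadline:
            raise TimeoutError(f"Job still {status['status']} within {max_wait}s")
        time.sleep(poll_interval)


if __name__ == "__main__":
    # Stand-in for `lambda: client.get_job_status(task_id)`.
    fake_responses = iter([
        {"status": "queued"},
        {"status": "processing"},
        {"status": "completed", "output": {"answer": 42}},
    ])
    final = wait_for_job(lambda: next(fake_responses), poll_interval=0.01)
    print(final["status"], final.get("output"))
```

With the SDK, the callable would presumably be `lambda: client.get_job_status(result.task_id)`.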
```python
status = client.get_job_status("task-id-from-async-execution")
print("Status:", status["status"])  # 'queued', 'processing', 'completed', 'failed', 'cancelled'
if status["status"] == "completed":
    print("Output:", status["output"])
```

**Parameters:**

- `task_id` (str): The task ID returned by the asynchronous execution

**Returns:** `Dict[str, Any]`

**Response fields:**

- `success` (bool): Whether the request succeeded
- `taskId` (str): The task ID
- `status` (str): One of `'queued'`, `'processing'`, `'completed'`, `'failed'`, `'cancelled'`
- `metadata` (dict): Contains `startedAt`, `completedAt`, and `duration`
- `output` (any, optional): The workflow output (when completed)
- `error` (any, optional): Error details (when failed)
- `estimatedDuration` (int, optional): Estimated duration in milliseconds (when processing or queued)

##### execute_with_retry()

Executes a workflow and automatically retries on rate-limit errors using exponential backoff.
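As a rough illustration of the schedule this method is described as following, the sketch below computes retry delays that grow by `backoff_multiplier` from `initial_delay`, are capped at `max_delay`, and carry ±25% jitter, with a server-supplied retry-after value taking precedence. The helper `backoff_delay` is hypothetical and not part of the SDK. Applying the jitter after the cap is an assumption, so the SDK's actual behavior may differ.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    initial_delay: float = 1.0,
    max_delay: float = 30.0,
    backoff_multiplier: float = 2.0,
    retry_after: Optional[float] = None,
    jitter: float = 0.25,
) -> float:
    """Return the seconds to wait before retry number `attempt` (0-based).

    Hypothetical helper for illustration; not the SDK's implementation.
    """
    if retry_after is not None:
        # A server-provided retry-after value takes precedence.
        return float(retry_after)
    # Exponential growth, capped at max_delay.
    base = min(initial_delay * (backoff_multiplier ** attempt), max_delay)
    # Spread the delay by up to ±jitter (25% by default).
    return base * (1.0 + random.uniform(-jitter, jitter))


if __name__ == "__main__":
    for attempt in range(5):
        print(f"attempt {attempt}: wait ~{backoff_delay(attempt):.2f}s")
```

Randomizing each delay keeps many clients that were throttled at the same moment from retrying in lockstep.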
```python
result = client.execute_with_retry(
    "workflow-id",
    input_data={"message": "Hello"},
    timeout=30.0,
    max_retries=3,           # Maximum number of retries
    initial_delay=1.0,       # Initial delay in seconds
    max_delay=30.0,          # Maximum delay in seconds
    backoff_multiplier=2.0   # Exponential backoff multiplier
)
```

**Parameters:**

- `workflow_id` (str): The ID of the workflow to execute
- `input_data` (dict, optional): Input data passed to the workflow
- `timeout` (float, optional): Timeout in seconds
- `stream` (bool, optional): Enable streaming responses
- `selected_outputs` (list, optional): Block outputs to stream
- `async_execution` (bool, optional): Execute asynchronously
- `max_retries` (int, optional): Maximum number of retries (default: 3)
- `initial_delay` (float, optional): Initial delay in seconds (default: 1.0)
- `max_delay` (float, optional): Maximum delay in seconds (default: 30.0)
- `backoff_multiplier` (float, optional): Backoff multiplier (default: 2.0)

**Returns:** `WorkflowExecutionResult | AsyncExecutionResult`

The retry logic uses exponential backoff (1s → 2s → 4s → 8s...) with ±25% jitter to prevent a thundering herd. If the API provides a `retry-after` header, that value is used instead.

##### get_rate_limit_info()

Retrieves the current rate-limit information from the most recent API response.

```python
from datetime import datetime

rate_limit_info = client.get_rate_limit_info()
if rate_limit_info:
    print("Limit:", rate_limit_info.limit)
    print("Remaining:", rate_limit_info.remaining)
    print("Reset:", datetime.fromtimestamp(rate_limit_info.reset))
```

**Returns:** `RateLimitInfo | None`

##### get_usage_limits()

Retrieves the current usage limits and quota information for your account.
```python
limits = client.get_usage_limits()
print("Sync requests remaining:", limits.rate_limit["sync"]["remaining"])
print("Async requests remaining:", limits.rate_limit["async"]["remaining"])
print("Current period cost:", limits.usage["currentPeriodCost"])
print("Plan:", limits.usage["plan"])
```

**Returns:** `UsageLimits`

**Response structure:**

```python
{
    "success": bool,
    "rateLimit": {
        "sync": {
            "isLimited": bool,
            "limit": int,
            "remaining": int,
            "resetAt": str
        },
        "async": {
            "isLimited": bool,
            "limit": int,
            "remaining": int,
            "resetAt": str
        },
        "authType": str  # 'api' or 'manual'
    },
    "usage": {
        "currentPeriodCost": float,
        "limit": float,
        "plan": str  # e.g., 'free', 'pro'
    }
}
```

##### set_api_key()

Updates the API key.

```python
client.set_api_key("new-api-key")
```

##### set_base_url()

Updates the base URL.

```python
client.set_base_url("https://my-custom-domain.com")
```

##### close()

Closes the underlying HTTP session.

```python
client.close()
```

## Data Classes

### WorkflowExecutionResult

```python
@dataclass
class WorkflowExecutionResult:
    success: bool
    output: Optional[Any] = None
    error: Optional[str] = None
    logs: Optional[List[Any]] = None
    metadata: Optional[Dict[str, Any]] = None
    trace_spans: Optional[List[Any]] = None
    total_duration: Optional[float] = None
```

### AsyncExecutionResult

```python
@dataclass
class AsyncExecutionResult:
    success: bool
    task_id: str
    status: str  # 'queued'
    created_at: str
    links: Dict[str, str]  # e.g., {"status": "/api/jobs/{taskId}"}
```

### WorkflowStatus

```python
@dataclass
class WorkflowStatus:
    is_deployed: bool
    deployed_at: Optional[str] = None
    needs_redeployment: bool = False
```

### RateLimitInfo

```python
@dataclass
class RateLimitInfo:
    limit: int
    remaining: int
    reset: int
    retry_after: Optional[int] = None
```

### UsageLimits

```python
@dataclass
class UsageLimits:
    success: bool
    rate_limit: Dict[str, Any]
    usage: Dict[str, Any]
```

### SimStudioError

```python
class SimStudioError(Exception):
    def __init__(self, message: str, code: Optional[str] = None, status: Optional[int] = None):
        super().__init__(message)
        self.code = code
        self.status = status
```

**Common error codes:**

- `UNAUTHORIZED`: Invalid API key
- `TIMEOUT`: Request timed out
- `RATE_LIMIT_EXCEEDED`: Rate limit exceeded
- `USAGE_LIMIT_EXCEEDED`: Usage limit exceeded
- `EXECUTION_ERROR`: Workflow execution failed

## Examples

### Basic Workflow Execution

1. Set up the SimStudioClient with your API key.
2. Check that the workflow is deployed and ready for execution.
3. Execute the workflow with your input data.
4. Process the execution result and handle any errors.

```python
import os
from simstudio import SimStudioClient

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def run_workflow():
    try:
        # Check if workflow is ready
        is_ready = client.validate_workflow("my-workflow-id")
        if not is_ready:
            raise Exception("Workflow is not deployed or ready")

        # Execute the workflow
        result = client.execute_workflow(
            "my-workflow-id",
            input_data={
                "message": "Process this data",
                "user_id": "12345"
            }
        )

        if result.success:
            print("Output:", result.output)
            print("Duration:", result.metadata.get("duration") if result.metadata else None)
        else:
            print("Workflow failed:", result.error)
    except Exception as error:
        print("Error:", error)

run_workflow()
```

### Error Handling

Handle the different error types that can occur during workflow execution:

```python
from simstudio import SimStudioClient, SimStudioError
import os

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def execute_with_error_handling():
    try:
        result = client.execute_workflow("workflow-id")
        return result
    except SimStudioError as error:
        if error.code == "UNAUTHORIZED":
            print("Invalid API key")
        elif error.code == "TIMEOUT":
            print("Workflow execution timed out")
        elif error.code == "USAGE_LIMIT_EXCEEDED":
            print("Usage limit exceeded")
        elif error.code == "INVALID_JSON":
            print("Invalid JSON in request body")
        else:
            print(f"Workflow error: {error}")
        raise
    except Exception as error:
        print(f"Unexpected error: {error}")
        raise
```

### Using the Context Manager

Use the client as a context manager to handle resource cleanup automatically:

```python
from simstudio import SimStudioClient
import os

# Using context manager to automatically close the session
with SimStudioClient(api_key=os.getenv("SIM_API_KEY")) as client:
    result = client.execute_workflow("workflow-id")
    print("Result:", result)
# Session is automatically closed here
```

### Batch Workflow Execution

Execute several workflows in sequence, collecting one result entry per workflow:

```python
from simstudio import SimStudioClient
import os

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def execute_workflows_batch(workflow_data_pairs):
    """Execute multiple workflows with different input data."""
    results = []

    for workflow_id, input_data in workflow_data_pairs:
        try:
            # Validate workflow before execution
            if not client.validate_workflow(workflow_id):
                print(f"Skipping {workflow_id}: not deployed")
                continue

            result = client.execute_workflow(workflow_id, input_data)
            results.append({
                "workflow_id": workflow_id,
                "success": result.success,
                "output": result.output,
                "error": result.error
            })
        except Exception as error:
            results.append({
                "workflow_id": workflow_id,
                "success": False,
                "error": str(error)
            })

    return results

# Example usage
workflows = [
    ("workflow-1", {"type": "analysis", "data": "sample1"}),
    ("workflow-2", {"type": "processing", "data": "sample2"}),
]

results = execute_workflows_batch(workflows)
for result in results:
    print(f"Workflow {result['workflow_id']}: {'Success' if result['success'] else 'Failed'}")
```

### Asynchronous Workflow Execution

Execute workflows asynchronously for long-running tasks:

```python
import os
import time
from simstudio import SimStudioClient

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def execute_async():
    try:
        # Start async execution
        result = client.execute_workflow(
            "workflow-id",
            input_data={"data": "large dataset"},
            async_execution=True  # Execute asynchronously
        )

        # Check if result is an async execution
        if hasattr(result, 'task_id'):
            print(f"Task ID: {result.task_id}")
            print(f"Status endpoint: {result.links['status']}")

            # Poll for completion
            status = client.get_job_status(result.task_id)
            while status["status"] in ["queued", "processing"]:
                print(f"Current status: {status['status']}")
                time.sleep(2)  # Wait 2 seconds
                status = client.get_job_status(result.task_id)

            if status["status"] == "completed":
                print("Workflow completed!")
                print(f"Output: {status['output']}")
                print(f"Duration: {status['metadata']['duration']}")
            else:
                # 'failed' or 'cancelled'; the 'error' field may be absent
                print(f"Workflow {status['status']}: {status.get('error')}")
    except Exception as error:
        print(f"Error: {error}")

execute_async()
```

### Rate Limiting and Retry

Handle rate limits automatically with exponential backoff:

```python
import os
from datetime import datetime
from simstudio import SimStudioClient, SimStudioError

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def execute_with_retry_handling():
    try:
        # Automatically retries on rate limit
        result = client.execute_with_retry(
            "workflow-id",
            input_data={"message": "Process this"},
            max_retries=5,
            initial_delay=1.0,
            max_delay=60.0,
            backoff_multiplier=2.0
        )
        print(f"Success: {result}")
    except SimStudioError as error:
        if error.code == "RATE_LIMIT_EXCEEDED":
            print("Rate limit exceeded after all retries")

            # Check rate limit info
            rate_limit_info = client.get_rate_limit_info()
            if rate_limit_info:
                reset_time = datetime.fromtimestamp(rate_limit_info.reset)
                print(f"Rate limit resets at: {reset_time}")
        else:
            raise  # Surface errors unrelated to rate limiting

execute_with_retry_handling()
```

### Usage Monitoring

Monitor your account's usage and limits:

```python
import os
from simstudio import SimStudioClient

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def check_usage():
    try:
        limits = client.get_usage_limits()

        print("=== Rate Limits ===")
        print("Sync requests:")
        print(f"  Limit: {limits.rate_limit['sync']['limit']}")
        print(f"  Remaining: {limits.rate_limit['sync']['remaining']}")
        print(f"  Resets at: {limits.rate_limit['sync']['resetAt']}")
        print(f"  Is limited: {limits.rate_limit['sync']['isLimited']}")

        print("\nAsync requests:")
        print(f"  Limit: {limits.rate_limit['async']['limit']}")
        print(f"  Remaining: {limits.rate_limit['async']['remaining']}")
        print(f"  Resets at: {limits.rate_limit['async']['resetAt']}")
        print(f"  Is limited: {limits.rate_limit['async']['isLimited']}")

        print("\n=== Usage ===")
        print(f"Current period cost: ${limits.usage['currentPeriodCost']:.2f}")
        print(f"Limit: ${limits.usage['limit']:.2f}")
        print(f"Plan: {limits.usage['plan']}")

        usage_limit = limits.usage['limit']
        if usage_limit > 0:  # Avoid dividing by zero when the limit is 0
            percent_used = (limits.usage['currentPeriodCost'] / usage_limit) * 100
            print(f"Usage: {percent_used:.1f}%")

            if percent_used > 80:
                print("⚠️ Warning: You are approaching your usage limit!")
    except Exception as error:
        print(f"Error checking usage: {error}")

check_usage()
```

### Streaming Workflow Execution

Execute workflows with real-time streaming responses:

```python
from simstudio import SimStudioClient
import os

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def execute_with_streaming():
    """Execute workflow with streaming enabled."""
    try:
        # Enable streaming for specific block outputs
        result = client.execute_workflow(
            "workflow-id",
            input_data={"message": "Count to five"},
            stream=True,
            selected_outputs=["agent1.content"]  # Use blockName.attribute format
        )

        print("Workflow result:", result)
    except Exception as error:
        print("Error:", error)

execute_with_streaming()
```

The streaming response follows the Server-Sent Events (SSE) format:

```
data: {"blockId":"7b7735b9-19e5-4bd6-818b-46aae2596e9f","chunk":"One"}

data: {"blockId":"7b7735b9-19e5-4bd6-818b-46aae2596e9f","chunk":", two"}

data: {"event":"done","success":true,"output":{},"metadata":{"duration":610}}

data: [DONE]
```
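To make the format above concrete, here is a minimal sketch of a parser for these lines. It is not part of the SDK: `iter_sse_payloads` is a hypothetical helper that assumes each event arrives as a single `data:` line carrying JSON, as in the sample, and that the stream ends at the `[DONE]` sentinel.

```python
import json
from typing import Any, Dict, Iterable, Iterator


def iter_sse_payloads(lines: Iterable[str]) -> Iterator[Dict[str, Any]]:
    """Yield the decoded JSON payload of each `data:` line until `[DONE]`.

    Hypothetical helper; assumes one JSON document per `data:` line.
    """
    for raw in lines:
        line = raw.strip()
        if not line.startswith("data:"):
            continue  # Skip blank separators and non-data fields.
        data = line[len("data:"):].strip()
        if data == "[DONE]":
            return
        try:
            yield json.loads(data)
        except json.JSONDecodeError:
            continue  # Ignore malformed chunks instead of aborting the stream.


if __name__ == "__main__":
    sample = [
        'data: {"blockId":"7b7735b9-19e5-4bd6-818b-46aae2596e9f","chunk":"One"}',
        "",
        'data: {"blockId":"7b7735b9-19e5-4bd6-818b-46aae2596e9f","chunk":", two"}',
        "",
        'data: {"event":"done","success":true,"output":{},"metadata":{"duration":610}}',
        "",
        "data: [DONE]",
    ]
    text = "".join(p["chunk"] for p in iter_sse_payloads(sample) if "chunk" in p)
    print(text)  # One, two
```

Payloads with a `chunk` key carry streamed text, while the payload with `"event": "done"` carries the final result and metadata.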
**Flask streaming example:**

```python
from flask import Flask, Response, stream_with_context
import requests
import json
import os

app = Flask(__name__)

@app.route('/stream-workflow')
def stream_workflow():
    """Stream workflow execution to the client."""

    def generate():
        response = requests.post(
            'https://sim.ai/api/workflows/WORKFLOW_ID/execute',
            headers={
                'Content-Type': 'application/json',
                'X-API-Key': os.getenv('SIM_API_KEY')
            },
            json={
                'message': 'Generate a story',
                'stream': True,
                'selectedOutputs': ['agent1.content']
            },
            stream=True
        )

        for line in response.iter_lines():
            if line:
                decoded_line = line.decode('utf-8')
                if decoded_line.startswith('data: '):
                    data = decoded_line[6:]  # Remove 'data: ' prefix

                    if data == '[DONE]':
                        break

                    try:
                        parsed = json.loads(data)
                        if 'chunk' in parsed:
                            yield f"data: {json.dumps(parsed)}\n\n"
                        elif parsed.get('event') == 'done':
                            yield f"data: {json.dumps(parsed)}\n\n"
                            print("Execution complete:", parsed.get('metadata'))
                    except json.JSONDecodeError:
                        pass  # Skip lines that are not valid JSON

    return Response(
        stream_with_context(generate()),
        mimetype='text/event-stream'
    )

if __name__ == '__main__':
    app.run(debug=True)
```

### Environment Configuration

Configure the client with environment variables:

```python
import os
from simstudio import SimStudioClient

# Development configuration
client = SimStudioClient(
    api_key=os.getenv("SIM_API_KEY"),
    base_url=os.getenv("SIM_BASE_URL", "https://sim.ai")
)
```

```python
import os
from simstudio import SimStudioClient

# Production configuration with error handling
api_key = os.getenv("SIM_API_KEY")
if not api_key:
    raise ValueError("SIM_API_KEY environment variable is required")

client = SimStudioClient(
    api_key=api_key,
    base_url=os.getenv("SIM_BASE_URL", "https://sim.ai")
)
```

## Getting Your API Key

1. Navigate to [Sim](https://sim.ai) and log in to your account.
2. Navigate to the workflow you want to execute programmatically.
3. Click "Deploy" to deploy your workflow if it has not been deployed yet.
4. Select or create an API key during the deployment process.
5. Copy the API key to use in your Python application.

## Requirements

- Python 3.8+
- requests >= 2.25.0

## License

Apache-2.0