Files
sim/packages/python-sdk/simstudio/__init__.py
Waleed Latif ae43381d84 feat(domain): drop the 'studio' (#818)
* feat(domain): drop the

* change all references for Sim Studio to Sim

* change back license and notice

* lint

---------

Co-authored-by: waleedlatif <waleedlatif@waleedlatifs-MacBook-Pro.local>
2025-07-29 12:51:43 -07:00

239 lines
7.8 KiB
Python

"""
Sim SDK for Python
Official Python SDK for Sim, allowing you to execute workflows programmatically.
"""
from typing import Any, Dict, Optional
from dataclasses import dataclass
import requests
__version__ = "0.1.0"
__all__ = ["SimStudioClient", "SimStudioError", "WorkflowExecutionResult", "WorkflowStatus"]
@dataclass
class WorkflowExecutionResult:
"""Result of a workflow execution."""
success: bool
output: Optional[Any] = None
error: Optional[str] = None
logs: Optional[list] = None
metadata: Optional[Dict[str, Any]] = None
trace_spans: Optional[list] = None
total_duration: Optional[float] = None
@dataclass
class WorkflowStatus:
"""Status of a workflow."""
is_deployed: bool
deployed_at: Optional[str] = None
is_published: bool = False
needs_redeployment: bool = False
class SimStudioError(Exception):
"""Exception raised for Sim API errors."""
def __init__(self, message: str, code: Optional[str] = None, status: Optional[int] = None):
super().__init__(message)
self.code = code
self.status = status
class SimStudioClient:
"""
Sim API client for executing workflows programmatically.
Args:
api_key: Your Sim API key
base_url: Base URL for the Sim API (defaults to https://sim.ai)
"""
def __init__(self, api_key: str, base_url: str = "https://sim.ai"):
self.api_key = api_key
self.base_url = base_url.rstrip('/')
self._session = requests.Session()
self._session.headers.update({
'X-API-Key': self.api_key,
'Content-Type': 'application/json',
})
def execute_workflow(
self,
workflow_id: str,
input_data: Optional[Dict[str, Any]] = None,
timeout: float = 30.0
) -> WorkflowExecutionResult:
"""
Execute a workflow with optional input data.
Args:
workflow_id: The ID of the workflow to execute
input_data: Input data to pass to the workflow
timeout: Timeout in seconds (default: 30.0)
Returns:
WorkflowExecutionResult object containing the execution result
Raises:
SimStudioError: If the workflow execution fails
"""
url = f"{self.base_url}/api/workflows/{workflow_id}/execute"
try:
response = self._session.post(
url,
json=input_data or {},
timeout=timeout
)
if not response.ok:
try:
error_data = response.json()
error_message = error_data.get('error', f'HTTP {response.status_code}: {response.reason}')
error_code = error_data.get('code')
except (ValueError, KeyError):
error_message = f'HTTP {response.status_code}: {response.reason}'
error_code = None
raise SimStudioError(error_message, error_code, response.status_code)
result_data = response.json()
return WorkflowExecutionResult(
success=result_data['success'],
output=result_data.get('output'),
error=result_data.get('error'),
logs=result_data.get('logs'),
metadata=result_data.get('metadata'),
trace_spans=result_data.get('traceSpans'),
total_duration=result_data.get('totalDuration')
)
except requests.Timeout:
raise SimStudioError(f'Workflow execution timed out after {timeout} seconds', 'TIMEOUT')
except requests.RequestException as e:
raise SimStudioError(f'Failed to execute workflow: {str(e)}', 'EXECUTION_ERROR')
def get_workflow_status(self, workflow_id: str) -> WorkflowStatus:
"""
Get the status of a workflow (deployment status, etc.).
Args:
workflow_id: The ID of the workflow
Returns:
WorkflowStatus object containing the workflow status
Raises:
SimStudioError: If getting the status fails
"""
url = f"{self.base_url}/api/workflows/{workflow_id}/status"
try:
response = self._session.get(url)
if not response.ok:
try:
error_data = response.json()
error_message = error_data.get('error', f'HTTP {response.status_code}: {response.reason}')
error_code = error_data.get('code')
except (ValueError, KeyError):
error_message = f'HTTP {response.status_code}: {response.reason}'
error_code = None
raise SimStudioError(error_message, error_code, response.status_code)
status_data = response.json()
return WorkflowStatus(
is_deployed=status_data.get('isDeployed', False),
deployed_at=status_data.get('deployedAt'),
is_published=status_data.get('isPublished', False),
needs_redeployment=status_data.get('needsRedeployment', False)
)
except requests.RequestException as e:
raise SimStudioError(f'Failed to get workflow status: {str(e)}', 'STATUS_ERROR')
def validate_workflow(self, workflow_id: str) -> bool:
"""
Validate that a workflow is ready for execution.
Args:
workflow_id: The ID of the workflow
Returns:
True if the workflow is deployed and ready, False otherwise
"""
try:
status = self.get_workflow_status(workflow_id)
return status.is_deployed
except SimStudioError:
return False
def execute_workflow_sync(
self,
workflow_id: str,
input_data: Optional[Dict[str, Any]] = None,
timeout: float = 30.0
) -> WorkflowExecutionResult:
"""
Execute a workflow and poll for completion (useful for long-running workflows).
Note: Currently, the API is synchronous, so this method just calls execute_workflow.
In the future, if async execution is added, this method can be enhanced.
Args:
workflow_id: The ID of the workflow to execute
input_data: Input data to pass to the workflow
timeout: Timeout for the initial request in seconds
Returns:
WorkflowExecutionResult object containing the execution result
Raises:
SimStudioError: If the workflow execution fails
"""
# For now, the API is synchronous, so we just execute directly
# In the future, if async execution is added, this method can be enhanced
return self.execute_workflow(workflow_id, input_data, timeout)
def set_api_key(self, api_key: str) -> None:
"""
Update the API key.
Args:
api_key: New API key
"""
self.api_key = api_key
self._session.headers.update({'X-API-Key': api_key})
def set_base_url(self, base_url: str) -> None:
"""
Update the base URL.
Args:
base_url: New base URL
"""
self.base_url = base_url.rstrip('/')
def close(self) -> None:
"""Close the underlying HTTP session."""
self._session.close()
def __enter__(self):
"""Context manager entry."""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit."""
self.close()
# For backward compatibility
Client = SimStudioClient