feat(backend): thread tenancy through webhooks, scheduler, SSE, and seeder

Fills the three must-do-before-merge gaps:

PR10 — Webhook/background execution tenancy:
- Add organization_id + org_workspace_id to ExecutionContext
- Add org/workspace fields to GraphExecutionJobArgs (scheduler)
- Pass tenant context through _execute_graph scheduled job
- Pass tenant context through add_graph_execution_schedule
- Update v1.py schedule creation route with RequestContext
- Webhook node/preset triggers resolve user's org/workspace via
  get_user_default_org_workspace() before calling add_graph_execution

PR11 — SSE/realtime tenancy:
- Add organization_id + org_workspace_id to CoPilotExecutionEntry
- Update enqueue_copilot_turn() to accept and pass org/workspace
- Chat stream_chat_post() resolves RequestContext and passes to enqueue
- Tenant context flows through RabbitMQ to executor service

Seeder:
- Add _get_user_org_ws() cache helper to TestDataCreator
- create_test_graphs() now passes org/workspace to create_graph()
- New get_user_default_org_workspace() helper in orgs/db.py

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Nicholas Tindle
2026-04-03 16:32:00 +02:00
parent 019ad3cade
commit 2c573f1add
8 changed files with 84 additions and 3 deletions

View File

@@ -668,6 +668,7 @@ async def stream_chat_post(
session_id: str,
request: StreamChatRequest,
user_id: str = Security(auth.get_user_id),
ctx: auth.RequestContext = Security(auth.get_request_context),
):
"""
Stream chat responses for a session (POST with context support).
@@ -811,6 +812,8 @@ async def stream_chat_post(
is_user_message=request.is_user_message,
context=request.context,
file_ids=sanitized_file_ids,
organization_id=ctx.org_id,
org_workspace_id=ctx.workspace_id,
)
setup_time = (time.perf_counter() - stream_start_time) * 1000

View File

@@ -469,11 +469,16 @@ async def _execute_webhook_node_trigger(
return
logger.debug(f"Executing graph #{node.graph_id} node #{node.id}")
try:
from backend.api.features.orgs.db import get_user_default_org_workspace
org_id, ws_id = await get_user_default_org_workspace(webhook.user_id)
await add_graph_execution(
user_id=webhook.user_id,
graph_id=node.graph_id,
graph_version=node.graph_version,
nodes_input_masks={node.id: {"payload": payload}},
organization_id=org_id,
org_workspace_id=ws_id,
)
except GraphNotInLibraryError as e:
logger.warning(
@@ -530,6 +535,9 @@ async def _execute_webhook_preset_trigger(
logger.debug(f"Executing preset #{preset.id} for webhook #{webhook.id}")
try:
from backend.api.features.orgs.db import get_user_default_org_workspace
org_id, ws_id = await get_user_default_org_workspace(webhook.user_id)
await add_graph_execution(
user_id=webhook.user_id,
graph_id=preset.graph_id,
@@ -537,6 +545,8 @@ async def _execute_webhook_preset_trigger(
graph_version=preset.graph_version,
graph_credentials_inputs=preset.credentials,
nodes_input_masks={trigger_node.id: {**preset.inputs, "payload": payload}},
organization_id=org_id,
org_workspace_id=ws_id,
)
except GraphNotInLibraryError as e:
logger.warning(

View File

@@ -8,6 +8,32 @@ from backend.util.exceptions import NotFoundError
logger = logging.getLogger(__name__)
async def get_user_default_org_workspace(
    user_id: str,
) -> tuple[str | None, str | None]:
    """Resolve the user's personal org ID and that org's default workspace ID.

    Returns a ``(organization_id, workspace_id)`` pair. Either element may be
    ``None`` when the user has no personal org yet (e.g. the org migration
    has not run for this user).
    """
    # The personal org is the one the user owns whose Org row is flagged personal.
    membership = await prisma.orgmember.find_first(
        where={
            "userId": user_id,
            "isOwner": True,
            "Org": {"isPersonal": True},
        },
    )
    if membership is None:
        return None, None
    # Look up the org's default workspace; it may not exist yet.
    default_ws = await prisma.orgworkspace.find_first(
        where={"orgId": membership.orgId, "isDefault": True}
    )
    return membership.orgId, (default_ws.id if default_ws else None)
async def create_org(
name: str,
slug: str,

View File

@@ -1362,6 +1362,7 @@ class ScheduleCreationRequest(pydantic.BaseModel):
)
async def create_graph_execution_schedule(
user_id: Annotated[str, Security(get_user_id)],
ctx: Annotated[RequestContext, Security(get_request_context)],
graph_id: str = Path(..., description="ID of the graph to schedule"),
schedule_params: ScheduleCreationRequest = Body(),
) -> scheduler.GraphExecutionJobInfo:
@@ -1392,6 +1393,8 @@ async def create_graph_execution_schedule(
input_data=schedule_params.inputs,
input_credentials=schedule_params.credentials,
user_timezone=user_timezone,
organization_id=ctx.org_id,
org_workspace_id=ctx.workspace_id,
)
# Convert the next_run_time back to user timezone for display

View File

@@ -156,6 +156,12 @@ class CoPilotExecutionEntry(BaseModel):
file_ids: list[str] | None = None
"""Workspace file IDs attached to the user's message"""
organization_id: str | None = None
"""Active organization for tenant-scoped execution"""
org_workspace_id: str | None = None
"""Active workspace for tenant-scoped execution"""
class CancelCoPilotEvent(BaseModel):
"""Event to cancel a CoPilot operation."""
@@ -175,6 +181,8 @@ async def enqueue_copilot_turn(
is_user_message: bool = True,
context: dict[str, str] | None = None,
file_ids: list[str] | None = None,
organization_id: str | None = None,
org_workspace_id: str | None = None,
) -> None:
"""Enqueue a CoPilot task for processing by the executor service.
@@ -197,6 +205,8 @@ async def enqueue_copilot_turn(
is_user_message=is_user_message,
context=context,
file_ids=file_ids,
organization_id=organization_id,
org_workspace_id=org_workspace_id,
)
queue_client = await get_async_copilot_queue()

View File

@@ -98,10 +98,14 @@ class ExecutionContext(BaseModel):
root_execution_id: Optional[str] = None
parent_execution_id: Optional[str] = None
# Workspace
# Workspace (file storage)
workspace_id: Optional[str] = None
session_id: Optional[str] = None
# Org/workspace tenancy context
organization_id: Optional[str] = None
org_workspace_id: Optional[str] = None
# -------------------------- Models -------------------------- #

View File

@@ -159,6 +159,8 @@ async def _execute_graph(**kwargs):
graph_version=args.graph_version,
inputs=args.input_data,
graph_credentials_inputs=args.input_credentials,
organization_id=args.organization_id,
org_workspace_id=args.org_workspace_id,
)
await db.increment_onboarding_runs(args.user_id)
elapsed = asyncio.get_event_loop().time() - start_time
@@ -390,6 +392,8 @@ class GraphExecutionJobArgs(BaseModel):
cron: str
input_data: GraphInput
input_credentials: dict[str, CredentialsMetaInput] = Field(default_factory=dict)
organization_id: str | None = None
org_workspace_id: str | None = None
class GraphExecutionJobInfo(GraphExecutionJobArgs):
@@ -667,6 +671,8 @@ class Scheduler(AppService):
input_credentials: dict[str, CredentialsMetaInput],
name: Optional[str] = None,
user_timezone: str | None = None,
organization_id: Optional[str] = None,
org_workspace_id: Optional[str] = None,
) -> GraphExecutionJobInfo:
# Validate the graph before scheduling to prevent runtime failures
# We don't need the return value, just want the validation to run
@@ -703,6 +709,8 @@ class Scheduler(AppService):
cron=cron,
input_data=input_data,
input_credentials=input_credentials,
organization_id=organization_id,
org_workspace_id=org_workspace_id,
)
job = self.scheduler.add_job(
execute_graph,

View File

@@ -112,6 +112,17 @@ class TestDataCreator:
self.api_keys: List[Dict[str, Any]] = []
self.presets: List[Dict[str, Any]] = []
self.profiles: List[Dict[str, Any]] = []
# Org/workspace context per user (populated after migration runs)
self._user_org_cache: Dict[str, tuple[str | None, str | None]] = {}
async def _get_user_org_ws(self, user_id: str) -> tuple[str | None, str | None]:
"""Get (organization_id, org_workspace_id) for a user, with caching."""
if user_id not in self._user_org_cache:
from backend.api.features.orgs.db import get_user_default_org_workspace
org_id, ws_id = await get_user_default_org_workspace(user_id)
self._user_org_cache[user_id] = (org_id, ws_id)
return self._user_org_cache[user_id]
async def create_test_users(self) -> List[Dict[str, Any]]:
"""Create test users using Supabase client."""
@@ -366,8 +377,14 @@ class TestDataCreator:
)
try:
# Use the API function to create graph
created_graph = await create_graph(graph, user["id"])
# Use the API function to create graph with org context
org_id, ws_id = await self._get_user_org_ws(user["id"])
created_graph = await create_graph(
graph,
user["id"],
organization_id=org_id,
org_workspace_id=ws_id,
)
graph_dict = created_graph.model_dump()
# Ensure userId is included for store submissions
graph_dict["userId"] = user["id"]