Compare commits

..

5 Commits

Author SHA1 Message Date
Otto
d8909af967 fix: enable scope checking for block credentials (consistency with graphs)
Previously run_block didn't check OAuth2 scopes while run_agent did.
Now both use the same scope-checking logic for credential matching.
2026-02-03 13:19:15 +00:00
Otto
ff8ca11845 fix: preserve original credential matching behavior
- Add check_scopes parameter to find_matching_credential and
  match_credentials_to_requirements (default True)
- run_block uses check_scopes=False to preserve original behavior
  (original run_block did not verify OAuth2 scopes)
- Add isinstance check to get_inputs_from_schema for safety
  (original returned [] if input_schema wasn't a dict)
2026-02-03 13:15:59 +00:00
Zamil Majdy
4d6471a7eb Merge branch 'dev' into otto/copilot-cleanup-dev-v2 2026-02-03 20:08:03 +07:00
Otto
dcdd886067 chore: remove docstrings and use sorted() for deterministic UUID ordering 2026-02-03 12:29:31 +00:00
Otto
6098c5eed6 refactor(copilot): code cleanup - extract shared helpers and reduce duplication
- Create util/validation.py with UUID validation helpers
- Create tools/helpers.py with shared utilities (get_inputs_from_schema, etc.)
- Add shared credential matching utilities to utils.py
- Refactor run_block to use shared matching with discriminator support
- Extract _create_stream_generator in routes.py
- Update run_agent.py to use shared helpers

Preserves discriminator logic for multi-provider credential matching.
2026-02-03 12:15:24 +00:00
7 changed files with 303 additions and 174 deletions

View File

@@ -17,6 +17,13 @@ from .model import ChatSession, create_chat_session, get_chat_session, get_user_
config = ChatConfig()
SSE_RESPONSE_HEADERS = {
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
"x-vercel-ai-ui-message-stream": "v1",
}
logger = logging.getLogger(__name__)
@@ -32,6 +39,48 @@ async def _validate_and_get_session(
return session
async def _create_stream_generator(
session_id: str,
message: str,
user_id: str | None,
session: ChatSession,
is_user_message: bool = True,
context: dict[str, str] | None = None,
) -> AsyncGenerator[str, None]:
"""Create SSE event generator for chat streaming."""
chunk_count = 0
first_chunk_type: str | None = None
async for chunk in chat_service.stream_chat_completion(
session_id,
message,
is_user_message=is_user_message,
user_id=user_id,
session=session,
context=context,
):
if chunk_count < 3:
logger.info(
"Chat stream chunk",
extra={
"session_id": session_id,
"chunk_type": str(chunk.type),
},
)
if not first_chunk_type:
first_chunk_type = str(chunk.type)
chunk_count += 1
yield chunk.to_sse()
logger.info(
"Chat stream completed",
extra={
"session_id": session_id,
"chunk_count": chunk_count,
"first_chunk_type": first_chunk_type,
},
)
yield "data: [DONE]\n\n"
router = APIRouter(
tags=["chat"],
)
@@ -221,49 +270,17 @@ async def stream_chat_post(
"""
session = await _validate_and_get_session(session_id, user_id)
async def event_generator() -> AsyncGenerator[str, None]:
chunk_count = 0
first_chunk_type: str | None = None
async for chunk in chat_service.stream_chat_completion(
session_id,
request.message,
is_user_message=request.is_user_message,
user_id=user_id,
session=session, # Pass pre-fetched session to avoid double-fetch
context=request.context,
):
if chunk_count < 3:
logger.info(
"Chat stream chunk",
extra={
"session_id": session_id,
"chunk_type": str(chunk.type),
},
)
if not first_chunk_type:
first_chunk_type = str(chunk.type)
chunk_count += 1
yield chunk.to_sse()
logger.info(
"Chat stream completed",
extra={
"session_id": session_id,
"chunk_count": chunk_count,
"first_chunk_type": first_chunk_type,
},
)
# AI SDK protocol termination
yield "data: [DONE]\n\n"
return StreamingResponse(
event_generator(),
_create_stream_generator(
session_id=session_id,
message=request.message,
user_id=user_id,
session=session,
is_user_message=request.is_user_message,
context=request.context,
),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # Disable nginx buffering
"x-vercel-ai-ui-message-stream": "v1", # AI SDK protocol header
},
headers=SSE_RESPONSE_HEADERS,
)
@@ -295,48 +312,16 @@ async def stream_chat_get(
"""
session = await _validate_and_get_session(session_id, user_id)
async def event_generator() -> AsyncGenerator[str, None]:
chunk_count = 0
first_chunk_type: str | None = None
async for chunk in chat_service.stream_chat_completion(
session_id,
message,
is_user_message=is_user_message,
user_id=user_id,
session=session, # Pass pre-fetched session to avoid double-fetch
):
if chunk_count < 3:
logger.info(
"Chat stream chunk",
extra={
"session_id": session_id,
"chunk_type": str(chunk.type),
},
)
if not first_chunk_type:
first_chunk_type = str(chunk.type)
chunk_count += 1
yield chunk.to_sse()
logger.info(
"Chat stream completed",
extra={
"session_id": session_id,
"chunk_count": chunk_count,
"first_chunk_type": first_chunk_type,
},
)
# AI SDK protocol termination
yield "data: [DONE]\n\n"
return StreamingResponse(
event_generator(),
_create_stream_generator(
session_id=session_id,
message=message,
user_id=user_id,
session=session,
is_user_message=is_user_message,
),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # Disable nginx buffering
"x-vercel-ai-ui-message-stream": "v1", # AI SDK protocol header
},
headers=SSE_RESPONSE_HEADERS,
)

View File

@@ -0,0 +1,81 @@
"""Shared helpers for chat tools."""
from typing import Any
from .models import ErrorResponse
def error_response(
message: str, session_id: str | None, **kwargs: Any
) -> ErrorResponse:
"""Create standardized error response.
Args:
message: Error message to display
session_id: Current session ID
**kwargs: Additional fields to pass to ErrorResponse
Returns:
ErrorResponse with the given message and session_id
"""
return ErrorResponse(message=message, session_id=session_id, **kwargs)
def get_inputs_from_schema(
input_schema: dict[str, Any],
exclude_fields: set[str] | None = None,
) -> list[dict[str, Any]]:
"""Extract input field info from JSON schema.
Args:
input_schema: JSON schema dict with 'properties' and 'required'
exclude_fields: Set of field names to exclude (e.g., credential fields)
Returns:
List of dicts with field info (name, title, type, description, required, default)
"""
# Safety check: original code returned [] if input_schema wasn't a dict
if not isinstance(input_schema, dict):
return []
exclude = exclude_fields or set()
properties = input_schema.get("properties", {})
required = set(input_schema.get("required", []))
return [
{
"name": name,
"title": schema.get("title", name),
"type": schema.get("type", "string"),
"description": schema.get("description", ""),
"required": name in required,
"default": schema.get("default"),
}
for name, schema in properties.items()
if name not in exclude
]
def format_inputs_as_markdown(inputs: list[dict[str, Any]]) -> str:
"""Format input fields as a readable markdown list.
Args:
inputs: List of input dicts from get_inputs_from_schema
Returns:
Markdown-formatted string listing the inputs
"""
if not inputs:
return "No inputs required."
lines = []
for inp in inputs:
required_marker = " (required)" if inp.get("required") else ""
default = inp.get("default")
default_info = f" [default: {default}]" if default is not None else ""
description = inp.get("description", "")
desc_info = f" - {description}" if description else ""
lines.append(f"- **{inp['name']}**{required_marker}{default_info}{desc_info}")
return "\n".join(lines)

View File

@@ -24,6 +24,7 @@ from backend.util.timezone_utils import (
)
from .base import BaseTool
from .helpers import get_inputs_from_schema
from .models import (
AgentDetails,
AgentDetailsResponse,
@@ -371,19 +372,7 @@ class RunAgentTool(BaseTool):
def _get_inputs_list(self, input_schema: dict[str, Any]) -> list[dict[str, Any]]:
"""Extract inputs list from schema."""
inputs_list = []
if isinstance(input_schema, dict) and "properties" in input_schema:
for field_name, field_schema in input_schema["properties"].items():
inputs_list.append(
{
"name": field_name,
"title": field_schema.get("title", field_name),
"type": field_schema.get("type", "string"),
"description": field_schema.get("description", ""),
"required": field_name in input_schema.get("required", []),
}
)
return inputs_list
return get_inputs_from_schema(input_schema)
def _get_execution_modes(self, graph: GraphModel) -> list[str]:
"""Get available execution modes for the graph."""

View File

@@ -10,12 +10,13 @@ from pydantic_core import PydanticUndefined
from backend.api.features.chat.model import ChatSession
from backend.data.block import get_block
from backend.data.execution import ExecutionContext
from backend.data.model import CredentialsMetaInput
from backend.data.model import CredentialsFieldInfo, CredentialsMetaInput
from backend.data.workspace import get_or_create_workspace
from backend.integrations.creds_manager import IntegrationCredentialsManager
from backend.util.exceptions import BlockError
from .base import BaseTool
from .helpers import get_inputs_from_schema
from .models import (
BlockOutputResponse,
ErrorResponse,
@@ -24,7 +25,10 @@ from .models import (
ToolResponseBase,
UserReadiness,
)
from .utils import build_missing_credentials_from_field_info
from .utils import (
build_missing_credentials_from_field_info,
match_credentials_to_requirements,
)
logger = logging.getLogger(__name__)
@@ -73,41 +77,22 @@ class RunBlockTool(BaseTool):
def requires_auth(self) -> bool:
return True
async def _check_block_credentials(
def _resolve_discriminated_credentials(
self,
user_id: str,
block: Any,
input_data: dict[str, Any] | None = None,
) -> tuple[dict[str, CredentialsMetaInput], list[CredentialsMetaInput]]:
"""
Check if user has required credentials for a block.
Args:
user_id: User ID
block: Block to check credentials for
input_data: Input data for the block (used to determine provider via discriminator)
Returns:
tuple[matched_credentials, missing_credentials]
"""
matched_credentials: dict[str, CredentialsMetaInput] = {}
missing_credentials: list[CredentialsMetaInput] = []
input_data = input_data or {}
# Get credential field info from block's input schema
input_data: dict[str, Any],
) -> dict[str, CredentialsFieldInfo]:
"""Resolve credential requirements, applying discriminator logic where needed."""
credentials_fields_info = block.input_schema.get_credentials_fields_info()
if not credentials_fields_info:
return matched_credentials, missing_credentials
return {}
# Get user's available credentials
creds_manager = IntegrationCredentialsManager()
available_creds = await creds_manager.store.get_all_creds(user_id)
resolved: dict[str, CredentialsFieldInfo] = {}
for field_name, field_info in credentials_fields_info.items():
effective_field_info = field_info
if field_info.discriminator and field_info.discriminator_mapping:
# Get discriminator from input, falling back to schema default
discriminator_value = input_data.get(field_info.discriminator)
if discriminator_value is None:
field = block.input_schema.model_fields.get(
@@ -126,37 +111,34 @@ class RunBlockTool(BaseTool):
f"{discriminator_value} -> {effective_field_info.provider}"
)
matching_cred = next(
(
cred
for cred in available_creds
if cred.provider in effective_field_info.provider
and cred.type in effective_field_info.supported_types
),
None,
)
resolved[field_name] = effective_field_info
if matching_cred:
matched_credentials[field_name] = CredentialsMetaInput(
id=matching_cred.id,
provider=matching_cred.provider, # type: ignore
type=matching_cred.type,
title=matching_cred.title,
)
else:
# Create a placeholder for the missing credential
provider = next(iter(effective_field_info.provider), "unknown")
cred_type = next(iter(effective_field_info.supported_types), "api_key")
missing_credentials.append(
CredentialsMetaInput(
id=field_name,
provider=provider, # type: ignore
type=cred_type, # type: ignore
title=field_name.replace("_", " ").title(),
)
)
return resolved
return matched_credentials, missing_credentials
async def _check_block_credentials(
self,
user_id: str,
block: Any,
input_data: dict[str, Any] | None = None,
) -> tuple[dict[str, CredentialsMetaInput], list[CredentialsMetaInput]]:
"""
Check if user has required credentials for a block.
Args:
user_id: User ID
block: Block to check credentials for
input_data: Input data for the block (used to determine provider via discriminator)
Returns:
tuple[matched_credentials, missing_credentials]
"""
input_data = input_data or {}
requirements = self._resolve_discriminated_credentials(block, input_data)
if not requirements:
return {}, []
return await match_credentials_to_requirements(user_id, requirements)
async def _execute(
self,
@@ -347,27 +329,6 @@ class RunBlockTool(BaseTool):
def _get_inputs_list(self, block: Any) -> list[dict[str, Any]]:
"""Extract non-credential inputs from block schema."""
inputs_list = []
schema = block.input_schema.jsonschema()
properties = schema.get("properties", {})
required_fields = set(schema.get("required", []))
# Get credential field names to exclude
credentials_fields = set(block.input_schema.get_credentials_fields().keys())
for field_name, field_schema in properties.items():
# Skip credential fields
if field_name in credentials_fields:
continue
inputs_list.append(
{
"name": field_name,
"title": field_schema.get("title", field_name),
"type": field_schema.get("type", "string"),
"description": field_schema.get("description", ""),
"required": field_name in required_fields,
}
)
return inputs_list
return get_inputs_from_schema(schema, exclude_fields=credentials_fields)

View File

@@ -225,6 +225,103 @@ async def get_or_create_library_agent(
return library_agents[0]
async def get_user_credentials(user_id: str) -> list:
"""Get all available credentials for a user."""
creds_manager = IntegrationCredentialsManager()
return await creds_manager.store.get_all_creds(user_id)
def find_matching_credential(
available_creds: list,
field_info: CredentialsFieldInfo,
check_scopes: bool = True,
):
"""Find a credential that matches the required provider, type, and optionally scopes."""
for cred in available_creds:
if cred.provider not in field_info.provider:
continue
if cred.type not in field_info.supported_types:
continue
if check_scopes and not _credential_has_required_scopes(cred, field_info):
continue
return cred
return None
def create_credential_meta_from_match(matching_cred) -> CredentialsMetaInput:
"""Create a CredentialsMetaInput from a matched credential."""
return CredentialsMetaInput(
id=matching_cred.id,
provider=matching_cred.provider, # type: ignore
type=matching_cred.type,
title=matching_cred.title,
)
async def match_credentials_to_requirements(
user_id: str,
requirements: dict[str, CredentialsFieldInfo],
check_scopes: bool = True,
) -> tuple[dict[str, CredentialsMetaInput], list[CredentialsMetaInput]]:
"""
Match user's credentials against a dictionary of credential requirements.
This is the core matching logic shared by both graph and block credential matching.
Args:
user_id: User ID to fetch credentials for
requirements: Dict mapping field names to CredentialsFieldInfo
check_scopes: Whether to verify OAuth2 scopes match requirements (default True).
Set to False to preserve original run_block behavior which didn't check scopes.
"""
matched: dict[str, CredentialsMetaInput] = {}
missing: list[CredentialsMetaInput] = []
if not requirements:
return matched, missing
available_creds = await get_user_credentials(user_id)
for field_name, field_info in requirements.items():
matching_cred = find_matching_credential(
available_creds, field_info, check_scopes=check_scopes
)
if matching_cred:
try:
matched[field_name] = create_credential_meta_from_match(matching_cred)
except Exception as e:
logger.error(
f"Failed to create CredentialsMetaInput for field '{field_name}': "
f"provider={matching_cred.provider}, type={matching_cred.type}, "
f"credential_id={matching_cred.id}",
exc_info=True,
)
provider = next(iter(field_info.provider), "unknown")
cred_type = next(iter(field_info.supported_types), "api_key")
missing.append(
CredentialsMetaInput(
id=field_name,
provider=provider, # type: ignore
type=cred_type, # type: ignore
title=f"{field_name} (validation failed: {e})",
)
)
else:
provider = next(iter(field_info.provider), "unknown")
cred_type = next(iter(field_info.supported_types), "api_key")
missing.append(
CredentialsMetaInput(
id=field_name,
provider=provider, # type: ignore
type=cred_type, # type: ignore
title=field_name.replace("_", " ").title(),
)
)
return matched, missing
async def match_user_credentials_to_graph(
user_id: str,
graph: GraphModel,

View File

@@ -0,0 +1,16 @@
"""Validation utilities."""
import re
_UUID_V4_PATTERN = re.compile(
r"[a-f0-9]{8}-[a-f0-9]{4}-4[a-f0-9]{3}-[89ab][a-f0-9]{3}-[a-f0-9]{12}",
re.IGNORECASE,
)
def is_uuid_v4(text: str) -> bool:
return bool(_UUID_V4_PATTERN.fullmatch(text.strip()))
def extract_uuids(text: str) -> list[str]:
return sorted({m.lower() for m in _UUID_V4_PATTERN.findall(text)})

View File

@@ -80,7 +80,7 @@ export default function WrapIfAdditionalTemplate(
uiSchema={uiSchema}
/>
{!isHandleConnected && (
<div className="nodrag flex flex-1 items-center gap-2">
<div className="flex flex-1 items-center gap-2">
<Input
label={""}
hideLabel={true}