mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-08 03:00:28 -04:00
Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into feat/autopilot-dry-run-flag
This commit is contained in:
@@ -107,6 +107,13 @@ Do not re-fetch or re-generate data you already have from prior tool calls.
|
||||
After building the file, reference it with `@@agptfile:` in other tools:
|
||||
`@@agptfile:/home/user/report.md`
|
||||
|
||||
### Web search best practices
|
||||
- If 3 similar web searches don't return the specific data you need, conclude
|
||||
it isn't publicly available and work with what you have.
|
||||
- Prefer fewer, well-targeted searches over many variations of the same query.
|
||||
- When spawning sub-agents for research, ensure each has a distinct
|
||||
non-overlapping scope to avoid redundant searches.
|
||||
|
||||
### Sub-agent tasks
|
||||
- When using the Task tool, NEVER set `run_in_background` to true.
|
||||
All tasks must run in the foreground.
|
||||
|
||||
@@ -27,6 +27,7 @@ from backend.copilot.response_model import (
|
||||
StreamError,
|
||||
StreamFinish,
|
||||
StreamFinishStep,
|
||||
StreamHeartbeat,
|
||||
StreamStart,
|
||||
StreamStartStep,
|
||||
StreamTextDelta,
|
||||
@@ -76,6 +77,12 @@ class SDKResponseAdapter:
|
||||
# Open the first step (matches non-SDK: StreamStart then StreamStartStep)
|
||||
responses.append(StreamStartStep())
|
||||
self.step_open = True
|
||||
elif sdk_message.subtype == "task_progress":
|
||||
# Emit a heartbeat so publish_chunk is called during long
|
||||
# sub-agent runs. Without this, the Redis stream and meta
|
||||
# key TTLs expire during gaps where no real chunks are
|
||||
# produced (task_progress events were previously silent).
|
||||
responses.append(StreamHeartbeat())
|
||||
|
||||
elif isinstance(sdk_message, AssistantMessage):
|
||||
# Flush any SDK built-in tool calls that didn't get a UserMessage
|
||||
|
||||
@@ -18,6 +18,7 @@ from backend.copilot.response_model import (
|
||||
StreamError,
|
||||
StreamFinish,
|
||||
StreamFinishStep,
|
||||
StreamHeartbeat,
|
||||
StreamStart,
|
||||
StreamStartStep,
|
||||
StreamTextDelta,
|
||||
@@ -59,6 +60,14 @@ def test_system_non_init_emits_nothing():
|
||||
assert results == []
|
||||
|
||||
|
||||
def test_task_progress_emits_heartbeat():
    """A ``task_progress`` system message converts to a single StreamHeartbeat.

    The heartbeat exists so that the Redis TTL is kept alive while a
    long-running sub-agent reports progress.
    """
    progress_message = SystemMessage(subtype="task_progress", data={})

    emitted = _adapter().convert_message(progress_message)

    assert len(emitted) == 1
    assert isinstance(emitted[0], StreamHeartbeat)
|
||||
|
||||
|
||||
# -- AssistantMessage with TextBlock -----------------------------------------
|
||||
|
||||
|
||||
|
||||
@@ -313,8 +313,7 @@ def create_security_hooks(
|
||||
.replace("\r", "")
|
||||
)
|
||||
logger.info(
|
||||
"[SDK] Context compaction triggered: %s, user=%s, "
|
||||
"transcript_path=%s",
|
||||
"[SDK] Context compaction triggered: %s, user=%s, transcript_path=%s",
|
||||
trigger,
|
||||
user_id,
|
||||
transcript_path,
|
||||
|
||||
@@ -11,7 +11,11 @@ import pytest
|
||||
|
||||
from backend.copilot.context import _current_project_dir
|
||||
|
||||
from .security_hooks import _validate_tool_access, _validate_user_isolation
|
||||
from .security_hooks import (
|
||||
_validate_tool_access,
|
||||
_validate_user_isolation,
|
||||
create_security_hooks,
|
||||
)
|
||||
|
||||
SDK_CWD = "/tmp/copilot-abc123"
|
||||
|
||||
@@ -220,8 +224,6 @@ def test_bash_builtin_blocked_message_clarity():
|
||||
@pytest.fixture()
|
||||
def _hooks():
|
||||
"""Create security hooks and return (pre, post, post_failure) handlers."""
|
||||
from .security_hooks import create_security_hooks
|
||||
|
||||
hooks = create_security_hooks(user_id="u1", sdk_cwd=SDK_CWD, max_subtasks=2)
|
||||
pre = hooks["PreToolUse"][0].hooks[0]
|
||||
post = hooks["PostToolUse"][0].hooks[0]
|
||||
|
||||
@@ -1214,6 +1214,14 @@ async def _run_stream_attempt(
|
||||
|
||||
consecutive_empty_tool_calls = 0
|
||||
|
||||
# --- Intermediate persistence tracking ---
|
||||
# Flush session messages to DB periodically so page reloads show progress
|
||||
# during long-running turns (see incident d2f7cba3: 82-min turn lost on refresh).
|
||||
_last_flush_time = time.monotonic()
|
||||
_msgs_since_flush = 0
|
||||
_FLUSH_INTERVAL_SECONDS = 30.0
|
||||
_FLUSH_MESSAGE_THRESHOLD = 10
|
||||
|
||||
# Use manual __aenter__/__aexit__ instead of ``async with`` so we can
|
||||
# suppress SDK cleanup errors that occur when the SSE client disconnects
|
||||
# mid-stream. GeneratorExit causes the SDK's ``__aexit__`` to run in a
|
||||
@@ -1482,6 +1490,34 @@ async def _run_stream_attempt(
|
||||
model=sdk_msg.model,
|
||||
)
|
||||
|
||||
# --- Intermediate persistence ---
|
||||
# Flush session messages to DB periodically so page reloads
|
||||
# show progress during long-running turns.
|
||||
_msgs_since_flush += 1
|
||||
now = time.monotonic()
|
||||
if (
|
||||
_msgs_since_flush >= _FLUSH_MESSAGE_THRESHOLD
|
||||
or (now - _last_flush_time) >= _FLUSH_INTERVAL_SECONDS
|
||||
):
|
||||
try:
|
||||
await asyncio.shield(upsert_chat_session(ctx.session))
|
||||
logger.debug(
|
||||
"%s Intermediate flush: %d messages "
|
||||
"(msgs_since=%d, elapsed=%.1fs)",
|
||||
ctx.log_prefix,
|
||||
len(ctx.session.messages),
|
||||
_msgs_since_flush,
|
||||
now - _last_flush_time,
|
||||
)
|
||||
except Exception as flush_err:
|
||||
logger.warning(
|
||||
"%s Intermediate flush failed: %s",
|
||||
ctx.log_prefix,
|
||||
flush_err,
|
||||
)
|
||||
_last_flush_time = now
|
||||
_msgs_since_flush = 0
|
||||
|
||||
if acc.stream_completed:
|
||||
break
|
||||
finally:
|
||||
|
||||
@@ -12,6 +12,7 @@ from backend.util.truncate import truncate
|
||||
|
||||
from .tool_adapter import (
|
||||
_MCP_MAX_CHARS,
|
||||
SDK_DISALLOWED_TOOLS,
|
||||
_text_from_mcp_result,
|
||||
cancel_pending_tool_tasks,
|
||||
create_tool_handler,
|
||||
@@ -772,3 +773,19 @@ class TestFiveConcurrentPrelaunchAllComplete:
|
||||
assert result["isError"] is False, f"Result {i} should not be an error"
|
||||
text = result["content"][0]["text"]
|
||||
assert "done-" in text, f"Result {i} missing expected output: {text}"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# SDK_DISALLOWED_TOOLS
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestSDKDisallowedTools:
    """Guard the SDK built-in tool deny-list against accidental removals."""

    def test_bash_tool_is_disallowed(self):
        """The built-in ``Bash`` tool must be present in the deny-list."""
        denied_tools = SDK_DISALLOWED_TOOLS
        assert "Bash" in denied_tools

    def test_webfetch_tool_is_disallowed(self):
        """``WebFetch`` must be present in the deny-list (SSRF risk)."""
        denied_tools = SDK_DISALLOWED_TOOLS
        assert "WebFetch" in denied_tools
|
||||
|
||||
@@ -221,9 +221,21 @@ async def create_session(
|
||||
return session
|
||||
|
||||
|
||||
# Minimum number of seconds between two TTL refreshes of the same session's
# meta key.
_META_TTL_REFRESH_INTERVAL = 60

# Session ID -> clock reading taken the last time that session's meta key TTL
# was refreshed. `publish_chunk` consults this so it does not refresh on every
# single chunk (expensive); a session is refreshed at most once every
# `_META_TTL_REFRESH_INTERVAL` seconds.
_meta_ttl_refresh_at: dict[str, float] = {}
|
||||
|
||||
|
||||
async def publish_chunk(
|
||||
turn_id: str,
|
||||
chunk: StreamBaseResponse,
|
||||
*,
|
||||
session_id: str | None = None,
|
||||
) -> str:
|
||||
"""Publish a chunk to Redis Stream.
|
||||
|
||||
@@ -232,6 +244,9 @@ async def publish_chunk(
|
||||
Args:
|
||||
turn_id: Turn ID (per-turn UUID) identifying the stream
|
||||
chunk: The stream response chunk to publish
|
||||
session_id: Chat session ID — when provided, the session meta key
|
||||
TTL is refreshed periodically to prevent expiration during
|
||||
long-running turns (see SECRT-2178).
|
||||
|
||||
Returns:
|
||||
The Redis Stream message ID
|
||||
@@ -265,6 +280,23 @@ async def publish_chunk(
|
||||
# Set TTL on stream to match session metadata TTL
|
||||
await redis.expire(stream_key, config.stream_ttl)
|
||||
|
||||
# Periodically refresh session-related TTLs so they don't expire
|
||||
# during long-running turns. Without this, turns exceeding stream_ttl
|
||||
# (default 1h) lose their "running" status and stream data, making
|
||||
# the session invisible to the resume endpoint (empty on page reload).
|
||||
# Both meta key AND stream key are refreshed: the stream key's expire
|
||||
# above only fires when publish_chunk is called, but during long
|
||||
# sub-agent gaps (task_progress events don't produce chunks), neither
|
||||
# key gets refreshed.
|
||||
if session_id:
|
||||
now = time.perf_counter()
|
||||
last_refresh = _meta_ttl_refresh_at.get(session_id, 0)
|
||||
if now - last_refresh >= _META_TTL_REFRESH_INTERVAL:
|
||||
meta_key = _get_session_meta_key(session_id)
|
||||
await redis.expire(meta_key, config.stream_ttl)
|
||||
await redis.expire(stream_key, config.stream_ttl)
|
||||
_meta_ttl_refresh_at[session_id] = now
|
||||
|
||||
total_time = (time.perf_counter() - start_time) * 1000
|
||||
# Only log timing for significant chunks or slow operations
|
||||
if (
|
||||
@@ -331,7 +363,7 @@ async def stream_and_publish(
|
||||
async for event in stream:
|
||||
if turn_id and not isinstance(event, (StreamFinish, StreamError)):
|
||||
try:
|
||||
await publish_chunk(turn_id, event)
|
||||
await publish_chunk(turn_id, event, session_id=session_id)
|
||||
except (RedisError, ConnectionError, OSError):
|
||||
if not publish_failed_once:
|
||||
publish_failed_once = True
|
||||
@@ -800,6 +832,9 @@ async def mark_session_completed(
|
||||
# Atomic compare-and-swap: only update if status is "running"
|
||||
result = await redis.eval(COMPLETE_SESSION_SCRIPT, 1, meta_key, status) # type: ignore[misc]
|
||||
|
||||
# Clean up the in-memory TTL refresh tracker to prevent unbounded growth.
|
||||
_meta_ttl_refresh_at.pop(session_id, None)
|
||||
|
||||
if result == 0:
|
||||
logger.debug(f"Session {session_id} already completed/failed, skipping")
|
||||
return False
|
||||
|
||||
@@ -114,6 +114,7 @@ async def execute_block(
|
||||
error=sim_error[0],
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
return BlockOutputResponse(
|
||||
message=f"Block '{block.name}' executed successfully",
|
||||
block_id=block_id,
|
||||
|
||||
@@ -73,7 +73,10 @@ def make_openai_response(
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_simulate_block_basic():
|
||||
"""simulate_block returns correct (output_name, output_data) tuples."""
|
||||
"""simulate_block returns correct (output_name, output_data) tuples.
|
||||
|
||||
Empty "error" pins are dropped at source — only non-empty errors are yielded.
|
||||
"""
|
||||
mock_block = make_mock_block()
|
||||
mock_client = AsyncMock()
|
||||
mock_client.chat.completions.create = AsyncMock(
|
||||
@@ -88,7 +91,8 @@ async def test_simulate_block_basic():
|
||||
outputs.append((name, data))
|
||||
|
||||
assert ("result", "simulated output") in outputs
|
||||
assert ("error", "") in outputs
|
||||
# Empty error pin is dropped at the simulator level
|
||||
assert ("error", "") not in outputs
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@@ -113,6 +117,8 @@ async def test_simulate_block_json_retry():
|
||||
|
||||
assert mock_client.chat.completions.create.call_count == 3
|
||||
assert ("result", "ok") in outputs
|
||||
# Empty error pin is dropped
|
||||
assert ("error", "") not in outputs
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@@ -141,7 +147,7 @@ async def test_simulate_block_all_retries_exhausted():
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_simulate_block_missing_output_pins():
|
||||
"""LLM response missing some output pins; verify they're filled with None."""
|
||||
"""LLM response missing some output pins; verify non-error pins filled with None."""
|
||||
mock_block = make_mock_block(
|
||||
output_props={
|
||||
"result": {"type": "string"},
|
||||
@@ -164,7 +170,29 @@ async def test_simulate_block_missing_output_pins():
|
||||
|
||||
assert outputs["result"] == "hello"
|
||||
assert outputs["count"] is None # missing pin filled with None
|
||||
assert outputs["error"] == "" # "error" pin filled with ""
|
||||
assert "error" not in outputs # missing error pin is omitted entirely
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_simulate_block_keeps_nonempty_error():
    """A non-empty "error" pin in the LLM reply is yielded by simulate_block.

    Such a value represents a simulated logical error, so it must reach the
    caller alongside the regular output pins.
    """
    mock_block = make_mock_block()
    llm_reply = make_openai_response(
        '{"result": "", "error": "API rate limit exceeded"}'
    )
    mock_client = AsyncMock()
    mock_client.chat.completions.create = AsyncMock(return_value=llm_reply)

    with patch(
        "backend.executor.simulator.get_openai_client", return_value=mock_client
    ):
        # Drain the async generator into (pin_name, pin_value) pairs.
        outputs = [
            (name, data)
            async for name, data in simulate_block(mock_block, {"query": "test"})
        ]

    assert ("result", "") in outputs
    assert ("error", "API rate limit exceeded") in outputs
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@@ -200,6 +228,19 @@ async def test_simulate_block_truncates_long_inputs():
|
||||
assert len(parsed["text"]) < 25000
|
||||
|
||||
|
||||
def test_build_simulation_prompt_excludes_error_from_must_include():
    """The "MUST include" line of the system prompt must not list "error".

    The prompt tells the LLM to OMIT the error pin unless it is simulating a
    logical error, so also demanding it under "MUST include" would be
    contradictory.
    """
    # The default mock block exposes both a "result" and an "error" output pin.
    mock_block = make_mock_block()
    prompt_text, _ = build_simulation_prompt(mock_block, {"query": "test"})

    matching_lines = [
        prompt_line
        for prompt_line in prompt_text.splitlines()
        if "MUST include" in prompt_line
    ]
    must_include_line = matching_lines[0]

    assert '"result"' in must_include_line
    assert '"error"' not in must_include_line
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# execute_block dry-run tests
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -334,6 +375,97 @@ def test_run_block_tool_dry_run_calls_execute():
|
||||
assert "dry_run=dry_run" in source_execute
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_execute_block_dry_run_no_empty_error_from_simulator():
    """Dry-run execute_block passes simulator output through without an error pin.

    Empty error pins are dropped by the simulator itself, so when the
    simulator yields only a non-error output, the response must contain exactly
    that output and no "error" key.
    """
    mock_block = make_mock_block()

    async def fake_simulate(block, input_data):
        # Mirrors the simulator's behavior of omitting empty error pins.
        yield "result", "simulated output"

    call_kwargs = dict(
        block=mock_block,
        block_id="test-block-id",
        input_data={"query": "hello"},
        user_id="user-1",
        session_id="session-1",
        node_exec_id="node-exec-1",
        matched_credentials={},
        dry_run=True,
    )

    with patch(
        "backend.copilot.tools.helpers.simulate_block", side_effect=fake_simulate
    ):
        response = await execute_block(**call_kwargs)

    assert isinstance(response, BlockOutputResponse)
    assert response.success is True
    assert response.is_dry_run is True
    assert "error" not in response.outputs
    assert response.outputs == {"result": ["simulated output"]}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_execute_block_dry_run_keeps_nonempty_error_pin():
    """A real error message on the "error" pin survives a dry-run execution."""
    mock_block = make_mock_block()

    async def fake_simulate(block, input_data):
        yield "result", ""
        yield "error", "API rate limit exceeded"

    call_kwargs = dict(
        block=mock_block,
        block_id="test-block-id",
        input_data={"query": "hello"},
        user_id="user-1",
        session_id="session-1",
        node_exec_id="node-exec-1",
        matched_credentials={},
        dry_run=True,
    )

    with patch(
        "backend.copilot.tools.helpers.simulate_block", side_effect=fake_simulate
    ):
        response = await execute_block(**call_kwargs)

    assert isinstance(response, BlockOutputResponse)
    assert response.success is True
    # The non-empty error value must be preserved in the outputs.
    assert "error" in response.outputs
    assert response.outputs["error"] == ["API rate limit exceeded"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_execute_block_dry_run_message_includes_completed_status():
    """The dry-run response message reports that the block executed successfully."""
    mock_block = make_mock_block()

    async def fake_simulate(block, input_data):
        yield "result", "simulated"

    call_kwargs = dict(
        block=mock_block,
        block_id="test-block-id",
        input_data={"query": "hello"},
        user_id="user-1",
        session_id="session-1",
        node_exec_id="node-exec-1",
        matched_credentials={},
        dry_run=True,
    )

    with patch(
        "backend.copilot.tools.helpers.simulate_block", side_effect=fake_simulate
    ):
        response = await execute_block(**call_kwargs)

    assert isinstance(response, BlockOutputResponse)
    assert "executed successfully" in response.message
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_execute_block_dry_run_simulator_error_returns_error_response():
|
||||
"""When simulate_block yields a SIMULATOR ERROR tuple, execute_block returns ErrorResponse."""
|
||||
|
||||
@@ -789,6 +789,28 @@ class WriteWorkspaceFileTool(BaseTool):
|
||||
)
|
||||
|
||||
if not filename:
|
||||
# When ALL parameters are missing, the most likely cause is
|
||||
# output token truncation: the LLM tried to inline a very large
|
||||
# file as `content`, the SDK silently truncated the tool call
|
||||
# arguments to `{}`, and we receive nothing. Return an
|
||||
# actionable error instead of a generic "filename required".
|
||||
has_any_content = any(
|
||||
kwargs.get(k) for k in ("content", "content_base64", "source_path")
|
||||
)
|
||||
if not has_any_content:
|
||||
return ErrorResponse(
|
||||
message=(
|
||||
"Tool call appears truncated (no arguments received). "
|
||||
"This happens when the content is too large for a "
|
||||
"single tool call. Instead of passing content inline, "
|
||||
"first write the file to the working directory using "
|
||||
"bash_exec (e.g. cat > /home/user/file.md << 'EOF'... "
|
||||
"EOF), then use source_path to copy it to workspace: "
|
||||
"write_workspace_file(filename='file.md', "
|
||||
"source_path='/home/user/file.md')"
|
||||
),
|
||||
session_id=session_id,
|
||||
)
|
||||
return ErrorResponse(
|
||||
message="Please provide a filename", session_id=session_id
|
||||
)
|
||||
|
||||
@@ -13,7 +13,7 @@ Inspired by https://github.com/Significant-Gravitas/agent-simulator
|
||||
|
||||
import json
|
||||
import logging
|
||||
from collections.abc import AsyncIterator
|
||||
from collections.abc import AsyncGenerator
|
||||
from typing import Any
|
||||
|
||||
from backend.util.clients import get_openai_client
|
||||
@@ -96,6 +96,10 @@ def build_simulation_prompt(block: Any, input_data: dict[str, Any]) -> tuple[str
|
||||
input_pins = _describe_schema_pins(input_schema)
|
||||
output_pins = _describe_schema_pins(output_schema)
|
||||
output_properties = list(output_schema.get("properties", {}).keys())
|
||||
# Build a separate list for the "MUST include" instruction that excludes
|
||||
# "error" — the prompt already tells the LLM to OMIT the error pin unless
|
||||
# simulating a logical error. Including it in "MUST include" is contradictory.
|
||||
required_output_properties = [k for k in output_properties if k != "error"]
|
||||
|
||||
block_name = getattr(block, "name", type(block).__name__)
|
||||
block_description = getattr(block, "description", "No description available.")
|
||||
@@ -117,10 +121,10 @@ Rules:
|
||||
- Respond with a single JSON object whose keys are EXACTLY the output pin names listed above.
|
||||
- Assume all credentials and authentication are present and valid. Never simulate authentication failures.
|
||||
- Make the simulated outputs realistic and consistent with the inputs.
|
||||
- If there is an "error" pin, set it to "" (empty string) unless you are simulating a logical error.
|
||||
- If there is an "error" pin, OMIT it entirely unless you are simulating a logical error. Only include the "error" pin when there is a genuine error message to report.
|
||||
- Do not include any extra keys beyond the output pins.
|
||||
|
||||
Output pin names you MUST include: {json.dumps(output_properties)}
|
||||
Output pin names you MUST include: {json.dumps(required_output_properties)}
|
||||
"""
|
||||
|
||||
safe_inputs = _truncate_input_values(input_data)
|
||||
@@ -132,7 +136,7 @@ Output pin names you MUST include: {json.dumps(output_properties)}
|
||||
async def simulate_block(
|
||||
block: Any,
|
||||
input_data: dict[str, Any],
|
||||
) -> AsyncIterator[tuple[str, Any]]:
|
||||
) -> AsyncGenerator[tuple[str, Any], None]:
|
||||
"""Simulate block execution using an LLM.
|
||||
|
||||
Yields (output_name, output_data) tuples matching the Block.execute() interface.
|
||||
@@ -172,13 +176,26 @@ async def simulate_block(
|
||||
if not isinstance(parsed, dict):
|
||||
raise ValueError(f"LLM returned non-object JSON: {raw[:200]}")
|
||||
|
||||
# Fill missing output pins with defaults
|
||||
# Fill missing output pins with defaults.
|
||||
# Skip empty "error" pins — an empty string means "no error" and
|
||||
# would only confuse downstream consumers (LLM, frontend).
|
||||
result: dict[str, Any] = {}
|
||||
for pin_name in output_properties:
|
||||
if pin_name in parsed:
|
||||
result[pin_name] = parsed[pin_name]
|
||||
else:
|
||||
result[pin_name] = "" if pin_name == "error" else None
|
||||
value = parsed[pin_name]
|
||||
# Drop empty/blank error pins: they carry no information.
|
||||
# Uses strip() intentionally so whitespace-only strings
|
||||
# (e.g. " ", "\n") are also treated as empty.
|
||||
if (
|
||||
pin_name == "error"
|
||||
and isinstance(value, str)
|
||||
and not value.strip()
|
||||
):
|
||||
continue
|
||||
result[pin_name] = value
|
||||
elif pin_name != "error":
|
||||
# Only fill non-error missing pins with None
|
||||
result[pin_name] = None
|
||||
|
||||
logger.debug(
|
||||
"simulate_block: block=%s attempt=%d tokens=%s/%s",
|
||||
|
||||
@@ -39,39 +39,49 @@ export const AgentOutputs = ({ flowID }: { flowID: string | null }) => {
|
||||
return outputNodes
|
||||
.map((node) => {
|
||||
const executionResults = node.data.nodeExecutionResults || [];
|
||||
const latestResult =
|
||||
executionResults.length > 0
|
||||
? executionResults[executionResults.length - 1]
|
||||
: undefined;
|
||||
const outputData = latestResult?.output_data?.output;
|
||||
|
||||
const renderer = globalRegistry.getRenderer(outputData);
|
||||
const items = executionResults
|
||||
.filter((result) => result.output_data?.output !== undefined)
|
||||
.map((result) => {
|
||||
const outputData = result.output_data!.output;
|
||||
const renderer = globalRegistry.getRenderer(outputData);
|
||||
return {
|
||||
nodeExecID: result.node_exec_id,
|
||||
value: outputData,
|
||||
renderer,
|
||||
};
|
||||
})
|
||||
.filter(
|
||||
(
|
||||
item,
|
||||
): item is typeof item & {
|
||||
renderer: NonNullable<typeof item.renderer>;
|
||||
} => item.renderer !== null,
|
||||
);
|
||||
|
||||
if (items.length === 0) return null;
|
||||
|
||||
return {
|
||||
nodeID: node.id,
|
||||
metadata: {
|
||||
name: node.data.hardcodedValues?.name || "Output",
|
||||
description:
|
||||
node.data.hardcodedValues?.description || "Output from the agent",
|
||||
},
|
||||
value: outputData ?? "No output yet",
|
||||
renderer,
|
||||
items,
|
||||
};
|
||||
})
|
||||
.filter(
|
||||
(
|
||||
output,
|
||||
): output is typeof output & {
|
||||
renderer: NonNullable<typeof output.renderer>;
|
||||
} => output.renderer !== null,
|
||||
);
|
||||
.filter((group): group is NonNullable<typeof group> => group !== null);
|
||||
}, [nodes]);
|
||||
|
||||
const actionItems = useMemo(() => {
|
||||
return outputs.map((output) => ({
|
||||
value: output.value,
|
||||
metadata: {},
|
||||
renderer: output.renderer,
|
||||
}));
|
||||
return outputs.flatMap((group) =>
|
||||
group.items.map((item) => ({
|
||||
value: item.value,
|
||||
metadata: group.metadata,
|
||||
renderer: item.renderer,
|
||||
})),
|
||||
);
|
||||
}, [outputs]);
|
||||
|
||||
return (
|
||||
@@ -116,24 +126,27 @@ export const AgentOutputs = ({ flowID }: { flowID: string | null }) => {
|
||||
<ScrollArea className="h-full overflow-auto pr-4">
|
||||
<div className="space-y-6">
|
||||
{outputs && outputs.length > 0 ? (
|
||||
outputs.map((output, i) => (
|
||||
<div key={i} className="space-y-2">
|
||||
outputs.map((group) => (
|
||||
<div key={group.nodeID} className="space-y-2">
|
||||
<div>
|
||||
<Label className="text-base font-semibold">
|
||||
{output.metadata.name || "Unnamed Output"}
|
||||
{group.metadata.name || "Unnamed Output"}
|
||||
</Label>
|
||||
{output.metadata.description && (
|
||||
{group.metadata.description && (
|
||||
<Label className="mt-1 block text-sm text-gray-600">
|
||||
{output.metadata.description}
|
||||
{group.metadata.description}
|
||||
</Label>
|
||||
)}
|
||||
</div>
|
||||
|
||||
<OutputItem
|
||||
value={output.value}
|
||||
metadata={{}}
|
||||
renderer={output.renderer}
|
||||
/>
|
||||
{group.items.map((item) => (
|
||||
<OutputItem
|
||||
key={item.nodeExecID}
|
||||
value={item.value}
|
||||
metadata={group.metadata}
|
||||
renderer={item.renderer}
|
||||
/>
|
||||
))}
|
||||
</div>
|
||||
))
|
||||
) : (
|
||||
|
||||
@@ -33,6 +33,12 @@ export const useRunGraph = () => {
|
||||
const clearAllNodeErrors = useNodeStore(
|
||||
useShallow((state) => state.clearAllNodeErrors),
|
||||
);
|
||||
const cleanNodesStatuses = useNodeStore(
|
||||
useShallow((state) => state.cleanNodesStatuses),
|
||||
);
|
||||
const clearAllNodeExecutionResults = useNodeStore(
|
||||
useShallow((state) => state.clearAllNodeExecutionResults),
|
||||
);
|
||||
|
||||
// Tutorial integration - force open dialog when tutorial requests it
|
||||
const forceOpenRunInputDialog = useTutorialStore(
|
||||
@@ -137,6 +143,9 @@ export const useRunGraph = () => {
|
||||
if (!dryRun && (hasInputs() || hasCredentials())) {
|
||||
setOpenRunInputDialog(true);
|
||||
} else {
|
||||
// Clear stale results so the UI shows fresh output from this execution
|
||||
clearAllNodeExecutionResults();
|
||||
cleanNodesStatuses();
|
||||
// Optimistically set running state immediately for responsive UI
|
||||
setIsGraphRunning(true);
|
||||
await executeGraph({
|
||||
|
||||
@@ -10,9 +10,12 @@ import { NodeExecutionResult } from "@/app/api/__generated__/models/nodeExecutio
|
||||
import { AgentExecutionStatus } from "@/app/api/__generated__/models/agentExecutionStatus";
|
||||
import { useGraphStore } from "../../../stores/graphStore";
|
||||
import { useEdgeStore } from "../../../stores/edgeStore";
|
||||
import { useQueryClient } from "@tanstack/react-query";
|
||||
import { getGetV1GetExecutionDetailsQueryKey } from "@/app/api/__generated__/endpoints/graphs/graphs";
|
||||
|
||||
export const useFlowRealtime = () => {
|
||||
const api = useBackendAPI();
|
||||
const queryClient = useQueryClient();
|
||||
const updateNodeExecutionResult = useNodeStore(
|
||||
useShallow((state) => state.updateNodeExecutionResult),
|
||||
);
|
||||
@@ -71,6 +74,16 @@ export const useFlowRealtime = () => {
|
||||
console.debug(
|
||||
`Subscribed to updates for execution #${flowExecutionID}`,
|
||||
);
|
||||
// Refetch execution details to catch any events that were
|
||||
// published before the WebSocket subscription was established.
|
||||
// This closes the race-condition window for fast-completing
|
||||
// executions like dry-runs / simulations.
|
||||
void queryClient.invalidateQueries({
|
||||
queryKey: getGetV1GetExecutionDetailsQueryKey(
|
||||
flowID!,
|
||||
flowExecutionID,
|
||||
),
|
||||
});
|
||||
})
|
||||
.catch((error) =>
|
||||
console.error(
|
||||
@@ -87,7 +100,7 @@ export const useFlowRealtime = () => {
|
||||
deregisterGraphExecutionStatusEvent();
|
||||
resetEdgeBeads();
|
||||
};
|
||||
}, [api, flowExecutionID, resetEdgeBeads]);
|
||||
}, [api, flowExecutionID, resetEdgeBeads, queryClient, flowID]);
|
||||
|
||||
return {};
|
||||
};
|
||||
|
||||
@@ -99,7 +99,19 @@ export function isRunBlockReviewRequiredOutput(
|
||||
/**
 * Type guard: reports whether a run-block tool output is an ErrorResponse.
 *
 * Only genuine error responses match. A BlockOutputResponse is never treated
 * as an error, even when the payload carries an "error" key.
 */
export function isRunBlockErrorOutput(
  output: RunBlockToolOutput,
): output is ErrorResponse {
  // Only match actual error responses (type=error), not block outputs that
  // happen to have an "error" key in their outputs dict. A bare
  // `"error" in output` check is too broad: it mis-identifies
  // BlockOutputResponse as an error and shows dry-run results as failed.
  if (output.type === ResponseType.error) return true;

  // Fallback for untyped payloads: match only if "error" exists at the top
  // level AND there is no "block_id" (which distinguishes BlockOutputResponse
  // from ErrorResponse). Note: `type` is optional in both interfaces, so
  // correctness here depends on `block_id` presence (always set on
  // BlockOutputResponse), not on `type` presence.
  if (!("type" in output) && "error" in output && !("block_id" in output))
    return true;

  return false;
}
|
||||
|
||||
function parseOutput(output: unknown): RunBlockToolOutput | null {
|
||||
@@ -122,7 +134,9 @@ function parseOutput(output: unknown): RunBlockToolOutput | null {
|
||||
if ("block_id" in output) return output as BlockOutputResponse;
|
||||
if ("block" in output) return output as BlockDetailsResponse;
|
||||
if ("setup_info" in output) return output as SetupRequirementsResponse;
|
||||
if ("error" in output || "details" in output)
|
||||
// Only match error responses that have an "error" key but NOT "block_id"
|
||||
// (which would indicate a BlockOutputResponse, not an error).
|
||||
if (("error" in output || "details" in output) && !("block_id" in output))
|
||||
return output as ErrorResponse;
|
||||
}
|
||||
return null;
|
||||
|
||||
Reference in New Issue
Block a user