Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into fix/dry-run-simulation-streaming

This commit is contained in:
Zamil Majdy
2026-04-01 06:15:12 +02:00
10 changed files with 183 additions and 36 deletions

View File

@@ -107,6 +107,13 @@ Do not re-fetch or re-generate data you already have from prior tool calls.
After building the file, reference it with `@@agptfile:` in other tools:
`@@agptfile:/home/user/report.md`
### Web search best practices
- If 3 similar web searches don't return the specific data you need, conclude
it isn't publicly available and work with what you have.
- Prefer fewer, well-targeted searches over many variations of the same query.
- When spawning sub-agents for research, ensure each has a distinct
non-overlapping scope to avoid redundant searches.
### Sub-agent tasks
- When using the Task tool, NEVER set `run_in_background` to true.
All tasks must run in the foreground.

View File

@@ -27,6 +27,7 @@ from backend.copilot.response_model import (
StreamError,
StreamFinish,
StreamFinishStep,
StreamHeartbeat,
StreamStart,
StreamStartStep,
StreamTextDelta,
@@ -76,6 +77,12 @@ class SDKResponseAdapter:
# Open the first step (matches non-SDK: StreamStart then StreamStartStep)
responses.append(StreamStartStep())
self.step_open = True
elif sdk_message.subtype == "task_progress":
# Emit a heartbeat so publish_chunk is called during long
# sub-agent runs. Without this, the Redis stream and meta
# key TTLs expire during gaps where no real chunks are
# produced (task_progress events were previously silent).
responses.append(StreamHeartbeat())
elif isinstance(sdk_message, AssistantMessage):
# Flush any SDK built-in tool calls that didn't get a UserMessage

View File

@@ -18,6 +18,7 @@ from backend.copilot.response_model import (
StreamError,
StreamFinish,
StreamFinishStep,
StreamHeartbeat,
StreamStart,
StreamStartStep,
StreamTextDelta,
@@ -59,6 +60,14 @@ def test_system_non_init_emits_nothing():
assert results == []
def test_task_progress_emits_heartbeat():
"""task_progress events emit a StreamHeartbeat to keep Redis TTL alive."""
adapter = _adapter()
results = adapter.convert_message(SystemMessage(subtype="task_progress", data={}))
assert len(results) == 1
assert isinstance(results[0], StreamHeartbeat)
# -- AssistantMessage with TextBlock -----------------------------------------

View File

@@ -313,8 +313,7 @@ def create_security_hooks(
.replace("\r", "")
)
logger.info(
"[SDK] Context compaction triggered: %s, user=%s, "
"transcript_path=%s",
"[SDK] Context compaction triggered: %s, user=%s, transcript_path=%s",
trigger,
user_id,
transcript_path,

View File

@@ -11,7 +11,11 @@ import pytest
from backend.copilot.context import _current_project_dir
from .security_hooks import _validate_tool_access, _validate_user_isolation
from .security_hooks import (
_validate_tool_access,
_validate_user_isolation,
create_security_hooks,
)
SDK_CWD = "/tmp/copilot-abc123"
@@ -220,8 +224,6 @@ def test_bash_builtin_blocked_message_clarity():
@pytest.fixture()
def _hooks():
"""Create security hooks and return (pre, post, post_failure) handlers."""
from .security_hooks import create_security_hooks
hooks = create_security_hooks(user_id="u1", sdk_cwd=SDK_CWD, max_subtasks=2)
pre = hooks["PreToolUse"][0].hooks[0]
post = hooks["PostToolUse"][0].hooks[0]

View File

@@ -1214,6 +1214,14 @@ async def _run_stream_attempt(
consecutive_empty_tool_calls = 0
# --- Intermediate persistence tracking ---
# Flush session messages to DB periodically so page reloads show progress
# during long-running turns (see incident d2f7cba3: 82-min turn lost on refresh).
_last_flush_time = time.monotonic()
_msgs_since_flush = 0
_FLUSH_INTERVAL_SECONDS = 30.0
_FLUSH_MESSAGE_THRESHOLD = 10
# Use manual __aenter__/__aexit__ instead of ``async with`` so we can
# suppress SDK cleanup errors that occur when the SSE client disconnects
# mid-stream. GeneratorExit causes the SDK's ``__aexit__`` to run in a
@@ -1482,6 +1490,34 @@ async def _run_stream_attempt(
model=sdk_msg.model,
)
# --- Intermediate persistence ---
# Flush session messages to DB periodically so page reloads
# show progress during long-running turns.
_msgs_since_flush += 1
now = time.monotonic()
if (
_msgs_since_flush >= _FLUSH_MESSAGE_THRESHOLD
or (now - _last_flush_time) >= _FLUSH_INTERVAL_SECONDS
):
try:
await asyncio.shield(upsert_chat_session(ctx.session))
logger.debug(
"%s Intermediate flush: %d messages "
"(msgs_since=%d, elapsed=%.1fs)",
ctx.log_prefix,
len(ctx.session.messages),
_msgs_since_flush,
now - _last_flush_time,
)
except Exception as flush_err:
logger.warning(
"%s Intermediate flush failed: %s",
ctx.log_prefix,
flush_err,
)
_last_flush_time = now
_msgs_since_flush = 0
if acc.stream_completed:
break
finally:

View File

@@ -12,6 +12,7 @@ from backend.util.truncate import truncate
from .tool_adapter import (
_MCP_MAX_CHARS,
SDK_DISALLOWED_TOOLS,
_text_from_mcp_result,
cancel_pending_tool_tasks,
create_tool_handler,
@@ -772,3 +773,19 @@ class TestFiveConcurrentPrelaunchAllComplete:
assert result["isError"] is False, f"Result {i} should not be an error"
text = result["content"][0]["text"]
assert "done-" in text, f"Result {i} missing expected output: {text}"
# ---------------------------------------------------------------------------
# SDK_DISALLOWED_TOOLS
# ---------------------------------------------------------------------------
class TestSDKDisallowedTools:
"""Verify that dangerous SDK built-in tools are in the disallowed list."""
def test_bash_tool_is_disallowed(self):
assert "Bash" in SDK_DISALLOWED_TOOLS
def test_webfetch_tool_is_disallowed(self):
"""WebFetch is disallowed due to SSRF risk."""
assert "WebFetch" in SDK_DISALLOWED_TOOLS

View File

@@ -221,9 +221,21 @@ async def create_session(
return session
_meta_ttl_refresh_at: dict[str, float] = {}
"""Tracks the last time the session meta key TTL was refreshed.
Used by `publish_chunk` to avoid refreshing on every single chunk
(expensive). Refreshes at most once every 60 seconds per session.
"""
_META_TTL_REFRESH_INTERVAL = 60 # seconds
async def publish_chunk(
turn_id: str,
chunk: StreamBaseResponse,
*,
session_id: str | None = None,
) -> str:
"""Publish a chunk to Redis Stream.
@@ -232,6 +244,9 @@ async def publish_chunk(
Args:
turn_id: Turn ID (per-turn UUID) identifying the stream
chunk: The stream response chunk to publish
session_id: Chat session ID — when provided, the session meta key
TTL is refreshed periodically to prevent expiration during
long-running turns (see SECRT-2178).
Returns:
The Redis Stream message ID
@@ -265,6 +280,23 @@ async def publish_chunk(
# Set TTL on stream to match session metadata TTL
await redis.expire(stream_key, config.stream_ttl)
# Periodically refresh session-related TTLs so they don't expire
# during long-running turns. Without this, turns exceeding stream_ttl
# (default 1h) lose their "running" status and stream data, making
# the session invisible to the resume endpoint (empty on page reload).
# Both meta key AND stream key are refreshed: the stream key's expire
# above only fires when publish_chunk is called, but during long
# sub-agent gaps (task_progress events don't produce chunks), neither
# key gets refreshed.
if session_id:
now = time.perf_counter()
last_refresh = _meta_ttl_refresh_at.get(session_id, 0)
if now - last_refresh >= _META_TTL_REFRESH_INTERVAL:
meta_key = _get_session_meta_key(session_id)
await redis.expire(meta_key, config.stream_ttl)
await redis.expire(stream_key, config.stream_ttl)
_meta_ttl_refresh_at[session_id] = now
total_time = (time.perf_counter() - start_time) * 1000
# Only log timing for significant chunks or slow operations
if (
@@ -331,7 +363,7 @@ async def stream_and_publish(
async for event in stream:
if turn_id and not isinstance(event, (StreamFinish, StreamError)):
try:
await publish_chunk(turn_id, event)
await publish_chunk(turn_id, event, session_id=session_id)
except (RedisError, ConnectionError, OSError):
if not publish_failed_once:
publish_failed_once = True
@@ -800,6 +832,9 @@ async def mark_session_completed(
# Atomic compare-and-swap: only update if status is "running"
result = await redis.eval(COMPLETE_SESSION_SCRIPT, 1, meta_key, status) # type: ignore[misc]
# Clean up the in-memory TTL refresh tracker to prevent unbounded growth.
_meta_ttl_refresh_at.pop(session_id, None)
if result == 0:
logger.debug(f"Session {session_id} already completed/failed, skipping")
return False

View File

@@ -789,6 +789,28 @@ class WriteWorkspaceFileTool(BaseTool):
)
if not filename:
# When ALL parameters are missing, the most likely cause is
# output token truncation: the LLM tried to inline a very large
# file as `content`, the SDK silently truncated the tool call
# arguments to `{}`, and we receive nothing. Return an
# actionable error instead of a generic "filename required".
has_any_content = any(
kwargs.get(k) for k in ("content", "content_base64", "source_path")
)
if not has_any_content:
return ErrorResponse(
message=(
"Tool call appears truncated (no arguments received). "
"This happens when the content is too large for a "
"single tool call. Instead of passing content inline, "
"first write the file to the working directory using "
"bash_exec (e.g. cat > /home/user/file.md << 'EOF'... "
"EOF), then use source_path to copy it to workspace: "
"write_workspace_file(filename='file.md', "
"source_path='/home/user/file.md')"
),
session_id=session_id,
)
return ErrorResponse(
message="Please provide a filename", session_id=session_id
)

View File

@@ -39,39 +39,49 @@ export const AgentOutputs = ({ flowID }: { flowID: string | null }) => {
return outputNodes
.map((node) => {
const executionResults = node.data.nodeExecutionResults || [];
const latestResult =
executionResults.length > 0
? executionResults[executionResults.length - 1]
: undefined;
const outputData = latestResult?.output_data?.output;
const renderer = globalRegistry.getRenderer(outputData);
const items = executionResults
.filter((result) => result.output_data?.output !== undefined)
.map((result) => {
const outputData = result.output_data!.output;
const renderer = globalRegistry.getRenderer(outputData);
return {
nodeExecID: result.node_exec_id,
value: outputData,
renderer,
};
})
.filter(
(
item,
): item is typeof item & {
renderer: NonNullable<typeof item.renderer>;
} => item.renderer !== null,
);
if (items.length === 0) return null;
return {
nodeID: node.id,
metadata: {
name: node.data.hardcodedValues?.name || "Output",
description:
node.data.hardcodedValues?.description || "Output from the agent",
},
value: outputData ?? "No output yet",
renderer,
items,
};
})
.filter(
(
output,
): output is typeof output & {
renderer: NonNullable<typeof output.renderer>;
} => output.renderer !== null,
);
.filter((group): group is NonNullable<typeof group> => group !== null);
}, [nodes]);
const actionItems = useMemo(() => {
return outputs.map((output) => ({
value: output.value,
metadata: {},
renderer: output.renderer,
}));
return outputs.flatMap((group) =>
group.items.map((item) => ({
value: item.value,
metadata: group.metadata,
renderer: item.renderer,
})),
);
}, [outputs]);
return (
@@ -116,24 +126,27 @@ export const AgentOutputs = ({ flowID }: { flowID: string | null }) => {
<ScrollArea className="h-full overflow-auto pr-4">
<div className="space-y-6">
{outputs && outputs.length > 0 ? (
outputs.map((output, i) => (
<div key={i} className="space-y-2">
outputs.map((group) => (
<div key={group.nodeID} className="space-y-2">
<div>
<Label className="text-base font-semibold">
{output.metadata.name || "Unnamed Output"}
{group.metadata.name || "Unnamed Output"}
</Label>
{output.metadata.description && (
{group.metadata.description && (
<Label className="mt-1 block text-sm text-gray-600">
{output.metadata.description}
{group.metadata.description}
</Label>
)}
</div>
<OutputItem
value={output.value}
metadata={{}}
renderer={output.renderer}
/>
{group.items.map((item) => (
<OutputItem
key={item.nodeExecID}
value={item.value}
metadata={group.metadata}
renderer={item.renderer}
/>
))}
</div>
))
) : (