mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-30 03:00:41 -04:00
Compare commits
7 Commits
swiftyos/r
...
spare/6
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2b4727e8b2 | ||
|
|
0cd0a76305 | ||
|
|
bd2efed080 | ||
|
|
5fccd8a762 | ||
|
|
d27d22159d | ||
|
|
df205b5444 | ||
|
|
4efa1c4310 |
@@ -124,15 +124,6 @@ def create_copilot_queue_config() -> RabbitMQConfig:
|
||||
vhost="/",
|
||||
exchanges=[COPILOT_EXECUTION_EXCHANGE, COPILOT_CANCEL_EXCHANGE],
|
||||
queues=[run_queue, cancel_queue],
|
||||
# The consumer threads sit in pika's blocking ``start_consuming()`` for
|
||||
# the full lifetime of the process. If the TCP connection is dropped
|
||||
# (server restart, NAT timeout, laptop sleep) while pika's IO thread is
|
||||
# starved, the socket rots in CLOSE_WAIT and no message is ever
|
||||
# consumed — see zombie-consumer incident notes. A short heartbeat plus
|
||||
# kernel-level TCP keepalive makes both the app and the OS notice a
|
||||
# dead peer within a couple of minutes instead of hours.
|
||||
heartbeat=60,
|
||||
tcp_keepalive=True,
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -174,18 +174,14 @@ sandbox so `bash_exec` can access it for further processing.
|
||||
The exact sandbox path is shown in the `[Sandbox copy available at ...]` note.
|
||||
|
||||
### GitHub CLI (`gh`) and git
|
||||
- To check if the user has their GitHub account already connected, run `gh auth status`. Always check this before running `connect_integration(provider="github")` which will ask the user to connect their GitHub regardless if it's already connected.
|
||||
- To check if the user has their GitHub account already connected, run `gh auth status`. Always check this before asking them to connect it.
|
||||
- If the user has connected their GitHub account, both `gh` and `git` are
|
||||
pre-authenticated — use them directly without any manual login step.
|
||||
`git` HTTPS operations (clone, push, pull) work automatically.
|
||||
- If the token changes mid-session (e.g. user reconnects with a new token),
|
||||
run `gh auth setup-git` to re-register the credential helper.
|
||||
- **MANDATORY:** You MUST run `gh auth status` before EVER calling
|
||||
`connect_integration(provider="github")`. If it shows `Logged in`,
|
||||
proceed directly — no integration connection needed. Never skip this check.
|
||||
- If `gh auth status` shows NOT logged in, or `gh`/`git` fails with an
|
||||
authentication error (e.g. "authentication required", "could not read
|
||||
Username", or exit code 128), THEN call
|
||||
- If `gh` or `git` fails with an authentication error (e.g. "authentication
|
||||
required", "could not read Username", or exit code 128), call
|
||||
`connect_integration(provider="github")` to surface the GitHub credentials
|
||||
setup card so the user can connect their account. Once connected, retry
|
||||
the operation.
|
||||
|
||||
@@ -880,6 +880,202 @@ class TestUploadCliSession:
|
||||
assert meta_content["mode"] == "baseline"
|
||||
assert meta_content["message_count"] == 4
|
||||
|
||||
def test_strips_session_before_upload_and_writes_back(self, tmp_path):
|
||||
"""Strippable entries (progress, thinking blocks) are removed before upload.
|
||||
|
||||
The stripped content is written back to disk (so same-pod turns benefit)
|
||||
and the smaller bytes are uploaded to GCS.
|
||||
"""
|
||||
import asyncio
|
||||
import os
|
||||
import re
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
from .transcript import _sanitize_id, upload_cli_session
|
||||
|
||||
projects_base = str(tmp_path)
|
||||
session_id = "12345678-0000-0000-0000-000000000010"
|
||||
sdk_cwd = str(tmp_path)
|
||||
|
||||
encoded_cwd = re.sub(r"[^a-zA-Z0-9]", "-", os.path.realpath(sdk_cwd))
|
||||
session_dir = tmp_path / encoded_cwd
|
||||
session_dir.mkdir(parents=True, exist_ok=True)
|
||||
session_file = session_dir / f"{_sanitize_id(session_id)}.jsonl"
|
||||
|
||||
# A CLI session with a progress entry (strippable) and a real assistant message.
|
||||
import json
|
||||
|
||||
progress_entry = {
|
||||
"type": "progress",
|
||||
"uuid": "p1",
|
||||
"parentUuid": "u1",
|
||||
"data": {"type": "bash_progress", "stdout": "running..."},
|
||||
}
|
||||
user_entry = {
|
||||
"type": "user",
|
||||
"uuid": "u1",
|
||||
"message": {"role": "user", "content": "hello"},
|
||||
}
|
||||
asst_entry = {
|
||||
"type": "assistant",
|
||||
"uuid": "a1",
|
||||
"parentUuid": "u1",
|
||||
"message": {"role": "assistant", "content": "world"},
|
||||
}
|
||||
raw_content = (
|
||||
json.dumps(progress_entry)
|
||||
+ "\n"
|
||||
+ json.dumps(user_entry)
|
||||
+ "\n"
|
||||
+ json.dumps(asst_entry)
|
||||
+ "\n"
|
||||
)
|
||||
raw_bytes = raw_content.encode("utf-8")
|
||||
session_file.write_bytes(raw_bytes)
|
||||
|
||||
mock_storage = AsyncMock()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"backend.copilot.transcript._projects_base",
|
||||
return_value=projects_base,
|
||||
),
|
||||
patch(
|
||||
"backend.copilot.transcript.get_workspace_storage",
|
||||
new_callable=AsyncMock,
|
||||
return_value=mock_storage,
|
||||
),
|
||||
):
|
||||
asyncio.run(
|
||||
upload_cli_session(
|
||||
user_id="user-1",
|
||||
session_id=session_id,
|
||||
sdk_cwd=sdk_cwd,
|
||||
)
|
||||
)
|
||||
|
||||
# Upload should have been called with stripped bytes (no progress entry).
|
||||
mock_storage.store.assert_called_once()
|
||||
stored_content: bytes = mock_storage.store.call_args.kwargs["content"]
|
||||
stored_lines = stored_content.decode("utf-8").strip().split("\n")
|
||||
stored_types = [json.loads(line).get("type") for line in stored_lines]
|
||||
assert "progress" not in stored_types
|
||||
assert "user" in stored_types
|
||||
assert "assistant" in stored_types
|
||||
# Stripped bytes should be smaller than raw.
|
||||
assert len(stored_content) < len(raw_bytes)
|
||||
# File on disk should also be the stripped version.
|
||||
disk_content = session_file.read_bytes()
|
||||
assert disk_content == stored_content
|
||||
|
||||
def test_strips_stale_thinking_blocks_before_upload(self, tmp_path):
|
||||
"""Thinking blocks in non-last assistant turns are stripped to reduce size."""
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
from .transcript import _sanitize_id, upload_cli_session
|
||||
|
||||
projects_base = str(tmp_path)
|
||||
session_id = "12345678-0000-0000-0000-000000000011"
|
||||
sdk_cwd = str(tmp_path)
|
||||
|
||||
encoded_cwd = re.sub(r"[^a-zA-Z0-9]", "-", os.path.realpath(sdk_cwd))
|
||||
session_dir = tmp_path / encoded_cwd
|
||||
session_dir.mkdir(parents=True, exist_ok=True)
|
||||
session_file = session_dir / f"{_sanitize_id(session_id)}.jsonl"
|
||||
|
||||
# Two turns: first assistant has thinking block (stale), second doesn't.
|
||||
u1 = {
|
||||
"type": "user",
|
||||
"uuid": "u1",
|
||||
"message": {"role": "user", "content": "q1"},
|
||||
}
|
||||
a1_with_thinking = {
|
||||
"type": "assistant",
|
||||
"uuid": "a1",
|
||||
"parentUuid": "u1",
|
||||
"message": {
|
||||
"id": "msg_a1",
|
||||
"role": "assistant",
|
||||
"content": [
|
||||
{"type": "thinking", "thinking": "A" * 5000},
|
||||
{"type": "text", "text": "answer1"},
|
||||
],
|
||||
},
|
||||
}
|
||||
u2 = {
|
||||
"type": "user",
|
||||
"uuid": "u2",
|
||||
"parentUuid": "a1",
|
||||
"message": {"role": "user", "content": "q2"},
|
||||
}
|
||||
a2_no_thinking = {
|
||||
"type": "assistant",
|
||||
"uuid": "a2",
|
||||
"parentUuid": "u2",
|
||||
"message": {
|
||||
"id": "msg_a2",
|
||||
"role": "assistant",
|
||||
"content": [{"type": "text", "text": "answer2"}],
|
||||
},
|
||||
}
|
||||
raw_content = (
|
||||
json.dumps(u1)
|
||||
+ "\n"
|
||||
+ json.dumps(a1_with_thinking)
|
||||
+ "\n"
|
||||
+ json.dumps(u2)
|
||||
+ "\n"
|
||||
+ json.dumps(a2_no_thinking)
|
||||
+ "\n"
|
||||
)
|
||||
raw_bytes = raw_content.encode("utf-8")
|
||||
session_file.write_bytes(raw_bytes)
|
||||
|
||||
mock_storage = AsyncMock()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"backend.copilot.transcript._projects_base",
|
||||
return_value=projects_base,
|
||||
),
|
||||
patch(
|
||||
"backend.copilot.transcript.get_workspace_storage",
|
||||
new_callable=AsyncMock,
|
||||
return_value=mock_storage,
|
||||
),
|
||||
):
|
||||
asyncio.run(
|
||||
upload_cli_session(
|
||||
user_id="user-1",
|
||||
session_id=session_id,
|
||||
sdk_cwd=sdk_cwd,
|
||||
)
|
||||
)
|
||||
|
||||
stored_content: bytes = mock_storage.store.call_args.kwargs["content"]
|
||||
stored_lines = stored_content.decode("utf-8").strip().split("\n")
|
||||
|
||||
# a1 should have its thinking block stripped (it's not the last assistant turn).
|
||||
a1_stored = json.loads(stored_lines[1])
|
||||
a1_content = a1_stored["message"]["content"]
|
||||
assert all(
|
||||
b["type"] != "thinking" for b in a1_content
|
||||
), "stale thinking block should be stripped from a1"
|
||||
assert any(
|
||||
b["type"] == "text" for b in a1_content
|
||||
), "text block should be kept in a1"
|
||||
|
||||
# a2 (last turn) should be unchanged.
|
||||
a2_stored = json.loads(stored_lines[3])
|
||||
assert a2_stored["message"]["content"] == [{"type": "text", "text": "answer2"}]
|
||||
|
||||
# Stripped bytes smaller than raw.
|
||||
assert len(stored_content) < len(raw_bytes)
|
||||
|
||||
|
||||
class TestRestoreCliSession:
|
||||
def test_returns_none_when_file_not_found_in_storage(self):
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import socket
|
||||
from abc import ABC, abstractmethod
|
||||
from enum import Enum
|
||||
from typing import Awaitable, Optional
|
||||
@@ -43,39 +42,6 @@ CONNECTION_ATTEMPTS = 5
|
||||
# Use case: Faster reconnection for long-running executions that need to resume quickly
|
||||
RETRY_DELAY = 1
|
||||
|
||||
# DEFAULT_HEARTBEAT (300s = 5 min)
|
||||
# AMQP application-level heartbeat. Server drops the connection if no heartbeat
|
||||
# is seen within ~2x this interval. Consumers that sit in CLOSE_WAIT because
|
||||
# pika's IO loop was starved (e.g. laptop sleep, blocking main thread) recover
|
||||
# faster with a lower value. See `create_copilot_queue_config` for a case that
|
||||
# overrides this.
|
||||
DEFAULT_HEARTBEAT = 300
|
||||
|
||||
|
||||
def _tcp_keepalive_options() -> dict[str, int]:
|
||||
"""Platform-aware TCP keepalive socket options for pika.
|
||||
|
||||
pika enables ``SO_KEEPALIVE`` on every socket by default; this dict tunes
|
||||
how quickly the kernel declares a silent peer dead. Without these knobs,
|
||||
the OS default on Linux is ~2 hours of idle before the first probe — long
|
||||
enough for a half-closed socket to sit in CLOSE_WAIT forever while the
|
||||
consumer thread is blocked inside ``start_consuming()``.
|
||||
|
||||
pika passes each key through ``getattr(socket, key)`` at ``IPPROTO_TCP``
|
||||
level, so names must exist on the current platform. Linux has
|
||||
``TCP_KEEPIDLE``; macOS uses ``TCP_KEEPALIVE`` for the equivalent knob.
|
||||
"""
|
||||
opts: dict[str, int] = {}
|
||||
if hasattr(socket, "TCP_KEEPIDLE"):
|
||||
opts["TCP_KEEPIDLE"] = 60
|
||||
elif hasattr(socket, "TCP_KEEPALIVE"):
|
||||
opts["TCP_KEEPALIVE"] = 60
|
||||
if hasattr(socket, "TCP_KEEPINTVL"):
|
||||
opts["TCP_KEEPINTVL"] = 20
|
||||
if hasattr(socket, "TCP_KEEPCNT"):
|
||||
opts["TCP_KEEPCNT"] = 3
|
||||
return opts
|
||||
|
||||
|
||||
class ExchangeType(str, Enum):
|
||||
DIRECT = "direct"
|
||||
@@ -107,8 +73,6 @@ class RabbitMQConfig(BaseModel):
|
||||
vhost: str = "/"
|
||||
exchanges: list[Exchange]
|
||||
queues: list[Queue]
|
||||
heartbeat: int = DEFAULT_HEARTBEAT
|
||||
tcp_keepalive: bool = False
|
||||
|
||||
|
||||
class RabbitMQBase(ABC):
|
||||
@@ -177,8 +141,7 @@ class SyncRabbitMQ(RabbitMQBase):
|
||||
socket_timeout=SOCKET_TIMEOUT,
|
||||
connection_attempts=CONNECTION_ATTEMPTS,
|
||||
retry_delay=RETRY_DELAY,
|
||||
heartbeat=self.config.heartbeat,
|
||||
tcp_options=_tcp_keepalive_options() if self.config.tcp_keepalive else None,
|
||||
heartbeat=300, # 5 minute timeout (heartbeats sent every 2.5 min)
|
||||
)
|
||||
|
||||
self._connection = pika.BlockingConnection(parameters)
|
||||
@@ -297,7 +260,7 @@ class AsyncRabbitMQ(RabbitMQBase):
|
||||
password=self.password,
|
||||
virtualhost=self.config.vhost.lstrip("/"),
|
||||
blocked_connection_timeout=BLOCKED_CONNECTION_TIMEOUT,
|
||||
heartbeat=self.config.heartbeat,
|
||||
heartbeat=300, # 5 minute timeout (heartbeats sent every 2.5 min)
|
||||
)
|
||||
self._channel = await self._connection.channel()
|
||||
await self._channel.set_qos(prefetch_count=1)
|
||||
|
||||
@@ -110,7 +110,7 @@ export const Flow = () => {
|
||||
event.preventDefault();
|
||||
}}
|
||||
maxZoom={2}
|
||||
minZoom={0.1}
|
||||
minZoom={0.05}
|
||||
onDragOver={onDragOver}
|
||||
onDrop={onDrop}
|
||||
nodesDraggable={!isLocked}
|
||||
|
||||
Reference in New Issue
Block a user