refactor(forge): replace Selenium with Playwright for web browsing

- Remove selenium.py and test_selenium.py
- Add playwright_browser.py with WebPlaywrightComponent
- Update web component exports to use Playwright
- Update dependencies in pyproject.toml/poetry.lock
- Minor agent and reflexion strategy improvements
- Update CLAUDE.md documentation

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Nicholas Tindle
2026-01-19 23:57:17 -06:00
parent d591f36c7b
commit 634bff8277
16 changed files with 1576 additions and 2065 deletions

View File

@@ -226,18 +226,6 @@ typing_extensions = {version = ">=4.5", markers = "python_version < \"3.13\""}
[package.extras]
trio = ["trio (>=0.31.0) ; python_version < \"3.10\"", "trio (>=0.32.0) ; python_version >= \"3.10\""]
[[package]]
name = "async-generator"
version = "1.10"
description = "Async generators and context managers for Python 3.5+"
optional = false
python-versions = ">=3.5"
groups = ["main"]
files = [
{file = "async_generator-1.10-py3-none-any.whl", hash = "sha256:01c7bf666359b4967d2cda0000cc2e4af16a0ae098cbffcb8472fb9e8ad6585b"},
{file = "async_generator-1.10.tar.gz", hash = "sha256:6ebb3d106c12920aaae42ccb6f787ef5eefdcdd166ea3d628fa8476abe712144"},
]
[[package]]
name = "attrs"
version = "25.4.0"
@@ -277,9 +265,6 @@ requests = "*"
rich = "^13.0"
sentry-sdk = "^1.40.4"
[package.extras]
benchmark = ["agbenchmark @ file:///Users/ntindle/code/agpt/AutoGPT/main/classic/benchmark"]
[package.source]
type = "directory"
url = "../original_autogpt"
@@ -321,6 +306,7 @@ numpy = ">=2.0.0"
openai = "^1.50.0"
Pillow = "*"
playsound = "~1.2.2"
playwright = "^1.50.0"
pydantic = "^2.7.2"
pylatexenc = "*"
pypdf = "^3.1.0"
@@ -329,7 +315,6 @@ python-dotenv = "^1.0.0"
python-multipart = "^0.0.7"
pyyaml = "^6.0"
requests = "*"
selenium = "^4.13.0"
sentry-sdk = "^1.40.4"
spacy = "^3.8.0"
sqlalchemy = "^2.0.19"
@@ -339,10 +324,6 @@ toml = "^0.10.2"
trafilatura = "^2.0"
uvicorn = {version = ">=0.23.2,<1", extras = ["standard"]}
watchdog = "^6.0.0"
webdriver-manager = "^4.0.2"
[package.extras]
benchmark = ["agbenchmark @ file:///Users/ntindle/code/agpt/AutoGPT/main/classic/benchmark"]
[package.source]
type = "directory"
@@ -850,7 +831,7 @@ description = "Foreign Function Interface for Python calling C code."
optional = false
python-versions = ">=3.9"
groups = ["main"]
markers = "platform_python_implementation != \"CPython\" or os_name == \"nt\" and implementation_name != \"pypy\""
markers = "platform_python_implementation != \"CPython\""
files = [
{file = "cffi-2.0.0-cp310-cp310-macosx_10_13_x86_64.whl", hash = "sha256:0cf2d91ecc3fcc0625c2c530fe004f82c110405f101548512cce44322fa8ac44"},
{file = "cffi-2.0.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:f73b96c41e3b2adedc34a7356e64c8eb96e03a3782b535e043a986276ce12a49"},
@@ -2106,7 +2087,6 @@ description = "Lightweight in-process concurrent programming"
optional = false
python-versions = ">=3.10"
groups = ["main"]
markers = "platform_machine == \"aarch64\" or platform_machine == \"ppc64le\" or platform_machine == \"x86_64\" or platform_machine == \"amd64\" or platform_machine == \"AMD64\" or platform_machine == \"win32\" or platform_machine == \"WIN32\""
files = [
{file = "greenlet-3.3.0-cp310-cp310-macosx_11_0_universal2.whl", hash = "sha256:6f8496d434d5cb2dce025773ba5597f71f5410ae499d5dd9533e0653258cdb3d"},
{file = "greenlet-3.3.0-cp310-cp310-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:b96dc7eef78fd404e022e165ec55327f935b9b52ff355b067eb4a0267fc1cffb"},
@@ -3717,7 +3697,7 @@ version = "1.1.0"
description = "Type system extensions for programs checked with the mypy type checker."
optional = false
python-versions = ">=3.8"
groups = ["main", "dev"]
groups = ["dev"]
files = [
{file = "mypy_extensions-1.1.0-py3-none-any.whl", hash = "sha256:1be4cccdb0f2482337c4743e60421de3a356cd97508abadd57d47403e94f5505"},
{file = "mypy_extensions-1.1.0.tar.gz", hash = "sha256:52e68efc3284861e772bbcd66823fde5ae21fd2fdb51c62a211403730b916558"},
@@ -4162,21 +4142,6 @@ files = [
{file = "orjson-3.11.5.tar.gz", hash = "sha256:82393ab47b4fe44ffd0a7659fa9cfaacc717eb617c93cde83795f14af5c2e9d5"},
]
[[package]]
name = "outcome"
version = "1.3.0.post0"
description = "Capture the outcome of Python function calls."
optional = false
python-versions = ">=3.7"
groups = ["main"]
files = [
{file = "outcome-1.3.0.post0-py2.py3-none-any.whl", hash = "sha256:e771c5ce06d1415e356078d3bdd68523f284b4ce5419828922b6871e65eda82b"},
{file = "outcome-1.3.0.post0.tar.gz", hash = "sha256:9dcf02e65f2971b80047b377468e72a268e15c0af3cf1238e6ff14f7f91143b8"},
]
[package.dependencies]
attrs = ">=19.2.0"
[[package]]
name = "overrides"
version = "7.7.0"
@@ -4356,6 +4321,28 @@ files = [
{file = "playsound-1.2.2-py2.py3-none-any.whl", hash = "sha256:1e83750a5325cbccee03d6e751ba3e78c037ac95b95a3ba1f38d0c5aca9e1a34"},
]
[[package]]
name = "playwright"
version = "1.57.0"
description = "A high-level API to automate web browsers"
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "playwright-1.57.0-py3-none-macosx_10_13_x86_64.whl", hash = "sha256:9351c1ac3dfd9b3820fe7fc4340d96c0d3736bb68097b9b7a69bd45d25e9370c"},
{file = "playwright-1.57.0-py3-none-macosx_11_0_arm64.whl", hash = "sha256:a4a9d65027bce48eeba842408bcc1421502dfd7e41e28d207e94260fa93ca67e"},
{file = "playwright-1.57.0-py3-none-macosx_11_0_universal2.whl", hash = "sha256:99104771abc4eafee48f47dac2369e0015516dc1ce8c409807d2dd440828b9a4"},
{file = "playwright-1.57.0-py3-none-manylinux1_x86_64.whl", hash = "sha256:284ed5a706b7c389a06caa431b2f0ba9ac4130113c3a779767dda758c2497bb1"},
{file = "playwright-1.57.0-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:38a1bae6c0a07839cdeaddbc0756b3b2b85e476c07945f64ece08f1f956a86f1"},
{file = "playwright-1.57.0-py3-none-win32.whl", hash = "sha256:1dd93b265688da46e91ecb0606d36f777f8eadcf7fbef12f6426b20bf0c9137c"},
{file = "playwright-1.57.0-py3-none-win_amd64.whl", hash = "sha256:6caefb08ed2c6f29d33b8088d05d09376946e49a73be19271c8cd5384b82b14c"},
{file = "playwright-1.57.0-py3-none-win_arm64.whl", hash = "sha256:5f065f5a133dbc15e6e7c71e7bc04f258195755b1c32a432b792e28338c8335e"},
]
[package.dependencies]
greenlet = ">=3.1.1,<4.0.0"
pyee = ">=13,<14"
[[package]]
name = "posthog"
version = "5.4.0"
@@ -4978,7 +4965,7 @@ description = "C parser in Python"
optional = false
python-versions = ">=3.8"
groups = ["main"]
markers = "(platform_python_implementation != \"CPython\" or os_name == \"nt\") and (platform_python_implementation != \"CPython\" or implementation_name != \"pypy\") and implementation_name != \"PyPy\""
markers = "platform_python_implementation != \"CPython\" and implementation_name != \"PyPy\""
files = [
{file = "pycparser-2.23-py3-none-any.whl", hash = "sha256:e5c6e8d3fbad53479cab09ac03729e0a9faf2bee3db8208a550daf5af81a5934"},
{file = "pycparser-2.23.tar.gz", hash = "sha256:78816d4f24add8f10a06d6f05b4d424ad9e96cfebf68a4ddc99c65c0720d00c2"},
@@ -5140,6 +5127,24 @@ files = [
[package.dependencies]
typing-extensions = ">=4.14.1"
[[package]]
name = "pyee"
version = "13.0.0"
description = "A rough port of Node.js's EventEmitter to Python with a few tricks of its own"
optional = false
python-versions = ">=3.8"
groups = ["main"]
files = [
{file = "pyee-13.0.0-py3-none-any.whl", hash = "sha256:48195a3cddb3b1515ce0695ed76036b5ccc2ef3a9f963ff9f77aec0139845498"},
{file = "pyee-13.0.0.tar.gz", hash = "sha256:b391e3c5a434d1f5118a25615001dbc8f669cf410ab67d04c4d4e07c55481c37"},
]
[package.dependencies]
typing-extensions = "*"
[package.extras]
dev = ["black", "build", "flake8", "flake8-black", "isort", "jupyter-console", "mkdocs", "mkdocs-include-markdown-plugin", "mkdocstrings[python]", "mypy", "pytest", "pytest-asyncio ; python_version >= \"3.4\"", "pytest-trio ; python_version >= \"3.7\"", "sphinx", "toml", "tox", "trio", "trio ; python_version > \"3.6\"", "trio-typing ; python_version > \"3.6\"", "twine", "twisted", "validate-pyproject[all]"]
[[package]]
name = "pygments"
version = "2.19.2"
@@ -5261,19 +5266,6 @@ all = ["nodejs-wheel-binaries", "twine (>=3.4.1)"]
dev = ["twine (>=3.4.1)"]
nodejs = ["nodejs-wheel-binaries"]
[[package]]
name = "pysocks"
version = "1.7.1"
description = "A Python SOCKS client module. See https://github.com/Anorov/PySocks for more information."
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*"
groups = ["main"]
files = [
{file = "PySocks-1.7.1-py27-none-any.whl", hash = "sha256:08e69f092cc6dbe92a0fdd16eeb9b9ffbc13cadfe5ca4c7bd92ffb078b293299"},
{file = "PySocks-1.7.1-py3-none-any.whl", hash = "sha256:2725bd0a9925919b9b51739eea5f9e2bae91e83288108a9ad338b2e3a4435ee5"},
{file = "PySocks-1.7.1.tar.gz", hash = "sha256:3f8804571ebe159c380ac6de37643bb4685970655d3bba243530d6558b799aa0"},
]
[[package]]
name = "python-dateutil"
version = "2.9.0.post0"
@@ -5837,29 +5829,6 @@ botocore = ">=1.37.4,<2.0a0"
[package.extras]
crt = ["botocore[crt] (>=1.37.4,<2.0a0)"]
[[package]]
name = "selenium"
version = "4.40.0"
description = "Official Python bindings for Selenium WebDriver"
optional = false
python-versions = ">=3.10"
groups = ["main"]
files = [
{file = "selenium-4.40.0-py3-none-any.whl", hash = "sha256:c8823fc02e2c771d9ad9a0cf899cee7de1a57a6697e3d0b91f67566129f2b729"},
{file = "selenium-4.40.0.tar.gz", hash = "sha256:a88f5905d88ad0b84991c2386ea39e2bbde6d6c334be38df5842318ba98eaa8c"},
]
[package.dependencies]
certifi = ">=2026.1.4"
trio = ">=0.31.0,<1.0"
trio-typing = ">=0.10.0"
trio-websocket = ">=0.12.2,<1.0"
types-certifi = ">=2021.10.8.3"
types-urllib3 = ">=1.26.25.14"
typing_extensions = ">=4.15.0,<5.0"
urllib3 = {version = ">=2.6.3,<3.0", extras = ["socks"]}
websocket-client = ">=1.8.0,<2.0"
[[package]]
name = "sentry-sdk"
version = "1.45.1"
@@ -6015,28 +5984,16 @@ files = [
{file = "socksio-1.0.0.tar.gz", hash = "sha256:f88beb3da5b5c38b9890469de67d0cb0f9d494b78b106ca1845f96c10b91c4ac"},
]
[[package]]
name = "sortedcontainers"
version = "2.4.0"
description = "Sorted Containers -- Sorted List, Sorted Dict, Sorted Set"
optional = false
python-versions = "*"
groups = ["main"]
files = [
{file = "sortedcontainers-2.4.0-py2.py3-none-any.whl", hash = "sha256:a163dcaede0f1c021485e957a39245190e74249897e2ae4b2aa38595db237ee0"},
{file = "sortedcontainers-2.4.0.tar.gz", hash = "sha256:25caa5a06cc30b6b83d11423433f65d1f9d76c4c6a0c90e3379eaa43b9bfdb88"},
]
[[package]]
name = "soupsieve"
version = "2.8.2"
version = "2.8.3"
description = "A modern CSS selector implementation for Beautiful Soup."
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "soupsieve-2.8.2-py3-none-any.whl", hash = "sha256:0f4c2f6b5a5fb97a641cf69c0bd163670a0e45e6d6c01a2107f93a6a6f93c51a"},
{file = "soupsieve-2.8.2.tar.gz", hash = "sha256:78a66b0fdee2ab40b7199dc3e747ee6c6e231899feeaae0b9b98a353afd48fd8"},
{file = "soupsieve-2.8.3-py3-none-any.whl", hash = "sha256:ed64f2ba4eebeab06cc4962affce381647455978ffc1e36bb79a545b91f45a95"},
{file = "soupsieve-2.8.3.tar.gz", hash = "sha256:3267f1eeea4251fb42728b6dfb746edc9acaffc4a45b27e19450b676586e8349"},
]
[[package]]
@@ -6846,66 +6803,6 @@ urllib3 = ">=1.26,<3"
all = ["brotli", "cchardet (>=2.1.7) ; python_version < \"3.11\"", "faust-cchardet (>=2.1.19) ; python_version >= \"3.11\"", "htmldate[speed] (>=1.9.2)", "py3langid (>=0.3.0)", "pycurl (>=7.45.3)", "urllib3[socks]", "zstandard (>=0.23.0)"]
dev = ["flake8", "mypy", "pytest", "pytest-cov", "types-lxml", "types-urllib3"]
[[package]]
name = "trio"
version = "0.32.0"
description = "A friendly Python library for async concurrency and I/O"
optional = false
python-versions = ">=3.10"
groups = ["main"]
files = [
{file = "trio-0.32.0-py3-none-any.whl", hash = "sha256:4ab65984ef8370b79a76659ec87aa3a30c5c7c83ff250b4de88c29a8ab6123c5"},
{file = "trio-0.32.0.tar.gz", hash = "sha256:150f29ec923bcd51231e1d4c71c7006e65247d68759dd1c19af4ea815a25806b"},
]
[package.dependencies]
attrs = ">=23.2.0"
cffi = {version = ">=1.14", markers = "os_name == \"nt\" and implementation_name != \"pypy\""}
idna = "*"
outcome = "*"
sniffio = ">=1.3.0"
sortedcontainers = "*"
[[package]]
name = "trio-typing"
version = "0.10.0"
description = "Static type checking support for Trio and related projects"
optional = false
python-versions = "*"
groups = ["main"]
files = [
{file = "trio-typing-0.10.0.tar.gz", hash = "sha256:065ee684296d52a8ab0e2374666301aec36ee5747ac0e7a61f230250f8907ac3"},
{file = "trio_typing-0.10.0-py3-none-any.whl", hash = "sha256:6d0e7ec9d837a2fe03591031a172533fbf4a1a95baf369edebfc51d5a49f0264"},
]
[package.dependencies]
async-generator = "*"
importlib-metadata = "*"
mypy-extensions = ">=0.4.2"
packaging = "*"
trio = ">=0.16.0"
typing-extensions = ">=3.7.4"
[package.extras]
mypy = ["mypy (>=1.0)"]
[[package]]
name = "trio-websocket"
version = "0.12.2"
description = "WebSocket library for Trio"
optional = false
python-versions = ">=3.8"
groups = ["main"]
files = [
{file = "trio_websocket-0.12.2-py3-none-any.whl", hash = "sha256:df605665f1db533f4a386c94525870851096a223adcb97f72a07e8b4beba45b6"},
{file = "trio_websocket-0.12.2.tar.gz", hash = "sha256:22c72c436f3d1e264d0910a3951934798dcc5b00ae56fc4ee079d46c7cf20fae"},
]
[package.dependencies]
outcome = ">=1.2.0"
trio = ">=0.11"
wsproto = ">=0.14"
[[package]]
name = "typer"
version = "0.21.1"
@@ -6943,30 +6840,6 @@ typing-extensions = ">=3.7.4.3"
[package.extras]
standard = ["rich (>=10.11.0)", "shellingham (>=1.3.0)"]
[[package]]
name = "types-certifi"
version = "2021.10.8.3"
description = "Typing stubs for certifi"
optional = false
python-versions = "*"
groups = ["main"]
files = [
{file = "types-certifi-2021.10.8.3.tar.gz", hash = "sha256:72cf7798d165bc0b76e1c10dd1ea3097c7063c42c21d664523b928e88b554a4f"},
{file = "types_certifi-2021.10.8.3-py3-none-any.whl", hash = "sha256:b2d1e325e69f71f7c78e5943d410e650b4707bb0ef32e4ddf3da37f54176e88a"},
]
[[package]]
name = "types-urllib3"
version = "1.26.25.14"
description = "Typing stubs for urllib3"
optional = false
python-versions = "*"
groups = ["main"]
files = [
{file = "types-urllib3-1.26.25.14.tar.gz", hash = "sha256:229b7f577c951b8c1b92c1bc2b2fdb0b49847bd2af6d1cc2a2e3dd340f3bda8f"},
{file = "types_urllib3-1.26.25.14-py3-none-any.whl", hash = "sha256:9683bbb7fb72e32bfe9d2be6e04875fbe1b3eeec3cbb4ea231435aa7fd6b4f0e"},
]
[[package]]
name = "typing-extensions"
version = "4.15.0"
@@ -7049,9 +6922,6 @@ files = [
{file = "urllib3-2.6.3.tar.gz", hash = "sha256:1b62b6884944a57dbe321509ab94fd4d3b307075e0c2eae991ac71ee15ad38ed"},
]
[package.dependencies]
pysocks = {version = ">=1.5.6,<1.5.7 || >1.5.7,<2.0", optional = true, markers = "extra == \"socks\""}
[package.extras]
brotli = ["brotli (>=1.2.0) ; platform_python_implementation == \"CPython\"", "brotlicffi (>=1.2.0.0) ; platform_python_implementation != \"CPython\""]
h2 = ["h2 (>=4,<5)"]
@@ -7364,23 +7234,6 @@ srsly = ">=2.4.3,<3.0.0"
typer-slim = ">=0.3.0,<1.0.0"
wasabi = ">=0.9.1,<1.2.0"
[[package]]
name = "webdriver-manager"
version = "4.0.2"
description = "Library provides the way to automatically manage drivers for different browsers"
optional = false
python-versions = ">=3.7"
groups = ["main"]
files = [
{file = "webdriver_manager-4.0.2-py2.py3-none-any.whl", hash = "sha256:75908d92ecc45ff2b9953614459c633db8f9aa1ff30181cefe8696e312908129"},
{file = "webdriver_manager-4.0.2.tar.gz", hash = "sha256:efedf428f92fd6d5c924a0d054e6d1322dd77aab790e834ee767af392b35590f"},
]
[package.dependencies]
packaging = "*"
python-dotenv = "*"
requests = "*"
[[package]]
name = "websocket-client"
version = "1.9.0"

View File

@@ -266,7 +266,7 @@ class MyComponent(CommandProvider):
| `FileManagerComponent` | DirectiveProvider, CommandProvider | read/write/list files |
| `CodeExecutorComponent` | CommandProvider | Python & shell execution (Docker) |
| `WebSearchComponent` | DirectiveProvider, CommandProvider | DuckDuckGo & Google search |
| `WebSeleniumComponent` | CommandProvider | Browser automation |
| `WebPlaywrightComponent` | DirectiveProvider, CommandProvider | Browser automation (Playwright) |
| `ActionHistoryComponent` | MessageProvider, AfterParse, AfterExecute | Track & summarize history |
| `WatchdogComponent` | AfterParse | Loop detection, LLM switching |
| `ContextComponent` | MessageProvider, CommandProvider | Keep files in prompt context |

View File

@@ -55,9 +55,7 @@ def test_dalle(
)
@pytest.mark.xfail(
reason="HuggingFace image generation is unreliable in CI."
)
@pytest.mark.xfail(reason="HuggingFace image generation is unreliable in CI.")
@pytest.mark.parametrize(
"image_model",
["CompVis/stable-diffusion-v1-4", "stabilityai/stable-diffusion-2-1"],

View File

@@ -1,4 +1,12 @@
from .playwright_browser import BrowsingError, WebPlaywrightComponent
from .search import WebSearchComponent
from .selenium import BrowsingError, WebSeleniumComponent
__all__ = ["WebSearchComponent", "BrowsingError", "WebSeleniumComponent"]
# WebPlaywrightComponent is the default browser component
WebBrowserComponent = WebPlaywrightComponent
__all__ = [
"WebSearchComponent",
"BrowsingError",
"WebPlaywrightComponent",
"WebBrowserComponent",
]

View File

@@ -0,0 +1,735 @@
"""Web browsing component using Playwright for reliable browser automation."""
from __future__ import annotations
import logging
from pathlib import Path
from typing import Iterator, Literal, Optional
from bs4 import BeautifulSoup
from pydantic import BaseModel, SecretStr
from tenacity import (
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
)
from forge.agent.components import ConfigurableComponent
from forge.agent.protocols import CommandProvider, DirectiveProvider
from forge.command import Command, command
from forge.content_processing.html import extract_hyperlinks, format_hyperlinks
from forge.content_processing.text import extract_information, summarize_text
from forge.llm.providers import MultiProvider
from forge.llm.providers.multi import ModelName
from forge.llm.providers.openai import OpenAIModelName
from forge.models.config import UserConfigurable
from forge.models.json_schema import JSONSchema
from forge.utils.exceptions import CommandExecutionError
from forge.utils.url_validator import validate_url
logger = logging.getLogger(__name__)

# Placeholders for lazily-imported playwright types. They are populated by
# _ensure_playwright_imported() so that importing this module does not
# require playwright to be installed.
Playwright = None
Browser = None
Page = None
BrowserContext = None
PlaywrightError = None

# NOTE(review): MAX_RAW_CONTENT_LENGTH is defined but not referenced in the
# code visible here — confirm it is used further down in this file.
MAX_RAW_CONTENT_LENGTH = 500
# Maximum number of hyperlinks returned alongside page content.
LINKS_TO_RETURN = 20
# Default character budget for page content before truncation
# (overridable via WebPlaywrightConfiguration.max_content_length).
MAX_CONTENT_LENGTH = 100_000
def _ensure_playwright_imported():
    """Lazily import playwright, with a helpful error if it is missing.

    Populates the module-level placeholders (``Playwright``, ``Browser``,
    ``Page``, ``BrowserContext``, ``PlaywrightError``) on first call so the
    rest of the module can reference them without importing playwright at
    module import time.

    Raises:
        ImportError: if the playwright package is not installed. The
            original import error is chained as the cause.
    """
    global Playwright, Browser, Page, BrowserContext, PlaywrightError

    if Playwright is not None:
        return  # already imported

    try:
        from playwright.async_api import Browser as _Browser
        from playwright.async_api import BrowserContext as _BrowserContext
        from playwright.async_api import Error as _PlaywrightError
        from playwright.async_api import Page as _Page
        from playwright.async_api import Playwright as _Playwright
    except ImportError as e:
        # Chain the original error so the real import failure stays visible.
        raise ImportError(
            "Playwright is not installed. Install it with: "
            "poetry install && playwright install chromium"
        ) from e

    # Assignments happen outside the try so a failure here can't be
    # mistaken for an import failure.
    Playwright = _Playwright
    Browser = _Browser
    Page = _Page
    BrowserContext = _BrowserContext
    PlaywrightError = _PlaywrightError
class BrowsingError(CommandExecutionError):
    """An error occurred while trying to browse the page.

    Raised for page-load and browser-launch failures; subclasses
    CommandExecutionError so callers can catch either type.
    """
class WebPlaywrightConfiguration(BaseModel):
    """Configuration for the Playwright-based web browsing component.

    Every field has a default; ``browserless_token`` may additionally be
    supplied via the ``BROWSERLESS_TOKEN`` environment variable.
    """

    llm_name: ModelName = OpenAIModelName.GPT3
    """Name of the LLM model used to read websites"""
    browser_type: Literal["chromium", "firefox", "webkit"] = "chromium"
    """Browser engine to use"""
    headless: bool = True
    """Run browser in headless mode"""
    user_agent: str = (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
        "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    )
    """User agent string for the browser"""
    browse_spacy_language_model: str = "en_core_web_sm"
    """Spacy language model used for chunking text"""
    max_retries: int = 3
    """Maximum number of retry attempts for transient failures"""
    retry_delay: float = 1.0
    """Base delay in seconds between retries (exponential backoff)"""
    page_load_timeout: int = 30000
    """Timeout in milliseconds for page loads"""
    max_content_length: int = MAX_CONTENT_LENGTH
    """Maximum content length before truncation (characters)"""

    # Optional cloud fallback via CDP
    browserless_token: Optional[SecretStr] = UserConfigurable(
        default=None, from_env="BROWSERLESS_TOKEN"
    )
    """Token for Browserless.io cloud browser service (optional)"""
    use_cloud_fallback: bool = True
    """Whether to fallback to cloud browser if local browser fails"""
    proxy: Optional[str] = None
    """HTTP proxy to use (e.g., http://proxy:8080)"""
    block_resources: bool = True
    """Block images, fonts, and other non-essential resources for faster loads"""
class WebPlaywrightComponent(
DirectiveProvider,
CommandProvider,
ConfigurableComponent[WebPlaywrightConfiguration],
):
"""Provides commands to browse the web using Playwright.
Features over Selenium:
- Connection pooling: Single browser instance reused across commands
- Smart waiting: Adaptive waits instead of hardcoded sleeps
- Retry with backoff: Automatic retries on transient failures
- Content truncation: Large pages are truncated instead of rejected
- Proper cleanup: Browser properly closed on exit
- Cloud fallback: Optional connection to Browserless.io if local fails
"""
config_class = WebPlaywrightConfiguration
def __init__(
    self,
    llm_provider: MultiProvider,
    data_dir: Path,
    config: Optional[WebPlaywrightConfiguration] = None,
):
    """Create the component.

    Args:
        llm_provider: LLM provider used to summarize page content.
        data_dir: Directory where artifacts such as screenshots are saved.
        config: Optional configuration; defaults are used when omitted.
    """
    ConfigurableComponent.__init__(self, config)
    self.llm_provider = llm_provider
    self.data_dir = data_dir
    # Browser state is created lazily by _ensure_browser().
    self._playwright = self._browser = self._context = None
async def __aenter__(self):
    """Async context manager entry - initializes browser.

    Returns:
        This component, with a browser session ready for use.
    """
    await self._ensure_browser()
    return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Async context manager exit - cleans up browser.

    Implicitly returns None, so exceptions are never suppressed.
    """
    await self._cleanup()
async def _ensure_browser(self) -> None:
    """Lazily initialize the browser if not already initialized.

    Tries a local browser first; if that fails and cloud fallback is
    configured, connects to Browserless.io instead.

    Raises:
        BrowsingError: if no browser (local or cloud) could be started.
    """
    _ensure_playwright_imported()
    if self._browser is not None:
        return  # browser already running; reuse it (connection pooling)

    from playwright.async_api import async_playwright

    try:
        self._playwright = await async_playwright().start()
        self._browser = await self._launch_browser()
        self._context = await self._create_context()
        logger.debug("Playwright browser initialized successfully")
    except Exception as e:
        logger.warning(f"Local browser launch failed: {e}")
        # Stop the partially-started driver before any fallback; otherwise
        # _connect_cloud_browser() starts a second driver and this one leaks.
        if self._playwright is not None:
            try:
                await self._playwright.stop()
            except Exception:
                logger.debug("Could not stop playwright after launch failure")
            self._playwright = None
        self._browser = None
        self._context = None
        if self.config.use_cloud_fallback and self.config.browserless_token:
            await self._connect_cloud_browser()
        else:
            raise BrowsingError(
                f"Failed to launch browser: {e}. "
                "Run 'playwright install chromium' to install browser binaries."
            ) from e
async def _launch_browser(self):
    """Start a local browser of the configured engine type and return it."""
    launcher = getattr(self._playwright, self.config.browser_type)
    kwargs = {"headless": self.config.headless}
    if self.config.proxy:
        # Route all browser traffic through the configured proxy.
        kwargs["proxy"] = {"server": self.config.proxy}
    return await launcher.launch(**kwargs)
async def _connect_cloud_browser(self) -> None:
    """Connect to the Browserless.io cloud browser service over CDP.

    Raises:
        BrowsingError: if no token is configured or the connection fails.
            The underlying error is chained as the cause.
    """
    if not self.config.browserless_token:
        raise BrowsingError("No browserless token configured for cloud fallback")

    _ensure_playwright_imported()
    from playwright.async_api import async_playwright

    token = self.config.browserless_token.get_secret_value()
    # NOTE: the endpoint embeds the secret token — avoid logging it.
    ws_endpoint = f"wss://chrome.browserless.io?token={token}"
    try:
        self._playwright = await async_playwright().start()
        self._browser = await self._playwright.chromium.connect_over_cdp(
            ws_endpoint
        )
        self._context = await self._create_context()
        logger.info("Connected to Browserless.io cloud browser")
    except Exception as e:
        # Chain the cause so the original failure remains debuggable.
        raise BrowsingError(f"Failed to connect to cloud browser: {e}") from e
async def _create_context(self):
    """Build a browser context with the configured UA and a fixed viewport.

    When ``block_resources`` is enabled, requests for static assets
    (images, fonts, icons) are aborted to speed up page loads.
    """
    ctx = await self._browser.new_context(
        user_agent=self.config.user_agent,
        viewport={"width": 1920, "height": 1080},
    )
    if self.config.block_resources:
        await ctx.route(
            "**/*.{png,jpg,jpeg,gif,svg,ico,woff,woff2,ttf,eot}",
            lambda route: route.abort(),
        )
    return ctx
async def _cleanup(self) -> None:
    """Release all browser resources.

    Each resource is closed independently so that a failure closing one
    (e.g. the context) does not skip shutdown of the others — the previous
    single try-block leaked the browser and driver on the first error.
    """

    async def _close(label: str, closer) -> None:
        # Best-effort close: log and continue on failure.
        try:
            await closer()
        except Exception as e:
            logger.warning(f"Error during browser cleanup ({label}): {e}")

    if self._context:
        await _close("context", self._context.close)
        self._context = None
    if self._browser:
        await _close("browser", self._browser.close)
        self._browser = None
    if self._playwright:
        await _close("playwright", self._playwright.stop)
        self._playwright = None
    logger.debug("Playwright browser cleaned up")
async def _smart_wait(self, page) -> None:
    """Wait for page to be fully loaded using adaptive waiting.

    This replaces hardcoded sleeps with intelligent waiting:
    1. Wait for network to be idle (no requests for 500ms)
    2. Wait for DOM to stabilize (no mutations for 500ms)

    Args:
        page: The playwright page that was just navigated.
    """
    try:
        # Wait for network idle
        await page.wait_for_load_state(
            "networkidle", timeout=self.config.page_load_timeout
        )
    except Exception:
        # Fallback to domcontentloaded if networkidle times out
        logger.debug("Network idle timeout, using domcontentloaded instead")
        await page.wait_for_load_state("domcontentloaded")

    # Wait for DOM to stabilize: the injected script resolves once no DOM
    # mutations have occurred for 500ms (or immediately after 500ms on a
    # quiet page, via the initial timer).
    try:
        await page.evaluate(
            """
            () => new Promise(resolve => {
                let timer;
                const observer = new MutationObserver(() => {
                    clearTimeout(timer);
                    timer = setTimeout(() => {
                        observer.disconnect();
                        resolve();
                    }, 500);
                });
                observer.observe(document.body, {
                    childList: true,
                    subtree: true
                });
                timer = setTimeout(() => {
                    observer.disconnect();
                    resolve();
                }, 500);
            })
            """
        )
    except Exception as e:
        # Non-fatal — e.g. observing can throw if the page has no <body>.
        logger.debug(f"DOM stability check failed (non-critical): {e}")
async def _open_page(self, url: str):
    """Open a new page in the shared context and navigate to *url*.

    The page is closed again if navigation or waiting fails, so failed
    loads do not leak pages in the pooled context.

    Returns:
        The loaded page; the caller is responsible for closing it.
    """
    await self._ensure_browser()
    page = await self._context.new_page()
    try:
        await page.goto(url, timeout=self.config.page_load_timeout)
        await self._smart_wait(page)
        return page
    except Exception:
        await page.close()
        # Bare `raise` preserves the original traceback; `raise e` would
        # append a spurious re-raise frame.
        raise
def _extract_text(self, html: str) -> str:
    """Extract clean, readable text from HTML content.

    Strips <script>/<style>/<noscript> elements, then normalizes
    whitespace: each rendered line is trimmed and split on double spaces
    (layout gaps), producing one phrase per line with blanks removed.
    """
    soup = BeautifulSoup(html, "html.parser")
    # Remove script and style elements
    for element in soup(["script", "style", "noscript"]):
        element.extract()

    text = soup.get_text()
    lines = (line.strip() for line in text.splitlines())
    # Split on DOUBLE spaces (layout gaps), not single spaces — a
    # single-space split would put every individual word on its own line.
    chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
    return "\n".join(chunk for chunk in chunks if chunk)
def _extract_links(self, html: str, base_url: str) -> list[str]:
    """Return formatted hyperlinks from *html*, resolved against *base_url*."""
    soup = BeautifulSoup(html, "html.parser")
    # Drop non-content elements before scanning for anchors.
    for tag in soup(["script", "style"]):
        tag.extract()
    return format_hyperlinks(extract_hyperlinks(soup, base_url))
def _truncate_content(self, text: str) -> str:
    """Cap *text* at the configured maximum length.

    Returns the text unchanged when it fits; otherwise a truncated copy
    with a trailing note stating the original size.
    """
    limit = self.config.max_content_length
    if len(text) <= limit:
        return text
    return f"{text[:limit]}\n\n[Content truncated - {len(text)} chars total]"
def get_resources(self) -> Iterator[str]:
    """Advertise this component's browsing capability to the agent."""
    yield "Ability to read websites using Playwright browser automation."
def get_commands(self) -> Iterator[Command]:
    """Yield the browsing commands this component provides."""
    yield from (
        self.read_webpage,
        self.take_screenshot,
        self.click_element,
        self.fill_form,
    )
# NOTE(review): the retry policy is hardcoded (3 attempts, exponential
# backoff 1-10s) even though the config declares max_retries/retry_delay —
# confirm whether the decorator should read those instead. It also retries
# on *any* Exception, including non-transient failures.
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10),
    retry=retry_if_exception_type(Exception),
    reraise=True,
)
async def _read_webpage_with_retry(
    self,
    url: str,
    topics_of_interest: list[str],
    get_raw_content: bool,
    question: str,
) -> str:
    """Internal method with retry logic for read_webpage.

    Loads *url*, extracts its text and hyperlinks, then returns either the
    (possibly truncated) raw text or a summary/answer produced by
    summarize_webpage().

    Args:
        url: Page to load.
        topics_of_interest: Topics to extract (summarization path).
        get_raw_content: Return raw page text instead of a summary.
        question: Question to answer from the page content.

    Returns:
        Formatted page content or answer, plus up to LINKS_TO_RETURN links.
    """
    page = None
    try:
        page = await self._open_page(url)
        html = await page.content()
        text = self._extract_text(html)
        links = self._extract_links(html, url)

        return_literal_content = True
        summarized = False
        if not text:
            return f"Website did not contain any text.\n\nLinks: {links}"
        elif get_raw_content:
            # Truncate instead of rejecting large pages
            text = self._truncate_content(text)
            return text + (f"\n\nLinks: {links}" if links else "")
        else:
            text = await self.summarize_webpage(
                text, question or None, topics_of_interest
            )
            # Present the text verbatim only when it answers a question.
            return_literal_content = bool(question)
            summarized = True

        # Limit links to LINKS_TO_RETURN
        if len(links) > LINKS_TO_RETURN:
            links = links[:LINKS_TO_RETURN]

        # Multi-line content is wrapped in triple quotes for readability.
        text_fmt = f"'''{text}'''" if "\n" in text else f"'{text}'"
        links_fmt = "\n".join(f"- {link}" for link in links)
        return (
            f"Page content{' (summary)' if summarized else ''}:"
            if return_literal_content
            else "Answer gathered from webpage:"
        ) + f" {text_fmt}\n\nLinks:\n{links_fmt}"
    finally:
        # Always close the page; the browser itself is pooled and stays open.
        if page:
            await page.close()
@command(
    ["read_webpage"],
    (
        "Read a webpage, and extract specific information from it."
        " You must specify either topics_of_interest,"
        " a question, or get_raw_content."
    ),
    {
        "url": JSONSchema(
            type=JSONSchema.Type.STRING,
            description="The URL to visit",
            required=True,
        ),
        "topics_of_interest": JSONSchema(
            type=JSONSchema.Type.ARRAY,
            items=JSONSchema(type=JSONSchema.Type.STRING),
            description=(
                "A list of topics about which you want to extract information "
                "from the page."
            ),
            required=False,
        ),
        "question": JSONSchema(
            type=JSONSchema.Type.STRING,
            description=(
                "A question you want to answer using the content of the webpage."
            ),
            required=False,
        ),
        "get_raw_content": JSONSchema(
            type=JSONSchema.Type.BOOLEAN,
            description=(
                "If true, the unprocessed content of the webpage will be returned. "
                "Large pages will be truncated to avoid overwhelming context."
            ),
            required=False,
        ),
    },
)
@validate_url
async def read_webpage(
    self,
    url: str,
    *,
    topics_of_interest: Optional[list[str]] = None,
    get_raw_content: bool = False,
    question: str = "",
) -> str:
    """Browse a website and return the answer and links to the user.

    Args:
        url: The url of the website to browse
        topics_of_interest: Topics to extract information about
        get_raw_content: If true, return raw page content (truncated if large)
        question: The question to answer using the content of the webpage

    Returns:
        The answer and links to the user

    Raises:
        BrowsingError: on networking failures while loading the page.
        CommandExecutionError: on any other failure.
    """
    _ensure_playwright_imported()
    try:
        # Default of None (instead of a mutable [] default) is normalized
        # here; downstream code only checks truthiness.
        return await self._read_webpage_with_retry(
            url, topics_of_interest or [], get_raw_content, question
        )
    except Exception as e:
        error_msg = str(e)
        # Playwright reports network failures with "net::..." error codes.
        if "net::" in error_msg:
            raise BrowsingError(
                "A networking error occurred while trying to load the page: "
                f"{error_msg}"
            ) from e
        raise CommandExecutionError(f"Failed to read webpage: {error_msg}") from e
async def summarize_webpage(
    self,
    text: str,
    question: str | None,
    topics_of_interest: list[str],
) -> str:
    """Condense page text using the configured LLM.

    Args:
        text: The page text to condense.
        question: Optional question to answer from the text.
        topics_of_interest: Topics to extract; when non-empty this takes
            precedence over plain summarization.

    Returns:
        A bullet list of extracted information, or a summary string.

    Raises:
        ValueError: if *text* is empty.
    """
    if not text:
        raise ValueError("No text to summarize")

    logger.debug(f"Web page content length: {len(text)} characters")

    if topics_of_interest:
        bullets = await extract_information(
            text,
            topics_of_interest=topics_of_interest,
            llm_provider=self.llm_provider,
            model_name=self.config.llm_name,
            spacy_model=self.config.browse_spacy_language_model,
        )
        return "\n".join(f"* {i}" for i in bullets)

    summary, _ = await summarize_text(
        text,
        question=question,
        llm_provider=self.llm_provider,
        model_name=self.config.llm_name,
        spacy_model=self.config.browse_spacy_language_model,
    )
    return summary
@command(
    ["take_screenshot"],
    "Take a screenshot of a webpage and save it to a file.",
    {
        "url": JSONSchema(
            type=JSONSchema.Type.STRING,
            description="The URL of the webpage to screenshot",
            required=True,
        ),
        "filename": JSONSchema(
            type=JSONSchema.Type.STRING,
            description="Filename for screenshot (e.g. 'screenshot.png')",
            required=True,
        ),
        "full_page": JSONSchema(
            type=JSONSchema.Type.BOOLEAN,
            description="Capture full page including scrollable content",
            required=False,
        ),
    },
)
@validate_url
async def take_screenshot(
    self, url: str, filename: str, full_page: bool = False
) -> str:
    """Take a screenshot of a webpage.

    Args:
        url: The URL to screenshot
        filename: The filename to save to, relative to the data directory
        full_page: Whether to capture full scrollable page

    Returns:
        Success message with file path

    Raises:
        CommandExecutionError: if *filename* escapes the data directory or
            the screenshot fails.
    """
    _ensure_playwright_imported()

    # The filename comes from the model; resolve it and verify it stays
    # inside data_dir so "../"-style paths cannot write elsewhere.
    screenshot_path = (self.data_dir / filename).resolve()
    if not screenshot_path.is_relative_to(self.data_dir.resolve()):
        raise CommandExecutionError(
            f"Invalid filename {filename!r}: resolves outside the data directory"
        )

    page = None
    try:
        page = await self._open_page(url)
        screenshot_path.parent.mkdir(parents=True, exist_ok=True)
        await page.screenshot(path=str(screenshot_path), full_page=full_page)
        return f"Screenshot saved to {screenshot_path}"
    except CommandExecutionError:
        raise
    except Exception as e:
        raise CommandExecutionError(f"Screenshot failed: {e}") from e
    finally:
        if page:
            await page.close()
@command(
    ["click_element"],
    "Click an element on a webpage identified by a CSS selector or XPath.",
    {
        "url": JSONSchema(
            type=JSONSchema.Type.STRING,
            description="The URL of the webpage",
            required=True,
        ),
        "selector": JSONSchema(
            type=JSONSchema.Type.STRING,
            description="CSS selector or XPath expression to find the element",
            required=True,
        ),
        "selector_type": JSONSchema(
            type=JSONSchema.Type.STRING,
            description="Type of selector: 'css' or 'xpath' (default: 'css')",
            required=False,
        ),
        "timeout": JSONSchema(
            type=JSONSchema.Type.INTEGER,
            description="Timeout in seconds to wait for element (default: 10)",
            required=False,
        ),
    },
)
@validate_url
async def click_element(
    self,
    url: str,
    selector: str,
    selector_type: str = "css",
    timeout: int = 10,
) -> str:
    """Click an element on a webpage.

    Args:
        url: The URL of the webpage
        selector: The CSS selector or XPath
        selector_type: Type of selector ('css' or 'xpath')
        timeout: Timeout to wait for element

    Returns:
        Success message

    Raises:
        CommandExecutionError: If the element cannot be found or clicked.
    """
    _ensure_playwright_imported()
    page = None
    try:
        page = await self._open_page(url)

        # XPath expressions need an explicit engine prefix; CSS is the
        # default locator engine. Playwright timeouts are in milliseconds.
        target = (
            page.locator(f"xpath={selector}")
            if selector_type == "xpath"
            else page.locator(selector)
        )
        await target.click(timeout=timeout * 1000)

        # Give any resulting navigation or DOM update a chance to settle.
        await self._smart_wait(page)

        return f"Clicked element matching '{selector}'"
    except Exception as e:
        raise CommandExecutionError(f"Click failed: {e}")
    finally:
        if page:
            await page.close()
@command(
    ["fill_form"],
    "Fill form fields on a webpage with provided values.",
    {
        "url": JSONSchema(
            type=JSONSchema.Type.STRING,
            description="The URL of the webpage with the form",
            required=True,
        ),
        "fields": JSONSchema(
            type=JSONSchema.Type.OBJECT,
            description="Dictionary mapping CSS selectors to values to enter",
            required=True,
        ),
        "submit": JSONSchema(
            type=JSONSchema.Type.BOOLEAN,
            description="Whether to submit the form after filling (default: False)",
            required=False,
        ),
    },
)
@validate_url
async def fill_form(
    self,
    url: str,
    fields: dict[str, str],
    submit: bool = False,
) -> str:
    """Fill form fields on a webpage.

    Args:
        url: The URL of the webpage
        fields: Dict mapping selectors to values
        submit: Whether to submit the form

    Returns:
        Success message with filled fields

    Raises:
        CommandExecutionError: If a field cannot be filled or the form
            cannot be submitted.
    """
    _ensure_playwright_imported()
    page = None
    try:
        page = await self._open_page(url)

        completed: list[str] = []
        for css, entry in fields.items():
            try:
                await page.locator(css).fill(entry)
            except Exception as e:
                raise CommandExecutionError(
                    f"Could not fill field '{css}': {e}"
                )
            completed.append(css)

        if submit and completed:
            # Prefer clicking an explicit submit control.
            try:
                await page.locator(
                    "button[type='submit'], input[type='submit']"
                ).click()
                await self._smart_wait(page)
            except Exception:
                # No clickable submit control - fall back to submitting the
                # form element itself through the DOM API.
                try:
                    await page.locator("form").evaluate("form => form.submit()")
                    await self._smart_wait(page)
                except Exception as e:
                    raise CommandExecutionError(f"Could not submit form: {e}")

        summary = f"Filled {len(completed)} field(s): {', '.join(completed)}"
        if submit:
            summary += " and submitted form"
        return summary
    except CommandExecutionError:
        raise
    except Exception as e:
        raise CommandExecutionError(f"Form fill failed: {e}")
    finally:
        if page:
            await page.close()
async def close(self) -> None:
    """Explicitly close the browser and clean up resources.

    Call this when done using the component to ensure proper cleanup.
    """
    # Delegates to the same cleanup path used by the async context manager.
    await self._cleanup()

View File

@@ -1,624 +0,0 @@
import asyncio
import logging
import re
from pathlib import Path
from sys import platform
from typing import Iterator, Literal, Optional, Type
from urllib.request import urlretrieve
from bs4 import BeautifulSoup
from pydantic import BaseModel
from selenium.common.exceptions import WebDriverException
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.chrome.service import Service as ChromeDriverService
from selenium.webdriver.chrome.webdriver import WebDriver as ChromeDriver
from selenium.webdriver.common.by import By
from selenium.webdriver.edge.options import Options as EdgeOptions
from selenium.webdriver.edge.service import Service as EdgeDriverService
from selenium.webdriver.edge.webdriver import WebDriver as EdgeDriver
from selenium.webdriver.firefox.options import Options as FirefoxOptions
from selenium.webdriver.firefox.service import Service as GeckoDriverService
from selenium.webdriver.firefox.webdriver import WebDriver as FirefoxDriver
from selenium.webdriver.remote.webdriver import WebDriver
from selenium.webdriver.safari.options import Options as SafariOptions
from selenium.webdriver.safari.webdriver import WebDriver as SafariDriver
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from webdriver_manager.chrome import ChromeDriverManager
from webdriver_manager.firefox import GeckoDriverManager
from webdriver_manager.microsoft import EdgeChromiumDriverManager as EdgeDriverManager
from forge.agent.components import ConfigurableComponent
from forge.agent.protocols import CommandProvider, DirectiveProvider
from forge.command import Command, command
from forge.content_processing.html import extract_hyperlinks, format_hyperlinks
from forge.content_processing.text import extract_information, summarize_text
from forge.llm.providers import MultiProvider
from forge.llm.providers.multi import ModelName
from forge.llm.providers.openai import OpenAIModelName
from forge.models.json_schema import JSONSchema
from forge.utils.exceptions import CommandExecutionError, TooMuchOutputError
from forge.utils.url_validator import validate_url
logger = logging.getLogger(__name__)
FILE_DIR = Path(__file__).parent.parent
MAX_RAW_CONTENT_LENGTH = 500
LINKS_TO_RETURN = 20
BrowserOptions = ChromeOptions | EdgeOptions | FirefoxOptions | SafariOptions
# Distinguishes page-load failures (e.g. network errors) from generic
# command failures, so callers can match on the narrower type.
class BrowsingError(CommandExecutionError):
    """An error occurred while trying to browse the page"""
class WebSeleniumConfiguration(BaseModel):
    """Configuration for the Selenium-based web browsing component."""

    llm_name: ModelName = OpenAIModelName.GPT3
    """Name of the llm model used to read websites"""
    web_browser: Literal["chrome", "firefox", "safari", "edge"] = "chrome"
    """Web browser used by Selenium"""
    headless: bool = True
    """Run browser in headless mode"""
    user_agent: str = (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_4) "
        "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.97 Safari/537.36"
    )
    """User agent used by the browser"""
    browse_spacy_language_model: str = "en_core_web_sm"
    """Spacy language model used for chunking text"""
    selenium_proxy: Optional[str] = None
    """Http proxy to use with Selenium"""
class WebSeleniumComponent(
DirectiveProvider, CommandProvider, ConfigurableComponent[WebSeleniumConfiguration]
):
"""Provides commands to browse the web using Selenium."""
config_class = WebSeleniumConfiguration
def __init__(
    self,
    llm_provider: MultiProvider,
    data_dir: Path,
    config: Optional[WebSeleniumConfiguration] = None,
):
    """Initialize the component.

    Args:
        llm_provider: Provider used to summarize/extract page content.
        data_dir: Directory used for downloaded assets (e.g. Chrome
            extensions) and saved screenshots.
        config: Optional configuration; defaults are used when None.
    """
    ConfigurableComponent.__init__(self, config)
    self.llm_provider = llm_provider
    self.data_dir = data_dir
def get_resources(self) -> Iterator[str]:
    """Advertise this component's capability to the agent."""
    yield "Ability to read websites."
def get_commands(self) -> Iterator[Command]:
    """Yield the web-browsing commands provided by this component."""
    yield self.read_webpage
    yield self.take_screenshot
    yield self.click_element
    yield self.fill_form
@command(
    ["read_webpage"],
    (
        "Read a webpage, and extract specific information from it."
        " You must specify either topics_of_interest,"
        " a question, or get_raw_content."
    ),
    {
        "url": JSONSchema(
            type=JSONSchema.Type.STRING,
            description="The URL to visit",
            required=True,
        ),
        "topics_of_interest": JSONSchema(
            type=JSONSchema.Type.ARRAY,
            items=JSONSchema(type=JSONSchema.Type.STRING),
            description=(
                "A list of topics about which you want to extract information "
                "from the page."
            ),
            required=False,
        ),
        "question": JSONSchema(
            type=JSONSchema.Type.STRING,
            description=(
                "A question you want to answer using the content of the webpage."
            ),
            required=False,
        ),
        "get_raw_content": JSONSchema(
            type=JSONSchema.Type.BOOLEAN,
            description=(
                "If true, the unprocessed content of the webpage will be returned. "
                "This consumes a lot of tokens, so use it with caution."
            ),
            required=False,
        ),
    },
)
@validate_url
async def read_webpage(
    self,
    url: str,
    *,
    topics_of_interest: list[str] = [],
    get_raw_content: bool = False,
    question: str = "",
) -> str:
    """Browse a website and return the answer and links to the user

    Args:
        url (str): The url of the website to browse
        topics_of_interest (list[str]): Topics to extract information about
        get_raw_content (bool): If true, return the raw (token-limited) text
        question (str): The question to answer using the content of the webpage

    Returns:
        str: The answer and links to the user and the webdriver

    Raises:
        TooMuchOutputError: If get_raw_content is set and the page exceeds
            the raw-content token budget.
        BrowsingError: On network-level ("net::") page-load failures.
        CommandExecutionError: On any other WebDriver failure.
    """
    driver = None
    try:
        driver = await self.open_page_in_browser(url)

        text = self.scrape_text_with_selenium(driver)
        links = self.scrape_links_with_selenium(driver, url)

        return_literal_content = True
        summarized = False
        if not text:
            return f"Website did not contain any text.\n\nLinks: {links}"
        elif get_raw_content:
            # Enforce the raw-content budget, measured in LLM tokens.
            if (
                output_tokens := self.llm_provider.count_tokens(
                    text, self.config.llm_name
                )
            ) > MAX_RAW_CONTENT_LENGTH:
                oversize_factor = round(output_tokens / MAX_RAW_CONTENT_LENGTH, 1)
                raise TooMuchOutputError(
                    f"Page content is {oversize_factor}x the allowed length "
                    "for `get_raw_content=true`"
                )
            return text + (f"\n\nLinks: {links}" if links else "")
        else:
            text = await self.summarize_webpage(
                text, question or None, topics_of_interest
            )
            # Only answers to an explicit question are quoted literally;
            # topic extractions are presented as page-content summaries.
            return_literal_content = bool(question)
            summarized = True

        # Limit links to LINKS_TO_RETURN
        if len(links) > LINKS_TO_RETURN:
            links = links[:LINKS_TO_RETURN]

        text_fmt = f"'''{text}'''" if "\n" in text else f"'{text}'"
        links_fmt = "\n".join(f"- {link}" for link in links)
        return (
            f"Page content{' (summary)' if summarized else ''}:"
            if return_literal_content
            else "Answer gathered from webpage:"
        ) + f" {text_fmt}\n\nLinks:\n{links_fmt}"
    except WebDriverException as e:
        # These errors are often quite long and include lots of context.
        # Just grab the first line.
        msg = e.msg.split("\n")[0] if e.msg else str(e)
        if "net::" in msg:
            raise BrowsingError(
                "A networking error occurred while trying to load the page: %s"
                % re.sub(r"^unknown error: ", "", msg)
            )
        raise CommandExecutionError(msg)
    finally:
        if driver:
            driver.close()
def scrape_text_with_selenium(self, driver: WebDriver) -> str:
    """Extract the visible text of the page loaded in *driver*.

    Args:
        driver: The browser window to scrape.

    Returns:
        The page text with script/style content removed and whitespace
        collapsed.
    """
    # Read the rendered DOM rather than the raw response, so content that
    # was generated by JavaScript is included.
    body_html = driver.execute_script("return document.body.outerHTML;")
    soup = BeautifulSoup(body_html, "html.parser")

    # Drop non-visible content before extracting text.
    for tag in soup(["script", "style"]):
        tag.extract()

    stripped_lines = (raw.strip() for raw in soup.get_text().splitlines())
    fragments = (
        piece.strip() for raw in stripped_lines for piece in raw.split(" ")
    )
    return "\n".join(piece for piece in fragments if piece)
def scrape_links_with_selenium(self, driver: WebDriver, base_url: str) -> list[str]:
    """Collect and format the hyperlinks on the currently loaded page.

    Args:
        driver: The browser window to scrape.
        base_url: Base URL used to resolve relative hrefs.

    Returns:
        The formatted links found on the page.
    """
    soup = BeautifulSoup(driver.page_source, "html.parser")

    # Strip script/style bodies before extraction, mirroring the
    # text-scraping behavior.
    for tag in soup(["script", "style"]):
        tag.extract()

    return format_hyperlinks(extract_hyperlinks(soup, base_url))
async def open_page_in_browser(self, url: str) -> WebDriver:
    """Open a browser window and load a web page using Selenium

    Params:
        url (str): The URL of the page to load

    Returns:
        driver (WebDriver): A driver object representing
            the browser window to scrape
    """
    # Selenium's own logging is very chatty; silence everything below CRITICAL.
    logging.getLogger("selenium").setLevel(logging.CRITICAL)

    options_available: dict[str, Type[BrowserOptions]] = {
        "chrome": ChromeOptions,
        "edge": EdgeOptions,
        "firefox": FirefoxOptions,
        "safari": SafariOptions,
    }
    options: BrowserOptions = options_available[self.config.web_browser]()
    options.add_argument(f"user-agent={self.config.user_agent}")

    if isinstance(options, FirefoxOptions):
        if self.config.headless:
            options.headless = True  # type: ignore
            options.add_argument("--disable-gpu")
        driver = FirefoxDriver(
            service=GeckoDriverService(GeckoDriverManager().install()),
            options=options,
        )
    elif isinstance(options, EdgeOptions):
        driver = EdgeDriver(
            service=EdgeDriverService(EdgeDriverManager().install()),
            options=options,
        )
    elif isinstance(options, SafariOptions):
        # Requires a bit more setup on the users end.
        # See https://developer.apple.com/documentation/webkit/testing_with_webdriver_in_safari  # noqa: E501
        driver = SafariDriver(options=options)
    elif isinstance(options, ChromeOptions):
        if platform == "linux" or platform == "linux2":
            options.add_argument("--disable-dev-shm-usage")
            options.add_argument("--remote-debugging-port=9222")

        options.add_argument("--no-sandbox")
        if self.config.headless:
            options.add_argument("--headless=new")
            options.add_argument("--disable-gpu")

        if self.config.selenium_proxy:
            options.add_argument(f"--proxy-server={self.config.selenium_proxy}")

        self._sideload_chrome_extensions(options, self.data_dir / "assets" / "crx")

        # Prefer a system-installed chromedriver (e.g. in container images);
        # otherwise let webdriver-manager fetch a matching one.
        if (chromium_driver_path := Path("/usr/bin/chromedriver")).exists():
            chrome_service = ChromeDriverService(str(chromium_driver_path))
        else:
            try:
                chrome_driver = ChromeDriverManager().install()
            except AttributeError as e:
                if "'NoneType' object has no attribute 'split'" in str(e):
                    # https://github.com/SergeyPirogov/webdriver_manager/issues/649
                    logger.critical(
                        "Connecting to browser failed:"
                        " is Chrome or Chromium installed?"
                    )
                raise
            chrome_service = ChromeDriverService(chrome_driver)
        driver = ChromeDriver(service=chrome_service, options=options)

    driver.get(url)

    # Wait for page to be ready, sleep 2 seconds, wait again until page ready.
    # This allows the cookiewall squasher time to get rid of cookie walls.
    WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.TAG_NAME, "body"))
    )
    await asyncio.sleep(2)
    WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.TAG_NAME, "body"))
    )

    return driver
def _sideload_chrome_extensions(
    self, options: ChromeOptions, dl_folder: Path
) -> None:
    """Download (once) and register helper Chrome extensions.

    Installs a cookie-wall squasher and an ad blocker so scraped pages
    are cleaner. CRX files are cached in *dl_folder* and downloaded only
    on first use.

    Args:
        options: The Chrome options object to register the extensions on.
        dl_folder: Folder in which the downloaded .crx files are cached.
    """
    crx_download_url_template = "https://clients2.google.com/service/update2/crx?response=redirect&prodversion=99.0&acceptformat=crx3&x=id%3D{crx_id}%26installsource%3Dondemand%26uc"  # noqa
    cookiewall_squasher_crx_id = "edibdbjcniadpccecjdfdjjppcpchdlm"
    adblocker_crx_id = "cjpalhdlnbpafiamejdnhcphjbkeiagm"

    # Make sure the target folder exists
    dl_folder.mkdir(parents=True, exist_ok=True)

    for crx_id in (cookiewall_squasher_crx_id, adblocker_crx_id):
        crx_path = dl_folder / f"{crx_id}.crx"
        if not crx_path.exists():
            logger.debug(f"Downloading CRX {crx_id}...")
            crx_download_url = crx_download_url_template.format(crx_id=crx_id)
            # Download to a temp name and rename only on success, so an
            # interrupted download can't leave a truncated .crx behind that
            # would be picked up as a valid cached extension on later runs.
            partial_path = dl_folder / f"{crx_id}.crx.part"
            try:
                urlretrieve(crx_download_url, partial_path)
            except OSError as e:
                # Extensions are best-effort: a network failure here should
                # not prevent browsing altogether.
                partial_path.unlink(missing_ok=True)
                logger.warning(f"Failed to download CRX {crx_id}: {e}")
                continue
            partial_path.replace(crx_path)
            logger.debug(f"Downloaded {crx_path.name}")
        options.add_extension(str(crx_path))
async def summarize_webpage(
    self,
    text: str,
    question: str | None,
    topics_of_interest: list[str],
) -> str:
    """Summarize text using the OpenAI API

    Args:
        text (str): The text to summarize
        question (str | None): An optional question to focus the summary on
        topics_of_interest (list[str]): Topics to extract information about;
            when non-empty, extraction is used instead of summarization

    Returns:
        str: The summary of the text

    Raises:
        ValueError: If there is no text to summarize
    """
    if not text:
        raise ValueError("No text to summarize")

    text_length = len(text)
    logger.debug(f"Web page content length: {text_length} characters")

    if topics_of_interest:
        # Topic-driven extraction: one bullet per piece of information.
        information = await extract_information(
            text,
            topics_of_interest=topics_of_interest,
            llm_provider=self.llm_provider,
            model_name=self.config.llm_name,
            spacy_model=self.config.browse_spacy_language_model,
        )
        return "\n".join(f"* {i}" for i in information)
    else:
        # Plain summarization; the per-chunk summaries are discarded.
        result, _ = await summarize_text(
            text,
            question=question,
            llm_provider=self.llm_provider,
            model_name=self.config.llm_name,
            spacy_model=self.config.browse_spacy_language_model,
        )
        return result
@command(
    ["take_screenshot"],
    "Take a screenshot of a webpage and save it to a file.",
    {
        "url": JSONSchema(
            type=JSONSchema.Type.STRING,
            description="The URL of the webpage to screenshot",
            required=True,
        ),
        "filename": JSONSchema(
            type=JSONSchema.Type.STRING,
            description="Filename for screenshot (e.g. 'screenshot.png')",
            required=True,
        ),
        "full_page": JSONSchema(
            type=JSONSchema.Type.BOOLEAN,
            description="Capture full page including scrollable content",
            required=False,
        ),
    },
)
@validate_url
async def take_screenshot(
    self, url: str, filename: str, full_page: bool = False
) -> str:
    """Take a screenshot of a webpage.

    Args:
        url: The URL to screenshot
        filename: The filename to save to, relative to the data directory
        full_page: Whether to capture full scrollable page

    Returns:
        str: Success message with file path
    """
    driver = None
    try:
        driver = await self.open_page_in_browser(url)

        if full_page:
            # Resize the window to the full document height so a single
            # viewport capture covers the whole page.
            total_height = driver.execute_script(
                "return document.body.scrollHeight"
            )
            driver.set_window_size(1920, total_height)
            await asyncio.sleep(0.5)  # Wait for resize

        # Save screenshot
        screenshot_path = self.data_dir / filename
        screenshot_path.parent.mkdir(parents=True, exist_ok=True)
        driver.save_screenshot(str(screenshot_path))

        return f"Screenshot saved to {screenshot_path}"
    except WebDriverException as e:
        # WebDriver messages can be huge; keep only the first line.
        msg = e.msg.split("\n")[0] if e.msg else str(e)
        raise CommandExecutionError(f"Screenshot failed: {msg}")
    finally:
        if driver:
            driver.close()
@command(
    ["click_element"],
    "Click an element on a webpage identified by a CSS selector or XPath.",
    {
        "url": JSONSchema(
            type=JSONSchema.Type.STRING,
            description="The URL of the webpage",
            required=True,
        ),
        "selector": JSONSchema(
            type=JSONSchema.Type.STRING,
            description="CSS selector or XPath expression to find the element",
            required=True,
        ),
        "selector_type": JSONSchema(
            type=JSONSchema.Type.STRING,
            description="Type of selector: 'css' or 'xpath' (default: 'css')",
            required=False,
        ),
        "timeout": JSONSchema(
            type=JSONSchema.Type.INTEGER,
            description="Timeout in seconds to wait for element (default: 10)",
            required=False,
        ),
    },
)
@validate_url
async def click_element(
    self,
    url: str,
    selector: str,
    selector_type: str = "css",
    timeout: int = 10,
) -> str:
    """Click an element on a webpage.

    Args:
        url: The URL of the webpage
        selector: The CSS selector or XPath
        selector_type: Type of selector ('css' or 'xpath')
        timeout: Timeout to wait for element

    Returns:
        str: Success message

    Raises:
        CommandExecutionError: If the element cannot be found or clicked.
    """
    driver = None
    try:
        driver = await self.open_page_in_browser(url)

        # Anything other than "css" is treated as an XPath expression.
        by_type = By.CSS_SELECTOR if selector_type == "css" else By.XPATH

        # Wait for element to be clickable
        element = WebDriverWait(driver, timeout).until(
            EC.element_to_be_clickable((by_type, selector))
        )
        element.click()

        # Wait for any page changes
        await asyncio.sleep(1)

        return f"Clicked element matching '{selector}'"
    except WebDriverException as e:
        # WebDriver messages can be huge; keep only the first line.
        msg = e.msg.split("\n")[0] if e.msg else str(e)
        raise CommandExecutionError(f"Click failed: {msg}")
    finally:
        if driver:
            driver.close()
@command(
    ["fill_form"],
    "Fill form fields on a webpage with provided values.",
    {
        "url": JSONSchema(
            type=JSONSchema.Type.STRING,
            description="The URL of the webpage with the form",
            required=True,
        ),
        "fields": JSONSchema(
            type=JSONSchema.Type.OBJECT,
            description="Dictionary mapping CSS selectors to values to enter",
            required=True,
        ),
        "submit": JSONSchema(
            type=JSONSchema.Type.BOOLEAN,
            description="Whether to submit the form after filling (default: False)",
            required=False,
        ),
    },
)
@validate_url
async def fill_form(
    self,
    url: str,
    fields: dict[str, str],
    submit: bool = False,
) -> str:
    """Fill form fields on a webpage.

    Args:
        url: The URL of the webpage
        fields: Dict mapping selectors to values
        submit: Whether to submit the form

    Returns:
        str: Success message with filled fields

    Raises:
        CommandExecutionError: If a field cannot be filled or the form
            cannot be submitted.
    """
    driver = None
    try:
        driver = await self.open_page_in_browser(url)

        filled = []
        for selector, value in fields.items():
            try:
                element = WebDriverWait(driver, 10).until(
                    EC.presence_of_element_located((By.CSS_SELECTOR, selector))
                )
                # Clear and fill
                element.clear()
                element.send_keys(value)
                filled.append(selector)
            except Exception as e:
                raise CommandExecutionError(
                    f"Could not fill field '{selector}': {e}"
                )

        if submit and filled:
            # Find and click submit button
            try:
                submit_btn = driver.find_element(
                    By.CSS_SELECTOR, "button[type='submit'], input[type='submit']"
                )
                submit_btn.click()
                await asyncio.sleep(2)  # Wait for submission
            except Exception:
                # Try submitting the form directly
                try:
                    form = driver.find_element(By.CSS_SELECTOR, "form")
                    form.submit()
                    await asyncio.sleep(2)
                except Exception as e:
                    raise CommandExecutionError(f"Could not submit form: {e}")

        msg = f"Filled {len(filled)} field(s): {', '.join(filled)}"
        if submit:
            msg += " and submitted form"
        return msg
    except WebDriverException as e:
        # WebDriver messages can be huge; keep only the first line.
        msg = e.msg.split("\n")[0] if e.msg else str(e)
        raise CommandExecutionError(f"Form fill failed: {msg}")
    finally:
        if driver:
            driver.close()

View File

@@ -0,0 +1,335 @@
"""Tests for the WebPlaywrightComponent."""
from pathlib import Path
from unittest.mock import AsyncMock
import pytest
from forge.llm.providers.multi import MultiProvider
from . import BrowsingError, WebPlaywrightComponent
from .playwright_browser import WebPlaywrightConfiguration
# Skip all tests if playwright is not installed
pytest.importorskip("playwright")
@pytest.fixture
def web_playwright_component(app_data_dir: Path):
    """Create a WebPlaywrightComponent for testing."""
    # Default configuration is fine for the pure-unit tests below.
    return WebPlaywrightComponent(MultiProvider(), app_data_dir)
@pytest.fixture
def web_playwright_component_with_config(app_data_dir: Path):
    """Create a WebPlaywrightComponent with custom config for testing."""
    # Single retry and a short page-load timeout keep the live-network
    # tests fast when the target host is unreachable.
    config = WebPlaywrightConfiguration(
        headless=True,
        max_retries=1,
        page_load_timeout=5000,
    )
    return WebPlaywrightComponent(MultiProvider(), app_data_dir, config=config)
class TestWebPlaywrightConfiguration:
    """Tests for WebPlaywrightConfiguration."""

    def test_default_configuration(self):
        """Test default configuration values."""
        defaults = WebPlaywrightConfiguration()
        expected = {
            "browser_type": "chromium",
            "headless": True,
            "max_retries": 3,
            "page_load_timeout": 30000,
            "max_content_length": 100_000,
            "use_cloud_fallback": True,
            "block_resources": True,
        }
        for attr, value in expected.items():
            assert getattr(defaults, attr) == value

    def test_custom_configuration(self):
        """Test custom configuration values."""
        overrides = {
            "browser_type": "firefox",
            "headless": False,
            "max_retries": 5,
            "page_load_timeout": 60000,
            "max_content_length": 50_000,
            "use_cloud_fallback": False,
            "proxy": "http://proxy:8080",
        }
        custom = WebPlaywrightConfiguration(**overrides)
        # Every override must round-trip through the model unchanged.
        for attr, value in overrides.items():
            assert getattr(custom, attr) == value
class TestWebPlaywrightComponent:
    """Tests for WebPlaywrightComponent."""

    def test_component_initialization(
        self, web_playwright_component: WebPlaywrightComponent
    ):
        """Test component initializes correctly."""
        # Browser resources are created lazily; nothing should exist yet.
        assert web_playwright_component._playwright is None
        assert web_playwright_component._browser is None
        assert web_playwright_component._context is None

    def test_get_resources(self, web_playwright_component: WebPlaywrightComponent):
        """Test get_resources returns expected resources."""
        resources = list(web_playwright_component.get_resources())
        assert len(resources) == 1
        assert "Playwright" in resources[0]

    def test_get_commands(self, web_playwright_component: WebPlaywrightComponent):
        """Test get_commands returns expected commands."""
        commands = list(web_playwright_component.get_commands())
        command_names = [cmd.names[0] for cmd in commands]
        assert "read_webpage" in command_names
        assert "take_screenshot" in command_names
        assert "click_element" in command_names
        assert "fill_form" in command_names

    def test_extract_text(self, web_playwright_component: WebPlaywrightComponent):
        """Test text extraction from HTML."""
        html = """
        <html>
        <head><style>.hidden { display: none; }</style></head>
        <body>
        <h1>Hello World</h1>
        <p>This is a test paragraph.</p>
        <script>console.log('ignored');</script>
        </body>
        </html>
        """
        text = web_playwright_component._extract_text(html)
        # Visible content is kept; script/style bodies are stripped.
        assert "Hello World" in text
        assert "This is a test paragraph" in text
        assert "console.log" not in text

    def test_extract_links(self, web_playwright_component: WebPlaywrightComponent):
        """Test link extraction from HTML."""
        html = """
        <html>
        <body>
        <a href="/page1">Page 1</a>
        <a href="https://example.com/page2">Page 2</a>
        </body>
        </html>
        """
        # Both relative and absolute hrefs must be reported.
        links = web_playwright_component._extract_links(html, "https://example.com")
        assert len(links) == 2
        assert any("Page 1" in link for link in links)
        assert any("Page 2" in link for link in links)

    def test_truncate_content_short(
        self, web_playwright_component: WebPlaywrightComponent
    ):
        """Test that short content is not truncated."""
        short_text = "This is short text."
        result = web_playwright_component._truncate_content(short_text)
        assert result == short_text
        assert "[Content truncated" not in result

    def test_truncate_content_long(
        self, web_playwright_component: WebPlaywrightComponent
    ):
        """Test that long content is truncated."""
        # Create text longer than max_content_length
        long_text = "x" * (web_playwright_component.config.max_content_length + 1000)
        result = web_playwright_component._truncate_content(long_text)
        assert len(result) < len(long_text)
        assert "[Content truncated" in result
class TestWebPlaywrightComponentAsync:
    """Async tests for WebPlaywrightComponent."""

    @pytest.mark.asyncio
    async def test_browse_website_nonexistent_url(
        self, web_playwright_component_with_config: WebPlaywrightComponent
    ):
        """Test browsing a non-existent URL raises BrowsingError."""
        url = "https://auto-gpt-thinks-this-website-does-not-exist.com"
        question = "How to execute a barrel roll"

        with pytest.raises((BrowsingError, Exception)) as raised:
            await web_playwright_component_with_config.read_webpage(
                url=url, question=question
            )

        # Verify error message is reasonable
        error_msg = str(raised.value)
        assert len(error_msg) < 500

    @pytest.mark.asyncio
    async def test_browse_website_invalid_url(
        self, web_playwright_component: WebPlaywrightComponent
    ):
        """Test browsing an invalid URL raises ValueError."""
        url = "not-a-valid-url"
        question = "What is this page about?"

        # validate_url rejects malformed URLs before any browser work.
        with pytest.raises(ValueError, match="Invalid URL format"):
            await web_playwright_component.read_webpage(url=url, question=question)

    @pytest.mark.asyncio
    async def test_context_manager_cleanup(self, app_data_dir: Path):
        """Test that async context manager properly cleans up resources."""
        component = WebPlaywrightComponent(MultiProvider(), app_data_dir)

        # Mock the cleanup to verify it's called
        component._cleanup = AsyncMock()

        async with component:
            pass

        component._cleanup.assert_called_once()
class TestWebPlaywrightComponentMocked:
    """Tests with mocked browser for faster execution."""

    @pytest.mark.asyncio
    async def test_read_webpage_with_mocked_browser(self, app_data_dir: Path):
        """Test read_webpage with mocked browser."""
        component = WebPlaywrightComponent(MultiProvider(), app_data_dir)

        # Create mocks
        mock_page = AsyncMock()
        mock_page.content.return_value = """
        <html>
        <body>
        <h1>Test Page</h1>
        <p>This is test content.</p>
        <a href="https://example.com">Example Link</a>
        </body>
        </html>
        """
        mock_page.close = AsyncMock()

        mock_context = AsyncMock()
        mock_context.new_page.return_value = mock_page

        mock_browser = AsyncMock()
        mock_playwright = AsyncMock()

        # Set component state as if browser was already initialized
        component._playwright = mock_playwright
        component._browser = mock_browser
        component._context = mock_context

        # Override _smart_wait to not actually wait
        component._smart_wait = AsyncMock()

        result = await component.read_webpage(
            url="https://example.com", get_raw_content=True
        )

        assert "Test Page" in result
        assert "test content" in result
        # The page must always be released, even on the raw-content path.
        mock_page.close.assert_called_once()

    @pytest.mark.asyncio
    async def test_take_screenshot_with_mocked_browser(self, app_data_dir: Path):
        """Test take_screenshot with mocked browser."""
        component = WebPlaywrightComponent(MultiProvider(), app_data_dir)

        # Create mocks
        mock_page = AsyncMock()
        mock_page.screenshot = AsyncMock()
        mock_page.close = AsyncMock()

        mock_context = AsyncMock()
        mock_context.new_page.return_value = mock_page

        mock_browser = AsyncMock()
        mock_playwright = AsyncMock()

        # Set component state
        component._playwright = mock_playwright
        component._browser = mock_browser
        component._context = mock_context
        component._smart_wait = AsyncMock()

        result = await component.take_screenshot(
            url="https://example.com", filename="test.png"
        )

        assert "Screenshot saved" in result
        mock_page.screenshot.assert_called_once()
        mock_page.close.assert_called_once()

    @pytest.mark.asyncio
    async def test_click_element_with_mocked_browser(self, app_data_dir: Path):
        """Test click_element with mocked browser."""
        from unittest.mock import MagicMock

        component = WebPlaywrightComponent(MultiProvider(), app_data_dir)

        # Create mocks - locator() is synchronous, but click() is async
        mock_locator = MagicMock()
        mock_locator.click = AsyncMock()

        mock_page = AsyncMock()
        mock_page.locator = MagicMock(return_value=mock_locator)
        mock_page.close = AsyncMock()

        mock_context = AsyncMock()
        mock_context.new_page.return_value = mock_page

        mock_browser = AsyncMock()
        mock_playwright = AsyncMock()

        # Set component state
        component._playwright = mock_playwright
        component._browser = mock_browser
        component._context = mock_context
        component._smart_wait = AsyncMock()

        result = await component.click_element(
            url="https://example.com", selector="#button"
        )

        assert "Clicked element" in result
        mock_locator.click.assert_called_once()
        mock_page.close.assert_called_once()

    @pytest.mark.asyncio
    async def test_fill_form_with_mocked_browser(self, app_data_dir: Path):
        """Test fill_form with mocked browser."""
        from unittest.mock import MagicMock

        component = WebPlaywrightComponent(MultiProvider(), app_data_dir)

        # Create mocks - locator() is synchronous, but fill() is async
        mock_locator = MagicMock()
        mock_locator.fill = AsyncMock()

        mock_page = AsyncMock()
        mock_page.locator = MagicMock(return_value=mock_locator)
        mock_page.close = AsyncMock()

        mock_context = AsyncMock()
        mock_context.new_page.return_value = mock_page

        mock_browser = AsyncMock()
        mock_playwright = AsyncMock()

        # Set component state
        component._playwright = mock_playwright
        component._browser = mock_browser
        component._context = mock_context
        component._smart_wait = AsyncMock()

        result = await component.fill_form(
            url="https://example.com",
            fields={"#username": "testuser", "#password": "testpass"},
        )

        assert "Filled 2 field(s)" in result
        # One fill() call per entry in `fields`.
        assert mock_locator.fill.call_count == 2
        mock_page.close.assert_called_once()

View File

@@ -1,26 +0,0 @@
from pathlib import Path
import pytest
from forge.llm.providers.multi import MultiProvider
from . import BrowsingError, WebSeleniumComponent
@pytest.fixture
def web_selenium_component(app_data_dir: Path):
    """Create a WebSeleniumComponent with default config for testing."""
    return WebSeleniumComponent(MultiProvider(), app_data_dir)
@pytest.mark.asyncio
async def test_browse_website_nonexistent_url(
    web_selenium_component: WebSeleniumComponent,
):
    """Browsing an unresolvable domain must raise a concise BrowsingError."""
    url = "https://auto-gpt-thinks-this-website-does-not-exist.com"
    question = "How to execute a barrel roll"

    # DNS failures surface as net::...NAME_NOT_RESOLVED, which the
    # component maps to BrowsingError.
    with pytest.raises(BrowsingError, match="NAME_NOT_RESOLVED") as raised:
        await web_selenium_component.read_webpage(url=url, question=question)

    # Sanity check that the response is not too long
    assert len(raised.exconly()) < 200

View File

@@ -1195,7 +1195,7 @@ description = "Foreign Function Interface for Python calling C code."
optional = false
python-versions = ">=3.9"
groups = ["main"]
markers = "platform_python_implementation != \"CPython\" or os_name == \"nt\" and implementation_name != \"pypy\""
markers = "platform_python_implementation != \"CPython\""
files = [
{file = "cffi-2.0.0-cp310-cp310-macosx_10_13_x86_64.whl", hash = "sha256:0cf2d91ecc3fcc0625c2c530fe004f82c110405f101548512cce44322fa8ac44"},
{file = "cffi-2.0.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:f73b96c41e3b2adedc34a7356e64c8eb96e03a3782b535e043a986276ce12a49"},
@@ -2537,7 +2537,6 @@ description = "Lightweight in-process concurrent programming"
optional = false
python-versions = ">=3.10"
groups = ["main"]
markers = "platform_machine == \"aarch64\" or platform_machine == \"ppc64le\" or platform_machine == \"x86_64\" or platform_machine == \"amd64\" or platform_machine == \"AMD64\" or platform_machine == \"win32\" or platform_machine == \"WIN32\""
files = [
{file = "greenlet-3.3.0-cp310-cp310-macosx_11_0_universal2.whl", hash = "sha256:6f8496d434d5cb2dce025773ba5597f71f5410ae499d5dd9533e0653258cdb3d"},
{file = "greenlet-3.3.0-cp310-cp310-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:b96dc7eef78fd404e022e165ec55327f935b9b52ff355b067eb4a0267fc1cffb"},
@@ -4560,21 +4559,6 @@ files = [
{file = "orjson-3.11.5.tar.gz", hash = "sha256:82393ab47b4fe44ffd0a7659fa9cfaacc717eb617c93cde83795f14af5c2e9d5"},
]
[[package]]
name = "outcome"
version = "1.3.0.post0"
description = "Capture the outcome of Python function calls."
optional = false
python-versions = ">=3.7"
groups = ["main"]
files = [
{file = "outcome-1.3.0.post0-py2.py3-none-any.whl", hash = "sha256:e771c5ce06d1415e356078d3bdd68523f284b4ce5419828922b6871e65eda82b"},
{file = "outcome-1.3.0.post0.tar.gz", hash = "sha256:9dcf02e65f2971b80047b377468e72a268e15c0af3cf1238e6ff14f7f91143b8"},
]
[package.dependencies]
attrs = ">=19.2.0"
[[package]]
name = "overrides"
version = "7.7.0"
@@ -4754,6 +4738,28 @@ files = [
{file = "playsound-1.2.2-py2.py3-none-any.whl", hash = "sha256:1e83750a5325cbccee03d6e751ba3e78c037ac95b95a3ba1f38d0c5aca9e1a34"},
]
[[package]]
name = "playwright"
version = "1.57.0"
description = "A high-level API to automate web browsers"
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "playwright-1.57.0-py3-none-macosx_10_13_x86_64.whl", hash = "sha256:9351c1ac3dfd9b3820fe7fc4340d96c0d3736bb68097b9b7a69bd45d25e9370c"},
{file = "playwright-1.57.0-py3-none-macosx_11_0_arm64.whl", hash = "sha256:a4a9d65027bce48eeba842408bcc1421502dfd7e41e28d207e94260fa93ca67e"},
{file = "playwright-1.57.0-py3-none-macosx_11_0_universal2.whl", hash = "sha256:99104771abc4eafee48f47dac2369e0015516dc1ce8c409807d2dd440828b9a4"},
{file = "playwright-1.57.0-py3-none-manylinux1_x86_64.whl", hash = "sha256:284ed5a706b7c389a06caa431b2f0ba9ac4130113c3a779767dda758c2497bb1"},
{file = "playwright-1.57.0-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:38a1bae6c0a07839cdeaddbc0756b3b2b85e476c07945f64ece08f1f956a86f1"},
{file = "playwright-1.57.0-py3-none-win32.whl", hash = "sha256:1dd93b265688da46e91ecb0606d36f777f8eadcf7fbef12f6426b20bf0c9137c"},
{file = "playwright-1.57.0-py3-none-win_amd64.whl", hash = "sha256:6caefb08ed2c6f29d33b8088d05d09376946e49a73be19271c8cd5384b82b14c"},
{file = "playwright-1.57.0-py3-none-win_arm64.whl", hash = "sha256:5f065f5a133dbc15e6e7c71e7bc04f258195755b1c32a432b792e28338c8335e"},
]
[package.dependencies]
greenlet = ">=3.1.1,<4.0.0"
pyee = ">=13,<14"
[[package]]
name = "pluggy"
version = "1.6.0"
@@ -5323,7 +5329,7 @@ description = "C parser in Python"
optional = false
python-versions = ">=3.8"
groups = ["main"]
markers = "(platform_python_implementation != \"CPython\" or os_name == \"nt\") and (platform_python_implementation != \"CPython\" or implementation_name != \"pypy\") and implementation_name != \"PyPy\""
markers = "platform_python_implementation != \"CPython\" and implementation_name != \"PyPy\""
files = [
{file = "pycparser-2.23-py3-none-any.whl", hash = "sha256:e5c6e8d3fbad53479cab09ac03729e0a9faf2bee3db8208a550daf5af81a5934"},
{file = "pycparser-2.23.tar.gz", hash = "sha256:78816d4f24add8f10a06d6f05b4d424ad9e96cfebf68a4ddc99c65c0720d00c2"},
@@ -5485,6 +5491,24 @@ files = [
[package.dependencies]
typing-extensions = ">=4.14.1"
[[package]]
name = "pyee"
version = "13.0.0"
description = "A rough port of Node.js's EventEmitter to Python with a few tricks of its own"
optional = false
python-versions = ">=3.8"
groups = ["main"]
files = [
{file = "pyee-13.0.0-py3-none-any.whl", hash = "sha256:48195a3cddb3b1515ce0695ed76036b5ccc2ef3a9f963ff9f77aec0139845498"},
{file = "pyee-13.0.0.tar.gz", hash = "sha256:b391e3c5a434d1f5118a25615001dbc8f669cf410ab67d04c4d4e07c55481c37"},
]
[package.dependencies]
typing-extensions = "*"
[package.extras]
dev = ["black", "build", "flake8", "flake8-black", "isort", "jupyter-console", "mkdocs", "mkdocs-include-markdown-plugin", "mkdocstrings[python]", "mypy", "pytest", "pytest-asyncio ; python_version >= \"3.4\"", "pytest-trio ; python_version >= \"3.7\"", "sphinx", "toml", "tox", "trio", "trio ; python_version > \"3.6\"", "trio-typing ; python_version > \"3.6\"", "twine", "twisted", "validate-pyproject[all]"]
[[package]]
name = "pyflakes"
version = "3.4.0"
@@ -5618,19 +5642,6 @@ all = ["nodejs-wheel-binaries", "twine (>=3.4.1)"]
dev = ["twine (>=3.4.1)"]
nodejs = ["nodejs-wheel-binaries"]
[[package]]
name = "pysocks"
version = "1.7.1"
description = "A Python SOCKS client module. See https://github.com/Anorov/PySocks for more information."
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*"
groups = ["main"]
files = [
{file = "PySocks-1.7.1-py27-none-any.whl", hash = "sha256:08e69f092cc6dbe92a0fdd16eeb9b9ffbc13cadfe5ca4c7bd92ffb078b293299"},
{file = "PySocks-1.7.1-py3-none-any.whl", hash = "sha256:2725bd0a9925919b9b51739eea5f9e2bae91e83288108a9ad338b2e3a4435ee5"},
{file = "PySocks-1.7.1.tar.gz", hash = "sha256:3f8804571ebe159c380ac6de37643bb4685970655d3bba243530d6558b799aa0"},
]
[[package]]
name = "pytest"
version = "7.4.4"
@@ -6271,26 +6282,6 @@ botocore = ">=1.37.4,<2.0a0"
[package.extras]
crt = ["botocore[crt] (>=1.37.4,<2.0a0)"]
[[package]]
name = "selenium"
version = "4.32.0"
description = "Official Python bindings for Selenium WebDriver"
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "selenium-4.32.0-py3-none-any.whl", hash = "sha256:c4d9613f8a45693d61530c9660560fadb52db7d730237bc788ddedf442391f97"},
{file = "selenium-4.32.0.tar.gz", hash = "sha256:b9509bef4056f4083772abb1ae19ff57247d617a29255384b26be6956615b206"},
]
[package.dependencies]
certifi = ">=2021.10.8"
trio = ">=0.17,<1.0"
trio-websocket = ">=0.9,<1.0"
typing_extensions = ">=4.9,<5.0"
urllib3 = {version = ">=1.26,<3", extras = ["socks"]}
websocket-client = ">=1.8,<2.0"
[[package]]
name = "sentry-sdk"
version = "1.45.1"
@@ -6446,18 +6437,6 @@ files = [
{file = "socksio-1.0.0.tar.gz", hash = "sha256:f88beb3da5b5c38b9890469de67d0cb0f9d494b78b106ca1845f96c10b91c4ac"},
]
[[package]]
name = "sortedcontainers"
version = "2.4.0"
description = "Sorted Containers -- Sorted List, Sorted Dict, Sorted Set"
optional = false
python-versions = "*"
groups = ["main"]
files = [
{file = "sortedcontainers-2.4.0-py2.py3-none-any.whl", hash = "sha256:a163dcaede0f1c021485e957a39245190e74249897e2ae4b2aa38595db237ee0"},
{file = "sortedcontainers-2.4.0.tar.gz", hash = "sha256:25caa5a06cc30b6b83d11423433f65d1f9d76c4c6a0c90e3379eaa43b9bfdb88"},
]
[[package]]
name = "soupsieve"
version = "2.8.2"
@@ -7212,43 +7191,6 @@ urllib3 = ">=1.26,<3"
all = ["brotli", "cchardet (>=2.1.7) ; python_version < \"3.11\"", "faust-cchardet (>=2.1.19) ; python_version >= \"3.11\"", "htmldate[speed] (>=1.9.2)", "py3langid (>=0.3.0)", "pycurl (>=7.45.3)", "urllib3[socks]", "zstandard (>=0.23.0)"]
dev = ["flake8", "mypy", "pytest", "pytest-cov", "types-lxml", "types-urllib3"]
[[package]]
name = "trio"
version = "0.32.0"
description = "A friendly Python library for async concurrency and I/O"
optional = false
python-versions = ">=3.10"
groups = ["main"]
files = [
{file = "trio-0.32.0-py3-none-any.whl", hash = "sha256:4ab65984ef8370b79a76659ec87aa3a30c5c7c83ff250b4de88c29a8ab6123c5"},
{file = "trio-0.32.0.tar.gz", hash = "sha256:150f29ec923bcd51231e1d4c71c7006e65247d68759dd1c19af4ea815a25806b"},
]
[package.dependencies]
attrs = ">=23.2.0"
cffi = {version = ">=1.14", markers = "os_name == \"nt\" and implementation_name != \"pypy\""}
idna = "*"
outcome = "*"
sniffio = ">=1.3.0"
sortedcontainers = "*"
[[package]]
name = "trio-websocket"
version = "0.12.2"
description = "WebSocket library for Trio"
optional = false
python-versions = ">=3.8"
groups = ["main"]
files = [
{file = "trio_websocket-0.12.2-py3-none-any.whl", hash = "sha256:df605665f1db533f4a386c94525870851096a223adcb97f72a07e8b4beba45b6"},
{file = "trio_websocket-0.12.2.tar.gz", hash = "sha256:22c72c436f3d1e264d0910a3951934798dcc5b00ae56fc4ee079d46c7cf20fae"},
]
[package.dependencies]
outcome = ">=1.2.0"
trio = ">=0.11"
wsproto = ">=0.14"
[[package]]
name = "typer"
version = "0.21.1"
@@ -7485,9 +7427,6 @@ files = [
{file = "urllib3-1.26.20.tar.gz", hash = "sha256:40c2dc0c681e47eb8f90e7e27bf6ff7df2e677421fd46756da1161c39ca70d32"},
]
[package.dependencies]
PySocks = {version = ">=1.5.6,<1.5.7 || >1.5.7,<2.0", optional = true, markers = "extra == \"socks\""}
[package.extras]
brotli = ["brotli (==1.0.9) ; os_name != \"nt\" and python_version < \"3\" and platform_python_implementation == \"CPython\"", "brotli (>=1.0.9) ; python_version >= \"3\" and platform_python_implementation == \"CPython\"", "brotlicffi (>=0.8.0) ; (os_name != \"nt\" or python_version >= \"3\") and platform_python_implementation != \"CPython\"", "brotlipy (>=0.6.0) ; os_name == \"nt\" and python_version < \"3\""]
secure = ["certifi", "cryptography (>=1.3.4)", "idna (>=2.0.0)", "ipaddress ; python_version == \"2.7\"", "pyOpenSSL (>=0.14)", "urllib3-secure-extra"]
@@ -7808,23 +7747,6 @@ srsly = ">=2.4.3,<3.0.0"
typer-slim = ">=0.3.0,<1.0.0"
wasabi = ">=0.9.1,<1.2.0"
[[package]]
name = "webdriver-manager"
version = "4.0.2"
description = "Library provides the way to automatically manage drivers for different browsers"
optional = false
python-versions = ">=3.7"
groups = ["main"]
files = [
{file = "webdriver_manager-4.0.2-py2.py3-none-any.whl", hash = "sha256:75908d92ecc45ff2b9953614459c633db8f9aa1ff30181cefe8696e312908129"},
{file = "webdriver_manager-4.0.2.tar.gz", hash = "sha256:efedf428f92fd6d5c924a0d054e6d1322dd77aab790e834ee767af392b35590f"},
]
[package.dependencies]
packaging = "*"
python-dotenv = "*"
requests = "*"
[[package]]
name = "websocket-client"
version = "1.9.0"
@@ -8033,21 +7955,6 @@ files = [
[package.extras]
dev = ["pytest", "setuptools"]
[[package]]
name = "wsproto"
version = "1.2.0"
description = "WebSockets state-machine based protocol implementation"
optional = false
python-versions = ">=3.7.0"
groups = ["main"]
files = [
{file = "wsproto-1.2.0-py3-none-any.whl", hash = "sha256:b9acddd652b585d75b20477888c56642fdade28bdfd3579aa24a4d2c037dd736"},
{file = "wsproto-1.2.0.tar.gz", hash = "sha256:ad565f26ecb92588a3e43bc3d96164de84cd9902482b130d0ddbaa9664a85065"},
]
[package.dependencies]
h11 = ">=0.9.0,<1"
[[package]]
name = "yarl"
version = "1.22.0"
@@ -8216,4 +8123,4 @@ type = ["pytest-mypy"]
[metadata]
lock-version = "2.1"
python-versions = "^3.12"
content-hash = "b6a5244884db6d11e2f3a100b9b89e358da6b17c9593e9afd559ade4ebb620f9"
content-hash = "609562fe6b77ff5067a3f939721e2fd4baecef616c030e607559db33c942a7f7"

View File

@@ -51,6 +51,7 @@ litellm = "^1.17.9"
numpy = ">=2.0.0"
openai = "^1.50.0"
Pillow = "*"
playwright = "^1.50.0"
playsound = "~1.2.2"
pydantic = "^2.7.2"
python-docx = "*"
@@ -60,7 +61,6 @@ pylatexenc = "*"
pypdf = "^3.1.0"
pyyaml = "^6.0"
requests = "*"
selenium = "^4.13.0"
sqlalchemy = "^2.0.19"
sentry-sdk = "^1.40.4"
spacy = "^3.8.0"
@@ -69,7 +69,6 @@ tiktoken = ">=0.7.0,<1.0.0"
toml = "^0.10.2"
uvicorn = { extras = ["standard"], version = ">=0.23.2,<1" }
watchdog = "^6.0.0"
webdriver-manager = "^4.0.2"
[tool.poetry.extras]
# benchmark extra removed - use direct_benchmark instead

View File

@@ -74,7 +74,7 @@ Agent(
- `self.git_ops` - GitOperationsComponent
- `self.image_gen` - ImageGeneratorComponent
- `self.web_search` - WebSearchComponent
- `self.web_selenium` - WebSeleniumComponent
- `self.web_browser` - WebPlaywrightComponent
- `self.context` - ContextComponent
- `self.watchdog` - WatchdogComponent
- `self.user_interaction` - UserInteractionComponent

View File

@@ -37,7 +37,7 @@ from forge.components.text_utils import TextUtilsComponent
from forge.components.todo import TodoComponent
from forge.components.user_interaction import UserInteractionComponent
from forge.components.watchdog import WatchdogComponent
from forge.components.web import WebSearchComponent, WebSeleniumComponent
from forge.components.web import WebPlaywrightComponent, WebSearchComponent
from forge.file_storage.base import FileStorage
from forge.llm.prompting.schema import ChatPrompt
from forge.llm.prompting.utils import dump_prompt
@@ -161,7 +161,7 @@ class Agent(BaseAgent[AnyActionProposal], Configurable[AgentSettings]):
self.git_ops = GitOperationsComponent()
self.image_gen = ImageGeneratorComponent(self.file_manager.workspace)
self.web_search = WebSearchComponent()
self.web_selenium = WebSeleniumComponent(
self.web_browser = WebPlaywrightComponent(
llm_provider,
app_config.app_data_dir,
)

View File

@@ -633,10 +633,23 @@ class ReflexionPromptStrategy(BaseMultiStepPromptStrategy):
# Phase and reflection_context are stored in strategy state, not in the proposal
# Ensure thoughts has all required fields for the ReflexionThoughts model
thoughts = assistant_reply_dict.get("thoughts", {})
if not isinstance(thoughts, dict):
thoughts = {}
# Set defaults for all required fields
if "observations" not in thoughts:
thoughts["observations"] = thoughts.get("text", "")
if "reasoning" not in thoughts:
thoughts["reasoning"] = ""
if "self_reflection" not in thoughts:
thoughts["self_reflection"] = thoughts.get("reasoning", "")
if "self_criticism" not in thoughts:
thoughts["self_criticism"] = thoughts.get("criticism", "")
if "plan" not in thoughts:
thoughts["plan"] = thoughts.get("plan", [])
if isinstance(thoughts["plan"], str):
thoughts["plan"] = [thoughts["plan"]] if thoughts["plan"] else []
if "lessons_applied" not in thoughts:
thoughts["lessons_applied"] = []
assistant_reply_dict["thoughts"] = thoughts

File diff suppressed because it is too large Load Diff

View File

@@ -49,12 +49,10 @@ sentry-sdk = "^1.40.4"
rich = "^13.0"
prompt-toolkit = "^3.0.0"
# Benchmarking
agbenchmark = { path = "../benchmark", optional = true, develop = true }
# agbenchmark = {git = "https://github.com/Significant-Gravitas/AutoGPT.git", subdirectory = "benchmark", optional = true}
# Benchmarking - use direct_benchmark instead (agbenchmark removed)
[tool.poetry.extras]
benchmark = ["agbenchmark"]
# benchmark extra removed - use direct_benchmark instead
[tool.poetry.group.dev.dependencies]
black = "^23.12.1"

View File

@@ -86,10 +86,14 @@ def test_strategy_comparison_quick():
Note: Requires API keys to be configured in environment.
"""
result = run_harness(
"--strategies", "one_shot",
"--categories", "general",
"-N", "1",
"--tests", "ReadFile", # Single fast test for smoke testing
"--strategies",
"one_shot",
"--categories",
"general",
"-N",
"1",
"--tests",
"ReadFile", # Single fast test for smoke testing
)
# Print output for debugging
@@ -112,9 +116,12 @@ def test_single_strategy():
to verify basic functionality without testing all strategies.
"""
result = run_harness(
"--strategies", "one_shot",
"--categories", "coding",
"--tests", "ReadFile,WriteFile",
"--strategies",
"one_shot",
"--categories",
"coding",
"--tests",
"ReadFile,WriteFile",
)
# Print output for debugging