Websurfer Refactor (#4165)

* first commit

* use_ocr flag

* initial refactor

* test

* small fixes

* adding animations, in progress

* red boundary animations

* add hover tool

* fix timeout time - reduces a lot of errors

* put prompts in separate file

* prompt organize

* add cursor animation

* format and checks pass

* added check for reset
This commit is contained in:
Hussein Mozannar
2024-11-13 11:41:36 -08:00
committed by GitHub
parent 16463a8a98
commit 317d5d03ec
4 changed files with 662 additions and 275 deletions

View File

@@ -15,7 +15,6 @@ from typing import (
Optional,
Sequence,
Tuple,
Union,
cast,
)
@@ -38,15 +37,17 @@ from autogen_core.components.models import (
SystemMessage,
UserMessage,
)
from playwright._impl._errors import Error as PlaywrightError
from playwright._impl._errors import TimeoutError
from PIL import Image
from playwright.async_api import BrowserContext, Download, Page, Playwright, async_playwright
from ._events import WebSurferEvent
from ._playwright_controller import PlaywrightController
from ._prompts import WEB_SURFER_OCR_PROMPT, WEB_SURFER_QA_PROMPT, WEB_SURFER_QA_SYSTEM_MESSAGE, WEB_SURFER_TOOL_PROMPT
from ._set_of_mark import add_set_of_mark
from ._tool_definitions import (
TOOL_CLICK,
TOOL_HISTORY_BACK,
TOOL_HOVER,
TOOL_PAGE_DOWN,
TOOL_PAGE_UP,
TOOL_READ_PAGE_AND_ANSWER,
@@ -58,13 +59,7 @@ from ._tool_definitions import (
TOOL_VISIT_URL,
TOOL_WEB_SEARCH,
)
from ._types import (
InteractiveRegion,
UserContent,
VisualViewport,
interactiveregion_from_dict,
visualviewport_from_dict,
)
from ._types import InteractiveRegion, UserContent
from ._utils import message_content_to_str
# Viewport dimensions
@@ -98,6 +93,11 @@ class MultimodalWebSurfer(BaseChatAgent):
downloads_folder: str | None = None,
debug_dir: str | None = os.getcwd(),
to_save_screenshots: bool = False,
animate_actions: bool = False,
use_ocr: bool = True,
to_resize_viewport: bool = True,
playwright: Playwright | None = None,
context: BrowserContext | None = None,
):
"""
Initialize the MultimodalWebSurfer.
@@ -113,6 +113,11 @@ class MultimodalWebSurfer(BaseChatAgent):
downloads_folder (str | None): The folder to save downloads. Defaults to None.
debug_dir (str | None): The directory to save debug information. Defaults to the current working directory.
to_save_screenshots (bool): Whether to save screenshots. Defaults to False.
animate_actions (bool): Whether to animate actions. Defaults to False.
use_ocr (bool): Whether to use OCR to extract text from screenshots, otherwise extract text from page. Defaults to True.
to_resize_viewport (bool): Whether to resize the viewport. Defaults to True.
playwright (Playwright | None): The playwright instance to use. Defaults to None and creates a new one.
context (BrowserContext | None): The browser context to use. Defaults to None and creates a new one.
"""
super().__init__(name, description)
self._model_client = model_client
@@ -124,21 +129,18 @@ class MultimodalWebSurfer(BaseChatAgent):
self.downloads_folder = downloads_folder
self.debug_dir = debug_dir
self.to_save_screenshots = to_save_screenshots
self.use_ocr = use_ocr
self.to_resize_viewport = to_resize_viewport
self.animate_actions = animate_actions
self._chat_history: List[LLMMessage] = []
# Call init to set these
self._playwright: Playwright | None = None
self._context: BrowserContext | None = None
# Call init to set these in case not set
self._playwright: Playwright | None = playwright
self._context: BrowserContext | None = context
self._page: Page | None = None
self._last_download: Download | None = None
self._prior_metadata_hash: str | None = None
self.logger = logging.getLogger(EVENT_LOGGER_NAME + f".{self.name}.MultimodalWebSurfer")
# Read page_script
self._page_script: str = ""
with open(os.path.join(os.path.abspath(os.path.dirname(__file__)), "page_script.js"), "rt") as fh:
self._page_script = fh.read()
self._chat_history: List[LLMMessage] = []
# Define the download handler
def _download_handler(download: Download) -> None:
@@ -146,6 +148,27 @@ class MultimodalWebSurfer(BaseChatAgent):
self._download_handler = _download_handler
# Define the Playwright controller that handles the browser interactions
self._playwright_controller = PlaywrightController(
animate_actions=self.animate_actions,
downloads_folder=self.downloads_folder,
viewport_width=VIEWPORT_WIDTH,
viewport_height=VIEWPORT_HEIGHT,
_download_handler=self._download_handler,
to_resize_viewport=self.to_resize_viewport,
)
self.default_tools = [
TOOL_VISIT_URL,
TOOL_HISTORY_BACK,
TOOL_CLICK,
TOOL_TYPE,
TOOL_READ_PAGE_AND_ANSWER,
TOOL_SUMMARIZE_PAGE,
TOOL_SLEEP,
TOOL_HOVER,
]
self.did_lazy_init = False
@property
def produced_message_types(self) -> List[type[ChatMessage]]:
return [MultiModalMessage]
@@ -169,9 +192,18 @@ class MultimodalWebSurfer(BaseChatAgent):
)
async def on_reset(self, cancellation_token: CancellationToken) -> None:
if not self.did_lazy_init:
return
assert self._page is not None
self._chat_history.clear()
await self._visit_page(self.start_page)
reset_prior_metadata, reset_last_download = await self._playwright_controller.visit_page(
self._page, self.start_page
)
if reset_last_download and self._last_download is not None:
self._last_download = None
if reset_prior_metadata and self._prior_metadata_hash is not None:
self._prior_metadata_hash = None
if self.to_save_screenshots:
current_timestamp = "_" + int(time.time()).__str__()
screenshot_png_name = "screenshot" + current_timestamp + ".png"
@@ -202,18 +234,20 @@ class MultimodalWebSurfer(BaseChatAgent):
launch_args: Dict[str, Any] = {"headless": self.headless}
if self.browser_channel is not None:
launch_args["channel"] = self.browser_channel
self._playwright = await async_playwright().start()
if self._playwright is None:
self._playwright = await async_playwright().start()
# Create the context -- are we launching persistent?
if self.browser_data_dir is None:
browser = await self._playwright.chromium.launch(**launch_args)
self._context = await browser.new_context(
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0"
)
else:
self._context = await self._playwright.chromium.launch_persistent_context(
self.browser_data_dir, **launch_args
)
if self._context is None:
if self.browser_data_dir is None:
browser = await self._playwright.chromium.launch(**launch_args)
self._context = await browser.new_context(
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0"
)
else:
self._context = await self._playwright.chromium.launch_persistent_context(
self.browser_data_dir, **launch_args
)
# Create the page
self._context.set_default_timeout(60000) # One minute
@@ -221,7 +255,8 @@ class MultimodalWebSurfer(BaseChatAgent):
assert self._page is not None
# self._page.route(lambda x: True, self._route_handler)
self._page.on("download", self._download_handler)
await self._page.set_viewport_size({"width": VIEWPORT_WIDTH, "height": VIEWPORT_HEIGHT})
if self.to_resize_viewport:
await self._page.set_viewport_size({"width": VIEWPORT_WIDTH, "height": VIEWPORT_HEIGHT})
await self._page.add_init_script(
path=os.path.join(os.path.abspath(os.path.dirname(__file__)), "page_script.js")
)
@@ -231,10 +266,6 @@ class MultimodalWebSurfer(BaseChatAgent):
# Prepare the debug directory -- which stores the screenshots generated throughout the process
await self._set_debug_dir(self.debug_dir)
async def _sleep(self, duration: Union[int, float]) -> None:
assert self._page is not None
await self._page.wait_for_timeout(duration * 1000)
async def _set_debug_dir(self, debug_dir: str | None) -> None:
assert self._page is not None
self.debug_dir = debug_dir
@@ -275,7 +306,7 @@ class MultimodalWebSurfer(BaseChatAgent):
aria_name = re.sub(r"[\n\r]+", " ", rects[r].get("aria_name", "")).strip()
# What are the actions?
actions = ['"click"']
actions = ['"click", "hover"']
if rects[r]["role"] in ["textbox", "searchbox", "search"]:
actions = ['"input_text"']
actions_str = "[" + ",".join(actions) + "]"
@@ -289,7 +320,6 @@ class MultimodalWebSurfer(BaseChatAgent):
message: List[FunctionCall],
rects: Dict[str, InteractiveRegion],
tool_names: str,
use_ocr: bool = True,
cancellation_token: Optional[CancellationToken] = None,
) -> Tuple[bool, UserContent]:
name = message[0].name
@@ -311,30 +341,43 @@ class MultimodalWebSurfer(BaseChatAgent):
action_description = f"I typed '{url}' into the browser address bar."
# Check if the argument starts with a known protocol
if url.startswith(("https://", "http://", "file://", "about:")):
await self._visit_page(url)
reset_prior_metadata, reset_last_download = await self._playwright_controller.visit_page(
self._page, url
)
# If the argument contains a space, treat it as a search query
elif " " in url:
await self._visit_page(f"https://www.bing.com/search?q={quote_plus(url)}&FORM=QBLH")
reset_prior_metadata, reset_last_download = await self._playwright_controller.visit_page(
self._page, f"https://www.bing.com/search?q={quote_plus(url)}&FORM=QBLH"
)
# Otherwise, prefix with https://
else:
await self._visit_page("https://" + url)
reset_prior_metadata, reset_last_download = await self._playwright_controller.visit_page(
self._page, "https://" + url
)
if reset_last_download and self._last_download is not None:
self._last_download = None
if reset_prior_metadata and self._prior_metadata_hash is not None:
self._prior_metadata_hash = None
elif name == "history_back":
action_description = "I clicked the browser back button."
await self._back()
await self._playwright_controller.back(self._page)
elif name == "web_search":
query = args.get("query")
action_description = f"I typed '{query}' into the browser search bar."
await self._visit_page(f"https://www.bing.com/search?q={quote_plus(query)}&FORM=QBLH")
reset_prior_metadata, reset_last_download = await self._playwright_controller.visit_page(
self._page, f"https://www.bing.com/search?q={quote_plus(query)}&FORM=QBLH"
)
if reset_last_download and self._last_download is not None:
self._last_download = None
if reset_prior_metadata and self._prior_metadata_hash is not None:
self._prior_metadata_hash = None
elif name == "page_up":
action_description = "I scrolled up one page in the browser."
await self._page_up()
await self._playwright_controller.page_up(self._page)
elif name == "page_down":
action_description = "I scrolled down one page in the browser."
await self._page_down()
await self._playwright_controller.page_down(self._page)
elif name == "click":
target_id = str(args.get("target_id"))
@@ -343,8 +386,17 @@ class MultimodalWebSurfer(BaseChatAgent):
action_description = f"I clicked '{target_name}'."
else:
action_description = "I clicked the control."
await self._click_id(target_id)
new_page_tentative = await self._playwright_controller.click_id(self._page, target_id)
if new_page_tentative is not None:
self._page = new_page_tentative
self._prior_metadata_hash = None
self.logger.info(
WebSurferEvent(
source=self.name,
url=self._page.url,
message="New tab or window.",
)
)
elif name == "input_text":
input_field_id = str(args.get("input_field_id"))
text_value = str(args.get("text_value"))
@@ -353,7 +405,7 @@ class MultimodalWebSurfer(BaseChatAgent):
action_description = f"I typed '{text_value}' into '{input_field_name}'."
else:
action_description = f"I input '{text_value}'."
await self._fill_id(input_field_id, text_value)
await self._playwright_controller.fill_id(self._page, input_field_id, text_value)
elif name == "scroll_element_up":
target_id = str(args.get("target_id"))
@@ -364,7 +416,7 @@ class MultimodalWebSurfer(BaseChatAgent):
else:
action_description = "I scrolled the control up."
await self._scroll_id(target_id, "up")
await self._playwright_controller.scroll_id(self._page, target_id, "up")
elif name == "scroll_element_down":
target_id = str(args.get("target_id"))
@@ -375,17 +427,36 @@ class MultimodalWebSurfer(BaseChatAgent):
else:
action_description = "I scrolled the control down."
await self._scroll_id(target_id, "down")
await self._playwright_controller.scroll_id(self._page, target_id, "down")
elif name == "answer_question":
question = str(args.get("question"))
action_description = f"I answered the following question '{question}' based on the web page."
# Do Q&A on the DOM. No need to take further action. Browser state does not change.
return False, await self._summarize_page(question=question, cancellation_token=cancellation_token)
elif name == "summarize_page":
# Summarize the DOM. No need to take further action. Browser state does not change.
action_description = "I summarized the current web page"
return False, await self._summarize_page(cancellation_token=cancellation_token)
elif name == "hover":
target_id = str(args.get("target_id"))
target_name = self._target_name(target_id, rects)
if target_name:
action_description = f"I hovered over '{target_name}'."
else:
action_description = "I hovered over the control."
await self._playwright_controller.hover_id(self._page, target_id)
elif name == "sleep":
action_description = "I am waiting a short period of time before taking further action."
await self._sleep(3) # There's a 2s sleep below too
await self._playwright_controller.sleep(self._page, 3) # There's a 2s sleep below too
else:
raise ValueError(f"Unknown tool '{name}'. Please choose from:\n\n{tool_names}")
await self._page.wait_for_load_state()
await self._sleep(3)
await self._playwright_controller.sleep(self._page, 3) # There's a 2s sleep below too
# Handle downloads
if self._last_download is not None and self.downloads_folder is not None:
@@ -399,7 +470,7 @@ class MultimodalWebSurfer(BaseChatAgent):
await self._page.wait_for_load_state()
# Handle metadata
page_metadata = json.dumps(await self._get_page_metadata(), indent=4)
page_metadata = json.dumps(await self._playwright_controller.get_page_metadata(self._page), indent=4)
metadata_hash = hashlib.md5(page_metadata.encode("utf-8")).hexdigest()
if metadata_hash != self._prior_metadata_hash:
page_metadata = (
@@ -410,7 +481,7 @@ class MultimodalWebSurfer(BaseChatAgent):
self._prior_metadata_hash = metadata_hash
# Describe the viewport of the new page in words
viewport = await self._get_visual_viewport()
viewport = await self._playwright_controller.get_visual_viewport(self._page)
percent_visible = int(viewport["height"] * 100 / viewport["scrollHeight"])
percent_scrolled = int(viewport["pageTop"] * 100 / viewport["scrollHeight"])
if percent_scrolled < 1: # Allow some rounding error
@@ -435,7 +506,9 @@ class MultimodalWebSurfer(BaseChatAgent):
)
ocr_text = (
await self._get_ocr_text(new_screenshot, cancellation_token=cancellation_token) if use_ocr is True else ""
await self._get_ocr_text(new_screenshot, cancellation_token=cancellation_token)
if self.use_ocr is True
else await self._playwright_controller.get_webpage_text(self._page)
)
# Return the complete observation
@@ -451,7 +524,7 @@ class MultimodalWebSurfer(BaseChatAgent):
"""Generates the actual reply. First calls the LLM to figure out which tool to use, then executes the tool."""
# Lazy init
if self._playwright is None:
if not self.did_lazy_init:
await self._lazy_init()
assert self._page is not None
@@ -471,8 +544,8 @@ class MultimodalWebSurfer(BaseChatAgent):
history.append(SystemMessage(content=content))
# Ask the page for interactive elements, then prepare the state-of-mark screenshot
rects = await self._get_interactive_rects()
viewport = await self._get_visual_viewport()
rects = await self._playwright_controller.get_interactive_rects(self._page)
viewport = await self._playwright_controller.get_visual_viewport(self._page)
screenshot = await self._page.screenshot()
som_screenshot, visible_rects, rects_above, rects_below = add_set_of_mark(screenshot, rects)
@@ -488,18 +561,8 @@ class MultimodalWebSurfer(BaseChatAgent):
)
)
# What tools are available?
tools = [
TOOL_VISIT_URL,
TOOL_HISTORY_BACK,
TOOL_CLICK,
TOOL_TYPE,
TOOL_SUMMARIZE_PAGE,
TOOL_READ_PAGE_AND_ANSWER,
TOOL_SLEEP,
]
tools = self.default_tools.copy()
# Can we reach Bing to search?
# if self._navigation_allow_list("https://www.bing.com/"):
tools.append(TOOL_WEB_SEARCH)
# We can scroll up
@@ -511,7 +574,7 @@ class MultimodalWebSurfer(BaseChatAgent):
tools.append(TOOL_PAGE_DOWN)
# Focus hint
focused = await self._get_focused_rect_id()
focused = await self._playwright_controller.get_focused_rect_id(self._page)
focused_hint = ""
if focused:
name = self._target_name(focused, rects)
@@ -549,18 +612,13 @@ class MultimodalWebSurfer(BaseChatAgent):
tool_names = "\n".join([t["name"] for t in tools])
text_prompt = f"""
Consider the following screenshot of a web browser, which is open to the page '{self._page.url}'. In this screenshot, interactive elements are outlined in bounding boxes of different colors. Each bounding box has a numeric ID label in the same color. Additional information about each visible label is listed below:
{visible_targets}{other_targets_str}{focused_hint}You are to respond to the user's most recent request by selecting an appropriate tool the following set, or by answering the question directly if possible:
{tool_names}
When deciding between tools, consider if the request can be best addressed by:
- the contents of the current viewport (in which case actions like clicking links, clicking buttons, or inputting text might be most appropriate)
- contents found elsewhere on the full webpage (in which case actions like scrolling, summarization, or full-page Q&A might be most appropriate)
- on some other website entirely (in which case actions like performing a new web search might be the best option)
""".strip()
text_prompt = WEB_SURFER_TOOL_PROMPT.format(
url=self._page.url,
visible_targets=visible_targets,
other_targets_str=other_targets_str,
focused_hint=focused_hint,
tool_names=tool_names,
).strip()
# Scale the screenshot for the MLM, and close the original
scaled_screenshot = som_screenshot.resize((MLM_WIDTH, MLM_HEIGHT))
@@ -574,7 +632,6 @@ When deciding between tools, consider if the request can be best addressed by:
history, tools=tools, extra_create_args={"tool_choice": "auto"}, cancellation_token=cancellation_token
) # , "parallel_tool_calls": False})
message = response.content
self._last_download = None
if isinstance(message, str):
@@ -587,181 +644,6 @@ When deciding between tools, consider if the request can be best addressed by:
# Not sure what happened here
raise AssertionError(f"Unknown response format '{message}'")
async def _get_interactive_rects(self) -> Dict[str, InteractiveRegion]:
assert self._page is not None
# Read the regions from the DOM
try:
await self._page.evaluate(self._page_script)
except Exception:
pass
result = cast(
Dict[str, Dict[str, Any]], await self._page.evaluate("MultimodalWebSurfer.getInteractiveRects();")
)
# Convert the results into appropriate types
assert isinstance(result, dict)
typed_results: Dict[str, InteractiveRegion] = {}
for k in result:
assert isinstance(k, str)
typed_results[k] = interactiveregion_from_dict(result[k])
return typed_results
async def _get_visual_viewport(self) -> VisualViewport:
assert self._page is not None
try:
await self._page.evaluate(self._page_script)
except Exception:
pass
return visualviewport_from_dict(await self._page.evaluate("MultimodalWebSurfer.getVisualViewport();"))
async def _get_focused_rect_id(self) -> str:
assert self._page is not None
try:
await self._page.evaluate(self._page_script)
except Exception:
pass
result = await self._page.evaluate("MultimodalWebSurfer.getFocusedElementId();")
return str(result)
async def _get_page_metadata(self) -> Dict[str, Any]:
assert self._page is not None
try:
await self._page.evaluate(self._page_script)
except Exception:
pass
result = await self._page.evaluate("MultimodalWebSurfer.getPageMetadata();")
assert isinstance(result, dict)
return cast(Dict[str, Any], result)
async def _on_new_page(self, page: Page) -> None:
self._page = page
assert self._page is not None
# self._page.route(lambda x: True, self._route_handler)
self._page.on("download", self._download_handler)
await self._page.set_viewport_size({"width": VIEWPORT_WIDTH, "height": VIEWPORT_HEIGHT})
await self._sleep(0.2)
self._prior_metadata_hash = None
await self._page.add_init_script(
path=os.path.join(os.path.abspath(os.path.dirname(__file__)), "page_script.js")
)
await self._page.wait_for_load_state()
async def _back(self) -> None:
assert self._page is not None
await self._page.go_back()
async def _visit_page(self, url: str) -> None:
assert self._page is not None
try:
# Regular webpage
await self._page.goto(url)
await self._page.wait_for_load_state()
self._prior_metadata_hash = None
except Exception as e_outer:
# Downloaded file
if self.downloads_folder and "net::ERR_ABORTED" in str(e_outer):
async with self._page.expect_download() as download_info:
try:
await self._page.goto(url)
except Exception as e_inner:
if "net::ERR_ABORTED" in str(e_inner):
pass
else:
raise e_inner
download = await download_info.value
fname = os.path.join(self.downloads_folder, download.suggested_filename)
await download.save_as(fname)
message = f"<body style=\"margin: 20px;\"><h1>Successfully downloaded '{download.suggested_filename}' to local path:<br><br>{fname}</h1></body>"
await self._page.goto(
"data:text/html;base64," + base64.b64encode(message.encode("utf-8")).decode("utf-8")
)
self._last_download = None # Since we already handled it
else:
raise e_outer
async def _page_down(self) -> None:
assert self._page is not None
await self._page.evaluate(f"window.scrollBy(0, {VIEWPORT_HEIGHT-50});")
async def _page_up(self) -> None:
assert self._page is not None
await self._page.evaluate(f"window.scrollBy(0, -{VIEWPORT_HEIGHT-50});")
async def _click_id(self, identifier: str) -> None:
assert self._page is not None
target = self._page.locator(f"[__elementId='{identifier}']")
# See if it exists
try:
await target.wait_for(timeout=100)
except TimeoutError:
raise ValueError("No such element.") from None
# Click it
await target.scroll_into_view_if_needed()
box = cast(Dict[str, Union[int, float]], await target.bounding_box())
try:
# Give it a chance to open a new page
# TODO: Having trouble with these types
async with self._page.expect_event("popup", timeout=1000) as page_info: # type: ignore
await self._page.mouse.click(box["x"] + box["width"] / 2, box["y"] + box["height"] / 2, delay=10)
# If we got this far without error, than a popup or new tab opened. Handle it.
new_page = await page_info.value # type: ignore
assert isinstance(new_page, Page)
await self._on_new_page(new_page)
self.logger.info(
WebSurferEvent(
source=self.name,
url=self._page.url,
message="New tab or window.",
)
)
except TimeoutError:
pass
async def _fill_id(self, identifier: str, value: str) -> None:
assert self._page is not None
target = self._page.locator(f"[__elementId='{identifier}']")
# See if it exists
try:
await target.wait_for(timeout=100)
except TimeoutError:
raise ValueError("No such element.") from None
# Fill it
await target.scroll_into_view_if_needed()
await target.focus()
try:
await target.fill(value)
except PlaywrightError:
await target.press_sequentially(value)
await target.press("Enter")
async def _scroll_id(self, identifier: str, direction: str) -> None:
assert self._page is not None
await self._page.evaluate(
f"""
(function() {{
let elm = document.querySelector("[__elementId='{identifier}']");
if (elm) {{
if ("{direction}" == "up") {{
elm.scrollTop = Math.max(0, elm.scrollTop - elm.clientHeight);
}}
else {{
elm.scrollTop = Math.min(elm.scrollHeight - elm.clientHeight, elm.scrollTop + elm.clientHeight);
}}
}}
}})();
"""
)
async def _get_ocr_text(
self, image: bytes | io.BufferedIOBase | PIL.Image.Image, cancellation_token: Optional[CancellationToken] = None
) -> str:
@@ -783,7 +665,7 @@ When deciding between tools, consider if the request can be best addressed by:
messages.append(
UserMessage(
content=[
"Please transcribe all visible text on this page, including both main content and the labels of UI elements.",
WEB_SURFER_OCR_PROMPT,
AGImage.from_pil(scaled_screenshot),
],
source=self.name,
@@ -793,3 +675,68 @@ When deciding between tools, consider if the request can be best addressed by:
scaled_screenshot.close()
assert isinstance(response.content, str)
return response.content
async def _summarize_page(
    self,
    question: str | None = None,
    cancellation_token: Optional[CancellationToken] = None,
) -> str:
    """
    Summarize the current page, or answer ``question`` about it, with the model client.

    The page markdown is added to the prompt line by line for as long as the model
    client reports more than ``SCREENSHOT_TOKENS`` remaining tokens, so that room is
    left for the screenshot that accompanies the final message.

    Args:
        question: Optional question forwarded to ``WEB_SURFER_QA_PROMPT``.
        cancellation_token: Forwarded to the model client's ``create`` call.

    Returns:
        The model's text response, or ``"Nothing to summarize."`` when no page
        content made it into the prompt.
    """
    assert self._page is not None

    page_markdown: str = await self._playwright_controller.get_page_markdown(self._page)

    # Fall back to the URL if the title cannot be read.
    title: str = self._page.url
    try:
        title = await self._page.title()
    except Exception:
        pass

    # Prepare the system prompt
    messages: List[LLMMessage] = [SystemMessage(content=WEB_SURFER_QA_SYSTEM_MESSAGE)]
    prompt = WEB_SURFER_QA_PROMPT(title, question)

    # Grow the buffer (which is added to the prompt) until we overflow the context window or run out of lines.
    # keepends=True preserves the line breaks; without them the markdown lines would be glued together.
    buffer = ""
    for line in page_markdown.splitlines(keepends=True):
        candidate = UserMessage(content=prompt + buffer + line, source=self.name)
        remaining = self._model_client.remaining_tokens(messages + [candidate])
        if remaining <= SCREENSHOT_TOKENS:
            break
        buffer += line

    # Nothing to do
    buffer = buffer.strip()
    if len(buffer) == 0:
        return "Nothing to summarize."

    # Take the screenshot only once we know it will be sent, and scale it for the model.
    with Image.open(io.BytesIO(await self._page.screenshot())) as screenshot:
        scaled_screenshot = screenshot.resize((MLM_WIDTH, MLM_HEIGHT))

    try:
        messages.append(
            UserMessage(
                content=[
                    prompt + buffer,
                    AGImage.from_pil(scaled_screenshot),
                ],
                source=self.name,
            )
        )

        # Generate the response
        response = await self._model_client.create(messages, cancellation_token=cancellation_token)
    finally:
        # Release the scaled image even if the model call fails.
        scaled_screenshot.close()

    assert isinstance(response.content, str)
    return response.content

View File

@@ -0,0 +1,380 @@
import asyncio
import base64
import os
import random
from typing import Any, Callable, Dict, Optional, Tuple, Union, cast
from playwright._impl._errors import Error as PlaywrightError
from playwright._impl._errors import TimeoutError
from playwright.async_api import Download, Page
from ._types import (
InteractiveRegion,
VisualViewport,
interactiveregion_from_dict,
visualviewport_from_dict,
)
class PlaywrightController:
def __init__(
    self,
    animate_actions: bool = False,
    downloads_folder: Optional[str] = None,
    viewport_width: int = 1440,
    viewport_height: int = 900,
    _download_handler: Optional[Callable[[Download], None]] = None,
    to_resize_viewport: bool = True,
) -> None:
    """
    Set up a controller that performs browser interactions on Playwright pages.

    Args:
        animate_actions: When True, actions are animated.
        downloads_folder: Folder that downloads are saved to.
        viewport_width: Viewport width used when resizing pages.
        viewport_height: Viewport height used when resizing pages and when scrolling by a page.
        _download_handler: Callback subscribed to a page's "download" event.
        to_resize_viewport: When True, new pages are resized to the configured viewport.
    """
    # Behaviour switches
    self.animate_actions = animate_actions
    self.to_resize_viewport = to_resize_viewport

    # Viewport geometry
    self.viewport_width = viewport_width
    self.viewport_height = viewport_height

    # Download handling
    self.downloads_folder = downloads_folder
    self._download_handler = _download_handler

    # Where the next animated cursor move starts from
    self.last_cursor_position: Tuple[float, float] = (0.0, 0.0)

    # Read page_script, which lives next to this module
    script_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "page_script.js")
    self._page_script: str = ""
    with open(script_path, "rt") as script_file:
        self._page_script = script_file.read()
async def sleep(self, page: Page, duration: Union[int, float]) -> None:
    """
    Wait for ``duration`` seconds via ``page.wait_for_timeout``.

    Args:
        page: The page to wait on.
        duration: Time to wait in seconds; it is multiplied by 1000 before being passed on.
    """
    assert page is not None
    milliseconds = duration * 1000
    await page.wait_for_timeout(milliseconds)
async def get_interactive_rects(self, page: Page) -> Dict[str, InteractiveRegion]:
    """
    Ask the page script for its interactive regions and convert them to typed objects.

    Args:
        page: The page to query.

    Returns:
        A mapping from each key returned by ``MultimodalWebSurfer.getInteractiveRects()``
        to the ``InteractiveRegion`` built from its value.
    """
    assert page is not None

    # Best effort: evaluate the helper script first; any failure is ignored and the query still runs.
    try:
        await page.evaluate(self._page_script)
    except Exception:
        pass

    # Read the regions from the DOM
    raw_regions = cast(
        Dict[str, Dict[str, Any]],
        await page.evaluate("MultimodalWebSurfer.getInteractiveRects();"),
    )
    assert isinstance(raw_regions, dict)

    # Convert the results into appropriate types
    regions: Dict[str, InteractiveRegion] = {}
    for region_id, region_data in raw_regions.items():
        assert isinstance(region_id, str)
        regions[region_id] = interactiveregion_from_dict(region_data)
    return regions
async def get_visual_viewport(self, page: Page) -> VisualViewport:
    """
    Return the page's visual viewport as reported by the page script.

    Args:
        page: The page to query.

    Returns:
        The ``VisualViewport`` built from ``MultimodalWebSurfer.getVisualViewport()``.
    """
    assert page is not None

    # Best effort: evaluate the helper script first; any failure is ignored and the query still runs.
    try:
        await page.evaluate(self._page_script)
    except Exception:
        pass

    raw_viewport = await page.evaluate("MultimodalWebSurfer.getVisualViewport();")
    return visualviewport_from_dict(raw_viewport)
async def get_focused_rect_id(self, page: Page) -> str:
    """
    Return the id of the focused element as reported by the page script.

    Args:
        page: The page to query.

    Returns:
        ``str()`` of whatever ``MultimodalWebSurfer.getFocusedElementId()`` evaluates to.
    """
    assert page is not None

    # Best effort: evaluate the helper script first; any failure is ignored and the query still runs.
    try:
        await page.evaluate(self._page_script)
    except Exception:
        pass

    focused_id = await page.evaluate("MultimodalWebSurfer.getFocusedElementId();")
    return str(focused_id)
async def get_page_metadata(self, page: Page) -> Dict[str, Any]:
    """
    Return the page metadata dictionary produced by the page script.

    Args:
        page: The page to query.

    Returns:
        The dictionary that ``MultimodalWebSurfer.getPageMetadata()`` evaluates to.
    """
    assert page is not None

    # Best effort: evaluate the helper script first; any failure is ignored and the query still runs.
    try:
        await page.evaluate(self._page_script)
    except Exception:
        pass

    metadata = await page.evaluate("MultimodalWebSurfer.getPageMetadata();")
    assert isinstance(metadata, dict)
    return cast(Dict[str, Any], metadata)
async def on_new_page(self, page: Page) -> None:
    """
    Prepare a newly opened page (for example a popup) for use by the controller.

    Subscribes the download handler, optionally resizes the viewport, waits briefly,
    registers ``page_script.js`` as an init script and waits for the load state.

    Args:
        page: The new page to prepare.
    """
    assert page is not None

    # The constructor default for the handler is None; only subscribe when one was supplied.
    if self._download_handler is not None:
        page.on("download", self._download_handler)

    if self.to_resize_viewport and self.viewport_width and self.viewport_height:
        await page.set_viewport_size({"width": self.viewport_width, "height": self.viewport_height})

    await self.sleep(page, 0.2)
    await page.add_init_script(path=os.path.join(os.path.abspath(os.path.dirname(__file__)), "page_script.js"))
    await page.wait_for_load_state()
async def back(self, page: Page) -> None:
    """
    Go one step back in the page's history via ``page.go_back()``.

    Args:
        page: The page to navigate.
    """
    assert page is not None
    await page.go_back()
async def visit_page(self, page: Page, url: str) -> Tuple[bool, bool]:
    """
    Navigate ``page`` to ``url``, falling back to a file download when navigation is aborted.

    Args:
        page: The page to navigate.
        url: The address to open.

    Returns:
        ``(reset_prior_metadata_hash, reset_last_download)``. The first flag is True after
        a regular navigation succeeded; the second is True after the URL was saved as a
        download and a confirmation page was shown instead.

    Raises:
        Exception: The navigation error is re-raised when no downloads folder is configured
            or the error does not contain "net::ERR_ABORTED".
    """
    assert page is not None
    metadata_hash_stale = False
    download_handled = False

    try:
        # Regular webpage
        await page.goto(url)
        await page.wait_for_load_state()
        metadata_hash_stale = True
    except Exception as e_outer:
        # Anything other than an aborted navigation with a downloads folder configured is a real failure.
        if not (self.downloads_folder and "net::ERR_ABORTED" in str(e_outer)):
            raise e_outer

        # Downloaded file: repeat the navigation while listening for the download event.
        async with page.expect_download() as download_info:
            try:
                await page.goto(url)
            except Exception as e_inner:
                # The abort is expected here; any other error is propagated.
                if "net::ERR_ABORTED" not in str(e_inner):
                    raise e_inner
            download = await download_info.value
            fname = os.path.join(self.downloads_folder, download.suggested_filename)
            await download.save_as(fname)

            # Show a confirmation page in place of the aborted navigation.
            message = f"<body style=\"margin: 20px;\"><h1>Successfully downloaded '{download.suggested_filename}' to local path:<br><br>{fname}</h1></body>"
            encoded_message = base64.b64encode(message.encode("utf-8")).decode("utf-8")
            await page.goto("data:text/html;base64," + encoded_message)
            download_handled = True

    return metadata_hash_stale, download_handled
async def page_down(self, page: Page) -> None:
    """
    Scroll the window down by the configured viewport height minus 50 pixels.

    Args:
        page: The page to scroll.
    """
    assert page is not None
    scroll_distance = self.viewport_height - 50
    await page.evaluate(f"window.scrollBy(0, {scroll_distance});")
async def page_up(self, page: Page) -> None:
    """
    Scroll the window up by the configured viewport height minus 50 pixels.

    Args:
        page: The page to scroll.
    """
    assert page is not None
    scroll_distance = self.viewport_height - 50
    await page.evaluate(f"window.scrollBy(0, -{scroll_distance});")
async def gradual_cursor_animation(
    self, page: Page, start_x: float, start_y: float, end_x: float, end_y: float
) -> None:
    """
    Animation helper: slide the on-page ``red-cursor`` element from the start point to the end point.

    The move is split into 20 linear steps, 0.05 seconds apart. The last step lands on
    ``(end_x, end_y)``, which is then stored as ``last_cursor_position``.

    Args:
        page: The page holding the cursor element.
        start_x: Starting x coordinate in pixels.
        start_y: Starting y coordinate in pixels.
        end_x: Final x coordinate in pixels.
        end_y: Final y coordinate in pixels.
    """
    steps = 20
    # Count from 1 so the final frame uses fraction 1.0 and the cursor actually reaches the target.
    for step in range(1, steps + 1):
        fraction = step / steps
        x = start_x + (end_x - start_x) * fraction
        y = start_y + (end_y - start_y) * fraction
        # Skip the update when the cursor element is missing instead of failing inside the page.
        await page.evaluate(f"""
            (function() {{
                let cursor = document.getElementById('red-cursor');
                if (cursor) {{
                    cursor.style.left = '{x}px';
                    cursor.style.top = '{y}px';
                }}
            }})();
        """)
        await asyncio.sleep(0.05)

    self.last_cursor_position = (end_x, end_y)
async def add_cursor_box(self, page: Page, identifier: str) -> None:
    """
    Animation helper: outline the target element in red and add a red cursor dot to the page.

    Args:
        page: The page to decorate.
        identifier: Value of the ``__elementId`` attribute of the element to highlight.
    """
    # The identifier is passed as an argument rather than spliced into the script text,
    # so quotes or other special characters in it cannot change the JavaScript that runs.
    await page.evaluate(
        """
        (identifier) => {
            let elm = document.querySelector('[__elementId="' + CSS.escape(identifier) + '"]');
            if (elm) {
                elm.style.transition = 'border 0.3s ease-in-out';
                elm.style.border = '2px solid red';
            }
        }
        """,
        identifier,
    )
    # Let the 0.3s border transition finish before the cursor appears.
    await asyncio.sleep(0.3)

    # Create a red cursor. It ignores pointer events so it cannot intercept a later click.
    await page.evaluate("""
        (function() {
            let cursor = document.createElement('div');
            cursor.id = 'red-cursor';
            cursor.style.width = '10px';
            cursor.style.height = '10px';
            cursor.style.backgroundColor = 'red';
            cursor.style.position = 'absolute';
            cursor.style.borderRadius = '50%';
            cursor.style.zIndex = '10000';
            cursor.style.pointerEvents = 'none';
            document.body.appendChild(cursor);
        })();
    """)
async def remove_cursor_box(self, page: Page, identifier: str) -> None:
    """Clear the red outline from the target element and delete the cursor dot.

    Both steps are skipped silently when the element or the ``red-cursor``
    node is not present in the document.

    Args:
        page: The page to clean up.
        identifier: Value of the ``__elementId`` attribute of the target.
    """
    cleanup_script = f"""
        (function() {{
            let elm = document.querySelector("[__elementId='{identifier}']");
            if (elm) {{
                elm.style.border = '';
            }}
            let cursor = document.getElementById('red-cursor');
            if (cursor) {{
                cursor.remove();
            }}
        }})();
    """
    await page.evaluate(cleanup_script)
async def click_id(self, page: Page, identifier: str) -> Page | None:
    """Click the centre of the element carrying the given ``__elementId``.

    When ``self.animate_actions`` is set, the element is highlighted and the
    red cursor marker glides to it before the click; the decoration is removed
    afterwards even if the click fails.

    Args:
        page: The page containing the element.
        identifier: Value of the ``__elementId`` attribute of the target.

    Returns:
        The new page if the click opened a popup within one second,
        otherwise None.

    Raises:
        ValueError: If the element does not appear within five seconds, or if
            it has no bounding box to click on.
    """
    assert page is not None
    target = page.locator(f"[__elementId='{identifier}']")

    # See if it exists
    try:
        await target.wait_for(timeout=5000)
    except TimeoutError:
        raise ValueError("No such element.") from None

    await target.scroll_into_view_if_needed()
    await asyncio.sleep(0.3)

    # bounding_box() yields None for an element that is not visible; fail with
    # a clear error instead of a TypeError on the subscript below.
    box = await target.bounding_box()
    if box is None:
        raise ValueError("Element has no bounding box; it may not be visible.")
    # Measured before any highlight is applied, as the click point.
    end_x = box["x"] + box["width"] / 2
    end_y = box["y"] + box["height"] / 2

    new_page: Page | None = None
    animate = self.animate_actions
    try:
        if animate:
            await self.add_cursor_box(page, identifier)
            # Move cursor to the box slowly
            start_x, start_y = self.last_cursor_position
            await self.gradual_cursor_animation(page, start_x, start_y, end_x, end_y)
            await asyncio.sleep(0.1)

        try:
            # Give it a chance to open a new page
            async with page.expect_event("popup", timeout=1000) as page_info:  # type: ignore
                await page.mouse.click(end_x, end_y, delay=10)
                new_page = await page_info.value  # type: ignore
                assert isinstance(new_page, Page)
                await self.on_new_page(new_page)
        except TimeoutError:
            # No popup appeared within the timeout: the click stayed on this page.
            pass
    finally:
        # Always strip the highlight/cursor so a failed click does not leave
        # the decoration on the page.
        if animate:
            await self.remove_cursor_box(page, identifier)

    return new_page  # type: ignore
async def hover_id(self, page: Page, identifier: str) -> None:
    """Hover the mouse over the centre of the target with the given id.

    When ``self.animate_actions`` is set, the element is highlighted and the
    red cursor marker glides to it before the real mouse move; the decoration
    is removed afterwards even if the move fails.

    Args:
        page: The page containing the element.
        identifier: Value of the ``__elementId`` attribute of the target.

    Raises:
        ValueError: If the element does not appear within five seconds, or if
            it has no bounding box to hover over.
    """
    assert page is not None
    target = page.locator(f"[__elementId='{identifier}']")

    # See if it exists
    try:
        await target.wait_for(timeout=5000)
    except TimeoutError:
        raise ValueError("No such element.") from None

    # Hover over it
    await target.scroll_into_view_if_needed()
    await asyncio.sleep(0.3)

    # bounding_box() yields None for an element that is not visible; fail with
    # a clear error instead of a TypeError on the subscript below.
    box = await target.bounding_box()
    if box is None:
        raise ValueError("Element has no bounding box; it may not be visible.")
    end_x = box["x"] + box["width"] / 2
    end_y = box["y"] + box["height"] / 2

    animate = self.animate_actions
    try:
        if animate:
            await self.add_cursor_box(page, identifier)
            # Move cursor to the box slowly
            start_x, start_y = self.last_cursor_position
            await self.gradual_cursor_animation(page, start_x, start_y, end_x, end_y)
            await asyncio.sleep(0.1)
        await page.mouse.move(end_x, end_y)
    finally:
        # Always strip the highlight/cursor so a failed hover does not leave
        # the decoration on the page.
        if animate:
            await self.remove_cursor_box(page, identifier)
async def fill_id(self, page: Page, identifier: str, value: str) -> None:
    """Type ``value`` into the element with the given id, then press Enter.

    Without animation the text is set with ``fill`` and, if that raises a
    Playwright error, typed key by key instead. With ``self.animate_actions``
    set, the element is highlighted, the red cursor marker glides to it, and
    the text is typed key by key with a per-key delay; the decoration is
    removed afterwards even if typing fails.

    Args:
        page: The page containing the element.
        identifier: Value of the ``__elementId`` attribute of the target.
        value: Text to enter.

    Raises:
        ValueError: If the element does not appear within five seconds.
    """
    assert page is not None
    target = page.locator(f"[__elementId='{identifier}']")

    # See if it exists
    try:
        await target.wait_for(timeout=5000)
    except TimeoutError:
        raise ValueError("No such element.") from None

    # Fill it
    await target.scroll_into_view_if_needed()
    # Measured before the highlight is applied. May be None when the element
    # is not visible; the box is only needed for the cosmetic cursor glide.
    box = await target.bounding_box()

    animate = self.animate_actions
    try:
        if animate:
            await self.add_cursor_box(page, identifier)
            if box is not None:
                # Move cursor to the box slowly
                start_x, start_y = self.last_cursor_position
                end_x = box["x"] + box["width"] / 2
                end_y = box["y"] + box["height"] / 2
                await self.gradual_cursor_animation(page, start_x, start_y, end_x, end_y)
                await asyncio.sleep(0.1)

        # Focus on the element
        await target.focus()
        if animate:
            # fill char by char to mimic human speed for short text and type fast for long text
            if len(value) < 100:
                delay_typing_speed = 50 + 100 * random.random()
            else:
                delay_typing_speed = 10
            await target.press_sequentially(value, delay=delay_typing_speed)
        else:
            try:
                await target.fill(value)
            except PlaywrightError:
                # fill() was rejected for this element; fall back to key presses.
                await target.press_sequentially(value)
        await target.press("Enter")
    finally:
        # Always strip the highlight/cursor so a failed fill does not leave
        # the decoration on the page.
        if animate:
            await self.remove_cursor_box(page, identifier)
async def scroll_id(self, page: Page, identifier: str, direction: str) -> None:
    """Scroll the element with the given id by one ``clientHeight``.

    A ``direction`` of ``"up"`` scrolls up, clamped at the top; any other
    value scrolls down, clamped at the bottom. Nothing happens when no
    element matches.

    Args:
        page: The page containing the element.
        identifier: Value of the ``__elementId`` attribute of the target.
        direction: ``"up"`` to scroll up; anything else scrolls down.
    """
    assert page is not None
    scroll_script = f"""
        (function() {{
            let elm = document.querySelector("[__elementId='{identifier}']");
            if (elm) {{
                if ("{direction}" == "up") {{
                    elm.scrollTop = Math.max(0, elm.scrollTop - elm.clientHeight);
                }}
                else {{
                    elm.scrollTop = Math.min(elm.scrollHeight - elm.clientHeight, elm.scrollTop + elm.clientHeight);
                }}
            }}
        }})();
    """
    await page.evaluate(scroll_script)
async def get_webpage_text(self, page: Page, n_lines: int = 100) -> str:
    """Return the non-blank lines among the first ``n_lines`` of the page text.

    The text is ``document.body.innerText``. It is cut to its first
    ``n_lines`` lines first, and whitespace-only lines are dropped afterwards,
    so the result may contain fewer than ``n_lines`` lines.

    Args:
        page: Playwright page object.
        n_lines: Number of leading lines of the page inner text to consider.

    Returns:
        The retained lines joined by newlines, or an empty string if anything
        goes wrong while reading or processing the text.
    """
    assert page is not None
    try:
        inner_text = await page.evaluate("""() => {
            return document.body.innerText;
        }""")
        leading_lines = inner_text.split("\n")[:n_lines]
        # remove empty lines
        kept_lines = [line for line in leading_lines if line.strip()]
        result = "\n".join(kept_lines)
        assert isinstance(result, str)
        return result
    except Exception:
        return ""
async def get_page_markdown(self, page: Page) -> str:
    """Return the page content as text.

    Currently this is just the plain page text from ``get_webpage_text``
    limited to the first 1000 lines, not real markdown.

    Args:
        page: Playwright page object.
    """
    # TODO: replace with mdconvert
    assert page is not None
    page_text = await self.get_webpage_text(page, n_lines=1000)
    return page_text

View File

@@ -0,0 +1,32 @@
# Prompt template shown alongside a screenshot when asking the model to pick a tool.
# Placeholders: {url}, {visible_targets}, {other_targets_str}, {focused_hint}, {tool_names}.
# NOTE(review): presumably filled in with str.format by the caller - confirm.
WEB_SURFER_TOOL_PROMPT = """
Consider the following screenshot of a web browser, which is open to the page '{url}'. In this screenshot, interactive elements are outlined in bounding boxes of different colors. Each bounding box has a numeric ID label in the same color. Additional information about each visible label is listed below:
{visible_targets}{other_targets_str}{focused_hint}
You are to respond to the most recent request by selecting an appropriate tool from the following set, or by answering the question directly if possible without tools:
{tool_names}
When deciding between tools, consider if the request can be best addressed by:
- the contents of the current viewport (in which case actions like clicking links, clicking buttons, inputting text might be most appropriate, or hovering over element)
- contents found elsewhere on the full webpage (in which case actions like scrolling, summarization, or full-page Q&A might be most appropriate)
- on some other website entirely (in which case actions like performing a new web search might be the best option)
"""
# Prompt asking for a transcription of all visible text on the page (no placeholders).
WEB_SURFER_OCR_PROMPT = """
Please transcribe all visible text on this page, including both main content and the labels of UI elements.
"""
# System message for the page summarization / question-answering model call (no placeholders).
WEB_SURFER_QA_SYSTEM_MESSAGE = """
You are a helpful assistant that can summarize long documents to answer question.
"""
def WEB_SURFER_QA_PROMPT(title: str, question: str | None = None) -> str:
base_prompt = f"We are visiting the webpage '{title}'. Its full-text content are pasted below, along with a screenshot of the page's current viewport."
if question is not None:
return (
f"{base_prompt} Please summarize the webpage into one or two paragraphs with respect to '{question}':\n\n"
)
else:
return f"{base_prompt} Please summarize the webpage into one or two paragraphs:\n\n"

View File

@@ -1,6 +1,5 @@
from typing import Any, Dict
# TODO Why does pylance fail if I import from autogen_core.components.tools instead?
from autogen_core.components.tools._base import ParametersSchema, ToolSchema
@@ -16,6 +15,10 @@ def _load_tool(tooldef: Dict[str, Any]) -> ToolSchema:
)
# Shared description text for the "reasoning" parameter of the tool schemas in this module.
REASONING_TOOL_PROMPT = (
"A short description of the action to be performed and reason for doing so, do not mention the user."
)
TOOL_VISIT_URL: ToolSchema = _load_tool(
{
"type": "function",
@@ -27,7 +30,7 @@ TOOL_VISIT_URL: ToolSchema = _load_tool(
"properties": {
"reasoning": {
"type": "string",
"description": "A short explanation of the reasoning for calling this tool and taking this action.",
"description": REASONING_TOOL_PROMPT,
},
"url": {
"type": "string",
@@ -51,7 +54,7 @@ TOOL_WEB_SEARCH: ToolSchema = _load_tool(
"properties": {
"reasoning": {
"type": "string",
"description": "A short explanation of the reasoning for calling this tool and taking this action.",
"description": REASONING_TOOL_PROMPT,
},
"query": {
"type": "string",
@@ -75,7 +78,7 @@ TOOL_HISTORY_BACK: ToolSchema = _load_tool(
"properties": {
"reasoning": {
"type": "string",
"description": "A short explanation of the reasoning for calling this tool and taking this action.",
"description": REASONING_TOOL_PROMPT,
},
},
"required": ["reasoning"],
@@ -95,7 +98,7 @@ TOOL_PAGE_UP: ToolSchema = _load_tool(
"properties": {
"reasoning": {
"type": "string",
"description": "A short explanation of the reasoning for calling this tool and taking this action.",
"description": REASONING_TOOL_PROMPT,
},
},
"required": ["reasoning"],
@@ -115,7 +118,7 @@ TOOL_PAGE_DOWN: ToolSchema = _load_tool(
"properties": {
"reasoning": {
"type": "string",
"description": "A short explanation of the reasoning for calling this tool and taking this action.",
"description": REASONING_TOOL_PROMPT,
},
},
"required": ["reasoning"],
@@ -135,7 +138,7 @@ TOOL_CLICK: ToolSchema = _load_tool(
"properties": {
"reasoning": {
"type": "string",
"description": "A short explanation of the reasoning for calling this tool and taking this action.",
"description": REASONING_TOOL_PROMPT,
},
"target_id": {
"type": "integer",
@@ -159,7 +162,7 @@ TOOL_TYPE: ToolSchema = _load_tool(
"properties": {
"reasoning": {
"type": "string",
"description": "A short explanation of the reasoning for calling this tool and taking this action.",
"description": REASONING_TOOL_PROMPT,
},
"input_field_id": {
"type": "integer",
@@ -187,7 +190,7 @@ TOOL_SCROLL_ELEMENT_DOWN: ToolSchema = _load_tool(
"properties": {
"reasoning": {
"type": "string",
"description": "A short explanation of the reasoning for calling this tool and taking this action.",
"description": REASONING_TOOL_PROMPT,
},
"target_id": {
"type": "integer",
@@ -211,7 +214,7 @@ TOOL_SCROLL_ELEMENT_UP: ToolSchema = _load_tool(
"properties": {
"reasoning": {
"type": "string",
"description": "A short explanation of the reasoning for calling this tool and taking this action.",
"description": REASONING_TOOL_PROMPT,
},
"target_id": {
"type": "integer",
@@ -224,6 +227,31 @@ TOOL_SCROLL_ELEMENT_UP: ToolSchema = _load_tool(
}
)
# Schema for the "hover" tool: takes a "reasoning" string and an integer
# "target_id" naming the element to hover over; both are required.
TOOL_HOVER: ToolSchema = _load_tool(
{
"type": "function",
"function": {
"name": "hover",
"description": "Hovers the mouse over the target with the given id.",
"parameters": {
"type": "object",
"properties": {
"reasoning": {
"type": "string",
"description": REASONING_TOOL_PROMPT,
},
"target_id": {
"type": "integer",
"description": "The numeric id of the target to hover over.",
},
},
"required": ["reasoning", "target_id"],
},
},
}
)
TOOL_READ_PAGE_AND_ANSWER: ToolSchema = _load_tool(
{
"type": "function",
@@ -235,7 +263,7 @@ TOOL_READ_PAGE_AND_ANSWER: ToolSchema = _load_tool(
"properties": {
"reasoning": {
"type": "string",
"description": "A short explanation of the reasoning for calling this tool and taking this action.",
"description": REASONING_TOOL_PROMPT,
},
"question": {
"type": "string",
@@ -259,7 +287,7 @@ TOOL_SUMMARIZE_PAGE: ToolSchema = _load_tool(
"properties": {
"reasoning": {
"type": "string",
"description": "A short explanation of the reasoning for calling this tool and taking this action.",
"description": REASONING_TOOL_PROMPT,
},
},
"required": ["reasoning"],
@@ -279,7 +307,7 @@ TOOL_SLEEP: ToolSchema = _load_tool(
"properties": {
"reasoning": {
"type": "string",
"description": "A short explanation of the reasoning for calling this tool and taking this action.",
"description": REASONING_TOOL_PROMPT,
},
},
"required": ["reasoning"],