mirror of
https://github.com/microsoft/autogen.git
synced 2026-04-20 03:02:16 -04:00
Agnext websurfer (#205)
* Initial work on multimodal websurfer * A little more progress. * Getting function calling to work. * Some basic progress with navigation. * Added ability to print multimodal messages to console. * Fixed hatch error * Nicely print multimodal messages to console. * Got OCR working. * Fixed the click action. * Solved some hatch errors. * Fixed some formatting errors. * Fixed more type errors. * Yet more fixes to types. * Fixed many type errors. * Fixed all type errors. Some needed to be ignored. See todos. * Fixed all? hatch errors? * Fixed multiline aria-names in prompts.
This commit is contained in:
50
python/teams/team-one/examples/example_websurfer.py
Normal file
50
python/teams/team-one/examples/example_websurfer.py
Normal file
@@ -0,0 +1,50 @@
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
from agnext.application import SingleThreadedAgentRuntime
|
||||
from agnext.application.logging import EVENT_LOGGER_NAME
|
||||
from team_one.agents.multimodal_web_surfer import MultimodalWebSurfer
|
||||
from team_one.agents.orchestrator import RoundRobinOrchestrator
|
||||
from team_one.agents.user_proxy import UserProxy
|
||||
from team_one.messages import RequestReplyMessage
|
||||
from team_one.utils import LogHandler, create_completion_client_from_env
|
||||
|
||||
# NOTE: Don't forget to 'playwright install --with-deps chromium'
|
||||
|
||||
|
||||
async def main() -> None:
    """Wire a web surfer and a user proxy into one runtime and run the chat.

    A ``MultimodalWebSurfer`` and a ``UserProxy`` are registered together with
    a ``RoundRobinOrchestrator`` that is handed both proxies. The conversation
    is started by sending a ``RequestReplyMessage`` to the user proxy, and the
    coroutine returns once the runtime reports that it is idle.
    """
    runtime = SingleThreadedAgentRuntime()

    # Create an appropriate client
    client = create_completion_client_from_env()

    # Register the two participants first; the orchestrator needs their proxies.
    surfer_proxy = runtime.register_and_get_proxy(
        "WebSurfer",
        lambda: MultimodalWebSurfer(),
    )
    user_proxy_handle = runtime.register_and_get_proxy(
        "UserProxy",
        lambda: UserProxy(),
    )
    runtime.register(
        "orchestrator",
        lambda: RoundRobinOrchestrator([surfer_proxy, user_proxy_handle]),
    )

    run_context = runtime.start()

    # The surfer's browser setup is async, so it cannot happen in the factory.
    # Fetch the concrete agent instance from the runtime and initialise it here.
    surfer = runtime._get_agent(surfer_proxy.id)  # type: ignore
    assert isinstance(surfer, MultimodalWebSurfer)
    await surfer.init(model_client=client, browser_channel="chromium")

    await runtime.send_message(RequestReplyMessage(), user_proxy_handle.id)
    await run_context.stop_when_idle()
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Send agnext event logs (INFO and above) exclusively to the team-one handler.
    event_logger = logging.getLogger(EVENT_LOGGER_NAME)
    event_logger.setLevel(logging.INFO)
    event_logger.handlers = [LogHandler()]

    asyncio.run(main())
|
||||
@@ -17,7 +17,8 @@ classifiers = [
|
||||
]
|
||||
dependencies = [
|
||||
"agnext@{root:parent:parent:uri}",
|
||||
"aiofiles"
|
||||
"aiofiles",
|
||||
"playwright"
|
||||
]
|
||||
|
||||
[tool.hatch.envs.default]
|
||||
@@ -53,7 +54,7 @@ allow-direct-references = true
|
||||
[tool.ruff]
|
||||
line-length = 120
|
||||
fix = true
|
||||
exclude = ["build", "dist"]
|
||||
exclude = ["build", "dist", "page_script.js"]
|
||||
target-version = "py310"
|
||||
include = ["src/**", "examples/*.py"]
|
||||
|
||||
|
||||
@@ -0,0 +1,3 @@
|
||||
from .multimodal_web_surfer import MultimodalWebSurfer
|
||||
|
||||
__all__ = ("MultimodalWebSurfer",)
|
||||
@@ -0,0 +1,748 @@
|
||||
import base64
|
||||
import hashlib
|
||||
import io
|
||||
import re
|
||||
import json
|
||||
import os
|
||||
import pathlib
|
||||
import traceback
|
||||
from typing import Any, BinaryIO, Dict, List, Tuple, Union, cast # Any, Callable, Dict, List, Literal, Tuple
|
||||
from urllib.parse import quote_plus # parse_qs, quote, unquote, urlparse, urlunparse
|
||||
|
||||
import aiofiles
|
||||
from agnext.components import Image as AGImage
|
||||
from agnext.components.models import (
|
||||
AssistantMessage,
|
||||
ChatCompletionClient,
|
||||
LLMMessage,
|
||||
SystemMessage,
|
||||
UserMessage,
|
||||
)
|
||||
from agnext.core import CancellationToken
|
||||
from PIL import Image
|
||||
from playwright._impl._errors import Error as PlaywrightError
|
||||
from playwright._impl._errors import TimeoutError
|
||||
|
||||
# from playwright._impl._async_base.AsyncEventInfo
|
||||
from playwright.async_api import BrowserContext, Page, Playwright, async_playwright
|
||||
|
||||
from team_one.agents.base_agent import BaseAgent
|
||||
from team_one.messages import UserContent
|
||||
from team_one.utils import SentinelMeta, message_content_to_str
|
||||
|
||||
from .set_of_mark import add_set_of_mark
|
||||
from .tool_definitions import (
|
||||
TOOL_CLICK,
|
||||
TOOL_HISTORY_BACK,
|
||||
TOOL_PAGE_DOWN,
|
||||
TOOL_PAGE_UP,
|
||||
# TOOL_READ_PAGE_AND_ANSWER,
|
||||
# TOOL_SCROLL_ELEMENT_DOWN,
|
||||
# TOOL_SCROLL_ELEMENT_UP,
|
||||
TOOL_SLEEP,
|
||||
# TOOL_SUMMARIZE_PAGE,
|
||||
TOOL_TYPE,
|
||||
TOOL_VISIT_URL,
|
||||
TOOL_WEB_SEARCH,
|
||||
)
|
||||
from .types import (
|
||||
InteractiveRegion,
|
||||
VisualViewport,
|
||||
interactiveregion_from_dict,
|
||||
visualviewport_from_dict,
|
||||
)
|
||||
|
||||
# Viewport dimensions
|
||||
VIEWPORT_HEIGHT = 900
|
||||
VIEWPORT_WIDTH = 1440
|
||||
|
||||
# Size of the image we send to the MLM
|
||||
# Current values represent a 0.85 scaling to fit within the GPT-4v short-edge constraints (768px)
|
||||
MLM_HEIGHT = 765
|
||||
MLM_WIDTH = 1224
|
||||
|
||||
|
||||
# Sentinels
|
||||
class DEFAULT_CHANNEL(metaclass=SentinelMeta):
    """Sentinel standing for "no browser channel was requested".

    ``MultimodalWebSurfer.init`` uses this class object itself as the default
    value of ``browser_channel`` and compares against it by identity; only a
    different value is forwarded to Playwright as the ``channel`` launch option.
    """

    pass
|
||||
|
||||
|
||||
class MultimodalWebSurfer(BaseAgent):
|
||||
"""(In preview) A multimodal agent that acts as a web surfer that can search the web and visit web pages."""
|
||||
|
||||
DEFAULT_DESCRIPTION = "A helpful assistant with access to a web browser. Ask them to perform web searches, open pages, and interact with content (e.g., clicking links, scrolling the viewport, etc., filling in form fields, etc.) It can also summarize the entire page, or answer questions based on the content of the page. It can also be asked to sleep and wait for pages to load, in cases where the pages seem to be taking a while to load."
|
||||
|
||||
DEFAULT_START_PAGE = "https://www.bing.com/"
|
||||
|
||||
def __init__(
    self,
    description: str = DEFAULT_DESCRIPTION,
) -> None:
    """Create an unconfigured web surfer.

    The instance is not usable until ``init`` has been awaited: that is where
    the model client is stored and the browser, context and page are created.

    Args:
        description: Agent description passed to the base agent.

    Raises:
        OSError: If ``page_script.js`` next to this module cannot be read.
    """
    super().__init__(description)

    # Populated by init()
    self._playwright: Playwright | None = None
    self._context: BrowserContext | None = None
    self._page: Page | None = None
    self._prior_metadata_hash: str | None = None

    # Cache the helper script that is evaluated in the page before each query.
    # The encoding is explicit so the read does not depend on the platform's
    # default locale encoding.
    script_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "page_script.js")
    with open(script_path, "rt", encoding="utf-8") as fh:
        self._page_script: str = fh.read()
|
||||
|
||||
async def init(
    self,
    model_client: ChatCompletionClient,
    headless: bool = True,
    browser_channel: str | type[DEFAULT_CHANNEL] = DEFAULT_CHANNEL,
    browser_data_dir: str | None = None,
    start_page: str | None = None,
    downloads_folder: str | None = None,
    debug_dir: str | None = os.getcwd(),
    # navigation_allow_list=lambda url: True,
    # markdown_converter: Optional[Union[MarkdownConverter, None]] = None,
) -> None:
    """Start Playwright, open the start page and prepare the debug output.

    Args:
        model_client: Chat completion client used for every model request.
        headless: Passed to Playwright as the ``headless`` launch option.
        browser_channel: Passed as the ``channel`` launch option unless it is
            the ``DEFAULT_CHANNEL`` sentinel.
        browser_data_dir: When given, a persistent Chromium context is
            launched from this directory; otherwise a fresh browser and
            context are created.
        start_page: First URL to open; falls back to ``DEFAULT_START_PAGE``.
        downloads_folder: Where ``_visit_page`` stores downloaded files.
        debug_dir: Directory for debug screenshots, or ``None`` to disable
            them. NOTE: the default is the working directory at the moment
            this method was defined, not at the moment it is called.
    """
    self._model_client = model_client
    self.start_page = start_page or self.DEFAULT_START_PAGE
    self.downloads_folder = downloads_folder
    self._chat_history: List[LLMMessage] = []
    self._prior_metadata_hash = None

    # TODO: download handling and the markdown converter from the previous
    # implementation have not been ported yet.

    # Assemble the launch options, then start Playwright itself.
    launch_args: Dict[str, Any] = {"headless": headless}
    if browser_channel is not DEFAULT_CHANNEL:
        launch_args["channel"] = browser_channel
    self._playwright = await async_playwright().start()

    # Persistent profile if a data directory was supplied, throwaway context otherwise.
    if browser_data_dir is not None:
        self._context = await self._playwright.chromium.launch_persistent_context(browser_data_dir, **launch_args)
    else:
        browser = await self._playwright.chromium.launch(**launch_args)
        self._context = await browser.new_context(
            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0"
        )

    # Open the page, size it, register the helper script, and load the start URL.
    self._context.set_default_timeout(60000)  # One minute
    self._page = await self._context.new_page()
    await self._page.set_viewport_size({"width": VIEWPORT_WIDTH, "height": VIEWPORT_HEIGHT})
    script_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "page_script.js")
    await self._page.add_init_script(path=script_path)
    await self._page.goto(self.start_page)
    await self._page.wait_for_load_state()

    # The debug directory stores the screenshots generated throughout the process.
    await self._set_debug_dir(debug_dir)
|
||||
|
||||
async def _sleep(self, duration: Union[int, float]) -> None:
    """Pause for ``duration`` seconds using the page's own timeout facility."""
    assert self._page is not None
    # wait_for_timeout is given the value scaled by 1000 (seconds -> milliseconds).
    milliseconds = duration * 1000
    await self._page.wait_for_timeout(milliseconds)
|
||||
|
||||
async def _set_debug_dir(self, debug_dir: str | None) -> None:
    """Set the debug directory and seed it with a viewer page and a first screenshot.

    When ``debug_dir`` is ``None`` only the attribute is stored. Otherwise the
    directory is created if needed, ``screenshot.html`` (a page that reloads
    ``screenshot.png`` every 300 ms) is written into it, the current page is
    captured to ``screenshot.png``, and the viewer's ``file://`` URI is printed.

    Args:
        debug_dir: Target directory, or ``None`` to disable debug output.
    """
    assert self._page is not None
    self.debug_dir = debug_dir
    if self.debug_dir is None:
        return

    # makedirs also creates missing parent directories, and exist_ok avoids the
    # check-then-create race that isdir() followed by mkdir() had.
    os.makedirs(self.debug_dir, exist_ok=True)

    debug_html = os.path.join(self.debug_dir, "screenshot.html")
    async with aiofiles.open(debug_html, "wt") as file:
        await file.write(
            f"""
<html style="width:100%; margin: 0px; padding: 0px;">
<body style="width: 100%; margin: 0px; padding: 0px;">
<img src="screenshot.png" id="main_image" style="width: 100%; max-width: {VIEWPORT_WIDTH}px; margin: 0px; padding: 0px;">
<script language="JavaScript">
var counter = 0;
setInterval(function() {{
counter += 1;
document.getElementById("main_image").src = "screenshot.png?bc=" + counter;
}}, 300);
</script>
</body>
</html>
""".strip(),
        )
    await self._page.screenshot(path=os.path.join(self.debug_dir, "screenshot.png"))
    print(f"Multimodal Web Surfer debug screens: {pathlib.Path(os.path.abspath(debug_html)).as_uri()}\n")
|
||||
|
||||
# def reset(self):
|
||||
# super().reset()
|
||||
# self._log_to_console(fname="reset", args={"home": self.start_page})
|
||||
# self._visit_page(self.start_page)
|
||||
# self._page.wait_for_load_state()
|
||||
# if self.debug_dir:
|
||||
# screenshot = self._page.screenshot()
|
||||
# with open(os.path.join(self.debug_dir, "screenshot.png"), "wb") as png:
|
||||
# png.write(screenshot)
|
||||
|
||||
def _target_name(self, target: str, rects: Dict[str, InteractiveRegion]) -> str | None:
|
||||
try:
|
||||
return rects[target]["aria_name"].strip()
|
||||
except KeyError:
|
||||
return None
|
||||
|
||||
def _format_target_list(self, ids: List[str], rects: Dict[str, InteractiveRegion]) -> List[str]:
|
||||
targets: List[str] = []
|
||||
for r in list(set(ids)):
|
||||
if r in rects:
|
||||
# Get the role
|
||||
aria_role = rects[r].get("role", "").strip()
|
||||
if len(aria_role) == 0:
|
||||
aria_role = rects[r].get("tag_name", "").strip()
|
||||
|
||||
# Get the name
|
||||
aria_name = re.sub(r"[\n\r]+", " ", rects[r].get("aria_name", "")).strip()
|
||||
|
||||
# What are the actions?
|
||||
actions = ['"click"']
|
||||
if rects[r]["role"] in ["textbox", "searchbox", "search"]:
|
||||
actions = ['"input_text"']
|
||||
actions_str = "[" + ",".join(actions) + "]"
|
||||
|
||||
targets.append(f'{{"id": {r}, "name": "{aria_name}", "role": "{aria_role}", "tools": {actions_str} }}')
|
||||
|
||||
return targets
|
||||
|
||||
async def _generate_reply(self, cancellation_token: CancellationToken) -> Tuple[bool, UserContent]:
    """Produce the agent's reply, converting any failure into a text reply.

    Returns:
        The ``(request_halt, content)`` pair from the inner implementation, or
        ``(False, <formatted traceback>)`` if it raised.
    """
    assert self._page is not None
    try:
        halt_requested, reply = await self.__generate_reply(cancellation_token)
    except Exception:
        # Hand the traceback back as the reply text rather than propagating.
        return False, f"Web surfing error:\n\n{traceback.format_exc()}"
    return halt_requested, reply
|
||||
|
||||
async def __generate_reply(self, cancellation_token: CancellationToken) -> Tuple[bool, UserContent]:
    """Ask the model for one browser action, perform it, and describe the result.

    Steps:
      1. Copy the chat history as text-only messages (old screenshots dropped).
      2. Capture the page, annotate interactive regions (set-of-mark), and
         build a prompt listing the targets and the tools currently available.
      3. Send prompt and scaled screenshot to the model; if it answers with
         tool calls, execute the first one.
      4. Wait for the page to settle, then return a textual observation
         (action taken, viewport position, changed metadata, OCR text)
         together with a fresh screenshot.

    Args:
        cancellation_token: Currently unused.

    Returns:
        ``(False, [observation_text, screenshot_image])``.

    Raises:
        ValueError: If the model selects a tool name that is not handled here.
    """
    assert self._page is not None

    # Clone the messages to give context, removing old screenshots
    history: List[LLMMessage] = []
    for m in self._chat_history:
        if isinstance(m.content, str):
            history.append(m)
        elif isinstance(m.content, list):
            content = message_content_to_str(m.content)
            if isinstance(m, UserMessage):
                history.append(UserMessage(content=content, source=m.source))
            elif isinstance(m, AssistantMessage):
                history.append(AssistantMessage(content=content, source=m.source))
            elif isinstance(m, SystemMessage):
                history.append(SystemMessage(content=content))

    # Ask the page for interactive elements, then prepare the set-of-mark screenshot
    rects = await self._get_interactive_rects()
    viewport = await self._get_visual_viewport()
    screenshot = await self._page.screenshot()
    som_screenshot, visible_rects, rects_above, rects_below = add_set_of_mark(screenshot, rects)

    if self.debug_dir:
        som_screenshot.save(os.path.join(self.debug_dir, "screenshot.png"))

    # What tools are available?
    tools = [
        TOOL_VISIT_URL,
        TOOL_HISTORY_BACK,
        TOOL_CLICK,
        TOOL_TYPE,
        # TOOL_SUMMARIZE_PAGE,
        # TOOL_READ_PAGE_AND_ANSWER,
        TOOL_SLEEP,
    ]

    # TODO: only offer web search once a navigation allow-list confirms Bing is reachable.
    tools.append(TOOL_WEB_SEARCH)

    # We can scroll up
    if viewport["pageTop"] > 5:
        tools.append(TOOL_PAGE_UP)

    # Can scroll down
    if (viewport["pageTop"] + viewport["height"] + 5) < viewport["scrollHeight"]:
        tools.append(TOOL_PAGE_DOWN)

    # Focus hint
    focused = await self._get_focused_rect_id()
    focused_hint = ""
    if focused:
        focused_name = self._target_name(focused, rects)
        # An absent name must contribute nothing; formatting None directly
        # would put the literal text "None" into the prompt.
        name_hint = f"(and name '{focused_name}') " if focused_name else ""

        try:
            role = rects[focused]["role"]
        except KeyError:
            role = "control"

        focused_hint = f"\nThe {role} with ID {focused} {name_hint}currently has the input focus.\n\n"

    # Everything visible
    visible_targets = "\n".join(self._format_target_list(visible_rects, rects)) + "\n\n"

    # Everything else
    other_targets: List[str] = []
    other_targets.extend(self._format_target_list(rects_above, rects))
    other_targets.extend(self._format_target_list(rects_below, rects))

    if other_targets:
        other_targets_str = (
            "Additional valid interaction targets (not shown) include:\n" + "\n".join(other_targets) + "\n\n"
        )
    else:
        other_targets_str = ""

    # TODO: add TOOL_SCROLL_ELEMENT_UP / TOOL_SCROLL_ELEMENT_DOWN once scrollable elements are detected.

    tool_names = "\n".join([t["name"] for t in tools])

    text_prompt = f"""
Consider the following screenshot of a web browser, which is open to the page '{self._page.url}'. In this screenshot, interactive elements are outlined in bounding boxes of different colors. Each bounding box has a numeric ID label in the same color. Additional information about each visible label is listed below:

{visible_targets}{other_targets_str}{focused_hint}You are to respond to the user's most recent request by selecting an appropriate tool the following set, or by answering the question directly if possible:

{tool_names}

When deciding between tools, consider if the request can be best addressed by:
- the contents of the current viewport (in which case actions like clicking links, clicking buttons, or inputting text might be most appropriate)
- contents found elsewhere on the full webpage (in which case actions like scrolling, summarization, or full-page Q&A might be most appropriate)
- on some other website entirely (in which case actions like performing a new web search might be the best option)
""".strip()

    # Scale the screenshot for the MLM, and close the original
    scaled_screenshot = som_screenshot.resize((MLM_WIDTH, MLM_HEIGHT))
    som_screenshot.close()
    if self.debug_dir:
        scaled_screenshot.save(os.path.join(self.debug_dir, "screenshot_scaled.png"))

    # Add the multimodal message and make the request
    history.append(
        UserMessage(content=[text_prompt, AGImage.from_pil(scaled_screenshot)], source=self.metadata["name"])
    )
    response = await self._model_client.create(
        history, tools=tools, extra_create_args={"tool_choice": "auto"}
    )  # , "parallel_tool_calls": False})
    message = response.content

    action_description = ""
    if isinstance(message, list):
        # Only the first requested tool call is executed.
        tool_name = message[0].name
        args = json.loads(message[0].arguments)

        if tool_name == "visit_url":
            url = args.get("url")
            action_description = f"I typed '{url}' into the browser address bar."
            # Check if the argument starts with a known protocol
            if url.startswith(("https://", "http://", "file://", "about:")):
                await self._visit_page(url)
            # If the argument contains a space, treat it as a search query
            elif " " in url:
                await self._visit_page(f"https://www.bing.com/search?q={quote_plus(url)}&FORM=QBLH")
            # Otherwise, prefix with https://
            else:
                await self._visit_page("https://" + url)

        elif tool_name == "history_back":
            action_description = "I clicked the browser back button."
            await self._back()

        elif tool_name == "web_search":
            query = args.get("query")
            action_description = f"I typed '{query}' into the browser search bar."
            await self._visit_page(f"https://www.bing.com/search?q={quote_plus(query)}&FORM=QBLH")

        elif tool_name == "page_up":
            action_description = "I scrolled up one page in the browser."
            await self._page_up()

        elif tool_name == "page_down":
            action_description = "I scrolled down one page in the browser."
            await self._page_down()

        elif tool_name == "click":
            target_id = str(args.get("target_id"))
            target_name = self._target_name(target_id, rects)
            if target_name:
                action_description = f"I clicked '{target_name}'."
            else:
                action_description = "I clicked the control."
            await self._click_id(target_id)

        elif tool_name == "input_text":
            input_field_id = str(args.get("input_field_id"))
            text_value = str(args.get("text_value"))
            input_field_name = self._target_name(input_field_id, rects)
            if input_field_name:
                action_description = f"I typed '{text_value}' into '{input_field_name}'."
            else:
                action_description = f"I input '{text_value}'."
            await self._fill_id(input_field_id, text_value)

        elif tool_name == "scroll_element_up":
            target_id = str(args.get("target_id"))
            target_name = self._target_name(target_id, rects)

            if target_name:
                action_description = f"I scrolled '{target_name}' up."
            else:
                action_description = "I scrolled the control up."

            await self._scroll_id(target_id, "up")

        elif tool_name == "scroll_element_down":
            target_id = str(args.get("target_id"))
            target_name = self._target_name(target_id, rects)

            if target_name:
                action_description = f"I scrolled '{target_name}' down."
            else:
                action_description = "I scrolled the control down."

            await self._scroll_id(target_id, "down")

        # TODO: "answer_question" and "summarize_page" are not ported yet.

        elif tool_name == "sleep":
            action_description = "I am waiting a short period of time before taking further action."
            await self._sleep(3)  # There's a further 3s sleep below too

        else:
            raise ValueError(f"Unknown tool '{tool_name}'. Please choose from:\n\n{tool_names}")

    await self._page.wait_for_load_state()
    await self._sleep(3)

    # TODO: handle downloads triggered by clicks (only _visit_page handles them today).

    # Handle metadata: only report it when it differs from the last report.
    page_metadata = json.dumps(await self._get_page_metadata(), indent=4)
    metadata_hash = hashlib.md5(page_metadata.encode("utf-8")).hexdigest()
    if metadata_hash != self._prior_metadata_hash:
        page_metadata = (
            "\nThe following metadata was extracted from the webpage:\n\n" + page_metadata.strip() + "\n"
        )
    else:
        page_metadata = ""
    self._prior_metadata_hash = metadata_hash

    # Describe the viewport of the new page in words
    viewport = await self._get_visual_viewport()
    # Guard the divisions below against a page that reports a zero scroll height.
    scroll_height = max(1, viewport["scrollHeight"])
    percent_visible = int(viewport["height"] * 100 / scroll_height)
    percent_scrolled = int(viewport["pageTop"] * 100 / scroll_height)
    if percent_scrolled < 1:  # Allow some rounding error
        position_text = "at the top of the page"
    elif percent_scrolled + percent_visible >= 99:  # Allow some rounding error
        position_text = "at the bottom of the page"
    else:
        position_text = str(percent_scrolled) + "% down from the top of the page"

    new_screenshot = await self._page.screenshot()
    if self.debug_dir:
        async with aiofiles.open(os.path.join(self.debug_dir, "screenshot.png"), "wb") as file:
            await file.write(new_screenshot)

    ocr_text = await self._get_ocr_text(new_screenshot)

    # Return the complete observation. The prompt invites the model to answer
    # directly, so a plain-text response is kept as the start of the reply.
    message_content = message if isinstance(message, str) else ""
    page_title = await self._page.title()

    return False, [
        f"{message_content}\n\n{action_description}\n\nHere is a screenshot of [{page_title}]({self._page.url}). The viewport shows {percent_visible}% of the webpage, and is positioned {position_text}.{page_metadata}\nAutomatic OCR of the page screenshot has detected the following text:\n\n{ocr_text}".strip(),
        AGImage.from_pil(Image.open(io.BytesIO(new_screenshot))),
    ]
|
||||
|
||||
async def _get_interactive_rects(self) -> Dict[str, InteractiveRegion]:
    """Query the page for its interactive regions, keyed by element ID.

    The helper script is evaluated first on a best-effort basis (failures are
    ignored), then ``MultimodalWebSurfer.getInteractiveRects()`` is called in
    the page and every entry is converted with ``interactiveregion_from_dict``.
    """
    assert self._page is not None

    # Best-effort evaluation of the helper script before querying the DOM.
    try:
        await self._page.evaluate(self._page_script)
    except Exception:
        pass

    raw_regions = cast(
        Dict[str, Dict[str, Any]], await self._page.evaluate("MultimodalWebSurfer.getInteractiveRects();")
    )
    assert isinstance(raw_regions, dict)

    # Convert the results into appropriate types
    regions: Dict[str, InteractiveRegion] = {}
    for region_id, raw_region in raw_regions.items():
        assert isinstance(region_id, str)
        regions[region_id] = interactiveregion_from_dict(raw_region)
    return regions
|
||||
|
||||
async def _get_visual_viewport(self) -> VisualViewport:
    """Return the page's visual viewport as reported by the helper script.

    The helper script is evaluated first on a best-effort basis; the result of
    ``MultimodalWebSurfer.getVisualViewport()`` is then converted with
    ``visualviewport_from_dict``.
    """
    assert self._page is not None
    try:
        await self._page.evaluate(self._page_script)
    except Exception:
        pass
    raw_viewport = await self._page.evaluate("MultimodalWebSurfer.getVisualViewport();")
    return visualviewport_from_dict(raw_viewport)
|
||||
|
||||
async def _get_focused_rect_id(self) -> str:
|
||||
assert self._page is not None
|
||||
try:
|
||||
await self._page.evaluate(self._page_script)
|
||||
except Exception:
|
||||
pass
|
||||
result = await self._page.evaluate("MultimodalWebSurfer.getFocusedElementId();")
|
||||
return str(result)
|
||||
|
||||
async def _get_page_metadata(self) -> Dict[str, Any]:
    """Return the metadata dictionary the helper script extracts from the page.

    The helper script is evaluated first on a best-effort basis, then
    ``MultimodalWebSurfer.getPageMetadata()`` is called; its result must be a
    ``dict``.
    """
    assert self._page is not None
    try:
        await self._page.evaluate(self._page_script)
    except Exception:
        pass
    metadata = await self._page.evaluate("MultimodalWebSurfer.getPageMetadata();")
    assert isinstance(metadata, dict)
    return cast(Dict[str, Any], metadata)
|
||||
|
||||
# async def _get_page_markdown(self):
|
||||
# assert self._page is not None
|
||||
# html = self._page.evaluate("document.documentElement.outerHTML;")
|
||||
# res = self._markdown_converter.convert_stream(io.StringIO(html), file_extension=".html", url=self._page.url)
|
||||
# return res.text_content
|
||||
|
||||
async def _on_new_page(self, page: Page) -> None:
    """Adopt ``page`` as the active page and prepare it like the first one.

    The viewport is resized, the cached metadata hash is cleared so the next
    reply reports metadata again, the helper script is registered as an init
    script, and the call waits for the page's load state.
    """
    self._page = page
    # TODO: request routing and download handling are not wired up yet.
    await self._page.set_viewport_size({"width": VIEWPORT_WIDTH, "height": VIEWPORT_HEIGHT})
    await self._sleep(0.2)
    self._prior_metadata_hash = None
    script_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "page_script.js")
    await self._page.add_init_script(path=script_path)
    await self._page.wait_for_load_state()
|
||||
|
||||
async def _back(self) -> None:
    """Navigate the active page one step back in its history."""
    page = self._page
    assert page is not None
    await page.go_back()
|
||||
|
||||
async def _visit_page(self, url: str) -> None:
    """Navigate to ``url``, treating an aborted navigation as a file download.

    A successful navigation clears the cached metadata hash. If navigation
    fails with ``net::ERR_ABORTED`` and a downloads folder is configured, the
    navigation is retried while waiting for a download; the file is saved to
    the downloads folder and a small confirmation page is shown in its place.

    Args:
        url: Address to open.

    Raises:
        Exception: Any navigation error other than the download case is
            re-raised unchanged.
    """
    import html  # function-scope import: only the download notice needs it

    assert self._page is not None
    try:
        # Regular webpage
        await self._page.goto(url)
        self._prior_metadata_hash = None
    except Exception as e_outer:
        # Anything that is not the "download" case is propagated untouched.
        if not (self.downloads_folder and "net::ERR_ABORTED" in str(e_outer)):
            raise

        # Downloaded file
        async with self._page.expect_download() as download_info:
            try:
                await self._page.goto(url)
            except Exception as e_inner:
                # The same abort is expected on the retry; anything else is an error.
                if "net::ERR_ABORTED" not in str(e_inner):
                    raise
        download = await download_info.value
        fname = os.path.join(self.downloads_folder, download.suggested_filename)
        await download.save_as(fname)

        # The file name comes from the remote server, so escape it (and the
        # path built from it) before embedding them in HTML.
        safe_name = html.escape(download.suggested_filename)
        safe_path = html.escape(fname)
        message = f"<body style=\"margin: 20px;\"><h1>Successfully downloaded '{safe_name}' to local path:<br><br>{safe_path}</h1></body>"
        await self._page.goto(
            "data:text/html;base64," + base64.b64encode(message.encode("utf-8")).decode("utf-8")
        )
        self._last_download = None  # Since we already handled it
|
||||
|
||||
async def _page_down(self) -> None:
    """Scroll the window down by the viewport height minus 50 pixels."""
    assert self._page is not None
    # The 50px shortfall leaves some overlap between consecutive pages.
    distance = VIEWPORT_HEIGHT - 50
    await self._page.evaluate(f"window.scrollBy(0, {distance});")
|
||||
|
||||
async def _page_up(self) -> None:
    """Scroll the window up by the viewport height minus 50 pixels."""
    assert self._page is not None
    # The 50px shortfall leaves some overlap between consecutive pages.
    distance = VIEWPORT_HEIGHT - 50
    await self._page.evaluate(f"window.scrollBy(0, -{distance});")
|
||||
|
||||
async def _click_id(self, identifier: str) -> None:
    """Click the centre of the element tagged with ``__elementId == identifier``.

    If the click opens a popup or new tab within one second, that page becomes
    the active page (via ``_on_new_page``).

    Args:
        identifier: Value of the element's ``__elementId`` attribute.

    Raises:
        ValueError: If no such element appears within 100 ms, or if the
            element has no bounding box to click on.
    """
    assert self._page is not None
    target = self._page.locator(f"[__elementId='{identifier}']")

    # See if it exists
    try:
        await target.wait_for(timeout=100)
    except TimeoutError:
        raise ValueError("No such element.") from None

    # Click it
    await target.scroll_into_view_if_needed()
    box = await target.bounding_box()
    if box is None:
        # bounding_box() yields None for elements that are not visible; fail
        # with a clear message instead of a TypeError on subscripting None.
        raise ValueError("The element is not visible, so it cannot be clicked.")
    center_x = box["x"] + box["width"] / 2
    center_y = box["y"] + box["height"] / 2

    try:
        # Give it a chance to open a new page
        # TODO: Having trouble with these types
        async with self._page.expect_event("popup", timeout=1000) as page_info:  # type: ignore
            await self._page.mouse.click(center_x, center_y, delay=10)
            # If we got this far without error, then a popup or new tab opened. Handle it.

        new_page = await page_info.value  # type: ignore

        assert isinstance(new_page, Page)
        await self._on_new_page(new_page)
    except TimeoutError:
        # No popup appeared: the click acted on the current page.
        pass
|
||||
|
||||
async def _fill_id(self, identifier: str, value: str) -> None:
    """Type ``value`` into the element tagged ``identifier`` and press Enter.

    ``fill`` is attempted first; if Playwright rejects it, the text is typed
    key by key instead.

    Raises:
        ValueError: If no such element appears within 100 ms.
    """
    assert self._page is not None
    field = self._page.locator(f"[__elementId='{identifier}']")

    # Make sure the element is actually there before interacting with it.
    try:
        await field.wait_for(timeout=100)
    except TimeoutError:
        raise ValueError("No such element.") from None

    # Bring it into view and focus it, then enter the text.
    await field.scroll_into_view_if_needed()
    await field.focus()
    try:
        await field.fill(value)
    except PlaywrightError:
        await field.press_sequentially(value)
    await field.press("Enter")
|
||||
|
||||
async def _scroll_id(self, identifier: str, direction: str) -> None:
    """Scroll the element tagged ``identifier`` by one ``clientHeight``.

    ``direction == "up"`` scrolls up (clamped at 0); any other value scrolls
    down (clamped at the element's maximum scroll offset). Nothing happens if
    the element is not found.
    """
    assert self._page is not None
    scroll_script = f"""
(function() {{
let elm = document.querySelector("[__elementId='{identifier}']");
if (elm) {{
if ("{direction}" == "up") {{
elm.scrollTop = Math.max(0, elm.scrollTop - elm.clientHeight);
}}
else {{
elm.scrollTop = Math.min(elm.scrollHeight - elm.clientHeight, elm.scrollTop + elm.clientHeight);
}}
}}
}})();
"""
    await self._page.evaluate(scroll_script)
|
||||
|
||||
# def _summarize_page(self, question=None, token_limit=100000):
|
||||
# page_markdown = self._get_page_markdown()
|
||||
#
|
||||
# buffer = ""
|
||||
# for line in re.split(r"([\r\n]+)", page_markdown):
|
||||
# tokens = count_token(buffer + line)
|
||||
# if tokens + 1024 > token_limit: # Leave room for our summary
|
||||
# break
|
||||
# buffer += line
|
||||
#
|
||||
# buffer = buffer.strip()
|
||||
# if len(buffer) == 0:
|
||||
# return "Nothing to summarize."
|
||||
#
|
||||
# title = self._page.url
|
||||
# try:
|
||||
# title = self._page.title()
|
||||
# except:
|
||||
# pass
|
||||
#
|
||||
# # Take a screenshot and scale it
|
||||
# screenshot = self._page.screenshot()
|
||||
# if not isinstance(screenshot, io.BufferedIOBase):
|
||||
# screenshot = io.BytesIO(screenshot)
|
||||
# screenshot = Image.open(screenshot)
|
||||
# scaled_screenshot = screenshot.resize((MLM_WIDTH, MLM_HEIGHT))
|
||||
# screenshot.close()
|
||||
#
|
||||
# messages = [
|
||||
# {
|
||||
# "role": "system",
|
||||
# "content": "You are a helpful assistant that can summarize long documents to answer question.",
|
||||
# }
|
||||
# ]
|
||||
#
|
||||
# prompt = f"We are visiting the webpage '{title}'. Its full-text contents are pasted below, along with a screenshot of the page's current viewport."
|
||||
# if question is not None:
|
||||
# prompt += (
|
||||
# f" Please summarize the webpage into one or two paragraphs with respect to '{question}':\n\n{buffer}"
|
||||
# )
|
||||
# else:
|
||||
# prompt += f" Please summarize the webpage into one or two paragraphs:\n\n{buffer}"
|
||||
#
|
||||
# messages.append(
|
||||
# self._make_mm_message(prompt, scaled_screenshot),
|
||||
# )
|
||||
# scaled_screenshot.close()
|
||||
#
|
||||
# response = self.client.create(context=None, messages=messages)
|
||||
# extracted_response = self.client.extract_text_or_completion_object(response)[0]
|
||||
# return str(extracted_response)
|
||||
|
||||
async def _get_ocr_text(self, image: bytes | io.BufferedIOBase | Image.Image) -> str:
    """Ask the model client to transcribe the text visible in a screenshot.

    Args:
        image: The screenshot, given as encoded image bytes, a binary
            file-like object, or an already decoded PIL image.

    Returns:
        The text content of the model's response.
    """
    if isinstance(image, Image.Image):
        # The caller keeps ownership of ``image``; only the resized copy is closed here.
        scaled_screenshot = image.resize((MLM_WIDTH, MLM_HEIGHT))
    else:
        if not isinstance(image, io.BufferedIOBase):
            pil_image = Image.open(io.BytesIO(image))
        else:
            # TODO: Not sure why this cast was needed, but by this point screenshot is a binary file-like object
            pil_image = Image.open(cast(BinaryIO, image))
        try:
            scaled_screenshot = pil_image.resize((MLM_WIDTH, MLM_HEIGHT))
        finally:
            # Release the decoded image even if resizing fails.
            pil_image.close()

    try:
        # Add the multimodal message and make the request
        messages: List[LLMMessage] = [
            UserMessage(
                content=[
                    "Please transcribe all visible text on this page, including both main content and the labels of UI elements.",
                    AGImage.from_pil(scaled_screenshot),
                ],
                source=self.metadata["name"],
            )
        ]
        response = await self._model_client.create(messages)
    finally:
        # Release the resized copy even if building the message or the model call raises.
        scaled_screenshot.close()

    assert isinstance(response.content, str)
    return response.content
|
||||
@@ -0,0 +1,376 @@
|
||||
var MultimodalWebSurfer = MultimodalWebSurfer || (function() {
|
||||
let nextLabel = 10;
|
||||
|
||||
// Lookup table from a tag key to an approximate ARIA role name.
// Keys are lowercase tag names; for <input> elements that carry a type
// attribute the key has the form "input, type=<type>" (the format built in
// getApproximateAriaRole, which consults this table when an element has no
// explicit role attribute).
let roleMapping = {
    "a": "link",
    "area": "link",
    "button": "button",
    "input, type=button": "button",
    "input, type=checkbox": "checkbox",
    "input, type=email": "textbox",
    "input, type=number": "spinbutton",
    "input, type=radio": "radio",
    "input, type=range": "slider",
    "input, type=reset": "button",
    "input, type=search": "searchbox",
    "input, type=submit": "button",
    "input, type=tel": "textbox",
    "input, type=text": "textbox",
    "input, type=url": "textbox",
    "search": "search",
    "select": "combobox",
    "option": "option",
    "textarea": "textbox"
};
|
||||
|
||||
// Return the computed CSS "cursor" value of the given element.
let getCursor = function(elm) {
    const computed = window.getComputedStyle(elm);
    return computed.cursor;
};
|
||||
|
||||
// Collect the elements of the current document that look interactive.
// Three passes are made, in order:
//   1. form controls and elements with href / onclick / contenteditable /
//      a tabindex other than -1,
//   2. elements whose role attribute names one of the listed widget roles,
//   3. elements whose computed cursor suggests interactivity.
// Returns an array of elements without duplicates, in discovery order.
let getInteractiveElements = function() {

    let results = [];
    // Mirrors `results` so that de-duplication is a constant-time lookup
    // rather than a linear scan per candidate.
    let seen = new Set();
    let addIfNew = function(node) {
        if (!seen.has(node)) {
            seen.add(node);
            results.push(node);
        }
    };

    let roles = ["scrollbar", "searchbox", "slider", "spinbutton", "switch", "tab", "treeitem", "button", "checkbox", "gridcell", "link", "menuitem", "menuitemcheckbox", "menuitemradio", "option", "progressbar", "radio", "textbox", "combobox", "menu", "tree", "treegrid", "grid", "listbox", "radiogroup", "widget"];
    let inertCursors = ["auto", "default", "none", "text", "vertical-text", "not-allowed", "no-drop"];

    // Get the main interactive elements
    let nodeList = document.querySelectorAll("input, select, textarea, button, [href], [onclick], [contenteditable], [tabindex]:not([tabindex='-1'])");
    for (let i=0; i<nodeList.length; i++) { // Copy to something mutable
        addIfNew(nodeList[i]);
    }

    // Anything not already included that has a suitable role
    nodeList = document.querySelectorAll("[role]");
    for (let i=0; i<nodeList.length; i++) {
        let role = nodeList[i].getAttribute("role");
        if (roles.indexOf(role) > -1) {
            addIfNew(nodeList[i]);
        }
    }

    // Any element that changes the cursor to something implying interactivity
    nodeList = document.querySelectorAll("*");
    for (let i=0; i<nodeList.length; i++) {
        let node = nodeList[i];

        // Cursor is default, or does not suggest interactivity
        let cursor = getCursor(node);
        if (inertCursors.indexOf(cursor) >= 0) {
            continue;
        }

        // Move up to the first instance of this cursor change.
        // `parent` is declared locally (it was previously an implicit global,
        // which shadows window.parent), and parentElement is used so the walk
        // stops at the root element instead of reaching the document node,
        // for which getComputedStyle throws.
        let parent = node.parentElement;
        while (parent && getCursor(parent) == cursor) {
            node = parent;
            parent = node.parentElement;
        }

        // Add the node if it is new
        addIfNew(node);
    }

    return results;
};
|
||||
|
||||
// Give every element that lacks an "__elementId" attribute the next unused
// numeric label (stored as a string). Already-labelled elements are untouched.
let labelElements = function(elements) {
    for (const elm of elements) {
        if (elm.hasAttribute("__elementId")) {
            continue;
        }
        elm.setAttribute("__elementId", String(nextLabel++));
    }
};
|
||||
|
||||
// Report whether `element` (or one of its descendants) is what
// document.elementFromPoint returns at the point (x, y).
let isTopmost = function(element, x, y) {
    const hit = document.elementFromPoint(x, y);

    // Hack to handle elements outside the viewport: when there is no hit at
    // all, the element is treated as topmost.
    if (hit === null) {
        return true;
    }

    // Walk from the hit element up through its ancestors looking for `element`.
    for (let node = hit; node; node = node.parentNode) {
        if (node == element) {
            return true;
        }
    }
    return false;
};
|
||||
|
||||
// Return the "__elementId" of the focused element, or of its nearest labelled
// ancestor; null when neither carries a label.
let getFocusedElementId = function() {
    for (let node = document.activeElement; node; node = node.parentNode) {
        // Some nodes on the ancestor chain have no hasAttribute method,
        // so check for it before calling.
        if (node.hasAttribute && node.hasAttribute("__elementId")) {
            return node.getAttribute("__elementId");
        }
    }
    return null;
};
|
||||
|
||||
// Return the element's innerText with surrounding whitespace removed, or ""
// when the element is missing or has no (non-empty) innerText.
let trimmedInnerText = function(element) {
    const text = element ? element.innerText : "";
    return text ? text.trim() : "";
};
|
||||
|
||||
// Approximate the accessible name of an element. Sources are tried in this
// order and the first applicable one wins:
//   1. aria-labelledby (text of the referenced elements, space-joined),
//   2. aria-label,
//   3. <label for="..."> elements that reference the element's id,
//   4. an enclosing <label> parent,
//   5. the alt attribute, then the title attribute,
//   6. the element's own trimmed innerText.
let getApproximateAriaName = function(element) {
    // Check for aria labels
    if (element.hasAttribute("aria-labelledby")) {
        let buffer = "";
        let ids = element.getAttribute("aria-labelledby").split(" ");
        for (let i=0; i<ids.length; i++) {
            let label = document.getElementById(ids[i]);
            if (label) {
                buffer = buffer + " " + trimmedInnerText(label);
            }
        }
        return buffer.trim();
    }

    if (element.hasAttribute("aria-label")) {
        return element.getAttribute("aria-label");
    }

    // Check for labels
    if (element.hasAttribute("id")) {
        let label_id = element.getAttribute("id");
        let label = "";
        // Compare the "for" attribute value directly rather than splicing the
        // id into a selector string: an id containing a quote or backslash
        // would otherwise make querySelectorAll throw a SyntaxError.
        let labels = document.querySelectorAll("label[for]");
        for (let j=0; j<labels.length; j++) {
            if (labels[j].getAttribute("for") === label_id) {
                label += labels[j].innerText + " ";
            }
        }
        label = label.trim();
        if (label != "") {
            return label;
        }
    }

    if (element.parentElement && element.parentElement.tagName == "LABEL") {
        return element.parentElement.innerText;
    }

    // Check for alt text or titles
    if (element.hasAttribute("alt")) {
        return element.getAttribute("alt")
    }

    if (element.hasAttribute("title")) {
        return element.getAttribute("title")
    }

    return trimmedInnerText(element);
};
|
||||
|
||||
// Approximate the ARIA role of an element.
// Returns a two-item array [role, tag]. `tag` is the lowercase tag name,
// extended to "input, type=<type>" for <input> elements with a type attribute.
// `role` is the explicit role attribute if present, otherwise the roleMapping
// entry for `tag`, otherwise "".
let getApproximateAriaRole = function(element) {
    let tag = element.tagName.toLowerCase();
    if (tag == "input" && element.hasAttribute("type")) {
        tag += ", type=" + element.getAttribute("type");
    }

    let role = "";
    if (element.hasAttribute("role")) {
        role = element.getAttribute("role");
    } else if (tag in roleMapping) {
        role = roleMapping[tag];
    }
    return [role, tag];
};
|
||||
|
||||
// Label the interactive elements, then describe every labelled element.
// Returns an object keyed by "__elementId"; each value holds the element's
// tag_name, role, aria-name, v-scrollable flag and the client rects whose
// centre point passes the isTopmost check. Elements with no such rect are
// left out.
let getInteractiveRects = function() {
    labelElements(getInteractiveElements());

    const results = {};
    for (const elm of document.querySelectorAll("[__elementId]")) {
        const key = elm.getAttribute("__elementId");
        const rects = elm.getClientRects();
        const ariaRole = getApproximateAriaRole(elm);
        const ariaName = getApproximateAriaName(elm);
        const vScrollable = elm.scrollHeight - elm.clientHeight >= 1;

        const visibleRects = [];
        for (const rect of rects) {
            const centerX = rect.left + rect.width/2;
            const centerY = rect.top + rect.height/2;
            if (isTopmost(elm, centerX, centerY)) {
                // Round-trip through JSON to obtain a plain-object copy of the rect.
                visibleRects.push(JSON.parse(JSON.stringify(rect)));
            }
        }

        if (visibleRects.length > 0) {
            results[key] = {
                "tag_name": ariaRole[1],
                "role": ariaRole[0],
                "aria-name": ariaName,
                "v-scrollable": vScrollable,
                "rects": visibleRects
            };
        }
    }
    return results;
};
|
||||
|
||||
// Snapshot viewport geometry: seven properties read from window.visualViewport
// followed by four read from document.documentElement. Any property whose
// source object is unavailable is reported as 0.
let getVisualViewport = function() {
    const viewport = window.visualViewport;
    const root = document.documentElement;
    const snapshot = {};

    for (const key of ["height", "width", "offsetLeft", "offsetTop", "pageLeft", "pageTop", "scale"]) {
        snapshot[key] = viewport ? viewport[key] : 0;
    }
    for (const key of ["clientWidth", "clientHeight", "scrollWidth", "scrollHeight"]) {
        snapshot[key] = root ? root[key] : 0;
    }
    return snapshot;
};
|
||||
|
||||
// Gather <meta> tags into an object mapping name (or, failing that, property)
// to the tag's content. Tags with neither attribute, or without content, are
// skipped; a later tag with the same key overwrites an earlier one.
let _getMetaTags = function() {
    const results = {};
    for (const tag of document.querySelectorAll("meta")) {
        let key;
        if (tag.hasAttribute("name")) {
            key = tag.getAttribute("name");
        } else if (tag.hasAttribute("property")) {
            key = tag.getAttribute("property");
        } else {
            continue;
        }

        if (tag.hasAttribute("content")) {
            results[key] = tag.getAttribute("content");
        }
    }
    return results;
};
|
||||
|
||||
// Return the trimmed source text of every <script type="application/ld+json">
// element, in document order. The text is not parsed here.
let _getJsonLd = function() {
    const scripts = document.querySelectorAll('script[type="application/ld+json"]');
    return Array.from(scripts, script => script.innerHTML.trim());
};
|
||||
|
||||
// From: https://www.stevefenton.co.uk/blog/2022/12/parse-microdata-with-javascript/
|
||||
// Extract HTML microdata from the document.
// Returns an array with one object per [itemscope] element; each object has an
// itemType entry plus one entry per itemprop found beneath that element.
let _getMicrodata = function() {
    // Replace every whitespace character with a plain space and trim the ends.
    function sanitize(input) {
        return input.replace(/\s/gi, ' ').trim();
    }

    // Store `value` under `name`. A name seen more than once accumulates its
    // values in a single flat array.
    function addValue(information, name, value) {
        if (information[name]) {
            // Array.isArray is required here: typeof never yields 'array', so
            // the previous typeof test always failed and a third value
            // produced nested arrays ([[a, b], c]) instead of [a, b, c].
            if (Array.isArray(information[name])) {
                information[name].push(value);
            } else {
                information[name] = [information[name], value];
            }
        } else {
            information[name] = value;
        }
    }

    // Recursively walk the children of `item`, recording itemprop values
    // into `information`.
    function traverseItem(item, information) {
        const children = item.children;

        for (let i = 0; i < children.length; i++) {
            const child = children[i];

            if (child.hasAttribute('itemscope')) {
                // A nested item: collect it into its own object and attach it
                // under each of its property names. Nested items without an
                // itemprop are not descended into here.
                if (child.hasAttribute('itemprop')) {
                    const itemProp = child.getAttribute('itemprop');
                    const itemType = child.getAttribute('itemtype');

                    const childInfo = {
                        itemType: itemType
                    };

                    traverseItem(child, childInfo);

                    itemProp.split(' ').forEach(propName => {
                        addValue(information, propName, childInfo);
                    });
                }

            } else if (child.hasAttribute('itemprop')) {
                const itemProp = child.getAttribute('itemprop');
                itemProp.split(' ').forEach(propName => {
                    if (propName === 'url') {
                        addValue(information, propName, child.href);
                    } else {
                        addValue(information, propName, sanitize(child.getAttribute("content") || child.content || child.textContent || child.src || ""));
                    }
                });
                traverseItem(child, information);
            } else {
                traverseItem(child, information);
            }
        }
    }

    const microdata = [];

    document.querySelectorAll("[itemscope]").forEach(function(elem, i) {
        const itemType = elem.getAttribute('itemtype');
        const information = {
            itemType: itemType
        };
        traverseItem(elem, information);
        microdata.push(information);
    });

    return microdata;
};
|
||||
|
||||
// Combine the page's JSON-LD, microdata and meta tags into one object.
// Only non-empty sources appear, under the keys "jsonld", "microdata" and
// "meta_tags".
let getPageMetadata = function() {
    let jsonld = _getJsonLd();
    let metaTags = _getMetaTags();
    let microdata = _getMicrodata();
    let results = {};

    if (jsonld.length > 0) {
        try {
            // NOTE: `jsonld` is an array of strings. JSON.parse converts it to
            // a comma-joined string first, so parsing succeeds only when that
            // joined text is itself valid JSON (e.g. a single JSON-LD block);
            // otherwise the raw strings are returned unchanged.
            results["jsonld"] = JSON.parse(jsonld);
        }
        catch (e) {
            results["jsonld"] = jsonld;
        }
    }

    if (microdata.length > 0) {
        results["microdata"] = microdata;
    }

    // Object.keys is used rather than calling metaTags.hasOwnProperty(...):
    // the keys come from the page, so a meta tag named "hasOwnProperty" would
    // replace that method with a string and make the call throw.
    if (Object.keys(metaTags).length > 0) {
        results["meta_tags"] = metaTags;
    }

    return results;
};
|
||||
|
||||
// Public API of the MultimodalWebSurfer object; every other helper defined
// in this closure stays private.
return {
    getInteractiveRects: getInteractiveRects,
    getVisualViewport: getVisualViewport,
    getFocusedElementId: getFocusedElementId,
    getPageMetadata: getPageMetadata,
};
|
||||
})();
|
||||
@@ -0,0 +1,96 @@
|
||||
import io
|
||||
import random
|
||||
from typing import BinaryIO, Dict, List, Tuple, cast
|
||||
|
||||
from PIL import Image, ImageDraw, ImageFont
|
||||
|
||||
from .types import DOMRectangle, InteractiveRegion
|
||||
|
||||
TOP_NO_LABEL_ZONE = 20  # Don't print any labels close to the top of the page
|
||||
|
||||
|
||||
def add_set_of_mark(
    screenshot: bytes | Image.Image | io.BufferedIOBase, ROIs: Dict[str, InteractiveRegion]
) -> Tuple[Image.Image, List[str], List[str], List[str]]:
    """Annotate a screenshot with labelled boxes for the given regions.

    Args:
        screenshot: The screenshot as encoded bytes, a binary file-like
            object, or an already decoded PIL image.
        ROIs: Regions to mark, keyed by their id.

    Returns:
        A tuple ``(annotated_image, visible_ids, ids_above, ids_below)`` as
        produced by ``_add_set_of_mark``.
    """
    if isinstance(screenshot, Image.Image):
        # The caller owns an already decoded image, so it is not closed here.
        return _add_set_of_mark(screenshot, ROIs)

    if isinstance(screenshot, bytes):
        screenshot = io.BytesIO(screenshot)

    # TODO: Not sure why this cast was needed, but by this point screenshot is a binary file-like object
    image = Image.open(cast(BinaryIO, screenshot))
    try:
        return _add_set_of_mark(image, ROIs)
    finally:
        # Close the image opened here even when annotation raises.
        image.close()
|
||||
|
||||
|
||||
def _add_set_of_mark(
    screenshot: Image.Image, ROIs: Dict[str, InteractiveRegion]
) -> Tuple[Image.Image, List[str], List[str], List[str]]:
    """Draw region markers over a grayscale copy of ``screenshot``.

    Each non-empty rectangle whose horizontal midpoint lies within the image
    width is classified by its vertical midpoint: above the image, below it,
    or visible. Visible rectangles are drawn and labelled.

    Args:
        screenshot: The decoded screenshot. It is not modified or closed.
        ROIs: Regions to mark, keyed by id. Keys are passed to ``int()`` when
            a rectangle is drawn.

    Returns:
        A tuple ``(composited_image, visible_ids, ids_above, ids_below)``.
        An id is appended once per qualifying rectangle, so a region with
        several rectangles can appear more than once in a list.
    """
    visible_rects: List[str] = list()
    rects_above: List[str] = list()  # Scroll up to see
    rects_below: List[str] = list()  # Scroll down to see

    fnt = ImageFont.load_default(14)
    base = screenshot.convert("L").convert("RGBA")
    try:
        overlay = Image.new("RGBA", base.size)
        try:
            draw = ImageDraw.Draw(overlay)
            for r in ROIs:
                for rect in ROIs[r]["rects"]:
                    # Empty rectangles
                    if not rect:
                        continue
                    if rect["width"] * rect["height"] == 0:
                        continue

                    mid = ((rect["right"] + rect["left"]) / 2.0, (rect["top"] + rect["bottom"]) / 2.0)

                    if 0 <= mid[0] and mid[0] < base.size[0]:
                        if mid[1] < 0:
                            rects_above.append(r)
                        elif mid[1] >= base.size[1]:
                            rects_below.append(r)
                        else:
                            visible_rects.append(r)
                            _draw_roi(draw, int(r), fnt, rect)

            comp = Image.alpha_composite(base, overlay)
        finally:
            overlay.close()
    finally:
        # ``comp`` is a separate image, so the intermediate grayscale copy can
        # be released here, including when drawing fails.
        base.close()
    return comp, visible_rects, rects_above, rects_below
|
||||
|
||||
|
||||
def _draw_roi(
    draw: ImageDraw.ImageDraw, idx: int, font: ImageFont.FreeTypeFont | ImageFont.ImageFont, rect: DOMRectangle
) -> None:
    """Draw one region: a tinted, outlined box plus a small tag showing ``idx``.

    The tag sits at the box's top-right corner, or at the bottom-right corner
    when the box's top edge is within ``TOP_NO_LABEL_ZONE`` of the top.
    """
    color = _color(idx)
    red, green, blue = color[0], color[1], color[2]

    # Choose black or white text depending on how bright the tag colour is.
    luminance = red * 0.3 + green * 0.59 + blue * 0.11
    if luminance > 90:
        text_color = (0, 0, 0, 255)
    else:
        text_color = (255, 255, 255, 255)

    if rect["top"] <= TOP_NO_LABEL_ZONE:
        label_location = (rect["right"], rect["bottom"])
        label_anchor = "rt"
    else:
        label_location = (rect["right"], rect["top"])
        label_anchor = "rb"

    box = [(rect["left"], rect["top"]), (rect["right"], rect["bottom"])]
    draw.rectangle(box, outline=color, fill=(red, green, blue, 48), width=2)

    label = str(idx)

    # TODO: Having trouble with these types being partially Unknown.
    left, top, right, bottom = draw.textbbox(label_location, label, font=font, anchor=label_anchor, align="center")  # type: ignore
    # Pad the text's bounding box by 3 pixels on every side for the tag background.
    draw.rectangle((left - 3, top - 3, right + 3, bottom + 3), fill=color)

    # TODO: Having trouble with these types being partially Unknown.
    draw.text(label_location, label, fill=text_color, font=font, anchor=label_anchor, align="center")  # type: ignore
|
||||
|
||||
|
||||
def _color(identifier: int) -> Tuple[int, int, int, int]:
|
||||
rnd = random.Random(int(identifier))
|
||||
color = [rnd.randint(0, 255), rnd.randint(125, 255), rnd.randint(0, 50)]
|
||||
rnd.shuffle(color)
|
||||
color.append(255)
|
||||
return cast(Tuple[int, int, int, int], tuple(color))
|
||||
@@ -0,0 +1,289 @@
|
||||
from typing import Any, Dict
|
||||
|
||||
# TODO Why does pylance fail if I import from agnext.components.tools instead?
|
||||
from agnext.components.tools._base import ParametersSchema, ToolSchema
|
||||
|
||||
|
||||
def _load_tool(tooldef: Dict[str, Any]) -> ToolSchema:
    """Build a ``ToolSchema`` from a nested tool-definition dictionary.

    Only ``tooldef["function"]`` is read: its ``name``, ``description`` and the
    ``properties`` and ``required`` entries of its ``parameters``. The schema
    type is always ``"object"``.

    Raises:
        KeyError: If any of those entries is missing.
    """
    function = tooldef["function"]
    name = function["name"]
    description = function["description"]
    parameters = function["parameters"]
    schema = ParametersSchema(
        type="object",
        properties=parameters["properties"],
        required=parameters["required"],
    )
    return ToolSchema(name=name, description=description, parameters=schema)
|
||||
|
||||
|
||||
# Schema for the "visit_url" tool: requires "reasoning" and the "url" to open.
TOOL_VISIT_URL: ToolSchema = _load_tool(
    {
        "type": "function",
        "function": {
            "name": "visit_url",
            "description": "Inputs the given url into the browser's address bar, navigating directly to the requested page.",
            "parameters": {
                "type": "object",
                "properties": {
                    "reasoning": {
                        "type": "string",
                        "description": "A short explanation of the reasoning for calling this tool and taking this action.",
                    },
                    "url": {
                        "type": "string",
                        "description": "The URL to visit in the browser.",
                    },
                },
                "required": ["reasoning", "url"],
            },
        },
    }
)
|
||||
|
||||
# Schema for the "web_search" tool: requires "reasoning" and the search "query".
TOOL_WEB_SEARCH: ToolSchema = _load_tool(
    {
        "type": "function",
        "function": {
            "name": "web_search",
            "description": "Performs a web search on Bing.com with the given query.",
            "parameters": {
                "type": "object",
                "properties": {
                    "reasoning": {
                        "type": "string",
                        "description": "A short explanation of the reasoning for calling this tool and taking this action.",
                    },
                    "query": {
                        "type": "string",
                        "description": "The web search query to use.",
                    },
                },
                "required": ["reasoning", "query"],
            },
        },
    }
)
|
||||
|
||||
# Schema for the "history_back" tool: takes only the required "reasoning" string.
TOOL_HISTORY_BACK: ToolSchema = _load_tool(
    {
        "type": "function",
        "function": {
            "name": "history_back",
            "description": "Navigates back one page in the browser's history. This is equivalent to clicking the browser back button.",
            "parameters": {
                "type": "object",
                "properties": {
                    "reasoning": {
                        "type": "string",
                        "description": "A short explanation of the reasoning for calling this tool and taking this action.",
                    },
                },
                "required": ["reasoning"],
            },
        },
    }
)
|
||||
|
||||
# Schema for the "page_up" tool: takes only the required "reasoning" string.
TOOL_PAGE_UP: ToolSchema = _load_tool(
    {
        "type": "function",
        "function": {
            "name": "page_up",
            "description": "Scrolls the entire browser viewport one page UP towards the beginning.",
            "parameters": {
                "type": "object",
                "properties": {
                    "reasoning": {
                        "type": "string",
                        "description": "A short explanation of the reasoning for calling this tool and taking this action.",
                    },
                },
                "required": ["reasoning"],
            },
        },
    }
)
|
||||
|
||||
# Schema for the "page_down" tool: takes only the required "reasoning" string.
TOOL_PAGE_DOWN: ToolSchema = _load_tool(
    {
        "type": "function",
        "function": {
            "name": "page_down",
            "description": "Scrolls the entire browser viewport one page DOWN towards the end.",
            "parameters": {
                "type": "object",
                "properties": {
                    "reasoning": {
                        "type": "string",
                        "description": "A short explanation of the reasoning for calling this tool and taking this action.",
                    },
                },
                "required": ["reasoning"],
            },
        },
    }
)
|
||||
|
||||
# Schema for the "click" tool: requires "reasoning" and an integer "target_id".
TOOL_CLICK: ToolSchema = _load_tool(
    {
        "type": "function",
        "function": {
            "name": "click",
            "description": "Clicks the mouse on the target with the given id.",
            "parameters": {
                "type": "object",
                "properties": {
                    "reasoning": {
                        "type": "string",
                        "description": "A short explanation of the reasoning for calling this tool and taking this action.",
                    },
                    "target_id": {
                        "type": "integer",
                        "description": "The numeric id of the target to click.",
                    },
                },
                "required": ["reasoning", "target_id"],
            },
        },
    }
)
|
||||
|
||||
# Schema for the "input_text" tool (the constant is named TOOL_TYPE, but the
# tool name exposed in the schema is "input_text"). Requires "reasoning", an
# integer "input_field_id" and the "text_value" to type.
TOOL_TYPE: ToolSchema = _load_tool(
    {
        "type": "function",
        "function": {
            "name": "input_text",
            "description": "Types the given text value into the specified field.",
            "parameters": {
                "type": "object",
                "properties": {
                    "reasoning": {
                        "type": "string",
                        "description": "A short explanation of the reasoning for calling this tool and taking this action.",
                    },
                    "input_field_id": {
                        "type": "integer",
                        "description": "The numeric id of the input field to receive the text.",
                    },
                    "text_value": {
                        "type": "string",
                        "description": "The text to type into the input field.",
                    },
                },
                "required": ["reasoning", "input_field_id", "text_value"],
            },
        },
    }
)
|
||||
|
||||
# Schema for the "scroll_element_down" tool: requires "reasoning" and an
# integer "target_id" naming the element to scroll.
TOOL_SCROLL_ELEMENT_DOWN: ToolSchema = _load_tool(
    {
        "type": "function",
        "function": {
            "name": "scroll_element_down",
            "description": "Scrolls a given html element (e.g., a div or a menu) DOWN.",
            "parameters": {
                "type": "object",
                "properties": {
                    "reasoning": {
                        "type": "string",
                        "description": "A short explanation of the reasoning for calling this tool and taking this action.",
                    },
                    "target_id": {
                        "type": "integer",
                        "description": "The numeric id of the target to scroll down.",
                    },
                },
                "required": ["reasoning", "target_id"],
            },
        },
    }
)
|
||||
|
||||
# Schema for the "scroll_element_up" tool: requires "reasoning" and an
# integer "target_id" naming the element to scroll.
TOOL_SCROLL_ELEMENT_UP: ToolSchema = _load_tool(
    {
        "type": "function",
        "function": {
            "name": "scroll_element_up",
            "description": "Scrolls a given html element (e.g., a div or a menu) UP.",
            "parameters": {
                "type": "object",
                "properties": {
                    "reasoning": {
                        "type": "string",
                        "description": "A short explanation of the reasoning for calling this tool and taking this action.",
                    },
                    "target_id": {
                        "type": "integer",
                        "description": "The numeric id of the target to scroll UP.",
                    },
                },
                "required": ["reasoning", "target_id"],
            },
        },
    }
)
|
||||
|
||||
# Schema for the "answer_question" tool (the constant is named
# TOOL_READ_PAGE_AND_ANSWER). Requires "reasoning" and the "question" to answer.
TOOL_READ_PAGE_AND_ANSWER: ToolSchema = _load_tool(
    {
        "type": "function",
        "function": {
            "name": "answer_question",
            "description": "Uses AI to answer a question about the current webpage's content.",
            "parameters": {
                "type": "object",
                "properties": {
                    "reasoning": {
                        "type": "string",
                        "description": "A short explanation of the reasoning for calling this tool and taking this action.",
                    },
                    "question": {
                        "type": "string",
                        "description": "The question to answer.",
                    },
                },
                "required": ["reasoning", "question"],
            },
        },
    }
)
|
||||
|
||||
# Schema for the "summarize_page" tool: takes only the required "reasoning" string.
TOOL_SUMMARIZE_PAGE: ToolSchema = _load_tool(
    {
        "type": "function",
        "function": {
            "name": "summarize_page",
            "description": "Uses AI to summarize the entire page.",
            "parameters": {
                "type": "object",
                "properties": {
                    "reasoning": {
                        "type": "string",
                        "description": "A short explanation of the reasoning for calling this tool and taking this action.",
                    },
                },
                "required": ["reasoning"],
            },
        },
    }
)
|
||||
|
||||
# Schema for the "sleep" tool: takes only the required "reasoning" string.
TOOL_SLEEP: ToolSchema = _load_tool(
    {
        "type": "function",
        "function": {
            "name": "sleep",
            "description": "Wait a short period of time. Call this function if the page has not yet fully loaded, or if it is determined that a small delay would increase the task's chances of success.",
            "parameters": {
                "type": "object",
                "properties": {
                    "reasoning": {
                        "type": "string",
                        "description": "A short explanation of the reasoning for calling this tool and taking this action.",
                    },
                },
                "required": ["reasoning"],
            },
        },
    }
)
|
||||
@@ -0,0 +1,98 @@
|
||||
from typing import Any, Dict, List, TypedDict, Union
|
||||
|
||||
|
||||
class DOMRectangle(TypedDict):
    """A rectangle described by its origin, size and four edge coordinates."""

    # Origin of the rectangle.
    x: Union[int, float]
    y: Union[int, float]
    # Size of the rectangle.
    width: Union[int, float]
    height: Union[int, float]
    # Edge coordinates.
    top: Union[int, float]
    right: Union[int, float]
    bottom: Union[int, float]
    left: Union[int, float]
|
||||
|
||||
|
||||
class VisualViewport(TypedDict):
    """Numeric viewport and document measurements.

    Field names are camelCase; ``visualviewport_from_dict`` reads them under
    the same names from its input dictionary.
    """

    height: Union[int, float]
    width: Union[int, float]
    offsetLeft: Union[int, float]
    offsetTop: Union[int, float]
    pageLeft: Union[int, float]
    pageTop: Union[int, float]
    scale: Union[int, float]
    clientWidth: Union[int, float]
    clientHeight: Union[int, float]
    scrollWidth: Union[int, float]
    scrollHeight: Union[int, float]
|
||||
|
||||
|
||||
class InteractiveRegion(TypedDict):
    """Description of one interactive page element and where it is drawn."""

    tag_name: str
    role: str
    # Filled from the "aria-name" key by interactiveregion_from_dict.
    aria_name: str
    # Filled from the "v-scrollable" key by interactiveregion_from_dict.
    v_scrollable: bool
    # Rectangles occupied by the element.
    rects: List[DOMRectangle]
|
||||
|
||||
|
||||
# Helper functions for dealing with JSON. Not sure there's a better way?
|
||||
|
||||
|
||||
def _get_str(d: Any, k: str) -> str:
|
||||
val = d[k]
|
||||
assert isinstance(val, str)
|
||||
return val
|
||||
|
||||
|
||||
def _get_number(d: Any, k: str) -> Union[int, float]:
|
||||
val = d[k]
|
||||
assert isinstance(val, int) or isinstance(val, float)
|
||||
return val
|
||||
|
||||
|
||||
def _get_bool(d: Any, k: str) -> bool:
|
||||
val = d[k]
|
||||
assert isinstance(val, bool)
|
||||
return val
|
||||
|
||||
|
||||
def domrectangle_from_dict(rect: Dict[str, Any]) -> DOMRectangle:
    """Convert an untyped dictionary into a ``DOMRectangle``.

    Every field is read under its own name and checked to be numeric by
    ``_get_number``.
    """
    return {
        "x": _get_number(rect, "x"),
        "y": _get_number(rect, "y"),
        "width": _get_number(rect, "width"),
        "height": _get_number(rect, "height"),
        "top": _get_number(rect, "top"),
        "right": _get_number(rect, "right"),
        "bottom": _get_number(rect, "bottom"),
        "left": _get_number(rect, "left"),
    }
|
||||
|
||||
|
||||
def interactiveregion_from_dict(region: Dict[str, Any]) -> InteractiveRegion:
    """Convert an untyped dictionary into an ``InteractiveRegion``.

    The hyphenated input keys ``"aria-name"`` and ``"v-scrollable"`` are stored
    as ``aria_name`` and ``v_scrollable``; each entry of ``"rects"`` is
    converted with ``domrectangle_from_dict``.
    """
    converted_rects: List[DOMRectangle] = [domrectangle_from_dict(entry) for entry in region["rects"]]

    return {
        "tag_name": _get_str(region, "tag_name"),
        "role": _get_str(region, "role"),
        "aria_name": _get_str(region, "aria-name"),
        "v_scrollable": _get_bool(region, "v-scrollable"),
        "rects": converted_rects,
    }
|
||||
|
||||
|
||||
def visualviewport_from_dict(viewport: Dict[str, Any]) -> VisualViewport:
    """Convert an untyped dictionary into a ``VisualViewport``.

    Every field is read under its own name and checked to be numeric by
    ``_get_number``.
    """
    return {
        "height": _get_number(viewport, "height"),
        "width": _get_number(viewport, "width"),
        "offsetLeft": _get_number(viewport, "offsetLeft"),
        "offsetTop": _get_number(viewport, "offsetTop"),
        "pageLeft": _get_number(viewport, "pageLeft"),
        "pageTop": _get_number(viewport, "pageTop"),
        "scale": _get_number(viewport, "scale"),
        "clientWidth": _get_number(viewport, "clientWidth"),
        "clientHeight": _get_number(viewport, "clientHeight"),
        "scrollWidth": _get_number(viewport, "scrollWidth"),
        "scrollHeight": _get_number(viewport, "scrollHeight"),
    }
|
||||
25
python/teams/team-one/src/team_one/utils.py
Executable file → Normal file
25
python/teams/team-one/src/team_one/utils.py
Executable file → Normal file
@@ -2,7 +2,7 @@ import json
|
||||
import logging
|
||||
import os
|
||||
from datetime import datetime
|
||||
from typing import Any, Dict, List
|
||||
from typing import Any, Dict, List, Literal
|
||||
|
||||
from agnext.components.models import (
|
||||
AzureOpenAIChatCompletionClient,
|
||||
@@ -107,3 +107,26 @@ class LogHandler(logging.Handler):
|
||||
)
|
||||
except Exception:
|
||||
self.handleError(record)
|
||||
|
||||
|
||||
class SentinelMeta(type):
    """
    A metaclass for sentinel classes that plays well with type hints.

    A class created with this metaclass prints as ``<ClassName>`` and is
    always falsy. Define new sentinels like this:

    ```
    class MY_DEFAULT(metaclass=SentinelMeta):
        pass


    foo: list[str] | None | type[MY_DEFAULT] = MY_DEFAULT
    ```

    Reference: https://stackoverflow.com/questions/69239403/type-hinting-parameters-with-a-sentinel-value-as-the-default
    """

    def __repr__(cls) -> str:
        # Show the sentinel by its bare class name in angle brackets.
        return "<" + cls.__name__ + ">"

    def __bool__(cls) -> Literal[False]:
        # Sentinel classes are always falsy.
        return False
|
||||
|
||||
Reference in New Issue
Block a user