feat(server): Integrate forge.logging (#7915)

* feat(server): Integrate `forge.logging`

- Add `configure_logging()` in `.util.logging` - a wrapper for `forge.logging.configure_logging()` with project-specific extras
- Call `configure_logging()` in `.app.main()`, and in child process initializers (e.g. `AppProcess.execute_run_command(..)`, `ExecutorManager.on_graph_executor_start()`)
- Change some `logger.warning` statements to `logger.info` where appropriate

* fix warnings to info

* fix(rnd): Fix broken test and Input/Output block field renaming

* Rename

* fix(rnd): Fix flaky CI

* feat(server): Add OAuth handlers for GitHub, Notion, Google & amend store data structure (#7868)

- Add `BaseOAuthHandler` + 3 initial implementations
  - Add `GitHubOAuthHandler`
  - Add `NotionOAuthHandler`
  - Add `GoogleOAuthHandler`
- Amend `OAuth2Credentials` type
  - Add `metadata` attribute
  - Make `access_token_expires_at`, `refresh_token`, `refresh_token_expires_at` optional

* extend GCP Logger

* update manager & add flag

* linting

* use default logger behaviour

* update messages

* update another message

---------

Co-authored-by: Reinier van der Leer <pwuts@agpt.co>
Co-authored-by: Zamil Majdy <zamil.majdy@agpt.co>
This commit was authored by Aarushi on 2024-09-03 11:56:21 +01:00 and committed by GitHub.
Commit: 6204d82d84
Parent: 8c9fe5c167
8 changed files with 149 additions and 40 deletions

View File

@@ -5,3 +5,4 @@ DB_PORT=5432
DATABASE_URL="postgresql://${DB_USER}:${DB_PASS}@localhost:${DB_PORT}/${DB_NAME}"
PRISMA_SCHEMA="postgres/schema.prisma"
ENABLE_AUTH="false"
APP_ENV="local"

View File

@@ -1,19 +1,13 @@
from multiprocessing import freeze_support, set_start_method
from typing import TYPE_CHECKING
import Pyro5.api as pyro
from tenacity import retry, stop_after_attempt, wait_exponential
from autogpt_server.executor import ExecutionManager, ExecutionScheduler
from autogpt_server.server import AgentServer
from autogpt_server.util.process import AppProcess
from autogpt_server.util.service import PyroNameServer
from .util.logging import configure_logging
def get_config_and_secrets():
from autogpt_server.util.settings import Settings
settings = Settings()
return settings
if TYPE_CHECKING:
from autogpt_server.util.process import AppProcess
@retry(stop=stop_after_attempt(30), wait=wait_exponential(multiplier=1, min=1, max=30))
@@ -22,7 +16,7 @@ def wait_for_nameserver():
print("NameServer is ready")
def run_processes(processes: list[AppProcess], **kwargs):
def run_processes(processes: list["AppProcess"], **kwargs):
"""
Execute all processes in the app. The last process is run in the foreground.
"""
@@ -48,6 +42,11 @@ def run_processes(processes: list[AppProcess], **kwargs):
def main(**kwargs):
set_start_method("spawn", force=True)
freeze_support()
configure_logging()
from autogpt_server.executor import ExecutionManager, ExecutionScheduler
from autogpt_server.server import AgentServer
from autogpt_server.util.service import PyroNameServer
run_processes(
[

View File

@@ -197,7 +197,7 @@ class AIStructuredResponseGeneratorBlock(Block):
except Exception as e:
return {}, f"JSON decode error: {e}"
logger.warning(f"LLM request: {prompt}")
logger.info(f"LLM request: {prompt}")
retry_prompt = ""
model = input_data.model
api_key = (
@@ -213,7 +213,7 @@ class AIStructuredResponseGeneratorBlock(Block):
prompt=prompt,
json_format=bool(input_data.expected_format),
)
logger.warning(f"LLM attempt-{retry_count} response: {response_text}")
logger.info(f"LLM attempt-{retry_count} response: {response_text}")
if input_data.expected_format:
parsed_dict, parsed_error = parse_response(response_text)

View File

@@ -1,4 +1,5 @@
import asyncio
import logging
import uuid
from pathlib import Path
from typing import Any, Literal
@@ -13,6 +14,8 @@ from autogpt_server.data.db import BaseDbModel, transaction
from autogpt_server.data.user import DEFAULT_USER_ID
from autogpt_server.util import json
logger = logging.getLogger(__name__)
class Link(BaseDbModel):
source_id: str
@@ -481,17 +484,19 @@ TEMPLATES_DIR = Path(__file__).parent.parent.parent / "graph_templates"
async def import_packaged_templates() -> None:
templates_in_db = await get_graphs_meta(filter_by="template")
print("Loading templates...")
logging.info("Loading templates...")
for template_file in TEMPLATES_DIR.glob("*.json"):
template_data = json.loads(template_file.read_bytes())
template = Graph.model_validate(template_data)
if not template.is_template:
print(f"WARNING: pre-packaged graph file {template_file} is not a template")
logging.warning(
f"pre-packaged graph file {template_file} is not a template"
)
continue
if (
exists := next((t for t in templates_in_db if t.id == template.id), None)
) and exists.version >= template.version:
continue
await create_graph(template, DEFAULT_USER_ID)
print(f"Loaded template '{template.name}' ({template.id})")
logging.info(f"Loaded template '{template.name}' ({template.id})")

View File

@@ -25,6 +25,7 @@ from autogpt_server.data.execution import (
upsert_execution_output,
)
from autogpt_server.data.graph import Graph, Link, Node, get_graph, get_node
from autogpt_server.util.logging import configure_logging
from autogpt_server.util.service import AppService, expose, get_service_client
from autogpt_server.util.settings import Config
from autogpt_server.util.type import convert
@@ -32,8 +33,13 @@ from autogpt_server.util.type import convert
logger = logging.getLogger(__name__)
def get_log_prefix(graph_eid: str, node_eid: str, block_name: str = "-"):
return f"[ExecutionManager][graph-eid-{graph_eid}|node-eid-{node_eid}|{block_name}]"
def get_log_metadata(graph_eid: str, node_eid: str, block_name: str = "-") -> dict:
return {
"component": "ExecutionManager",
"graph_eid": graph_eid,
"node_eid": node_eid,
"block_name": block_name,
}
T = TypeVar("T")
@@ -76,20 +82,31 @@ def execute_node(
return
# Sanity check: validate the execution input.
prefix = get_log_prefix(graph_exec_id, node_exec_id, node_block.name)
log_metadata = get_log_metadata(graph_exec_id, node_exec_id, node_block.name)
exec_data, error = validate_exec(node, data.data, resolve_input=False)
if exec_data is None:
logger.error(f"{prefix} Skip execution, input validation error: {error}")
logger.error(
"Skip execution, input validation error",
extra={
"json_fields": {**log_metadata, "error": error},
},
)
return
# Execute the node
exec_data_str = str(exec_data).encode("utf-8").decode("unicode_escape")
logger.warning(f"{prefix} execute with input:\n`{exec_data_str}`")
logger.info(
"Executed node with input",
extra={"json_fields": {**log_metadata, "input": exec_data_str}},
)
update_execution(ExecutionStatus.RUNNING)
try:
for output_name, output_data in node_block.execute(exec_data):
logger.warning(f"{prefix} Executed, output [{output_name}]:`{output_data}`")
logger.info(
"Node produced output",
extra={"json_fields": {**log_metadata, output_name: output_data}},
)
wait(upsert_execution_output(node_exec_id, output_name, output_data))
for execution in _enqueue_next_nodes(
@@ -98,7 +115,7 @@ def execute_node(
node=node,
output=(output_name, output_data),
graph_exec_id=graph_exec_id,
prefix=prefix,
log_metadata=log_metadata,
):
yield execution
@@ -106,7 +123,10 @@ def execute_node(
except Exception as e:
error_msg = f"{e.__class__.__name__}: {e}"
logger.exception(f"{prefix} failed with error. `%s`", error_msg)
logger.exception(
"Node execution failed with error",
extra={"json_fields": {**log_metadata, error: error_msg}},
)
wait(upsert_execution_output(node_exec_id, "error", error_msg))
update_execution(ExecutionStatus.FAILED)
@@ -128,7 +148,7 @@ def _enqueue_next_nodes(
node: Node,
output: BlockData,
graph_exec_id: str,
prefix: str,
log_metadata: dict,
) -> list[NodeExecution]:
def wait(f: Coroutine[Any, Any, T]) -> T:
return loop.run_until_complete(f)
@@ -193,11 +213,25 @@ def _enqueue_next_nodes(
# Incomplete input data, skip queueing the execution.
if not next_node_input:
logger.warning(f"{prefix} Skipped queueing {suffix}")
logger.warning(
f"Skipped queueing {suffix}",
extra={
"json_fields": {
**log_metadata,
}
},
)
return enqueued_executions
# Input is complete, enqueue the execution.
logger.warning(f"{prefix} Enqueued {suffix}")
logger.info(
f"Enqueued {suffix}",
extra={
"json_fields": {
**log_metadata,
}
},
)
enqueued_executions.append(
add_enqueued_execution(next_node_exec_id, next_node_id, next_node_input)
)
@@ -223,9 +257,11 @@ def _enqueue_next_nodes(
idata, msg = validate_exec(next_node, idata)
suffix = f"{next_output_name}>{next_input_name}~{ineid}:{msg}"
if not idata:
logger.warning(f"{prefix} Enqueueing static-link skipped: {suffix}")
logger.info(
f"{log_metadata} Enqueueing static-link skipped: {suffix}"
)
continue
logger.warning(f"{prefix} Enqueueing static-link execution {suffix}")
logger.info(f"{log_metadata} Enqueueing static-link execution {suffix}")
enqueued_executions.append(
add_enqueued_execution(iexec.node_exec_id, next_node_id, idata)
)
@@ -326,34 +362,64 @@ class Executor:
@classmethod
def on_node_executor_start(cls):
configure_logging()
cls.logger = logging.getLogger("node_executor")
cls.loop = asyncio.new_event_loop()
cls.loop.run_until_complete(db.connect())
cls.agent_server_client = get_agent_server_client()
@classmethod
def on_node_execution(cls, q: ExecutionQueue[NodeExecution], data: NodeExecution):
prefix = get_log_prefix(data.graph_exec_id, data.node_exec_id)
log_metadata = get_log_metadata(data.graph_exec_id, data.node_exec_id)
try:
logger.warning(f"{prefix} Start node execution")
cls.logger.info(
"Start node execution",
extra={
"json_fields": {
**log_metadata,
}
},
)
for execution in execute_node(cls.loop, cls.agent_server_client, data):
q.add(execution)
logger.warning(f"{prefix} Finished node execution")
cls.logger.info(
"Finished node execution",
extra={
"json_fields": {
**log_metadata,
}
},
)
except Exception as e:
logger.exception(f"{prefix} Failed node execution: {e}")
cls.logger.exception(
f"Failed node execution: {e}",
extra={
**log_metadata,
},
)
@classmethod
def on_graph_executor_start(cls):
configure_logging()
cls.logger = logging.getLogger("graph_executor")
cls.pool_size = Config().num_node_workers
cls.executor = ProcessPoolExecutor(
max_workers=cls.pool_size,
initializer=cls.on_node_executor_start,
)
logger.warning(f"Graph executor started with max-{cls.pool_size} node workers.")
cls.logger.info(f"Graph executor started with max-{cls.pool_size} node workers")
@classmethod
def on_graph_execution(cls, graph_data: GraphExecution):
prefix = get_log_prefix(graph_data.graph_exec_id, "*")
logger.warning(f"{prefix} Start graph execution")
log_metadata = get_log_metadata(graph_data.graph_exec_id, "*")
cls.logger.info(
"Start graph execution",
extra={
"json_fields": {
**log_metadata,
}
},
)
try:
queue = ExecutionQueue[NodeExecution]()
@@ -385,9 +451,23 @@ class Executor:
elif queue.empty():
cls.wait_future(future)
logger.warning(f"{prefix} Finished graph execution")
cls.logger.info(
"Finished graph execution",
extra={
"json_fields": {
**log_metadata,
}
},
)
except Exception as e:
logger.exception(f"{prefix} Failed graph execution: {e}")
cls.logger.exception(
f"Failed graph execution: {e}",
extra={
"json_fields": {
**log_metadata,
}
},
)
@classmethod
def wait_future(cls, future: Future, timeout: int | None = 3):
@@ -405,12 +485,15 @@ class ExecutionManager(AppService):
self.queue = ExecutionQueue[GraphExecution]()
self.use_redis = False
# def __del__(self):
# self.sync_manager.shutdown()
def run_service(self):
with ProcessPoolExecutor(
max_workers=self.pool_size,
initializer=Executor.on_graph_executor_start,
) as executor:
logger.warning(
logger.info(
f"Execution manager started with max-{self.pool_size} graph workers."
)
while True:

View File

@@ -0,0 +1,17 @@
import os
from forge.logging.config import LogFormatName
def configure_logging():
import logging
from forge.logging import configure_logging
if os.getenv("APP_ENV") != "cloud":
configure_logging()
else:
configure_logging(log_format=LogFormatName.STRUCTURED)
# Silence httpx logger
logging.getLogger("httpx").setLevel(logging.WARNING)

View File

@@ -25,6 +25,10 @@ class AppProcess(ABC):
if silent:
sys.stdout = open(os.devnull, "w")
sys.stderr = open(os.devnull, "w")
else:
from .logging import configure_logging
configure_logging()
self.run()
except KeyboardInterrupt or SystemExit as e:
print(f"Process terminated: {e}")

View File

@@ -103,7 +103,7 @@ class AppService(AppProcess):
ns = pyro.locate_ns(host=pyro_host, port=9090)
uri = daemon.register(self)
ns.register(self.service_name, uri)
logger.warning(f"Service [{self.service_name}] Ready. Object URI = {uri}")
logger.info(f"Service [{self.service_name}] Ready. Object URI = {uri}")
daemon.requestLoop()
def __start_async_loop(self):