tweak(rnd): Stop publishing custom Sentry metrics (#8000)

Zamil Majdy authored 2024-09-05 16:51:00 -05:00, committed by GitHub
parent 2df325d033
commit 010a8ffaaf
6 changed files with 49 additions and 72 deletions

View File

@@ -1,8 +1,6 @@
-from multiprocessing import set_start_method
 from typing import TYPE_CHECKING
 
-from .util.logging import configure_logging
 
 if TYPE_CHECKING:
     from autogpt_server.util.process import AppProcess
@@ -12,9 +10,6 @@ def run_processes(*processes: "AppProcess", **kwargs):
     Execute all processes in the app. The last process is run in the foreground.
     """
     try:
-        set_start_method("spawn", force=True)
-        configure_logging()
-
         for process in processes[:-1]:
             process.start(background=True, **kwargs)
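With the start-method and logging setup moved into AppProcess (see autogpt_server.util.process at the end of this commit), run_processes is reduced to pure orchestration. A minimal sketch of the remaining control flow; the foreground/cleanup tail is not shown in this diff, so the last two steps are an assumption based on the docstring:

    def run_processes(*processes: "AppProcess", **kwargs):
        """
        Execute all processes in the app. The last process is run in the foreground.
        """
        try:
            # Everything except the last process runs in the background.
            for process in processes[:-1]:
                process.start(background=True, **kwargs)
            # Assumed: the final process blocks in the foreground until it exits.
            processes[-1].start(background=False, **kwargs)
        finally:
            # Assumed: background processes are stopped on the way out.
            for process in processes[:-1]:
                process.stop()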

View File

@@ -1,4 +1,5 @@
 import asyncio
+import logging
 import os
 from contextlib import asynccontextmanager
 from uuid import uuid4
@@ -14,6 +15,8 @@ os.environ["PRISMA_SCHEMA_PATH"] = PRISMA_SCHEMA
 prisma, conn_id = Prisma(auto_register=True), ""
 
+logger = logging.getLogger(__name__)
+
 
 async def connect(call_count=0):
     global conn_id
@@ -21,13 +24,13 @@ async def connect(call_count=0):
     conn_id = str(uuid4())
 
     try:
-        print(f"[Prisma-{conn_id}] Acquiring connection..")
+        logger.info(f"[Prisma-{conn_id}] Acquiring connection..")
         if not prisma.is_connected():
             await prisma.connect()
-        print(f"[Prisma-{conn_id}] Connection acquired!")
+        logger.info(f"[Prisma-{conn_id}] Connection acquired!")
     except Exception as e:
         if call_count <= 5:
-            print(f"[Prisma-{conn_id}] Connection failed: {e}. Retrying now..")
+            logger.info(f"[Prisma-{conn_id}] Connection failed: {e}. Retrying now..")
             await asyncio.sleep(call_count)
             await connect(call_count + 1)
         else:
@@ -36,9 +39,9 @@ async def connect(call_count=0):
 async def disconnect():
     if prisma.is_connected():
-        print(f"[Prisma-{conn_id}] Releasing connection.")
+        logger.info(f"[Prisma-{conn_id}] Releasing connection.")
         await prisma.disconnect()
-        print(f"[Prisma-{conn_id}] Connection released.")
+        logger.info(f"[Prisma-{conn_id}] Connection released.")
 
 
 @asynccontextmanager
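The connect() hunk above also shows the retry policy these log lines trace: at most five retries with a linearly growing sleep (0s, 1s, 2s, ...) before giving up. A generic, self-contained sketch of that pattern, where acquire is a stand-in for prisma.connect():

    import asyncio
    import logging

    logger = logging.getLogger(__name__)

    async def connect_with_retry(acquire, call_count: int = 0):
        # Same shape as connect() above: linear backoff, at most 5 retries.
        try:
            await acquire()
            logger.info("Connection acquired!")
        except Exception as e:
            if call_count <= 5:
                logger.info(f"Connection failed: {e}. Retrying now..")
                await asyncio.sleep(call_count)  # sleep 0s, then 1s, 2s, ...
                await connect_with_retry(acquire, call_count + 1)
            else:
                raise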

View File

@@ -274,7 +274,10 @@ async def update_node_execution_stats(node_exec_id: str, stats: dict[str, Any]):
 async def update_execution_status(
-    node_exec_id: str, status: ExecutionStatus, execution_data: BlockInput | None = None
+    node_exec_id: str,
+    status: ExecutionStatus,
+    execution_data: BlockInput | None = None,
+    stats: dict[str, Any] | None = None,
 ) -> ExecutionResult:
     if status == ExecutionStatus.QUEUED and execution_data is None:
         raise ValueError("Execution data must be provided when queuing an execution.")
@@ -287,6 +290,7 @@ async def update_execution_status(
         **({"endedTime": now} if status == ExecutionStatus.FAILED else {}),
         **({"endedTime": now} if status == ExecutionStatus.COMPLETED else {}),
         **({"executionData": json.dumps(execution_data)} if execution_data else {}),
+        **({"stats": json.dumps(stats)} if stats else {}),
     }
     res = await AgentNodeExecution.prisma().update(
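Note the conditional spread on the new field: **({"stats": json.dumps(stats)} if stats else {}) skips the column not only for None but also for an empty dict, so the record is never overwritten with an empty blob. A hypothetical call site (the ID and numbers are invented for illustration):

    await update_execution_status(
        node_exec_id="node-exec-123",  # placeholder ID
        status=ExecutionStatus.COMPLETED,
        stats={"walltime": 1.25, "cputime": 0.4, "input_size": 512, "output_size": 2048},
    )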

View File

@@ -34,12 +34,6 @@ from autogpt_server.data.graph import Graph, Link, Node, get_graph, get_node
 from autogpt_server.util import json
 from autogpt_server.util.decorator import error_logged, time_measured
 from autogpt_server.util.logging import configure_logging
-from autogpt_server.util.metrics import (
-    metric_graph_count,
-    metric_graph_timing,
-    metric_node_payload,
-    metric_node_timing,
-)
 from autogpt_server.util.service import AppService, expose, get_service_client
 from autogpt_server.util.settings import Config
 from autogpt_server.util.type import convert
@@ -69,7 +63,10 @@ ExecutionStream = Generator[NodeExecution, None, None]
 def execute_node(
-    loop: asyncio.AbstractEventLoop, api_client: "AgentServer", data: NodeExecution
+    loop: asyncio.AbstractEventLoop,
+    api_client: "AgentServer",
+    data: NodeExecution,
+    execution_stats: dict[str, Any] | None = None,
 ) -> ExecutionStream:
     """
     Execute a node in the graph. This will trigger a block execution on a node,
@@ -79,6 +76,7 @@ def execute_node(
         loop: The event loop to run the async functions.
         api_client: The client to send execution updates to the server.
         data: The execution data for executing the current node.
+        execution_stats: The execution statistics to be updated.
 
     Returns:
         The subsequent node to be enqueued, or None if there is no subsequent node.
@@ -124,17 +122,18 @@ def execute_node(
     # Execute the node
     input_data_str = json.dumps(input_data)
-    metric_node_payload("input_size", len(input_data_str), tags=log_metadata)
+    input_size = len(input_data_str)
     logger.info(
         "Executed node with input",
         extra={"json_fields": {**log_metadata, "input": input_data_str}},
     )
     update_execution(ExecutionStatus.RUNNING)
 
+    output_size = 0
     try:
         for output_name, output_data in node_block.execute(input_data):
             output_data_str = json.dumps(output_data)
-            metric_node_payload("output_size", len(output_data_str), tags=log_metadata)
+            output_size += len(output_data_str)
             logger.info(
                 "Node produced output",
                 extra={"json_fields": {**log_metadata, output_name: output_data_str}},
@@ -165,6 +164,11 @@ def execute_node(
         raise e
+    finally:
+        if execution_stats is not None:
+            execution_stats["input_size"] = input_size
+            execution_stats["output_size"] = output_size
 
 
 @contextmanager
 def synchronized(api_client: "AgentServer", key: Any):
@@ -413,23 +417,24 @@ class Executor:
             node_id=data.node_id,
             block_name="-",
         )
-        timing_info, _ = cls._on_node_execution(q, data, log_metadata)
-        metric_node_timing("walltime", timing_info.wall_time, tags=log_metadata)
-        metric_node_timing("cputime", timing_info.cpu_time, tags=log_metadata)
+        execution_stats = {}
+        timing_info, _ = cls._on_node_execution(q, data, log_metadata, execution_stats)
+        execution_stats["walltime"] = timing_info.wall_time
+        execution_stats["cputime"] = timing_info.cpu_time
         cls.loop.run_until_complete(
-            update_node_execution_stats(
-                data.node_exec_id,
-                {
-                    "walltime": timing_info.wall_time,
-                    "cputime": timing_info.cpu_time,
-                },
-            )
+            update_node_execution_stats(data.node_exec_id, execution_stats)
         )
 
     @classmethod
     @time_measured
     def _on_node_execution(
-        cls, q: ExecutionQueue[NodeExecution], data: NodeExecution, log_metadata: dict
+        cls,
+        q: ExecutionQueue[NodeExecution],
+        d: NodeExecution,
+        log_metadata: dict,
+        stats: dict[str, Any] | None = None,
     ):
         try:
             cls.logger.info(
@@ -440,7 +445,7 @@ class Executor:
                 }
             },
         )
-        for execution in execute_node(cls.loop, cls.agent_server_client, data):
+        for execution in execute_node(cls.loop, cls.agent_server_client, d, stats):
             q.add(execution)
         cls.logger.info(
             "Finished node execution",
@@ -486,9 +491,6 @@ class Executor:
             block_name="-",
         )
         timing_info, node_count = cls._on_graph_execution(data, cancel, log_metadata)
-        metric_graph_timing("walltime", timing_info.wall_time, tags=log_metadata)
-        metric_graph_timing("cputime", timing_info.cpu_time, tags=log_metadata)
-        metric_graph_count("nodecount", node_count, tags=log_metadata)
        cls.loop.run_until_complete(
            update_graph_execution_stats(
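Taken together, these hunks replace fire-and-forget metric emission with a caller-owned dict that is filled along the way and persisted once. The load-bearing detail is the finally block in execute_node: payload sizes are recorded even when a block raises. A minimal, runnable illustration of that pattern (names simplified from the code above; the literal sizes are stand-ins):

    from typing import Any

    def run_block(execution_stats: dict[str, Any] | None = None):
        input_size = output_size = 0
        try:
            input_size = 3        # stand-in for len(json.dumps(input_data))
            output_size += 5      # stand-in for accumulated output sizes
            raise RuntimeError("block failed")
        finally:
            # Mutating the caller-owned dict in `finally` means the stats
            # survive the exception.
            if execution_stats is not None:
                execution_stats["input_size"] = input_size
                execution_stats["output_size"] = output_size

    stats: dict[str, Any] = {}
    try:
        run_block(stats)
    except RuntimeError:
        pass
    print(stats)  # {'input_size': 3, 'output_size': 5}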

View File

@@ -1,38 +1,8 @@
 import sentry_sdk
-from sentry_sdk import metrics
 
 from autogpt_server.util.settings import Settings
 
-sentry_dsn = Settings().secrets.sentry_dsn
-sentry_sdk.init(dsn=sentry_dsn, traces_sample_rate=1.0, profiles_sample_rate=1.0)
-
-
-def emit_distribution(
-    name: str,
-    key: str,
-    value: float,
-    unit: str = "none",
-    tags: dict[str, str] | None = None,
-):
-    metrics.distribution(
-        key=f"{name}__{key}",
-        value=value,
-        unit=unit,
-        tags=tags or {},
-    )
-
-
-def metric_node_payload(key: str, value: float, tags: dict[str, str]):
-    emit_distribution("NODE_EXECUTION", key, value, unit="byte", tags=tags)
-
-
-def metric_node_timing(key: str, value: float, tags: dict[str, str]):
-    emit_distribution("NODE_EXECUTION", key, value, unit="second", tags=tags)
-
-
-def metric_graph_count(key: str, value: int, tags: dict[str, str]):
-    emit_distribution("GRAPH_EXECUTION", key, value, tags=tags)
-
-
-def metric_graph_timing(key: str, value: float, tags: dict[str, str]):
-    emit_distribution("GRAPH_EXECUTION", key, value, unit="second", tags=tags)
+
+def sentry_init():
+    sentry_dsn = Settings().secrets.sentry_dsn
+    sentry_sdk.init(dsn=sentry_dsn, traces_sample_rate=1.0, profiles_sample_rate=1.0)
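After this rewrite, importing autogpt_server.util.metrics no longer initializes Sentry as an import side effect; a process opts in by calling sentry_init() (AppProcess does exactly that in the next file). A hypothetical standalone entrypoint using the new function:

    from autogpt_server.util.metrics import sentry_init

    def main():
        sentry_init()  # reads the DSN from Settings().secrets.sentry_dsn
        ...            # uncaught exceptions from here on are reported to Sentry

    if __name__ == "__main__":
        main()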

View File

@@ -4,6 +4,9 @@ from abc import ABC, abstractmethod
 from multiprocessing import Process, set_start_method
 from typing import Optional
 
+from autogpt_server.util.logging import configure_logging
+from autogpt_server.util.metrics import sentry_init
+
 
 class AppProcess(ABC):
     """
@@ -13,6 +16,10 @@ class AppProcess(ABC):
     process: Optional[Process] = None
-    set_start_method("spawn", force=True)
+
+    set_start_method("spawn", force=True)
+    configure_logging()
+    sentry_init()
 
     @abstractmethod
     def run(self):
         """
@@ -31,10 +38,6 @@ class AppProcess(ABC):
             if silent:
                 sys.stdout = open(os.devnull, "w")
                 sys.stderr = open(os.devnull, "w")
-            else:
-                from .logging import configure_logging
-
-                configure_logging()
 
             self.run()
         except KeyboardInterrupt or SystemExit as e:
             print(f"Process terminated: {e}")