mirror of
https://github.com/googleapis/genai-toolbox.git
synced 2026-04-09 03:02:26 -04:00
feat(tools/postgres): add long_running_transactions, list_locks and replication_stats tools (#1751)
Adds the following tools for Postgre (1) long_running_transactions - reports transactions that exceed a configured duration threshold. (2) list_locks - lists active locks in the database, including the associated process, lock type, relation, mode, and the query holding or waiting on the lock. (3) replication_stats - reports replication-related metrics for WAL streaming replicas, including lag sizes presented in human-readable form. <img width="3020" height="1420" alt="image" src="https://github.com/user-attachments/assets/e7d70063-b90c-4464-90ec-1bd810c02cac" /> <img width="3036" height="1374" alt="image" src="https://github.com/user-attachments/assets/f7cf584b-ae01-455c-9c9c-acca3166a549" /> <img width="2682" height="868" alt="image" src="https://github.com/user-attachments/assets/dd10646c-4521-4d8f-a111-760d6eb01249" /> ## PR Checklist - [Y] Make sure you reviewed [CONTRIBUTING.md](https://github.com/googleapis/genai-toolbox/blob/main/CONTRIBUTING.md) - [Y] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/genai-toolbox/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [Y] Ensure the tests and linter pass - [Y] Code coverage does not decrease (if any source code was changed) - [Y] Appropriate docs were updated (if necessary) - [Y] Make sure to add `!` if this involve a breaking change 🛠️ Fixes #1691 #1715 --------- Co-authored-by: Wenxin Du <117315983+duwenxin99@users.noreply.github.com>
This commit is contained in:
@@ -183,11 +183,14 @@ import (
|
||||
_ "github.com/googleapis/genai-toolbox/internal/tools/postgres/postgreslistavailableextensions"
|
||||
_ "github.com/googleapis/genai-toolbox/internal/tools/postgres/postgreslistindexes"
|
||||
_ "github.com/googleapis/genai-toolbox/internal/tools/postgres/postgreslistinstalledextensions"
|
||||
_ "github.com/googleapis/genai-toolbox/internal/tools/postgres/postgreslistlocks"
|
||||
_ "github.com/googleapis/genai-toolbox/internal/tools/postgres/postgreslistschemas"
|
||||
_ "github.com/googleapis/genai-toolbox/internal/tools/postgres/postgreslistsequences"
|
||||
_ "github.com/googleapis/genai-toolbox/internal/tools/postgres/postgreslisttables"
|
||||
_ "github.com/googleapis/genai-toolbox/internal/tools/postgres/postgreslisttriggers"
|
||||
_ "github.com/googleapis/genai-toolbox/internal/tools/postgres/postgreslistviews"
|
||||
_ "github.com/googleapis/genai-toolbox/internal/tools/postgres/postgreslongrunningtransactions"
|
||||
_ "github.com/googleapis/genai-toolbox/internal/tools/postgres/postgresreplicationstats"
|
||||
_ "github.com/googleapis/genai-toolbox/internal/tools/postgres/postgressql"
|
||||
_ "github.com/googleapis/genai-toolbox/internal/tools/redis"
|
||||
_ "github.com/googleapis/genai-toolbox/internal/tools/serverlessspark/serverlesssparkcancelbatch"
|
||||
|
||||
@@ -1478,7 +1478,7 @@ func TestPrebuiltTools(t *testing.T) {
|
||||
wantToolset: server.ToolsetConfigs{
|
||||
"alloydb_postgres_database_tools": tools.ToolsetConfig{
|
||||
Name: "alloydb_postgres_database_tools",
|
||||
ToolNames: []string{"execute_sql", "list_tables", "list_active_queries", "list_available_extensions", "list_installed_extensions", "list_autovacuum_configurations", "list_memory_configurations", "list_top_bloated_tables", "list_replication_slots", "list_invalid_indexes", "get_query_plan", "list_views", "list_schemas", "database_overview", "list_triggers", "list_indexes", "list_sequences"},
|
||||
ToolNames: []string{"execute_sql", "list_tables", "list_active_queries", "list_available_extensions", "list_installed_extensions", "list_autovacuum_configurations", "list_memory_configurations", "list_top_bloated_tables", "list_replication_slots", "list_invalid_indexes", "get_query_plan", "list_views", "list_schemas", "database_overview", "list_triggers", "list_indexes", "list_sequences", "long_running_transactions", "list_locks", "replication_stats"},
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -1508,7 +1508,7 @@ func TestPrebuiltTools(t *testing.T) {
|
||||
wantToolset: server.ToolsetConfigs{
|
||||
"cloud_sql_postgres_database_tools": tools.ToolsetConfig{
|
||||
Name: "cloud_sql_postgres_database_tools",
|
||||
ToolNames: []string{"execute_sql", "list_tables", "list_active_queries", "list_available_extensions", "list_installed_extensions", "list_autovacuum_configurations", "list_memory_configurations", "list_top_bloated_tables", "list_replication_slots", "list_invalid_indexes", "get_query_plan", "list_views", "list_schemas", "database_overview", "list_triggers", "list_indexes", "list_sequences"},
|
||||
ToolNames: []string{"execute_sql", "list_tables", "list_active_queries", "list_available_extensions", "list_installed_extensions", "list_autovacuum_configurations", "list_memory_configurations", "list_top_bloated_tables", "list_replication_slots", "list_invalid_indexes", "get_query_plan", "list_views", "list_schemas", "database_overview", "list_triggers", "list_indexes", "list_sequences", "long_running_transactions", "list_locks", "replication_stats"},
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -1608,7 +1608,7 @@ func TestPrebuiltTools(t *testing.T) {
|
||||
wantToolset: server.ToolsetConfigs{
|
||||
"postgres_database_tools": tools.ToolsetConfig{
|
||||
Name: "postgres_database_tools",
|
||||
ToolNames: []string{"execute_sql", "list_tables", "list_active_queries", "list_available_extensions", "list_installed_extensions", "list_autovacuum_configurations", "list_memory_configurations", "list_top_bloated_tables", "list_replication_slots", "list_invalid_indexes", "get_query_plan", "list_views", "list_schemas", "database_overview", "list_triggers", "list_indexes", "list_sequences"},
|
||||
ToolNames: []string{"execute_sql", "list_tables", "list_active_queries", "list_available_extensions", "list_installed_extensions", "list_autovacuum_configurations", "list_memory_configurations", "list_top_bloated_tables", "list_replication_slots", "list_invalid_indexes", "get_query_plan", "list_views", "list_schemas", "database_overview", "list_triggers", "list_indexes", "list_sequences", "long_running_transactions", "list_locks", "replication_stats"},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
@@ -62,6 +62,14 @@ cluster][alloydb-free-trial].
|
||||
|
||||
- [`postgres-list-sequences`](../tools/postgres/postgres-list-sequences.md)
|
||||
List sequences in a PostgreSQL database.
|
||||
- [`postgres-long-running-transactions`](../tools/postgres/postgres-long-running-transactions.md)
|
||||
List long running transactions in a PostgreSQL database.
|
||||
|
||||
- [`postgres-list-locks`](../tools/postgres/postgres-list-locks.md)
|
||||
List lock stats in a PostgreSQL database.
|
||||
|
||||
- [`postgres-replication-stats`](../tools/postgres/postgres-replication-stats.md)
|
||||
List replication stats in a PostgreSQL database.
|
||||
|
||||
### Pre-built Configurations
|
||||
|
||||
|
||||
@@ -58,6 +58,14 @@ to a database by following these instructions][csql-pg-quickstart].
|
||||
|
||||
- [`postgres-list-sequences`](../tools/postgres/postgres-list-sequences.md)
|
||||
List sequences in a PostgreSQL database.
|
||||
- [`postgres-long-running-transactions`](../tools/postgres/postgres-long-running-transactions.md)
|
||||
List long running transactions in a PostgreSQL database.
|
||||
|
||||
- [`postgres-list-locks`](../tools/postgres/postgres-list-locks.md)
|
||||
List lock stats in a PostgreSQL database.
|
||||
|
||||
- [`postgres-replication-stats`](../tools/postgres/postgres-replication-stats.md)
|
||||
List replication stats in a PostgreSQL database.
|
||||
|
||||
### Pre-built Configurations
|
||||
|
||||
|
||||
@@ -53,6 +53,15 @@ reputation for reliability, feature robustness, and performance.
|
||||
- [`postgres-list-sequences`](../tools/postgres/postgres-list-sequences.md)
|
||||
List sequences in a PostgreSQL database.
|
||||
|
||||
- [`postgres-long-running-transactions`](../tools/postgres/postgres-long-running-transactions.md)
|
||||
List long running transactions in a PostgreSQL database.
|
||||
|
||||
- [`postgres-list-locks`](../tools/postgres/postgres-list-locks.md)
|
||||
List lock stats in a PostgreSQL database.
|
||||
|
||||
- [`postgres-replication-stats`](../tools/postgres/postgres-replication-stats.md)
|
||||
List replication stats in a PostgreSQL database.
|
||||
|
||||
### Pre-built Configurations
|
||||
|
||||
- [PostgreSQL using MCP](https://googleapis.github.io/genai-toolbox/how-to/connect-ide/postgres_mcp/)
|
||||
|
||||
79
docs/en/resources/tools/postgres/postgres-list-locks.md
Normal file
79
docs/en/resources/tools/postgres/postgres-list-locks.md
Normal file
@@ -0,0 +1,79 @@
|
||||
---
|
||||
title: "postgres-list-locks"
|
||||
type: docs
|
||||
weight: 1
|
||||
description: >
|
||||
The "postgres-list-locks" tool lists active locks in the database, including the associated process, lock type, relation, mode, and the query holding or waiting on the lock.
|
||||
aliases:
|
||||
- /resources/tools/postgres-list-locks
|
||||
---
|
||||
|
||||
## About
|
||||
|
||||
The `postgres-list-locks` tool displays information about active locks by joining pg_stat_activity with pg_locks. This is useful to find transactions holding or waiting for locks and to troubleshoot contention.
|
||||
|
||||
Compatible sources:
|
||||
|
||||
- [alloydb-postgres](../../sources/alloydb-pg.md)
|
||||
- [cloud-sql-postgres](../../sources/cloud-sql-pg.md)
|
||||
- [postgres](../../sources/postgres.md)
|
||||
|
||||
|
||||
This tool identifies all locks held by active processes showing the process ID, user, query text, and an aggregated list of all transactions and specific locks (relation, mode, grant status) associated with each process.
|
||||
|
||||
## Query
|
||||
|
||||
The tool aggregates locks per backend (process) and returns the concatenated transaction ids and lock entries. The SQL used by the tool looks like:
|
||||
|
||||
```sql
|
||||
SELECT
|
||||
locked.pid,
|
||||
locked.usename,
|
||||
locked.query,
|
||||
string_agg(locked.transactionid::text,':') as trxid,
|
||||
string_agg(locked.lockinfo,'||') as locks
|
||||
FROM
|
||||
(SELECT
|
||||
a.pid,
|
||||
a.usename,
|
||||
a.query,
|
||||
l.transactionid,
|
||||
(l.granted::text||','||coalesce(l.relation::regclass,0)::text||','||l.mode::text)::text as lockinfo
|
||||
FROM
|
||||
pg_stat_activity a
|
||||
JOIN pg_locks l ON l.pid = a.pid AND a.pid != pg_backend_pid()) as locked
|
||||
GROUP BY
|
||||
locked.pid, locked.usename, locked.query;
|
||||
```
|
||||
|
||||
## Example
|
||||
|
||||
```yaml
|
||||
tools:
|
||||
list_locks:
|
||||
kind: postgres-list-locks
|
||||
source: postgres-source
|
||||
description: "Lists active locks with associated process and query information."
|
||||
```
|
||||
|
||||
Example response element (aggregated per process):
|
||||
|
||||
```json
|
||||
{
|
||||
"pid": 23456,
|
||||
"usename": "dbuser",
|
||||
"query": "INSERT INTO orders (...) VALUES (...);",
|
||||
"trxid": "12345:0",
|
||||
"locks": "true,public.orders,RowExclusiveLock||false,0,ShareUpdateExclusiveLock"
|
||||
}
|
||||
```
|
||||
|
||||
## Reference
|
||||
|
||||
| field | type | required | description |
|
||||
|:--------|:--------|:--------:|:------------|
|
||||
| pid | integer | true | Process id (backend pid). |
|
||||
| usename | string | true | Database user. |
|
||||
| query | string | true | SQL text associated with the session. |
|
||||
| trxid | string | true | Aggregated transaction ids for the process, joined by ':' (string). Each element is the transactionid as text. |
|
||||
| locks | string | true | Aggregated lock info entries for the process, joined by '||'. Each entry is a comma-separated triple: `granted,relation,mode` where `relation` may be `0` when not resolvable via regclass. |
|
||||
@@ -0,0 +1,106 @@
|
||||
---
|
||||
title: "postgres-long-running-transactions"
|
||||
type: docs
|
||||
weight: 1
|
||||
description: >
|
||||
The postgres-long-running-transactions tool Identifies and lists database transactions that exceed a specified time limit. For each of the long running transactions, the output contains the process id, database name, user name, application name, client address, state, connection age, transaction age, query age, last activity age, wait event type, wait event, and query string.
|
||||
aliases:
|
||||
- /resources/tools/postgres-long-running-transactions
|
||||
---
|
||||
|
||||
## About
|
||||
|
||||
The `postgres-long-running-transactions` tool reports transactions that exceed a configured duration threshold by scanning `pg_stat_activity` for sessions where `xact_start` is set and older than the configured interval.
|
||||
|
||||
Compatible sources:
|
||||
|
||||
- [alloydb-postgres](../../sources/alloydb-pg.md)
|
||||
- [cloud-sql-postgres](../../sources/cloud-sql-pg.md)
|
||||
- [postgres](../../sources/postgres.md)
|
||||
|
||||
The tool returns a JSON array with one object per matching session (non-idle). Each object contains the process id, database and user, application name, client address, session state, several age intervals (connection, transaction, query, and last activity), wait event info, and the SQL text currently associated with the session.
|
||||
|
||||
Parameters:
|
||||
|
||||
- `min_duration` (optional): Only show transactions running at least this long (Postgres interval format, e.g., '5 minutes'). Default: `5 minutes`.
|
||||
- `limit` (optional): Maximum number of results to return. Default: `20`.
|
||||
|
||||
## Query
|
||||
|
||||
The SQL used by the tool looks like:
|
||||
|
||||
```sql
|
||||
SELECT
|
||||
pid,
|
||||
datname,
|
||||
usename,
|
||||
application_name as appname,
|
||||
client_addr,
|
||||
state,
|
||||
now() - backend_start as conn_age,
|
||||
now() - xact_start as xact_age,
|
||||
now() - query_start as query_age,
|
||||
now() - state_change as last_activity_age,
|
||||
wait_event_type,
|
||||
wait_event,
|
||||
query
|
||||
FROM
|
||||
pg_stat_activity
|
||||
WHERE
|
||||
state <> 'idle'
|
||||
AND (now() - xact_start) > COALESCE($1::INTERVAL, interval '5 minutes')
|
||||
AND xact_start IS NOT NULL
|
||||
AND pid <> pg_backend_pid()
|
||||
ORDER BY
|
||||
xact_age DESC
|
||||
LIMIT
|
||||
COALESCE($2::int, 20);
|
||||
```
|
||||
|
||||
## Example
|
||||
|
||||
```yaml
|
||||
tools:
|
||||
long_running_transactions:
|
||||
kind: postgres-long-running-transactions
|
||||
source: postgres-source
|
||||
description: "Identifies transactions open longer than a threshold and returns details including query text and durations."
|
||||
```
|
||||
|
||||
Example response element:
|
||||
|
||||
```json
|
||||
{
|
||||
"pid": 12345,
|
||||
"datname": "my_database",
|
||||
"usename": "dbuser",
|
||||
"appname": "my_app",
|
||||
"client_addr": "10.0.0.5",
|
||||
"state": "idle in transaction",
|
||||
"conn_age": "00:12:34",
|
||||
"xact_age": "00:06:00",
|
||||
"query_age": "00:02:00",
|
||||
"last_activity_age": "00:01:30",
|
||||
"wait_event_type": null,
|
||||
"wait_event": null,
|
||||
"query": "UPDATE users SET last_seen = now() WHERE id = 42;"
|
||||
}
|
||||
```
|
||||
|
||||
## Reference
|
||||
|
||||
| field | type | required | description |
|
||||
|:---------------------|:--------|:--------:|:------------|
|
||||
| pid | integer | true | Process id (backend pid). |
|
||||
| datname | string | true | Database name. |
|
||||
| usename | string | true | Database user name. |
|
||||
| appname | string | false | Application name (client application). |
|
||||
| client_addr | string | false | Client IPv4/IPv6 address (may be null for local connections). |
|
||||
| state | string | true | Session state (e.g., active, idle in transaction). |
|
||||
| conn_age | string | true | Age of the connection: `now() - backend_start` (Postgres interval serialized as string). |
|
||||
| xact_age | string | true | Age of the transaction: `now() - xact_start` (Postgres interval serialized as string). |
|
||||
| query_age | string | true | Age of the currently running query: `now() - query_start` (Postgres interval serialized as string). |
|
||||
| last_activity_age | string | true | Time since last state change: `now() - state_change` (Postgres interval serialized as string). |
|
||||
| wait_event_type | string | false | Type of event the backend is waiting on (may be null). |
|
||||
| wait_event | string | false | Specific wait event name (may be null). |
|
||||
| query | string | true | SQL text associated with the session. |
|
||||
@@ -0,0 +1,67 @@
|
||||
---
|
||||
title: "postgres-replication-stats"
|
||||
type: docs
|
||||
weight: 1
|
||||
description: >
|
||||
The "postgres-replication-stats" tool reports replication-related metrics for WAL streaming replicas, including lag sizes presented in human-readable form.
|
||||
aliases:
|
||||
- /resources/tools/postgres-replication-stats
|
||||
---
|
||||
|
||||
## About
|
||||
|
||||
The `postgres-replication-stats` tool queries pg_stat_replication to surface the status of connected replicas. It reports application_name, client address, connection and sync state, and human-readable lag sizes (sent, write, flush, replay, and total) computed using WAL LSN differences.
|
||||
|
||||
Compatible sources:
|
||||
|
||||
- [alloydb-postgres](../../sources/alloydb-pg.md)
|
||||
- [cloud-sql-postgres](../../sources/cloud-sql-pg.md)
|
||||
- [postgres](../../sources/postgres.md)
|
||||
|
||||
This tool takes no parameters. It returns a JSON array; each element represents a replication connection on the primary and includes lag metrics formatted by pg_size_pretty.
|
||||
|
||||
## Example
|
||||
|
||||
```yaml
|
||||
tools:
|
||||
replication_stats:
|
||||
kind: postgres-replication-stats
|
||||
source: postgres-source
|
||||
description: "Lists replication connections and readable WAL lag metrics."
|
||||
```
|
||||
|
||||
Example response element:
|
||||
|
||||
```json
|
||||
{
|
||||
"pid": 12345,
|
||||
"usename": "replication_user",
|
||||
"application_name": "replica-1",
|
||||
"backend_xmin": "0/0",
|
||||
"client_addr": "10.0.0.7",
|
||||
"state": "streaming",
|
||||
"sync_state": "sync",
|
||||
"sent_lag": "1234 kB",
|
||||
"write_lag": "12 kB",
|
||||
"flush_lag": "0 bytes",
|
||||
"replay_lag": "0 bytes",
|
||||
"total_lag": "1234 kB"
|
||||
}
|
||||
```
|
||||
|
||||
## Reference
|
||||
|
||||
| field | type | required | description |
|
||||
|------------------:|:-------:|:--------:|:------------|
|
||||
| pid | integer | true | Process ID of the replication backend on the primary. |
|
||||
| usename | string | true | Name of the user performing the replication connection. |
|
||||
| application_name | string | true | Name of the application (replica) connecting to the primary. |
|
||||
| backend_xmin | string | false | Standby's xmin horizon reported by hot_standby_feedback (may be null). |
|
||||
| client_addr | string | false | Client IP address of the replica (may be null). |
|
||||
| state | string | true | Connection state (e.g., streaming). |
|
||||
| sync_state | string | true | Sync state (e.g., async, sync, potential). |
|
||||
| sent_lag | string | true | Human-readable size difference between current WAL LSN and sent_lsn. |
|
||||
| write_lag | string | true | Human-readable write lag between sent_lsn and write_lsn. |
|
||||
| flush_lag | string | true | Human-readable flush lag between write_lsn and flush_lsn. |
|
||||
| replay_lag | string | true | Human-readable replay lag between flush_lsn and replay_lsn. |
|
||||
| total_lag | string | true | Human-readable total lag between current WAL LSN and replay_lsn. |
|
||||
@@ -50,6 +50,18 @@ tools:
|
||||
source: alloydb-pg-source
|
||||
description: "List all installed PostgreSQL extensions with their name, version, schema, owner, and description."
|
||||
|
||||
long_running_transactions:
|
||||
kind: postgres-long-running-transactions
|
||||
source: alloydb-pg-source
|
||||
|
||||
list_locks:
|
||||
kind: postgres-list-locks
|
||||
source: alloydb-pg-source
|
||||
|
||||
replication_stats:
|
||||
kind: postgres-replication-stats
|
||||
source: alloydb-pg-source
|
||||
|
||||
list_autovacuum_configurations:
|
||||
kind: postgres-sql
|
||||
source: alloydb-pg-source
|
||||
@@ -199,3 +211,7 @@ toolsets:
|
||||
- list_triggers
|
||||
- list_indexes
|
||||
- list_sequences
|
||||
- long_running_transactions
|
||||
- list_locks
|
||||
- replication_stats
|
||||
|
||||
|
||||
@@ -49,6 +49,21 @@ tools:
|
||||
source: cloudsql-pg-source
|
||||
description: "List all installed PostgreSQL extensions with their name, version, schema, owner, and description."
|
||||
|
||||
long_running_transactions:
|
||||
kind: postgres-long-running-transactions
|
||||
source: cloudsql-pg-source
|
||||
description: "Identifies and lists database transactions that exceed a specified time limit. For each of the long running transactions, the output contains the process id, database name, user name, application name, client address, state, connection age, transaction age, query age, last activity age, wait event type, wait event, and query string."
|
||||
|
||||
list_locks:
|
||||
kind: postgres-list-locks
|
||||
source: cloudsql-pg-source
|
||||
description: "Identifies all locks held by active processes showing the process ID, user, query text, and an aggregated list of all transactions and specific locks (relation, mode, grant status) associated with each process."
|
||||
|
||||
replication_stats:
|
||||
kind: postgres-replication-stats
|
||||
source: cloudsql-pg-source
|
||||
description: "Lists each replica's process ID, user name, application name, backend_xmin (standby's xmin horizon reported by hot_standby_feedback), client IP address, connection state, and sync_state, along with lag sizes in bytes for sent_lag (primary to sent), write_lag (sent to written), flush_lag (written to flushed), replay_lag (flushed to replayed), and the overall total_lag (primary to replayed)."
|
||||
|
||||
list_autovacuum_configurations:
|
||||
kind: postgres-sql
|
||||
source: cloudsql-pg-source
|
||||
@@ -198,3 +213,6 @@ toolsets:
|
||||
- list_triggers
|
||||
- list_indexes
|
||||
- list_sequences
|
||||
- long_running_transactions
|
||||
- list_locks
|
||||
- replication_stats
|
||||
|
||||
@@ -48,6 +48,21 @@ tools:
|
||||
source: postgresql-source
|
||||
description: "List all installed PostgreSQL extensions with their name, version, schema, owner, and description."
|
||||
|
||||
long_running_transactions:
|
||||
kind: postgres-long-running-transactions
|
||||
source: postgresql-source
|
||||
description: "Identifies and lists database transactions that exceed a specified time limit. For each of the long running transactions, the output contains the process id, database name, user name, application name, client address, state, connection age, transaction age, query age, last activity age, wait event type, wait event, and query string."
|
||||
|
||||
list_locks:
|
||||
kind: postgres-list-locks
|
||||
source: postgresql-source
|
||||
description: "Identifies all locks held by active processes showing the process ID, user, query text, and an aggregated list of all transactions and specific locks (relation, mode, grant status) associated with each process."
|
||||
|
||||
replication_stats:
|
||||
kind: postgres-replication-stats
|
||||
source: postgresql-source
|
||||
description: "Lists each replica's process ID, user name, application name, backend_xmin (standby's xmin horizon reported by hot_standby_feedback), client IP address, connection state, and sync_state, along with lag sizes in bytes for sent_lag (primary to sent), write_lag (sent to written), flush_lag (written to flushed), replay_lag (flushed to replayed), and the overall total_lag (primary to replayed)."
|
||||
|
||||
list_autovacuum_configurations:
|
||||
kind: postgres-sql
|
||||
source: postgresql-source
|
||||
@@ -197,3 +212,6 @@ toolsets:
|
||||
- list_triggers
|
||||
- list_indexes
|
||||
- list_sequences
|
||||
- long_running_transactions
|
||||
- list_locks
|
||||
- replication_stats
|
||||
|
||||
201
internal/tools/postgres/postgreslistlocks/postgreslistlocks.go
Normal file
201
internal/tools/postgres/postgreslistlocks/postgreslistlocks.go
Normal file
@@ -0,0 +1,201 @@
|
||||
// Copyright 2025 Google LLC
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package postgreslistlocks
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
yaml "github.com/goccy/go-yaml"
|
||||
"github.com/googleapis/genai-toolbox/internal/sources"
|
||||
"github.com/googleapis/genai-toolbox/internal/sources/alloydbpg"
|
||||
"github.com/googleapis/genai-toolbox/internal/sources/cloudsqlpg"
|
||||
"github.com/googleapis/genai-toolbox/internal/sources/postgres"
|
||||
"github.com/googleapis/genai-toolbox/internal/tools"
|
||||
"github.com/googleapis/genai-toolbox/internal/util/parameters"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
)
|
||||
|
||||
const kind string = "postgres-list-locks"
|
||||
|
||||
const listLocks = `
|
||||
SELECT
|
||||
locked.pid,
|
||||
locked.usename,
|
||||
locked.query,
|
||||
string_agg(locked.transactionid::text,':') as trxid,
|
||||
string_agg(locked.lockinfo,'||') as locks
|
||||
FROM
|
||||
(SELECT
|
||||
a.pid,
|
||||
a.usename,
|
||||
a.query,
|
||||
l.transactionid,
|
||||
(l.granted::text||','||coalesce(l.relation::regclass,0)::text||','||l.mode::text)::text as lockinfo
|
||||
FROM
|
||||
pg_stat_activity a
|
||||
JOIN pg_locks l ON l.pid = a.pid AND a.pid != pg_backend_pid()) as locked
|
||||
GROUP BY
|
||||
locked.pid, locked.usename, locked.query;
|
||||
`
|
||||
|
||||
func init() {
|
||||
if !tools.Register(kind, newConfig) {
|
||||
panic(fmt.Sprintf("tool kind %q already registered", kind))
|
||||
}
|
||||
}
|
||||
|
||||
func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (tools.ToolConfig, error) {
|
||||
actual := Config{Name: name}
|
||||
if err := decoder.DecodeContext(ctx, &actual); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return actual, nil
|
||||
}
|
||||
|
||||
type compatibleSource interface {
|
||||
PostgresPool() *pgxpool.Pool
|
||||
}
|
||||
|
||||
// validate compatible sources are still compatible
|
||||
var _ compatibleSource = &alloydbpg.Source{}
|
||||
var _ compatibleSource = &cloudsqlpg.Source{}
|
||||
var _ compatibleSource = &postgres.Source{}
|
||||
|
||||
var compatibleSources = [...]string{alloydbpg.SourceKind, cloudsqlpg.SourceKind, postgres.SourceKind}
|
||||
|
||||
type Config struct {
|
||||
Name string `yaml:"name" validate:"required"`
|
||||
Kind string `yaml:"kind" validate:"required"`
|
||||
Source string `yaml:"source" validate:"required"`
|
||||
Description string `yaml:"description"`
|
||||
AuthRequired []string `yaml:"authRequired"`
|
||||
}
|
||||
|
||||
// validate interface
|
||||
var _ tools.ToolConfig = Config{}
|
||||
|
||||
func (cfg Config) ToolConfigKind() string {
|
||||
return kind
|
||||
}
|
||||
|
||||
func (cfg Config) Initialize(srcs map[string]sources.Source) (tools.Tool, error) {
|
||||
// verify source exists
|
||||
rawS, ok := srcs[cfg.Source]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("no source named %q configured", cfg.Source)
|
||||
}
|
||||
|
||||
// verify the source is compatible
|
||||
s, ok := rawS.(compatibleSource)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("invalid source for %q tool: source kind must be one of %q", kind, compatibleSources)
|
||||
}
|
||||
|
||||
allParameters := parameters.Parameters{}
|
||||
paramManifest := allParameters.Manifest()
|
||||
|
||||
if cfg.Description == "" {
|
||||
cfg.Description = "Identifies all locks held by active processes showing the process ID, user, query text, and an aggregated list of all transactions and specific locks (relation, mode, grant status) associated with each process."
|
||||
}
|
||||
|
||||
mcpManifest := tools.GetMcpManifest(cfg.Name, cfg.Description, cfg.AuthRequired, allParameters)
|
||||
|
||||
// finish tool setup
|
||||
return Tool{
|
||||
name: cfg.Name,
|
||||
kind: cfg.Kind,
|
||||
authRequired: cfg.AuthRequired,
|
||||
allParams: allParameters,
|
||||
pool: s.PostgresPool(),
|
||||
manifest: tools.Manifest{
|
||||
Description: cfg.Description,
|
||||
Parameters: paramManifest,
|
||||
AuthRequired: cfg.AuthRequired,
|
||||
},
|
||||
mcpManifest: mcpManifest,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// validate interface
|
||||
var _ tools.Tool = Tool{}
|
||||
|
||||
type Tool struct {
|
||||
Config
|
||||
name string `yaml:"name"`
|
||||
kind string `yaml:"kind"`
|
||||
authRequired []string `yaml:"authRequired"`
|
||||
allParams parameters.Parameters `yaml:"allParams"`
|
||||
pool *pgxpool.Pool
|
||||
manifest tools.Manifest
|
||||
mcpManifest tools.McpManifest
|
||||
}
|
||||
|
||||
func (t Tool) ToConfig() tools.ToolConfig {
|
||||
return t.Config
|
||||
}
|
||||
|
||||
func (t Tool) Invoke(ctx context.Context, params parameters.ParamValues, accessToken tools.AccessToken) (any, error) {
|
||||
paramsMap := params.AsMap()
|
||||
|
||||
newParams, err := parameters.GetParams(t.allParams, paramsMap)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to extract standard params %w", err)
|
||||
}
|
||||
sliceParams := newParams.AsSlice()
|
||||
|
||||
results, err := t.pool.Query(ctx, listLocks, sliceParams...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to execute query: %w", err)
|
||||
}
|
||||
defer results.Close()
|
||||
|
||||
fields := results.FieldDescriptions()
|
||||
var out []map[string]any
|
||||
|
||||
for results.Next() {
|
||||
values, err := results.Values()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to parse row: %w", err)
|
||||
}
|
||||
rowMap := make(map[string]any)
|
||||
for i, field := range fields {
|
||||
rowMap[string(field.Name)] = values[i]
|
||||
}
|
||||
out = append(out, rowMap)
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (t Tool) ParseParams(data map[string]any, claims map[string]map[string]any) (parameters.ParamValues, error) {
|
||||
return parameters.ParseParams(t.allParams, data, claims)
|
||||
}
|
||||
|
||||
func (t Tool) Manifest() tools.Manifest {
|
||||
return t.manifest
|
||||
}
|
||||
|
||||
func (t Tool) McpManifest() tools.McpManifest {
|
||||
return t.mcpManifest
|
||||
}
|
||||
|
||||
func (t Tool) Authorized(verifiedAuthServices []string) bool {
|
||||
return tools.IsAuthorized(t.authRequired, verifiedAuthServices)
|
||||
}
|
||||
|
||||
func (t Tool) RequiresClientAuthorization() bool {
|
||||
return false
|
||||
}
|
||||
@@ -0,0 +1,95 @@
|
||||
// Copyright 2025 Google LLC
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package postgreslistlocks_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
yaml "github.com/goccy/go-yaml"
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/googleapis/genai-toolbox/internal/server"
|
||||
"github.com/googleapis/genai-toolbox/internal/testutils"
|
||||
"github.com/googleapis/genai-toolbox/internal/tools/postgres/postgreslistlocks"
|
||||
)
|
||||
|
||||
func TestParseFromYamlPostgresListLocks(t *testing.T) {
|
||||
ctx, err := testutils.ContextWithNewLogger()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
tcs := []struct {
|
||||
desc string
|
||||
in string
|
||||
want server.ToolConfigs
|
||||
}{
|
||||
{
|
||||
desc: "basic example",
|
||||
in: `
|
||||
tools:
|
||||
example_tool:
|
||||
kind: postgres-list-locks
|
||||
source: my-postgres-instance
|
||||
description: some description
|
||||
authRequired:
|
||||
- my-google-auth-service
|
||||
- other-auth-service
|
||||
`,
|
||||
want: server.ToolConfigs{
|
||||
"example_tool": postgreslistlocks.Config{
|
||||
Name: "example_tool",
|
||||
Kind: "postgres-list-locks",
|
||||
Source: "my-postgres-instance",
|
||||
Description: "some description",
|
||||
AuthRequired: []string{"my-google-auth-service", "other-auth-service"},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "basic example",
|
||||
in: `
|
||||
tools:
|
||||
example_tool:
|
||||
kind: postgres-list-locks
|
||||
source: my-postgres-instance
|
||||
description: some description
|
||||
`,
|
||||
want: server.ToolConfigs{
|
||||
"example_tool": postgreslistlocks.Config{
|
||||
Name: "example_tool",
|
||||
Kind: "postgres-list-locks",
|
||||
Source: "my-postgres-instance",
|
||||
Description: "some description",
|
||||
AuthRequired: []string{},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tc := range tcs {
|
||||
t.Run(tc.desc, func(t *testing.T) {
|
||||
got := struct {
|
||||
Tools server.ToolConfigs `yaml:"tools"`
|
||||
}{}
|
||||
// Parse contents
|
||||
err := yaml.UnmarshalContext(ctx, testutils.FormatYaml(tc.in), &got)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to unmarshal: %s", err)
|
||||
}
|
||||
if diff := cmp.Diff(tc.want, got.Tools); diff != "" {
|
||||
t.Fatalf("incorrect parse: diff %v", diff)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,211 @@
|
||||
// Copyright 2025 Google LLC
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package postgreslongrunningtransactions
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
yaml "github.com/goccy/go-yaml"
|
||||
"github.com/googleapis/genai-toolbox/internal/sources"
|
||||
"github.com/googleapis/genai-toolbox/internal/sources/alloydbpg"
|
||||
"github.com/googleapis/genai-toolbox/internal/sources/cloudsqlpg"
|
||||
"github.com/googleapis/genai-toolbox/internal/sources/postgres"
|
||||
"github.com/googleapis/genai-toolbox/internal/tools"
|
||||
"github.com/googleapis/genai-toolbox/internal/util/parameters"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
)
|
||||
|
||||
const kind string = "postgres-long-running-transactions"
|
||||
|
||||
const longRunningTransactions = `
|
||||
SELECT
|
||||
pid,
|
||||
datname,
|
||||
usename,
|
||||
application_name as appname,
|
||||
client_addr,
|
||||
state,
|
||||
now() - backend_start as conn_age,
|
||||
now() - xact_start as xact_age,
|
||||
now() - query_start as query_age,
|
||||
now() - state_change as last_activity_age,
|
||||
wait_event_type,
|
||||
wait_event,
|
||||
query
|
||||
FROM
|
||||
pg_stat_activity
|
||||
WHERE
|
||||
state <> 'idle'
|
||||
AND (now() - xact_start) > COALESCE($1::INTERVAL, interval '5 minutes')
|
||||
AND xact_start IS NOT NULL
|
||||
AND pid <> pg_backend_pid()
|
||||
ORDER BY
|
||||
xact_age DESC
|
||||
LIMIT
|
||||
COALESCE($2::int, 20);
|
||||
`
|
||||
|
||||
func init() {
|
||||
if !tools.Register(kind, newConfig) {
|
||||
panic(fmt.Sprintf("tool kind %q already registered", kind))
|
||||
}
|
||||
}
|
||||
|
||||
func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (tools.ToolConfig, error) {
|
||||
actual := Config{Name: name}
|
||||
if err := decoder.DecodeContext(ctx, &actual); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return actual, nil
|
||||
}
|
||||
|
||||
type compatibleSource interface {
|
||||
PostgresPool() *pgxpool.Pool
|
||||
}
|
||||
|
||||
// validate compatible sources are still compatible
|
||||
var _ compatibleSource = &alloydbpg.Source{}
|
||||
var _ compatibleSource = &cloudsqlpg.Source{}
|
||||
var _ compatibleSource = &postgres.Source{}
|
||||
|
||||
var compatibleSources = [...]string{alloydbpg.SourceKind, cloudsqlpg.SourceKind, postgres.SourceKind}
|
||||
|
||||
type Config struct {
|
||||
Name string `yaml:"name" validate:"required"`
|
||||
Kind string `yaml:"kind" validate:"required"`
|
||||
Source string `yaml:"source" validate:"required"`
|
||||
Description string `yaml:"description"`
|
||||
AuthRequired []string `yaml:"authRequired"`
|
||||
}
|
||||
|
||||
// validate interface
|
||||
var _ tools.ToolConfig = Config{}
|
||||
|
||||
func (cfg Config) ToolConfigKind() string {
|
||||
return kind
|
||||
}
|
||||
|
||||
func (cfg Config) Initialize(srcs map[string]sources.Source) (tools.Tool, error) {
|
||||
// verify source exists
|
||||
rawS, ok := srcs[cfg.Source]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("no source named %q configured", cfg.Source)
|
||||
}
|
||||
|
||||
// verify the source is compatible
|
||||
s, ok := rawS.(compatibleSource)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("invalid source for %q tool: source kind must be one of %q", kind, compatibleSources)
|
||||
}
|
||||
|
||||
allParameters := parameters.Parameters{
|
||||
parameters.NewStringParameterWithDefault("min_duration", "5 minutes", "Optional: Only show transactions running at least this long (e.g., '1 minute', '15 minutes', '30 seconds')."),
|
||||
parameters.NewIntParameterWithDefault("limit", 20, "Optional: The maximum number of long-running transactions to return. Defaults to 20."),
|
||||
}
|
||||
paramManifest := allParameters.Manifest()
|
||||
|
||||
if cfg.Description == "" {
|
||||
cfg.Description = "Identifies and lists database transactions that exceed a specified time limit. For each of the long running transactions, the output contains the process id, database name, user name, application name, client address, state, connection age, transaction age, query age, last activity age, wait event type, wait event, and query string."
|
||||
}
|
||||
|
||||
mcpManifest := tools.GetMcpManifest(cfg.Name, cfg.Description, cfg.AuthRequired, allParameters)
|
||||
|
||||
// finish tool setup
|
||||
return Tool{
|
||||
name: cfg.Name,
|
||||
kind: cfg.Kind,
|
||||
authRequired: cfg.AuthRequired,
|
||||
allParams: allParameters,
|
||||
pool: s.PostgresPool(),
|
||||
manifest: tools.Manifest{
|
||||
Description: cfg.Description,
|
||||
Parameters: paramManifest,
|
||||
AuthRequired: cfg.AuthRequired,
|
||||
},
|
||||
mcpManifest: mcpManifest,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// validate interface
|
||||
var _ tools.Tool = Tool{}
|
||||
|
||||
type Tool struct {
|
||||
Config
|
||||
name string `yaml:"name"`
|
||||
kind string `yaml:"kind"`
|
||||
authRequired []string `yaml:"authRequired"`
|
||||
allParams parameters.Parameters `yaml:"allParams"`
|
||||
pool *pgxpool.Pool
|
||||
manifest tools.Manifest
|
||||
mcpManifest tools.McpManifest
|
||||
}
|
||||
|
||||
func (t Tool) ToConfig() tools.ToolConfig {
|
||||
return t.Config
|
||||
}
|
||||
|
||||
func (t Tool) Invoke(ctx context.Context, params parameters.ParamValues, accessToken tools.AccessToken) (any, error) {
|
||||
paramsMap := params.AsMap()
|
||||
|
||||
newParams, err := parameters.GetParams(t.allParams, paramsMap)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to extract standard params %w", err)
|
||||
}
|
||||
sliceParams := newParams.AsSlice()
|
||||
|
||||
results, err := t.pool.Query(ctx, longRunningTransactions, sliceParams...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to execute query: %w", err)
|
||||
}
|
||||
defer results.Close()
|
||||
|
||||
fields := results.FieldDescriptions()
|
||||
var out []map[string]any
|
||||
|
||||
for results.Next() {
|
||||
values, err := results.Values()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to parse row: %w", err)
|
||||
}
|
||||
rowMap := make(map[string]any)
|
||||
for i, field := range fields {
|
||||
rowMap[string(field.Name)] = values[i]
|
||||
}
|
||||
out = append(out, rowMap)
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (t Tool) ParseParams(data map[string]any, claims map[string]map[string]any) (parameters.ParamValues, error) {
|
||||
return parameters.ParseParams(t.allParams, data, claims)
|
||||
}
|
||||
|
||||
func (t Tool) Manifest() tools.Manifest {
|
||||
return t.manifest
|
||||
}
|
||||
|
||||
func (t Tool) McpManifest() tools.McpManifest {
|
||||
return t.mcpManifest
|
||||
}
|
||||
|
||||
func (t Tool) Authorized(verifiedAuthServices []string) bool {
|
||||
return tools.IsAuthorized(t.authRequired, verifiedAuthServices)
|
||||
}
|
||||
|
||||
func (t Tool) RequiresClientAuthorization() bool {
|
||||
return false
|
||||
}
|
||||
@@ -0,0 +1,95 @@
|
||||
// Copyright 2025 Google LLC
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package postgreslongrunningtransactions_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
yaml "github.com/goccy/go-yaml"
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/googleapis/genai-toolbox/internal/server"
|
||||
"github.com/googleapis/genai-toolbox/internal/testutils"
|
||||
"github.com/googleapis/genai-toolbox/internal/tools/postgres/postgreslongrunningtransactions"
|
||||
)
|
||||
|
||||
func TestParseFromYamlPostgresLongRunningTransactions(t *testing.T) {
|
||||
ctx, err := testutils.ContextWithNewLogger()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
tcs := []struct {
|
||||
desc string
|
||||
in string
|
||||
want server.ToolConfigs
|
||||
}{
|
||||
{
|
||||
desc: "basic example",
|
||||
in: `
|
||||
tools:
|
||||
example_tool:
|
||||
kind: postgres-long-running-transactions
|
||||
source: my-postgres-instance
|
||||
description: some description
|
||||
authRequired:
|
||||
- my-google-auth-service
|
||||
- other-auth-service
|
||||
`,
|
||||
want: server.ToolConfigs{
|
||||
"example_tool": postgreslongrunningtransactions.Config{
|
||||
Name: "example_tool",
|
||||
Kind: "postgres-long-running-transactions",
|
||||
Source: "my-postgres-instance",
|
||||
Description: "some description",
|
||||
AuthRequired: []string{"my-google-auth-service", "other-auth-service"},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "basic example",
|
||||
in: `
|
||||
tools:
|
||||
example_tool:
|
||||
kind: postgres-long-running-transactions
|
||||
source: my-postgres-instance
|
||||
description: some description
|
||||
`,
|
||||
want: server.ToolConfigs{
|
||||
"example_tool": postgreslongrunningtransactions.Config{
|
||||
Name: "example_tool",
|
||||
Kind: "postgres-long-running-transactions",
|
||||
Source: "my-postgres-instance",
|
||||
Description: "some description",
|
||||
AuthRequired: []string{},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tc := range tcs {
|
||||
t.Run(tc.desc, func(t *testing.T) {
|
||||
got := struct {
|
||||
Tools server.ToolConfigs `yaml:"tools"`
|
||||
}{}
|
||||
// Parse contents
|
||||
err := yaml.UnmarshalContext(ctx, testutils.FormatYaml(tc.in), &got)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to unmarshal: %s", err)
|
||||
}
|
||||
if diff := cmp.Diff(tc.want, got.Tools); diff != "" {
|
||||
t.Fatalf("incorrect parse: diff %v", diff)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,198 @@
|
||||
// Copyright 2025 Google LLC
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package postgresreplicationstats
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
yaml "github.com/goccy/go-yaml"
|
||||
"github.com/googleapis/genai-toolbox/internal/sources"
|
||||
"github.com/googleapis/genai-toolbox/internal/sources/alloydbpg"
|
||||
"github.com/googleapis/genai-toolbox/internal/sources/cloudsqlpg"
|
||||
"github.com/googleapis/genai-toolbox/internal/sources/postgres"
|
||||
"github.com/googleapis/genai-toolbox/internal/tools"
|
||||
"github.com/googleapis/genai-toolbox/internal/util/parameters"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
)
|
||||
|
||||
const kind string = "postgres-replication-stats"
|
||||
|
||||
const replicationStats = `
|
||||
SELECT
|
||||
pid,
|
||||
usename,
|
||||
application_name,
|
||||
backend_xmin,
|
||||
client_addr,
|
||||
state,
|
||||
sync_state,
|
||||
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), sent_lsn)) AS sent_lag,
|
||||
pg_size_pretty(pg_wal_lsn_diff(sent_lsn, write_lsn)) AS write_lag,
|
||||
pg_size_pretty(pg_wal_lsn_diff(write_lsn, flush_lsn)) AS flush_lag,
|
||||
pg_size_pretty(pg_wal_lsn_diff(flush_lsn, replay_lsn)) AS replay_lag,
|
||||
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), replay_lsn)) AS total_lag
|
||||
FROM
|
||||
pg_stat_replication;
|
||||
`
|
||||
|
||||
func init() {
|
||||
if !tools.Register(kind, newConfig) {
|
||||
panic(fmt.Sprintf("tool kind %q already registered", kind))
|
||||
}
|
||||
}
|
||||
|
||||
func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (tools.ToolConfig, error) {
|
||||
actual := Config{Name: name}
|
||||
if err := decoder.DecodeContext(ctx, &actual); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return actual, nil
|
||||
}
|
||||
|
||||
type compatibleSource interface {
|
||||
PostgresPool() *pgxpool.Pool
|
||||
}
|
||||
|
||||
// validate compatible sources are still compatible
|
||||
var _ compatibleSource = &alloydbpg.Source{}
|
||||
var _ compatibleSource = &cloudsqlpg.Source{}
|
||||
var _ compatibleSource = &postgres.Source{}
|
||||
|
||||
var compatibleSources = [...]string{alloydbpg.SourceKind, cloudsqlpg.SourceKind, postgres.SourceKind}
|
||||
|
||||
type Config struct {
|
||||
Name string `yaml:"name" validate:"required"`
|
||||
Kind string `yaml:"kind" validate:"required"`
|
||||
Source string `yaml:"source" validate:"required"`
|
||||
Description string `yaml:"description"`
|
||||
AuthRequired []string `yaml:"authRequired"`
|
||||
}
|
||||
|
||||
// validate interface
|
||||
var _ tools.ToolConfig = Config{}
|
||||
|
||||
func (cfg Config) ToolConfigKind() string {
|
||||
return kind
|
||||
}
|
||||
|
||||
func (cfg Config) Initialize(srcs map[string]sources.Source) (tools.Tool, error) {
|
||||
// verify source exists
|
||||
rawS, ok := srcs[cfg.Source]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("no source named %q configured", cfg.Source)
|
||||
}
|
||||
|
||||
// verify the source is compatible
|
||||
s, ok := rawS.(compatibleSource)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("invalid source for %q tool: source kind must be one of %q", kind, compatibleSources)
|
||||
}
|
||||
|
||||
allParameters := parameters.Parameters{}
|
||||
paramManifest := allParameters.Manifest()
|
||||
|
||||
if cfg.Description == "" {
|
||||
cfg.Description = "Lists each replica's process ID, user name, application name, backend_xmin (standby's xmin horizon reported by hot_standby_feedback), client IP address, connection state, and sync_state, along with lag sizes in bytes for sent_lag (primary to sent), write_lag (sent to written), flush_lag (written to flushed), replay_lag (flushed to replayed), and the overall total_lag (primary to replayed)."
|
||||
}
|
||||
|
||||
mcpManifest := tools.GetMcpManifest(cfg.Name, cfg.Description, cfg.AuthRequired, allParameters)
|
||||
|
||||
// finish tool setup
|
||||
return Tool{
|
||||
name: cfg.Name,
|
||||
kind: cfg.Kind,
|
||||
authRequired: cfg.AuthRequired,
|
||||
allParams: allParameters,
|
||||
pool: s.PostgresPool(),
|
||||
manifest: tools.Manifest{
|
||||
Description: cfg.Description,
|
||||
Parameters: paramManifest,
|
||||
AuthRequired: cfg.AuthRequired,
|
||||
},
|
||||
mcpManifest: mcpManifest,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// validate interface
|
||||
var _ tools.Tool = Tool{}
|
||||
|
||||
type Tool struct {
|
||||
Config
|
||||
name string `yaml:"name"`
|
||||
kind string `yaml:"kind"`
|
||||
authRequired []string `yaml:"authRequired"`
|
||||
allParams parameters.Parameters `yaml:"allParams"`
|
||||
pool *pgxpool.Pool
|
||||
manifest tools.Manifest
|
||||
mcpManifest tools.McpManifest
|
||||
}
|
||||
|
||||
func (t Tool) ToConfig() tools.ToolConfig {
|
||||
return t.Config
|
||||
}
|
||||
|
||||
func (t Tool) Invoke(ctx context.Context, params parameters.ParamValues, accessToken tools.AccessToken) (any, error) {
|
||||
paramsMap := params.AsMap()
|
||||
|
||||
newParams, err := parameters.GetParams(t.allParams, paramsMap)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to extract standard params %w", err)
|
||||
}
|
||||
sliceParams := newParams.AsSlice()
|
||||
|
||||
results, err := t.pool.Query(ctx, replicationStats, sliceParams...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to execute query: %w", err)
|
||||
}
|
||||
defer results.Close()
|
||||
|
||||
fields := results.FieldDescriptions()
|
||||
var out []map[string]any
|
||||
|
||||
for results.Next() {
|
||||
values, err := results.Values()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to parse row: %w", err)
|
||||
}
|
||||
rowMap := make(map[string]any)
|
||||
for i, field := range fields {
|
||||
rowMap[string(field.Name)] = values[i]
|
||||
}
|
||||
out = append(out, rowMap)
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (t Tool) ParseParams(data map[string]any, claims map[string]map[string]any) (parameters.ParamValues, error) {
|
||||
return parameters.ParseParams(t.allParams, data, claims)
|
||||
}
|
||||
|
||||
func (t Tool) Manifest() tools.Manifest {
|
||||
return t.manifest
|
||||
}
|
||||
|
||||
func (t Tool) McpManifest() tools.McpManifest {
|
||||
return t.mcpManifest
|
||||
}
|
||||
|
||||
func (t Tool) Authorized(verifiedAuthServices []string) bool {
|
||||
return tools.IsAuthorized(t.authRequired, verifiedAuthServices)
|
||||
}
|
||||
|
||||
func (t Tool) RequiresClientAuthorization() bool {
|
||||
return false
|
||||
}
|
||||
@@ -0,0 +1,95 @@
|
||||
// Copyright 2025 Google LLC
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package postgresreplicationstats_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
yaml "github.com/goccy/go-yaml"
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/googleapis/genai-toolbox/internal/server"
|
||||
"github.com/googleapis/genai-toolbox/internal/testutils"
|
||||
"github.com/googleapis/genai-toolbox/internal/tools/postgres/postgresreplicationstats"
|
||||
)
|
||||
|
||||
func TestParseFromYamlPostgresReplicationStats(t *testing.T) {
|
||||
ctx, err := testutils.ContextWithNewLogger()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
tcs := []struct {
|
||||
desc string
|
||||
in string
|
||||
want server.ToolConfigs
|
||||
}{
|
||||
{
|
||||
desc: "basic example",
|
||||
in: `
|
||||
tools:
|
||||
example_tool:
|
||||
kind: postgres-replication-stats
|
||||
source: my-postgres-instance
|
||||
description: some description
|
||||
authRequired:
|
||||
- my-google-auth-service
|
||||
- other-auth-service
|
||||
`,
|
||||
want: server.ToolConfigs{
|
||||
"example_tool": postgresreplicationstats.Config{
|
||||
Name: "example_tool",
|
||||
Kind: "postgres-replication-stats",
|
||||
Source: "my-postgres-instance",
|
||||
Description: "some description",
|
||||
AuthRequired: []string{"my-google-auth-service", "other-auth-service"},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "basic example",
|
||||
in: `
|
||||
tools:
|
||||
example_tool:
|
||||
kind: postgres-replication-stats
|
||||
source: my-postgres-instance
|
||||
description: some description
|
||||
`,
|
||||
want: server.ToolConfigs{
|
||||
"example_tool": postgresreplicationstats.Config{
|
||||
Name: "example_tool",
|
||||
Kind: "postgres-replication-stats",
|
||||
Source: "my-postgres-instance",
|
||||
Description: "some description",
|
||||
AuthRequired: []string{},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tc := range tcs {
|
||||
t.Run(tc.desc, func(t *testing.T) {
|
||||
got := struct {
|
||||
Tools server.ToolConfigs `yaml:"tools"`
|
||||
}{}
|
||||
// Parse contents
|
||||
err := yaml.UnmarshalContext(ctx, testutils.FormatYaml(tc.in), &got)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to unmarshal: %s", err)
|
||||
}
|
||||
if diff := cmp.Diff(tc.want, got.Tools); diff != "" {
|
||||
t.Fatalf("incorrect parse: diff %v", diff)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
@@ -190,6 +190,9 @@ func TestAlloyDBPgToolEndpoints(t *testing.T) {
|
||||
tests.RunPostgresListTriggersTest(t, ctx, pool)
|
||||
tests.RunPostgresListIndexesTest(t, ctx, pool)
|
||||
tests.RunPostgresListSequencesTest(t, ctx, pool)
|
||||
tests.RunPostgresListLocksTest(t, ctx, pool)
|
||||
tests.RunPostgresReplicationStatsTest(t, ctx, pool)
|
||||
tests.RunPostgresLongRunningTransactionsTest(t, ctx, pool)
|
||||
}
|
||||
|
||||
// Test connection with different IP type
|
||||
|
||||
@@ -174,6 +174,9 @@ func TestCloudSQLPgSimpleToolEndpoints(t *testing.T) {
|
||||
tests.RunPostgresListTriggersTest(t, ctx, pool)
|
||||
tests.RunPostgresListIndexesTest(t, ctx, pool)
|
||||
tests.RunPostgresListSequencesTest(t, ctx, pool)
|
||||
tests.RunPostgresListLocksTest(t, ctx, pool)
|
||||
tests.RunPostgresReplicationStatsTest(t, ctx, pool)
|
||||
tests.RunPostgresLongRunningTransactionsTest(t, ctx, pool)
|
||||
}
|
||||
|
||||
// Test connection with different IP type
|
||||
|
||||
@@ -202,6 +202,9 @@ func AddPostgresPrebuiltConfig(t *testing.T, config map[string]any) map[string]a
|
||||
PostgresListTriggersToolKind = "postgres-list-triggers"
|
||||
PostgresListIndexesToolKind = "postgres-list-indexes"
|
||||
PostgresListSequencesToolKind = "postgres-list-sequences"
|
||||
PostgresLongRunningTransactionsToolKind = "postgres-long-running-transactions"
|
||||
PostgresListLocksToolKind = "postgres-list-locks"
|
||||
PostgresReplicationStatsToolKind = "postgres-replication-stats"
|
||||
)
|
||||
|
||||
tools, ok := config["tools"].(map[string]any)
|
||||
@@ -260,6 +263,18 @@ func AddPostgresPrebuiltConfig(t *testing.T, config map[string]any) map[string]a
|
||||
"source": "my-instance",
|
||||
}
|
||||
|
||||
tools["long_running_transactions"] = map[string]any{
|
||||
"kind": PostgresLongRunningTransactionsToolKind,
|
||||
"source": "my-instance",
|
||||
}
|
||||
tools["list_locks"] = map[string]any{
|
||||
"kind": PostgresListLocksToolKind,
|
||||
"source": "my-instance",
|
||||
}
|
||||
tools["replication_stats"] = map[string]any{
|
||||
"kind": PostgresReplicationStatsToolKind,
|
||||
"source": "my-instance",
|
||||
}
|
||||
config["tools"] = tools
|
||||
return config
|
||||
}
|
||||
|
||||
@@ -153,4 +153,7 @@ func TestPostgres(t *testing.T) {
|
||||
tests.RunPostgresListTriggersTest(t, ctx, pool)
|
||||
tests.RunPostgresListIndexesTest(t, ctx, pool)
|
||||
tests.RunPostgresListSequencesTest(t, ctx, pool)
|
||||
tests.RunPostgresLongRunningTransactionsTest(t, ctx, pool)
|
||||
tests.RunPostgresListLocksTest(t, ctx, pool)
|
||||
tests.RunPostgresReplicationStatsTest(t, ctx, pool)
|
||||
}
|
||||
|
||||
216
tests/tool.go
216
tests/tool.go
@@ -2947,6 +2947,222 @@ func RunMSSQLListTablesTest(t *testing.T, tableNameParam, tableNameAuth string)
|
||||
}
|
||||
}
|
||||
|
||||
// RunPostgresListLocksTest runs tests for the postgres list-locks tool
|
||||
func RunPostgresListLocksTest(t *testing.T, ctx context.Context, pool *pgxpool.Pool) {
|
||||
type lockDetails struct {
|
||||
Pid any `json:"pid"`
|
||||
Usename string `json:"usename"`
|
||||
Database string `json:"database"`
|
||||
RelName string `json:"relname"`
|
||||
LockType string `json:"locktype"`
|
||||
Mode string `json:"mode"`
|
||||
Granted bool `json:"granted"`
|
||||
FastPath bool `json:"fastpath"`
|
||||
VirtualXid any `json:"virtualxid"`
|
||||
TransactionId any `json:"transactionid"`
|
||||
ClassId any `json:"classid"`
|
||||
ObjId any `json:"objid"`
|
||||
ObjSubId any `json:"objsubid"`
|
||||
PageNumber any `json:"page"`
|
||||
TupleNumber any `json:"tuple"`
|
||||
VirtualBlock any `json:"virtualblock"`
|
||||
BlockNumber any `json:"blockno"`
|
||||
}
|
||||
|
||||
invokeTcs := []struct {
|
||||
name string
|
||||
requestBody io.Reader
|
||||
wantStatusCode int
|
||||
expectResults bool
|
||||
}{
|
||||
{
|
||||
name: "invoke list_locks with no arguments",
|
||||
requestBody: bytes.NewBuffer([]byte(`{}`)),
|
||||
wantStatusCode: http.StatusOK,
|
||||
expectResults: false, // locks may or may not exist
|
||||
},
|
||||
}
|
||||
for _, tc := range invokeTcs {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
const api = "http://127.0.0.1:5000/api/tool/list_locks/invoke"
|
||||
resp, respBody := RunRequest(t, http.MethodPost, api, tc.requestBody, nil)
|
||||
if resp.StatusCode != tc.wantStatusCode {
|
||||
t.Fatalf("wrong status code: got %d, want %d, body: %s", resp.StatusCode, tc.wantStatusCode, string(respBody))
|
||||
}
|
||||
if tc.wantStatusCode != http.StatusOK {
|
||||
return
|
||||
}
|
||||
|
||||
var bodyWrapper struct {
|
||||
Result json.RawMessage `json:"result"`
|
||||
}
|
||||
if err := json.Unmarshal(respBody, &bodyWrapper); err != nil {
|
||||
t.Fatalf("error decoding response wrapper: %v", err)
|
||||
}
|
||||
|
||||
var resultString string
|
||||
if err := json.Unmarshal(bodyWrapper.Result, &resultString); err != nil {
|
||||
resultString = string(bodyWrapper.Result)
|
||||
}
|
||||
|
||||
var got []lockDetails
|
||||
if resultString != "null" {
|
||||
if err := json.Unmarshal([]byte(resultString), &got); err != nil {
|
||||
t.Fatalf("failed to unmarshal result: %v, result string: %s", err, resultString)
|
||||
}
|
||||
}
|
||||
|
||||
// Verify that if results exist, they have the expected structure
|
||||
for _, lock := range got {
|
||||
if lock.LockType == "" {
|
||||
t.Errorf("lock type should not be empty")
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// RunPostgresLongRunningTransactionsTest runs tests for the postgres long-running-transactions tool
|
||||
func RunPostgresLongRunningTransactionsTest(t *testing.T, ctx context.Context, pool *pgxpool.Pool) {
|
||||
type transactionDetails struct {
|
||||
Pid any `json:"pid"`
|
||||
Usename string `json:"usename"`
|
||||
Database string `json:"database"`
|
||||
ApplicationName string `json:"application_name"`
|
||||
XactStart any `json:"xact_start"`
|
||||
XactDurationSecs any `json:"xact_duration_secs"`
|
||||
IdleInTransaction string `json:"idle_in_transaction"`
|
||||
Query string `json:"query"`
|
||||
}
|
||||
|
||||
invokeTcs := []struct {
|
||||
name string
|
||||
requestBody io.Reader
|
||||
wantStatusCode int
|
||||
}{
|
||||
{
|
||||
name: "invoke long_running_transactions with default threshold",
|
||||
requestBody: bytes.NewBuffer([]byte(`{}`)),
|
||||
wantStatusCode: http.StatusOK,
|
||||
},
|
||||
{
|
||||
name: "invoke long_running_transactions with custom threshold",
|
||||
requestBody: bytes.NewBuffer([]byte(`{"min_transaction_duration_secs": 3600}`)),
|
||||
wantStatusCode: http.StatusOK,
|
||||
},
|
||||
}
|
||||
for _, tc := range invokeTcs {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
const api = "http://127.0.0.1:5000/api/tool/long_running_transactions/invoke"
|
||||
resp, respBody := RunRequest(t, http.MethodPost, api, tc.requestBody, nil)
|
||||
if resp.StatusCode != tc.wantStatusCode {
|
||||
t.Fatalf("wrong status code: got %d, want %d, body: %s", resp.StatusCode, tc.wantStatusCode, string(respBody))
|
||||
}
|
||||
if tc.wantStatusCode != http.StatusOK {
|
||||
return
|
||||
}
|
||||
|
||||
var bodyWrapper struct {
|
||||
Result json.RawMessage `json:"result"`
|
||||
}
|
||||
if err := json.Unmarshal(respBody, &bodyWrapper); err != nil {
|
||||
t.Fatalf("error decoding response wrapper: %v", err)
|
||||
}
|
||||
|
||||
var resultString string
|
||||
if err := json.Unmarshal(bodyWrapper.Result, &resultString); err != nil {
|
||||
resultString = string(bodyWrapper.Result)
|
||||
}
|
||||
|
||||
var got []transactionDetails
|
||||
if resultString != "null" {
|
||||
if err := json.Unmarshal([]byte(resultString), &got); err != nil {
|
||||
t.Fatalf("failed to unmarshal result: %v, result string: %s", err, resultString)
|
||||
}
|
||||
}
|
||||
|
||||
// Verify that if results exist, they have the expected structure
|
||||
for _, tx := range got {
|
||||
if tx.XactDurationSecs == nil {
|
||||
t.Errorf("transaction duration should not be null for long-running transactions")
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// RunPostgresReplicationStatsTest runs tests for the postgres replication-stats tool
|
||||
func RunPostgresReplicationStatsTest(t *testing.T, ctx context.Context, pool *pgxpool.Pool) {
|
||||
type replicationStats struct {
|
||||
ClientAddr string `json:"client_addr"`
|
||||
Username string `json:"usename"`
|
||||
ApplicationName string `json:"application_name"`
|
||||
ClientHostname string `json:"client_hostname"`
|
||||
BackendStart any `json:"backend_start"`
|
||||
State string `json:"state"`
|
||||
SyncState string `json:"sync_state"`
|
||||
ReplyTime any `json:"reply_time"`
|
||||
FlushLsn string `json:"flush_lsn"`
|
||||
ReplayLsn string `json:"replay_lsn"`
|
||||
WriteLag any `json:"write_lag"`
|
||||
FlushLag any `json:"flush_lag"`
|
||||
ReplayLag any `json:"replay_lag"`
|
||||
SyncPriority any `json:"sync_priority"`
|
||||
ReplicationSlotName any `json:"slot_name"`
|
||||
IsStreaming bool `json:"is_streaming"`
|
||||
}
|
||||
|
||||
invokeTcs := []struct {
|
||||
name string
|
||||
requestBody io.Reader
|
||||
wantStatusCode int
|
||||
}{
|
||||
{
|
||||
name: "invoke replication_stats with no arguments",
|
||||
requestBody: bytes.NewBuffer([]byte(`{}`)),
|
||||
wantStatusCode: http.StatusOK,
|
||||
},
|
||||
}
|
||||
for _, tc := range invokeTcs {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
const api = "http://127.0.0.1:5000/api/tool/replication_stats/invoke"
|
||||
resp, respBody := RunRequest(t, http.MethodPost, api, tc.requestBody, nil)
|
||||
if resp.StatusCode != tc.wantStatusCode {
|
||||
t.Fatalf("wrong status code: got %d, want %d, body: %s", resp.StatusCode, tc.wantStatusCode, string(respBody))
|
||||
}
|
||||
if tc.wantStatusCode != http.StatusOK {
|
||||
return
|
||||
}
|
||||
|
||||
var bodyWrapper struct {
|
||||
Result json.RawMessage `json:"result"`
|
||||
}
|
||||
if err := json.Unmarshal(respBody, &bodyWrapper); err != nil {
|
||||
t.Fatalf("error decoding response wrapper: %v", err)
|
||||
}
|
||||
|
||||
var resultString string
|
||||
if err := json.Unmarshal(bodyWrapper.Result, &resultString); err != nil {
|
||||
resultString = string(bodyWrapper.Result)
|
||||
}
|
||||
|
||||
var got []replicationStats
|
||||
if resultString != "null" {
|
||||
if err := json.Unmarshal([]byte(resultString), &got); err != nil {
|
||||
t.Fatalf("failed to unmarshal result: %v, result string: %s", err, resultString)
|
||||
}
|
||||
}
|
||||
|
||||
// Verify that if results exist, they have the expected structure
|
||||
for _, stat := range got {
|
||||
if stat.State == "" {
|
||||
t.Errorf("replication state should not be empty")
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// RunRequest is a helper function to send HTTP requests and return the response
|
||||
func RunRequest(t *testing.T, method, url string, body io.Reader, headers map[string]string) (*http.Response, []byte) {
|
||||
// Send request
|
||||
|
||||
Reference in New Issue
Block a user