feat(tools/mysql-list-active-queries): Add a new tool to list ongoing queries in a MySQL instance (#1471)

## Description

---
This PR introduces a new MySQL tool `mysql-list-active-queries`. It adds
a new kind `mysql-list-active-queries` that returns the top N currently
running queries, pulled from `information_schema.innodb_trx` and
`information_schema.processlist`. The list is ordered by elapsed time in
descending order.

Parameters supported

- `min_duration_secs` (optional) — only include queries running at least
this many seconds
- `limit` (optional) — max rows to return (default 10).

## PR Checklist

---
> Thank you for opening a Pull Request! Before submitting your PR, there
are a
> few things you can do to make sure it goes smoothly:

- [x] Make sure you reviewed

[CONTRIBUTING.md](https://github.com/googleapis/genai-toolbox/blob/main/CONTRIBUTING.md)
- [x] Make sure to open an issue as a

[bug/issue](https://github.com/googleapis/genai-toolbox/issues/new/choose)
before writing your code! That way we can discuss the change, evaluate
  designs, and agree on the general idea
- [x] Ensure the tests and linter pass
- [x] Code coverage does not decrease (if any source code was changed)
- [x] Appropriate docs were updated (if necessary)
- [x] Make sure to add `!` if this involves a breaking change

---------

Co-authored-by: Averi Kitsch <akitsch@google.com>
This commit is contained in:
shuzhou-gc
2025-09-17 13:59:29 -07:00
committed by GitHub
parent e483cc77b7
commit ed54cd6cfd
10 changed files with 617 additions and 22 deletions

View File

@@ -121,6 +121,7 @@ import (
_ "github.com/googleapis/genai-toolbox/internal/tools/mssql/mssqllisttables"
_ "github.com/googleapis/genai-toolbox/internal/tools/mssql/mssqlsql"
_ "github.com/googleapis/genai-toolbox/internal/tools/mysql/mysqlexecutesql"
_ "github.com/googleapis/genai-toolbox/internal/tools/mysql/mysqllistactivequeries"
_ "github.com/googleapis/genai-toolbox/internal/tools/mysql/mysqllisttables"
_ "github.com/googleapis/genai-toolbox/internal/tools/mysql/mysqlsql"
_ "github.com/googleapis/genai-toolbox/internal/tools/neo4j/neo4jcypher"

View File

@@ -1429,7 +1429,7 @@ func TestPrebuiltTools(t *testing.T) {
wantToolset: server.ToolsetConfigs{
"cloud_sql_mysql_database_tools": tools.ToolsetConfig{
Name: "cloud_sql_mysql_database_tools",
ToolNames: []string{"execute_sql", "get_query_plan", "list_tables"},
ToolNames: []string{"execute_sql", "list_tables", "get_query_plan", "list_active_queries"},
},
},
},
@@ -1469,7 +1469,7 @@ func TestPrebuiltTools(t *testing.T) {
wantToolset: server.ToolsetConfigs{
"mysql_database_tools": tools.ToolsetConfig{
Name: "mysql_database_tools",
ToolNames: []string{"execute_sql", "get_query_plan", "list_tables"},
ToolNames: []string{"execute_sql", "list_tables", "get_query_plan", "list_active_queries"},
},
},
},

View File

@@ -22,18 +22,21 @@ to a database by following these instructions][csql-mysql-quickstart].
## Available Tools
- [`mysql-sql`](../tools/mysql/mysql-sql.md)
- [`mysql-sql`](../tools/mysql/mysql-sql.md)
Execute pre-defined prepared SQL queries in MySQL.
- [`mysql-execute-sql`](../tools/mysql/mysql-execute-sql.md)
- [`mysql-execute-sql`](../tools/mysql/mysql-execute-sql.md)
Run parameterized SQL queries in Cloud SQL for MySQL.
- [`mysql-list-tables`](../tools/mysql/mysql-list-tables.md)
- [`mysql-list-active-queries`](../tools/mysql/mysql-list-active-queries.md)
List active queries in MySQL.
- [`mysql-list-tables`](../tools/mysql/mysql-list-tables.md)
List tables in a Cloud SQL for MySQL database.
### Pre-built Configurations
- [Cloud SQL for MySQL using MCP](https://googleapis.github.io/genai-toolbox/how-to/connect-ide/cloud_sql_mysql_mcp/)
- [Cloud SQL for MySQL using MCP](https://googleapis.github.io/genai-toolbox/how-to/connect-ide/cloud_sql_mysql_mcp/)
Connect your IDE to Cloud SQL for MySQL using Toolbox.
## Requirements

View File

@@ -16,13 +16,16 @@ reliability, performance, and ease of use.
## Available Tools
- [`mysql-sql`](../tools/mysql/mysql-sql.md)
- [`mysql-sql`](../tools/mysql/mysql-sql.md)
Execute pre-defined prepared SQL queries in MySQL.
- [`mysql-execute-sql`](../tools/mysql/mysql-execute-sql.md)
- [`mysql-execute-sql`](../tools/mysql/mysql-execute-sql.md)
Run parameterized SQL queries in MySQL.
- [`mysql-list-tables`](../tools/mysql/mysql-list-tables.md)
- [`mysql-list-active-queries`](../tools/mysql/mysql-list-active-queries.md)
List active queries in MySQL.
- [`mysql-list-tables`](../tools/mysql/mysql-list-tables.md)
List tables in a MySQL database.
## Requirements

View File

@@ -0,0 +1,59 @@
---
title: "mysql-list-active-queries"
type: docs
weight: 1
description: >
A "mysql-list-active-queries" tool lists active queries in a MySQL database.
aliases:
- /resources/tools/mysql-list-active-queries
---
## About
A `mysql-list-active-queries` tool retrieves information about active queries in a MySQL database. It's compatible with
- [cloud-sql-mysql](../../sources/cloud-sql-mysql.md)
- [mysql](../../sources/mysql.md)
`mysql-list-active-queries` outputs detailed information as JSON for current active queries, ordered by execution time in descending order.
This tool takes 2 optional input parameters:
- `min_duration_secs` (optional): Only show queries running for at least this long in seconds, default `0`.
- `limit` (optional): max number of queries to return, default `10`.
## Example
```yaml
tools:
list_active_queries:
kind: mysql-list-active-queries
source: my-mysql-instance
description: Lists top N (default 10) ongoing queries from processlist and innodb_trx, ordered by execution time in descending order. Returns detailed information of those queries in json format, including process id, query, transaction duration, transaction wait duration, process time, transaction state, process state, username with host, transaction rows locked, transaction rows modified, and db schema.
```
The response is a json array with the following fields:
```json
{
"proccess_id": "id of the MySQL process/connection this query belongs to",
"query": "query text",
"trx_started": "the time when the transaction (this query belongs to) started",
"trx_duration_seconds": "the total elapsed time (in seconds) of the owning transaction so far",
"trx_wait_duration_seconds": "the total wait time (in seconds) of the owning transaction so far",
"query_time": "the time (in seconds) that the owning connection has been in its current state",
"trx_state": "the transaction execution state",
"proces_state": "the current state of the owning connection",
"user": "the user who issued this query",
"trx_rows_locked": "the approximate number of rows locked by the owning transaction",
"trx_rows_modified": "the approximate number of rows modified by the owning transaction",
"db": "the default database for the owning connection"
}
```
## Reference
| **field** | **type** | **required** | **description** |
|-------------|:------------------------------------------:|:------------:|--------------------------------------------------------------------------------------------------|
| kind | string | true | Must be "mysql-list-active-queries". |
| source | string | true | Name of the source the SQL should execute on. |
| description | string | true | Description of the tool that is passed to the LLM. |

View File

@@ -27,6 +27,10 @@ tools:
kind: mysql-execute-sql
source: cloud-sql-mysql-source
description: Use this tool to execute SQL.
list_active_queries:
kind: mysql-list-active-queries
source: cloud-sql-mysql-source
description: Lists top N (default 10) ongoing queries from processlist and innodb_trx, ordered by execution time in descending order. Returns detailed information of those queries in json format, including process id, query, transaction duration, transaction wait duration, process time, transaction state, process state, username with host, transaction rows locked, transaction rows modified, and db schema.
get_query_plan:
kind: mysql-sql
source: cloud-sql-mysql-source
@@ -46,5 +50,6 @@ tools:
toolsets:
cloud_sql_mysql_database_tools:
- execute_sql
- get_query_plan
- list_tables
- get_query_plan
- list_active_queries

View File

@@ -31,6 +31,10 @@ tools:
kind: mysql-execute-sql
source: mysql-source
description: Use this tool to execute SQL.
list_active_queries:
kind: mysql-list-active-queries
source: mysql-source
description: Lists top N (default 10) ongoing queries from processlist and innodb_trx, ordered by execution time in descending order. Returns detailed information of those queries in json format, including process id, query, transaction duration, transaction wait duration, process time, transaction state, process state, username with host, transaction rows locked, transaction rows modified, and db schema.
get_query_plan:
kind: mysql-sql
source: mysql-source
@@ -50,5 +54,6 @@ tools:
toolsets:
mysql_database_tools:
- execute_sql
- get_query_plan
- list_tables
- get_query_plan
- list_active_queries

View File

@@ -0,0 +1,285 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mysqllistactivequeries
import (
"context"
"database/sql"
"fmt"
yaml "github.com/goccy/go-yaml"
"github.com/googleapis/genai-toolbox/internal/sources"
"github.com/googleapis/genai-toolbox/internal/sources/cloudsqlmysql"
"github.com/googleapis/genai-toolbox/internal/sources/mysql"
"github.com/googleapis/genai-toolbox/internal/tools"
"github.com/googleapis/genai-toolbox/internal/tools/mysql/mysqlcommon"
"github.com/googleapis/genai-toolbox/internal/util"
)
const kind string = "mysql-list-active-queries"
const listActiveQueriesStatementMySQL = `
SELECT
p.id AS processlist_id,
substring(IFNULL(p.info, t.trx_query), 1, 100) AS query,
t.trx_started AS trx_started,
(UNIX_TIMESTAMP(UTC_TIMESTAMP()) - UNIX_TIMESTAMP(t.trx_started)) AS trx_duration_seconds,
(UNIX_TIMESTAMP(UTC_TIMESTAMP()) - UNIX_TIMESTAMP(t.trx_wait_started)) AS trx_wait_duration_seconds,
p.time AS query_time,
t.trx_state AS trx_state,
p.state AS process_state,
IF(p.host IS NULL OR p.host = '', p.user, concat(p.user, '@', SUBSTRING_INDEX(p.host, ':', 1))) AS user,
t.trx_rows_locked AS trx_rows_locked,
t.trx_rows_modified AS trx_rows_modified,
p.db AS db
FROM
information_schema.processlist p
LEFT OUTER JOIN
information_schema.innodb_trx t
ON p.id = t.trx_mysql_thread_id
WHERE
(? IS NULL OR p.time >= ?)
AND p.id != CONNECTION_ID()
AND Command NOT IN ('Binlog Dump', 'Binlog Dump GTID', 'Connect', 'Connect Out', 'Register Slave')
AND User NOT IN ('system user', 'event_scheduler')
AND (t.trx_id is NOT NULL OR command != 'Sleep')
ORDER BY
t.trx_started
LIMIT ?;
`
const listActiveQueriesStatementCloudSQLMySQL = `
SELECT
p.id AS processlist_id,
substring(IFNULL(p.info, t.trx_query), 1, 100) AS query,
t.trx_started AS trx_started,
(UNIX_TIMESTAMP(UTC_TIMESTAMP()) - UNIX_TIMESTAMP(t.trx_started)) AS trx_duration_seconds,
(UNIX_TIMESTAMP(UTC_TIMESTAMP()) - UNIX_TIMESTAMP(t.trx_wait_started)) AS trx_wait_duration_seconds,
p.time AS query_time,
t.trx_state AS trx_state,
p.state AS process_state,
IF(p.host IS NULL OR p.host = '', p.user, concat(p.user, '@', SUBSTRING_INDEX(p.host, ':', 1))) AS user,
t.trx_rows_locked AS trx_rows_locked,
t.trx_rows_modified AS trx_rows_modified,
p.db AS db
FROM
information_schema.processlist p
LEFT OUTER JOIN
information_schema.innodb_trx t
ON p.id = t.trx_mysql_thread_id
WHERE
(? IS NULL OR p.time >= ?)
AND p.id != CONNECTION_ID()
AND SUBSTRING_INDEX(IFNULL(p.host,''), ':', 1) NOT IN ('localhost', '127.0.0.1')
AND IFNULL(p.host,'') NOT LIKE '::1%'
AND Command NOT IN ('Binlog Dump', 'Binlog Dump GTID', 'Connect', 'Connect Out', 'Register Slave')
AND User NOT IN ('system user', 'event_scheduler')
AND (t.trx_id is NOT NULL OR command != 'sleep')
ORDER BY
t.trx_started
LIMIT ?;
`
func init() {
if !tools.Register(kind, newConfig) {
panic(fmt.Sprintf("tool kind %q already registered", kind))
}
}
func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (tools.ToolConfig, error) {
actual := Config{Name: name}
if err := decoder.DecodeContext(ctx, &actual); err != nil {
return nil, err
}
return actual, nil
}
type compatibleSource interface {
MySQLPool() *sql.DB
}
// validate compatible sources are still compatible
var _ compatibleSource = &mysql.Source{}
var compatibleSources = [...]string{mysql.SourceKind, cloudsqlmysql.SourceKind}
type Config struct {
Name string `yaml:"name" validate:"required"`
Kind string `yaml:"kind" validate:"required"`
Source string `yaml:"source" validate:"required"`
Description string `yaml:"description" validate:"required"`
AuthRequired []string `yaml:"authRequired"`
}
// validate interface
var _ tools.ToolConfig = Config{}
func (cfg Config) ToolConfigKind() string {
return kind
}
func (cfg Config) Initialize(srcs map[string]sources.Source) (tools.Tool, error) {
// verify source exists
rawS, ok := srcs[cfg.Source]
if !ok {
return nil, fmt.Errorf("no source named %q configured", cfg.Source)
}
// verify the source is compatible
s, ok := rawS.(compatibleSource)
if !ok {
return nil, fmt.Errorf("invalid source for %q tool: source kind must be one of %q", kind, compatibleSources)
}
allParameters := tools.Parameters{
tools.NewIntParameterWithDefault("min_duration_secs", 0, "Optional: Only show queries running for at least this long in seconds"),
tools.NewIntParameterWithDefault("limit", 100, "Optional: The maximum number of rows to return."),
}
mcpManifest := tools.McpManifest{
Name: cfg.Name,
Description: cfg.Description,
InputSchema: allParameters.McpManifest(),
}
var statement string
sourceKind := rawS.SourceKind()
switch sourceKind {
case mysql.SourceKind:
statement = listActiveQueriesStatementMySQL
case cloudsqlmysql.SourceKind:
statement = listActiveQueriesStatementCloudSQLMySQL
default:
return nil, fmt.Errorf("unsupported source kind kind: %q", sourceKind)
}
// finish tool setup
t := Tool{
Name: cfg.Name,
Kind: kind,
AuthRequired: cfg.AuthRequired,
Pool: s.MySQLPool(),
allParams: allParameters,
manifest: tools.Manifest{Description: cfg.Description, Parameters: allParameters.Manifest(), AuthRequired: cfg.AuthRequired},
mcpManifest: mcpManifest,
statement: statement,
}
return t, nil
}
// validate interface
var _ tools.Tool = Tool{}
type Tool struct {
Name string `yaml:"name"`
Kind string `yaml:"kind"`
AuthRequired []string `yaml:"authRequired"`
allParams tools.Parameters `yaml:"parameters"`
Pool *sql.DB
manifest tools.Manifest
mcpManifest tools.McpManifest
statement string
}
func (t Tool) Invoke(ctx context.Context, params tools.ParamValues, accessToken tools.AccessToken) (any, error) {
paramsMap := params.AsMap()
duration, ok := paramsMap["min_duration_secs"].(int)
if !ok {
return nil, fmt.Errorf("invalid 'min_duration_secs' parameter; expected an integer")
}
limit, ok := paramsMap["limit"].(int)
if !ok {
return nil, fmt.Errorf("invalid 'limit' parameter; expected an integer")
}
// Log the query executed for debugging.
logger, err := util.LoggerFromContext(ctx)
if err != nil {
return nil, fmt.Errorf("error getting logger: %s", err)
}
logger.DebugContext(ctx, "executing `%s` tool query: %s", kind, t.statement)
results, err := t.Pool.QueryContext(ctx, t.statement, duration, duration, limit)
if err != nil {
return nil, fmt.Errorf("unable to execute query: %w", err)
}
defer results.Close()
cols, err := results.Columns()
if err != nil {
return nil, fmt.Errorf("unable to retrieve rows column name: %w", err)
}
// create an array of values for each column, which can be re-used to scan each row
rawValues := make([]any, len(cols))
values := make([]any, len(cols))
for i := range rawValues {
values[i] = &rawValues[i]
}
colTypes, err := results.ColumnTypes()
if err != nil {
return nil, fmt.Errorf("unable to get column types: %w", err)
}
var out []any
for results.Next() {
err := results.Scan(values...)
if err != nil {
return nil, fmt.Errorf("unable to parse row: %w", err)
}
vMap := make(map[string]any)
for i, name := range cols {
val := rawValues[i]
if val == nil {
vMap[name] = nil
continue
}
vMap[name], err = mysqlcommon.ConvertToType(colTypes[i], val)
if err != nil {
return nil, fmt.Errorf("errors encountered when converting values: %w", err)
}
}
out = append(out, vMap)
}
if err := results.Err(); err != nil {
return nil, fmt.Errorf("errors encountered during row iteration: %w", err)
}
return out, nil
}
func (t Tool) ParseParams(data map[string]any, claims map[string]map[string]any) (tools.ParamValues, error) {
return tools.ParseParams(t.allParams, data, claims)
}
func (t Tool) Manifest() tools.Manifest {
return t.manifest
}
func (t Tool) McpManifest() tools.McpManifest {
return t.mcpManifest
}
func (t Tool) Authorized(verifiedAuthServices []string) bool {
return tools.IsAuthorized(t.AuthRequired, verifiedAuthServices)
}
func (t Tool) RequiresClientAuthorization() bool {
return false
}

View File

@@ -0,0 +1,76 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mysqllistactivequeries_test
import (
"testing"
yaml "github.com/goccy/go-yaml"
"github.com/google/go-cmp/cmp"
"github.com/googleapis/genai-toolbox/internal/server"
"github.com/googleapis/genai-toolbox/internal/testutils"
"github.com/googleapis/genai-toolbox/internal/tools/mysql/mysqllistactivequeries"
)
func TestParseFromYamlExecuteSql(t *testing.T) {
ctx, err := testutils.ContextWithNewLogger()
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
tcs := []struct {
desc string
in string
want server.ToolConfigs
}{
{
desc: "basic example",
in: `
tools:
example_tool:
kind: mysql-list-active-queries
source: my-instance
description: some description
authRequired:
- my-google-auth-service
- other-auth-service
`,
want: server.ToolConfigs{
"example_tool": mysqllistactivequeries.Config{
Name: "example_tool",
Kind: "mysql-list-active-queries",
Source: "my-instance",
Description: "some description",
AuthRequired: []string{"my-google-auth-service", "other-auth-service"},
},
},
},
}
for _, tc := range tcs {
t.Run(tc.desc, func(t *testing.T) {
got := struct {
Tools server.ToolConfigs `yaml:"tools"`
}{}
// Parse contents
err := yaml.UnmarshalContext(ctx, testutils.FormatYaml(tc.in), &got)
if err != nil {
t.Fatalf("unable to unmarshal: %s", err)
}
if diff := cmp.Diff(tc.want, got.Tools); diff != "" {
t.Fatalf("incorrect parse: diff %v", diff)
}
})
}
}

View File

@@ -25,6 +25,7 @@ import (
"os"
"regexp"
"strings"
"sync"
"testing"
"time"
@@ -36,14 +37,15 @@ import (
)
var (
MySQLSourceKind = "mysql"
MySQLToolKind = "mysql-sql"
MySQLListTablesToolKind = "mysql-list-tables"
MySQLDatabase = os.Getenv("MYSQL_DATABASE")
MySQLHost = os.Getenv("MYSQL_HOST")
MySQLPort = os.Getenv("MYSQL_PORT")
MySQLUser = os.Getenv("MYSQL_USER")
MySQLPass = os.Getenv("MYSQL_PASS")
MySQLSourceKind = "mysql"
MySQLToolKind = "mysql-sql"
MySQLListActiveQueriesKind = "mysql-list-active-queries"
MySQLListTablesToolKind = "mysql-list-tables"
MySQLDatabase = os.Getenv("MYSQL_DATABASE")
MySQLHost = os.Getenv("MYSQL_HOST")
MySQLPort = os.Getenv("MYSQL_PORT")
MySQLUser = os.Getenv("MYSQL_USER")
MySQLPass = os.Getenv("MYSQL_PASS")
)
func getMySQLVars(t *testing.T) map[string]any {
@@ -80,6 +82,11 @@ func addPrebuiltToolConfig(t *testing.T, config map[string]any) map[string]any {
"source": "my-instance",
"description": "Lists tables in the database.",
}
tools["list_active_queries"] = map[string]any{
"kind": MySQLListActiveQueriesKind,
"source": "my-instance",
"description": "Lists active queries in the database.",
}
config["tools"] = tools
return config
}
@@ -157,6 +164,7 @@ func TestMySQLToolEndpoints(t *testing.T) {
// Run specific MySQL tool tests
runMySQLListTablesTest(t, tableNameParam, tableNameAuth)
runMySQLListActiveQueriesTest(t, ctx, pool)
}
func runMySQLListTablesTest(t *testing.T, tableNameParam, tableNameAuth string) {
@@ -219,7 +227,7 @@ func runMySQLListTablesTest(t *testing.T, tableNameParam, tableNameAuth string)
requestBody io.Reader
wantStatusCode int
want any
isSimple bool
isSimple bool
}{
{
name: "invoke list_tables detailed output",
@@ -232,7 +240,7 @@ func runMySQLListTablesTest(t *testing.T, tableNameParam, tableNameAuth string)
requestBody: bytes.NewBufferString(fmt.Sprintf(`{"table_names": "%s", "output_format": "simple"}`, tableNameAuth)),
wantStatusCode: http.StatusOK,
want: []map[string]any{{"name": tableNameAuth}},
isSimple: true,
isSimple: true,
},
{
name: "invoke list_tables with multiple table names",
@@ -276,7 +284,9 @@ func runMySQLListTablesTest(t *testing.T, tableNameParam, tableNameAuth string)
return
}
var bodyWrapper struct{ Result json.RawMessage `json:"result"` }
var bodyWrapper struct {
Result json.RawMessage `json:"result"`
}
if err := json.NewDecoder(resp.Body).Decode(&bodyWrapper); err != nil {
t.Fatalf("error decoding response wrapper: %v", err)
}
@@ -333,3 +343,151 @@ func runMySQLListTablesTest(t *testing.T, tableNameParam, tableNameAuth string)
})
}
}
func runMySQLListActiveQueriesTest(t *testing.T, ctx context.Context, pool *sql.DB) {
type queryListDetails struct {
ProcessId any `json:"process_id"`
Query string `json:"query"`
TrxStarted any `json:"trx_started"`
TrxDuration any `json:"trx_duration_seconds"`
TrxWaitDuration any `json:"trx_wait_duration_seconds"`
QueryTime any `json:"query_time"`
TrxState string `json:"trx_state"`
ProcessState string `json:"process_state"`
User string `json:"user"`
TrxRowsLocked any `json:"trx_rows_locked"`
TrxRowsModified any `json:"trx_rows_modified"`
Db string `json:"db"`
}
singleQueryWanted := queryListDetails{
ProcessId: any(nil),
Query: "SELECT sleep(10)",
TrxStarted: any(nil),
TrxDuration: any(nil),
TrxWaitDuration: any(nil),
QueryTime: any(nil),
TrxState: "",
ProcessState: "User sleep",
User: "",
TrxRowsLocked: any(nil),
TrxRowsModified: any(nil),
Db: "",
}
invokeTcs := []struct {
name string
requestBody io.Reader
clientSleepSecs int
waitSecsBeforeCheck int
wantStatusCode int
want any
}{
{
name: "invoke list_active_queries when the system is idle",
requestBody: bytes.NewBufferString(`{}`),
clientSleepSecs: 0,
waitSecsBeforeCheck: 0,
wantStatusCode: http.StatusOK,
want: []queryListDetails(nil),
},
{
name: "invoke list_active_queries when there is 1 ongoing but lower than the threshold",
requestBody: bytes.NewBufferString(`{"min_duration_secs": 100}`),
clientSleepSecs: 10,
waitSecsBeforeCheck: 1,
wantStatusCode: http.StatusOK,
want: []queryListDetails(nil),
},
{
name: "invoke list_active_queries when 1 ongoing query should show up",
requestBody: bytes.NewBufferString(`{"min_duration_secs": 5}`),
clientSleepSecs: 0,
waitSecsBeforeCheck: 5,
wantStatusCode: http.StatusOK,
want: []queryListDetails{singleQueryWanted},
},
{
name: "invoke list_active_queries when 2 ongoing query should show up",
requestBody: bytes.NewBufferString(`{"min_duration_secs": 2}`),
clientSleepSecs: 10,
waitSecsBeforeCheck: 3,
wantStatusCode: http.StatusOK,
want: []queryListDetails{singleQueryWanted, singleQueryWanted},
},
}
var wg sync.WaitGroup
for _, tc := range invokeTcs {
t.Run(tc.name, func(t *testing.T) {
if tc.clientSleepSecs > 0 {
wg.Add(1)
go func() {
defer wg.Done()
err := pool.PingContext(ctx)
if err != nil {
t.Errorf("unable to connect to test database: %s", err)
return
}
_, err = pool.ExecContext(ctx, fmt.Sprintf("SELECT sleep(%d);", tc.clientSleepSecs))
if err != nil {
t.Errorf("Executing 'SELECT sleep' failed: %s", err)
}
}()
}
if tc.waitSecsBeforeCheck > 0 {
time.Sleep(time.Duration(tc.waitSecsBeforeCheck) * time.Second)
}
const api = "http://127.0.0.1:5000/api/tool/list_active_queries/invoke"
req, err := http.NewRequest(http.MethodPost, api, tc.requestBody)
if err != nil {
t.Fatalf("unable to create request: %v", err)
}
req.Header.Add("Content-type", "application/json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("unable to send request: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != tc.wantStatusCode {
body, _ := io.ReadAll(resp.Body)
t.Fatalf("wrong status code: got %d, want %d, body: %s", resp.StatusCode, tc.wantStatusCode, string(body))
}
if tc.wantStatusCode != http.StatusOK {
return
}
var bodyWrapper struct {
Result json.RawMessage `json:"result"`
}
if err := json.NewDecoder(resp.Body).Decode(&bodyWrapper); err != nil {
t.Fatalf("error decoding response wrapper: %v", err)
}
var resultString string
if err := json.Unmarshal(bodyWrapper.Result, &resultString); err != nil {
resultString = string(bodyWrapper.Result)
}
var got any
var details []queryListDetails
if err := json.Unmarshal([]byte(resultString), &details); err != nil {
t.Fatalf("failed to unmarshal nested ObjectDetails string: %v", err)
}
got = details
if diff := cmp.Diff(tc.want, got, cmp.Comparer(func(a, b queryListDetails) bool {
return a.Query == b.Query && a.ProcessState == b.ProcessState
})); diff != "" {
t.Errorf("Unexpected result: got %#v, want: %#v", got, tc.want)
}
})
}
wg.Wait()
}