feat: add queryTimeout support to MySQL source (#830)

## Summary
- Added configurable query timeout to MySQL source configuration
- Updated connection DSN to include readTimeout parameter  
- Added documentation and example usage
- Added test coverage for queryTimeout configuration

## Test plan
- [x] Added unit test for queryTimeout configuration parsing
- [x] Updated documentation with queryTimeout field description
- [x] Verified timeout parameter is correctly added to DSN when
specified

## Caveat

When queryTimeout is exceeded, we get an obscure error message
([screenshot](https://github.com/user-attachments/assets/fd292f91-328d-4ebc-9a87-2d92e9887300)):
```
unable to execute query: invalid connection
```

This seems to be a problem with the mysql-go library:
https://stackoverflow.com/q/65253798/10720618

I tried to use
[MAX_EXECUTION_TIME](https://dev.mysql.com/doc/refman/8.0/en/optimizer-hints.html#optimizer-hints-execution-time)
but it didn't work as expected (my `sleep(MAX_EXECUTION_TIME+3)` query
finished successfully after MAX_EXECUTION_TIME milliseconds)

Any ideas on what can be done here? The error message is very
misleading. My goal with adding timeouts is to communicate to the LLM
when it has issued a slow query and force it to adjust (e.g. query
indexes and write a more optimized query) but this defeats the purpose.

---------

Co-authored-by: Wenxin Du <117315983+duwenxin99@users.noreply.github.com>
This commit is contained in:
Kamal Muradov
2025-07-09 18:11:15 -04:00
committed by GitHub
parent 2bdcc0841a
commit 391cb5bfe8
3 changed files with 56 additions and 18 deletions

View File

@@ -4,7 +4,6 @@ type: docs
weight: 1
description: >
MySQL is a relational database management system that stores and manages data.
---
## About
@@ -35,6 +34,7 @@ sources:
database: my_db
user: ${USER_NAME}
password: ${PASSWORD}
queryTimeout: 30s # Optional: query timeout duration
```
{{< notice tip >}}
@@ -44,11 +44,12 @@ instead of hardcoding your secrets into the configuration file.
## Reference
| **field** | **type** | **required** | **description** |
|-----------|:--------:|:------------:|---------------------------------------------------------------------------------------------|
| kind | string | true | Must be "mysql". |
| host | string | true | IP address to connect to (e.g. "127.0.0.1"). |
| port | string | true | Port to connect to (e.g. "3306"). |
| database | string | true | Name of the MySQL database to connect to (e.g. "my_db"). |
| user | string | true | Name of the MySQL user to connect as (e.g. "my-mysql-user"). |
| password | string | true | Password of the MySQL user (e.g. "my-password"). |
| **field** | **type** | **required** | **description** |
| ------------ | :------: | :----------: | ----------------------------------------------------------------------------------------------- |
| kind | string | true | Must be "mysql". |
| host | string | true | IP address to connect to (e.g. "127.0.0.1"). |
| port | string | true | Port to connect to (e.g. "3306"). |
| database | string | true | Name of the MySQL database to connect to (e.g. "my_db"). |
| user | string | true | Name of the MySQL user to connect as (e.g. "my-mysql-user"). |
| password | string | true | Password of the MySQL user (e.g. "my-password"). |
| queryTimeout | string | false | Maximum time to wait for query execution (e.g. "30s", "2m"). By default, no timeout is applied. |

View File

@@ -18,6 +18,7 @@ import (
"context"
"database/sql"
"fmt"
"time"
_ "github.com/go-sql-driver/mysql"
"github.com/goccy/go-yaml"
@@ -45,13 +46,14 @@ func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (sources
}
type Config struct {
Name string `yaml:"name" validate:"required"`
Kind string `yaml:"kind" validate:"required"`
Host string `yaml:"host" validate:"required"`
Port string `yaml:"port" validate:"required"`
User string `yaml:"user" validate:"required"`
Password string `yaml:"password" validate:"required"`
Database string `yaml:"database" validate:"required"`
Name string `yaml:"name" validate:"required"`
Kind string `yaml:"kind" validate:"required"`
Host string `yaml:"host" validate:"required"`
Port string `yaml:"port" validate:"required"`
User string `yaml:"user" validate:"required"`
Password string `yaml:"password" validate:"required"`
Database string `yaml:"database" validate:"required"`
QueryTimeout string `yaml:"queryTimeout"`
}
func (r Config) SourceConfigKind() string {
@@ -59,7 +61,7 @@ func (r Config) SourceConfigKind() string {
}
func (r Config) Initialize(ctx context.Context, tracer trace.Tracer) (sources.Source, error) {
pool, err := initMySQLConnectionPool(ctx, tracer, r.Name, r.Host, r.Port, r.User, r.Password, r.Database)
pool, err := initMySQLConnectionPool(ctx, tracer, r.Name, r.Host, r.Port, r.User, r.Password, r.Database, r.QueryTimeout)
if err != nil {
return nil, fmt.Errorf("unable to create pool: %w", err)
}
@@ -93,7 +95,7 @@ func (s *Source) MySQLPool() *sql.DB {
return s.Pool
}
func initMySQLConnectionPool(ctx context.Context, tracer trace.Tracer, name, host, port, user, pass, dbname string) (*sql.DB, error) {
func initMySQLConnectionPool(ctx context.Context, tracer trace.Tracer, name, host, port, user, pass, dbname, queryTimeout string) (*sql.DB, error) {
//nolint:all // Reassigned ctx
ctx, span := sources.InitConnectionSpan(ctx, tracer, SourceKind, name)
defer span.End()
@@ -101,6 +103,15 @@ func initMySQLConnectionPool(ctx context.Context, tracer trace.Tracer, name, hos
// Configure the driver to connect to the database
dsn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?parseTime=true", user, pass, host, port, dbname)
// Add query timeout to DSN if specified
if queryTimeout != "" {
timeout, err := time.ParseDuration(queryTimeout)
if err != nil {
return nil, fmt.Errorf("invalid queryTimeout %q: %w", queryTimeout, err)
}
dsn += "&readTimeout=" + timeout.String()
}
// Interact with the driver directly as you normally would
pool, err := sql.Open("mysql", dsn)
if err != nil {

View File

@@ -54,6 +54,32 @@ func TestParseFromYamlCloudSQLMySQL(t *testing.T) {
},
},
},
{
desc: "with query timeout",
in: `
sources:
my-mysql-instance:
kind: mysql
host: 0.0.0.0
port: my-port
database: my_db
user: my_user
password: my_pass
queryTimeout: 45s
`,
want: server.SourceConfigs{
"my-mysql-instance": mysql.Config{
Name: "my-mysql-instance",
Kind: mysql.SourceKind,
Host: "0.0.0.0",
Port: "my-port",
Database: "my_db",
User: "my_user",
Password: "my_pass",
QueryTimeout: "45s",
},
},
},
}
for _, tc := range tcs {
t.Run(tc.desc, func(t *testing.T) {