refactor: move source implementation in Invoke() function to Source (#2240)

Move source-related queries from `Invoke()` function into Source.

The following sources are updated in this PR:
* alloydb-pg
* cloudsql-pg
* postgres

This is an effort to generalizing tools to work with any Source that
implements a specific interface. This will provide a better segregation
of the roles for Tools vs Source.

Tool's role will be limited to the following:
* Resolve any pre-implementation steps or parameters (e.g. template
parameters)
* Retrieving Source
* Calling the source's implementation
This commit is contained in:
Yuan Teoh
2025-12-30 20:46:18 -08:00
committed by GitHub
parent 55eb958c2a
commit c1b87e209f
27 changed files with 119 additions and 611 deletions

View File

@@ -24,6 +24,7 @@ import (
"github.com/goccy/go-yaml"
"github.com/googleapis/genai-toolbox/internal/sources"
"github.com/googleapis/genai-toolbox/internal/util"
"github.com/googleapis/genai-toolbox/internal/util/orderedmap"
"github.com/jackc/pgx/v5/pgxpool"
"go.opentelemetry.io/otel/trace"
)
@@ -104,22 +105,22 @@ func (s *Source) PostgresPool() *pgxpool.Pool {
func (s *Source) RunSQL(ctx context.Context, statement string, params []any) (any, error) {
results, err := s.Pool.Query(ctx, statement, params...)
if err != nil {
return nil, fmt.Errorf("unable to execute query: %w. Query: %v , Values: %v. Toolbox v0.19.0+ is only compatible with AlloyDB AI NL v1.0.3+. Please ensure that you are using the latest AlloyDB AI NL extension", err, statement, params)
return nil, fmt.Errorf("unable to execute query: %w", err)
}
defer results.Close()
fields := results.FieldDescriptions()
var out []any
for results.Next() {
v, err := results.Values()
if err != nil {
return nil, fmt.Errorf("unable to parse row: %w", err)
}
vMap := make(map[string]any)
row := orderedmap.Row{}
for i, f := range fields {
vMap[f.Name] = v[i]
row.Add(f.Name, v[i])
}
out = append(out, vMap)
out = append(out, row)
}
// this will catch actual query execution errors
if err := results.Err(); err != nil {

View File

@@ -23,6 +23,7 @@ import (
"github.com/goccy/go-yaml"
"github.com/googleapis/genai-toolbox/internal/sources"
"github.com/googleapis/genai-toolbox/internal/util"
"github.com/googleapis/genai-toolbox/internal/util/orderedmap"
"github.com/jackc/pgx/v5/pgxpool"
"go.opentelemetry.io/otel/trace"
)
@@ -99,6 +100,33 @@ func (s *Source) PostgresPool() *pgxpool.Pool {
return s.Pool
}
func (s *Source) RunSQL(ctx context.Context, statement string, params []any) (any, error) {
results, err := s.PostgresPool().Query(ctx, statement, params...)
if err != nil {
return nil, fmt.Errorf("unable to execute query: %w", err)
}
defer results.Close()
fields := results.FieldDescriptions()
var out []any
for results.Next() {
values, err := results.Values()
if err != nil {
return nil, fmt.Errorf("unable to parse row: %w", err)
}
row := orderedmap.Row{}
for i, f := range fields {
row.Add(f.Name, values[i])
}
out = append(out, row)
}
// this will catch actual query execution errors
if err := results.Err(); err != nil {
return nil, fmt.Errorf("unable to execute query: %w", err)
}
return out, nil
}
func getConnectionConfig(ctx context.Context, user, pass, dbname string) (string, bool, error) {
userAgent, err := util.UserAgentFromContext(ctx)
if err != nil {

View File

@@ -23,6 +23,7 @@ import (
"github.com/goccy/go-yaml"
"github.com/googleapis/genai-toolbox/internal/sources"
"github.com/googleapis/genai-toolbox/internal/util"
"github.com/googleapis/genai-toolbox/internal/util/orderedmap"
"github.com/jackc/pgx/v5/pgxpool"
"go.opentelemetry.io/otel/trace"
)
@@ -98,6 +99,33 @@ func (s *Source) PostgresPool() *pgxpool.Pool {
return s.Pool
}
func (s *Source) RunSQL(ctx context.Context, statement string, params []any) (any, error) {
results, err := s.PostgresPool().Query(ctx, statement, params...)
if err != nil {
return nil, fmt.Errorf("unable to execute query: %w", err)
}
defer results.Close()
fields := results.FieldDescriptions()
var out []any
for results.Next() {
values, err := results.Values()
if err != nil {
return nil, fmt.Errorf("unable to parse row: %w", err)
}
row := orderedmap.Row{}
for i, f := range fields {
row.Add(f.Name, values[i])
}
out = append(out, row)
}
// this will catch actual query execution errors
if err := results.Err(); err != nil {
return nil, fmt.Errorf("unable to execute query: %w", err)
}
return out, nil
}
func initPostgresConnectionPool(ctx context.Context, tracer trace.Tracer, name, host, port, user, pass, dbname string, queryParams map[string]string) (*pgxpool.Pool, error) {
//nolint:all // Reassigned ctx
ctx, span := sources.InitConnectionSpan(ctx, tracer, SourceKind, name)

View File

@@ -145,7 +145,11 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
allParamValues[i+2] = fmt.Sprintf("%s", param)
}
return source.RunSQL(ctx, t.Statement, allParamValues)
resp, err := source.RunSQL(ctx, t.Statement, allParamValues)
if err != nil {
return nil, fmt.Errorf("%w. Query: %v , Values: %v. Toolbox v0.19.0+ is only compatible with AlloyDB AI NL v1.0.3+. Please ensure that you are using the latest AlloyDB AI NL extension", err, t.Statement, allParamValues)
}
return resp, nil
}
func (t Tool) ParseParams(data map[string]any, claims map[string]map[string]any) (parameters.ParamValues, error) {

View File

@@ -56,7 +56,8 @@ func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (tools.T
}
type compatibleSource interface {
PostgresPool() *pgxpool.Pool
PostgresPool() *pgxpool.Pool // keep this so that sources are postgres compatible
RunSQL(context.Context, string, []any) (any, error)
}
type Config struct {
@@ -121,34 +122,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
return nil, fmt.Errorf("unable to extract standard params %w", err)
}
sliceParams := newParams.AsSlice()
results, err := source.PostgresPool().Query(ctx, databaseOverviewStatement, sliceParams...)
if err != nil {
return nil, fmt.Errorf("unable to execute query: %w", err)
}
defer results.Close()
fields := results.FieldDescriptions()
var out []map[string]any
for results.Next() {
values, err := results.Values()
if err != nil {
return nil, fmt.Errorf("unable to parse row: %w", err)
}
rowMap := make(map[string]any)
for i, field := range fields {
rowMap[string(field.Name)] = values[i]
}
out = append(out, rowMap)
}
// this will catch actual query execution errors
if err := results.Err(); err != nil {
return nil, fmt.Errorf("unable to execute query: %w", err)
}
return out, nil
return source.RunSQL(ctx, databaseOverviewStatement, sliceParams)
}
func (t Tool) ParseParams(data map[string]any, claims map[string]map[string]any) (parameters.ParamValues, error) {

View File

@@ -22,7 +22,6 @@ import (
"github.com/googleapis/genai-toolbox/internal/sources"
"github.com/googleapis/genai-toolbox/internal/tools"
"github.com/googleapis/genai-toolbox/internal/util"
"github.com/googleapis/genai-toolbox/internal/util/orderedmap"
"github.com/googleapis/genai-toolbox/internal/util/parameters"
"github.com/jackc/pgx/v5/pgxpool"
)
@@ -45,6 +44,7 @@ func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (tools.T
type compatibleSource interface {
PostgresPool() *pgxpool.Pool
RunSQL(context.Context, string, []any) (any, error)
}
type Config struct {
@@ -106,32 +106,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
}
logger.DebugContext(ctx, fmt.Sprintf("executing `%s` tool query: %s", kind, sql))
results, err := source.PostgresPool().Query(ctx, sql)
if err != nil {
return nil, fmt.Errorf("unable to execute query: %w", err)
}
defer results.Close()
fields := results.FieldDescriptions()
var out []any
for results.Next() {
v, err := results.Values()
if err != nil {
return nil, fmt.Errorf("unable to parse row: %w", err)
}
row := orderedmap.Row{}
for i, f := range fields {
row.Add(f.Name, v[i])
}
out = append(out, row)
}
if err := results.Err(); err != nil {
return err.Error(), fmt.Errorf("unable to execute query: %w", err)
}
return out, nil
return source.RunSQL(ctx, sql, nil)
}
func (t Tool) ParseParams(data map[string]any, claims map[string]map[string]any) (parameters.ParamValues, error) {

View File

@@ -62,6 +62,7 @@ func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (tools.T
type compatibleSource interface {
PostgresPool() *pgxpool.Pool
RunSQL(context.Context, string, []any) (any, error)
}
type Config struct {
@@ -133,33 +134,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
return nil, fmt.Errorf("unable to extract standard params %w", err)
}
sliceParams := newParams.AsSlice()
results, err := source.PostgresPool().Query(ctx, getColumnCardinality, sliceParams...)
if err != nil {
return nil, fmt.Errorf("unable to execute query: %w", err)
}
defer results.Close()
fields := results.FieldDescriptions()
var out []map[string]any
for results.Next() {
values, err := results.Values()
if err != nil {
return nil, fmt.Errorf("unable to parse row: %w", err)
}
rowMap := make(map[string]any)
for i, field := range fields {
rowMap[string(field.Name)] = values[i]
}
out = append(out, rowMap)
}
if err := results.Err(); err != nil {
return err.Error(), fmt.Errorf("unable to execute query: %w", err)
}
return out, nil
return source.RunSQL(ctx, getColumnCardinality, sliceParams)
}
func (t Tool) ParseParams(data map[string]any, claims map[string]map[string]any) (parameters.ParamValues, error) {

View File

@@ -66,6 +66,7 @@ func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (tools.T
type compatibleSource interface {
PostgresPool() *pgxpool.Pool
RunSQL(context.Context, string, []any) (any, error)
}
type Config struct {
@@ -130,33 +131,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
}
sliceParams := newParams.AsSlice()
results, err := source.PostgresPool().Query(ctx, listActiveQueriesStatement, sliceParams...)
if err != nil {
return nil, fmt.Errorf("unable to execute query: %w", err)
}
defer results.Close()
fields := results.FieldDescriptions()
var out []map[string]any
for results.Next() {
values, err := results.Values()
if err != nil {
return nil, fmt.Errorf("unable to parse row: %w", err)
}
rowMap := make(map[string]any)
for i, field := range fields {
rowMap[string(field.Name)] = values[i]
}
out = append(out, rowMap)
}
// this will catch actual query execution errors
if err := results.Err(); err != nil {
return nil, fmt.Errorf("unable to execute query: %w", err)
}
return out, nil
return source.RunSQL(ctx, listActiveQueriesStatement, sliceParams)
}
func (t Tool) ParseParams(data map[string]any, claims map[string]map[string]any) (parameters.ParamValues, error) {

View File

@@ -53,6 +53,7 @@ func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (tools.T
type compatibleSource interface {
PostgresPool() *pgxpool.Pool
RunSQL(context.Context, string, []any) (any, error)
}
type Config struct {
@@ -101,33 +102,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
if err != nil {
return nil, err
}
results, err := source.PostgresPool().Query(ctx, listAvailableExtensionsQuery)
if err != nil {
return nil, fmt.Errorf("unable to execute query: %w", err)
}
fields := results.FieldDescriptions()
var out []any
for results.Next() {
v, err := results.Values()
if err != nil {
return nil, fmt.Errorf("unable to parse row: %w", err)
}
vMap := make(map[string]any)
for i, f := range fields {
vMap[f.Name] = v[i]
}
out = append(out, vMap)
}
// this will catch actual query execution errors
if err := results.Err(); err != nil {
return nil, fmt.Errorf("unable to execute query: %w", err)
}
return out, nil
return source.RunSQL(ctx, listAvailableExtensionsQuery, nil)
}
func (t Tool) ParseParams(data map[string]any, claims map[string]map[string]any) (parameters.ParamValues, error) {

View File

@@ -110,6 +110,7 @@ func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (tools.T
type compatibleSource interface {
PostgresPool() *pgxpool.Pool
RunSQL(context.Context, string, []any) (any, error)
}
type Config struct {
@@ -199,33 +200,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
}
sliceParams := newParams.AsSlice()
results, err := source.PostgresPool().Query(ctx, listDatabaseStats, sliceParams...)
if err != nil {
return nil, fmt.Errorf("unable to execute query: %w", err)
}
defer results.Close()
fields := results.FieldDescriptions()
var out []map[string]any
for results.Next() {
values, err := results.Values()
if err != nil {
return nil, fmt.Errorf("unable to parse row: %w", err)
}
rowMap := make(map[string]any)
for i, field := range fields {
rowMap[string(field.Name)] = values[i]
}
out = append(out, rowMap)
}
// this will catch actual query execution errors
if err := results.Err(); err != nil {
return nil, fmt.Errorf("unable to execute query: %w", err)
}
return out, nil
return source.RunSQL(ctx, listDatabaseStats, sliceParams)
}
func (t Tool) ParseParams(data map[string]any, claims map[string]map[string]any) (parameters.ParamValues, error) {

View File

@@ -89,6 +89,7 @@ func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (tools.T
type compatibleSource interface {
PostgresPool() *pgxpool.Pool
RunSQL(context.Context, string, []any) (any, error)
}
type Config struct {
@@ -161,33 +162,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
}
sliceParams := newParams.AsSlice()
results, err := source.PostgresPool().Query(ctx, listIndexesStatement, sliceParams...)
if err != nil {
return nil, fmt.Errorf("unable to execute query: %w", err)
}
defer results.Close()
fields := results.FieldDescriptions()
var out []map[string]any
for results.Next() {
values, err := results.Values()
if err != nil {
return nil, fmt.Errorf("unable to parse row: %w", err)
}
rowMap := make(map[string]any)
for i, field := range fields {
rowMap[string(field.Name)] = values[i]
}
out = append(out, rowMap)
}
// this will catch actual query execution errors
if err := results.Err(); err != nil {
return nil, fmt.Errorf("unable to execute query: %w", err)
}
return out, nil
return source.RunSQL(ctx, listIndexesStatement, sliceParams)
}
func (t Tool) ParseParams(data map[string]any, claims map[string]map[string]any) (parameters.ParamValues, error) {

View File

@@ -64,6 +64,7 @@ func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (tools.T
type compatibleSource interface {
PostgresPool() *pgxpool.Pool
RunSQL(context.Context, string, []any) (any, error)
}
type Config struct {
@@ -112,33 +113,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
if err != nil {
return nil, err
}
results, err := source.PostgresPool().Query(ctx, listAvailableExtensionsQuery)
if err != nil {
return nil, fmt.Errorf("unable to execute query: %w", err)
}
fields := results.FieldDescriptions()
var out []any
for results.Next() {
v, err := results.Values()
if err != nil {
return nil, fmt.Errorf("unable to parse row: %w", err)
}
vMap := make(map[string]any)
for i, f := range fields {
vMap[f.Name] = v[i]
}
out = append(out, vMap)
}
// this will catch actual query execution errors
if err := results.Err(); err != nil {
return nil, fmt.Errorf("unable to execute query: %w", err)
}
return out, nil
return source.RunSQL(ctx, listAvailableExtensionsQuery, nil)
}
func (t Tool) ParseParams(data map[string]any, claims map[string]map[string]any) (parameters.ParamValues, error) {

View File

@@ -64,6 +64,7 @@ func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (tools.T
type compatibleSource interface {
PostgresPool() *pgxpool.Pool
RunSQL(context.Context, string, []any) (any, error)
}
type Config struct {
@@ -132,28 +133,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
}
sliceParams := newParams.AsSlice()
results, err := source.PostgresPool().Query(ctx, listLocks, sliceParams...)
if err != nil {
return nil, fmt.Errorf("unable to execute query: %w", err)
}
defer results.Close()
fields := results.FieldDescriptions()
var out []map[string]any
for results.Next() {
values, err := results.Values()
if err != nil {
return nil, fmt.Errorf("unable to parse row: %w", err)
}
rowMap := make(map[string]any)
for i, field := range fields {
rowMap[string(field.Name)] = values[i]
}
out = append(out, rowMap)
}
return out, nil
return source.RunSQL(ctx, listLocks, sliceParams)
}
func (t Tool) ParseParams(data map[string]any, claims map[string]map[string]any) (parameters.ParamValues, error) {

View File

@@ -62,6 +62,7 @@ func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (tools.T
type compatibleSource interface {
PostgresPool() *pgxpool.Pool
RunSQL(context.Context, string, []any) (any, error)
}
type Config struct {
@@ -125,34 +126,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
return nil, fmt.Errorf("unable to extract standard params %w", err)
}
sliceParams := newParams.AsSlice()
results, err := source.PostgresPool().Query(ctx, listPgSettingsStatement, sliceParams...)
if err != nil {
return nil, fmt.Errorf("unable to execute query: %w", err)
}
defer results.Close()
fields := results.FieldDescriptions()
var out []map[string]any
for results.Next() {
values, err := results.Values()
if err != nil {
return nil, fmt.Errorf("unable to parse row: %w", err)
}
rowMap := make(map[string]any)
for i, field := range fields {
rowMap[string(field.Name)] = values[i]
}
out = append(out, rowMap)
}
// this will catch actual query execution errors
if err := results.Err(); err != nil {
return nil, fmt.Errorf("unable to execute query: %w", err)
}
return out, nil
return source.RunSQL(ctx, listPgSettingsStatement, sliceParams)
}
func (t Tool) ParseParams(data map[string]any, claims map[string]map[string]any) (parameters.ParamValues, error) {

View File

@@ -73,6 +73,7 @@ func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (tools.T
type compatibleSource interface {
PostgresPool() *pgxpool.Pool
RunSQL(context.Context, string, []any) (any, error)
}
type Config struct {
@@ -139,33 +140,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
return nil, fmt.Errorf("unable to extract standard params %w", err)
}
sliceParams := newParams.AsSlice()
results, err := source.PostgresPool().Query(ctx, listPublicationTablesStatement, sliceParams...)
if err != nil {
return nil, fmt.Errorf("unable to execute query: %w", err)
}
defer results.Close()
fields := results.FieldDescriptions()
var out []map[string]any
for results.Next() {
values, err := results.Values()
if err != nil {
return nil, fmt.Errorf("unable to parse row: %w", err)
}
rowMap := make(map[string]any)
for i, field := range fields {
rowMap[string(field.Name)] = values[i]
}
out = append(out, rowMap)
}
if err := results.Err(); err != nil {
return err.Error(), fmt.Errorf("unable to execute query: %w", err)
}
return out, nil
return source.RunSQL(ctx, listPublicationTablesStatement, sliceParams)
}
func (t Tool) ParseParams(data map[string]any, claims map[string]map[string]any) (parameters.ParamValues, error) {

View File

@@ -63,6 +63,7 @@ func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (tools.T
type compatibleSource interface {
PostgresPool() *pgxpool.Pool
RunSQL(context.Context, string, []any) (any, error)
}
type Config struct {
@@ -132,33 +133,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
return nil, fmt.Errorf("unable to extract standard params %w", err)
}
sliceParams := newParams.AsSlice()
results, err := source.PostgresPool().Query(ctx, listQueryStats, sliceParams...)
if err != nil {
return nil, fmt.Errorf("unable to execute query: %w", err)
}
defer results.Close()
fields := results.FieldDescriptions()
var out []map[string]any
for results.Next() {
values, err := results.Values()
if err != nil {
return nil, fmt.Errorf("unable to parse row: %w", err)
}
rowMap := make(map[string]any)
for i, field := range fields {
rowMap[string(field.Name)] = values[i]
}
out = append(out, rowMap)
}
if err := results.Err(); err != nil {
return err.Error(), fmt.Errorf("unable to execute query: %w", err)
}
return out, nil
return source.RunSQL(ctx, listQueryStats, sliceParams)
}
func (t Tool) ParseParams(data map[string]any, claims map[string]map[string]any) (parameters.ParamValues, error) {

View File

@@ -85,6 +85,7 @@ func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (tools.T
type compatibleSource interface {
PostgresPool() *pgxpool.Pool
RunSQL(context.Context, string, []any) (any, error)
}
type Config struct {
@@ -154,34 +155,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
return nil, fmt.Errorf("unable to extract standard params %w", err)
}
sliceParams := newParams.AsSlice()
results, err := source.PostgresPool().Query(ctx, listRolesStatement, sliceParams...)
if err != nil {
return nil, fmt.Errorf("unable to execute query: %w", err)
}
defer results.Close()
fields := results.FieldDescriptions()
var out []map[string]any
for results.Next() {
values, err := results.Values()
if err != nil {
return nil, fmt.Errorf("unable to parse row: %w", err)
}
rowMap := make(map[string]any)
for i, field := range fields {
rowMap[string(field.Name)] = values[i]
}
out = append(out, rowMap)
}
// this will catch actual query execution errors
if err := results.Err(); err != nil {
return nil, fmt.Errorf("unable to execute query: %w", err)
}
return out, nil
return source.RunSQL(ctx, listRolesStatement, sliceParams)
}
func (t Tool) ParseParams(data map[string]any, claims map[string]map[string]any) (parameters.ParamValues, error) {

View File

@@ -97,6 +97,7 @@ func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (tools.T
type compatibleSource interface {
PostgresPool() *pgxpool.Pool
RunSQL(context.Context, string, []any) (any, error)
}
type Config struct {
@@ -162,34 +163,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
return nil, fmt.Errorf("unable to extract standard params %w", err)
}
sliceParams := newParams.AsSlice()
results, err := source.PostgresPool().Query(ctx, listSchemasStatement, sliceParams...)
if err != nil {
return nil, fmt.Errorf("unable to execute query: %w", err)
}
defer results.Close()
fields := results.FieldDescriptions()
var out []map[string]any
for results.Next() {
values, err := results.Values()
if err != nil {
return nil, fmt.Errorf("unable to parse row: %w", err)
}
rowMap := make(map[string]any)
for i, field := range fields {
rowMap[string(field.Name)] = values[i]
}
out = append(out, rowMap)
}
// this will catch actual query execution errors
if err := results.Err(); err != nil {
return nil, fmt.Errorf("unable to execute query: %w", err)
}
return out, nil
return source.RunSQL(ctx, listSchemasStatement, sliceParams)
}
func (t Tool) ParseParams(data map[string]any, claims map[string]map[string]any) (parameters.ParamValues, error) {

View File

@@ -63,6 +63,7 @@ func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (tools.T
type compatibleSource interface {
PostgresPool() *pgxpool.Pool
RunSQL(context.Context, string, []any) (any, error)
}
type Config struct {
@@ -133,33 +134,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
}
sliceParams := newParams.AsSlice()
results, err := source.PostgresPool().Query(ctx, listSequencesStatement, sliceParams...)
if err != nil {
return nil, fmt.Errorf("unable to execute query: %w", err)
}
defer results.Close()
fields := results.FieldDescriptions()
var out []map[string]any
for results.Next() {
values, err := results.Values()
if err != nil {
return nil, fmt.Errorf("unable to parse row: %w", err)
}
rowMap := make(map[string]any)
for i, field := range fields {
rowMap[string(field.Name)] = values[i]
}
out = append(out, rowMap)
}
// this will catch actual query execution errors
if err := results.Err(); err != nil {
return nil, fmt.Errorf("unable to execute query: %w", err)
}
return out, nil
return source.RunSQL(ctx, listSequencesStatement, sliceParams)
}
func (t Tool) ParseParams(data map[string]any, claims map[string]map[string]any) (parameters.ParamValues, error) {

View File

@@ -121,6 +121,7 @@ func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (tools.T
type compatibleSource interface {
PostgresPool() *pgxpool.Pool
RunSQL(context.Context, string, []any) (any, error)
}
type Config struct {
@@ -182,33 +183,15 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
if outputFormat != "simple" && outputFormat != "detailed" {
return nil, fmt.Errorf("invalid value for output_format: must be 'simple' or 'detailed', but got %q", outputFormat)
}
results, err := source.PostgresPool().Query(ctx, listTablesStatement, tableNames, outputFormat)
resp, err := source.RunSQL(ctx, listTablesStatement, []any{tableNames, outputFormat})
if err != nil {
return nil, fmt.Errorf("unable to execute query: %w", err)
return nil, err
}
defer results.Close()
fields := results.FieldDescriptions()
out := []map[string]any{}
for results.Next() {
values, err := results.Values()
if err != nil {
return nil, fmt.Errorf("unable to parse row: %w", err)
}
rowMap := make(map[string]any)
for i, field := range fields {
rowMap[string(field.Name)] = values[i]
}
out = append(out, rowMap)
resSlice, ok := resp.([]any)
if !ok || len(resSlice) == 0 {
return []any{}, nil
}
if err := results.Err(); err != nil {
return nil, fmt.Errorf("error reading query results: %w", err)
}
return out, nil
return resp, err
}
func (t Tool) ParseParams(data map[string]any, claims map[string]map[string]any) (parameters.ParamValues, error) {

View File

@@ -69,6 +69,7 @@ func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (tools.T
type compatibleSource interface {
PostgresPool() *pgxpool.Pool
RunSQL(context.Context, string, []any) (any, error)
}
type Config struct {
@@ -141,32 +142,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
return nil, fmt.Errorf("invalid 'limit' parameter; expected an integer")
}
results, err := source.PostgresPool().Query(ctx, listTableSpacesStatement, tablespaceName, limit)
if err != nil {
return nil, fmt.Errorf("unable to execute query: %w", err)
}
defer results.Close()
fields := results.FieldDescriptions()
var out []map[string]any
for results.Next() {
values, err := results.Values()
if err != nil {
return nil, fmt.Errorf("unable to parse row: %w", err)
}
rowMap := make(map[string]any)
for i, field := range fields {
rowMap[string(field.Name)] = values[i]
}
out = append(out, rowMap)
}
// this will catch actual query execution errors
if err := results.Err(); err != nil {
return nil, fmt.Errorf("unable to execute query: %w", err)
}
return out, nil
return source.RunSQL(ctx, listTableSpacesStatement, []any{tablespaceName, limit})
}
func (t Tool) ParseParams(data map[string]any, claims map[string]map[string]any) (parameters.ParamValues, error) {

View File

@@ -90,6 +90,7 @@ func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (tools.T
type compatibleSource interface {
PostgresPool() *pgxpool.Pool
RunSQL(context.Context, string, []any) (any, error)
}
type Config struct {
@@ -172,28 +173,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
}
sliceParams := newParams.AsSlice()
results, err := source.PostgresPool().Query(ctx, listTableStats, sliceParams...)
if err != nil {
return nil, fmt.Errorf("unable to execute query: %w", err)
}
defer results.Close()
fields := results.FieldDescriptions()
var out []map[string]any
for results.Next() {
values, err := results.Values()
if err != nil {
return nil, fmt.Errorf("unable to parse row: %w", err)
}
rowMap := make(map[string]any)
for i, field := range fields {
rowMap[string(field.Name)] = values[i]
}
out = append(out, rowMap)
}
return out, nil
return source.RunSQL(ctx, listTableStats, sliceParams)
}
func (t Tool) ParseParams(data map[string]any, claims map[string]map[string]any) (parameters.ParamValues, error) {

View File

@@ -89,6 +89,7 @@ func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (tools.T
type compatibleSource interface {
PostgresPool() *pgxpool.Pool
RunSQL(context.Context, string, []any) (any, error)
}
type Config struct {
@@ -159,34 +160,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
return nil, fmt.Errorf("unable to extract standard params %w", err)
}
sliceParams := newParams.AsSlice()
results, err := source.PostgresPool().Query(ctx, listTriggersStatement, sliceParams...)
if err != nil {
return nil, fmt.Errorf("unable to execute query: %w", err)
}
defer results.Close()
fields := results.FieldDescriptions()
var out []map[string]any
for results.Next() {
values, err := results.Values()
if err != nil {
return nil, fmt.Errorf("unable to parse row: %w", err)
}
rowMap := make(map[string]any)
for i, field := range fields {
rowMap[string(field.Name)] = values[i]
}
out = append(out, rowMap)
}
// this will catch actual query execution errors
if err := results.Err(); err != nil {
return nil, fmt.Errorf("unable to execute query: %w", err)
}
return out, nil
return source.RunSQL(ctx, listTriggersStatement, sliceParams)
}
func (t Tool) ParseParams(data map[string]any, claims map[string]map[string]any) (parameters.ParamValues, error) {

View File

@@ -64,6 +64,7 @@ func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (tools.T
type compatibleSource interface {
PostgresPool() *pgxpool.Pool
RunSQL(context.Context, string, []any) (any, error)
}
type Config struct {
@@ -129,34 +130,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
return nil, fmt.Errorf("unable to extract standard params %w", err)
}
sliceParams := newParams.AsSlice()
results, err := source.PostgresPool().Query(ctx, listViewsStatement, sliceParams...)
if err != nil {
return nil, fmt.Errorf("unable to execute query: %w", err)
}
defer results.Close()
fields := results.FieldDescriptions()
var out []map[string]any
for results.Next() {
values, err := results.Values()
if err != nil {
return nil, fmt.Errorf("unable to parse row: %w", err)
}
rowMap := make(map[string]any)
for i, field := range fields {
rowMap[string(field.Name)] = values[i]
}
out = append(out, rowMap)
}
// this will catch actual query execution errors
if err := results.Err(); err != nil {
return nil, fmt.Errorf("unable to execute query: %w", err)
}
return out, nil
return source.RunSQL(ctx, listViewsStatement, sliceParams)
}
func (t Tool) ParseParams(data map[string]any, claims map[string]map[string]any) (parameters.ParamValues, error) {

View File

@@ -71,6 +71,7 @@ func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (tools.T
type compatibleSource interface {
PostgresPool() *pgxpool.Pool
RunSQL(context.Context, string, []any) (any, error)
}
type Config struct {
@@ -141,29 +142,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
return nil, fmt.Errorf("unable to extract standard params %w", err)
}
sliceParams := newParams.AsSlice()
results, err := source.PostgresPool().Query(ctx, longRunningTransactions, sliceParams...)
if err != nil {
return nil, fmt.Errorf("unable to execute query: %w", err)
}
defer results.Close()
fields := results.FieldDescriptions()
var out []map[string]any
for results.Next() {
values, err := results.Values()
if err != nil {
return nil, fmt.Errorf("unable to parse row: %w", err)
}
rowMap := make(map[string]any)
for i, field := range fields {
rowMap[string(field.Name)] = values[i]
}
out = append(out, rowMap)
}
return out, nil
return source.RunSQL(ctx, longRunningTransactions, sliceParams)
}
func (t Tool) ParseParams(data map[string]any, claims map[string]map[string]any) (parameters.ParamValues, error) {

View File

@@ -61,6 +61,7 @@ func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (tools.T
type compatibleSource interface {
PostgresPool() *pgxpool.Pool
RunSQL(context.Context, string, []any) (any, error)
}
type Config struct {
@@ -128,29 +129,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
return nil, fmt.Errorf("unable to extract standard params %w", err)
}
sliceParams := newParams.AsSlice()
results, err := source.PostgresPool().Query(ctx, replicationStats, sliceParams...)
if err != nil {
return nil, fmt.Errorf("unable to execute query: %w", err)
}
defer results.Close()
fields := results.FieldDescriptions()
var out []map[string]any
for results.Next() {
values, err := results.Values()
if err != nil {
return nil, fmt.Errorf("unable to parse row: %w", err)
}
rowMap := make(map[string]any)
for i, field := range fields {
rowMap[string(field.Name)] = values[i]
}
out = append(out, rowMap)
}
return out, nil
return source.RunSQL(ctx, replicationStats, sliceParams)
}
func (t Tool) ParseParams(data map[string]any, claims map[string]map[string]any) (parameters.ParamValues, error) {

View File

@@ -43,6 +43,7 @@ func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (tools.T
type compatibleSource interface {
PostgresPool() *pgxpool.Pool
RunSQL(context.Context, string, []any) (any, error)
}
type Config struct {
@@ -108,32 +109,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
return nil, fmt.Errorf("unable to extract standard params %w", err)
}
sliceParams := newParams.AsSlice()
results, err := source.PostgresPool().Query(ctx, newStatement, sliceParams...)
if err != nil {
return nil, fmt.Errorf("unable to execute query: %w", err)
}
fields := results.FieldDescriptions()
var out []any
for results.Next() {
v, err := results.Values()
if err != nil {
return nil, fmt.Errorf("unable to parse row: %w", err)
}
vMap := make(map[string]any)
for i, f := range fields {
vMap[f.Name] = v[i]
}
out = append(out, vMap)
}
// this will catch actual query execution errors
if err := results.Err(); err != nil {
return nil, fmt.Errorf("unable to execute query: %w", err)
}
return out, nil
return source.RunSQL(ctx, newStatement, sliceParams)
}
func (t Tool) ParseParams(data map[string]any, claims map[string]map[string]any) (parameters.ParamValues, error) {