// Copyright 2025 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package mindsdb import ( "context" "database/sql" "fmt" "time" _ "github.com/go-sql-driver/mysql" "github.com/goccy/go-yaml" "github.com/googleapis/genai-toolbox/internal/sources" "github.com/googleapis/genai-toolbox/internal/tools/mysql/mysqlcommon" "go.opentelemetry.io/otel/trace" ) const SourceKind string = "mindsdb" // validate interface var _ sources.SourceConfig = Config{} func init() { if !sources.Register(SourceKind, newConfig) { panic(fmt.Sprintf("source kind %q already registered", SourceKind)) } } func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (sources.SourceConfig, error) { actual := Config{Name: name} if err := decoder.DecodeContext(ctx, &actual); err != nil { return nil, err } return actual, nil } type Config struct { Name string `yaml:"name" validate:"required"` Kind string `yaml:"kind" validate:"required"` Host string `yaml:"host" validate:"required"` Port string `yaml:"port" validate:"required"` User string `yaml:"user" validate:"required"` Password string `yaml:"password"` Database string `yaml:"database" validate:"required"` QueryTimeout string `yaml:"queryTimeout"` } func (r Config) SourceConfigKind() string { return SourceKind } func (r Config) Initialize(ctx context.Context, tracer trace.Tracer) (sources.Source, error) { pool, err := initMindsDBConnectionPool(ctx, tracer, r.Name, r.Host, r.Port, r.User, r.Password, r.Database, r.QueryTimeout) if err != nil { return nil, fmt.Errorf("unable to create pool: %w", err) } err = pool.PingContext(ctx) if err != nil { return nil, fmt.Errorf("unable to connect successfully: %w", err) } s := &Source{ Config: r, Pool: pool, } return s, nil } var _ sources.Source = &Source{} type Source struct { Config Pool *sql.DB } func (s *Source) SourceKind() string { return SourceKind } func (s *Source) ToConfig() sources.SourceConfig { return s.Config } func (s *Source) MindsDBPool() *sql.DB { return s.Pool } func (s *Source) MySQLPool() *sql.DB { return s.Pool } func (s *Source) RunSQL(ctx context.Context, statement string, params []any) (any, error) { // MindsDB now supports MySQL prepared statements natively results, err := s.MindsDBPool().QueryContext(ctx, statement, params...) if err != nil { return nil, fmt.Errorf("unable to execute query: %w", err) } cols, err := results.Columns() if err != nil { return nil, fmt.Errorf("unable to retrieve rows column name: %w", err) } // create an array of values for each column, which can be re-used to scan each row rawValues := make([]any, len(cols)) values := make([]any, len(cols)) for i := range rawValues { values[i] = &rawValues[i] } defer results.Close() colTypes, err := results.ColumnTypes() if err != nil { return nil, fmt.Errorf("unable to get column types: %w", err) } var out []any for results.Next() { err := results.Scan(values...) if err != nil { return nil, fmt.Errorf("unable to parse row: %w", err) } vMap := make(map[string]any) for i, name := range cols { val := rawValues[i] if val == nil { vMap[name] = nil continue } // MindsDB uses mysql driver vMap[name], err = mysqlcommon.ConvertToType(colTypes[i], val) if err != nil { return nil, fmt.Errorf("errors encountered when converting values: %w", err) } } out = append(out, vMap) } if err := results.Err(); err != nil { return nil, fmt.Errorf("errors encountered during row iteration: %w", err) } return out, nil } func initMindsDBConnectionPool(ctx context.Context, tracer trace.Tracer, name, host, port, user, pass, dbname, queryTimeout string) (*sql.DB, error) { //nolint:all // Reassigned ctx ctx, span := sources.InitConnectionSpan(ctx, tracer, SourceKind, name) defer span.End() // Configure the driver to connect to the database var dsn string if pass == "" { // Connect without password dsn = fmt.Sprintf("%s@tcp(%s:%s)/%s?parseTime=true", user, host, port, dbname) } else { // Connect with password dsn = fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?parseTime=true", user, pass, host, port, dbname) } // Add query timeout to DSN if specified if queryTimeout != "" { timeout, err := time.ParseDuration(queryTimeout) if err != nil { return nil, fmt.Errorf("invalid queryTimeout %q: %w", queryTimeout, err) } dsn += "&readTimeout=" + timeout.String() } // Interact with the driver directly as you normally would pool, err := sql.Open("mysql", dsn) if err != nil { return nil, fmt.Errorf("sql.Open: %w", err) } return pool, nil }