Files
genai-toolbox/internal/sources/cloudsqlpg/cloud_sql_pg.go
Kurtis Van Gent 1c9ad5ea24 refactor: implement dynamic source registration (#614)
This commit refactors the source configuration and loading mechanism to
use a dynamic registration pattern. Each source package now registers
itself with a central registry via its init() function.

The server configuration code uses this registry to decode and
initialize sources, decoupling it from specific source implementations
and simplifying the addition of new sources.

Key changes:
- Introduced `sources.Register()` and `newConfig()` constructor in each
source package.
- Moved source package imports to `cmd/root.go` as blank imports to
trigger `init()` functions for self-registration.
- Removed direct imports of specific source packages from
`internal/server/config.go`.
- Renamed `SourceKind` constants to `Kind` within each source package.
- Updated tests to use the new `Kind` constants and reflect registration
changes.

---------

Co-authored-by: Yuan Teoh <yuanteoh@google.com>
2025-06-04 14:23:57 -07:00

172 lines
5.1 KiB
Go

// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cloudsqlpg
import (
	"context"
	"fmt"
	"net"
	"strings"

	"cloud.google.com/go/cloudsqlconn"
	"github.com/goccy/go-yaml"
	"github.com/googleapis/genai-toolbox/internal/sources"
	"github.com/googleapis/genai-toolbox/internal/util"
	"github.com/jackc/pgx/v5/pgxpool"
	"go.opentelemetry.io/otel/trace"
)
// SourceKind is the kind identifier under which this package registers its
// config constructor (see init) and which Config and Source report as their
// kind.
const SourceKind string = "cloud-sql-postgres"
// Compile-time check that Config satisfies sources.SourceConfig.
var _ sources.SourceConfig = Config{}
// init registers newConfig with the sources registry under SourceKind.
// It panics when sources.Register reports failure, which the panic message
// describes as the kind having already been registered.
func init() {
	registered := sources.Register(SourceKind, newConfig)
	if registered {
		return
	}
	panic(fmt.Sprintf("source kind %q already registered", SourceKind))
}
// newConfig decodes a Cloud SQL Postgres source configuration from decoder.
//
// The config is seeded with the given name and an IPType of "public" before
// decoding, so those values apply unless the decoded document overrides them.
// Any decoding error is returned unchanged.
func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (sources.SourceConfig, error) {
	cfg := Config{
		Name:   name,
		IPType: "public", // default when the document does not set ipType
	}
	err := decoder.DecodeContext(ctx, &cfg)
	if err != nil {
		return nil, err
	}
	return cfg, nil
}
// Config holds the YAML-decoded settings for a Cloud SQL Postgres source.
type Config struct {
// Name is the source's name; newConfig sets it before decoding.
Name string `yaml:"name" validate:"required"`
// Kind is the source kind as written in the configuration document.
Kind string `yaml:"kind" validate:"required"`
// Project, Region and Instance are joined as "project:region:instance" to
// form the name passed to the Cloud SQL dialer.
Project string `yaml:"project" validate:"required"`
Region string `yaml:"region" validate:"required"`
Instance string `yaml:"instance" validate:"required"`
// IPType selects the IP type used for dialing; newConfig defaults it to
// "public".
IPType sources.IPType `yaml:"ipType" validate:"required"`
// Database is the database name placed in the connection string.
Database string `yaml:"database" validate:"required"`
// User and Password are optional. When both are set, password
// authentication is used; otherwise IAM authentication is used (see
// getConnectionConfig).
User string `yaml:"user"`
Password string `yaml:"password"`
}
// SourceConfigKind returns the SourceKind constant, regardless of the value
// stored in r.Kind.
func (r Config) SourceConfigKind() string {
return SourceKind
}
// Initialize creates a connection pool from the configuration, verifies it
// with a ping, and returns a Source wrapping the pool.
//
// It returns an error if the pool cannot be created or if the ping fails. In
// the latter case the pool is closed before returning so that it is not
// leaked.
func (r Config) Initialize(ctx context.Context, tracer trace.Tracer) (sources.Source, error) {
	pool, err := initCloudSQLPgConnectionPool(ctx, tracer, r.Name, r.Project, r.Region, r.Instance, r.IPType.String(), r.User, r.Password, r.Database)
	if err != nil {
		return nil, fmt.Errorf("unable to create pool: %w", err)
	}

	if err := pool.Ping(ctx); err != nil {
		// The pool is discarded on this path, so release its resources
		// instead of leaving them for the lifetime of the process.
		pool.Close()
		return nil, fmt.Errorf("unable to connect successfully: %w", err)
	}

	s := &Source{
		Name: r.Name,
		Kind: SourceKind,
		Pool: pool,
	}
	return s, nil
}
// Compile-time check that *Source satisfies sources.Source.
var _ sources.Source = &Source{}
// Source is an initialized Cloud SQL Postgres source backed by a pgx
// connection pool.
type Source struct {
// Name is the source's configured name.
Name string `yaml:"name"`
// Kind is the source kind; Config.Initialize sets it to SourceKind.
Kind string `yaml:"kind"`
// Pool is the connection pool created by Config.Initialize.
Pool *pgxpool.Pool
}
// SourceKind returns the package-level SourceKind constant, regardless of the
// value stored in s.Kind.
func (s *Source) SourceKind() string {
return SourceKind
}
// PostgresPool returns the underlying pgx connection pool.
func (s *Source) PostgresPool() *pgxpool.Pool {
return s.Pool
}
// quoteDSNValue renders v as a single-quoted value for a keyword/value
// connection string, escaping backslashes and single quotes. Quoting keeps
// values that contain whitespace, quotes or backslashes intact and prevents
// them from being parsed as additional settings.
func quoteDSNValue(v string) string {
	// Backslashes must be escaped first so the escapes added for quotes are
	// not themselves doubled.
	escaped := strings.ReplaceAll(v, `\`, `\\`)
	escaped = strings.ReplaceAll(escaped, `'`, `\'`)
	return "'" + escaped + "'"
}

// getConnectionConfig builds the keyword/value connection string for the
// given credentials and reports whether IAM authentication should be used.
//
// The authentication mode is chosen as follows:
//   - user and pass both set: password authentication (useIAM is false).
//   - only user set: IAM authentication with user as the IAM principal.
//   - neither set: IAM authentication with the principal email obtained from
//     sources.GetIAMPrincipalEmailFromADC.
//   - only pass set: an error is returned.
//
// On error the returned connection string is empty and the boolean is true.
func getConnectionConfig(ctx context.Context, user, pass, dbname string) (string, bool, error) {
	// If username and password are both provided, use password authentication.
	if user != "" && pass != "" {
		dsn := fmt.Sprintf("user=%s password=%s dbname=%s sslmode=disable",
			quoteDSNValue(user), quoteDSNValue(pass), quoteDSNValue(dbname))
		return dsn, false, nil
	}

	// If the username is empty, fetch the email from ADC; otherwise use the
	// username as the IAM email.
	if user == "" {
		if pass != "" {
			// A password without a username is ambiguous; reject it.
			return "", true, fmt.Errorf("password is provided without a username. Please provide both a username and password, or leave both fields empty")
		}
		email, err := sources.GetIAMPrincipalEmailFromADC(ctx)
		if err != nil {
			return "", true, fmt.Errorf("error getting email from ADC: %w", err)
		}
		user = email
	}

	// Construct the IAM connection string with the resolved username.
	dsn := fmt.Sprintf("user=%s dbname=%s sslmode=disable",
		quoteDSNValue(user), quoteDSNValue(dbname))
	return dsn, true, nil
}
// initCloudSQLPgConnectionPool creates a pgx connection pool whose
// connections are dialed through the Cloud SQL Go Connector.
//
// name identifies the source in the connection span. project, region and
// instance are joined as "project:region:instance" to form the name passed
// to the dialer. ipType is forwarded to sources.GetCloudSQLOpts. user, pass
// and dbname are forwarded to getConnectionConfig, which also decides whether
// IAM authentication is used.
//
// An error is returned if any step fails. If the pool itself cannot be
// created, the dialer created for it is closed before returning.
func initCloudSQLPgConnectionPool(ctx context.Context, tracer trace.Tracer, name, project, region, instance, ipType, user, pass, dbname string) (*pgxpool.Pool, error) {
	//nolint:all // Reassigned ctx
	ctx, span := sources.InitConnectionSpan(ctx, tracer, SourceKind, name)
	defer span.End()

	// Configure the driver to connect to the database
	dsn, useIAM, err := getConnectionConfig(ctx, user, pass, dbname)
	if err != nil {
		return nil, fmt.Errorf("unable to get Cloud SQL connection config: %w", err)
	}
	config, err := pgxpool.ParseConfig(dsn)
	if err != nil {
		return nil, fmt.Errorf("unable to parse connection uri: %w", err)
	}

	// Create a new dialer with options
	userAgent, err := util.UserAgentFromContext(ctx)
	if err != nil {
		return nil, err
	}
	opts, err := sources.GetCloudSQLOpts(ipType, userAgent, useIAM)
	if err != nil {
		return nil, err
	}
	d, err := cloudsqlconn.NewDialer(ctx, opts...)
	if err != nil {
		return nil, fmt.Errorf("unable to create Cloud SQL dialer: %w", err)
	}

	// Tell the driver to use the Cloud SQL Go Connector to create connections.
	// The network and address arguments supplied by the driver are ignored;
	// the dialer is always given the instance connection name instead.
	instanceConnName := fmt.Sprintf("%s:%s:%s", project, region, instance)
	config.ConnConfig.DialFunc = func(ctx context.Context, _, _ string) (net.Conn, error) {
		return d.Dial(ctx, instanceConnName)
	}

	// Interact with the driver directly as you normally would
	pool, err := pgxpool.NewWithConfig(ctx, config)
	if err != nil {
		// Nothing will use the dialer once pool creation has failed; close it
		// so it is not leaked. The close error is dropped because the pool
		// error is the one the caller needs.
		_ = d.Close()
		return nil, err
	}
	return pool, nil
}