## Description
This PR adds a Cloud Logging admin source, tools, an integration test, and
docs.
1. The source is implemented in a manner consistent with the BigQuery
source and supports ADC, client OAuth, and service account impersonation.
2. A total of 3 tools have been implemented:
- `cloud-logging-admin-list-log-names`
- `cloud-logging-admin-list-resource-types`
- `cloud-logging-admin-query-logs`
3. Docs added for the source and the tools.
4. A supporting integration test is added, along with the corresponding CI update.
Note for reviewers:
1. The integration test runs against Cloud Logging and requires the
`LOGADMIN_PROJECT` env variable. The test creates logs in the project using
the `logging` client and then verifies the tools using the `logadmin` client.
2. Moved `cache.go` from the BigQuery source to `sources/cache.go`, since it
is now a shared utility (a usage sketch follows below).
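For reference, a minimal sketch of the shared cache pattern (eviction handler plus `Get`/`Set`), mirroring how the BigQuery source below uses `sources.NewCache`; the `logadmin` client here is purely illustrative:

```go
// Sketch only: per-token client caching with the shared sources.Cache,
// following the Get/Set/eviction pattern used by the BigQuery source below.
// The logadmin client is illustrative; the real source wires in a token source.
package cloudloggingadmin

import (
	"context"

	"cloud.google.com/go/logging/logadmin"
	"github.com/googleapis/genai-toolbox/internal/sources"
)

// newClientCache closes evicted clients so cached per-user connections do not leak.
func newClientCache() *sources.Cache {
	onEvict := func(key string, value interface{}) {
		if c, ok := value.(*logadmin.Client); ok && c != nil {
			c.Close()
		}
	}
	return sources.NewCache(onEvict)
}

// clientForToken returns a cached client for the token, creating one on a cache miss.
func clientForToken(ctx context.Context, cache *sources.Cache, project, token string) (*logadmin.Client, error) {
	if val, found := cache.Get(token); found {
		return val.(*logadmin.Client), nil
	}
	client, err := logadmin.NewClient(ctx, project) // a token-source option would be added here
	if err != nil {
		return nil, err
	}
	cache.Set(token, client)
	return client, nil
}
```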
Regarding the tools:
1. `cloud-logging-admin-list-log-names` uses `client.Logs()` instead of
`client.Entries()`, as the latter is resource-heavy; the tradeoff is that no
filters can be applied. The tool has an optional `limit` parameter, which
defaults to 200.
2. `cloud-logging-admin-list-resource-types` uses
`client.ResourceDescriptors(ctx)`. The aim of the tool is to make the agent
aware of the resource types that are present so it can use this information
when writing filters.
3. `cloud-logging-admin-query-logs` enables searching and reading logs from
Google Cloud (a sketch of the query flow is included below).
Parameters:
`filter` (optional): A text string to search for specific logs.
`newestFirst` (optional): A simple true/false switch for ordering.
`startTime` (optional): The start date and time to search from (e.g.,
2025-12-09T00:00:00Z). Defaults to 30 days ago if not set.
`endTime` (optional): The end date and time to search up to. Defaults to
"now".
`verbose` (optional): If set to true, shows all available details for
each log entry; otherwise shows only the main info (timestamp, message,
severity).
`limit` (optional): The maximum number of log entries to return (default
is 200).
Looking forward to feedback here: `verbose` is implemented simply to save
context tokens, so any alternative suggestion is welcome.
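For context, here is a minimal sketch of the query flow described above using the `logadmin` client. The helper name, default handling, and non-verbose formatting are assumptions for illustration, not the tool's actual implementation:

```go
// Sketch only: querying Cloud Logging with the logadmin client, combining the
// user filter with a timestamp range and honoring newestFirst and limit.
// Defaults and output shaping are assumptions, not the tool's implementation.
package cloudloggingadmin

import (
	"context"
	"fmt"
	"time"

	"cloud.google.com/go/logging/logadmin"
	"google.golang.org/api/iterator"
)

func queryLogs(ctx context.Context, project, filter string, newestFirst bool, start, end time.Time, limit int) ([]string, error) {
	client, err := logadmin.NewClient(ctx, project)
	if err != nil {
		return nil, err
	}
	defer client.Close()

	// Time bounds are expressed inside the Cloud Logging filter string.
	fullFilter := fmt.Sprintf("timestamp >= %q AND timestamp <= %q",
		start.Format(time.RFC3339), end.Format(time.RFC3339))
	if filter != "" {
		fullFilter = filter + " AND " + fullFilter
	}

	opts := []logadmin.EntriesOption{logadmin.Filter(fullFilter)}
	if newestFirst {
		opts = append(opts, logadmin.NewestFirst())
	}

	var out []string
	it := client.Entries(ctx, opts...)
	for len(out) < limit {
		entry, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return nil, err
		}
		// Non-verbose view: timestamp, severity, and payload only.
		out = append(out, fmt.Sprintf("%s [%s] %v",
			entry.Timestamp.Format(time.RFC3339), entry.Severity, entry.Payload))
	}
	return out, nil
}
```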
A simple `tools.yaml`:
```
sources:
  my-logging-admin:
    kind: cloud-logging-admin
    project: <Add project>
    useClientOAuth: false
tools:
  list_resource_types:
    kind: cloud-logging-admin-list-resource-types
    source: my-logging-admin
    description: List the types of resource that are indexed by Cloud Logging.
  list_log_names:
    kind: cloud-logging-admin-list-log-names
    source: my-logging-admin
    description: List log names matching filter criteria.
  query_logs:
    kind: cloud-logging-admin-query-logs
    source: my-logging-admin
    description: Query logs from Google Cloud Logging.
```
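And a hypothetical invocation of `query_logs`, assuming Toolbox's standard tool-invocation endpoint (`POST /api/tool/<name>/invoke`) on the default local port; adjust the URL and parameters for your deployment:

```go
// Sketch only: calling the query_logs tool over HTTP, assuming the default
// Toolbox invocation endpoint and local address; the parameters map to the
// tool's optional inputs described above.
package cloudloggingadmin

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
)

func invokeQueryLogs() error {
	body, err := json.Marshal(map[string]any{
		"filter":      `severity>=ERROR`,
		"newestFirst": true,
		"limit":       50,
	})
	if err != nil {
		return err
	}
	resp, err := http.Post("http://127.0.0.1:5000/api/tool/query_logs/invoke", "application/json", bytes.NewReader(body))
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	out, err := io.ReadAll(resp.Body)
	if err != nil {
		return err
	}
	fmt.Println(string(out))
	return nil
}
```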
## PR Checklist
- [x] Make sure you reviewed
[CONTRIBUTING.md](https://github.com/googleapis/genai-toolbox/blob/main/CONTRIBUTING.md)
- [x] Make sure to open an issue as a
[bug/issue](https://github.com/googleapis/genai-toolbox/issues/new/choose)
before writing your code! That way we can discuss the change, evaluate
designs, and agree on the general idea
- [x] Ensure the tests and linter pass
- [x] Code coverage does not decrease (if any source code was changed)
- [x] Appropriate docs were updated (if necessary)
- [ ] Make sure to add `!` if this involves a breaking change
🛠️ Fixes #1772
@anubhav756 @averikitsch Thanks for the guidance and feedback on the
implementation plan.
---------
Co-authored-by: Yuan Teoh <yuanteoh@google.com>
Co-authored-by: Yuan Teoh <45984206+Yuan325@users.noreply.github.com>
884 lines · 28 KiB · Go
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package bigquery

import (
	"context"
	"fmt"
	"math/big"
	"net/http"
	"reflect"
	"strings"
	"sync"
	"time"

	bigqueryapi "cloud.google.com/go/bigquery"
	dataplexapi "cloud.google.com/go/dataplex/apiv1"
	"github.com/goccy/go-yaml"
	"github.com/googleapis/genai-toolbox/internal/sources"
	"github.com/googleapis/genai-toolbox/internal/tools"
	"github.com/googleapis/genai-toolbox/internal/util"
	"github.com/googleapis/genai-toolbox/internal/util/orderedmap"
	"go.opentelemetry.io/otel/trace"
	"golang.org/x/oauth2"
	"golang.org/x/oauth2/google"
	bigqueryrestapi "google.golang.org/api/bigquery/v2"
	"google.golang.org/api/googleapi"
	"google.golang.org/api/impersonate"
	"google.golang.org/api/iterator"
	"google.golang.org/api/option"
)

const SourceType string = "bigquery"

// CloudPlatformScope is a broad scope for Google Cloud Platform services.
const CloudPlatformScope = "https://www.googleapis.com/auth/cloud-platform"

const (
	// No write operations are allowed.
	WriteModeBlocked string = "blocked"
	// Only protected write operations are allowed in a BigQuery session.
	WriteModeProtected string = "protected"
	// All write operations are allowed.
	WriteModeAllowed string = "allowed"
)

// validate interface
var _ sources.SourceConfig = Config{}

type BigqueryClientCreator func(tokenString string, wantRestService bool) (*bigqueryapi.Client, *bigqueryrestapi.Service, error)

type BigQuerySessionProvider func(ctx context.Context) (*Session, error)

type DataplexClientCreator func(tokenString string) (*dataplexapi.CatalogClient, error)

func init() {
	if !sources.Register(SourceType, newConfig) {
		panic(fmt.Sprintf("source type %q already registered", SourceType))
	}
}

func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (sources.SourceConfig, error) {
	actual := Config{Name: name}
	if err := decoder.DecodeContext(ctx, &actual); err != nil {
		return nil, err
	}
	return actual, nil
}

type Config struct {
	// BigQuery configs
	Name string `yaml:"name" validate:"required"`
	Type string `yaml:"type" validate:"required"`
	Project string `yaml:"project" validate:"required"`
	Location string `yaml:"location"`
	WriteMode string `yaml:"writeMode"`
	AllowedDatasets StringOrStringSlice `yaml:"allowedDatasets"`
	UseClientOAuth bool `yaml:"useClientOAuth"`
	ImpersonateServiceAccount string `yaml:"impersonateServiceAccount"`
	Scopes StringOrStringSlice `yaml:"scopes"`
	MaxQueryResultRows int `yaml:"maxQueryResultRows"`
}

// StringOrStringSlice is a custom type that can unmarshal both a single string
// (which it splits by comma) and a sequence of strings into a string slice.
type StringOrStringSlice []string

// UnmarshalYAML implements the yaml.Unmarshaler interface.
func (s *StringOrStringSlice) UnmarshalYAML(unmarshal func(any) error) error {
	var v any
	if err := unmarshal(&v); err != nil {
		return err
	}
	switch val := v.(type) {
	case string:
		*s = strings.Split(val, ",")
		return nil
	case []any:
		for _, item := range val {
			if str, ok := item.(string); ok {
				*s = append(*s, str)
			} else {
				return fmt.Errorf("element in sequence is not a string: %v", item)
			}
		}
		return nil
	}
	return fmt.Errorf("cannot unmarshal %T into StringOrStringSlice", v)
}

func (r Config) SourceConfigType() string {
	// Returns BigQuery source type
	return SourceType
}

func (r Config) Initialize(ctx context.Context, tracer trace.Tracer) (sources.Source, error) {
	if r.WriteMode == "" {
		r.WriteMode = WriteModeAllowed
	}

	if r.MaxQueryResultRows == 0 {
		r.MaxQueryResultRows = 50
	}

	if r.WriteMode == WriteModeProtected && r.UseClientOAuth {
		// The protected mode only allows write operations to the session's temporary datasets.
		// when using client OAuth, a new session is created every
		// time a BigQuery tool is invoked. Therefore, no session data can
		// be preserved as needed by the protected mode.
		return nil, fmt.Errorf("writeMode 'protected' cannot be used with useClientOAuth 'true'")
	}

	if r.UseClientOAuth && r.ImpersonateServiceAccount != "" {
		return nil, fmt.Errorf("useClientOAuth cannot be used with impersonateServiceAccount")
	}

	var client *bigqueryapi.Client
	var restService *bigqueryrestapi.Service
	var tokenSource oauth2.TokenSource
	var clientCreator BigqueryClientCreator
	var err error

	s := &Source{
		Config: r,
		Client: client,
		RestService: restService,
		TokenSource: tokenSource,
		MaxQueryResultRows: r.MaxQueryResultRows,
		ClientCreator: clientCreator,
	}

	if r.UseClientOAuth {
		// use client OAuth
		baseClientCreator, err := newBigQueryClientCreator(ctx, tracer, r.Project, r.Location, r.Name)
		if err != nil {
			return nil, fmt.Errorf("error constructing client creator: %w", err)
		}
		setupClientCaching(s, baseClientCreator)
	} else {
		// Initializes a BigQuery Google SQL source
		client, restService, tokenSource, err = initBigQueryConnection(ctx, tracer, r.Name, r.Project, r.Location, r.ImpersonateServiceAccount, r.Scopes)
		if err != nil {
			return nil, fmt.Errorf("error creating client from ADC: %w", err)
		}
		s.Client = client
		s.RestService = restService
		s.TokenSource = tokenSource
	}

	allowedDatasets := make(map[string]struct{})
	// Get full id of allowed datasets and verify they exist.
	if len(r.AllowedDatasets) > 0 {
		for _, allowed := range r.AllowedDatasets {
			var projectID, datasetID, allowedFullID string
			if strings.Contains(allowed, ".") {
				parts := strings.Split(allowed, ".")
				if len(parts) != 2 {
					return nil, fmt.Errorf("invalid allowedDataset format: %q, expected 'project.dataset' or 'dataset'", allowed)
				}
				projectID = parts[0]
				datasetID = parts[1]
				allowedFullID = allowed
			} else {
				projectID = r.Project
				datasetID = allowed
				allowedFullID = fmt.Sprintf("%s.%s", projectID, datasetID)
			}

			if s.Client != nil {
				dataset := s.Client.DatasetInProject(projectID, datasetID)
				_, err := dataset.Metadata(ctx)
				if err != nil {
					if gerr, ok := err.(*googleapi.Error); ok && gerr.Code == http.StatusNotFound {
						return nil, fmt.Errorf("allowedDataset '%s' not found in project '%s'", datasetID, projectID)
					}
					return nil, fmt.Errorf("failed to verify allowedDataset '%s' in project '%s': %w", datasetID, projectID, err)
				}
			}
			allowedDatasets[allowedFullID] = struct{}{}
		}
	}

	s.AllowedDatasets = allowedDatasets
	s.SessionProvider = s.newBigQuerySessionProvider()

	if r.WriteMode != WriteModeAllowed && r.WriteMode != WriteModeBlocked && r.WriteMode != WriteModeProtected {
		return nil, fmt.Errorf("invalid writeMode %q: must be one of %q, %q, or %q", r.WriteMode, WriteModeAllowed, WriteModeProtected, WriteModeBlocked)
	}
	s.makeDataplexCatalogClient = s.lazyInitDataplexClient(ctx, tracer)
	return s, nil
}

// setupClientCaching initializes caches and wraps the base client creator with caching logic.
func setupClientCaching(s *Source, baseCreator BigqueryClientCreator) {
	// Define eviction handlers
	onBqEvict := func(key string, value interface{}) {
		if client, ok := value.(*bigqueryapi.Client); ok && client != nil {
			client.Close()
		}
	}
	onDataplexEvict := func(key string, value interface{}) {
		if client, ok := value.(*dataplexapi.CatalogClient); ok && client != nil {
			client.Close()
		}
	}

	// Initialize caches
	s.bqClientCache = sources.NewCache(onBqEvict)
	s.bqRestCache = sources.NewCache(nil)
	s.dataplexCache = sources.NewCache(onDataplexEvict)

	// Create the caching wrapper for the client creator
	s.ClientCreator = func(tokenString string, wantRestService bool) (*bigqueryapi.Client, *bigqueryrestapi.Service, error) {
		// Check cache
		bqClientVal, bqFound := s.bqClientCache.Get(tokenString)

		if wantRestService {
			restServiceVal, restFound := s.bqRestCache.Get(tokenString)
			if bqFound && restFound {
				// Cache hit for both
				return bqClientVal.(*bigqueryapi.Client), restServiceVal.(*bigqueryrestapi.Service), nil
			}
		} else {
			if bqFound {
				return bqClientVal.(*bigqueryapi.Client), nil, nil
			}
		}

		// Cache miss - call the client creator
		client, restService, err := baseCreator(tokenString, wantRestService)
		if err != nil {
			return nil, nil, err
		}

		// Set in cache
		s.bqClientCache.Set(tokenString, client)
		if wantRestService && restService != nil {
			s.bqRestCache.Set(tokenString, restService)
		}

		return client, restService, nil
	}
}

var _ sources.Source = &Source{}

type Source struct {
	Config
	Client *bigqueryapi.Client
	RestService *bigqueryrestapi.Service
	TokenSource oauth2.TokenSource
	MaxQueryResultRows int
	ClientCreator BigqueryClientCreator
	AllowedDatasets map[string]struct{}
	sessionMutex sync.Mutex
	makeDataplexCatalogClient func() (*dataplexapi.CatalogClient, DataplexClientCreator, error)
	SessionProvider BigQuerySessionProvider
	Session *Session

	// Caches for OAuth clients
	bqClientCache *sources.Cache
	bqRestCache *sources.Cache
	dataplexCache *sources.Cache
}

type Session struct {
	ID string
	ProjectID string
	DatasetID string
	CreationTime time.Time
	LastUsed time.Time
}

func (s *Source) SourceType() string {
	// Returns BigQuery Google SQL source type
	return SourceType
}

func (s *Source) ToConfig() sources.SourceConfig {
	return s.Config
}

func (s *Source) BigQueryClient() *bigqueryapi.Client {
	return s.Client
}

func (s *Source) BigQueryRestService() *bigqueryrestapi.Service {
	return s.RestService
}

func (s *Source) BigQueryWriteMode() string {
	return s.WriteMode
}

func (s *Source) BigQuerySession() BigQuerySessionProvider {
	return s.SessionProvider
}

func (s *Source) newBigQuerySessionProvider() BigQuerySessionProvider {
	return func(ctx context.Context) (*Session, error) {
		if s.WriteMode != WriteModeProtected {
			return nil, nil
		}

		s.sessionMutex.Lock()
		defer s.sessionMutex.Unlock()

		logger, err := util.LoggerFromContext(ctx)
		if err != nil {
			return nil, fmt.Errorf("failed to get logger from context: %w", err)
		}

		if s.Session != nil {
			// Absolute 7-day lifetime check.
			const sessionMaxLifetime = 7 * 24 * time.Hour
			// This assumes a single task will not exceed 30 minutes, preventing it from failing mid-execution.
			const refreshThreshold = 30 * time.Minute
			if time.Since(s.Session.CreationTime) > (sessionMaxLifetime - refreshThreshold) {
				logger.DebugContext(ctx, "Session is approaching its 7-day maximum lifetime. Creating a new one.")
			} else {
				job := &bigqueryrestapi.Job{
					Configuration: &bigqueryrestapi.JobConfiguration{
						DryRun: true,
						Query: &bigqueryrestapi.JobConfigurationQuery{
							Query: "SELECT 1",
							UseLegacySql: new(bool),
							ConnectionProperties: []*bigqueryrestapi.ConnectionProperty{{Key: "session_id", Value: s.Session.ID}},
						},
					},
				}
				_, err := s.RestService.Jobs.Insert(s.Project, job).Do()
				if err == nil {
					s.Session.LastUsed = time.Now()
					return s.Session, nil
				}
				logger.DebugContext(ctx, "Session validation failed (likely expired), creating a new one.", "error", err)
			}
		}

		// Create a new session if one doesn't exist, it has passed its 7-day lifetime,
		// or it failed the validation dry run.
		creationTime := time.Now()
		job := &bigqueryrestapi.Job{
			JobReference: &bigqueryrestapi.JobReference{
				ProjectId: s.Project,
				Location: s.Location,
			},
			Configuration: &bigqueryrestapi.JobConfiguration{
				DryRun: true,
				Query: &bigqueryrestapi.JobConfigurationQuery{
					Query: "SELECT 1",
					CreateSession: true,
				},
			},
		}

		createdJob, err := s.RestService.Jobs.Insert(s.Project, job).Do()
		if err != nil {
			return nil, fmt.Errorf("failed to create new session: %w", err)
		}

		var sessionID, sessionDatasetID, projectID string
		if createdJob.Statistics != nil && createdJob.Statistics.SessionInfo != nil {
			sessionID = createdJob.Statistics.SessionInfo.SessionId
		} else {
			return nil, fmt.Errorf("failed to get session ID from new session job")
		}

		if createdJob.Configuration != nil && createdJob.Configuration.Query != nil && createdJob.Configuration.Query.DestinationTable != nil {
			sessionDatasetID = createdJob.Configuration.Query.DestinationTable.DatasetId
			projectID = createdJob.Configuration.Query.DestinationTable.ProjectId
		} else {
			return nil, fmt.Errorf("failed to get session dataset ID from new session job")
		}

		s.Session = &Session{
			ID: sessionID,
			ProjectID: projectID,
			DatasetID: sessionDatasetID,
			CreationTime: creationTime,
			LastUsed: creationTime,
		}
		return s.Session, nil
	}
}

func (s *Source) UseClientAuthorization() bool {
	return s.UseClientOAuth
}

func (s *Source) BigQueryProject() string {
	return s.Project
}

func (s *Source) BigQueryLocation() string {
	return s.Location
}

func (s *Source) BigQueryTokenSource() oauth2.TokenSource {
	return s.TokenSource
}

func (s *Source) BigQueryTokenSourceWithScope(ctx context.Context, scopes []string) (oauth2.TokenSource, error) {
	if len(scopes) == 0 {
		scopes = s.Scopes
		if len(scopes) == 0 {
			scopes = []string{CloudPlatformScope}
		}
	}

	if s.ImpersonateServiceAccount != "" {
		// Create impersonated credentials token source with the requested scopes
		ts, err := impersonate.CredentialsTokenSource(ctx, impersonate.CredentialsConfig{
			TargetPrincipal: s.ImpersonateServiceAccount,
			Scopes: scopes,
		})
		if err != nil {
			return nil, fmt.Errorf("failed to create impersonated credentials for %q with scopes %v: %w", s.ImpersonateServiceAccount, scopes, err)
		}
		return ts, nil
	}
	return google.DefaultTokenSource(ctx, scopes...)
}

func (s *Source) GetMaxQueryResultRows() int {
	return s.MaxQueryResultRows
}

func (s *Source) BigQueryClientCreator() BigqueryClientCreator {
	return s.ClientCreator
}

func (s *Source) BigQueryAllowedDatasets() []string {
	if len(s.AllowedDatasets) == 0 {
		return nil
	}
	datasets := make([]string, 0, len(s.AllowedDatasets))
	for d := range s.AllowedDatasets {
		datasets = append(datasets, d)
	}
	return datasets
}

// IsDatasetAllowed checks if a given dataset is accessible based on the source's configuration.
func (s *Source) IsDatasetAllowed(projectID, datasetID string) bool {
	// If the normalized map is empty, it means no restrictions were configured.
	if len(s.AllowedDatasets) == 0 {
		return true
	}

	targetDataset := fmt.Sprintf("%s.%s", projectID, datasetID)
	_, ok := s.AllowedDatasets[targetDataset]
	return ok
}

func (s *Source) MakeDataplexCatalogClient() func() (*dataplexapi.CatalogClient, DataplexClientCreator, error) {
	return s.makeDataplexCatalogClient
}

func (s *Source) lazyInitDataplexClient(ctx context.Context, tracer trace.Tracer) func() (*dataplexapi.CatalogClient, DataplexClientCreator, error) {
	var once sync.Once
	var client *dataplexapi.CatalogClient
	var clientCreator DataplexClientCreator
	var err error

	return func() (*dataplexapi.CatalogClient, DataplexClientCreator, error) {
		once.Do(func() {
			c, cc, e := initDataplexConnection(ctx, tracer, s.Name, s.Project, s.UseClientOAuth, s.ImpersonateServiceAccount, s.Scopes)
			if e != nil {
				err = fmt.Errorf("failed to initialize dataplex client: %w", e)
				return
			}
			client = c

			// If using OAuth, wrap the provided client creator (cc) with caching logic
			if s.UseClientOAuth && cc != nil {
				clientCreator = func(tokenString string) (*dataplexapi.CatalogClient, error) {
					// Check cache
					if val, found := s.dataplexCache.Get(tokenString); found {
						return val.(*dataplexapi.CatalogClient), nil
					}

					// Cache miss - call client creator
					dpClient, err := cc(tokenString)
					if err != nil {
						return nil, err
					}

					// Set in cache
					s.dataplexCache.Set(tokenString, dpClient)
					return dpClient, nil
				}
			} else {
				// Not using OAuth or no creator was returned
				clientCreator = cc
			}
		})
		return client, clientCreator, err
	}
}

// RetrieveClientAndService returns the BigQuery client and REST service for a
// tool invocation. When client OAuth is enabled, per-user clients are created
// (and cached) from the request's access token.
func (s *Source) RetrieveClientAndService(accessToken tools.AccessToken) (*bigqueryapi.Client, *bigqueryrestapi.Service, error) {
	bqClient := s.BigQueryClient()
	restService := s.BigQueryRestService()

	// Initialize new client if using user OAuth token
	if s.UseClientAuthorization() {
		tokenStr, err := accessToken.ParseBearerToken()
		if err != nil {
			return nil, nil, fmt.Errorf("error parsing access token: %w", err)
		}
		bqClient, restService, err = s.BigQueryClientCreator()(tokenStr, true)
		if err != nil {
			return nil, nil, fmt.Errorf("error creating client from OAuth access token: %w", err)
		}
	}
	return bqClient, restService, nil
}

// RunSQL executes a SQL statement with optional query parameters and
// connection properties, returning up to MaxQueryResultRows rows, or a status
// string when the statement produces no rows.
func (s *Source) RunSQL(ctx context.Context, bqClient *bigqueryapi.Client, statement, statementType string, params []bigqueryapi.QueryParameter, connProps []*bigqueryapi.ConnectionProperty) (any, error) {
	query := bqClient.Query(statement)
	query.Location = bqClient.Location
	if params != nil {
		query.Parameters = params
	}
	if connProps != nil {
		query.ConnectionProperties = connProps
	}

	// This block handles SELECT statements, which return a row set.
	// We iterate through the results, convert each row into a map of
	// column names to values, and return the collection of rows.
	job, err := query.Run(ctx)
	if err != nil {
		return nil, fmt.Errorf("unable to execute query: %w", err)
	}
	it, err := job.Read(ctx)
	if err != nil {
		return nil, fmt.Errorf("unable to read query results: %w", err)
	}

	var out []any
	for s.MaxQueryResultRows <= 0 || len(out) < s.MaxQueryResultRows {
		var val []bigqueryapi.Value
		err = it.Next(&val)
		if err == iterator.Done {
			break
		}
		if err != nil {
			return nil, fmt.Errorf("unable to iterate through query results: %w", err)
		}
		schema := it.Schema
		row := orderedmap.Row{}
		for i, field := range schema {
			row.Add(field.Name, NormalizeValue(val[i]))
		}
		out = append(out, row)
	}
	// If the query returned any rows, return them directly.
	if len(out) > 0 {
		return out, nil
	}

	// This handles the standard case for a SELECT query that successfully
	// executes but returns zero rows.
	if statementType == "SELECT" {
		return "The query returned 0 rows.", nil
	}
	// This is the fallback for a successful query that doesn't return content.
	// In most cases, this will be for DML/DDL statements like INSERT, UPDATE, CREATE, etc.
	// However, it is also possible that this was a query that was expected to return rows
	// but returned none, a case that we cannot distinguish here.
	return "Query executed successfully and returned no content.", nil
}

// NormalizeValue converts BigQuery specific types to standard JSON-compatible types.
// Specifically, it handles *big.Rat (used for NUMERIC/BIGNUMERIC) by converting
// them to decimal strings with up to 38 digits of precision, trimming trailing zeros.
// It recursively handles slices (arrays) and maps (structs) using reflection.
func NormalizeValue(v any) any {
	if v == nil {
		return nil
	}

	// Handle *big.Rat specifically.
	if rat, ok := v.(*big.Rat); ok {
		// Convert big.Rat to a decimal string.
		// Use a precision of 38 digits (enough for BIGNUMERIC and NUMERIC)
		// and trim trailing zeros to match BigQuery's behavior.
		s := rat.FloatString(38)
		if strings.Contains(s, ".") {
			s = strings.TrimRight(s, "0")
			s = strings.TrimRight(s, ".")
		}
		return s
	}

	// Use reflection for slices and maps to handle various underlying types.
	rv := reflect.ValueOf(v)
	switch rv.Kind() {
	case reflect.Slice, reflect.Array:
		// Preserve []byte as is, so json.Marshal encodes it as Base64 string (BigQuery BYTES behavior).
		if rv.Type().Elem().Kind() == reflect.Uint8 {
			return v
		}
		newSlice := make([]any, rv.Len())
		for i := 0; i < rv.Len(); i++ {
			newSlice[i] = NormalizeValue(rv.Index(i).Interface())
		}
		return newSlice
	case reflect.Map:
		// Ensure keys are strings to produce a JSON-compatible map.
		if rv.Type().Key().Kind() != reflect.String {
			return v
		}
		newMap := make(map[string]any, rv.Len())
		iter := rv.MapRange()
		for iter.Next() {
			newMap[iter.Key().String()] = NormalizeValue(iter.Value().Interface())
		}
		return newMap
	}

	return v
}

func initBigQueryConnection(
	ctx context.Context,
	tracer trace.Tracer,
	name string,
	project string,
	location string,
	impersonateServiceAccount string,
	scopes []string,
) (*bigqueryapi.Client, *bigqueryrestapi.Service, oauth2.TokenSource, error) {
	ctx, span := sources.InitConnectionSpan(ctx, tracer, SourceType, name)
	defer span.End()

	userAgent, err := util.UserAgentFromContext(ctx)
	if err != nil {
		return nil, nil, nil, err
	}

	var tokenSource oauth2.TokenSource
	var opts []option.ClientOption

	var credScopes []string
	if len(scopes) > 0 {
		credScopes = scopes
	} else if impersonateServiceAccount != "" {
		credScopes = []string{CloudPlatformScope}
	} else {
		credScopes = []string{bigqueryapi.Scope}
	}

	if impersonateServiceAccount != "" {
		// Create impersonated credentials token source
		// This broader scope is needed for tools like conversational analytics
		cloudPlatformTokenSource, err := impersonate.CredentialsTokenSource(ctx, impersonate.CredentialsConfig{
			TargetPrincipal: impersonateServiceAccount,
			Scopes: credScopes,
		})
		if err != nil {
			return nil, nil, nil, fmt.Errorf("failed to create impersonated credentials for %q: %w", impersonateServiceAccount, err)
		}
		tokenSource = cloudPlatformTokenSource
		opts = []option.ClientOption{
			option.WithUserAgent(userAgent),
			option.WithTokenSource(cloudPlatformTokenSource),
		}
	} else {
		// Use default credentials
		cred, err := google.FindDefaultCredentials(ctx, credScopes...)
		if err != nil {
			return nil, nil, nil, fmt.Errorf("failed to find default Google Cloud credentials with scopes %v: %w", credScopes, err)
		}
		tokenSource = cred.TokenSource
		opts = []option.ClientOption{
			option.WithUserAgent(userAgent),
			option.WithCredentials(cred),
		}
	}

	// Initialize the high-level BigQuery client
	client, err := bigqueryapi.NewClient(ctx, project, opts...)
	if err != nil {
		return nil, nil, nil, fmt.Errorf("failed to create BigQuery client for project %q: %w", project, err)
	}
	client.Location = location

	// Initialize the low-level BigQuery REST service using the same credentials
	restService, err := bigqueryrestapi.NewService(ctx, opts...)
	if err != nil {
		return nil, nil, nil, fmt.Errorf("failed to create BigQuery v2 service: %w", err)
	}

	return client, restService, tokenSource, nil
}

// initBigQueryConnectionWithOAuthToken initializes a BigQuery client with an
// OAuth access token.
func initBigQueryConnectionWithOAuthToken(
	ctx context.Context,
	tracer trace.Tracer,
	project string,
	location string,
	name string,
	userAgent string,
	tokenString string,
	wantRestService bool,
) (*bigqueryapi.Client, *bigqueryrestapi.Service, error) {
	ctx, span := sources.InitConnectionSpan(ctx, tracer, SourceType, name)
	defer span.End()
	// Construct token source
	token := &oauth2.Token{
		AccessToken: string(tokenString),
	}
	ts := oauth2.StaticTokenSource(token)

	// Initialize the BigQuery client with tokenSource
	client, err := bigqueryapi.NewClient(ctx, project, option.WithUserAgent(userAgent), option.WithTokenSource(ts))
	if err != nil {
		return nil, nil, fmt.Errorf("failed to create BigQuery client for project %q: %w", project, err)
	}
	client.Location = location

	if wantRestService {
		// Initialize the low-level BigQuery REST service using the same credentials
		restService, err := bigqueryrestapi.NewService(ctx, option.WithUserAgent(userAgent), option.WithTokenSource(ts))
		if err != nil {
			return nil, nil, fmt.Errorf("failed to create BigQuery v2 service: %w", err)
		}
		return client, restService, nil
	}

	return client, nil, nil
}

// newBigQueryClientCreator sets the project parameters for the init helper
// function. The returned function takes in an OAuth access token and uses it to
// create a BQ client.
func newBigQueryClientCreator(
	ctx context.Context,
	tracer trace.Tracer,
	project string,
	location string,
	name string,
) (func(string, bool) (*bigqueryapi.Client, *bigqueryrestapi.Service, error), error) {
	userAgent, err := util.UserAgentFromContext(ctx)
	if err != nil {
		return nil, err
	}

	return func(tokenString string, wantRestService bool) (*bigqueryapi.Client, *bigqueryrestapi.Service, error) {
		return initBigQueryConnectionWithOAuthToken(ctx, tracer, project, location, name, userAgent, tokenString, wantRestService)
	}, nil
}

func initDataplexConnection(
	ctx context.Context,
	tracer trace.Tracer,
	name string,
	project string,
	useClientOAuth bool,
	impersonateServiceAccount string,
	scopes []string,
) (*dataplexapi.CatalogClient, DataplexClientCreator, error) {
	var client *dataplexapi.CatalogClient
	var clientCreator DataplexClientCreator
	var err error

	ctx, span := sources.InitConnectionSpan(ctx, tracer, SourceType, name)
	defer span.End()

	userAgent, err := util.UserAgentFromContext(ctx)
	if err != nil {
		return nil, nil, err
	}

	if useClientOAuth {
		clientCreator = newDataplexClientCreator(ctx, project, userAgent)
	} else {
		var opts []option.ClientOption

		credScopes := scopes
		if len(credScopes) == 0 {
			credScopes = []string{CloudPlatformScope}
		}

		if impersonateServiceAccount != "" {
			// Create impersonated credentials token source
			ts, err := impersonate.CredentialsTokenSource(ctx, impersonate.CredentialsConfig{
				TargetPrincipal: impersonateServiceAccount,
				Scopes: credScopes,
			})
			if err != nil {
				return nil, nil, fmt.Errorf("failed to create impersonated credentials for %q: %w", impersonateServiceAccount, err)
			}
			opts = []option.ClientOption{
				option.WithUserAgent(userAgent),
				option.WithTokenSource(ts),
			}
		} else {
			// Use default credentials
			cred, err := google.FindDefaultCredentials(ctx, credScopes...)
			if err != nil {
				return nil, nil, fmt.Errorf("failed to find default Google Cloud credentials: %w", err)
			}
			opts = []option.ClientOption{
				option.WithUserAgent(userAgent),
				option.WithCredentials(cred),
			}
		}

		client, err = dataplexapi.NewCatalogClient(ctx, opts...)
		if err != nil {
			return nil, nil, fmt.Errorf("failed to create Dataplex client for project %q: %w", project, err)
		}
	}

	return client, clientCreator, nil
}

func initDataplexConnectionWithOAuthToken(
	ctx context.Context,
	project string,
	userAgent string,
	tokenString string,
) (*dataplexapi.CatalogClient, error) {
	// Construct token source
	token := &oauth2.Token{
		AccessToken: string(tokenString),
	}
	ts := oauth2.StaticTokenSource(token)

	client, err := dataplexapi.NewCatalogClient(ctx, option.WithUserAgent(userAgent), option.WithTokenSource(ts))
	if err != nil {
		return nil, fmt.Errorf("failed to create Dataplex client for project %q: %w", project, err)
	}
	return client, nil
}

func newDataplexClientCreator(
	ctx context.Context,
	project string,
	userAgent string,
) func(string) (*dataplexapi.CatalogClient, error) {
	return func(tokenString string) (*dataplexapi.CatalogClient, error) {
		return initDataplexConnectionWithOAuthToken(ctx, project, userAgent, tokenString)
	}
}