mirror of
https://github.com/googleapis/genai-toolbox.git
synced 2026-01-08 15:14:00 -05:00
feat(source/bigquery): add service account impersonation support for bigquery (#1641)
## Description This change adds service account impersonation support to BigQuery. Users can now optionally supply an `impersonateServiceAccount` field in their `bigquery-source` config to enable impersonation. --- > Should include a concise description of the changes (bug or feature), its > impact, along with a summary of the solution ## PR Checklist --- > Thank you for opening a Pull Request! Before submitting your PR, there are a > few things you can do to make sure it goes smoothly: - [x] Make sure you reviewed [CONTRIBUTING.md](https://github.com/googleapis/genai-toolbox/blob/main/CONTRIBUTING.md) - [x] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/genai-toolbox/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [x] Ensure the tests and linter pass - [x] Code coverage does not decrease (if any source code was changed) - [x] Appropriate docs were updated (if necessary) - [x] Make sure to add `!` if this involves a breaking change 🛠️ Fixes #906
This commit is contained in:
@@ -123,6 +123,7 @@ sources:
|
||||
# allowedDatasets: # Optional: Restricts tool access to a specific list of datasets.
|
||||
# - "my_dataset_1"
|
||||
# - "other_project.my_dataset_2"
|
||||
# impersonateServiceAccount: "service-account@project-id.iam.gserviceaccount.com" # Optional: Service account to impersonate
|
||||
```
|
||||
|
||||
Initialize a BigQuery source that uses the client's access token:
|
||||
@@ -138,15 +139,17 @@ sources:
|
||||
# allowedDatasets: # Optional: Restricts tool access to a specific list of datasets.
|
||||
# - "my_dataset_1"
|
||||
# - "other_project.my_dataset_2"
|
||||
# impersonateServiceAccount: "service-account@project-id.iam.gserviceaccount.com" # Optional: Service account to impersonate. Cannot be combined with useClientOAuth: true.
|
||||
```
|
||||
|
||||
## Reference
|
||||
|
||||
| **field** | **type** | **required** | **description** |
|
||||
|-----------------|:--------:|:------------:|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|
||||
| kind | string | true | Must be "bigquery". |
|
||||
| project | string | true | Id of the Google Cloud project to use for billing and as the default project for BigQuery resources. |
|
||||
| location | string | false | Specifies the location (e.g., 'us', 'asia-northeast1') in which to run the query job. This location must match the location of any tables referenced in the query. Defaults to the table's location or 'US' if the location cannot be determined. [Learn More](https://cloud.google.com/bigquery/docs/locations) |
|
||||
| writeMode | string | false | Controls the write behavior for tools. `allowed` (default): All queries are permitted. `blocked`: Only `SELECT` statements are allowed for the `bigquery-execute-sql` tool. `protected`: Enables session-based execution where all tools associated with this source instance share the same [BigQuery session](https://cloud.google.com/bigquery/docs/sessions-intro). This allows for stateful operations using temporary tables (e.g., `CREATE TEMP TABLE`). For `bigquery-execute-sql`, `SELECT` statements can be used on all tables, but write operations are restricted to the session's temporary dataset. For tools like `bigquery-sql`, `bigquery-forecast`, and `bigquery-analyze-contribution`, the `writeMode` restrictions do not apply, but they will operate within the shared session. **Note:** The `protected` mode cannot be used with `useClientOAuth: true`. It is also not recommended for multi-user server environments, as all users would share the same session. A session is terminated automatically after 24 hours of inactivity or after 7 days, whichever comes first. A new session is created on the next request, and any temporary data from the previous session will be lost. |
|
||||
| allowedDatasets | []string | false | An optional list of dataset IDs that tools using this source are allowed to access. If provided, any tool operation attempting to access a dataset not in this list will be rejected. To enforce this, two types of operations are also disallowed: 1) Dataset-level operations (e.g., `CREATE SCHEMA`), and 2) operations where table access cannot be statically analyzed (e.g., `EXECUTE IMMEDIATE`, `CREATE PROCEDURE`). If a single dataset is provided, it will be treated as the default for prebuilt tools. |
|
||||
| useClientOAuth | bool | false | If true, forwards the client's OAuth access token from the "Authorization" header to downstream queries. **Note:** This cannot be used with `writeMode: protected`. |
|
||||
| **field** | **type** | **required** | **description** |
|
||||
|---------------------------|:--------:|:------------:|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|
||||
| kind | string | true | Must be "bigquery". |
|
||||
| project | string | true | ID of the Google Cloud project to use for billing and as the default project for BigQuery resources. |
|
||||
| location | string | false | Specifies the location (e.g., 'us', 'asia-northeast1') in which to run the query job. This location must match the location of any tables referenced in the query. Defaults to the table's location or 'US' if the location cannot be determined. [Learn More](https://cloud.google.com/bigquery/docs/locations) |
|
||||
| writeMode | string | false | Controls the write behavior for tools. `allowed` (default): All queries are permitted. `blocked`: Only `SELECT` statements are allowed for the `bigquery-execute-sql` tool. `protected`: Enables session-based execution where all tools associated with this source instance share the same [BigQuery session](https://cloud.google.com/bigquery/docs/sessions-intro). This allows for stateful operations using temporary tables (e.g., `CREATE TEMP TABLE`). For `bigquery-execute-sql`, `SELECT` statements can be used on all tables, but write operations are restricted to the session's temporary dataset. For tools like `bigquery-sql`, `bigquery-forecast`, and `bigquery-analyze-contribution`, the `writeMode` restrictions do not apply, but they will operate within the shared session. **Note:** The `protected` mode cannot be used with `useClientOAuth: true`. It is also not recommended for multi-user server environments, as all users would share the same session. A session is terminated automatically after 24 hours of inactivity or after 7 days, whichever comes first. A new session is created on the next request, and any temporary data from the previous session will be lost. |
|
||||
| allowedDatasets | []string | false | An optional list of dataset IDs that tools using this source are allowed to access. If provided, any tool operation attempting to access a dataset not in this list will be rejected. To enforce this, two types of operations are also disallowed: 1) Dataset-level operations (e.g., `CREATE SCHEMA`), and 2) operations where table access cannot be statically analyzed (e.g., `EXECUTE IMMEDIATE`, `CREATE PROCEDURE`). If a single dataset is provided, it will be treated as the default for prebuilt tools. |
|
||||
| useClientOAuth | bool | false | If true, forwards the client's OAuth access token from the "Authorization" header to downstream queries. **Note:** This cannot be used with `writeMode: protected` or `impersonateServiceAccount`. |
|
||||
| impersonateServiceAccount | string | false | Service account email to impersonate when making BigQuery and Dataplex API calls. The authenticated principal must have the `roles/iam.serviceAccountTokenCreator` role on the target service account. **Note:** This cannot be used with `useClientOAuth: true`. [Learn More](https://cloud.google.com/iam/docs/service-account-impersonation) |
|
||||
|
||||
@@ -32,6 +32,7 @@ import (
|
||||
"golang.org/x/oauth2/google"
|
||||
bigqueryrestapi "google.golang.org/api/bigquery/v2"
|
||||
"google.golang.org/api/googleapi"
|
||||
"google.golang.org/api/impersonate"
|
||||
"google.golang.org/api/option"
|
||||
)
|
||||
|
||||
@@ -78,6 +79,7 @@ type Config struct {
|
||||
WriteMode string `yaml:"writeMode"`
|
||||
AllowedDatasets []string `yaml:"allowedDatasets"`
|
||||
UseClientOAuth bool `yaml:"useClientOAuth"`
|
||||
ImpersonateServiceAccount string `yaml:"impersonateServiceAccount"`
|
||||
}
|
||||
|
||||
func (r Config) SourceConfigKind() string {
|
||||
@@ -94,6 +96,10 @@ func (r Config) Initialize(ctx context.Context, tracer trace.Tracer) (sources.So
|
||||
return nil, fmt.Errorf("writeMode 'protected' cannot be used with useClientOAuth 'true'")
|
||||
}
|
||||
|
||||
if r.UseClientOAuth && r.ImpersonateServiceAccount != "" {
|
||||
return nil, fmt.Errorf("useClientOAuth cannot be used with impersonateServiceAccount")
|
||||
}
|
||||
|
||||
var client *bigqueryapi.Client
|
||||
var restService *bigqueryrestapi.Service
|
||||
var tokenSource oauth2.TokenSource
|
||||
@@ -107,7 +113,7 @@ func (r Config) Initialize(ctx context.Context, tracer trace.Tracer) (sources.So
|
||||
}
|
||||
} else {
|
||||
// Initializes a BigQuery Google SQL source
|
||||
client, restService, tokenSource, err = initBigQueryConnection(ctx, tracer, r.Name, r.Project, r.Location)
|
||||
client, restService, tokenSource, err = initBigQueryConnection(ctx, tracer, r.Name, r.Project, r.Location, r.ImpersonateServiceAccount)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error creating client from ADC: %w", err)
|
||||
}
|
||||
@@ -147,18 +153,19 @@ func (r Config) Initialize(ctx context.Context, tracer trace.Tracer) (sources.So
|
||||
}
|
||||
|
||||
s := &Source{
|
||||
Name: r.Name,
|
||||
Kind: SourceKind,
|
||||
Project: r.Project,
|
||||
Location: r.Location,
|
||||
Client: client,
|
||||
RestService: restService,
|
||||
TokenSource: tokenSource,
|
||||
MaxQueryResultRows: 50,
|
||||
ClientCreator: clientCreator,
|
||||
WriteMode: r.WriteMode,
|
||||
AllowedDatasets: allowedDatasets,
|
||||
UseClientOAuth: r.UseClientOAuth,
|
||||
Name: r.Name,
|
||||
Kind: SourceKind,
|
||||
Project: r.Project,
|
||||
Location: r.Location,
|
||||
Client: client,
|
||||
RestService: restService,
|
||||
TokenSource: tokenSource,
|
||||
MaxQueryResultRows: 50,
|
||||
WriteMode: r.WriteMode,
|
||||
AllowedDatasets: allowedDatasets,
|
||||
UseClientOAuth: r.UseClientOAuth,
|
||||
ClientCreator: clientCreator,
|
||||
ImpersonateServiceAccount: r.ImpersonateServiceAccount,
|
||||
}
|
||||
s.SessionProvider = s.newBigQuerySessionProvider()
|
||||
|
||||
@@ -167,7 +174,6 @@ func (r Config) Initialize(ctx context.Context, tracer trace.Tracer) (sources.So
|
||||
}
|
||||
s.makeDataplexCatalogClient = s.lazyInitDataplexClient(ctx, tracer)
|
||||
return s, nil
|
||||
|
||||
}
|
||||
|
||||
var _ sources.Source = &Source{}
|
||||
@@ -185,6 +191,7 @@ type Source struct {
|
||||
ClientCreator BigqueryClientCreator
|
||||
AllowedDatasets map[string]struct{}
|
||||
UseClientOAuth bool
|
||||
ImpersonateServiceAccount string
|
||||
WriteMode string
|
||||
sessionMutex sync.Mutex
|
||||
makeDataplexCatalogClient func() (*dataplexapi.CatalogClient, DataplexClientCreator, error)
|
||||
@@ -327,6 +334,17 @@ func (s *Source) BigQueryTokenSource() oauth2.TokenSource {
|
||||
}
|
||||
|
||||
func (s *Source) BigQueryTokenSourceWithScope(ctx context.Context, scope string) (oauth2.TokenSource, error) {
|
||||
if s.ImpersonateServiceAccount != "" {
|
||||
// Create impersonated credentials token source with the requested scope
|
||||
ts, err := impersonate.CredentialsTokenSource(ctx, impersonate.CredentialsConfig{
|
||||
TargetPrincipal: s.ImpersonateServiceAccount,
|
||||
Scopes: []string{scope},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create impersonated credentials for %q with scope %q: %w", s.ImpersonateServiceAccount, scope, err)
|
||||
}
|
||||
return ts, nil
|
||||
}
|
||||
return google.DefaultTokenSource(ctx, scope)
|
||||
}
|
||||
|
||||
@@ -373,7 +391,7 @@ func (s *Source) lazyInitDataplexClient(ctx context.Context, tracer trace.Tracer
|
||||
|
||||
return func() (*dataplexapi.CatalogClient, DataplexClientCreator, error) {
|
||||
once.Do(func() {
|
||||
c, cc, e := initDataplexConnection(ctx, tracer, s.Name, s.Project, s.UseClientOAuth)
|
||||
c, cc, e := initDataplexConnection(ctx, tracer, s.Name, s.Project, s.UseClientOAuth, s.ImpersonateServiceAccount)
|
||||
if e != nil {
|
||||
err = fmt.Errorf("failed to initialize dataplex client: %w", e)
|
||||
return
|
||||
@@ -391,34 +409,61 @@ func initBigQueryConnection(
|
||||
name string,
|
||||
project string,
|
||||
location string,
|
||||
impersonateServiceAccount string,
|
||||
) (*bigqueryapi.Client, *bigqueryrestapi.Service, oauth2.TokenSource, error) {
|
||||
ctx, span := sources.InitConnectionSpan(ctx, tracer, SourceKind, name)
|
||||
defer span.End()
|
||||
|
||||
cred, err := google.FindDefaultCredentials(ctx, "https://www.googleapis.com/auth/cloud-platform")
|
||||
if err != nil {
|
||||
return nil, nil, nil, fmt.Errorf("failed to find default Google Cloud credentials with scope %q: %w", bigqueryapi.Scope, err)
|
||||
}
|
||||
|
||||
userAgent, err := util.UserAgentFromContext(ctx)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
var tokenSource oauth2.TokenSource
|
||||
var opts []option.ClientOption
|
||||
|
||||
if impersonateServiceAccount != "" {
|
||||
// Create impersonated credentials token source with cloud-platform scope
|
||||
// This broader scope is needed for tools like conversational analytics
|
||||
cloudPlatformTokenSource, err := impersonate.CredentialsTokenSource(ctx, impersonate.CredentialsConfig{
|
||||
TargetPrincipal: impersonateServiceAccount,
|
||||
Scopes: []string{"https://www.googleapis.com/auth/cloud-platform"},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, nil, nil, fmt.Errorf("failed to create impersonated credentials for %q: %w", impersonateServiceAccount, err)
|
||||
}
|
||||
tokenSource = cloudPlatformTokenSource
|
||||
opts = []option.ClientOption{
|
||||
option.WithUserAgent(userAgent),
|
||||
option.WithTokenSource(cloudPlatformTokenSource),
|
||||
}
|
||||
} else {
|
||||
// Use default credentials
|
||||
cred, err := google.FindDefaultCredentials(ctx, bigqueryapi.Scope)
|
||||
if err != nil {
|
||||
return nil, nil, nil, fmt.Errorf("failed to find default Google Cloud credentials with scope %q: %w", bigqueryapi.Scope, err)
|
||||
}
|
||||
tokenSource = cred.TokenSource
|
||||
opts = []option.ClientOption{
|
||||
option.WithUserAgent(userAgent),
|
||||
option.WithCredentials(cred),
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize the high-level BigQuery client
|
||||
client, err := bigqueryapi.NewClient(ctx, project, option.WithUserAgent(userAgent), option.WithCredentials(cred))
|
||||
client, err := bigqueryapi.NewClient(ctx, project, opts...)
|
||||
if err != nil {
|
||||
return nil, nil, nil, fmt.Errorf("failed to create BigQuery client for project %q: %w", project, err)
|
||||
}
|
||||
client.Location = location
|
||||
|
||||
// Initialize the low-level BigQuery REST service using the same credentials
|
||||
restService, err := bigqueryrestapi.NewService(ctx, option.WithUserAgent(userAgent), option.WithCredentials(cred))
|
||||
restService, err := bigqueryrestapi.NewService(ctx, opts...)
|
||||
if err != nil {
|
||||
return nil, nil, nil, fmt.Errorf("failed to create BigQuery v2 service: %w", err)
|
||||
}
|
||||
|
||||
return client, restService, cred.TokenSource, nil
|
||||
return client, restService, tokenSource, nil
|
||||
}
|
||||
|
||||
// initBigQueryConnectionWithOAuthToken initialize a BigQuery client with an
|
||||
@@ -486,6 +531,7 @@ func initDataplexConnection(
|
||||
name string,
|
||||
project string,
|
||||
useClientOAuth bool,
|
||||
impersonateServiceAccount string,
|
||||
) (*dataplexapi.CatalogClient, DataplexClientCreator, error) {
|
||||
var client *dataplexapi.CatalogClient
|
||||
var clientCreator DataplexClientCreator
|
||||
@@ -494,11 +540,6 @@ func initDataplexConnection(
|
||||
ctx, span := sources.InitConnectionSpan(ctx, tracer, SourceKind, name)
|
||||
defer span.End()
|
||||
|
||||
cred, err := google.FindDefaultCredentials(ctx)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to find default Google Cloud credentials: %w", err)
|
||||
}
|
||||
|
||||
userAgent, err := util.UserAgentFromContext(ctx)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
@@ -507,7 +548,34 @@ func initDataplexConnection(
|
||||
if useClientOAuth {
|
||||
clientCreator = newDataplexClientCreator(ctx, project, userAgent)
|
||||
} else {
|
||||
client, err = dataplexapi.NewCatalogClient(ctx, option.WithUserAgent(userAgent), option.WithCredentials(cred))
|
||||
var opts []option.ClientOption
|
||||
|
||||
if impersonateServiceAccount != "" {
|
||||
// Create impersonated credentials token source
|
||||
ts, err := impersonate.CredentialsTokenSource(ctx, impersonate.CredentialsConfig{
|
||||
TargetPrincipal: impersonateServiceAccount,
|
||||
Scopes: []string{"https://www.googleapis.com/auth/cloud-platform"},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to create impersonated credentials for %q: %w", impersonateServiceAccount, err)
|
||||
}
|
||||
opts = []option.ClientOption{
|
||||
option.WithUserAgent(userAgent),
|
||||
option.WithTokenSource(ts),
|
||||
}
|
||||
} else {
|
||||
// Use default credentials
|
||||
cred, err := google.FindDefaultCredentials(ctx)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to find default Google Cloud credentials: %w", err)
|
||||
}
|
||||
opts = []option.ClientOption{
|
||||
option.WithUserAgent(userAgent),
|
||||
option.WithCredentials(cred),
|
||||
}
|
||||
}
|
||||
|
||||
client, err = dataplexapi.NewCatalogClient(ctx, opts...)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to create Dataplex client for project %q: %w", project, err)
|
||||
}
|
||||
|
||||
@@ -110,6 +110,26 @@ func TestParseFromYamlBigQuery(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "with service account impersonation example",
|
||||
in: `
|
||||
sources:
|
||||
my-instance:
|
||||
kind: bigquery
|
||||
project: my-project
|
||||
location: us
|
||||
impersonateServiceAccount: service-account@my-project.iam.gserviceaccount.com
|
||||
`,
|
||||
want: server.SourceConfigs{
|
||||
"my-instance": bigquery.Config{
|
||||
Name: "my-instance",
|
||||
Kind: bigquery.SourceKind,
|
||||
Project: "my-project",
|
||||
Location: "us",
|
||||
ImpersonateServiceAccount: "service-account@my-project.iam.gserviceaccount.com",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tc := range tcs {
|
||||
t.Run(tc.desc, func(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user