feat(source/bigquery): add service account impersonation support for bigquery (#1641)

## Description

This change adds service account impersonation support to BigQuery.
Users can now optionally supply an `impersonateServiceAccount` field in
their `bigquery-source` config to enable impersonation.

---
> Should include a concise description of the changes (bug or feature), its
> impact, along with a summary of the solution

## PR Checklist

---
> Thank you for opening a Pull Request! Before submitting your PR, there are a
> few things you can do to make sure it goes smoothly:

- [x] Make sure you reviewed
  [CONTRIBUTING.md](https://github.com/googleapis/genai-toolbox/blob/main/CONTRIBUTING.md)
- [x] Make sure to open an issue as a
  [bug/issue](https://github.com/googleapis/genai-toolbox/issues/new/choose)
  before writing your code! That way we can discuss the change, evaluate
  designs, and agree on the general idea
- [x] Ensure the tests and linter pass
- [x] Code coverage does not decrease (if any source code was changed)
- [x] Appropriate docs were updated (if necessary)
- [x] Make sure to add `!` if this involves a breaking change

🛠️ Fixes #906
This commit is contained in:
Colin Pistell
2025-10-31 10:43:52 -06:00
committed by GitHub
parent 38d535de34
commit e09d182f88
3 changed files with 128 additions and 37 deletions

View File

@@ -123,6 +123,7 @@ sources:
# allowedDatasets: # Optional: Restricts tool access to a specific list of datasets.
# - "my_dataset_1"
# - "other_project.my_dataset_2"
# impersonateServiceAccount: "service-account@project-id.iam.gserviceaccount.com" # Optional: Service account to impersonate
```
Initialize a BigQuery source that uses the client's access token:
@@ -138,15 +139,17 @@ sources:
# allowedDatasets: # Optional: Restricts tool access to a specific list of datasets.
# - "my_dataset_1"
# - "other_project.my_dataset_2"
# impersonateServiceAccount: "service-account@project-id.iam.gserviceaccount.com" # Optional: Service account to impersonate
```
## Reference
| **field** | **type** | **required** | **description** |
|-----------------|:--------:|:------------:|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| kind | string | true | Must be "bigquery". |
| project | string | true | Id of the Google Cloud project to use for billing and as the default project for BigQuery resources. |
| location | string | false | Specifies the location (e.g., 'us', 'asia-northeast1') in which to run the query job. This location must match the location of any tables referenced in the query. Defaults to the table's location or 'US' if the location cannot be determined. [Learn More](https://cloud.google.com/bigquery/docs/locations) |
| writeMode | string | false | Controls the write behavior for tools. `allowed` (default): All queries are permitted. `blocked`: Only `SELECT` statements are allowed for the `bigquery-execute-sql` tool. `protected`: Enables session-based execution where all tools associated with this source instance share the same [BigQuery session](https://cloud.google.com/bigquery/docs/sessions-intro). This allows for stateful operations using temporary tables (e.g., `CREATE TEMP TABLE`). For `bigquery-execute-sql`, `SELECT` statements can be used on all tables, but write operations are restricted to the session's temporary dataset. For tools like `bigquery-sql`, `bigquery-forecast`, and `bigquery-analyze-contribution`, the `writeMode` restrictions do not apply, but they will operate within the shared session. **Note:** The `protected` mode cannot be used with `useClientOAuth: true`. It is also not recommended for multi-user server environments, as all users would share the same session. A session is terminated automatically after 24 hours of inactivity or after 7 days, whichever comes first. A new session is created on the next request, and any temporary data from the previous session will be lost. |
| allowedDatasets | []string | false | An optional list of dataset IDs that tools using this source are allowed to access. If provided, any tool operation attempting to access a dataset not in this list will be rejected. To enforce this, two types of operations are also disallowed: 1) Dataset-level operations (e.g., `CREATE SCHEMA`), and 2) operations where table access cannot be statically analyzed (e.g., `EXECUTE IMMEDIATE`, `CREATE PROCEDURE`). If a single dataset is provided, it will be treated as the default for prebuilt tools. |
| useClientOAuth | bool | false | If true, forwards the client's OAuth access token from the "Authorization" header to downstream queries. **Note:** This cannot be used with `writeMode: protected`. |
| **field** | **type** | **required** | **description** |
|---------------------------|:--------:|:------------:|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| kind | string | true | Must be "bigquery". |
| project | string | true | Id of the Google Cloud project to use for billing and as the default project for BigQuery resources. |
| location | string | false | Specifies the location (e.g., 'us', 'asia-northeast1') in which to run the query job. This location must match the location of any tables referenced in the query. Defaults to the table's location or 'US' if the location cannot be determined. [Learn More](https://cloud.google.com/bigquery/docs/locations) |
| writeMode | string | false | Controls the write behavior for tools. `allowed` (default): All queries are permitted. `blocked`: Only `SELECT` statements are allowed for the `bigquery-execute-sql` tool. `protected`: Enables session-based execution where all tools associated with this source instance share the same [BigQuery session](https://cloud.google.com/bigquery/docs/sessions-intro). This allows for stateful operations using temporary tables (e.g., `CREATE TEMP TABLE`). For `bigquery-execute-sql`, `SELECT` statements can be used on all tables, but write operations are restricted to the session's temporary dataset. For tools like `bigquery-sql`, `bigquery-forecast`, and `bigquery-analyze-contribution`, the `writeMode` restrictions do not apply, but they will operate within the shared session. **Note:** The `protected` mode cannot be used with `useClientOAuth: true`. It is also not recommended for multi-user server environments, as all users would share the same session. A session is terminated automatically after 24 hours of inactivity or after 7 days, whichever comes first. A new session is created on the next request, and any temporary data from the previous session will be lost. |
| allowedDatasets | []string | false | An optional list of dataset IDs that tools using this source are allowed to access. If provided, any tool operation attempting to access a dataset not in this list will be rejected. To enforce this, two types of operations are also disallowed: 1) Dataset-level operations (e.g., `CREATE SCHEMA`), and 2) operations where table access cannot be statically analyzed (e.g., `EXECUTE IMMEDIATE`, `CREATE PROCEDURE`). If a single dataset is provided, it will be treated as the default for prebuilt tools. |
| useClientOAuth | bool | false | If true, forwards the client's OAuth access token from the "Authorization" header to downstream queries. **Note:** This cannot be used with `writeMode: protected`. |
| impersonateServiceAccount | string | false | Service account email to impersonate when making BigQuery and Dataplex API calls. The authenticated principal must have the `roles/iam.serviceAccountTokenCreator` role on the target service account. [Learn More](https://cloud.google.com/iam/docs/service-account-impersonation) |

View File

@@ -32,6 +32,7 @@ import (
"golang.org/x/oauth2/google"
bigqueryrestapi "google.golang.org/api/bigquery/v2"
"google.golang.org/api/googleapi"
"google.golang.org/api/impersonate"
"google.golang.org/api/option"
)
@@ -78,6 +79,7 @@ type Config struct {
WriteMode string `yaml:"writeMode"`
AllowedDatasets []string `yaml:"allowedDatasets"`
UseClientOAuth bool `yaml:"useClientOAuth"`
ImpersonateServiceAccount string `yaml:"impersonateServiceAccount"`
}
func (r Config) SourceConfigKind() string {
@@ -94,6 +96,10 @@ func (r Config) Initialize(ctx context.Context, tracer trace.Tracer) (sources.So
return nil, fmt.Errorf("writeMode 'protected' cannot be used with useClientOAuth 'true'")
}
if r.UseClientOAuth && r.ImpersonateServiceAccount != "" {
return nil, fmt.Errorf("useClientOAuth cannot be used with impersonateServiceAccount")
}
var client *bigqueryapi.Client
var restService *bigqueryrestapi.Service
var tokenSource oauth2.TokenSource
@@ -107,7 +113,7 @@ func (r Config) Initialize(ctx context.Context, tracer trace.Tracer) (sources.So
}
} else {
// Initializes a BigQuery Google SQL source
client, restService, tokenSource, err = initBigQueryConnection(ctx, tracer, r.Name, r.Project, r.Location)
client, restService, tokenSource, err = initBigQueryConnection(ctx, tracer, r.Name, r.Project, r.Location, r.ImpersonateServiceAccount)
if err != nil {
return nil, fmt.Errorf("error creating client from ADC: %w", err)
}
@@ -147,18 +153,19 @@ func (r Config) Initialize(ctx context.Context, tracer trace.Tracer) (sources.So
}
s := &Source{
Name: r.Name,
Kind: SourceKind,
Project: r.Project,
Location: r.Location,
Client: client,
RestService: restService,
TokenSource: tokenSource,
MaxQueryResultRows: 50,
ClientCreator: clientCreator,
WriteMode: r.WriteMode,
AllowedDatasets: allowedDatasets,
UseClientOAuth: r.UseClientOAuth,
Name: r.Name,
Kind: SourceKind,
Project: r.Project,
Location: r.Location,
Client: client,
RestService: restService,
TokenSource: tokenSource,
MaxQueryResultRows: 50,
WriteMode: r.WriteMode,
AllowedDatasets: allowedDatasets,
UseClientOAuth: r.UseClientOAuth,
ClientCreator: clientCreator,
ImpersonateServiceAccount: r.ImpersonateServiceAccount,
}
s.SessionProvider = s.newBigQuerySessionProvider()
@@ -167,7 +174,6 @@ func (r Config) Initialize(ctx context.Context, tracer trace.Tracer) (sources.So
}
s.makeDataplexCatalogClient = s.lazyInitDataplexClient(ctx, tracer)
return s, nil
}
var _ sources.Source = &Source{}
@@ -185,6 +191,7 @@ type Source struct {
ClientCreator BigqueryClientCreator
AllowedDatasets map[string]struct{}
UseClientOAuth bool
ImpersonateServiceAccount string
WriteMode string
sessionMutex sync.Mutex
makeDataplexCatalogClient func() (*dataplexapi.CatalogClient, DataplexClientCreator, error)
@@ -327,6 +334,17 @@ func (s *Source) BigQueryTokenSource() oauth2.TokenSource {
}
// BigQueryTokenSourceWithScope returns an OAuth2 token source limited to the
// given scope.
//
// When the source has no ImpersonateServiceAccount configured, the token
// source comes from google.DefaultTokenSource. Otherwise it is built from
// impersonated credentials that target the configured service account and
// request only the supplied scope.
func (s *Source) BigQueryTokenSourceWithScope(ctx context.Context, scope string) (oauth2.TokenSource, error) {
	target := s.ImpersonateServiceAccount
	if target == "" {
		// No impersonation requested: fall back to the default token source.
		return google.DefaultTokenSource(ctx, scope)
	}

	// Impersonation requested: build a token source for the target principal
	// with exactly the requested scope.
	cfg := impersonate.CredentialsConfig{
		TargetPrincipal: target,
		Scopes:          []string{scope},
	}
	impersonated, err := impersonate.CredentialsTokenSource(ctx, cfg)
	if err != nil {
		return nil, fmt.Errorf("failed to create impersonated credentials for %q with scope %q: %w", target, scope, err)
	}
	return impersonated, nil
}
@@ -373,7 +391,7 @@ func (s *Source) lazyInitDataplexClient(ctx context.Context, tracer trace.Tracer
return func() (*dataplexapi.CatalogClient, DataplexClientCreator, error) {
once.Do(func() {
c, cc, e := initDataplexConnection(ctx, tracer, s.Name, s.Project, s.UseClientOAuth)
c, cc, e := initDataplexConnection(ctx, tracer, s.Name, s.Project, s.UseClientOAuth, s.ImpersonateServiceAccount)
if e != nil {
err = fmt.Errorf("failed to initialize dataplex client: %w", e)
return
@@ -391,34 +409,61 @@ func initBigQueryConnection(
name string,
project string,
location string,
impersonateServiceAccount string,
) (*bigqueryapi.Client, *bigqueryrestapi.Service, oauth2.TokenSource, error) {
ctx, span := sources.InitConnectionSpan(ctx, tracer, SourceKind, name)
defer span.End()
cred, err := google.FindDefaultCredentials(ctx, "https://www.googleapis.com/auth/cloud-platform")
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to find default Google Cloud credentials with scope %q: %w", bigqueryapi.Scope, err)
}
userAgent, err := util.UserAgentFromContext(ctx)
if err != nil {
return nil, nil, nil, err
}
var tokenSource oauth2.TokenSource
var opts []option.ClientOption
if impersonateServiceAccount != "" {
// Create impersonated credentials token source with cloud-platform scope
// This broader scope is needed for tools like conversational analytics
cloudPlatformTokenSource, err := impersonate.CredentialsTokenSource(ctx, impersonate.CredentialsConfig{
TargetPrincipal: impersonateServiceAccount,
Scopes: []string{"https://www.googleapis.com/auth/cloud-platform"},
})
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to create impersonated credentials for %q: %w", impersonateServiceAccount, err)
}
tokenSource = cloudPlatformTokenSource
opts = []option.ClientOption{
option.WithUserAgent(userAgent),
option.WithTokenSource(cloudPlatformTokenSource),
}
} else {
// Use default credentials
cred, err := google.FindDefaultCredentials(ctx, bigqueryapi.Scope)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to find default Google Cloud credentials with scope %q: %w", bigqueryapi.Scope, err)
}
tokenSource = cred.TokenSource
opts = []option.ClientOption{
option.WithUserAgent(userAgent),
option.WithCredentials(cred),
}
}
// Initialize the high-level BigQuery client
client, err := bigqueryapi.NewClient(ctx, project, option.WithUserAgent(userAgent), option.WithCredentials(cred))
client, err := bigqueryapi.NewClient(ctx, project, opts...)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to create BigQuery client for project %q: %w", project, err)
}
client.Location = location
// Initialize the low-level BigQuery REST service using the same credentials
restService, err := bigqueryrestapi.NewService(ctx, option.WithUserAgent(userAgent), option.WithCredentials(cred))
restService, err := bigqueryrestapi.NewService(ctx, opts...)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to create BigQuery v2 service: %w", err)
}
return client, restService, cred.TokenSource, nil
return client, restService, tokenSource, nil
}
// initBigQueryConnectionWithOAuthToken initialize a BigQuery client with an
@@ -486,6 +531,7 @@ func initDataplexConnection(
name string,
project string,
useClientOAuth bool,
impersonateServiceAccount string,
) (*dataplexapi.CatalogClient, DataplexClientCreator, error) {
var client *dataplexapi.CatalogClient
var clientCreator DataplexClientCreator
@@ -494,11 +540,6 @@ func initDataplexConnection(
ctx, span := sources.InitConnectionSpan(ctx, tracer, SourceKind, name)
defer span.End()
cred, err := google.FindDefaultCredentials(ctx)
if err != nil {
return nil, nil, fmt.Errorf("failed to find default Google Cloud credentials: %w", err)
}
userAgent, err := util.UserAgentFromContext(ctx)
if err != nil {
return nil, nil, err
@@ -507,7 +548,34 @@ func initDataplexConnection(
if useClientOAuth {
clientCreator = newDataplexClientCreator(ctx, project, userAgent)
} else {
client, err = dataplexapi.NewCatalogClient(ctx, option.WithUserAgent(userAgent), option.WithCredentials(cred))
var opts []option.ClientOption
if impersonateServiceAccount != "" {
// Create impersonated credentials token source
ts, err := impersonate.CredentialsTokenSource(ctx, impersonate.CredentialsConfig{
TargetPrincipal: impersonateServiceAccount,
Scopes: []string{"https://www.googleapis.com/auth/cloud-platform"},
})
if err != nil {
return nil, nil, fmt.Errorf("failed to create impersonated credentials for %q: %w", impersonateServiceAccount, err)
}
opts = []option.ClientOption{
option.WithUserAgent(userAgent),
option.WithTokenSource(ts),
}
} else {
// Use default credentials
cred, err := google.FindDefaultCredentials(ctx)
if err != nil {
return nil, nil, fmt.Errorf("failed to find default Google Cloud credentials: %w", err)
}
opts = []option.ClientOption{
option.WithUserAgent(userAgent),
option.WithCredentials(cred),
}
}
client, err = dataplexapi.NewCatalogClient(ctx, opts...)
if err != nil {
return nil, nil, fmt.Errorf("failed to create Dataplex client for project %q: %w", project, err)
}

View File

@@ -110,6 +110,26 @@ func TestParseFromYamlBigQuery(t *testing.T) {
},
},
},
{
desc: "with service account impersonation example",
in: `
sources:
my-instance:
kind: bigquery
project: my-project
location: us
impersonateServiceAccount: service-account@my-project.iam.gserviceaccount.com
`,
want: server.SourceConfigs{
"my-instance": bigquery.Config{
Name: "my-instance",
Kind: bigquery.SourceKind,
Project: "my-project",
Location: "us",
ImpersonateServiceAccount: "service-account@my-project.iam.gserviceaccount.com",
},
},
},
}
for _, tc := range tcs {
t.Run(tc.desc, func(t *testing.T) {