mirror of
https://github.com/googleapis/genai-toolbox.git
synced 2026-02-04 04:05:22 -05:00
## Description
This PR adds cloud logging admin source, tools, integration test and
docs.
1. Source is implemented in a manner consistent with the BigQuery
source. Supports ADC, OAuth and impersonate Service Account.
2. Total of 3 tools have been implemented
- `cloud-logging-admin-list-log-names`
- `cloud-logging-admin-list-resource-types`
- `cloud-logging-admin-query-logs`
3. docs added for resource and tools.
4. Supporting integration test is added with updated ci
Note for reviewers:
1. Integration test runs on cloud, will require `LOGADMIN_PROJECT` env
variable, the test creates logs in the project using the `logging`
client and then verifies working of the tools using the `logadmin`
client.
2. Moved `cache.go` from the BigQuery source to `sources/cache.go` due
to shared utility.
Regarding Tools:
1. `cloud-logging-admin-list-log-names` uses `client.Logs()` instead of
`client.Entries()`, as the latter is resource heavy and the tradeoff was
not being able to apply any filters, tool has an optional parameter
`limit` which defaults to 200.
2. `cloud-logging-admin-list-resource-types` uses
`client.ResourceDescriptors(ctx)`, aim of the tool is to enable the
agent become aware of the the resources present and utilise this
information in writing filters.
3. `cloud-logging-admin-query-logs` tool enables search and read logs
from Google Cloud.
Parameters:
`filter` (optional): A text string to search for specific logs.
`newestFirst` (optional): A simple true/false switch for ordering.
`startTime ` (optional): The start date and time to search from (e.g.,
2025-12-09T00:00:00Z). Defaults to 30 days ago if not set.
`endTime` (optional): The end date and time to search up to. Defaults to
"now".
`verbose` (optional): If set to true, Shows all available details for
each log entry else shows only the main info (timestamp, message,
severity).
`limit` (optional): The maximum number of log entries to return (default
is 200).
Looking forward to the feedback here, as `verbose` is simply implemented
to save context tokens, any alternative suggestion here is also
welcomed.
Simple tools.yaml
```
sources:
my-logging-admin:
kind: cloud-logging-admin
project: <Add project>
useClientOAuth: false
tools:
list_resource_types:
kind: cloud-logging-admin-list-resource-types
source: my-logging-admin
description: List the types of resource that are indexed by Cloud Logging.
list_log_names:
kind: cloud-logging-admin-list-log-names
source: my-logging-admin
description: List log names matching a filter criteria.
query_logs:
kind: cloud-logging-admin-query-logs
source: my-logging-admin
description: query logs
```
## PR Checklist
- [x] Make sure you reviewed
[CONTRIBUTING.md](https://github.com/googleapis/genai-toolbox/blob/main/CONTRIBUTING.md)
- [x] Make sure to open an issue as a
[bug/issue](https://github.com/googleapis/genai-toolbox/issues/new/choose)
before writing your code! That way we can discuss the change, evaluate
designs, and agree on the general idea
- [x] Ensure the tests and linter pass
- [x] Code coverage does not decrease (if any source code was changed)
- [x] Appropriate docs were updated (if necessary)
- [ ] Make sure to add `!` if this involve a breaking change
🛠️ Fixes #1772
@anubhav756 @averikitsch Thanks for the guidance and feedback on the
implementation plan.
---------
Co-authored-by: Yuan Teoh <yuanteoh@google.com>
Co-authored-by: Yuan Teoh <45984206+Yuan325@users.noreply.github.com>
440 lines
12 KiB
Go
440 lines
12 KiB
Go
// Copyright 2026 Google LLC
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
package cloudloggingadmin
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"slices"
|
|
"strings"
|
|
"time"
|
|
|
|
"cloud.google.com/go/logging"
|
|
"cloud.google.com/go/logging/logadmin"
|
|
"github.com/goccy/go-yaml"
|
|
"github.com/googleapis/genai-toolbox/internal/sources"
|
|
"github.com/googleapis/genai-toolbox/internal/util"
|
|
"go.opentelemetry.io/otel/trace"
|
|
"golang.org/x/oauth2"
|
|
"golang.org/x/oauth2/google"
|
|
"google.golang.org/api/impersonate"
|
|
"google.golang.org/api/iterator"
|
|
"google.golang.org/api/option"
|
|
)
|
|
|
|
const SourceType string = "cloud-logging-admin"
|
|
|
|
var _ sources.SourceConfig = Config{}
|
|
|
|
func init() {
|
|
if !sources.Register(SourceType, newConfig) {
|
|
panic(fmt.Sprintf("source type %q already registered", SourceType))
|
|
}
|
|
}
|
|
|
|
func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (sources.SourceConfig, error) {
|
|
actual := Config{Name: name}
|
|
if err := decoder.DecodeContext(ctx, &actual); err != nil {
|
|
return nil, err
|
|
}
|
|
return actual, nil
|
|
}
|
|
|
|
type Config struct {
|
|
Name string `yaml:"name" validate:"required"`
|
|
Type string `yaml:"type" validate:"required"`
|
|
Project string `yaml:"project" validate:"required"`
|
|
UseClientOAuth bool `yaml:"useClientOAuth"`
|
|
ImpersonateServiceAccount string `yaml:"impersonateServiceAccount"`
|
|
}
|
|
|
|
func (r Config) SourceConfigType() string {
|
|
return SourceType
|
|
}
|
|
|
|
func (r Config) Initialize(ctx context.Context, tracer trace.Tracer) (sources.Source, error) {
|
|
|
|
if r.UseClientOAuth && r.ImpersonateServiceAccount != "" {
|
|
return nil, fmt.Errorf("useClientOAuth cannot be used with impersonateServiceAccount")
|
|
}
|
|
|
|
var client *logadmin.Client
|
|
var tokenSource oauth2.TokenSource
|
|
var clientCreator LogAdminClientCreator
|
|
var err error
|
|
|
|
s := &Source{
|
|
Config: r,
|
|
Client: client,
|
|
TokenSource: tokenSource,
|
|
ClientCreator: clientCreator,
|
|
}
|
|
|
|
if r.UseClientOAuth {
|
|
// use client OAuth
|
|
baseClientCreator, err := newLogAdminClientCreator(ctx, tracer, r.Project, r.Name)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error constructing client creator: %w", err)
|
|
}
|
|
setupClientCaching(s, baseClientCreator)
|
|
} else {
|
|
client, tokenSource, err = initLogAdminConnection(ctx, tracer, r.Name, r.Project, r.ImpersonateServiceAccount)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error creating client from ADC %w", err)
|
|
}
|
|
s.Client = client
|
|
s.TokenSource = tokenSource
|
|
}
|
|
return s, nil
|
|
}
|
|
|
|
var _ sources.Source = &Source{}
|
|
|
|
type LogAdminClientCreator func(tokenString string) (*logadmin.Client, error)
|
|
|
|
type Source struct {
|
|
Config
|
|
Client *logadmin.Client
|
|
TokenSource oauth2.TokenSource
|
|
ClientCreator LogAdminClientCreator
|
|
|
|
// Caches for OAuth clients
|
|
logadminClientCache *sources.Cache
|
|
}
|
|
|
|
func (s *Source) SourceType() string {
|
|
// Returns logadmin source type
|
|
return SourceType
|
|
}
|
|
|
|
func (s *Source) ToConfig() sources.SourceConfig {
|
|
return s.Config
|
|
}
|
|
|
|
func (s *Source) UseClientAuthorization() bool {
|
|
return s.UseClientOAuth
|
|
}
|
|
|
|
func (s *Source) LogAdminClient() *logadmin.Client {
|
|
return s.Client
|
|
}
|
|
|
|
func (s *Source) LogAdminTokenSource() oauth2.TokenSource {
|
|
return s.TokenSource
|
|
}
|
|
|
|
func (s *Source) LogAdminClientCreator() LogAdminClientCreator {
|
|
return s.ClientCreator
|
|
}
|
|
|
|
func (s *Source) GetProject() string {
|
|
return s.Project
|
|
}
|
|
|
|
// getClient returns the appropriate client based on authentication mode
|
|
func (s *Source) getClient(accessToken string) (*logadmin.Client, error) {
|
|
if s.UseClientOAuth {
|
|
if s.ClientCreator == nil {
|
|
return nil, fmt.Errorf("client creator is not initialized")
|
|
}
|
|
return s.ClientCreator(accessToken)
|
|
}
|
|
if s.Client == nil {
|
|
return nil, fmt.Errorf("source client is not initialized")
|
|
}
|
|
return s.Client, nil
|
|
}
|
|
|
|
// ListLogNames lists all log names in the project
|
|
func (s *Source) ListLogNames(ctx context.Context, limit int, accessToken string) ([]string, error) {
|
|
client, err := s.getClient(accessToken)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
it := client.Logs(ctx)
|
|
var logNames []string
|
|
for len(logNames) < limit {
|
|
logName, err := it.Next()
|
|
if err == iterator.Done {
|
|
break
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
logNames = append(logNames, logName)
|
|
}
|
|
return logNames, nil
|
|
}
|
|
|
|
// ListResourceTypes lists all resource types in the project
|
|
func (s *Source) ListResourceTypes(ctx context.Context, accessToken string) ([]string, error) {
|
|
client, err := s.getClient(accessToken)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
it := client.ResourceDescriptors(ctx)
|
|
var types []string
|
|
for {
|
|
desc, err := it.Next()
|
|
if err == iterator.Done {
|
|
break
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to list resource descriptors: %w", err)
|
|
}
|
|
types = append(types, desc.Type)
|
|
}
|
|
slices.Sort(types)
|
|
return types, nil
|
|
}
|
|
|
|
// QueryLogsParams contains the parameters for querying logs
|
|
type QueryLogsParams struct {
|
|
Filter string
|
|
NewestFirst bool
|
|
StartTime string
|
|
EndTime string
|
|
Verbose bool
|
|
Limit int
|
|
}
|
|
|
|
// QueryLogs queries log entries based on the provided parameters
|
|
func (s *Source) QueryLogs(ctx context.Context, params QueryLogsParams, accessToken string) ([]map[string]any, error) {
|
|
client, err := s.getClient(accessToken)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Build filter
|
|
var filterParts []string
|
|
if params.Filter != "" {
|
|
filterParts = append(filterParts, params.Filter)
|
|
}
|
|
|
|
// Add timestamp filter
|
|
startTime := params.StartTime
|
|
if startTime != "" {
|
|
filterParts = append(filterParts, fmt.Sprintf(`timestamp>="%s"`, startTime))
|
|
}
|
|
|
|
if params.EndTime != "" {
|
|
filterParts = append(filterParts, fmt.Sprintf(`timestamp<="%s"`, params.EndTime))
|
|
}
|
|
|
|
combinedFilter := strings.Join(filterParts, " AND ")
|
|
|
|
// Add opts
|
|
opts := []logadmin.EntriesOption{
|
|
logadmin.Filter(combinedFilter),
|
|
}
|
|
|
|
// Set order
|
|
if params.NewestFirst {
|
|
opts = append(opts, logadmin.NewestFirst())
|
|
}
|
|
|
|
// Set up iterator
|
|
it := client.Entries(ctx, opts...)
|
|
|
|
var results []map[string]any
|
|
for len(results) < params.Limit {
|
|
entry, err := it.Next()
|
|
if err == iterator.Done {
|
|
break
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to iterate entries: %w", err)
|
|
}
|
|
|
|
result := map[string]any{
|
|
"logName": entry.LogName,
|
|
"timestamp": entry.Timestamp.Format(time.RFC3339),
|
|
"severity": entry.Severity.String(),
|
|
"resource": map[string]any{
|
|
"type": entry.Resource.Type,
|
|
"labels": entry.Resource.Labels,
|
|
},
|
|
}
|
|
|
|
if entry.Payload != nil {
|
|
result["payload"] = entry.Payload
|
|
}
|
|
|
|
if params.Verbose {
|
|
result["insertId"] = entry.InsertID
|
|
|
|
if len(entry.Labels) > 0 {
|
|
result["labels"] = entry.Labels
|
|
}
|
|
|
|
if entry.HTTPRequest != nil {
|
|
httpRequestMap := map[string]any{
|
|
"status": entry.HTTPRequest.Status,
|
|
"latency": entry.HTTPRequest.Latency.String(),
|
|
"remoteIp": entry.HTTPRequest.RemoteIP,
|
|
}
|
|
if req := entry.HTTPRequest.Request; req != nil {
|
|
httpRequestMap["requestMethod"] = req.Method
|
|
httpRequestMap["requestUrl"] = req.URL.String()
|
|
httpRequestMap["userAgent"] = req.UserAgent()
|
|
}
|
|
result["httpRequest"] = httpRequestMap
|
|
}
|
|
|
|
if entry.Trace != "" {
|
|
result["trace"] = entry.Trace
|
|
}
|
|
|
|
if entry.SpanID != "" {
|
|
result["spanId"] = entry.SpanID
|
|
}
|
|
|
|
if entry.Operation != nil {
|
|
result["operation"] = map[string]any{
|
|
"id": entry.Operation.Id,
|
|
"producer": entry.Operation.Producer,
|
|
"first": entry.Operation.First,
|
|
"last": entry.Operation.Last,
|
|
}
|
|
}
|
|
|
|
if entry.SourceLocation != nil {
|
|
result["sourceLocation"] = map[string]any{
|
|
"file": entry.SourceLocation.File,
|
|
"line": entry.SourceLocation.Line,
|
|
"function": entry.SourceLocation.Function,
|
|
}
|
|
}
|
|
}
|
|
results = append(results, result)
|
|
}
|
|
return results, nil
|
|
}
|
|
|
|
func setupClientCaching(s *Source, baseCreator LogAdminClientCreator) {
|
|
onEvict := func(key string, value interface{}) {
|
|
if client, ok := value.(*logadmin.Client); ok && client != nil {
|
|
client.Close()
|
|
}
|
|
}
|
|
|
|
s.logadminClientCache = sources.NewCache(onEvict)
|
|
|
|
s.ClientCreator = func(tokenString string) (*logadmin.Client, error) {
|
|
if val, found := s.logadminClientCache.Get(tokenString); found {
|
|
return val.(*logadmin.Client), nil
|
|
}
|
|
|
|
client, err := baseCreator(tokenString)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
s.logadminClientCache.Set(tokenString, client)
|
|
return client, nil
|
|
}
|
|
}
|
|
|
|
func initLogAdminConnection(
|
|
ctx context.Context,
|
|
tracer trace.Tracer,
|
|
name string,
|
|
project string,
|
|
impersonateServiceAccount string,
|
|
) (*logadmin.Client, oauth2.TokenSource, error) {
|
|
ctx, span := sources.InitConnectionSpan(ctx, tracer, SourceType, name)
|
|
defer span.End()
|
|
|
|
userAgent, err := util.UserAgentFromContext(ctx)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
var tokenSource oauth2.TokenSource
|
|
var opts []option.ClientOption
|
|
|
|
if impersonateServiceAccount != "" {
|
|
// Create impersonated credentials token source with cloud-platform scope
|
|
// This broader scope is needed for tools like conversational analytics
|
|
cloudPlatformTokenSource, err := impersonate.CredentialsTokenSource(ctx, impersonate.CredentialsConfig{
|
|
TargetPrincipal: impersonateServiceAccount,
|
|
Scopes: []string{"https://www.googleapis.com/auth/cloud-platform"},
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("failed to create impersonated credentials for %q: %w", impersonateServiceAccount, err)
|
|
}
|
|
|
|
tokenSource = cloudPlatformTokenSource
|
|
opts = []option.ClientOption{
|
|
option.WithUserAgent(userAgent),
|
|
option.WithTokenSource(cloudPlatformTokenSource),
|
|
}
|
|
} else {
|
|
// Use default credentials
|
|
cred, err := google.FindDefaultCredentials(ctx, logging.AdminScope)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("failed to find default Google Cloud credentials with scope %q: %w", logging.AdminScope, err)
|
|
}
|
|
tokenSource = cred.TokenSource
|
|
opts = []option.ClientOption{
|
|
option.WithUserAgent(userAgent),
|
|
option.WithCredentials(cred),
|
|
}
|
|
}
|
|
|
|
client, err := logadmin.NewClient(ctx, project, opts...)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("failed to create Cloud Logging Admin client for project %q: %w", project, err)
|
|
}
|
|
return client, tokenSource, nil
|
|
}
|
|
|
|
func initLogAdminConnectionWithOAuthToken(
|
|
ctx context.Context,
|
|
tracer trace.Tracer,
|
|
project, name, userAgent, tokenString string,
|
|
) (*logadmin.Client, error) {
|
|
ctx, span := sources.InitConnectionSpan(ctx, tracer, SourceType, name)
|
|
defer span.End()
|
|
|
|
token := &oauth2.Token{
|
|
AccessToken: string(tokenString),
|
|
}
|
|
ts := oauth2.StaticTokenSource(token)
|
|
|
|
// Initialize the logadmin client with tokenSource
|
|
client, err := logadmin.NewClient(ctx, project, option.WithUserAgent(userAgent), option.WithTokenSource(ts))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create logadmin client for project %q: %w", project, err)
|
|
}
|
|
return client, nil
|
|
}
|
|
|
|
func newLogAdminClientCreator(
|
|
ctx context.Context,
|
|
tracer trace.Tracer,
|
|
project, name string,
|
|
) (LogAdminClientCreator, error) {
|
|
userAgent, err := util.UserAgentFromContext(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return func(tokenString string) (*logadmin.Client, error) {
|
|
return initLogAdminConnectionWithOAuthToken(ctx, tracer, project, name, userAgent, tokenString)
|
|
}, nil
|
|
}
|