// Copyright 2026 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package cloudloggingadmin import ( "context" "fmt" "slices" "strings" "time" "cloud.google.com/go/logging" "cloud.google.com/go/logging/logadmin" "github.com/goccy/go-yaml" "github.com/googleapis/genai-toolbox/internal/sources" "github.com/googleapis/genai-toolbox/internal/util" "go.opentelemetry.io/otel/trace" "golang.org/x/oauth2" "golang.org/x/oauth2/google" "google.golang.org/api/impersonate" "google.golang.org/api/iterator" "google.golang.org/api/option" ) const SourceType string = "cloud-logging-admin" var _ sources.SourceConfig = Config{} func init() { if !sources.Register(SourceType, newConfig) { panic(fmt.Sprintf("source type %q already registered", SourceType)) } } func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (sources.SourceConfig, error) { actual := Config{Name: name} if err := decoder.DecodeContext(ctx, &actual); err != nil { return nil, err } return actual, nil } type Config struct { Name string `yaml:"name" validate:"required"` Type string `yaml:"type" validate:"required"` Project string `yaml:"project" validate:"required"` UseClientOAuth bool `yaml:"useClientOAuth"` ImpersonateServiceAccount string `yaml:"impersonateServiceAccount"` } func (r Config) SourceConfigType() string { return SourceType } func (r Config) Initialize(ctx context.Context, tracer trace.Tracer) (sources.Source, error) { if r.UseClientOAuth && r.ImpersonateServiceAccount != "" { return nil, fmt.Errorf("useClientOAuth cannot be used with impersonateServiceAccount") } var client *logadmin.Client var tokenSource oauth2.TokenSource var clientCreator LogAdminClientCreator var err error s := &Source{ Config: r, Client: client, TokenSource: tokenSource, ClientCreator: clientCreator, } if r.UseClientOAuth { // use client OAuth baseClientCreator, err := newLogAdminClientCreator(ctx, tracer, r.Project, r.Name) if err != nil { return nil, fmt.Errorf("error constructing client creator: %w", err) } setupClientCaching(s, baseClientCreator) } else { client, tokenSource, err = initLogAdminConnection(ctx, tracer, r.Name, r.Project, r.ImpersonateServiceAccount) if err != nil { return nil, fmt.Errorf("error creating client from ADC %w", err) } s.Client = client s.TokenSource = tokenSource } return s, nil } var _ sources.Source = &Source{} type LogAdminClientCreator func(tokenString string) (*logadmin.Client, error) type Source struct { Config Client *logadmin.Client TokenSource oauth2.TokenSource ClientCreator LogAdminClientCreator // Caches for OAuth clients logadminClientCache *sources.Cache } func (s *Source) SourceType() string { // Returns logadmin source type return SourceType } func (s *Source) ToConfig() sources.SourceConfig { return s.Config } func (s *Source) UseClientAuthorization() bool { return s.UseClientOAuth } func (s *Source) LogAdminClient() *logadmin.Client { return s.Client } func (s *Source) LogAdminTokenSource() oauth2.TokenSource { return s.TokenSource } func (s *Source) LogAdminClientCreator() LogAdminClientCreator { return s.ClientCreator } func (s *Source) GetProject() string { return s.Project } // getClient returns the appropriate client based on authentication mode func (s *Source) getClient(accessToken string) (*logadmin.Client, error) { if s.UseClientOAuth { if s.ClientCreator == nil { return nil, fmt.Errorf("client creator is not initialized") } return s.ClientCreator(accessToken) } if s.Client == nil { return nil, fmt.Errorf("source client is not initialized") } return s.Client, nil } // ListLogNames lists all log names in the project func (s *Source) ListLogNames(ctx context.Context, limit int, accessToken string) ([]string, error) { client, err := s.getClient(accessToken) if err != nil { return nil, err } it := client.Logs(ctx) var logNames []string for len(logNames) < limit { logName, err := it.Next() if err == iterator.Done { break } if err != nil { return nil, err } logNames = append(logNames, logName) } return logNames, nil } // ListResourceTypes lists all resource types in the project func (s *Source) ListResourceTypes(ctx context.Context, accessToken string) ([]string, error) { client, err := s.getClient(accessToken) if err != nil { return nil, err } it := client.ResourceDescriptors(ctx) var types []string for { desc, err := it.Next() if err == iterator.Done { break } if err != nil { return nil, fmt.Errorf("failed to list resource descriptors: %w", err) } types = append(types, desc.Type) } slices.Sort(types) return types, nil } // QueryLogsParams contains the parameters for querying logs type QueryLogsParams struct { Filter string NewestFirst bool StartTime string EndTime string Verbose bool Limit int } // QueryLogs queries log entries based on the provided parameters func (s *Source) QueryLogs(ctx context.Context, params QueryLogsParams, accessToken string) ([]map[string]any, error) { client, err := s.getClient(accessToken) if err != nil { return nil, err } // Build filter var filterParts []string if params.Filter != "" { filterParts = append(filterParts, params.Filter) } // Add timestamp filter startTime := params.StartTime if startTime != "" { filterParts = append(filterParts, fmt.Sprintf(`timestamp>="%s"`, startTime)) } if params.EndTime != "" { filterParts = append(filterParts, fmt.Sprintf(`timestamp<="%s"`, params.EndTime)) } combinedFilter := strings.Join(filterParts, " AND ") // Add opts opts := []logadmin.EntriesOption{ logadmin.Filter(combinedFilter), } // Set order if params.NewestFirst { opts = append(opts, logadmin.NewestFirst()) } // Set up iterator it := client.Entries(ctx, opts...) var results []map[string]any for len(results) < params.Limit { entry, err := it.Next() if err == iterator.Done { break } if err != nil { return nil, fmt.Errorf("failed to iterate entries: %w", err) } result := map[string]any{ "logName": entry.LogName, "timestamp": entry.Timestamp.Format(time.RFC3339), "severity": entry.Severity.String(), "resource": map[string]any{ "type": entry.Resource.Type, "labels": entry.Resource.Labels, }, } if entry.Payload != nil { result["payload"] = entry.Payload } if params.Verbose { result["insertId"] = entry.InsertID if len(entry.Labels) > 0 { result["labels"] = entry.Labels } if entry.HTTPRequest != nil { httpRequestMap := map[string]any{ "status": entry.HTTPRequest.Status, "latency": entry.HTTPRequest.Latency.String(), "remoteIp": entry.HTTPRequest.RemoteIP, } if req := entry.HTTPRequest.Request; req != nil { httpRequestMap["requestMethod"] = req.Method httpRequestMap["requestUrl"] = req.URL.String() httpRequestMap["userAgent"] = req.UserAgent() } result["httpRequest"] = httpRequestMap } if entry.Trace != "" { result["trace"] = entry.Trace } if entry.SpanID != "" { result["spanId"] = entry.SpanID } if entry.Operation != nil { result["operation"] = map[string]any{ "id": entry.Operation.Id, "producer": entry.Operation.Producer, "first": entry.Operation.First, "last": entry.Operation.Last, } } if entry.SourceLocation != nil { result["sourceLocation"] = map[string]any{ "file": entry.SourceLocation.File, "line": entry.SourceLocation.Line, "function": entry.SourceLocation.Function, } } } results = append(results, result) } return results, nil } func setupClientCaching(s *Source, baseCreator LogAdminClientCreator) { onEvict := func(key string, value interface{}) { if client, ok := value.(*logadmin.Client); ok && client != nil { client.Close() } } s.logadminClientCache = sources.NewCache(onEvict) s.ClientCreator = func(tokenString string) (*logadmin.Client, error) { if val, found := s.logadminClientCache.Get(tokenString); found { return val.(*logadmin.Client), nil } client, err := baseCreator(tokenString) if err != nil { return nil, err } s.logadminClientCache.Set(tokenString, client) return client, nil } } func initLogAdminConnection( ctx context.Context, tracer trace.Tracer, name string, project string, impersonateServiceAccount string, ) (*logadmin.Client, oauth2.TokenSource, error) { ctx, span := sources.InitConnectionSpan(ctx, tracer, SourceType, name) defer span.End() userAgent, err := util.UserAgentFromContext(ctx) if err != nil { return nil, nil, err } var tokenSource oauth2.TokenSource var opts []option.ClientOption if impersonateServiceAccount != "" { // Create impersonated credentials token source with cloud-platform scope // This broader scope is needed for tools like conversational analytics cloudPlatformTokenSource, err := impersonate.CredentialsTokenSource(ctx, impersonate.CredentialsConfig{ TargetPrincipal: impersonateServiceAccount, Scopes: []string{"https://www.googleapis.com/auth/cloud-platform"}, }) if err != nil { return nil, nil, fmt.Errorf("failed to create impersonated credentials for %q: %w", impersonateServiceAccount, err) } tokenSource = cloudPlatformTokenSource opts = []option.ClientOption{ option.WithUserAgent(userAgent), option.WithTokenSource(cloudPlatformTokenSource), } } else { // Use default credentials cred, err := google.FindDefaultCredentials(ctx, logging.AdminScope) if err != nil { return nil, nil, fmt.Errorf("failed to find default Google Cloud credentials with scope %q: %w", logging.AdminScope, err) } tokenSource = cred.TokenSource opts = []option.ClientOption{ option.WithUserAgent(userAgent), option.WithCredentials(cred), } } client, err := logadmin.NewClient(ctx, project, opts...) if err != nil { return nil, nil, fmt.Errorf("failed to create Cloud Logging Admin client for project %q: %w", project, err) } return client, tokenSource, nil } func initLogAdminConnectionWithOAuthToken( ctx context.Context, tracer trace.Tracer, project, name, userAgent, tokenString string, ) (*logadmin.Client, error) { ctx, span := sources.InitConnectionSpan(ctx, tracer, SourceType, name) defer span.End() token := &oauth2.Token{ AccessToken: string(tokenString), } ts := oauth2.StaticTokenSource(token) // Initialize the logadmin client with tokenSource client, err := logadmin.NewClient(ctx, project, option.WithUserAgent(userAgent), option.WithTokenSource(ts)) if err != nil { return nil, fmt.Errorf("failed to create logadmin client for project %q: %w", project, err) } return client, nil } func newLogAdminClientCreator( ctx context.Context, tracer trace.Tracer, project, name string, ) (LogAdminClientCreator, error) { userAgent, err := util.UserAgentFromContext(ctx) if err != nil { return nil, err } return func(tokenString string) (*logadmin.Client, error) { return initLogAdminConnectionWithOAuthToken(ctx, tracer, project, name, userAgent, tokenString) }, nil }