feat(sources/elasticsearch): add Elasticsearch source and tools (#1109)

Add support for Elasticsearch with the following tools:
* search
* esql
* get_mappings
* list_indices

This PR fixes #859

---------

Co-authored-by: duwenxin <duwenxin@google.com>
Co-authored-by: Wenxin Du <117315983+duwenxin99@users.noreply.github.com>
This commit is contained in:
Laurent Saint-Félix
2025-11-06 01:33:37 +01:00
committed by GitHub
parent d7f68ebb1a
commit 5367285e91
16 changed files with 1227 additions and 3 deletions

View File

@@ -705,7 +705,26 @@ steps:
- |
./yugabytedb.test -test.v
- id: "elasticsearch"
name: golang:1
waitFor: ["compile-test-binary"]
entrypoint: /bin/bash
env:
- "GOPATH=/gopath"
- "SERVICE_ACCOUNT_EMAIL=$SERVICE_ACCOUNT_EMAIL"
secretEnv: ["CLIENT_ID", "ELASTICSEARCH_USER", "ELASTICSEARCH_PASS", "ELASTICSEARCH_HOST"]
volumes:
- name: "go"
path: "/gopath"
args:
- -c
- |
.ci/test_with_coverage.sh \
"Elasticsearch" \
elasticsearch \
elasticsearch
- id: "cassandra"
name: golang:1
waitFor: ["compile-test-binary"]
@@ -764,7 +783,7 @@ steps:
.ci/test_with_coverage.sh \
"Serverless Spark" \
serverlessspark
availableSecrets:
secretManager:
- versionName: projects/$PROJECT_ID/secrets/cloud_sql_pg_user/versions/latest
@@ -855,6 +874,12 @@ availableSecrets:
env: YUGABYTEDB_USER
- versionName: projects/$PROJECT_ID/secrets/yugabytedb_pass/versions/latest
env: YUGABYTEDB_PASS
- versionName: projects/$PROJECT_ID/secrets/elastic_search_host/versions/latest
env: ELASTICSEARCH_HOST
- versionName: projects/$PROJECT_ID/secrets/elastic_search_user/versions/latest
env: ELASTICSEARCH_USER
- versionName: projects/$PROJECT_ID/secrets/elastic_search_pass/versions/latest
env: ELASTICSEARCH_PASS
- versionName: projects/$PROJECT_ID/secrets/cassandra_user/versions/latest
env: CASSANDRA_USER
- versionName: projects/$PROJECT_ID/secrets/cassandra_pass/versions/latest

View File

@@ -85,6 +85,7 @@ import (
_ "github.com/googleapis/genai-toolbox/internal/tools/dataplex/dataplexsearchaspecttypes"
_ "github.com/googleapis/genai-toolbox/internal/tools/dataplex/dataplexsearchentries"
_ "github.com/googleapis/genai-toolbox/internal/tools/dgraph"
_ "github.com/googleapis/genai-toolbox/internal/tools/elasticsearch/elasticsearchesql"
_ "github.com/googleapis/genai-toolbox/internal/tools/firebird/firebirdexecutesql"
_ "github.com/googleapis/genai-toolbox/internal/tools/firebird/firebirdsql"
_ "github.com/googleapis/genai-toolbox/internal/tools/firestore/firestoreadddocuments"
@@ -195,6 +196,7 @@ import (
_ "github.com/googleapis/genai-toolbox/internal/sources/couchbase"
_ "github.com/googleapis/genai-toolbox/internal/sources/dataplex"
_ "github.com/googleapis/genai-toolbox/internal/sources/dgraph"
_ "github.com/googleapis/genai-toolbox/internal/sources/elasticsearch"
_ "github.com/googleapis/genai-toolbox/internal/sources/firebird"
_ "github.com/googleapis/genai-toolbox/internal/sources/firestore"
_ "github.com/googleapis/genai-toolbox/internal/sources/http"

View File

@@ -0,0 +1,68 @@
---
title: "Elasticsearch"
type: docs
weight: 1
description: >
Elasticsearch is a distributed, free and open search and analytics engine
for all types of data, including textual, numerical, geospatial, structured,
and unstructured.
---
# Elasticsearch Source
[Elasticsearch][elasticsearch-docs] is a distributed, free and open search and analytics engine
for all types of data, including textual, numerical, geospatial, structured,
and unstructured.
If you are new to Elasticsearch, you can learn how to
[set up a cluster and start indexing data][elasticsearch-quickstart].
Elasticsearch uses [ES|QL][elasticsearch-esql] for querying data. ES|QL
is a powerful query language that allows you to search and aggregate data in
Elasticsearch.
See the [official documentation](https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html) for more information.
[elasticsearch-docs]: https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html
[elasticsearch-quickstart]: https://www.elastic.co/guide/en/elasticsearch/reference/current/getting-started.html
[elasticsearch-esql]: https://www.elastic.co/guide/en/elasticsearch/reference/current/esql.html
## Available Tools
- [`elasticsearch-esql`](../tools/elasticsearch/elasticsearch-esql.md)
Execute ES|QL queries.
## Requirements
### API Key
Toolbox uses an [API key][api-key] to authorize and authenticate when
interacting with [Elasticsearch][elasticsearch-docs].
In addition to [setting the API key for your server][set-api-key], you need to
ensure the API key has the correct permissions for the queries you intend to
run. See [API key management][api-key-management] for more information on
applying permissions to an API key.
[api-key]: https://www.elastic.co/guide/en/elasticsearch/reference/current/security-api-create-api-key.html
[set-api-key]: https://www.elastic.co/guide/en/elasticsearch/reference/current/security-api-create-api-key.html
[api-key-management]: https://www.elastic.co/guide/en/elasticsearch/reference/current/security-api-get-api-key.html
## Example
```yaml
sources:
my-elasticsearch-source:
kind: "elasticsearch"
addresses:
- "http://localhost:9200"
apikey: "my-api-key"
```
## Reference
| **field** | **type** | **required** | **description** |
|-----------|:--------:|:------------:|-------------------------------------------------------------------------------|
| kind | string | true | Must be "elasticsearch". |
| addresses | []string | true | List of Elasticsearch hosts to connect to. |
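| username | string | false | Username for basic authentication. Must be set together with `password`; required if `apikey` is not set. |
| password | string | false | Password for basic authentication. Must be set together with `username`; required if `apikey` is not set. |
| apikey | string | false | The API key to use for authentication. Required if `username` and `password` are not both set. |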

View File

@@ -0,0 +1,7 @@
---
title: "Elasticsearch"
type: docs
weight: 1
description: >
Tools that work with Elasticsearch Sources.
---

View File

@@ -0,0 +1,45 @@
---
title: "elasticsearch-esql"
type: docs
weight: 2
description: >
Execute ES|QL queries.
---
# elasticsearch-esql
Execute ES|QL queries.
This tool allows you to execute ES|QL queries against your Elasticsearch
cluster. You can use this to perform complex searches and aggregations.
See the [official documentation](https://www.elastic.co/docs/reference/query-languages/esql/esql-getting-started) for more information.
## Example
```yaml
tools:
query_my_index:
kind: elasticsearch-esql
source: elasticsearch-source
description: Use this tool to execute ES|QL queries.
query: |
FROM my-index
| KEEP *
| LIMIT ?limit
parameters:
- name: limit
type: integer
description: Limit the number of results.
required: true
```
## Parameters
| **name** | **type** | **required** | **description** |
|------------|:--------:|:------------:|-----------------------------------------------------------------------------------------------------------------------------------------------------|
| query | string | false | The ES\|QL query to run. If omitted, the query is taken from a string parameter named `query`. |
| format | string | false | The format of the query. Default is json. Valid values are csv, json, tsv, txt, yaml, cbor, smile, or arrow. |
| timeout | integer | false | The timeout for the query in seconds. Default is 60 (1 minute). |
| parameters | [parameters](../#specifying-parameters) | false | List of [parameters](../#specifying-parameters) that will be used with the ES\|QL query.<br/>Only supports “string”, “integer”, “float”, “boolean”. |

2
go.mod
View File

@@ -21,6 +21,8 @@ require (
github.com/cenkalti/backoff/v5 v5.0.3
github.com/couchbase/gocb/v2 v2.11.1
github.com/couchbase/tools-common/http v1.0.9
github.com/elastic/elastic-transport-go/v8 v8.7.0
github.com/elastic/go-elasticsearch/v8 v8.19.0
github.com/fsnotify/fsnotify v1.9.0
github.com/go-chi/chi/v5 v5.2.3
github.com/go-chi/httplog/v2 v2.1.1

4
go.sum
View File

@@ -820,6 +820,10 @@ github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/elastic/elastic-transport-go/v8 v8.7.0 h1:OgTneVuXP2uip4BA658Xi6Hfw+PeIOod2rY3GVMGoVE=
github.com/elastic/elastic-transport-go/v8 v8.7.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk=
github.com/elastic/go-elasticsearch/v8 v8.19.0 h1:VmfBLNRORY7RZL+9hTxBD97ehl9H8Nxf2QigDh6HuMU=
github.com/elastic/go-elasticsearch/v8 v8.19.0/go.mod h1:F3j9e+BubmKvzvLjNui/1++nJuJxbkhHefbaT0kFKGY=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=

View File

@@ -36,6 +36,7 @@ var expectedToolSources = []string{
"cloud-sql-postgres-observability",
"cloud-sql-postgres",
"dataplex",
"elasticsearch",
"firestore",
"looker-conversational-analytics",
"looker",

View File

@@ -0,0 +1,33 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
sources:
elasticsearch-source:
kind: elasticsearch
addresses:
- ${ELASTICSEARCH_HOST}
apikey: ${ELASTICSEARCH_APIKEY}
tools:
execute_esql_query:
kind: elasticsearch-esql
source: elasticsearch-source
description: Use this tool to execute ES|QL queries.
parameters:
- name: query
type: string
description: The ES|QL query to execute.
toolsets:
elasticsearch-tools:
- execute_esql_query

View File

@@ -0,0 +1,149 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package elasticsearch
import (
"context"
"fmt"
"net/http"
"github.com/elastic/elastic-transport-go/v8/elastictransport"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
"github.com/goccy/go-yaml"
"github.com/googleapis/genai-toolbox/internal/sources"
"github.com/googleapis/genai-toolbox/internal/util"
"go.opentelemetry.io/otel/trace"
)
// SourceKind is the kind string under which this source is registered and
// which Config.SourceConfigKind and Source.SourceKind return.
const SourceKind string = "elasticsearch"
// Compile-time check that Config satisfies sources.SourceConfig.
var _ sources.SourceConfig = Config{}
// init registers the Elasticsearch source kind with the sources registry and
// panics if that kind has already been registered.
func init() {
	registered := sources.Register(SourceKind, newConfig)
	if registered {
		return
	}
	panic(fmt.Sprintf("source kind %q already registered", SourceKind))
}
// newConfig decodes an Elasticsearch source configuration from decoder.
// The Name field is seeded with name before decoding; any decode error is
// returned unchanged.
func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (sources.SourceConfig, error) {
	cfg := Config{Name: name}
	err := decoder.DecodeContext(ctx, &cfg)
	if err != nil {
		return nil, err
	}
	return cfg, nil
}
// Config holds the YAML settings of an Elasticsearch source.
type Config struct {
// Name is seeded by newConfig with the source name before decoding.
Name string `yaml:"name" validate:"required"`
// Kind is the value of the "kind" key of the source entry.
Kind string `yaml:"kind" validate:"required"`
// Addresses is passed to the Elasticsearch client as its list of addresses.
Addresses []string `yaml:"addresses" validate:"required"`
// Username and Password are used for authentication only when both are non-empty.
Username string `yaml:"username"`
Password string `yaml:"password"`
// APIKey is used for authentication when Username and Password are not both set.
APIKey string `yaml:"apikey"`
}
// SourceConfigKind returns SourceKind.
func (c Config) SourceConfigKind() string {
return SourceKind
}
// EsClient is the client contract exposed by Source. It combines
// esapi.Transport with elastictransport.Instrumented.
type EsClient interface {
esapi.Transport
elastictransport.Instrumented
}
// Source is an initialized Elasticsearch source.
type Source struct {
// Name is copied from Config.Name by Config.Initialize.
Name string
// Kind is set to SourceKind by Config.Initialize.
Kind string
// Client is the Elasticsearch client built by Config.Initialize.
Client EsClient
}
// Compile-time check that *Source satisfies sources.Source.
var _ sources.Source = &Source{}
// tracerProviderAdapter adapts a single trace.Tracer to the
// trace.TracerProvider interface. Config.Initialize constructs it with only
// the tracer field set, so the embedded TracerProvider stays nil and only the
// Tracer method defined on this type is usable.
type tracerProviderAdapter struct {
trace.TracerProvider
tracer trace.Tracer
}
// Tracer implements the TracerProvider interface. It ignores name and options
// and always returns the wrapped tracer.
func (t *tracerProviderAdapter) Tracer(name string, options ...trace.TracerOption) trace.Tracer {
return t.tracer
}
// Initialize builds an Elasticsearch client from the configuration, checks
// connectivity with an Info request, and returns the resulting Source.
//
// Authentication uses Username and Password when both are non-empty, and
// APIKey otherwise. An error is returned when neither is configured, when the
// user agent cannot be read from ctx, when the client cannot be created, or
// when the Info request fails or answers with an error status.
func (c Config) Initialize(ctx context.Context, tracer trace.Tracer) (sources.Source, error) {
	ua, err := util.UserAgentFromContext(ctx)
	if err != nil {
		return nil, fmt.Errorf("error getting user agent from context: %w", err)
	}

	cfg := elasticsearch.Config{
		Addresses: c.Addresses,
		// The instrumentation constructor takes a TracerProvider, so the single
		// tracer we were given is wrapped in an adapter.
		Instrumentation: elasticsearch.NewOpenTelemetryInstrumentation(&tracerProviderAdapter{tracer: tracer}, false),
		Header:          http.Header{"User-Agent": []string{ua + " go-elasticsearch/" + elasticsearch.Version}},
	}

	// The client needs either a username/password pair or an API key.
	switch {
	case c.Username != "" && c.Password != "":
		cfg.Username = c.Username
		cfg.Password = c.Password
	case c.APIKey != "":
		cfg.APIKey = c.APIKey
	default:
		return nil, fmt.Errorf("elasticsearch source %q requires either username/password or an API key", c.Name)
	}

	client, err := elasticsearch.NewBaseClient(cfg)
	if err != nil {
		return nil, fmt.Errorf("unable to create elasticsearch client for source %q: %w", c.Name, err)
	}

	// Test the connection before handing the source out.
	res, err := esapi.InfoRequest{
		Instrument: client.InstrumentationEnabled(),
	}.Do(ctx, client)
	if err != nil {
		return nil, fmt.Errorf("unable to connect to elasticsearch for source %q: %w", c.Name, err)
	}
	defer res.Body.Close()

	if res.IsError() {
		return nil, fmt.Errorf("elasticsearch connection failed: status %d", res.StatusCode)
	}

	return &Source{
		Name:   c.Name,
		Kind:   SourceKind,
		Client: client,
	}, nil
}
// SourceKind returns the kind string for this source, which is always the
// package-level SourceKind constant.
func (s *Source) SourceKind() string {
return SourceKind
}
// ElasticsearchClient returns the client stored in the Client field.
func (s *Source) ElasticsearchClient() EsClient {
return s.Client
}

View File

@@ -0,0 +1,66 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package elasticsearch_test
import (
"testing"
yaml "github.com/goccy/go-yaml"
"github.com/google/go-cmp/cmp"
"github.com/googleapis/genai-toolbox/internal/server"
"github.com/googleapis/genai-toolbox/internal/sources/elasticsearch"
)
// TestParseFromYamlElasticsearch checks that a YAML "sources" section with an
// elasticsearch entry unmarshals into the expected elasticsearch.Config.
func TestParseFromYamlElasticsearch(t *testing.T) {
// Each case pairs a YAML document with the source configs it should produce.
tcs := []struct {
desc string
in string
want server.SourceConfigs
}{
{
desc: "basic example",
in: `
sources:
my-es-instance:
kind: elasticsearch
addresses:
- http://localhost:9200
apikey: somekey
`,
want: server.SourceConfigs{
"my-es-instance": elasticsearch.Config{
Name: "my-es-instance",
Kind: elasticsearch.SourceKind,
Addresses: []string{"http://localhost:9200"},
APIKey: "somekey",
},
},
},
}
for _, tc := range tcs {
t.Run(tc.desc, func(t *testing.T) {
// Decode only the "sources" key of the document.
got := struct {
Sources server.SourceConfigs `yaml:"sources"`
}{}
err := yaml.Unmarshal([]byte(tc.in), &got)
if err != nil {
t.Fatalf("failed to parse yaml: %v", err)
}
if diff := cmp.Diff(tc.want, got.Sources); diff != "" {
t.Errorf("unexpected config diff (-want +got):\n%s", diff)
}
})
}
}

View File

@@ -0,0 +1,243 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package elasticsearchesql
import (
"bytes"
"context"
"encoding/json"
"fmt"
"time"
"github.com/elastic/go-elasticsearch/v8/esapi"
"github.com/googleapis/genai-toolbox/internal/util"
"github.com/goccy/go-yaml"
"github.com/googleapis/genai-toolbox/internal/sources"
es "github.com/googleapis/genai-toolbox/internal/sources/elasticsearch"
"github.com/googleapis/genai-toolbox/internal/tools"
)
// kind is the tool kind string registered by init and returned by
// Config.ToolConfigKind.
const kind string = "elasticsearch-esql"
// init registers the elasticsearch-esql tool kind with the tools registry and
// panics if that kind has already been registered.
func init() {
	registered := tools.Register(kind, newConfig)
	if registered {
		return
	}
	panic(fmt.Sprintf("tool kind %q already registered", kind))
}
// compatibleSource is the capability a source must provide to back this tool:
// access to an Elasticsearch client.
type compatibleSource interface {
ElasticsearchClient() es.EsClient
}
// Compile-time check that *es.Source satisfies compatibleSource.
var _ compatibleSource = &es.Source{}
// compatibleSources lists the source kinds named in the error returned by
// Config.Initialize when an incompatible source is configured.
var compatibleSources = [...]string{es.SourceKind}
// Config holds the YAML settings of an elasticsearch-esql tool.
type Config struct {
// Name is seeded by newConfig with the tool name before decoding.
Name string `yaml:"name" validate:"required"`
Kind string `yaml:"kind" validate:"required"`
// Source is the key looked up in the sources map by Initialize.
Source string `yaml:"source" validate:"required"`
Description string `yaml:"description" validate:"required"`
AuthRequired []string `yaml:"authRequired" validate:"required"`
// Query is the ES|QL text sent in the request body. When empty, Tool.Invoke
// takes the query from a string parameter named "query".
Query string `yaml:"query"`
// Format is forwarded to the ES|QL request as its response format.
Format string `yaml:"format"`
// Timeout is the per-invocation timeout in seconds. Tool.Invoke falls back to
// one minute when the value is not positive.
Timeout int `yaml:"timeout"`
// Parameters are sent as named ES|QL parameters by Tool.Invoke.
Parameters tools.Parameters `yaml:"parameters"`
}
// Compile-time check that Config satisfies tools.ToolConfig.
var _ tools.ToolConfig = Config{}
// ToolConfigKind returns the tool kind string, "elasticsearch-esql".
func (c Config) ToolConfigKind() string {
return kind
}
// newConfig decodes an elasticsearch-esql tool configuration from decoder.
// The Name field is seeded with name before decoding; any decode error is
// returned unchanged.
func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (tools.ToolConfig, error) {
	cfg := Config{Name: name}
	err := decoder.DecodeContext(ctx, &cfg)
	if err != nil {
		return nil, err
	}
	return cfg, nil
}
// Tool is an initialized elasticsearch-esql tool, built by Config.Initialize.
type Tool struct {
Name string `yaml:"name"`
Kind string `yaml:"kind"`
AuthRequired []string `yaml:"authRequired"`
Parameters tools.Parameters `yaml:"parameters"`
// Query is the configured ES|QL text; it may be empty (see Invoke).
Query string `yaml:"query"`
// NOTE(review): nothing in this file reads the `default` tag; confirm that
// something else applies it, otherwise an unset Format is sent as "".
Format string `yaml:"format" default:"json"`
// Timeout is in seconds; Invoke uses one minute when it is not positive.
Timeout int `yaml:"timeout"`
manifest tools.Manifest
mcpManifest tools.McpManifest
// EsClient is the client obtained from the configured source.
EsClient es.EsClient
}
// Compile-time check that Tool satisfies tools.Tool.
var _ tools.Tool = Tool{}
// Initialize resolves the configured source from srcs and builds the Tool.
// It fails when the source name is unknown or when the source does not expose
// an Elasticsearch client.
func (c Config) Initialize(srcs map[string]sources.Source) (tools.Tool, error) {
	// The named source must exist.
	rawSource, found := srcs[c.Source]
	if !found {
		return nil, fmt.Errorf("source %q not found", c.Source)
	}

	// The source must be able to hand out an Elasticsearch client.
	esSource, compatible := rawSource.(compatibleSource)
	if !compatible {
		return nil, fmt.Errorf("invalid source for %q tool: source kind must be one of %q", kind, compatibleSources)
	}

	mcp := tools.GetMcpManifest(c.Name, c.Description, c.AuthRequired, c.Parameters)
	client := esSource.ElasticsearchClient()
	manifest := tools.Manifest{
		Description:  c.Description,
		Parameters:   c.Parameters.Manifest(),
		AuthRequired: c.AuthRequired,
	}

	tool := Tool{
		Name:         c.Name,
		Kind:         kind,
		Parameters:   c.Parameters,
		Query:        c.Query,
		Format:       c.Format,
		Timeout:      c.Timeout,
		AuthRequired: c.AuthRequired,
		EsClient:     client,
		manifest:     manifest,
		mcpManifest:  mcp,
	}
	return tool, nil
}
// esqlColumn is one entry of the "columns" array of an ES|QL JSON response.
type esqlColumn struct {
Name string `json:"name"`
Type string `json:"type"`
}
// esqlResult is the part of an ES|QL JSON response that Invoke decodes: the
// column descriptions and the rows of values. esqlToMap pairs values with
// columns by position.
type esqlResult struct {
Columns []esqlColumn `json:"columns"`
Values [][]any `json:"values"`
}
// Invoke runs the tool's ES|QL query against Elasticsearch and returns the
// rows as a slice of maps keyed by column name.
//
// The request is bounded by Timeout seconds, or by one minute when Timeout is
// not positive. When the tool has no configured Query, a string parameter
// named "query" supplies it. The "query" parameter is never forwarded as a
// named ES|QL parameter. Every other declared parameter is forwarded; array
// parameters are rejected with an error.
//
// When Elasticsearch answers with an error status and the body is valid JSON,
// that body is returned as the result with a nil error so the caller can see
// the server's message. If the body cannot be decoded, an error carrying the
// HTTP status is returned instead.
func (t Tool) Invoke(ctx context.Context, params tools.ParamValues, accessToken tools.AccessToken) (any, error) {
	timeout := time.Minute
	if t.Timeout > 0 {
		timeout = time.Duration(t.Timeout) * time.Second
	}
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	bodyStruct := struct {
		Query  string           `json:"query"`
		Params []map[string]any `json:"params,omitempty"`
	}{
		Query:  t.Query,
		Params: make([]map[string]any, 0, len(params)),
	}

	paramMap := params.AsMap()

	// If a query is provided in the params and not already set in the tool, use it.
	queryDropped := false
	if query, ok := paramMap["query"]; ok {
		if str, ok := query.(string); ok && bodyStruct.Query == "" {
			bodyStruct.Query = str
		}
		// Drop the query param if not a string or if the tool already has a query.
		delete(paramMap, "query")
		queryDropped = true
	}

	for _, param := range t.Parameters {
		if param.GetType() == "array" {
			return nil, fmt.Errorf("array parameters are not supported yet")
		}
		name := param.GetName()
		// The "query" parameter was consumed or discarded above. Forwarding it
		// would send a named parameter with a null value.
		if queryDropped && name == "query" {
			continue
		}
		bodyStruct.Params = append(bodyStruct.Params, map[string]any{name: paramMap[name]})
	}

	body, err := json.Marshal(bodyStruct)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal query body: %w", err)
	}

	res, err := esapi.EsqlQueryRequest{
		Body:       bytes.NewReader(body),
		Format:     t.Format,
		FilterPath: []string{"columns", "values"},
		Instrument: t.EsClient.InstrumentationEnabled(),
	}.Do(ctx, t.EsClient)
	if err != nil {
		return nil, fmt.Errorf("failed to execute ES|QL query: %w", err)
	}
	defer res.Body.Close()

	if res.IsError() {
		// Surface the server's error document as the tool result when it is
		// readable JSON.
		var esErr json.RawMessage
		if err := util.DecodeJSON(res.Body, &esErr); err != nil {
			return nil, fmt.Errorf("elasticsearch error: status %s", res.Status())
		}
		return esErr, nil
	}

	var result esqlResult
	if err := util.DecodeJSON(res.Body, &result); err != nil {
		return nil, fmt.Errorf("failed to decode response body: %w", err)
	}

	return t.esqlToMap(result), nil
}
// esqlToMap turns an esqlResult into one map per row, keyed by column name.
// Values are matched to columns by position. A column with no value in a row
// maps to nil, values beyond the last column are ignored, and a nil row
// yields an empty map.
func (t Tool) esqlToMap(result esqlResult) []map[string]any {
	rows := make([]map[string]any, 0, len(result.Values))
	for _, values := range result.Values {
		row := make(map[string]any)
		if values != nil {
			for idx := range result.Columns {
				var cell any
				if idx < len(values) {
					cell = values[idx]
				}
				row[result.Columns[idx].Name] = cell
			}
		}
		rows = append(rows, row)
	}
	return rows
}
// ParseParams delegates to tools.ParseParams using the tool's declared
// Parameters together with the supplied data and claims.
func (t Tool) ParseParams(data map[string]any, claims map[string]map[string]any) (tools.ParamValues, error) {
return tools.ParseParams(t.Parameters, data, claims)
}
// Manifest returns the manifest built by Config.Initialize.
func (t Tool) Manifest() tools.Manifest {
return t.manifest
}
// McpManifest returns the MCP manifest built by Config.Initialize.
func (t Tool) McpManifest() tools.McpManifest {
return t.mcpManifest
}
// Authorized delegates to tools.IsAuthorized with the tool's AuthRequired list
// and the verified auth services.
func (t Tool) Authorized(verifiedAuthServices []string) bool {
return tools.IsAuthorized(t.AuthRequired, verifiedAuthServices)
}
// RequiresClientAuthorization always reports false for this tool.
func (t Tool) RequiresClientAuthorization() bool {
return false
}

View File

@@ -0,0 +1,261 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package elasticsearchesql
import (
"reflect"
"testing"
"github.com/goccy/go-yaml"
"github.com/google/go-cmp/cmp"
"github.com/googleapis/genai-toolbox/internal/server"
"github.com/googleapis/genai-toolbox/internal/testutils"
"github.com/googleapis/genai-toolbox/internal/tools"
)
// TestParseFromYamlElasticsearchEsql checks that YAML "tools" sections with
// elasticsearch-esql entries unmarshal into the expected Config values.
func TestParseFromYamlElasticsearchEsql(t *testing.T) {
ctx, err := testutils.ContextWithNewLogger()
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
// Each case pairs a YAML document with the tool configs it should produce.
tcs := []struct {
desc string
in string
want server.ToolConfigs
}{
{
desc: "basic search example",
in: `
tools:
example_tool:
kind: elasticsearch-esql
source: my-elasticsearch-instance
description: Elasticsearch ES|QL tool
query: |
FROM my-index
| KEEP first_name, last_name
`,
want: server.ToolConfigs{
"example_tool": Config{
Name: "example_tool",
Kind: "elasticsearch-esql",
Source: "my-elasticsearch-instance",
Description: "Elasticsearch ES|QL tool",
AuthRequired: []string{},
Query: "FROM my-index\n| KEEP first_name, last_name\n",
},
},
},
{
desc: "search with customizable limit parameter",
in: `
tools:
example_tool:
kind: elasticsearch-esql
source: my-elasticsearch-instance
description: Elasticsearch ES|QL tool with customizable limit
parameters:
- name: limit
type: integer
description: Limit the number of results
query: |
FROM my-index
| LIMIT ?limit
`,
want: server.ToolConfigs{
"example_tool": Config{
Name: "example_tool",
Kind: "elasticsearch-esql",
Source: "my-elasticsearch-instance",
Description: "Elasticsearch ES|QL tool with customizable limit",
AuthRequired: []string{},
Parameters: tools.Parameters{
tools.NewIntParameter("limit", "Limit the number of results"),
},
Query: "FROM my-index\n| LIMIT ?limit\n",
},
},
},
}
for _, tc := range tcs {
t.Run(tc.desc, func(t *testing.T) {
// Decode only the "tools" key of the document.
got := struct {
Tools server.ToolConfigs `yaml:"tools"`
}{}
// Parse contents
err := yaml.UnmarshalContext(ctx, testutils.FormatYaml(tc.in), &got)
if err != nil {
t.Fatalf("unable to unmarshal: %s", err)
}
if diff := cmp.Diff(tc.want, got.Tools); diff != "" {
t.Fatalf("incorrect parse: diff %v", diff)
}
})
}
}
// TestTool_esqlToMap exercises Tool.esqlToMap with well-formed results and
// with mismatched column/value counts, nil rows and empty inputs.
func TestTool_esqlToMap(t *testing.T) {
	cases := []struct {
		name   string
		result esqlResult
		want   []map[string]any
	}{
		{
			name: "simple case with two rows",
			result: esqlResult{
				Columns: []esqlColumn{
					{Name: "first_name", Type: "text"},
					{Name: "last_name", Type: "text"},
				},
				Values: [][]any{
					{"John", "Doe"},
					{"Jane", "Smith"},
				},
			},
			want: []map[string]any{
				{"first_name": "John", "last_name": "Doe"},
				{"first_name": "Jane", "last_name": "Smith"},
			},
		},
		{
			name: "different data types",
			result: esqlResult{
				Columns: []esqlColumn{
					{Name: "id", Type: "integer"},
					{Name: "active", Type: "boolean"},
					{Name: "score", Type: "float"},
				},
				Values: [][]any{
					{1, true, 95.5},
					{2, false, 88.0},
				},
			},
			want: []map[string]any{
				{"id": 1, "active": true, "score": 95.5},
				{"id": 2, "active": false, "score": 88.0},
			},
		},
		{
			name: "no rows",
			result: esqlResult{
				Columns: []esqlColumn{
					{Name: "id", Type: "integer"},
					{Name: "name", Type: "text"},
				},
				Values: [][]any{},
			},
			want: []map[string]any{},
		},
		{
			name: "null values",
			result: esqlResult{
				Columns: []esqlColumn{
					{Name: "id", Type: "integer"},
					{Name: "name", Type: "text"},
				},
				Values: [][]any{
					{1, nil},
					{2, "Alice"},
				},
			},
			want: []map[string]any{
				{"id": 1, "name": nil},
				{"id": 2, "name": "Alice"},
			},
		},
		{
			name: "missing values in a row",
			result: esqlResult{
				Columns: []esqlColumn{
					{Name: "id", Type: "integer"},
					{Name: "name", Type: "text"},
					{Name: "age", Type: "integer"},
				},
				Values: [][]any{
					{1, "Bob"},
					{2, "Charlie", 30},
				},
			},
			want: []map[string]any{
				{"id": 1, "name": "Bob", "age": nil},
				{"id": 2, "name": "Charlie", "age": 30},
			},
		},
		{
			name: "all null row",
			result: esqlResult{
				Columns: []esqlColumn{
					{Name: "id", Type: "integer"},
					{Name: "name", Type: "text"},
				},
				Values: [][]any{
					nil,
				},
			},
			want: []map[string]any{
				{},
			},
		},
		{
			name: "empty columns",
			result: esqlResult{
				Columns: []esqlColumn{},
				Values: [][]any{
					{},
					{},
				},
			},
			want: []map[string]any{
				{},
				{},
			},
		},
		{
			name: "more values than columns",
			result: esqlResult{
				Columns: []esqlColumn{
					{Name: "id", Type: "integer"},
				},
				Values: [][]any{
					{1, "extra"},
				},
			},
			want: []map[string]any{
				{"id": 1},
			},
		},
		{
			name: "no columns but with values",
			result: esqlResult{
				Columns: []esqlColumn{},
				Values: [][]any{
					{1, "data"},
				},
			},
			want: []map[string]any{
				{},
			},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// esqlToMap reads no Tool fields, so a zero value is enough.
			tool := Tool{}
			got := tool.esqlToMap(tc.result)
			if !reflect.DeepEqual(got, tc.want) {
				t.Errorf("esqlToMap() = %v, want %v", got, tc.want)
			}
		})
	}
}

View File

@@ -0,0 +1,309 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package elasticsearch
import (
"context"
"fmt"
"os"
"regexp"
"strings"
"testing"
"time"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
"github.com/googleapis/genai-toolbox/internal/testutils"
"github.com/googleapis/genai-toolbox/tests"
)
// Settings shared by the Elasticsearch integration test. The connection
// values are read from the environment when the package is initialized.
var (
ElasticsearchSourceKind = "elasticsearch"
ElasticsearchToolKind = "elasticsearch-esql"
EsAddress = os.Getenv("ELASTICSEARCH_HOST")
EsUser = os.Getenv("ELASTICSEARCH_USER")
EsPass = os.Getenv("ELASTICSEARCH_PASS")
)
// getElasticsearchVars returns the source configuration used by the
// integration test. It fails the test immediately when any of the required
// environment variables is unset. The test authenticates with username and
// password, so both are required in addition to the host.
func getElasticsearchVars(t *testing.T) map[string]any {
	t.Helper()

	switch "" {
	case EsAddress:
		t.Fatal("'ELASTICSEARCH_HOST' not set")
	case EsUser:
		t.Fatal("'ELASTICSEARCH_USER' not set")
	case EsPass:
		t.Fatal("'ELASTICSEARCH_PASS' not set")
	}

	return map[string]any{
		"kind":      ElasticsearchSourceKind,
		"addresses": []string{EsAddress},
		"username":  EsUser,
		"password":  EsPass,
	}
}
// ElasticsearchWants groups the expected response strings that
// TestElasticsearchToolEndpoints passes to the shared test runners.
type ElasticsearchWants struct {
Select1 string
MyToolId3NameAlice string
MyToolById4 string
Null string
McpMyFailTool string
McpMyToolId3NameAlice string
McpSelect1 string
}
// TestElasticsearchToolEndpoints is an integration test: it starts the toolbox
// server against a live Elasticsearch instance, indexes a handful of sample
// documents into a test index, and runs the shared tool and MCP test suites
// against the server. The test index is deleted when the test finishes.
func TestElasticsearchToolEndpoints(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	var args []string

	sourceConfig := getElasticsearchVars(t)

	index := "test-index"
	paramToolStatement, idParamToolStatement, nameParamToolStatement, arrayParamToolStatement, authToolStatement := getElasticsearchQueries(index)

	toolsConfig := getElasticsearchToolsConfig(sourceConfig, ElasticsearchToolKind, paramToolStatement, idParamToolStatement, nameParamToolStatement, arrayParamToolStatement, authToolStatement)

	cmd, cleanup, err := tests.StartCmd(ctx, toolsConfig, args...)
	if err != nil {
		t.Fatalf("failed to start cmd: %v", err)
	}
	defer cleanup()

	waitCtx, waitCancel := context.WithTimeout(ctx, 10*time.Second)
	defer waitCancel()
	out, err := testutils.WaitForString(waitCtx, regexp.MustCompile(`Server ready to serve`), cmd.Out)
	if err != nil {
		t.Logf("toolbox command logs: \n%s", out)
		t.Fatalf("toolbox didn't start successfully: %s", err)
	}

	esClient, err := elasticsearch.NewBaseClient(elasticsearch.Config{
		Addresses: []string{EsAddress},
		Username:  EsUser,
		Password:  EsPass,
	})
	if err != nil {
		t.Fatalf("error creating the Elasticsearch client: %s", err)
	}

	// Remove the test index once the test is done.
	defer func() {
		res, err := esapi.IndicesDeleteRequest{
			Index: []string{index},
		}.Do(ctx, esClient)
		if err != nil {
			t.Errorf("error deleting index: %s", err)
			return
		}
		res.Body.Close()
	}()

	alice := fmt.Sprintf(`{
		"id": 1,
		"name": "Alice",
		"email": "%s"
	}`, tests.ServiceAccountEmail)

	// Index sample documents
	sampleDocs := []string{
		alice,
		`{"id": 2, "name": "Jane", "email": "janedoe@gmail.com"}`,
		`{"id": 3, "name": "Sid"}`,
		`{"id": 4, "name": "null"}`,
	}
	for _, doc := range sampleDocs {
		res, err := esapi.IndexRequest{
			Index:   index,
			Body:    strings.NewReader(doc),
			Refresh: "true",
		}.Do(ctx, esClient)
		// Check the transport error first: res is nil when err is non-nil.
		if err != nil {
			t.Fatalf("error indexing document: %s", err)
		}
		if res.IsError() {
			detail := res.String()
			res.Body.Close()
			t.Fatalf("error indexing document: %s", detail)
		}
		// Close per iteration rather than deferring inside the loop.
		res.Body.Close()
	}

	// Get configs for tests
	wants := getElasticsearchWants()

	tests.RunToolGetTest(t)
	tests.RunToolInvokeTest(t, wants.Select1,
		tests.DisableArrayTest(),
		tests.WithMyToolId3NameAliceWant(wants.MyToolId3NameAlice),
		tests.WithMyToolById4Want(wants.MyToolById4),
		tests.WithNullWant(wants.Null),
	)
	tests.RunMCPToolCallMethod(t, wants.McpMyFailTool, wants.McpSelect1, tests.WithMcpMyToolId3NameAliceWant(wants.McpMyToolId3NameAlice))
}
// getElasticsearchQueries builds the ES|QL statements used by the integration
// test for the given index. In order, it returns the id-or-name statement,
// the id statement, the name statement, the array-parameter statement and
// the auth (email) statement.
func getElasticsearchQueries(index string) (string, string, string, string, string) {
	return fmt.Sprintf(`FROM %s | WHERE id == ?id OR name == ?name | SORT id ASC`, index),
		fmt.Sprintf(`FROM %s | WHERE id == ?id`, index),
		fmt.Sprintf(`FROM %s | WHERE name == ?name`, index),
		// Array parameters are not supported yet.
		fmt.Sprintf(`FROM %s | WHERE first_name == ?first_name_array`, index),
		fmt.Sprintf(`FROM %s | WHERE email == ?email | KEEP name`, index)
}
// getElasticsearchWants returns the response bodies the Elasticsearch tool
// tests compare against. Expectations that include the Alice document have
// tests.ServiceAccountEmail substituted for her email fields.
func getElasticsearchWants() ElasticsearchWants {
	email := tests.ServiceAccountEmail

	return ElasticsearchWants{
		// All four sample documents.
		Select1: fmt.Sprintf(`[{"email":"%[1]s","email.keyword":"%[1]s","id":1,"name":"Alice","name.keyword":"Alice"},{"email":"janedoe@gmail.com","email.keyword":"janedoe@gmail.com","id":2,"name":"Jane","name.keyword":"Jane"},{"email":null,"email.keyword":null,"id":3,"name":"Sid","name.keyword":"Sid"},{"email":null,"email.keyword":null,"id":4,"name":"null","name.keyword":"null"}]`, email),
		// Documents with id 1 (Alice) and id 3 (Sid).
		MyToolId3NameAlice: fmt.Sprintf(`[{"email":"%[1]s","email.keyword":"%[1]s","id":1,"name":"Alice","name.keyword":"Alice"},{"email":null,"email.keyword":null,"id":3,"name":"Sid","name.keyword":"Sid"}]`, email),
		// Only the document with id 4.
		MyToolById4: `[{"email":null,"email.keyword":null,"id":4,"name":"null","name.keyword":"null"}]`,
		// Verification error payload for a null name argument.
		Null: `{"error":{"root_cause":[{"type":"verification_exception","reason":"Found 1 problem\nline 1:25: first argument of [name == ?name] is [text] so second argument must also be [text] but was [null]"}],"type":"verification_exception","reason":"Found 1 problem\nline 1:25: first argument of [name == ?name] is [text] so second argument must also be [text] but was [null]"},"status":400}`,
		// MCP content wrapping the parsing error for the malformed statement.
		McpMyFailTool: `{"content":[{"type":"text","text":"{\"error\":{\"root_cause\":[{\"type\":\"parsing_exception\",\"reason\":\"line 1:1: mismatched input 'SELEC' expecting {, 'row', 'from', 'show'}\"}],\"type\":\"parsing_exception\",\"reason\":\"line 1:1: mismatched input 'SELEC' expecting {, 'row', 'from', 'show'}\",\"caused_by\":{\"type\":\"input_mismatch_exception\",\"reason\":null}},\"status\":400}"}]}`,
		// JSON-RPC envelope around the id 1 / id 3 result.
		McpMyToolId3NameAlice: fmt.Sprintf(`{"jsonrpc":"2.0","id":"my-tool","result":{"content":[{"type":"text","text":"[{\"email\":\"%[1]s\",\"email.keyword\":\"%[1]s\",\"id\":1,\"name\":\"Alice\",\"name.keyword\":\"Alice\"},{\"email\":null,\"email.keyword\":null,\"id\":3,\"name\":\"Sid\",\"name.keyword\":\"Sid\"}]"}]}}`, email),
		// JSON-RPC envelope around the full four-document result.
		McpSelect1: fmt.Sprintf(`{"jsonrpc":"2.0","id":"invoke my-auth-required-tool","result":{"content":[{"type":"text","text":"[{\"email\":\"%[1]s\",\"email.keyword\":\"%[1]s\",\"id\":1,\"name\":\"Alice\",\"name.keyword\":\"Alice\"},{\"email\":\"janedoe@gmail.com\",\"email.keyword\":\"janedoe@gmail.com\",\"id\":2,\"name\":\"Jane\",\"name.keyword\":\"Jane\"},{\"email\":null,\"email.keyword\":null,\"id\":3,\"name\":\"Sid\",\"name.keyword\":\"Sid\"},{\"email\":null,\"email.keyword\":null,\"id\":4,\"name\":\"null\",\"name.keyword\":\"null\"}]"}]}}`, email),
	}
}
// getElasticsearchToolsConfig assembles the tools-file configuration used by
// the Elasticsearch tests.
//
// sourceConfig is registered as the "my-instance" source, toolKind is used as
// the kind of every tool, and the statement arguments become the queries of
// my-tool, my-tool-by-id, my-tool-by-name, my-array-tool and my-auth-tool
// respectively. The remaining tools use fixed queries. The returned map holds
// the "sources", "authServices" and "tools" sections.
func getElasticsearchToolsConfig(sourceConfig map[string]any, toolKind, paramToolStatement, idParamToolStmt, nameParamToolStmt, arrayToolStatement, authToolStatement string) map[string]any {
	const (
		sourceName      = "my-instance"
		authServiceName = "my-google-auth"
		selectAllQuery  = "FROM test-index | SORT id ASC"
	)

	// newTool returns the fields shared by every tool entry.
	newTool := func(description, query string) map[string]any {
		return map[string]any{
			"kind":        toolKind,
			"source":      sourceName,
			"description": description,
			"query":       query,
		}
	}
	// newParam describes a single tool parameter.
	newParam := func(name, typ, description string) map[string]any {
		return map[string]any{
			"name":        name,
			"type":        typ,
			"description": description,
		}
	}
	// newArrayParam describes an array parameter whose elements follow items.
	newArrayParam := func(name, description string, items map[string]any) map[string]any {
		p := newParam(name, "array", description)
		p["items"] = items
		return p
	}

	simpleTool := newTool("Simple tool to test end to end functionality.", selectAllQuery)

	paramTool := newTool("Tool to test invocation with params.", paramToolStatement)
	paramTool["parameters"] = []any{
		newParam("id", "integer", "user ID"),
		newParam("name", "string", "user name"),
	}

	byIDTool := newTool("Tool to test invocation with params.", idParamToolStmt)
	byIDTool["parameters"] = []any{
		newParam("id", "integer", "user ID"),
	}

	// The name parameter of my-tool-by-name is explicitly optional.
	optionalName := newParam("name", "string", "user name")
	optionalName["required"] = false
	byNameTool := newTool("Tool to test invocation with params.", nameParamToolStmt)
	byNameTool["parameters"] = []any{optionalName}

	arrayTool := newTool("Tool to test invocation with array params.", arrayToolStatement)
	arrayTool["parameters"] = []any{
		newArrayParam("idArray", "ID array", newParam("id", "integer", "ID")),
		newArrayParam("nameArray", "user name array", newParam("name", "string", "user name")),
	}

	// The email parameter is auto-filled from the authenticated user's email.
	emailParam := newParam("email", "string", "user email")
	emailParam["authServices"] = []map[string]string{
		{
			"name":  authServiceName,
			"field": "email",
		},
	}
	authTool := newTool("Tool to test authenticated parameters.", authToolStatement)
	authTool["parameters"] = []map[string]any{emailParam}

	authRequiredTool := newTool("Tool to test auth required invocation.", selectAllQuery)
	authRequiredTool["authRequired"] = []string{authServiceName}

	failTool := newTool("Tool to test statement with incorrect syntax.", "SELEC 1;")

	return map[string]any{
		"sources": map[string]any{
			sourceName: sourceConfig,
		},
		"authServices": map[string]any{
			authServiceName: map[string]any{
				"kind":     "google",
				"clientId": tests.ClientId,
			},
		},
		"tools": map[string]any{
			"my-simple-tool":        simpleTool,
			"my-tool":               paramTool,
			"my-tool-by-id":         byIDTool,
			"my-tool-by-name":       byNameTool,
			"my-array-tool":         arrayTool,
			"my-auth-tool":          authTool,
			"my-auth-required-tool": authRequiredTool,
			"my-fail-tool":          failTool,
		},
	}
}

View File

@@ -18,6 +18,7 @@ package tests
// InvokeTestConfig represents the various configuration options for RunToolInvokeTest()
type InvokeTestConfig struct {
myAuthToolWant string
myToolId3NameAliceWant string
myToolById4Want string
nullWant string
@@ -31,6 +32,14 @@ type InvokeTestConfig struct {
// InvokeTestOption is a function that modifies an InvokeTestConfig.
// Helpers such as WithMyAuthToolWant return values of this type.
type InvokeTestOption func(*InvokeTestConfig)
// WithMyAuthToolWant returns an option that stores s as the response value
// expected from my-auth-tool.
// e.g. tests.RunToolInvokeTest(t, select1Want, tests.WithMyAuthToolWant("custom"))
func WithMyAuthToolWant(s string) InvokeTestOption {
	setWant := func(cfg *InvokeTestConfig) {
		cfg.myAuthToolWant = s
	}
	return setWant
}
// WithMyToolId3NameAliceWant represents the response value for my-tool with id=3 and name=Alice.
// e.g. tests.RunToolInvokeTest(t, select1Want, tests.WithMyToolId3NameAliceWant("custom"))
func WithMyToolId3NameAliceWant(s string) InvokeTestOption {

View File

@@ -364,7 +364,7 @@ func RunToolInvokeTest(t *testing.T, select1Want string, options ...InvokeTestOp
enabled: configs.supportSelect1Auth,
requestHeader: map[string]string{"my-google-auth_token": idToken},
requestBody: bytes.NewBuffer([]byte(`{}`)),
wantBody: "[{\"name\":\"Alice\"}]",
wantBody: configs.myAuthToolWant,
wantStatusCode: http.StatusOK,
},
{