mirror of
https://github.com/googleapis/genai-toolbox.git
synced 2026-01-09 15:38:08 -05:00
feat: Add Valkey Source and Tool (#532)
This commit is contained in:
@@ -348,6 +348,24 @@ steps:
|
||||
- -c
|
||||
- |
|
||||
./redis.test -test.v
|
||||
|
||||
- id: "valkey"
|
||||
name : golang:1
|
||||
waitFor: ["compile-test-binary"]
|
||||
entrypoint: /bin/bash
|
||||
env:
|
||||
- "GOPATH=/gopath"
|
||||
- "VALKEY_DATABASE=$_VALKEY_DATABASE"
|
||||
- "SERVICE_ACCOUNT_EMAIL=$SERVICE_ACCOUNT_EMAIL"
|
||||
secretEnv: ["VALKEY_ADDRESS", "CLIENT_ID"]
|
||||
volumes:
|
||||
- name: "go"
|
||||
path: "/gopath"
|
||||
args:
|
||||
- -c
|
||||
- |
|
||||
./valkey.test -test.v
|
||||
|
||||
|
||||
availableSecrets:
|
||||
secretManager:
|
||||
@@ -399,6 +417,8 @@ availableSecrets:
|
||||
env: REDIS_ADDRESS
|
||||
- versionName: projects/$PROJECT_ID/secrets/memorystore_redis_pass/versions/latest
|
||||
env: REDIS_PASS
|
||||
- versionName: projects/$PROJECT_ID/secrets/memorystore_valkey_address/versions/latest
|
||||
env: VALKEY_ADDRESS
|
||||
|
||||
|
||||
options:
|
||||
@@ -430,4 +450,4 @@ substitutions:
|
||||
_MSSQL_PORT: "1433"
|
||||
_DGRAPHURL: "https://play.dgraph.io"
|
||||
_COUCHBASE_BUCKET: "couchbase-bucket"
|
||||
_COUCHBASE_SCOPE: "couchbase-scope"
|
||||
_COUCHBASE_SCOPE: "couchbase-scope"
|
||||
@@ -57,6 +57,7 @@ import (
|
||||
_ "github.com/googleapis/genai-toolbox/internal/tools/spanner"
|
||||
_ "github.com/googleapis/genai-toolbox/internal/tools/spannerexecutesql"
|
||||
_ "github.com/googleapis/genai-toolbox/internal/tools/sqlitesql"
|
||||
_ "github.com/googleapis/genai-toolbox/internal/tools/valkey"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
|
||||
@@ -76,6 +77,7 @@ import (
|
||||
_ "github.com/googleapis/genai-toolbox/internal/sources/redis"
|
||||
_ "github.com/googleapis/genai-toolbox/internal/sources/spanner"
|
||||
_ "github.com/googleapis/genai-toolbox/internal/sources/sqlite"
|
||||
_ "github.com/googleapis/genai-toolbox/internal/sources/valkey"
|
||||
)
|
||||
|
||||
var (
|
||||
|
||||
@@ -26,7 +26,8 @@ Specify your AUTH string in the password field:
|
||||
sources:
|
||||
my-redis-instance:
|
||||
kind: redis
|
||||
address: 127.0.0.1
|
||||
address:
|
||||
- 127.0.0.1
|
||||
username: ${MY_USER_NAME}
|
||||
password: ${MY_AUTH_STRING} # Omit this field if you don't have a password.
|
||||
# database: 0
|
||||
@@ -50,7 +51,8 @@ Here is an example tools.yaml config with [AUTH][auth] enabled:
|
||||
sources:
|
||||
my-redis-cluster-instance:
|
||||
kind: memorystore-redis
|
||||
address: 127.0.0.1
|
||||
address:
|
||||
- 127.0.0.1
|
||||
password: ${MY_AUTH_STRING}
|
||||
# useGCPIAM: false
|
||||
# clusterEnabled: false
|
||||
|
||||
64
docs/en/resources/sources/valkey.md
Normal file
64
docs/en/resources/sources/valkey.md
Normal file
@@ -0,0 +1,64 @@
|
||||
---
|
||||
title: "Valkey"
|
||||
linkTitle: "Valkey"
|
||||
type: docs
|
||||
weight: 1
|
||||
description: >
|
||||
Valkey is an open-source, in-memory data structure store, forked from Redis.
|
||||
|
||||
---
|
||||
|
||||
## About
|
||||
|
||||
Valkey is an open-source, in-memory data structure store that originated as a fork of Redis. It's designed to be used as a database, cache, and message broker, supporting a wide range of data structures like strings, hashes, lists, sets, sorted sets with range queries, bitmaps, hyperloglogs, and geospatial indexes with radius queries.
|
||||
|
||||
If you're new to Valkey, you can find installation and getting started guides on the [official Valkey website](https://valkey.io/docs/getting-started/).
|
||||
|
||||
## Example
|
||||
|
||||
```yaml
|
||||
sources:
|
||||
my-valkey-instance:
|
||||
kind: valkey
|
||||
address:
|
||||
- 127.0.0.1
|
||||
username: ${YOUR_USERNAME}
|
||||
password: ${YOUR_PASSWORD}
|
||||
# database: 0
|
||||
# useGCPIAM: false
|
||||
# disableCache: false
|
||||
```
|
||||
|
||||
{{< notice tip >}}
|
||||
Use environment variable replacement with the format ${ENV_NAME}
|
||||
instead of hardcoding your secrets into the configuration file.
|
||||
{{< /notice >}}
|
||||
|
||||
### IAM Authentication
|
||||
|
||||
If you are using GCP's Memorystore for Valkey, you can connect using IAM
|
||||
authentication. Grant your account the required [IAM role][iam] and set
|
||||
`useGCPIAM` to `true`:
|
||||
|
||||
```yaml
|
||||
sources:
|
||||
my-valkey-instance:
|
||||
kind: valkey
|
||||
address:
|
||||
- 127.0.0.1
|
||||
useGCPIAM: true
|
||||
```
|
||||
|
||||
[iam]: https://cloud.google.com/memorystore/docs/valkey/about-iam-auth
|
||||
|
||||
## Reference
|
||||
|
||||
| **field** | **type** | **required** | **description** |
|
||||
|--------------|:--------:|:------------:|----------------------------------------------------------------------------------------------------------------------------------|
|
||||
| kind | string | true | Must be "valkey". |
|
||||
| address | []string | true | Endpoints for the Valkey instance to connect to. |
|
||||
| username | string | false | If you are using a non-default user, specify the user name here. If you are using Memorystore for Valkey, leave this field blank |
|
||||
| password | string | false | Password for the Valkey instance |
|
||||
| database | int | false | The Valkey database to connect to. Not applicable for cluster enabled instances. The default database is `0`. |
|
||||
| useGCPIAM | bool | false | Set it to `true` if you are using GCP's IAM authentication. Defaults to `false`. |
|
||||
| disableCache | bool | false | Set it to `true` if you want to enable client-side caching. Defaults to `false`. |
|
||||
56
docs/en/resources/tools/valkey.md
Normal file
56
docs/en/resources/tools/valkey.md
Normal file
@@ -0,0 +1,56 @@
|
||||
---
|
||||
title: "valkey"
|
||||
type: docs
|
||||
weight: 1
|
||||
description: >
|
||||
A "valkey" tool executes a set of pre-defined Valkey commands against a Memorystore for Valkey instance.
|
||||
---
|
||||
|
||||
## About
|
||||
|
||||
A valkey tool executes a series of pre-defined Valkey commands against a
|
||||
Memorystore for Valkey instance.
|
||||
|
||||
The specified Valkey commands are executed sequentially. Each command is
|
||||
represented as a string array, where the first element is the command name (e.g., SET,
|
||||
GET, HGETALL) and subsequent elements are its arguments.
|
||||
|
||||
### Dynamic Command Parameters
|
||||
|
||||
Command arguments can be templated using the `$variableName` annotation. The
|
||||
array type parameters will be expanded once into multiple arguments. Take the
|
||||
following config for example:
|
||||
|
||||
```yaml
|
||||
commands:
|
||||
- [SADD, userNames, $userNames] # Array will be flattened into multiple arguments.
|
||||
parameters:
|
||||
- name: userNames
|
||||
type: array
|
||||
description: The user names to be set.
|
||||
```
|
||||
|
||||
If the input is an array of strings `["Alice", "Sid", "Bob"]`, The final command
|
||||
to be executed after argument expansion will be `[SADD, userNames, Alice, Sid, Bob]`.
|
||||
|
||||
## Example
|
||||
|
||||
```yaml
|
||||
tools:
|
||||
user_data_tool:
|
||||
kind: valkey
|
||||
source: my-valkey-instance
|
||||
description: |
|
||||
Use this tool to interact with user data stored in Valkey.
|
||||
It can set, retrieve, and delete user-specific information.
|
||||
commands:
|
||||
- [SADD, userNames, $userNames] # Array will be flattened into multiple arguments.
|
||||
- [GET, $userId]
|
||||
parameters:
|
||||
- name: userId
|
||||
type: string
|
||||
description: The unique identifier for the user.
|
||||
- name: userNames
|
||||
type: array
|
||||
description: The user names to be set.
|
||||
```
|
||||
1
go.mod
1
go.mod
@@ -28,6 +28,7 @@ require (
|
||||
github.com/neo4j/neo4j-go-driver/v5 v5.28.1
|
||||
github.com/redis/go-redis/v9 v9.9.0
|
||||
github.com/spf13/cobra v1.9.1
|
||||
github.com/valkey-io/valkey-go v1.0.61
|
||||
go.opentelemetry.io/contrib/propagators/autoprop v0.61.0
|
||||
go.opentelemetry.io/otel v1.36.0
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.36.0
|
||||
|
||||
4
go.sum
4
go.sum
@@ -1027,6 +1027,8 @@ github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdh
|
||||
github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
|
||||
github.com/neo4j/neo4j-go-driver/v5 v5.28.1 h1:RKWQW7wTgYAY2fU9S+9LaJ9OwRPbRc0I17tlT7nDmAY=
|
||||
github.com/neo4j/neo4j-go-driver/v5 v5.28.1/go.mod h1:Vff8OwT7QpLm7L2yYr85XNWe9Rbqlbeb9asNXJTHO4k=
|
||||
github.com/onsi/gomega v1.36.2 h1:koNYke6TVk6ZmnyHrCXba/T/MoLBXFjeC1PtvYgw0A8=
|
||||
github.com/onsi/gomega v1.36.2/go.mod h1:DdwyADRjrc825LhMEkD76cHR5+pUnjhUN8GlHlRPHzY=
|
||||
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
|
||||
github.com/phpdave11/gofpdf v1.4.2/go.mod h1:zpO6xFn9yxo3YLyMvW8HcKWVdbNqgIfOOp2dXMnm1mY=
|
||||
github.com/phpdave11/gofpdi v1.0.12/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk/7bXwjDoI=
|
||||
@@ -1090,6 +1092,8 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o
|
||||
github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
|
||||
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
|
||||
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||
github.com/valkey-io/valkey-go v1.0.61 h1:uz7gxSs4dKqLfaa8xKFo8wHaCWYSCD3lMhVL0OJifZA=
|
||||
github.com/valkey-io/valkey-go v1.0.61/go.mod h1:bHmwjIEOrGq/ubOJfh5uMRs7Xj6mV3mQ/ZXUbmqpjqY=
|
||||
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM=
|
||||
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI=
|
||||
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
|
||||
125
internal/sources/valkey/valkey.go
Normal file
125
internal/sources/valkey/valkey.go
Normal file
@@ -0,0 +1,125 @@
|
||||
// Copyright 2025 Google LLC
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
package valkey
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"github.com/goccy/go-yaml"
|
||||
"github.com/googleapis/genai-toolbox/internal/sources"
|
||||
"github.com/valkey-io/valkey-go"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
const SourceKind string = "valkey"
|
||||
|
||||
// validate interface
|
||||
var _ sources.SourceConfig = Config{}
|
||||
|
||||
func init() {
|
||||
if !sources.Register(SourceKind, newConfig) {
|
||||
panic(fmt.Sprintf("source kind %q already registered", SourceKind))
|
||||
}
|
||||
}
|
||||
|
||||
func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (sources.SourceConfig, error) {
|
||||
actual := Config{Name: name}
|
||||
if err := decoder.DecodeContext(ctx, &actual); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return actual, nil
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
Name string `yaml:"name" validate:"required"`
|
||||
Kind string `yaml:"kind" validate:"required"`
|
||||
Address []string `yaml:"address" validate:"required"`
|
||||
Username string `yaml:"username"`
|
||||
Password string `yaml:"password"`
|
||||
Database int `yaml:"database"`
|
||||
UseGCPIAM bool `yaml:"useGCPIAM"`
|
||||
DisableCache bool `yaml:"disableCache"`
|
||||
}
|
||||
|
||||
func (r Config) SourceConfigKind() string {
|
||||
return SourceKind
|
||||
}
|
||||
|
||||
func (r Config) Initialize(ctx context.Context, tracer trace.Tracer) (sources.Source, error) {
|
||||
|
||||
client, err := initValkeyClient(ctx, r)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error initializing Valkey client: %s", err)
|
||||
}
|
||||
s := &Source{
|
||||
Name: r.Name,
|
||||
Kind: SourceKind,
|
||||
Client: client,
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func initValkeyClient(ctx context.Context, r Config) (valkey.Client, error) {
|
||||
var authFn func(valkey.AuthCredentialsContext) (valkey.AuthCredentials, error)
|
||||
if r.UseGCPIAM {
|
||||
// Pass in an access token getter fn for IAM auth
|
||||
authFn = func(valkey.AuthCredentialsContext) (valkey.AuthCredentials, error) {
|
||||
token, err := sources.GetIAMAccessToken(ctx)
|
||||
creds := valkey.AuthCredentials{Username: "default", Password: token}
|
||||
if err != nil {
|
||||
return creds, err
|
||||
}
|
||||
return creds, nil
|
||||
}
|
||||
}
|
||||
|
||||
client, err := valkey.NewClient(valkey.ClientOption{
|
||||
InitAddress: r.Address,
|
||||
SelectDB: r.Database,
|
||||
Username: r.Username,
|
||||
Password: r.Password,
|
||||
AuthCredentialsFn: authFn,
|
||||
DisableCache: r.DisableCache,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
log.Fatalf("error creating Valkey client: %v", err)
|
||||
}
|
||||
|
||||
// Ping the server to check connectivity
|
||||
pingCmd := client.B().Ping().Build()
|
||||
_, err = client.Do(ctx, pingCmd).ToString()
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to execute PING command: %v", err)
|
||||
}
|
||||
return client, nil
|
||||
}
|
||||
|
||||
var _ sources.Source = &Source{}
|
||||
|
||||
type Source struct {
|
||||
Name string `yaml:"name"`
|
||||
Kind string `yaml:"kind"`
|
||||
Client valkey.Client
|
||||
}
|
||||
|
||||
func (s *Source) SourceKind() string {
|
||||
return SourceKind
|
||||
}
|
||||
|
||||
func (s *Source) ValkeyClient() valkey.Client {
|
||||
return s.Client
|
||||
}
|
||||
162
internal/sources/valkey/valkey_test.go
Normal file
162
internal/sources/valkey/valkey_test.go
Normal file
@@ -0,0 +1,162 @@
|
||||
// Copyright 2025 Google LLC
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package valkey_test
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
yaml "github.com/goccy/go-yaml"
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/googleapis/genai-toolbox/internal/server"
|
||||
"github.com/googleapis/genai-toolbox/internal/sources"
|
||||
"github.com/googleapis/genai-toolbox/internal/sources/valkey"
|
||||
"github.com/googleapis/genai-toolbox/internal/testutils"
|
||||
)
|
||||
|
||||
func TestParseFromYamlValkey(t *testing.T) {
|
||||
tcs := []struct {
|
||||
desc string
|
||||
in string
|
||||
want server.SourceConfigs
|
||||
}{
|
||||
{
|
||||
desc: "default setting",
|
||||
in: `
|
||||
sources:
|
||||
my-valkey-instance:
|
||||
kind: valkey
|
||||
address:
|
||||
- 127.0.0.1
|
||||
`,
|
||||
want: map[string]sources.SourceConfig{
|
||||
"my-valkey-instance": valkey.Config{
|
||||
Name: "my-valkey-instance",
|
||||
Kind: valkey.SourceKind,
|
||||
Address: []string{"127.0.0.1"},
|
||||
Username: "",
|
||||
Password: "",
|
||||
Database: 0,
|
||||
UseGCPIAM: false,
|
||||
DisableCache: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "advanced example",
|
||||
in: `
|
||||
sources:
|
||||
my-valkey-instance:
|
||||
kind: valkey
|
||||
address:
|
||||
- 127.0.0.1
|
||||
database: 1
|
||||
username: user
|
||||
password: pass
|
||||
useGCPIAM: true
|
||||
disableCache: true
|
||||
`,
|
||||
want: map[string]sources.SourceConfig{
|
||||
"my-valkey-instance": valkey.Config{
|
||||
Name: "my-valkey-instance",
|
||||
Kind: valkey.SourceKind,
|
||||
Address: []string{"127.0.0.1"},
|
||||
Username: "user",
|
||||
Password: "pass",
|
||||
Database: 1,
|
||||
UseGCPIAM: true,
|
||||
DisableCache: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tc := range tcs {
|
||||
t.Run(tc.desc, func(t *testing.T) {
|
||||
got := struct {
|
||||
Sources server.SourceConfigs `yaml:"sources"`
|
||||
}{}
|
||||
// Parse contents
|
||||
err := yaml.Unmarshal(testutils.FormatYaml(tc.in), &got)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to unmarshal: %s", err)
|
||||
}
|
||||
if !cmp.Equal(tc.want, got.Sources) {
|
||||
t.Fatalf("incorrect parse: want %v, got %v", tc.want, got.Sources)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestFailParseFromYaml(t *testing.T) {
|
||||
tcs := []struct {
|
||||
desc string
|
||||
in string
|
||||
err string
|
||||
}{
|
||||
{
|
||||
desc: "invalid database",
|
||||
in: `
|
||||
sources:
|
||||
my-valkey-instance:
|
||||
kind: valkey
|
||||
project: my-project
|
||||
address:
|
||||
- 127.0.0.1
|
||||
database: my-db
|
||||
useGCPIAM: false
|
||||
`,
|
||||
err: "cannot unmarshal string into Go struct field .Sources of type int",
|
||||
},
|
||||
{
|
||||
desc: "extra field",
|
||||
in: `
|
||||
sources:
|
||||
my-valkey-instance:
|
||||
kind: valkey
|
||||
address:
|
||||
- 127.0.0.1
|
||||
project: proj
|
||||
database: 1
|
||||
`,
|
||||
err: "unable to parse source \"my-valkey-instance\" as \"valkey\": [5:1] unknown field \"project\"",
|
||||
},
|
||||
{
|
||||
desc: "missing required field",
|
||||
in: `
|
||||
sources:
|
||||
my-valkey-instance:
|
||||
kind: valkey
|
||||
`,
|
||||
err: "unable to parse source \"my-valkey-instance\" as \"valkey\": Key: 'Config.Address' Error:Field validation for 'Address' failed on the 'required' tag",
|
||||
},
|
||||
}
|
||||
for _, tc := range tcs {
|
||||
t.Run(tc.desc, func(t *testing.T) {
|
||||
got := struct {
|
||||
Sources server.SourceConfigs `yaml:"sources"`
|
||||
}{}
|
||||
// Parse contents
|
||||
err := yaml.Unmarshal(testutils.FormatYaml(tc.in), &got)
|
||||
if err == nil {
|
||||
t.Fatalf("expect parsing to fail")
|
||||
}
|
||||
errStr := err.Error()
|
||||
if !strings.Contains(errStr, tc.err) {
|
||||
t.Fatalf("unexpected error: got %q, want %q", errStr, tc.err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
204
internal/tools/valkey/valkey.go
Normal file
204
internal/tools/valkey/valkey.go
Normal file
@@ -0,0 +1,204 @@
|
||||
// Copyright 2025 Google LLC
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
package valkey
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
yaml "github.com/goccy/go-yaml"
|
||||
"github.com/googleapis/genai-toolbox/internal/sources"
|
||||
valkeysrc "github.com/googleapis/genai-toolbox/internal/sources/valkey"
|
||||
"github.com/googleapis/genai-toolbox/internal/tools"
|
||||
"github.com/valkey-io/valkey-go"
|
||||
)
|
||||
|
||||
const kind string = "valkey"
|
||||
|
||||
func init() {
|
||||
if !tools.Register(kind, newConfig) {
|
||||
panic(fmt.Sprintf("tool kind %q already registered", kind))
|
||||
}
|
||||
}
|
||||
|
||||
func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (tools.ToolConfig, error) {
|
||||
actual := Config{Name: name}
|
||||
if err := decoder.DecodeContext(ctx, &actual); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return actual, nil
|
||||
}
|
||||
|
||||
type compatibleSource interface {
|
||||
ValkeyClient() valkey.Client
|
||||
}
|
||||
|
||||
// validate compatible sources are still compatible
|
||||
var _ compatibleSource = &valkeysrc.Source{}
|
||||
|
||||
var compatibleSources = [...]string{valkeysrc.SourceKind, valkeysrc.SourceKind}
|
||||
|
||||
type Config struct {
|
||||
Name string `yaml:"name" validate:"required"`
|
||||
Kind string `yaml:"kind" validate:"required"`
|
||||
Source string `yaml:"source" validate:"required"`
|
||||
Description string `yaml:"description" validate:"required"`
|
||||
Commands [][]string `yaml:"commands" validate:"required"`
|
||||
AuthRequired []string `yaml:"authRequired"`
|
||||
Parameters tools.Parameters `yaml:"parameters"`
|
||||
}
|
||||
|
||||
// validate interface
|
||||
var _ tools.ToolConfig = Config{}
|
||||
|
||||
func (cfg Config) ToolConfigKind() string {
|
||||
return kind
|
||||
}
|
||||
|
||||
func (cfg Config) Initialize(srcs map[string]sources.Source) (tools.Tool, error) {
|
||||
// verify source exists
|
||||
rawS, ok := srcs[cfg.Source]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("no source named %q configured", cfg.Source)
|
||||
}
|
||||
|
||||
// verify the source is compatible
|
||||
s, ok := rawS.(compatibleSource)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("invalid source for %q tool: source kind must be one of %q", kind, compatibleSources)
|
||||
}
|
||||
|
||||
mcpManifest := tools.McpManifest{
|
||||
Name: cfg.Name,
|
||||
Description: cfg.Description,
|
||||
InputSchema: cfg.Parameters.McpManifest(),
|
||||
}
|
||||
|
||||
// finish tool setup
|
||||
t := Tool{
|
||||
Name: cfg.Name,
|
||||
Kind: kind,
|
||||
Parameters: cfg.Parameters,
|
||||
Commands: cfg.Commands,
|
||||
AuthRequired: cfg.AuthRequired,
|
||||
Client: s.ValkeyClient(),
|
||||
manifest: tools.Manifest{Description: cfg.Description, Parameters: cfg.Parameters.Manifest(), AuthRequired: cfg.AuthRequired},
|
||||
mcpManifest: mcpManifest,
|
||||
}
|
||||
return t, nil
|
||||
}
|
||||
|
||||
// validate interface
|
||||
var _ tools.Tool = Tool{}
|
||||
|
||||
type Tool struct {
|
||||
Name string `yaml:"name"`
|
||||
Kind string `yaml:"kind"`
|
||||
AuthRequired []string `yaml:"authRequired"`
|
||||
Parameters tools.Parameters `yaml:"parameters"`
|
||||
|
||||
Client valkey.Client
|
||||
Commands [][]string
|
||||
manifest tools.Manifest
|
||||
mcpManifest tools.McpManifest
|
||||
}
|
||||
|
||||
func (t Tool) Invoke(ctx context.Context, params tools.ParamValues) ([]any, error) {
|
||||
// Replace parameters
|
||||
commands, err := replaceCommandsParams(t.Commands, t.Parameters, params)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error replacing commands' parameters: %s", err)
|
||||
}
|
||||
|
||||
// Build commands
|
||||
builtCmds := make(valkey.Commands, len(commands))
|
||||
|
||||
for i, cmd := range commands {
|
||||
builtCmds[i] = t.Client.B().Arbitrary(cmd...).Build()
|
||||
}
|
||||
|
||||
if len(builtCmds) == 0 {
|
||||
return nil, fmt.Errorf("no valid commands were built to execute")
|
||||
}
|
||||
|
||||
// Execute commands
|
||||
responses := t.Client.DoMulti(ctx, builtCmds...)
|
||||
|
||||
// Parse responses
|
||||
out := make([]any, len(t.Commands))
|
||||
for i, resp := range responses {
|
||||
if err := resp.Error(); err != nil {
|
||||
// Add error from each command to `errSum`
|
||||
out[i] = fmt.Sprintf("error from executing command at index %d: %s", i, err)
|
||||
continue
|
||||
}
|
||||
val, err := resp.ToAny()
|
||||
if err != nil {
|
||||
out[i] = fmt.Sprintf("error parsing response: %s", err)
|
||||
continue
|
||||
}
|
||||
out[i] = val
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// Helper function to replace parameters in the commands
|
||||
func replaceCommandsParams(commands [][]string, params tools.Parameters, paramValues tools.ParamValues) ([][]string, error) {
|
||||
paramMap := paramValues.AsMapWithDollarPrefix()
|
||||
typeMap := make(map[string]string, len(params))
|
||||
for _, p := range params {
|
||||
placeholder := "$" + p.GetName()
|
||||
typeMap[placeholder] = p.GetType()
|
||||
}
|
||||
newCommands := make([][]string, len(commands))
|
||||
for i, cmd := range commands {
|
||||
newCmd := make([]string, len(cmd))
|
||||
for j, part := range cmd {
|
||||
v, ok := paramMap[part]
|
||||
if !ok {
|
||||
// Command part is not a Parameter placeholder
|
||||
newCmd[j] = part
|
||||
continue
|
||||
}
|
||||
if typeMap[part] == "array" {
|
||||
for _, item := range v.([]any) {
|
||||
// Nested arrays will only be expanded once
|
||||
// e.g., [A, [B, C]] --> ["A", "[B C]"]
|
||||
newCmd = append(newCmd, fmt.Sprintf("%s", item))
|
||||
}
|
||||
continue
|
||||
}
|
||||
newCmd[j] = fmt.Sprintf("%s", v)
|
||||
}
|
||||
newCommands[i] = newCmd
|
||||
}
|
||||
return newCommands, nil
|
||||
}
|
||||
|
||||
func (t Tool) ParseParams(data map[string]any, claims map[string]map[string]any) (tools.ParamValues, error) {
|
||||
return tools.ParseParams(t.Parameters, data, claims)
|
||||
}
|
||||
|
||||
func (t Tool) Manifest() tools.Manifest {
|
||||
return t.manifest
|
||||
}
|
||||
|
||||
func (t Tool) McpManifest() tools.McpManifest {
|
||||
return t.mcpManifest
|
||||
}
|
||||
|
||||
func (t Tool) Authorized(verifiedAuthServices []string) bool {
|
||||
return tools.IsAuthorized(t.AuthRequired, verifiedAuthServices)
|
||||
}
|
||||
85
internal/tools/valkey/valkey_test.go
Normal file
85
internal/tools/valkey/valkey_test.go
Normal file
@@ -0,0 +1,85 @@
|
||||
// Copyright 2024 Google LLC
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package valkey_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
yaml "github.com/goccy/go-yaml"
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/googleapis/genai-toolbox/internal/server"
|
||||
"github.com/googleapis/genai-toolbox/internal/testutils"
|
||||
"github.com/googleapis/genai-toolbox/internal/tools"
|
||||
"github.com/googleapis/genai-toolbox/internal/tools/valkey"
|
||||
)
|
||||
|
||||
func TestParseFromYamlvalkey(t *testing.T) {
|
||||
ctx, err := testutils.ContextWithNewLogger()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
tcs := []struct {
|
||||
desc string
|
||||
in string
|
||||
want server.ToolConfigs
|
||||
}{
|
||||
{
|
||||
desc: "basic example",
|
||||
in: `
|
||||
tools:
|
||||
valkey_tool:
|
||||
kind: valkey
|
||||
source: my-valkey-instance
|
||||
description: some description
|
||||
commands:
|
||||
- [SET, greeting, "hello, {{.name}}"]
|
||||
- [GET, id]
|
||||
parameters:
|
||||
- name: name
|
||||
type: string
|
||||
description: user name
|
||||
`,
|
||||
want: server.ToolConfigs{
|
||||
"valkey_tool": valkey.Config{
|
||||
Name: "valkey_tool",
|
||||
Kind: "valkey",
|
||||
Source: "my-valkey-instance",
|
||||
Description: "some description",
|
||||
AuthRequired: []string{},
|
||||
Commands: [][]string{{"SET", "greeting", "hello, {{.name}}"}, {"GET", "id"}},
|
||||
Parameters: []tools.Parameter{
|
||||
tools.NewStringParameter("name", "user name"),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tc := range tcs {
|
||||
t.Run(tc.desc, func(t *testing.T) {
|
||||
got := struct {
|
||||
Tools server.ToolConfigs `yaml:"tools"`
|
||||
}{}
|
||||
// Parse contents
|
||||
err := yaml.UnmarshalContext(ctx, testutils.FormatYaml(tc.in), &got)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to unmarshal: %s", err)
|
||||
}
|
||||
if diff := cmp.Diff(tc.want, got.Tools); diff != "" {
|
||||
t.Fatalf("incorrect parse: diff %v", diff)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
@@ -451,7 +451,7 @@ func SetupMySQLTable(t *testing.T, ctx context.Context, pool *sql.DB, create_sta
|
||||
// GetRedisWants return the expected wants for redis
|
||||
func GetRedisValkeyWants() (string, string, string, string) {
|
||||
select1Want := "[\"PONG\"]"
|
||||
failInvocationWant := `{"jsonrpc":"2.0","id":"invoke-fail-tool","result":{"content":[{"type":"text","text":"\"error from executing command at index 0: ERR unknown command 'SELEC 1;', with args beginning with: \""}]}}`
|
||||
failInvocationWant := `unknown command 'SELEC 1;', with args beginning with: \""}]}}`
|
||||
invokeParamWant := "[{\"id\":\"1\",\"name\":\"Alice\"},{\"id\":\"3\",\"name\":\"Sid\"}]"
|
||||
mcpInvokeParamWant := `{"jsonrpc":"2.0","id":"my-param-tool","result":{"content":[{"type":"text","text":"{\"id\":\"1\",\"name\":\"Alice\"}"},{"type":"text","text":"{\"id\":\"3\",\"name\":\"Sid\"}"}]}}`
|
||||
return select1Want, failInvocationWant, invokeParamWant, mcpInvokeParamWant
|
||||
|
||||
138
tests/valkey/valkey_test.go
Normal file
138
tests/valkey/valkey_test.go
Normal file
@@ -0,0 +1,138 @@
|
||||
// Copyright 2025 Google LLC
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package valkey
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"os"
|
||||
"regexp"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/googleapis/genai-toolbox/tests"
|
||||
"github.com/valkey-io/valkey-go"
|
||||
)
|
||||
|
||||
var (
|
||||
VALKEY_SOURCE_KIND = "valkey"
|
||||
VALKEY_TOOL_KIND = "valkey"
|
||||
VALKEY_ADDRESS = os.Getenv("VALKEY_ADDRESS")
|
||||
)
|
||||
|
||||
func getValkeyVars(t *testing.T) map[string]any {
|
||||
switch "" {
|
||||
case VALKEY_ADDRESS:
|
||||
t.Fatal("'VALKEY_ADDRESS' not set")
|
||||
}
|
||||
return map[string]any{
|
||||
"kind": VALKEY_SOURCE_KIND,
|
||||
"address": []string{VALKEY_ADDRESS},
|
||||
"disableCache": true,
|
||||
}
|
||||
}
|
||||
|
||||
func initValkeyClient(ctx context.Context, addr []string) (valkey.Client, error) {
|
||||
// Pass in an access token getter fn for IAM auth
|
||||
client, err := valkey.NewClient(valkey.ClientOption{
|
||||
InitAddress: addr,
|
||||
ForceSingleClient: true,
|
||||
DisableCache: true,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
log.Fatalf("error creating client: %v", err)
|
||||
}
|
||||
|
||||
// Ping the server to check connectivity (using Do)
|
||||
pingCmd := client.B().Ping().Build()
|
||||
_, err = client.Do(ctx, pingCmd).ToString()
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to execute PING command: %v", err)
|
||||
}
|
||||
log.Println("Successfully connected to Valkey")
|
||||
return client, nil
|
||||
}
|
||||
|
||||
func TestValkeyToolEndpoints(t *testing.T) {
|
||||
sourceConfig := getValkeyVars(t)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
|
||||
defer cancel()
|
||||
|
||||
var args []string
|
||||
|
||||
client, err := initValkeyClient(ctx, []string{VALKEY_ADDRESS})
|
||||
if err != nil {
|
||||
t.Fatalf("unable to create Valkey connection: %s", err)
|
||||
}
|
||||
|
||||
// set up data for param tool
|
||||
teardownDB := setupValkeyDB(t, ctx, client)
|
||||
defer teardownDB(t)
|
||||
|
||||
// Write config into a file and pass it to command
|
||||
toolsFile := tests.GetRedisValkeyToolsConfig(sourceConfig, VALKEY_TOOL_KIND)
|
||||
|
||||
cmd, cleanup, err := tests.StartCmd(ctx, toolsFile, args...)
|
||||
if err != nil {
|
||||
t.Fatalf("command initialization returned an error: %s", err)
|
||||
}
|
||||
defer cleanup()
|
||||
|
||||
waitCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
defer cancel()
|
||||
out, err := cmd.WaitForString(waitCtx, regexp.MustCompile(`Server ready to serve`))
|
||||
if err != nil {
|
||||
t.Logf("toolbox command logs: \n%s", out)
|
||||
t.Fatalf("toolbox didn't start successfully: %s", err)
|
||||
}
|
||||
|
||||
tests.RunToolGetTest(t)
|
||||
|
||||
select1Want, failInvocationWant, invokeParamWant, mcpInvokeParamWant := tests.GetRedisValkeyWants()
|
||||
tests.RunToolInvokeTest(t, select1Want, invokeParamWant)
|
||||
tests.RunMCPToolCallMethod(t, mcpInvokeParamWant, failInvocationWant)
|
||||
}
|
||||
|
||||
func setupValkeyDB(t *testing.T, ctx context.Context, client valkey.Client) func(*testing.T) {
|
||||
keys := []string{"row1", "row2", "row3"}
|
||||
commands := [][]string{
|
||||
{"HSET", keys[0], "name", "Alice", "id", "1"},
|
||||
{"HSET", keys[1], "name", "Jane", "id", "2"},
|
||||
{"HSET", keys[2], "name", "Sid", "id", "3"},
|
||||
{"HSET", tests.SERVICE_ACCOUNT_EMAIL, "name", "Alice"},
|
||||
}
|
||||
builtCmds := make(valkey.Commands, len(commands))
|
||||
|
||||
for i, cmd := range commands {
|
||||
builtCmds[i] = client.B().Arbitrary(cmd...).Build()
|
||||
}
|
||||
|
||||
responses := client.DoMulti(ctx, builtCmds...)
|
||||
for _, resp := range responses {
|
||||
if err := resp.Error(); err != nil {
|
||||
t.Fatalf("unable to insert test data: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
return func(t *testing.T) {
|
||||
// tear down test
|
||||
_, err := client.Do(ctx, client.B().Del().Key(keys...).Build()).AsInt64()
|
||||
if err != nil {
|
||||
t.Errorf("Teardown failed: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user