feat: Add wait for operation tool with exponential backoff (#920)

Example:

```
  alloydb-operations-get:
    kind: wait-for-operation
    source: alloydb-api-source
    method: GET
    path: /v1/projects/{{.projectId}}/locations/{{.locationId}}/operations/{{.operationId}}
    description: "Makes API call to check whether operation is done or not. This tool is run first then wait for tool. if its still in create phase trigger it after 3 minutes.  Print a message saying still not done. We will notify once its done."
    pathParams:
      - name: projectId
        type: string
        description: The dynamic path parameter
      - name: locationId
        type: string
        description: The dynamic path parameter
        default: us-central1
      - name: operationId
        type: string
        description: Operation status check for previous task

```
This commit is contained in:
prernakakkar-google
2025-07-23 18:34:36 +00:00
committed by GitHub
parent c67e01bcf9
commit 3f6ec2944e
7 changed files with 744 additions and 1 deletions

View File

@@ -468,6 +468,26 @@ steps:
looker
- id: "alloydbwaitforoperation"
name: golang:1
waitFor: ["compile-test-binary"]
entrypoint: /bin/bash
env:
- "GOPATH=/gopath"
- "API_KEY=$(gcloud auth print-access-token)"
secretEnv: ["CLIENT_ID"]
volumes:
- name: "go"
path: "/gopath"
args:
- -c
- |
.ci/test_with_coverage.sh \
"Alloydb Wait for Operation" \
utility \
utility/alloydbwaitforoperation
availableSecrets:
secretManager:
- versionName: projects/$PROJECT_ID/secrets/cloud_sql_pg_user/versions/latest
@@ -558,4 +578,4 @@ substitutions:
_DGRAPHURL: "https://play.dgraph.io"
_COUCHBASE_BUCKET: "couchbase-bucket"
_COUCHBASE_SCOPE: "couchbase-scope"
_LOOKER_VERIFY_SSL: "true"
_LOOKER_VERIFY_SSL: "true"

View File

@@ -81,6 +81,7 @@ import (
_ "github.com/googleapis/genai-toolbox/internal/tools/spanner/spannerexecutesql"
_ "github.com/googleapis/genai-toolbox/internal/tools/spanner/spannersql"
_ "github.com/googleapis/genai-toolbox/internal/tools/sqlitesql"
_ "github.com/googleapis/genai-toolbox/internal/tools/utility/alloydbwaitforoperation"
_ "github.com/googleapis/genai-toolbox/internal/tools/utility/wait"
_ "github.com/googleapis/genai-toolbox/internal/tools/valkey"

View File

@@ -0,0 +1,47 @@
---
title: "alloydb-wait-for-operation"
type: docs
weight: 10
description: >
Wait for a long-running AlloyDB operation to complete.
---
The `alloydb-wait-for-operation` tool is a utility tool that waits for a long-running AlloyDB operation to complete. It does this by polling the AlloyDB Admin API operation status endpoint until the operation is finished, using exponential backoff.
{{< notice info >}}
This tool is intended for developer assistant workflows with human-in-the-loop and shouldn't be used for production agents.
{{< /notice >}}
## Example
```yaml
sources:
alloydb-api-source:
kind: http
baseUrl: https://alloydb.googleapis.com
headers:
Authorization: Bearer ${API_KEY}
Content-Type: application/json
tools:
alloydb-operations-get:
kind: alloydb-wait-for-operation
source: alloydb-api-source
description: "This will poll on operations API until the operation is done. For checking operation status we need projectId, locationID and operationId. Once instance is created give follow up steps on how to use the variables to bring data plane MCP server up in local and remote setup."
delay: 1s
maxDelay: 4m
multiplier: 2
maxRetries: 10
```
## Reference
| **field** | **type** | **required** | **description** |
| ----------- | :------: | :----------: | ---------------------------------------------------------------------------------------------------------------- |
| kind | string | true | Must be "alloydb-wait-for-operation". |
| source | string | true | Name of the source the HTTP request should be sent to. |
| description | string | true | A description of the tool. |
| delay | duration | false | The initial delay between polling requests (e.g., `3s`). Defaults to 3 seconds. |
| maxDelay | duration | false | The maximum delay between polling requests (e.g., `4m`). Defaults to 4 minutes. |
| multiplier | float | false | The multiplier for the polling delay. The delay is multiplied by this value after each request. Defaults to 2.0. |
| maxRetries | int | false | The maximum number of polling attempts before giving up. Defaults to 10. |

View File

@@ -31,6 +31,7 @@ import (
"github.com/googleapis/genai-toolbox/internal/sources"
httpsrc "github.com/googleapis/genai-toolbox/internal/sources/http"
"github.com/googleapis/genai-toolbox/internal/tools"
"github.com/googleapis/genai-toolbox/internal/util"
)
const kind string = "http"
@@ -330,6 +331,10 @@ func (t Tool) Invoke(ctx context.Context, params tools.ParamValues) (any, error)
req.Header.Set(k, v)
}
if ua, err := util.UserAgentFromContext(ctx); err == nil {
req.Header.Set("User-Agent", ua)
}
// Make request and fetch response
resp, err := t.Client.Do(req)
if err != nil {

View File

@@ -0,0 +1,380 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package alloydbwaitforoperation
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"text/template"
"time"
"maps"
yaml "github.com/goccy/go-yaml"
"github.com/googleapis/genai-toolbox/internal/sources"
httpsrc "github.com/googleapis/genai-toolbox/internal/sources/http"
"github.com/googleapis/genai-toolbox/internal/tools"
"github.com/googleapis/genai-toolbox/internal/util"
)
const kind string = "alloydb-wait-for-operation"
var alloyDBConnectionMessageTemplate = `Your AlloyDB resource is ready.
To connect, please configure your environment. The method depends on how you are running the toolbox:
**If running locally via stdio:**
Update the MCP server configuration with the following environment variables:
` + "```json" + `
{
"mcpServers": {
"alloydb": {
"command": "./PATH/TO/toolbox",
"args": ["--prebuilt","alloydb-postgres","--stdio"],
"env": {
"ALLOYDB_POSTGRES_PROJECT": "{{.Project}}",
"ALLOYDB_POSTGRES_REGION": "{{.Region}}",
"ALLOYDB_POSTGRES_CLUSTER": "{{.Cluster}}",
{{if .Instance}} "ALLOYDB_POSTGRES_INSTANCE": "{{.Instance}}",
{{end}} "ALLOYDB_POSTGRES_DATABASE": "postgres",
"ALLOYDB_POSTGRES_USER": ""{{.User}}",",
"ALLOYDB_POSTGRES_PASSWORD": ""{{.Password}}",
}
}
}
}
` + "```" + `
**If running remotely:**
For remote deployments, you will need to set the following environment variables in your deployment configuration:
` + "```" + `
ALLOYDB_POSTGRES_PROJECT={{.Project}}
ALLOYDB_POSTGRES_REGION={{.Region}}
ALLOYDB_POSTGRES_CLUSTER={{.Cluster}}
{{if .Instance}}ALLOYDB_POSTGRES_INSTANCE={{.Instance}}
{{end}}ALLOYDB_POSTGRES_DATABASE=postgres
ALLOYDB_POSTGRES_USER=<your-user>
ALLOYDB_POSTGRES_PASSWORD=<your-password>
` + "```" + `
Please refer to the official documentation for guidance on deploying the toolbox:
- Deploying the Toolbox: https://googleapis.github.io/genai-toolbox/how-to/deploy_toolbox/
- Deploying on GKE: https://googleapis.github.io/genai-toolbox/how-to/deploy_gke/
`
func init() {
if !tools.Register(kind, newConfig) {
panic(fmt.Sprintf("tool kind %q already registered", kind))
}
}
func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (tools.ToolConfig, error) {
actual := Config{Name: name}
if err := decoder.DecodeContext(ctx, &actual); err != nil {
return nil, err
}
return actual, nil
}
// Config defines the configuration for the wait-for-operation tool.
type Config struct {
Name string `yaml:"name" validate:"required"`
Kind string `yaml:"kind" validate:"required"`
Source string `yaml:"source" validate:"required"`
Description string `yaml:"description" validate:"required"`
AuthRequired []string `yaml:"authRequired"`
Headers map[string]string `yaml:"headers"`
// Polling configuration
Delay string `yaml:"delay"`
MaxDelay string `yaml:"maxDelay"`
Multiplier float64 `yaml:"multiplier"`
MaxRetries int `yaml:"maxRetries"`
}
// validate interface
var _ tools.ToolConfig = Config{}
// ToolConfigKind returns the kind of the tool.
func (cfg Config) ToolConfigKind() string {
return kind
}
// Initialize initializes the tool from the configuration.
func (cfg Config) Initialize(srcs map[string]sources.Source) (tools.Tool, error) {
s, ok := srcs[cfg.Source].(*httpsrc.Source)
if !ok {
return nil, fmt.Errorf("invalid or missing source for %q tool: source kind must be `http`", kind)
}
combinedHeaders := make(map[string]string)
maps.Copy(combinedHeaders, s.DefaultHeaders)
maps.Copy(combinedHeaders, cfg.Headers)
allParameters := tools.Parameters{
tools.NewStringParameter("project", "The project ID"),
tools.NewStringParameter("location", "The location ID"),
tools.NewStringParameter("operation_id", "The operation ID"),
}
paramManifest := allParameters.Manifest()
inputSchema := allParameters.McpManifest()
inputSchema.Required = []string{"project", "location", "operation_id"}
mcpManifest := tools.McpManifest{
Name: cfg.Name,
Description: cfg.Description,
InputSchema: inputSchema,
}
var delay time.Duration
if cfg.Delay == "" {
delay = 3 * time.Second
} else {
var err error
delay, err = time.ParseDuration(cfg.Delay)
if err != nil {
return nil, fmt.Errorf("invalid value for delay: %w", err)
}
}
var maxDelay time.Duration
if cfg.MaxDelay == "" {
maxDelay = 4 * time.Minute
} else {
var err error
maxDelay, err = time.ParseDuration(cfg.MaxDelay)
if err != nil {
return nil, fmt.Errorf("invalid value for maxDelay: %w", err)
}
}
multiplier := cfg.Multiplier
if multiplier == 0 {
multiplier = 2.0
}
maxRetries := cfg.MaxRetries
if maxRetries == 0 {
maxRetries = 10
}
return &Tool{
Name: cfg.Name,
Kind: kind,
BaseURL: s.BaseURL,
Headers: combinedHeaders,
AuthRequired: cfg.AuthRequired,
Client: s.Client,
AllParams: allParameters,
manifest: tools.Manifest{Description: cfg.Description, Parameters: paramManifest, AuthRequired: cfg.AuthRequired},
mcpManifest: mcpManifest,
Delay: delay,
MaxDelay: maxDelay,
Multiplier: multiplier,
MaxRetries: maxRetries,
}, nil
}
// Tool represents the wait-for-operation tool.
type Tool struct {
Name string `yaml:"name"`
Kind string `yaml:"kind"`
Description string `yaml:"description"`
AuthRequired []string `yaml:"authRequired"`
BaseURL string `yaml:"baseURL"`
Headers map[string]string `yaml:"headers"`
AllParams tools.Parameters `yaml:"allParams"`
// Polling configuration
Delay time.Duration
MaxDelay time.Duration
Multiplier float64
MaxRetries int
Client *http.Client
manifest tools.Manifest
mcpManifest tools.McpManifest
}
// Invoke executes the tool's logic.
func (t *Tool) Invoke(ctx context.Context, params tools.ParamValues) (any, error) {
paramsMap := params.AsMap()
project, ok := paramsMap["project"].(string)
if !ok {
return nil, fmt.Errorf("missing 'project' parameter")
}
location, ok := paramsMap["location"].(string)
if !ok {
return nil, fmt.Errorf("missing 'location' parameter")
}
operationID, ok := paramsMap["operation_id"].(string)
if !ok {
return nil, fmt.Errorf("missing 'operation_id' parameter")
}
name := fmt.Sprintf("projects/%s/locations/%s/operations/%s", project, location, operationID)
urlString := fmt.Sprintf("%s/v1beta/%s", t.BaseURL, name)
ctx, cancel := context.WithTimeout(ctx, 30*time.Minute)
defer cancel()
delay := t.Delay
maxDelay := t.MaxDelay
multiplier := t.Multiplier
maxRetries := t.MaxRetries
retries := 0
for retries < maxRetries {
select {
case <-ctx.Done():
return nil, fmt.Errorf("timed out waiting for operation: %w", ctx.Err())
default:
}
req, _ := http.NewRequest(http.MethodGet, urlString, nil)
for k, v := range t.Headers {
req.Header.Set(k, v)
}
if ua, err := util.UserAgentFromContext(ctx); err == nil {
req.Header.Set("User-Agent", ua)
}
resp, err := t.Client.Do(req)
if err != nil {
fmt.Printf("error making HTTP request during polling: %s, retrying in %v\n", err, delay)
} else {
body, err := io.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return nil, fmt.Errorf("error reading response body during polling: %w", err)
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status code during polling: %d, response body: %s", resp.StatusCode, string(body))
}
var data map[string]any
if err := json.Unmarshal(body, &data); err == nil {
if val, ok := data["done"]; ok {
if fmt.Sprintf("%v", val) == "true" {
if _, ok := data["error"]; ok {
return nil, fmt.Errorf("operation finished with error: %s", string(body))
}
if msg, ok := t.generateAlloyDBConnectionMessage(data); ok {
return msg, nil
}
return string(body), nil
}
}
}
fmt.Printf("Operation not complete, retrying in %v\n", delay)
}
time.Sleep(delay)
delay = time.Duration(float64(delay) * multiplier)
if delay > maxDelay {
delay = maxDelay
}
retries++
}
return nil, fmt.Errorf("exceeded max retries waiting for operation")
}
func (t *Tool) generateAlloyDBConnectionMessage(opResponse map[string]any) (string, bool) {
responseData, ok := opResponse["response"].(map[string]any)
if !ok {
return "", false
}
resourceName, ok := responseData["name"].(string)
if !ok {
return "", false
}
parts := strings.Split(resourceName, "/")
var project, region, cluster, instance string
// Expected format: projects/{project}/locations/{location}/clusters/{cluster}
// or projects/{project}/locations/{location}/clusters/{cluster}/instances/{instance}
if len(parts) < 6 || parts[0] != "projects" || parts[2] != "locations" || parts[4] != "clusters" {
return "", false
}
project = parts[1]
region = parts[3]
cluster = parts[5]
if len(parts) >= 8 && parts[6] == "instances" {
instance = parts[7]
} else {
return "", false
}
tmpl, err := template.New("alloydb-connection").Parse(alloyDBConnectionMessageTemplate)
if err != nil {
// This should not happen with a static template
return fmt.Sprintf("template parsing error: %v", err), false
}
data := struct {
Project string
Region string
Cluster string
Instance string
}{
Project: project,
Region: region,
Cluster: cluster,
Instance: instance,
}
var b strings.Builder
if err := tmpl.Execute(&b, data); err != nil {
return fmt.Sprintf("template execution error: %v", err), false
}
return b.String(), true
}
// ParseParams parses the parameters for the tool.
func (t *Tool) ParseParams(data map[string]any, claims map[string]map[string]any) (tools.ParamValues, error) {
return tools.ParseParams(t.AllParams, data, claims)
}
// Manifest returns the tool's manifest.
func (t *Tool) Manifest() tools.Manifest {
return t.manifest
}
// McpManifest returns the tool's MCP manifest.
func (t *Tool) McpManifest() tools.McpManifest {
return t.mcpManifest
}
// Authorized checks if the tool is authorized.
func (t *Tool) Authorized(verifiedAuthServices []string) bool {
return tools.IsAuthorized(t.AuthRequired, verifiedAuthServices)
}

View File

@@ -0,0 +1,80 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package alloydbwaitforoperation_test
import (
"testing"
yaml "github.com/goccy/go-yaml"
"github.com/google/go-cmp/cmp"
"github.com/googleapis/genai-toolbox/internal/server"
"github.com/googleapis/genai-toolbox/internal/testutils"
alloydbwaitforoperation "github.com/googleapis/genai-toolbox/internal/tools/utility/alloydbwaitforoperation"
)
func TestParseFromYaml(t *testing.T) {
ctx, err := testutils.ContextWithNewLogger()
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
tcs := []struct {
desc string
in string
want server.ToolConfigs
}{
{
desc: "basic example",
in: `
tools:
wait-for-thing:
kind: alloydb-wait-for-operation
source: my-source
description: some description
delay: 1s
maxDelay: 5s
multiplier: 1.5
maxRetries: 5
`,
want: server.ToolConfigs{
"wait-for-thing": alloydbwaitforoperation.Config{
Name: "wait-for-thing",
Kind: "alloydb-wait-for-operation",
Source: "my-source",
Description: "some description",
AuthRequired: []string{},
Delay: "1s",
MaxDelay: "5s",
Multiplier: 1.5,
MaxRetries: 5,
},
},
},
}
for _, tc := range tcs {
t.Run(tc.desc, func(t *testing.T) {
got := struct {
Tools server.ToolConfigs `yaml:"tools"`
}{}
// Parse contents
err := yaml.UnmarshalContext(ctx, testutils.FormatYaml(tc.in), &got)
if err != nil {
t.Fatalf("unable to unmarshal: %s", err)
}
if diff := cmp.Diff(tc.want, got.Tools); diff != "" {
t.Fatalf("incorrect parse: diff %v", diff)
}
})
}
}

View File

@@ -0,0 +1,210 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package utility
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/httptest"
"reflect"
"regexp"
"sync"
"testing"
"time"
"github.com/googleapis/genai-toolbox/internal/testutils"
"github.com/googleapis/genai-toolbox/tests"
)
var (
httpSourceKind = "http"
waitToolKind = "alloydb-wait-for-operation"
)
type operation struct {
Name string `json:"name"`
Done bool `json:"done"`
Result string `json:"result,omitempty"`
Error *struct {
Code int `json:"code"`
Message string `json:"message"`
} `json:"error,omitempty"`
}
type handler struct {
mu sync.Mutex
operations map[string]*operation
}
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.mu.Lock()
defer h.mu.Unlock()
// The format is projects/{project}/locations/{location}/operations/{operation_id}
// We only care about the operation_id for the test
parts := regexp.MustCompile("/").Split(r.URL.Path, -1)
opName := parts[len(parts)-1]
op, ok := h.operations[opName]
if !ok {
http.NotFound(w, r)
return
}
if !op.Done {
op.Done = true
}
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(op); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
func TestWaitToolEndpoints(t *testing.T) {
h := &handler{
operations: map[string]*operation{
"op1": {Name: "op1", Done: false, Result: "success"},
"op2": {Name: "op2", Done: false, Error: &struct {
Code int `json:"code"`
Message string `json:"message"`
}{Code: 1, Message: "failed"}},
},
}
server := httptest.NewServer(h)
defer server.Close()
sourceConfig := map[string]any{
"kind": httpSourceKind,
"baseUrl": server.URL,
}
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
var args []string
toolsFile := getWaitToolsConfig(sourceConfig)
cmd, cleanup, err := tests.StartCmd(ctx, toolsFile, args...)
if err != nil {
t.Fatalf("command initialization returned an error: %s", err)
}
defer cleanup()
waitCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
out, err := testutils.WaitForString(waitCtx, regexp.MustCompile(`Server ready to serve`), cmd.Out)
if err != nil {
t.Logf("toolbox command logs: \n%s", out)
t.Fatalf("toolbox didn't start successfully: %s", err)
}
tcs := []struct {
name string
toolName string
body string
want string
expectError bool
}{
{
name: "successful operation",
toolName: "wait-for-op1",
body: `{"project": "p1", "location": "l1", "operation_id": "op1"}`,
want: `{"name":"op1","done":true,"result":"success"}`,
},
{
name: "failed operation",
toolName: "wait-for-op2",
body: `{"project": "p1", "location": "l1", "operation_id": "op2"}`,
expectError: true,
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
api := fmt.Sprintf("http://127.0.0.1:5000/api/tool/%s/invoke", tc.toolName)
req, err := http.NewRequest(http.MethodPost, api, bytes.NewBufferString(tc.body))
if err != nil {
t.Fatalf("unable to create request: %s", err)
}
req.Header.Add("Content-type", "application/json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("unable to send request: %s", err)
}
defer resp.Body.Close()
if tc.expectError {
if resp.StatusCode == http.StatusOK {
t.Fatal("expected error but got status 200")
}
return
}
if resp.StatusCode != http.StatusOK {
bodyBytes, _ := io.ReadAll(resp.Body)
t.Fatalf("response status code is not 200, got %d: %s", resp.StatusCode, string(bodyBytes))
}
var result struct {
Result string `json:"result"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
t.Fatalf("failed to decode response: %v", err)
}
// The result is a JSON-encoded string, so we need to unmarshal it twice.
var unmarshaledResult string
if err := json.Unmarshal([]byte(result.Result), &unmarshaledResult); err != nil {
t.Fatalf("failed to unmarshal result string: %v", err)
}
var got, want map[string]any
if err := json.Unmarshal([]byte(unmarshaledResult), &got); err != nil {
t.Fatalf("failed to unmarshal result: %v", err)
}
if err := json.Unmarshal([]byte(tc.want), &want); err != nil {
t.Fatalf("failed to unmarshal want: %v", err)
}
if !reflect.DeepEqual(got, want) {
t.Fatalf("unexpected result: got %+v, want %+v", got, want)
}
})
}
}
func getWaitToolsConfig(sourceConfig map[string]any) map[string]any {
return map[string]any{
"sources": map[string]any{
"my-instance": sourceConfig,
},
"tools": map[string]any{
"wait-for-op1": map[string]any{
"kind": waitToolKind,
"description": "wait for op1",
"source": "my-instance",
},
"wait-for-op2": map[string]any{
"kind": waitToolKind,
"description": "wait for op2",
"source": "my-instance",
},
},
}
}