Compare commits

...

5 Commits

Author SHA1 Message Date
Yuan Teoh
2714d566ba refactor: move source implementation from Invoke() function to Source 2026-01-07 23:27:54 -08:00
Wenxin Du
4d3332d37d refactor: add configurable result options for ExecuteSql tests (#2271)
Adds WithExecuteCreateWant, WithExecuteDropWant, and
WithExecuteSelectEmptyWant to RunExecuteSqlToolInvokeTest to allow
sources like Snowflake to validate specific DDL/DML status responses
instead of defaulting to null.
2026-01-07 22:30:41 +00:00
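As a sketch of what this enables (assuming the helper accepts these as variadic options; the Snowflake-style status strings below are illustrative, not actual engine responses):

```go
// Hypothetical Snowflake-flavored invocation of the shared test helper.
tests.RunExecuteSqlToolInvokeTest(t, createTableStatement, select1Want,
	tests.WithExecuteCreateWant(`[{"status":"Table T successfully created."}]`),
	tests.WithExecuteDropWant(`[{"status":"T successfully dropped."}]`),
	tests.WithExecuteSelectEmptyWant(`[]`),
)
```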
dependabot[bot]
1203b7370a chore(deps): bump jws from 4.0.0 to 4.0.1 in /docs/en/getting-started/quickstart/js/llamaindex (#2260)
Bumps [jws](https://github.com/brianloveswords/node-jws) from 4.0.0 to 4.0.1.

**Release notes** (sourced from [jws's releases](https://github.com/brianloveswords/node-jws/releases)):

v4.0.1 (Changed)
- Fix advisory GHSA-869p-cjfg-cm3x: createSign and createVerify now require that a non-empty secret is provided (via opts.secret, opts.privateKey, or opts.key) when using HMAC algorithms.
- Upgrade the JWA dependency to 2.0.1, addressing a compatibility issue for Node >= 25.
**Changelog** (sourced from [jws's changelog](https://github.com/auth0/node-jws/blob/master/CHANGELOG.md)):

[4.0.1] (Changed)
- Fix advisory GHSA-869p-cjfg-cm3x: createSign and createVerify now require that a non-empty secret is provided (via opts.secret, opts.privateKey, or opts.key) when using HMAC algorithms.
- Upgrade the JWA dependency to 2.0.1, addressing a compatibility issue for Node >= 25.

[3.2.3] (Changed)
- Fix advisory GHSA-869p-cjfg-cm3x: createSign and createVerify now require that a non-empty secret is provided (via opts.secret, opts.privateKey, or opts.key) when using HMAC algorithms.
- Upgrade the JWA dependency to 1.4.2, addressing a compatibility issue for Node >= 25.

[3.0.0] (Changed)
- **BREAKING**: `jwt.verify` now requires an `algorithm` parameter, and `jws.createVerify` requires an `algorithm` option. The `"alg"` field in signature headers is ignored. This mitigates a critical security flaw in the library which would allow an attacker to generate signatures with arbitrary contents that would be accepted by `jwt.verify`. See https://auth0.com/blog/2015/03/31/critical-vulnerabilities-in-json-web-token-libraries/ for details.

[2.0.0](https://github.com/brianloveswords/node-jws/compare/v1.0.1...v2.0.0) - 2015-01-30 (Changed)
- **BREAKING**: Default payload encoding changed from `binary` to `utf8`. `utf8` is a more sensible default than `binary` because many payloads, as far as I can tell, will contain user-facing strings that could be in any language. ([6b6de48])
- Code reorganization, thanks [@fearphage](https://github.com/fearphage)! ([7880050](https://github.com/brianloveswords/node-jws/commit/7880050))

[2.0.0] (Added)
- Option in all relevant methods for `encoding`. For those few users that might be depending on a `binary` encoding of the messages, this is for them. ([6b6de48])

... (truncated)
**Commits**
- [`34c45b2`](34c45b2c04) Merge commit from fork
- [`49bc39b`](49bc39b1f5) version 4.0.1
- [`d42350c`](d42350ccab) Enhance tests for HMAC streaming sign and verify
- [`5cb007c`](5cb007cf82) Improve secretOrKey initialization in VerifyStream
- [`f9a2e1c`](f9a2e1c8c6) Improve secret handling in SignStream
- [`b9fb8d3`](b9fb8d30e9) Merge pull request [#102](https://redirect.github.com/brianloveswords/node-jws/issues/102) from auth0/SRE-57-Upload-opslevel-yaml
- [`95b75ee`](95b75ee56c) Upload OpsLevel YAML
- [`8857ee7`](8857ee7762) test: remove unused variable ([#96](https://redirect.github.com/brianloveswords/node-jws/issues/96))
- See full diff in [compare view](https://github.com/brianloveswords/node-jws/compare/v4.0.0...v4.0.1)

**Maintainer changes**: This version was pushed to npm by [julien.wollscheid](https://www.npmjs.com/~julien.wollscheid), a new releaser for jws since your current version.
[![Dependabot compatibility score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=jws&package-manager=npm_and_yarn&previous-version=4.0.0&new-version=4.0.1)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores)


Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Yuan Teoh <45984206+Yuan325@users.noreply.github.com>
2026-01-07 21:37:47 +00:00
dependabot[bot]
4a26ce3c1b chore(deps): bump jws from 4.0.0 to 4.0.1 in /docs/en/getting-started/quickstart/js/genAI (#2259)
Bumps [jws](https://github.com/brianloveswords/node-jws) from 4.0.0 to 4.0.1; the upstream release notes, changelog, and commit list are the same as in #2260 above.

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Yuan Teoh <45984206+Yuan325@users.noreply.github.com>
2026-01-07 13:17:26 -08:00
Mend Renovate
306b5becda chore(deps): update pip (#2258)
This PR contains the following updates:

| Package | Change | [Age](https://docs.renovatebot.com/merge-confidence/) | [Confidence](https://docs.renovatebot.com/merge-confidence/) |
|---|---|---|---|
| [langchain](https://redirect.github.com/langchain-ai/langchain) ([source](https://redirect.github.com/langchain-ai/langchain/tree/HEAD/libs/langchain), [changelog](https://redirect.github.com/langchain-ai/langchain/releases?q=tag%3A%22langchain%3D%3D1%22)) | `==1.2.0` → `==1.2.1` | ![age](https://developer.mend.io/api/mc/badges/age/pypi/langchain/1.2.1?slim=true) | ![confidence](https://developer.mend.io/api/mc/badges/confidence/pypi/langchain/1.2.0/1.2.1?slim=true) |
| [langchain-google-vertexai](https://redirect.github.com/langchain-ai/langchain-google) ([source](https://redirect.github.com/langchain-ai/langchain-google/tree/HEAD/libs/vertexai), [changelog](https://redirect.github.com/langchain-ai/langchain-google/releases?q=%22vertexai%22)) | `==3.2.0` → `==3.2.1` | ![age](https://developer.mend.io/api/mc/badges/age/pypi/langchain-google-vertexai/3.2.1?slim=true) | ![confidence](https://developer.mend.io/api/mc/badges/confidence/pypi/langchain-google-vertexai/3.2.0/3.2.1?slim=true) |

---

### Configuration

📅 **Schedule**: Branch creation - At any time (no schedule defined),
Automerge - At any time (no schedule defined).

🚦 **Automerge**: Disabled by config. Please merge this manually once you
are satisfied.

♻ **Rebasing**: Whenever the PR becomes conflicted, or you tick the rebase/retry checkbox.

👻 **Immortal**: This PR will be recreated if closed unmerged. Get
[config
help](https://redirect.github.com/renovatebot/renovate/discussions) if
that's undesired.


---

This PR was generated by [Mend Renovate](https://mend.io/renovate/).
View the [repository job
log](https://developer.mend.io/github/googleapis/genai-toolbox).


Co-authored-by: Yuan Teoh <45984206+Yuan325@users.noreply.github.com>
2026-01-07 10:31:03 -08:00
20 changed files with 411 additions and 390 deletions

View File

@@ -569,11 +569,12 @@
}
},
"node_modules/jws": {
"version": "4.0.0",
"resolved": "https://registry.npmjs.org/jws/-/jws-4.0.0.tgz",
"integrity": "sha512-KDncfTmOZoOMTFG4mBlG0qUIOlc03fmzH+ru6RgYVZhPkyiy/92Owlt/8UEN+a4TXR1FQetfIpJE8ApdvdVxTg==",
"version": "4.0.1",
"resolved": "https://registry.npmjs.org/jws/-/jws-4.0.1.tgz",
"integrity": "sha512-EKI/M/yqPncGUUh44xz0PxSidXFr/+r0pA70+gIYhjv+et7yxM+s29Y+VGDkovRofQem0fs7Uvf4+YmAdyRduA==",
"license": "MIT",
"dependencies": {
"jwa": "^2.0.0",
"jwa": "^2.0.1",
"safe-buffer": "^5.0.1"
}
},

View File

@@ -882,11 +882,12 @@
}
},
"node_modules/jws": {
"version": "4.0.0",
"resolved": "https://registry.npmjs.org/jws/-/jws-4.0.0.tgz",
"integrity": "sha512-KDncfTmOZoOMTFG4mBlG0qUIOlc03fmzH+ru6RgYVZhPkyiy/92Owlt/8UEN+a4TXR1FQetfIpJE8ApdvdVxTg==",
"version": "4.0.1",
"resolved": "https://registry.npmjs.org/jws/-/jws-4.0.1.tgz",
"integrity": "sha512-EKI/M/yqPncGUUh44xz0PxSidXFr/+r0pA70+gIYhjv+et7yxM+s29Y+VGDkovRofQem0fs7Uvf4+YmAdyRduA==",
"license": "MIT",
"dependencies": {
"jwa": "^2.0.0",
"jwa": "^2.0.1",
"safe-buffer": "^5.0.1"
}
},

View File

@@ -1,5 +1,5 @@
langchain==1.2.0
langchain-google-vertexai==3.2.0
langchain==1.2.1
langchain-google-vertexai==3.2.1
langgraph==1.0.5
toolbox-langchain==0.5.4
pytest==9.0.2

View File

@@ -19,6 +19,8 @@ import (
"fmt"
dataplexapi "cloud.google.com/go/dataplex/apiv1"
"cloud.google.com/go/dataplex/apiv1/dataplexpb"
"github.com/cenkalti/backoff/v5"
"github.com/goccy/go-yaml"
"github.com/googleapis/genai-toolbox/internal/sources"
"github.com/googleapis/genai-toolbox/internal/util"
@@ -121,3 +123,101 @@ func initDataplexConnection(
}
return client, nil
}
func (s *Source) LookupEntry(ctx context.Context, name string, view int, aspectTypes []string, entry string) (*dataplexpb.Entry, error) {
viewMap := map[int]dataplexpb.EntryView{
1: dataplexpb.EntryView_BASIC,
2: dataplexpb.EntryView_FULL,
3: dataplexpb.EntryView_CUSTOM,
4: dataplexpb.EntryView_ALL,
}
req := &dataplexpb.LookupEntryRequest{
Name: name,
View: viewMap[view],
AspectTypes: aspectTypes,
Entry: entry,
}
result, err := s.CatalogClient().LookupEntry(ctx, req)
if err != nil {
return nil, err
}
return result, nil
}
func (s *Source) searchRequest(ctx context.Context, query string, pageSize int, orderBy string) (*dataplexapi.SearchEntriesResultIterator, error) {
// Create SearchEntriesRequest with the provided parameters
req := &dataplexpb.SearchEntriesRequest{
Query: query,
Name: fmt.Sprintf("projects/%s/locations/global", s.ProjectID()),
PageSize: int32(pageSize),
OrderBy: orderBy,
SemanticSearch: true,
}
// Perform the search using the CatalogClient - this will return an iterator
it := s.CatalogClient().SearchEntries(ctx, req)
if it == nil {
return nil, fmt.Errorf("failed to create search entries iterator for project %q", s.ProjectID())
}
return it, nil
}
func (s *Source) SearchAspectTypes(ctx context.Context, query string, pageSize int, orderBy string) ([]*dataplexpb.AspectType, error) {
q := query + " type=projects/dataplex-types/locations/global/entryTypes/aspecttype"
it, err := s.searchRequest(ctx, q, pageSize, orderBy)
if err != nil {
return nil, err
}
// Iterate through the search results and call GetAspectType for each result using the resource name
var results []*dataplexpb.AspectType
for {
entry, err := it.Next()
if err != nil {
break
}
// Create an instance of exponential backoff with default values for retrying GetAspectType calls
// InitialInterval, RandomizationFactor, Multiplier, MaxInterval = 500 ms, 0.5, 1.5, 60 s
getAspectBackOff := backoff.NewExponentialBackOff()
resourceName := entry.DataplexEntry.GetEntrySource().Resource
getAspectTypeReq := &dataplexpb.GetAspectTypeRequest{
Name: resourceName,
}
operation := func() (*dataplexpb.AspectType, error) {
aspectType, err := s.CatalogClient().GetAspectType(ctx, getAspectTypeReq)
if err != nil {
return nil, fmt.Errorf("failed to get aspect type for entry %q: %w", resourceName, err)
}
return aspectType, nil
}
// Retry the GetAspectType operation with exponential backoff
aspectType, err := backoff.Retry(ctx, operation, backoff.WithBackOff(getAspectBackOff))
if err != nil {
return nil, fmt.Errorf("failed to get aspect type after retries for entry %q: %w", resourceName, err)
}
results = append(results, aspectType)
}
return results, nil
}
func (s *Source) SearchEntries(ctx context.Context, query string, pageSize int, orderBy string) ([]*dataplexpb.SearchEntriesResult, error) {
it, err := s.searchRequest(ctx, query, pageSize, orderBy)
if err != nil {
return nil, err
}
var results []*dataplexpb.SearchEntriesResult
for {
entry, err := it.Next()
if err != nil {
break
}
results = append(results, entry)
}
return results, nil
}
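For illustration, a minimal sketch of a caller using the relocated methods, assuming an initialized Dataplex Source; the query and orderBy values are illustrative, not taken from this PR:

```go
// Hypothetical caller of the methods moved onto the Dataplex Source.
results, err := src.SearchEntries(ctx, "system=bigquery", 10, "relevance")
if err != nil {
	return nil, err
}
for _, r := range results {
	// Each r is a *dataplexpb.SearchEntriesResult.
	fmt.Println(r.GetDataplexEntry().GetName())
}
```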

View File

@@ -16,7 +16,9 @@ package http
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"time"
@@ -143,3 +145,28 @@ func (s *Source) HttpQueryParams() map[string]string {
func (s *Source) Client() *http.Client {
return s.client
}
func (s *Source) RunRequest(req *http.Request) (any, error) {
// Make request and fetch response
resp, err := s.Client().Do(req)
if err != nil {
return nil, fmt.Errorf("error making HTTP request: %s", err)
}
defer resp.Body.Close()
var body []byte
body, err = io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if resp.StatusCode < 200 || resp.StatusCode > 299 {
return nil, fmt.Errorf("unexpected status code: %d, response body: %s", resp.StatusCode, string(body))
}
var data any
if err = json.Unmarshal(body, &data); err != nil {
// if unable to unmarshal data, return result as string.
return string(body), nil
}
return data, nil
}
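A minimal caller sketch, assuming an initialized http source `src` and `nethttp` as an alias for the standard `net/http` package (the `/health` path is illustrative):

```go
// Hypothetical caller of the relocated RunRequest helper.
req, err := nethttp.NewRequestWithContext(ctx, nethttp.MethodGet, src.HttpBaseURL()+"/health", nil)
if err != nil {
	return nil, err
}
// RunRequest returns parsed JSON (any) when the body unmarshals,
// the raw body string otherwise, and an error on non-2xx statuses.
result, err := src.RunRequest(req)
```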

View File

@@ -16,15 +16,21 @@ package serverlessspark
import (
"context"
"encoding/json"
"fmt"
"time"
dataproc "cloud.google.com/go/dataproc/v2/apiv1"
"cloud.google.com/go/dataproc/v2/apiv1/dataprocpb"
longrunning "cloud.google.com/go/longrunning/autogen"
"cloud.google.com/go/longrunning/autogen/longrunningpb"
"github.com/goccy/go-yaml"
"github.com/googleapis/genai-toolbox/internal/sources"
"github.com/googleapis/genai-toolbox/internal/util"
"go.opentelemetry.io/otel/trace"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"google.golang.org/protobuf/encoding/protojson"
)
const SourceKind string = "serverless-spark"
@@ -121,3 +127,168 @@ func (s *Source) Close() error {
}
return nil
}
func (s *Source) CancelOperation(ctx context.Context, operation string) (any, error) {
req := &longrunningpb.CancelOperationRequest{
Name: fmt.Sprintf("projects/%s/locations/%s/operations/%s", s.GetProject(), s.GetLocation(), operation),
}
client, err := s.GetOperationsClient(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get operations client: %w", err)
}
err = client.CancelOperation(ctx, req)
if err != nil {
return nil, fmt.Errorf("failed to cancel operation: %w", err)
}
return fmt.Sprintf("Cancelled [%s].", operation), nil
}
func (s *Source) CreateBatch(ctx context.Context, batch *dataprocpb.Batch) (map[string]any, error) {
req := &dataprocpb.CreateBatchRequest{
Parent: fmt.Sprintf("projects/%s/locations/%s", s.GetProject(), s.GetLocation()),
Batch: batch,
}
client := s.GetBatchControllerClient()
op, err := client.CreateBatch(ctx, req)
if err != nil {
return nil, fmt.Errorf("failed to create batch: %w", err)
}
meta, err := op.Metadata()
if err != nil {
return nil, fmt.Errorf("failed to get create batch op metadata: %w", err)
}
projectID, location, batchID, err := ExtractBatchDetails(meta.GetBatch())
if err != nil {
return nil, fmt.Errorf("error extracting batch details from name %q: %v", meta.GetBatch(), err)
}
consoleUrl := BatchConsoleURL(projectID, location, batchID)
logsUrl := BatchLogsURL(projectID, location, batchID, meta.GetCreateTime().AsTime(), time.Time{})
wrappedResult := map[string]any{
"opMetadata": meta,
"consoleUrl": consoleUrl,
"logsUrl": logsUrl,
}
return wrappedResult, nil
}
// ListBatchesResponse is the response from the list batches API.
type ListBatchesResponse struct {
Batches []Batch `json:"batches"`
NextPageToken string `json:"nextPageToken"`
}
// Batch represents a single batch job.
type Batch struct {
Name string `json:"name"`
UUID string `json:"uuid"`
State string `json:"state"`
Creator string `json:"creator"`
CreateTime string `json:"createTime"`
Operation string `json:"operation"`
ConsoleURL string `json:"consoleUrl"`
LogsURL string `json:"logsUrl"`
}
func (s *Source) ListBatches(ctx context.Context, ps *int, pt, filter string) (any, error) {
client := s.GetBatchControllerClient()
parent := fmt.Sprintf("projects/%s/locations/%s", s.GetProject(), s.GetLocation())
req := &dataprocpb.ListBatchesRequest{
Parent: parent,
OrderBy: "create_time desc",
}
if ps != nil {
req.PageSize = int32(*ps)
}
if pt != "" {
req.PageToken = pt
}
if filter != "" {
req.Filter = filter
}
it := client.ListBatches(ctx, req)
pager := iterator.NewPager(it, int(req.PageSize), req.PageToken)
var batchPbs []*dataprocpb.Batch
nextPageToken, err := pager.NextPage(&batchPbs)
if err != nil {
return nil, fmt.Errorf("failed to list batches: %w", err)
}
batches, err := ToBatches(batchPbs)
if err != nil {
return nil, err
}
return ListBatchesResponse{Batches: batches, NextPageToken: nextPageToken}, nil
}
// ToBatches converts a slice of protobuf Batch messages to a slice of Batch structs.
func ToBatches(batchPbs []*dataprocpb.Batch) ([]Batch, error) {
batches := make([]Batch, 0, len(batchPbs))
for _, batchPb := range batchPbs {
consoleUrl, err := BatchConsoleURLFromProto(batchPb)
if err != nil {
return nil, fmt.Errorf("error generating console url: %v", err)
}
logsUrl, err := BatchLogsURLFromProto(batchPb)
if err != nil {
return nil, fmt.Errorf("error generating logs url: %v", err)
}
batch := Batch{
Name: batchPb.Name,
UUID: batchPb.Uuid,
State: batchPb.State.Enum().String(),
Creator: batchPb.Creator,
CreateTime: batchPb.CreateTime.AsTime().Format(time.RFC3339),
Operation: batchPb.Operation,
ConsoleURL: consoleUrl,
LogsURL: logsUrl,
}
batches = append(batches, batch)
}
return batches, nil
}
func (s *Source) GetBatch(ctx context.Context, name string) (map[string]any, error) {
client := s.GetBatchControllerClient()
req := &dataprocpb.GetBatchRequest{
Name: fmt.Sprintf("projects/%s/locations/%s/batches/%s", s.GetProject(), s.GetLocation(), name),
}
batchPb, err := client.GetBatch(ctx, req)
if err != nil {
return nil, fmt.Errorf("failed to get batch: %w", err)
}
jsonBytes, err := protojson.Marshal(batchPb)
if err != nil {
return nil, fmt.Errorf("failed to marshal batch to JSON: %w", err)
}
var result map[string]any
if err := json.Unmarshal(jsonBytes, &result); err != nil {
return nil, fmt.Errorf("failed to unmarshal batch JSON: %w", err)
}
consoleUrl, err := BatchConsoleURLFromProto(batchPb)
if err != nil {
return nil, fmt.Errorf("error generating console url: %v", err)
}
logsUrl, err := BatchLogsURLFromProto(batchPb)
if err != nil {
return nil, fmt.Errorf("error generating logs url: %v", err)
}
wrappedResult := map[string]any{
"consoleUrl": consoleUrl,
"logsUrl": logsUrl,
"batch": result,
}
return wrappedResult, nil
}
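A side benefit of this move (a sketch under assumptions, not code from this PR): the tools' compatibleSource interfaces narrow to plain methods, so tests can stub a source with a small fake instead of wiring up real Dataproc clients:

```go
// Hypothetical fake satisfying the list-batches tool's narrowed
// compatibleSource interface (method set per this PR's diff).
type fakeSource struct{}

func (fakeSource) ListBatches(ctx context.Context, ps *int, pt, filter string) (any, error) {
	return serverlessspark.ListBatchesResponse{
		Batches: []serverlessspark.Batch{{Name: "b1", State: "SUCCEEDED"}},
	}, nil
}
```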

View File

@@ -1,10 +1,10 @@
// Copyright 2025 Google LLC
// Copyright 2026 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package common
package serverlessspark
import (
"fmt"
@@ -23,13 +23,13 @@ import (
"cloud.google.com/go/dataproc/v2/apiv1/dataprocpb"
)
var batchFullNameRegex = regexp.MustCompile(`projects/(?P<project>[^/]+)/locations/(?P<location>[^/]+)/batches/(?P<batch_id>[^/]+)`)
const (
logTimeBufferBefore = 1 * time.Minute
logTimeBufferAfter = 10 * time.Minute
)
var batchFullNameRegex = regexp.MustCompile(`projects/(?P<project>[^/]+)/locations/(?P<location>[^/]+)/batches/(?P<batch_id>[^/]+)`)
// ExtractBatchDetails extracts the project ID, location, and batch ID from a fully qualified batch name.
func ExtractBatchDetails(batchName string) (projectID, location, batchID string, err error) {
matches := batchFullNameRegex.FindStringSubmatch(batchName)
@@ -39,26 +39,6 @@ func ExtractBatchDetails(batchName string) (projectID, location, batchID string,
return matches[1], matches[2], matches[3], nil
}
// BatchConsoleURLFromProto builds a URL to the Google Cloud Console linking to the batch summary page.
func BatchConsoleURLFromProto(batchPb *dataprocpb.Batch) (string, error) {
projectID, location, batchID, err := ExtractBatchDetails(batchPb.GetName())
if err != nil {
return "", err
}
return BatchConsoleURL(projectID, location, batchID), nil
}
// BatchLogsURLFromProto builds a URL to the Google Cloud Console showing Cloud Logging for the given batch and time range.
func BatchLogsURLFromProto(batchPb *dataprocpb.Batch) (string, error) {
projectID, location, batchID, err := ExtractBatchDetails(batchPb.GetName())
if err != nil {
return "", err
}
createTime := batchPb.GetCreateTime().AsTime()
stateTime := batchPb.GetStateTime().AsTime()
return BatchLogsURL(projectID, location, batchID, createTime, stateTime), nil
}
// BatchConsoleURL builds a URL to the Google Cloud Console linking to the batch summary page.
func BatchConsoleURL(projectID, location, batchID string) string {
return fmt.Sprintf("https://console.cloud.google.com/dataproc/batches/%s/%s/summary?project=%s", location, batchID, projectID)
@@ -89,3 +69,23 @@ resource.labels.batch_id="%s"`
return "https://console.cloud.google.com/logs/viewer?" + v.Encode()
}
// BatchConsoleURLFromProto builds a URL to the Google Cloud Console linking to the batch summary page.
func BatchConsoleURLFromProto(batchPb *dataprocpb.Batch) (string, error) {
projectID, location, batchID, err := ExtractBatchDetails(batchPb.GetName())
if err != nil {
return "", err
}
return BatchConsoleURL(projectID, location, batchID), nil
}
// BatchLogsURLFromProto builds a URL to the Google Cloud Console showing Cloud Logging for the given batch and time range.
func BatchLogsURLFromProto(batchPb *dataprocpb.Batch) (string, error) {
projectID, location, batchID, err := ExtractBatchDetails(batchPb.GetName())
if err != nil {
return "", err
}
createTime := batchPb.GetCreateTime().AsTime()
stateTime := batchPb.GetStateTime().AsTime()
return BatchLogsURL(projectID, location, batchID, createTime, stateTime), nil
}

View File

@@ -1,10 +1,10 @@
// Copyright 2025 Google LLC
// Copyright 2026 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
@@ -12,19 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package common
package serverlessspark_test
import (
"testing"
"time"
"cloud.google.com/go/dataproc/v2/apiv1/dataprocpb"
"github.com/googleapis/genai-toolbox/internal/sources/serverlessspark"
"google.golang.org/protobuf/types/known/timestamppb"
)
func TestExtractBatchDetails_Success(t *testing.T) {
batchName := "projects/my-project/locations/us-central1/batches/my-batch"
projectID, location, batchID, err := ExtractBatchDetails(batchName)
projectID, location, batchID, err := serverlessspark.ExtractBatchDetails(batchName)
if err != nil {
t.Errorf("ExtractBatchDetails() error = %v, want no error", err)
return
@@ -45,7 +46,7 @@ func TestExtractBatchDetails_Success(t *testing.T) {
func TestExtractBatchDetails_Failure(t *testing.T) {
batchName := "invalid-name"
_, _, _, err := ExtractBatchDetails(batchName)
_, _, _, err := serverlessspark.ExtractBatchDetails(batchName)
wantErr := "failed to parse batch name: invalid-name"
if err == nil || err.Error() != wantErr {
t.Errorf("ExtractBatchDetails() error = %v, want %v", err, wantErr)
@@ -53,7 +54,7 @@ func TestExtractBatchDetails_Failure(t *testing.T) {
}
func TestBatchConsoleURL(t *testing.T) {
got := BatchConsoleURL("my-project", "us-central1", "my-batch")
got := serverlessspark.BatchConsoleURL("my-project", "us-central1", "my-batch")
want := "https://console.cloud.google.com/dataproc/batches/us-central1/my-batch/summary?project=my-project"
if got != want {
t.Errorf("BatchConsoleURL() = %v, want %v", got, want)
@@ -63,7 +64,7 @@ func TestBatchConsoleURL(t *testing.T) {
func TestBatchLogsURL(t *testing.T) {
startTime := time.Date(2025, 10, 1, 5, 0, 0, 0, time.UTC)
endTime := time.Date(2025, 10, 1, 6, 0, 0, 0, time.UTC)
got := BatchLogsURL("my-project", "us-central1", "my-batch", startTime, endTime)
got := serverlessspark.BatchLogsURL("my-project", "us-central1", "my-batch", startTime, endTime)
want := "https://console.cloud.google.com/logs/viewer?advancedFilter=" +
"resource.type%3D%22cloud_dataproc_batch%22" +
"%0Aresource.labels.project_id%3D%22my-project%22" +
@@ -82,7 +83,7 @@ func TestBatchConsoleURLFromProto(t *testing.T) {
batchPb := &dataprocpb.Batch{
Name: "projects/my-project/locations/us-central1/batches/my-batch",
}
got, err := BatchConsoleURLFromProto(batchPb)
got, err := serverlessspark.BatchConsoleURLFromProto(batchPb)
if err != nil {
t.Fatalf("BatchConsoleURLFromProto() error = %v", err)
}
@@ -100,7 +101,7 @@ func TestBatchLogsURLFromProto(t *testing.T) {
CreateTime: timestamppb.New(createTime),
StateTime: timestamppb.New(stateTime),
}
got, err := BatchLogsURLFromProto(batchPb)
got, err := serverlessspark.BatchLogsURLFromProto(batchPb)
if err != nil {
t.Fatalf("BatchLogsURLFromProto() error = %v", err)
}

View File

@@ -18,7 +18,6 @@ import (
"context"
"fmt"
dataplexapi "cloud.google.com/go/dataplex/apiv1"
dataplexpb "cloud.google.com/go/dataplex/apiv1/dataplexpb"
"github.com/goccy/go-yaml"
"github.com/googleapis/genai-toolbox/internal/embeddingmodels"
@@ -44,7 +43,7 @@ func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (tools.T
}
type compatibleSource interface {
CatalogClient() *dataplexapi.CatalogClient
LookupEntry(context.Context, string, int, []string, string) (*dataplexpb.Entry, error)
}
type Config struct {
@@ -118,12 +117,6 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
}
paramsMap := params.AsMap()
viewMap := map[int]dataplexpb.EntryView{
1: dataplexpb.EntryView_BASIC,
2: dataplexpb.EntryView_FULL,
3: dataplexpb.EntryView_CUSTOM,
4: dataplexpb.EntryView_ALL,
}
name, _ := paramsMap["name"].(string)
entry, _ := paramsMap["entry"].(string)
view, _ := paramsMap["view"].(int)
@@ -132,19 +125,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
return nil, fmt.Errorf("can't convert aspectTypes to array of strings: %s", err)
}
aspectTypes := aspectTypeSlice.([]string)
req := &dataplexpb.LookupEntryRequest{
Name: name,
View: viewMap[view],
AspectTypes: aspectTypes,
Entry: entry,
}
result, err := source.CatalogClient().LookupEntry(ctx, req)
if err != nil {
return nil, err
}
return result, nil
return source.LookupEntry(ctx, name, view, aspectTypes, entry)
}
func (t Tool) ParseParams(data map[string]any, claims map[string]map[string]any) (parameters.ParamValues, error) {

View File

@@ -18,9 +18,7 @@ import (
"context"
"fmt"
dataplexapi "cloud.google.com/go/dataplex/apiv1"
dataplexpb "cloud.google.com/go/dataplex/apiv1/dataplexpb"
"github.com/cenkalti/backoff/v5"
"cloud.google.com/go/dataplex/apiv1/dataplexpb"
"github.com/goccy/go-yaml"
"github.com/googleapis/genai-toolbox/internal/embeddingmodels"
"github.com/googleapis/genai-toolbox/internal/sources"
@@ -45,8 +43,7 @@ func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (tools.T
}
type compatibleSource interface {
CatalogClient() *dataplexapi.CatalogClient
ProjectID() string
SearchAspectTypes(context.Context, string, int, string) ([]*dataplexpb.AspectType, error)
}
type Config struct {
@@ -101,61 +98,11 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
if err != nil {
return nil, err
}
// Invoke the tool with the provided parameters
paramsMap := params.AsMap()
query, _ := paramsMap["query"].(string)
pageSize := int32(paramsMap["pageSize"].(int))
pageSize, _ := paramsMap["pageSize"].(int)
orderBy, _ := paramsMap["orderBy"].(string)
// Create SearchEntriesRequest with the provided parameters
req := &dataplexpb.SearchEntriesRequest{
Query: query + " type=projects/dataplex-types/locations/global/entryTypes/aspecttype",
Name: fmt.Sprintf("projects/%s/locations/global", source.ProjectID()),
PageSize: pageSize,
OrderBy: orderBy,
SemanticSearch: true,
}
// Perform the search using the CatalogClient - this will return an iterator
it := source.CatalogClient().SearchEntries(ctx, req)
if it == nil {
return nil, fmt.Errorf("failed to create search entries iterator for project %q", source.ProjectID())
}
// Create an instance of exponential backoff with default values for retrying GetAspectType calls
// InitialInterval, RandomizationFactor, Multiplier, MaxInterval = 500 ms, 0.5, 1.5, 60 s
getAspectBackOff := backoff.NewExponentialBackOff()
// Iterate through the search results and call GetAspectType for each result using the resource name
var results []*dataplexpb.AspectType
for {
entry, err := it.Next()
if err != nil {
break
}
resourceName := entry.DataplexEntry.GetEntrySource().Resource
getAspectTypeReq := &dataplexpb.GetAspectTypeRequest{
Name: resourceName,
}
operation := func() (*dataplexpb.AspectType, error) {
aspectType, err := source.CatalogClient().GetAspectType(ctx, getAspectTypeReq)
if err != nil {
return nil, fmt.Errorf("failed to get aspect type for entry %q: %w", resourceName, err)
}
return aspectType, nil
}
// Retry the GetAspectType operation with exponential backoff
aspectType, err := backoff.Retry(ctx, operation, backoff.WithBackOff(getAspectBackOff))
if err != nil {
return nil, fmt.Errorf("failed to get aspect type after retries for entry %q: %w", resourceName, err)
}
results = append(results, aspectType)
}
return results, nil
return source.SearchAspectTypes(ctx, query, pageSize, orderBy)
}
func (t Tool) ParseParams(data map[string]any, claims map[string]map[string]any) (parameters.ParamValues, error) {

View File

@@ -18,8 +18,7 @@ import (
"context"
"fmt"
dataplexapi "cloud.google.com/go/dataplex/apiv1"
dataplexpb "cloud.google.com/go/dataplex/apiv1/dataplexpb"
"cloud.google.com/go/dataplex/apiv1/dataplexpb"
"github.com/goccy/go-yaml"
"github.com/googleapis/genai-toolbox/internal/embeddingmodels"
"github.com/googleapis/genai-toolbox/internal/sources"
@@ -44,8 +43,7 @@ func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (tools.T
}
type compatibleSource interface {
CatalogClient() *dataplexapi.CatalogClient
ProjectID() string
SearchEntries(context.Context, string, int, string) ([]*dataplexpb.SearchEntriesResult, error)
}
type Config struct {
@@ -100,34 +98,11 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
if err != nil {
return nil, err
}
paramsMap := params.AsMap()
query, _ := paramsMap["query"].(string)
pageSize := int32(paramsMap["pageSize"].(int))
pageSize, _ := paramsMap["pageSize"].(int)
orderBy, _ := paramsMap["orderBy"].(string)
req := &dataplexpb.SearchEntriesRequest{
Query: query,
Name: fmt.Sprintf("projects/%s/locations/global", source.ProjectID()),
PageSize: pageSize,
OrderBy: orderBy,
SemanticSearch: true,
}
it := source.CatalogClient().SearchEntries(ctx, req)
if it == nil {
return nil, fmt.Errorf("failed to create search entries iterator for project %q", source.ProjectID())
}
var results []*dataplexpb.SearchEntriesResult
for {
entry, err := it.Next()
if err != nil {
break
}
results = append(results, entry)
}
return results, nil
return source.SearchEntries(ctx, query, pageSize, orderBy)
}
func (t Tool) ParseParams(data map[string]any, claims map[string]map[string]any) (parameters.ParamValues, error) {

View File

@@ -16,9 +16,7 @@ package http
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"slices"
@@ -54,7 +52,7 @@ type compatibleSource interface {
HttpDefaultHeaders() map[string]string
HttpBaseURL() string
HttpQueryParams() map[string]string
Client() *http.Client
RunRequest(*http.Request) (any, error)
}
type Config struct {
@@ -259,29 +257,7 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
for k, v := range allHeaders {
req.Header.Set(k, v)
}
// Make request and fetch response
resp, err := source.Client().Do(req)
if err != nil {
return nil, fmt.Errorf("error making HTTP request: %s", err)
}
defer resp.Body.Close()
var body []byte
body, err = io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if resp.StatusCode < 200 || resp.StatusCode > 299 {
return nil, fmt.Errorf("unexpected status code: %d, response body: %s", resp.StatusCode, string(body))
}
var data any
if err = json.Unmarshal(body, &data); err != nil {
// if unable to unmarshal data, return result as string.
return string(body), nil
}
return data, nil
return source.RunRequest(req)
}
func (t Tool) ParseParams(data map[string]any, claims map[string]map[string]any) (parameters.ParamValues, error) {

View File

@@ -19,7 +19,6 @@ import (
"encoding/json"
"fmt"
dataproc "cloud.google.com/go/dataproc/v2/apiv1"
dataprocpb "cloud.google.com/go/dataproc/v2/apiv1/dataprocpb"
"github.com/goccy/go-yaml"
"google.golang.org/protobuf/encoding/protojson"
@@ -36,9 +35,7 @@ func unmarshalProto(data any, m proto.Message) error {
}
type compatibleSource interface {
GetBatchControllerClient() *dataproc.BatchControllerClient
GetProject() string
GetLocation() string
CreateBatch(context.Context, *dataprocpb.Batch) (map[string]any, error)
}
// Config is a common config that can be used with any type of create batch tool. However, each tool

View File

@@ -16,23 +16,19 @@ package createbatch
import (
"context"
"encoding/json"
"fmt"
"time"
dataprocpb "cloud.google.com/go/dataproc/v2/apiv1/dataprocpb"
"github.com/googleapis/genai-toolbox/internal/embeddingmodels"
"github.com/googleapis/genai-toolbox/internal/sources"
"github.com/googleapis/genai-toolbox/internal/tools"
"github.com/googleapis/genai-toolbox/internal/tools/serverlessspark/common"
"github.com/googleapis/genai-toolbox/internal/util/parameters"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
)
type BatchBuilder interface {
Parameters() parameters.Parameters
BuildBatch(params parameters.ParamValues) (*dataprocpb.Batch, error)
BuildBatch(parameters.ParamValues) (*dataprocpb.Batch, error)
}
func NewTool(cfg Config, originalCfg tools.ToolConfig, srcs map[string]sources.Source, builder BatchBuilder) (*Tool, error) {
@@ -74,7 +70,6 @@ func (t *Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, par
if err != nil {
return nil, err
}
client := source.GetBatchControllerClient()
batch, err := t.Builder.BuildBatch(params)
if err != nil {
@@ -97,46 +92,7 @@ func (t *Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, par
}
batch.RuntimeConfig.Version = version
}
req := &dataprocpb.CreateBatchRequest{
Parent: fmt.Sprintf("projects/%s/locations/%s", source.GetProject(), source.GetLocation()),
Batch: batch,
}
op, err := client.CreateBatch(ctx, req)
if err != nil {
return nil, fmt.Errorf("failed to create batch: %w", err)
}
meta, err := op.Metadata()
if err != nil {
return nil, fmt.Errorf("failed to get create batch op metadata: %w", err)
}
jsonBytes, err := protojson.Marshal(meta)
if err != nil {
return nil, fmt.Errorf("failed to marshal create batch op metadata to JSON: %w", err)
}
var result map[string]any
if err := json.Unmarshal(jsonBytes, &result); err != nil {
return nil, fmt.Errorf("failed to unmarshal create batch op metadata JSON: %w", err)
}
projectID, location, batchID, err := common.ExtractBatchDetails(meta.GetBatch())
if err != nil {
return nil, fmt.Errorf("error extracting batch details from name %q: %v", meta.GetBatch(), err)
}
consoleUrl := common.BatchConsoleURL(projectID, location, batchID)
logsUrl := common.BatchLogsURL(projectID, location, batchID, meta.GetCreateTime().AsTime(), time.Time{})
wrappedResult := map[string]any{
"opMetadata": meta,
"consoleUrl": consoleUrl,
"logsUrl": logsUrl,
}
return wrappedResult, nil
return source.CreateBatch(ctx, batch)
}
func (t *Tool) ParseParams(data map[string]any, claims map[string]map[string]any) (parameters.ParamValues, error) {

View File

@@ -19,8 +19,7 @@ import (
"fmt"
"strings"
longrunning "cloud.google.com/go/longrunning/autogen"
"cloud.google.com/go/longrunning/autogen/longrunningpb"
dataproc "cloud.google.com/go/dataproc/v2/apiv1"
"github.com/goccy/go-yaml"
"github.com/googleapis/genai-toolbox/internal/embeddingmodels"
"github.com/googleapis/genai-toolbox/internal/sources"
@@ -45,9 +44,8 @@ func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (tools.T
}
type compatibleSource interface {
GetOperationsClient(context.Context) (*longrunning.OperationsClient, error)
GetProject() string
GetLocation() string
GetBatchControllerClient() *dataproc.BatchControllerClient
CancelOperation(context.Context, string) (any, error)
}
type Config struct {
@@ -106,32 +104,15 @@ func (t *Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, par
if err != nil {
return nil, err
}
client, err := source.GetOperationsClient(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get operations client: %w", err)
}
paramMap := params.AsMap()
operation, ok := paramMap["operation"].(string)
if !ok {
return nil, fmt.Errorf("missing required parameter: operation")
}
if strings.Contains(operation, "/") {
return nil, fmt.Errorf("operation must be a short operation name without '/': %s", operation)
}
req := &longrunningpb.CancelOperationRequest{
Name: fmt.Sprintf("projects/%s/locations/%s/operations/%s", source.GetProject(), source.GetLocation(), operation),
}
err = client.CancelOperation(ctx, req)
if err != nil {
return nil, fmt.Errorf("failed to cancel operation: %w", err)
}
return fmt.Sprintf("Cancelled [%s].", operation), nil
return source.CancelOperation(ctx, operation)
}
func (t *Tool) ParseParams(data map[string]any, claims map[string]map[string]any) (parameters.ParamValues, error) {

View File

@@ -16,19 +16,15 @@ package serverlesssparkgetbatch
import (
"context"
"encoding/json"
"fmt"
"strings"
dataproc "cloud.google.com/go/dataproc/v2/apiv1"
"cloud.google.com/go/dataproc/v2/apiv1/dataprocpb"
"github.com/goccy/go-yaml"
"github.com/googleapis/genai-toolbox/internal/embeddingmodels"
"github.com/googleapis/genai-toolbox/internal/sources"
"github.com/googleapis/genai-toolbox/internal/tools"
"github.com/googleapis/genai-toolbox/internal/tools/serverlessspark/common"
"github.com/googleapis/genai-toolbox/internal/util/parameters"
"google.golang.org/protobuf/encoding/protojson"
)
const kind = "serverless-spark-get-batch"
@@ -49,8 +45,7 @@ func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (tools.T
type compatibleSource interface {
GetBatchControllerClient() *dataproc.BatchControllerClient
GetProject() string
GetLocation() string
GetBatch(context.Context, string) (map[string]any, error)
}
type Config struct {
@@ -109,54 +104,15 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
if err != nil {
return nil, err
}
client := source.GetBatchControllerClient()
paramMap := params.AsMap()
name, ok := paramMap["name"].(string)
if !ok {
return nil, fmt.Errorf("missing required parameter: name")
}
if strings.Contains(name, "/") {
return nil, fmt.Errorf("name must be a short batch name without '/': %s", name)
}
req := &dataprocpb.GetBatchRequest{
Name: fmt.Sprintf("projects/%s/locations/%s/batches/%s", source.GetProject(), source.GetLocation(), name),
}
batchPb, err := client.GetBatch(ctx, req)
if err != nil {
return nil, fmt.Errorf("failed to get batch: %w", err)
}
jsonBytes, err := protojson.Marshal(batchPb)
if err != nil {
return nil, fmt.Errorf("failed to marshal batch to JSON: %w", err)
}
var result map[string]any
if err := json.Unmarshal(jsonBytes, &result); err != nil {
return nil, fmt.Errorf("failed to unmarshal batch JSON: %w", err)
}
consoleUrl, err := common.BatchConsoleURLFromProto(batchPb)
if err != nil {
return nil, fmt.Errorf("error generating console url: %v", err)
}
logsUrl, err := common.BatchLogsURLFromProto(batchPb)
if err != nil {
return nil, fmt.Errorf("error generating logs url: %v", err)
}
wrappedResult := map[string]any{
"consoleUrl": consoleUrl,
"logsUrl": logsUrl,
"batch": result,
}
return wrappedResult, nil
return source.GetBatch(ctx, name)
}
func (t Tool) ParseParams(data map[string]any, claims map[string]map[string]any) (parameters.ParamValues, error) {
return parameters.ParseParams(t.Parameters, data, claims)

View File

@@ -17,17 +17,13 @@ package serverlesssparklistbatches
import (
"context"
"fmt"
"time"
dataproc "cloud.google.com/go/dataproc/v2/apiv1"
"cloud.google.com/go/dataproc/v2/apiv1/dataprocpb"
"github.com/goccy/go-yaml"
"github.com/googleapis/genai-toolbox/internal/embeddingmodels"
"github.com/googleapis/genai-toolbox/internal/sources"
"github.com/googleapis/genai-toolbox/internal/tools"
"github.com/googleapis/genai-toolbox/internal/tools/serverlessspark/common"
"github.com/googleapis/genai-toolbox/internal/util/parameters"
"google.golang.org/api/iterator"
)
const kind = "serverless-spark-list-batches"
@@ -48,8 +44,7 @@ func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (tools.T
type compatibleSource interface {
GetBatchControllerClient() *dataproc.BatchControllerClient
GetProject() string
GetLocation() string
ListBatches(context.Context, *int, string, string) (any, error)
}
type Config struct {
@@ -104,95 +99,24 @@ type Tool struct {
Parameters parameters.Parameters
}
// ListBatchesResponse is the response from the list batches API.
type ListBatchesResponse struct {
Batches []Batch `json:"batches"`
NextPageToken string `json:"nextPageToken"`
}
// Batch represents a single batch job.
type Batch struct {
Name string `json:"name"`
UUID string `json:"uuid"`
State string `json:"state"`
Creator string `json:"creator"`
CreateTime string `json:"createTime"`
Operation string `json:"operation"`
ConsoleURL string `json:"consoleUrl"`
LogsURL string `json:"logsUrl"`
}
// Invoke executes the tool's operation.
func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, params parameters.ParamValues, accessToken tools.AccessToken) (any, error) {
source, err := tools.GetCompatibleSource[compatibleSource](resourceMgr, t.Source, t.Name, t.Kind)
if err != nil {
return nil, err
}
client := source.GetBatchControllerClient()
parent := fmt.Sprintf("projects/%s/locations/%s", source.GetProject(), source.GetLocation())
req := &dataprocpb.ListBatchesRequest{
Parent: parent,
OrderBy: "create_time desc",
}
paramMap := params.AsMap()
var pageSize *int
if ps, ok := paramMap["pageSize"]; ok && ps != nil {
req.PageSize = int32(ps.(int))
if (req.PageSize) <= 0 {
return nil, fmt.Errorf("pageSize must be positive: %d", req.PageSize)
pageSizeV := ps.(int)
if pageSizeV <= 0 {
return nil, fmt.Errorf("pageSize must be positive: %d", pageSizeV)
}
pageSize = &pageSizeV
}
if pt, ok := paramMap["pageToken"]; ok && pt != nil {
req.PageToken = pt.(string)
}
if filter, ok := paramMap["filter"]; ok && filter != nil {
req.Filter = filter.(string)
}
it := client.ListBatches(ctx, req)
pager := iterator.NewPager(it, int(req.PageSize), req.PageToken)
var batchPbs []*dataprocpb.Batch
nextPageToken, err := pager.NextPage(&batchPbs)
if err != nil {
return nil, fmt.Errorf("failed to list batches: %w", err)
}
batches, err := ToBatches(batchPbs)
if err != nil {
return nil, err
}
return ListBatchesResponse{Batches: batches, NextPageToken: nextPageToken}, nil
}
// ToBatches converts a slice of protobuf Batch messages to a slice of Batch structs.
func ToBatches(batchPbs []*dataprocpb.Batch) ([]Batch, error) {
batches := make([]Batch, 0, len(batchPbs))
for _, batchPb := range batchPbs {
consoleUrl, err := common.BatchConsoleURLFromProto(batchPb)
if err != nil {
return nil, fmt.Errorf("error generating console url: %v", err)
}
logsUrl, err := common.BatchLogsURLFromProto(batchPb)
if err != nil {
return nil, fmt.Errorf("error generating logs url: %v", err)
}
batch := Batch{
Name: batchPb.Name,
UUID: batchPb.Uuid,
State: batchPb.State.Enum().String(),
Creator: batchPb.Creator,
CreateTime: batchPb.CreateTime.AsTime().Format(time.RFC3339),
Operation: batchPb.Operation,
ConsoleURL: consoleUrl,
LogsURL: logsUrl,
}
batches = append(batches, batch)
}
return batches, nil
pt, _ := paramMap["pageToken"].(string)
filter, _ := paramMap["filter"].(string)
return source.ListBatches(ctx, pageSize, pt, filter)
}
func (t Tool) ParseParams(data map[string]any, claims map[string]map[string]any) (parameters.ParamValues, error) {

View File

@@ -161,6 +161,9 @@ func WithMcpSelect1Want(want string) McpTestOption {
// ExecuteSqlTestConfig represents the various configuration options for RunExecuteSqlToolInvokeTest()
type ExecuteSqlTestConfig struct {
select1Statement string
createWant string
dropWant string
selectEmptyWant string
}
type ExecuteSqlOption func(*ExecuteSqlTestConfig)
@@ -173,6 +176,27 @@ func WithSelect1Statement(s string) ExecuteSqlOption {
}
}
// WithExecuteCreateWant sets the expected response for a CREATE TABLE statement.
func WithExecuteCreateWant(s string) ExecuteSqlOption {
return func(c *ExecuteSqlTestConfig) {
c.createWant = s
}
}
// WithExecuteDropWant sets the expected response for a DROP TABLE statement.
func WithExecuteDropWant(s string) ExecuteSqlOption {
return func(c *ExecuteSqlTestConfig) {
c.dropWant = s
}
}
// WithExecuteSelectEmptyWant sets the expected response for a SELECT from an empty table.
func WithExecuteSelectEmptyWant(s string) ExecuteSqlOption {
return func(c *ExecuteSqlTestConfig) {
c.selectEmptyWant = s
}
}
/* Configurations for RunToolInvokeWithTemplateParameters() */
// TemplateParameterTestConfig represents the various configuration options for template parameter tests.

View File

@@ -33,8 +33,8 @@ import (
dataproc "cloud.google.com/go/dataproc/v2/apiv1"
"cloud.google.com/go/dataproc/v2/apiv1/dataprocpb"
"github.com/google/go-cmp/cmp"
"github.com/googleapis/genai-toolbox/internal/sources/serverlessspark"
"github.com/googleapis/genai-toolbox/internal/testutils"
"github.com/googleapis/genai-toolbox/internal/tools/serverlessspark/serverlesssparklistbatches"
"github.com/googleapis/genai-toolbox/tests"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
@@ -676,7 +676,7 @@ func runListBatchesTest(t *testing.T, client *dataproc.BatchControllerClient, ct
filter string
pageSize int
numPages int
want []serverlesssparklistbatches.Batch
want []serverlessspark.Batch
}{
{name: "one page", pageSize: 2, numPages: 1, want: batch2},
{name: "two pages", pageSize: 1, numPages: 2, want: batch2},
@@ -701,7 +701,7 @@ func runListBatchesTest(t *testing.T, client *dataproc.BatchControllerClient, ct
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
var actual []serverlesssparklistbatches.Batch
var actual []serverlessspark.Batch
var pageToken string
for i := 0; i < tc.numPages; i++ {
request := map[string]any{
@@ -733,7 +733,7 @@ func runListBatchesTest(t *testing.T, client *dataproc.BatchControllerClient, ct
t.Fatalf("unable to find result in response body")
}
var listResponse serverlesssparklistbatches.ListBatchesResponse
var listResponse serverlessspark.ListBatchesResponse
if err := json.Unmarshal([]byte(result), &listResponse); err != nil {
t.Fatalf("error unmarshalling result: %s", err)
}
@@ -759,7 +759,7 @@ func runListBatchesTest(t *testing.T, client *dataproc.BatchControllerClient, ct
}
}
func listBatchesRpc(t *testing.T, client *dataproc.BatchControllerClient, ctx context.Context, filter string, n int, exact bool) []serverlesssparklistbatches.Batch {
func listBatchesRpc(t *testing.T, client *dataproc.BatchControllerClient, ctx context.Context, filter string, n int, exact bool) []serverlessspark.Batch {
parent := fmt.Sprintf("projects/%s/locations/%s", serverlessSparkProject, serverlessSparkLocation)
req := &dataprocpb.ListBatchesRequest{
Parent: parent,
@@ -783,7 +783,7 @@ func listBatchesRpc(t *testing.T, client *dataproc.BatchControllerClient, ctx co
if !exact && (len(batchPbs) == 0 || len(batchPbs) > n) {
t.Fatalf("expected between 1 and %d batches, got %d", n, len(batchPbs))
}
batches, err := serverlesssparklistbatches.ToBatches(batchPbs)
batches, err := serverlessspark.ToBatches(batchPbs)
if err != nil {
t.Fatalf("failed to convert batches to JSON: %v", err)
}

View File

@@ -611,6 +611,9 @@ func RunExecuteSqlToolInvokeTest(t *testing.T, createTableStatement, select1Want
// Default values for ExecuteSqlTestConfig
configs := &ExecuteSqlTestConfig{
select1Statement: `"SELECT 1"`,
createWant: "null",
dropWant: "null",
selectEmptyWant: "null",
}
// Apply provided options
@@ -646,7 +649,7 @@ func RunExecuteSqlToolInvokeTest(t *testing.T, createTableStatement, select1Want
api: "http://127.0.0.1:5000/api/tool/my-exec-sql-tool/invoke",
requestHeader: map[string]string{},
requestBody: bytes.NewBuffer([]byte(fmt.Sprintf(`{"sql": %s}`, createTableStatement))),
want: "null",
want: configs.createWant,
isErr: false,
},
{
@@ -654,7 +657,7 @@ func RunExecuteSqlToolInvokeTest(t *testing.T, createTableStatement, select1Want
api: "http://127.0.0.1:5000/api/tool/my-exec-sql-tool/invoke",
requestHeader: map[string]string{},
requestBody: bytes.NewBuffer([]byte(`{"sql":"SELECT * FROM t"}`)),
want: "null",
want: configs.selectEmptyWant,
isErr: false,
},
{
@@ -662,7 +665,7 @@ func RunExecuteSqlToolInvokeTest(t *testing.T, createTableStatement, select1Want
api: "http://127.0.0.1:5000/api/tool/my-exec-sql-tool/invoke",
requestHeader: map[string]string{},
requestBody: bytes.NewBuffer([]byte(`{"sql":"DROP TABLE t"}`)),
want: "null",
want: configs.dropWant,
isErr: false,
},
{